diff --git a/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java b/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java index 43ebd16..c8bbd11 100644 --- a/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java +++ b/thresh/src/main/java/monasca/thresh/ThresholdingConfiguration.java @@ -31,9 +31,6 @@ import javax.validation.constraints.NotNull; * Thresholding configuration. */ public class ThresholdingConfiguration { - public static final String ALERTS_EXCHANGE = "thresh.external.alerts"; - public static final String ALERTS_ROUTING_KEY = "thresh.external.alert"; - /** Total number of workers processes across the cluster. */ @NotNull public Integer numWorkerProcesses = 12; /** Total number of acker threads across the cluster. */ @@ -63,14 +60,12 @@ public class ThresholdingConfiguration { /** Namespaces for which metrics are received sporadically. */ @NotNull public Set sporadicMetricNamespaces; - /** Configuration for the spout that receives metrics from the external exchange. */ + /** Configuration for the spout that receives metrics from Kafka. */ @Valid @NotNull public MetricSpoutConfig metricSpoutConfig; - /** Configuration for the spout that receives events from the external exchange. */ + /** Configuration for the spout that receives events from Kafka. */ @Valid @NotNull public EventSpoutConfig eventSpoutConfig; - /** Configuration for publishing to the alerts exchange on the external server. */ - @NotEmpty public String alertsExchange = "alerts"; - @NotEmpty public String alertsRoutingKey = "alert"; + /** Configuration for publishing to the Kafka. */ @Valid @NotNull public KafkaProducerConfiguration kafkaProducerConfig = new KafkaProducerConfiguration(); /** Database configuration. */ diff --git a/thresh/src/main/java/monasca/thresh/TopologyModule.java b/thresh/src/main/java/monasca/thresh/TopologyModule.java index 4adf308..004e2cb 100644 --- a/thresh/src/main/java/monasca/thresh/TopologyModule.java +++ b/thresh/src/main/java/monasca/thresh/TopologyModule.java @@ -69,8 +69,6 @@ public class TopologyModule extends AbstractModule { stormConfig = new Config(); stormConfig.setNumWorkers(config.numWorkerProcesses); stormConfig.setNumAckers(config.numAckerThreads); - stormConfig.put(ThresholdingConfiguration.ALERTS_EXCHANGE, config.alertsExchange); - stormConfig.put(ThresholdingConfiguration.ALERTS_ROUTING_KEY, config.alertsRoutingKey); } return stormConfig; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmEventForwarder.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmEventForwarder.java index bf01407..e10203e 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmEventForwarder.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmEventForwarder.java @@ -19,7 +19,7 @@ package monasca.thresh.infrastructure.thresholding; public interface AlarmEventForwarder { - void send(String alertExchange, String alertRoutingKey, String json); + void send(String json); void close(); } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java index bf6013f..9eb0dd4 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java @@ -71,8 +71,6 @@ public class AlarmThresholdingBolt extends BaseRichBolt { private KafkaProducerConfiguration producerConfiguration; final Map alarms = new HashMap(); final Map alarmDefinitions = new HashMap<>(); - private String alertExchange; - private String alertRoutingKey; private transient AlarmDAO alarmDAO; private transient AlarmDefinitionDAO alarmDefinitionDAO; private transient AlarmEventForwarder alarmEventForwarder; @@ -173,8 +171,6 @@ public class AlarmThresholdingBolt extends BaseRichBolt { logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); logger.info("Preparing"); this.collector = collector; - alertExchange = (String) config.get(ThresholdingConfiguration.ALERTS_EXCHANGE); - alertRoutingKey = (String) config.get(ThresholdingConfiguration.ALERTS_ROUTING_KEY); if (alarmDAO == null) { Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(dbConfig)); @@ -229,7 +225,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt { alarmDefinition.getSeverity(), alarmDefinition.isActionsEnabled(), stateChangeReason, getTimestamp()); try { - alarmEventForwarder.send(alertExchange, alertRoutingKey, Serialization.toJson(event)); + alarmEventForwarder.send(Serialization.toJson(event)); } catch (Exception ignore) { logger.debug("Failure sending alarm", ignore); } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaAlarmEventForwarder.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaAlarmEventForwarder.java index 2f811e3..7e69e8b 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaAlarmEventForwarder.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaAlarmEventForwarder.java @@ -37,6 +37,8 @@ public class KafkaAlarmEventForwarder implements AlarmEventForwarder { private final String topic; + private long messageCount = 0; + public KafkaAlarmEventForwarder(KafkaProducerConfiguration kafkaConfig) { this.topic = kafkaConfig.getTopic(); Properties kafkaProperties = KafkaProducerProperties.createKafkaProperties(kafkaConfig); @@ -45,11 +47,11 @@ public class KafkaAlarmEventForwarder implements AlarmEventForwarder { } @Override - public void send(String alertExchange, String alertRoutingKey, String json) { - logger.debug("sending alertExchange: {}, alertRoutingKey: {}, json: {}", alertExchange, - alertRoutingKey, json); + public void send(String json) { + logger.debug("sending topic: {}, json: {}", topic, json); + final String routingKey = String.valueOf(messageCount++); final KeyedMessage message = - new KeyedMessage(topic, alertRoutingKey, json); + new KeyedMessage(topic, routingKey, json); producer.send(message); } diff --git a/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java b/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java index f02ed37..4b4622e 100644 --- a/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java +++ b/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java @@ -169,7 +169,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { final Object[] args = invocation.getArguments(); - AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[2]); + AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[0]); System.out.printf("Alarm transitioned from %s to %s%n", event.oldState, event.newState); assertEquals(event.alarmName, expectedAlarmName); assertEquals(event.alarmDefinitionId, expectedAlarmDefinitionId); @@ -183,7 +183,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { alarmsSent++; return null; } - }).when(alarmEventForwarder).send(anyString(), anyString(), anyString()); + }).when(alarmEventForwarder).send(anyString()); final AlarmExpression initialExpression = new AlarmExpression( "max(hpcs.compute.cpu{id=5}) >= 556 or max(hpcs.compute.mem{id=5}) >= 557"); diff --git a/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java b/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java index dd9a6e0..c13c675 100644 --- a/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java +++ b/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java @@ -202,7 +202,7 @@ public class ThresholdingEngineTest extends TopologyTestCase { doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { final Object[] args = invocation.getArguments(); - AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[2]); + AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[0]); alarmsSent++; System.out.printf("Alarm transitioned from %s to %s%n", event.oldState, event.newState); assertEquals(event.alarmDefinitionId, alarmDefinition.getId()); @@ -236,7 +236,7 @@ public class ThresholdingEngineTest extends TopologyTestCase { previousState = event.newState; return null; } - }).when(alarmEventForwarder).send(anyString(), anyString(), anyString()); + }).when(alarmEventForwarder).send(anyString()); doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java index 0c61284..9791dfe 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; -import monasca.thresh.ThresholdingConfiguration; import monasca.common.model.event.AlarmDefinitionUpdatedEvent; import monasca.common.model.event.AlarmUpdatedEvent; import monasca.common.model.alarm.AggregateFunction; @@ -57,8 +56,6 @@ import java.util.Map; @Test public class AlarmThresholdingBoltTest { - private static final String ALERT_ROUTING_KEY = "Alert Routing Key"; - private static final String ALERTS_EXCHANGE = "Alerts"; private static final String tenantId = "AAAAABBBBBBCCCCC"; private AlarmExpression alarmExpression; @@ -98,8 +95,6 @@ public class AlarmThresholdingBoltTest { bolt = new MockAlarmThreshholdBolt(alarmDAO, alarmDefinitionDAO, alarmEventForwarder); collector = mock(OutputCollector.class); final Map config = new HashMap<>(); - config.put(ThresholdingConfiguration.ALERTS_EXCHANGE, ALERTS_EXCHANGE); - config.put(ThresholdingConfiguration.ALERTS_ROUTING_KEY, ALERT_ROUTING_KEY); final TopologyContext context = mock(TopologyContext.class); bolt.prepare(config, context, collector); } @@ -129,7 +124,7 @@ public class AlarmThresholdingBoltTest { + "\"stateChangeReason\":\"Thresholds were exceeded for the sub-alarms: [" + subAlarm.getExpression().getExpression() + "]\"," + "\"severity\":\"LOW\",\"timestamp\":1395587091}}"; - verify(alarmEventForwarder, times(1)).send(ALERTS_EXCHANGE, ALERT_ROUTING_KEY, alarmJson); + verify(alarmEventForwarder, times(1)).send(alarmJson); verify(alarmDAO, times(1)).updateState(alarmId, AlarmState.ALARM); // Now clear the alarm and ensure another notification gets sent out @@ -147,7 +142,7 @@ public class AlarmThresholdingBoltTest { + "\"alarmDescription\":\"Description of Alarm\",\"oldState\":\"ALARM\",\"newState\":\"OK\"," + "\"actionsEnabled\":true," + "\"stateChangeReason\":\"The alarm threshold(s) have not been exceeded\",\"severity\":\"LOW\",\"timestamp\":1395587091}}"; - verify(alarmEventForwarder, times(1)).send(ALERTS_EXCHANGE, ALERT_ROUTING_KEY, okJson); + verify(alarmEventForwarder, times(1)).send(okJson); verify(alarmDAO, times(1)).updateState(alarmId, AlarmState.OK); }