Merge "Messages not distributed across Kafka partitions"

This commit is contained in:
Jenkins 2014-12-12 18:25:28 +00:00 committed by Gerrit Code Review
commit c3e7243c01
8 changed files with 17 additions and 31 deletions

View File

@ -31,9 +31,6 @@ import javax.validation.constraints.NotNull;
* Thresholding configuration.
*/
public class ThresholdingConfiguration {
public static final String ALERTS_EXCHANGE = "thresh.external.alerts";
public static final String ALERTS_ROUTING_KEY = "thresh.external.alert";
/** Total number of workers processes across the cluster. */
@NotNull public Integer numWorkerProcesses = 12;
/** Total number of acker threads across the cluster. */
@ -63,14 +60,12 @@ public class ThresholdingConfiguration {
/** Namespaces for which metrics are received sporadically. */
@NotNull public Set<String> sporadicMetricNamespaces;
/** Configuration for the spout that receives metrics from the external exchange. */
/** Configuration for the spout that receives metrics from Kafka. */
@Valid @NotNull public MetricSpoutConfig metricSpoutConfig;
/** Configuration for the spout that receives events from the external exchange. */
/** Configuration for the spout that receives events from Kafka. */
@Valid @NotNull public EventSpoutConfig eventSpoutConfig;
/** Configuration for publishing to the alerts exchange on the external server. */
@NotEmpty public String alertsExchange = "alerts";
@NotEmpty public String alertsRoutingKey = "alert";
/** Configuration for publishing to the Kafka. */
@Valid @NotNull public KafkaProducerConfiguration kafkaProducerConfig = new KafkaProducerConfiguration();
/** Database configuration. */

View File

@ -69,8 +69,6 @@ public class TopologyModule extends AbstractModule {
stormConfig = new Config();
stormConfig.setNumWorkers(config.numWorkerProcesses);
stormConfig.setNumAckers(config.numAckerThreads);
stormConfig.put(ThresholdingConfiguration.ALERTS_EXCHANGE, config.alertsExchange);
stormConfig.put(ThresholdingConfiguration.ALERTS_ROUTING_KEY, config.alertsRoutingKey);
}
return stormConfig;

View File

@ -19,7 +19,7 @@ package monasca.thresh.infrastructure.thresholding;
public interface AlarmEventForwarder {
void send(String alertExchange, String alertRoutingKey, String json);
void send(String json);
void close();
}

View File

@ -71,8 +71,6 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
private KafkaProducerConfiguration producerConfiguration;
final Map<String, Alarm> alarms = new HashMap<String, Alarm>();
final Map<String, AlarmDefinition> alarmDefinitions = new HashMap<>();
private String alertExchange;
private String alertRoutingKey;
private transient AlarmDAO alarmDAO;
private transient AlarmDefinitionDAO alarmDefinitionDAO;
private transient AlarmEventForwarder alarmEventForwarder;
@ -173,8 +171,6 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context));
logger.info("Preparing");
this.collector = collector;
alertExchange = (String) config.get(ThresholdingConfiguration.ALERTS_EXCHANGE);
alertRoutingKey = (String) config.get(ThresholdingConfiguration.ALERTS_ROUTING_KEY);
if (alarmDAO == null) {
Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(dbConfig));
@ -229,7 +225,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
alarmDefinition.getSeverity(), alarmDefinition.isActionsEnabled(), stateChangeReason,
getTimestamp());
try {
alarmEventForwarder.send(alertExchange, alertRoutingKey, Serialization.toJson(event));
alarmEventForwarder.send(Serialization.toJson(event));
} catch (Exception ignore) {
logger.debug("Failure sending alarm", ignore);
}

View File

@ -37,6 +37,8 @@ public class KafkaAlarmEventForwarder implements AlarmEventForwarder {
private final String topic;
private long messageCount = 0;
public KafkaAlarmEventForwarder(KafkaProducerConfiguration kafkaConfig) {
this.topic = kafkaConfig.getTopic();
Properties kafkaProperties = KafkaProducerProperties.createKafkaProperties(kafkaConfig);
@ -45,11 +47,11 @@ public class KafkaAlarmEventForwarder implements AlarmEventForwarder {
}
@Override
public void send(String alertExchange, String alertRoutingKey, String json) {
logger.debug("sending alertExchange: {}, alertRoutingKey: {}, json: {}", alertExchange,
alertRoutingKey, json);
public void send(String json) {
logger.debug("sending topic: {}, json: {}", topic, json);
final String routingKey = String.valueOf(messageCount++);
final KeyedMessage<String, String> message =
new KeyedMessage<String, String>(topic, alertRoutingKey, json);
new KeyedMessage<String, String>(topic, routingKey, json);
producer.send(message);
}

View File

@ -169,7 +169,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
final Object[] args = invocation.getArguments();
AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[2]);
AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[0]);
System.out.printf("Alarm transitioned from %s to %s%n", event.oldState, event.newState);
assertEquals(event.alarmName, expectedAlarmName);
assertEquals(event.alarmDefinitionId, expectedAlarmDefinitionId);
@ -183,7 +183,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
alarmsSent++;
return null;
}
}).when(alarmEventForwarder).send(anyString(), anyString(), anyString());
}).when(alarmEventForwarder).send(anyString());
final AlarmExpression initialExpression = new AlarmExpression(
"max(hpcs.compute.cpu{id=5}) >= 556 or max(hpcs.compute.mem{id=5}) >= 557");

View File

@ -202,7 +202,7 @@ public class ThresholdingEngineTest extends TopologyTestCase {
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
final Object[] args = invocation.getArguments();
AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[2]);
AlarmStateTransitionedEvent event = Serialization.fromJson((String) args[0]);
alarmsSent++;
System.out.printf("Alarm transitioned from %s to %s%n", event.oldState, event.newState);
assertEquals(event.alarmDefinitionId, alarmDefinition.getId());
@ -236,7 +236,7 @@ public class ThresholdingEngineTest extends TopologyTestCase {
previousState = event.newState;
return null;
}
}).when(alarmEventForwarder).send(anyString(), anyString(), anyString());
}).when(alarmEventForwarder).send(anyString());
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {

View File

@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import monasca.thresh.ThresholdingConfiguration;
import monasca.common.model.event.AlarmDefinitionUpdatedEvent;
import monasca.common.model.event.AlarmUpdatedEvent;
import monasca.common.model.alarm.AggregateFunction;
@ -57,8 +56,6 @@ import java.util.Map;
@Test
public class AlarmThresholdingBoltTest {
private static final String ALERT_ROUTING_KEY = "Alert Routing Key";
private static final String ALERTS_EXCHANGE = "Alerts";
private static final String tenantId = "AAAAABBBBBBCCCCC";
private AlarmExpression alarmExpression;
@ -98,8 +95,6 @@ public class AlarmThresholdingBoltTest {
bolt = new MockAlarmThreshholdBolt(alarmDAO, alarmDefinitionDAO, alarmEventForwarder);
collector = mock(OutputCollector.class);
final Map<String, String> config = new HashMap<>();
config.put(ThresholdingConfiguration.ALERTS_EXCHANGE, ALERTS_EXCHANGE);
config.put(ThresholdingConfiguration.ALERTS_ROUTING_KEY, ALERT_ROUTING_KEY);
final TopologyContext context = mock(TopologyContext.class);
bolt.prepare(config, context, collector);
}
@ -129,7 +124,7 @@ public class AlarmThresholdingBoltTest {
+ "\"stateChangeReason\":\"Thresholds were exceeded for the sub-alarms: ["
+ subAlarm.getExpression().getExpression() + "]\"," + "\"severity\":\"LOW\",\"timestamp\":1395587091}}";
verify(alarmEventForwarder, times(1)).send(ALERTS_EXCHANGE, ALERT_ROUTING_KEY, alarmJson);
verify(alarmEventForwarder, times(1)).send(alarmJson);
verify(alarmDAO, times(1)).updateState(alarmId, AlarmState.ALARM);
// Now clear the alarm and ensure another notification gets sent out
@ -147,7 +142,7 @@ public class AlarmThresholdingBoltTest {
+ "\"alarmDescription\":\"Description of Alarm\",\"oldState\":\"ALARM\",\"newState\":\"OK\","
+ "\"actionsEnabled\":true,"
+ "\"stateChangeReason\":\"The alarm threshold(s) have not been exceeded\",\"severity\":\"LOW\",\"timestamp\":1395587091}}";
verify(alarmEventForwarder, times(1)).send(ALERTS_EXCHANGE, ALERT_ROUTING_KEY, okJson);
verify(alarmEventForwarder, times(1)).send(okJson);
verify(alarmDAO, times(1)).updateState(alarmId, AlarmState.OK);
}