Ensure that metrics and alarms are flushed on shutdown and that the Kafka

connection is shutdown properly also. This ensure that on normal shutdown,
no metrics or alarms are missed or duplicated.

Some changes were also done to remove warnings for generics.

Some unused instance properties were removed.
This commit is contained in:
Craig Bryant 2014-07-13 16:13:32 -06:00
parent 6fc4f6ebd9
commit a2cf06e2d3
13 changed files with 174 additions and 58 deletions

View File

@ -16,20 +16,23 @@
*/
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.disruptor.ManagedDisruptor;
import com.google.inject.Inject;
import com.lmax.disruptor.dsl.Disruptor;
import io.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Consumer implements Managed {
import io.dropwizard.lifecycle.Managed;
public class Consumer<T> implements Managed {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private final KafkaConsumer consumer;
private final Disruptor disruptor;
private final ManagedDisruptor<T> disruptor;
@Inject
public Consumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) {
public Consumer(KafkaConsumer kafkaConsumer, ManagedDisruptor<T> disruptor) {
this.consumer = kafkaConsumer;
this.disruptor = disruptor;
}

View File

@ -17,20 +17,26 @@
package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public abstract class KafkaConsumer {
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private static final int WAIT_TIME = 10;
protected final MonPersisterConfiguration configuration;
private final Integer numThreads;
@ -38,7 +44,7 @@ public abstract class KafkaConsumer {
@Inject
private KafkaStreams kafkaStreams;
protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber);
protected abstract Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber);
protected abstract String getStreamName();
@Inject
@ -55,15 +61,23 @@ public abstract class KafkaConsumer {
executorService = Executors.newFixedThreadPool(numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
for (final KafkaStream<byte[], byte[]> stream : streams) {
executorService.submit(createRunnable(stream, threadNumber));
threadNumber++;
}
}
public void stop() {
kafkaStreams.stop();
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
logger.warn("Did not shut down in %d seconds", WAIT_TIME);
}
} catch (InterruptedException e) {
logger.info("awaitTerminiation interrupted", e);
}
}
}
}

View File

@ -89,4 +89,8 @@ public class KafkaStreams {
return properties;
}
public void stop() {
consumerConnector.shutdown();
}
}

View File

@ -16,13 +16,16 @@
*/
package com.hpcloud.mon.persister.consumer;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
public class MetricsConsumer extends Consumer {
import com.google.inject.Inject;
public class MetricsConsumer extends Consumer<MetricHolder> {
@Inject
public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) {
super(kafkaConsumer, disruptor);
}
}

View File

@ -16,13 +16,15 @@
*/
package com.hpcloud.mon.persister.disruptor;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventFactory;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory;
import com.lmax.disruptor.EventHandler;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,13 +69,14 @@ public class AlarmHistoryDisruptorProvider implements Provider<AlarmStateHistory
int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
logger.debug("Number of output processors [" + numOutputProcessors + "]");
EventHandler[] eventHandlers = new EventHandler[numOutputProcessors];
AlarmStateTransitionedEventHandler[] eventHandlers = new AlarmStateTransitionedEventHandler[numOutputProcessors];
for (int i = 0; i < numOutputProcessors; ++i) {
eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
}
disruptor.handleEventsWith(eventHandlers);
disruptor.setHandlers(eventHandlers);
disruptor.start();
logger.debug("Instance of disruptor successfully started");

View File

@ -17,19 +17,19 @@
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class AlarmStateHistoryDisruptor extends Disruptor<AlarmStateTransitionedEventHolder> {
public AlarmStateHistoryDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) {
public class AlarmStateHistoryDisruptor extends ManagedDisruptor<AlarmStateTransitionedEventHolder> {
public AlarmStateHistoryDisruptor(EventFactory<AlarmStateTransitionedEventHolder> eventFactory, int ringBufferSize, Executor executor) {
super(eventFactory, ringBufferSize, executor);
}
public AlarmStateHistoryDisruptor(final EventFactory eventFactory,
public AlarmStateHistoryDisruptor(final EventFactory<AlarmStateTransitionedEventHolder> eventFactory,
int ringBufferSize,
Executor executor,
ProducerType producerType,

View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.FlushableHandler;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class ManagedDisruptor<T> extends Disruptor<T>{
private FlushableHandler[] handlers = new FlushableHandler[0];
public ManagedDisruptor(EventFactory<T> eventFactory, int ringBufferSize, Executor executor) {
super(eventFactory, ringBufferSize, executor);
}
public ManagedDisruptor(final EventFactory<T> eventFactory,
int ringBufferSize,
Executor executor,
ProducerType producerType,
WaitStrategy waitStrategy) {
super(eventFactory, ringBufferSize, executor, producerType, waitStrategy);
}
@Override
public void shutdown() {
for (FlushableHandler handler : handlers) {
handler.flush();
}
super.shutdown();
}
public void setHandlers(FlushableHandler[] handlers) {
this.handlers = handlers;
}
}

View File

@ -17,19 +17,20 @@
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class MetricDisruptor extends Disruptor<MetricHolder> {
public MetricDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) {
public class MetricDisruptor extends ManagedDisruptor<MetricHolder> {
public MetricDisruptor(EventFactory<MetricHolder> eventFactory, int ringBufferSize, Executor executor) {
super(eventFactory, ringBufferSize, executor);
}
public MetricDisruptor(final EventFactory eventFactory,
public MetricDisruptor(final EventFactory<MetricHolder> eventFactory,
int ringBufferSize,
Executor executor,
ProducerType producerType,

View File

@ -18,11 +18,14 @@ package com.hpcloud.mon.persister.disruptor;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.disruptor.event.MetricFactory;
import com.hpcloud.mon.persister.disruptor.event.MetricHandler;
import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,13 +71,14 @@ public class MetricDisruptorProvider implements Provider<MetricDisruptor> {
int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
logger.debug("Number of output processors [" + numOutputProcessors + "]");
EventHandler[] eventHandlers = new EventHandler[numOutputProcessors];
MetricHandler[] metricHandlers = new MetricHandler[numOutputProcessors];
for (int i = 0; i < numOutputProcessors; ++i) {
eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
metricHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
}
disruptor.handleEventsWith(eventHandlers);
disruptor.handleEventsWith(metricHandlers);
disruptor.setHandlers(metricHandlers);
disruptor.start();
logger.debug("Instance of disruptor successfully started");

View File

@ -16,21 +16,22 @@
*/
package com.hpcloud.mon.persister.disruptor.event;
import com.codahale.metrics.Counter;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.repository.AlarmRepository;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.repository.AlarmRepository;
import com.hpcloud.mon.persister.repository.VerticaAlarmRepository;
import com.lmax.disruptor.EventHandler;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlarmStateTransitionedEventHandler implements EventHandler<AlarmStateTransitionedEventHolder> {
import io.dropwizard.setup.Environment;
public class AlarmStateTransitionedEventHandler implements EventHandler<AlarmStateTransitionedEventHolder>, FlushableHandler {
private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class);
private final int ordinal;
@ -42,10 +43,8 @@ public class AlarmStateTransitionedEventHandler implements EventHandler<AlarmSta
private final int secondsBetweenFlushes;
private final AlarmRepository repository;
private final MonPersisterConfiguration configuration;
private final Environment environment;
private final Counter batchCounter;
private final Meter processedMeter;
private final Meter commitMeter;
private final Timer commitTimer;
@ -59,9 +58,7 @@ public class AlarmStateTransitionedEventHandler implements EventHandler<AlarmSta
@Assisted("batchSize") int batchSize) {
this.repository = repository;
this.configuration = configuration;
this.environment = environment;
this.batchCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "alarm-added-to-batch-batchCounter");
this.processedMeter = this.environment.metrics().meter(this.getClass().getName() + "." + "alarm-messages-processed-processedMeter");
this.commitMeter = this.environment.metrics().meter(this.getClass().getName() + "." + "commits-executed-processedMeter");
this.commitTimer = this.environment.metrics().timer(this.getClass().getName() + "." + "total-commit-and-flush-timer");
@ -109,7 +106,8 @@ public class AlarmStateTransitionedEventHandler implements EventHandler<AlarmSta
}
}
private void flush() {
@Override
public void flush() {
repository.flush();
millisSinceLastFlush = System.currentTimeMillis();
}

View File

@ -0,0 +1,21 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
public interface FlushableHandler {
public void flush();
}

View File

@ -16,30 +16,33 @@
*/
package com.hpcloud.mon.persister.disruptor.event;
import static com.hpcloud.mon.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.repository.MetricRepository;
import com.hpcloud.mon.persister.repository.Sha1HashId;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.repository.MetricRepository;
import com.hpcloud.mon.persister.repository.Sha1HashId;
import com.lmax.disruptor.EventHandler;
import io.dropwizard.setup.Environment;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.dropwizard.setup.Environment;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import static com.hpcloud.mon.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH;
public class MetricHandler implements EventHandler<MetricHolder> {
public class MetricHandler implements EventHandler<MetricHolder>, FlushableHandler {
private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
private static final String TENANT_ID = "tenantId";
@ -56,7 +59,6 @@ public class MetricHandler implements EventHandler<MetricHolder> {
private final int secondsBetweenFlushes;
private final MetricRepository verticaMetricRepository;
private final MonPersisterConfiguration configuration;
private final Environment environment;
private final Counter metricCounter;
@ -76,7 +78,6 @@ public class MetricHandler implements EventHandler<MetricHolder> {
@Assisted("batchSize") int batchSize) {
this.verticaMetricRepository = metricRepository;
this.configuration = configuration;
this.environment = environment;
this.metricCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metrics-added-to-batch-counter");
this.definitionCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metric-definitions-added-to-batch-counter");
@ -215,7 +216,8 @@ public class MetricHandler implements EventHandler<MetricHolder> {
}
private void flush() {
@Override
public void flush() {
verticaMetricRepository.flush();
millisSinceLastFlush = System.currentTimeMillis();
}

View File

@ -29,16 +29,10 @@ import org.slf4j.LoggerFactory;
public class RepositoryCommitHeartbeat implements Managed {
private static Logger logger = LoggerFactory.getLogger(RepositoryCommitHeartbeat.class);
private final MetricDisruptor metricDisruptor;
private final AlarmStateHistoryDisruptor alarmHistoryDisruptor;
private final HeartbeatRunnable deduperRunnable;
@Inject
public RepositoryCommitHeartbeat(MetricDisruptor metricDisruptor, AlarmStateHistoryDisruptor alarmHistoryDisruptor) {
this.metricDisruptor = metricDisruptor;
this.alarmHistoryDisruptor = alarmHistoryDisruptor;
this.deduperRunnable = new HeartbeatRunnable(metricDisruptor, alarmHistoryDisruptor);
}
@ -51,15 +45,19 @@ public class RepositoryCommitHeartbeat implements Managed {
@Override
public void stop() throws Exception {
this.deduperRunnable.stop();
}
private static class HeartbeatRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatRunnable.class);
private final Disruptor metricDisruptor;
private final Disruptor alarmHistoryDisruptor;
private final Disruptor<MetricHolder> metricDisruptor;
private final Disruptor<AlarmStateTransitionedEventHolder> alarmHistoryDisruptor;
private HeartbeatRunnable(Disruptor metricDisruptor, Disruptor alarmHistoryDisruptor) {
private boolean stop = false;
private HeartbeatRunnable(MetricDisruptor metricDisruptor,
AlarmStateHistoryDisruptor alarmHistoryDisruptor) {
this.metricDisruptor = metricDisruptor;
this.alarmHistoryDisruptor = alarmHistoryDisruptor;
}
@ -69,7 +67,13 @@ public class RepositoryCommitHeartbeat implements Managed {
for (; ; ) {
try {
// Send a heartbeat every second.
Thread.sleep(1000);
synchronized (this) {
this.wait(1000);
if (stop) {
logger.debug("Heartbeat thread is exiting");
break;
}
}
logger.debug("Waking up after sleeping 1 seconds, yawn...");
// Send heartbeat
@ -96,5 +100,10 @@ public class RepositoryCommitHeartbeat implements Managed {
}
}
public synchronized void stop() {
stop = true;
this.notify();
}
}
}