From a2cf06e2d3080b264cbe7c2f33b4bdb27f72eab6 Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Sun, 13 Jul 2014 16:13:32 -0600 Subject: [PATCH] Ensure that metrics and alarms are flushed on shutdown and that the Kafka connection is shutdown properly also. This ensure that on normal shutdown, no metrics or alarms are missed or duplicated. Some changes were also done to remove warnings for generics. Some unused instance properties were removed. --- .../mon/persister/consumer/Consumer.java | 13 +++-- .../mon/persister/consumer/KafkaConsumer.java | 18 ++++++- .../mon/persister/consumer/KafkaStreams.java | 4 ++ .../persister/consumer/MetricsConsumer.java | 7 ++- .../AlarmHistoryDisruptorProvider.java | 11 ++-- .../disruptor/AlarmStateHistoryDisruptor.java | 8 +-- .../persister/disruptor/ManagedDisruptor.java | 54 +++++++++++++++++++ .../persister/disruptor/MetricDisruptor.java | 9 ++-- .../disruptor/MetricDisruptorProvider.java | 12 +++-- .../AlarmStateTransitionedEventHandler.java | 22 ++++---- .../disruptor/event/FlushableHandler.java | 21 ++++++++ .../disruptor/event/MetricHandler.java | 24 +++++---- .../repository/RepositoryCommitHeartbeat.java | 29 ++++++---- 13 files changed, 174 insertions(+), 58 deletions(-) create mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java create mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/event/FlushableHandler.java diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java index 2a7e976e..7648429d 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java @@ -16,20 +16,23 @@ */ package com.hpcloud.mon.persister.consumer; +import com.hpcloud.mon.persister.disruptor.ManagedDisruptor; + import com.google.inject.Inject; -import com.lmax.disruptor.dsl.Disruptor; -import io.dropwizard.lifecycle.Managed; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Consumer implements Managed { +import io.dropwizard.lifecycle.Managed; + +public class Consumer implements Managed { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); private final KafkaConsumer consumer; - private final Disruptor disruptor; + private final ManagedDisruptor disruptor; @Inject - public Consumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) { + public Consumer(KafkaConsumer kafkaConsumer, ManagedDisruptor disruptor) { this.consumer = kafkaConsumer; this.disruptor = disruptor; } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java index cad89b65..93e26b30 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java @@ -17,20 +17,26 @@ package com.hpcloud.mon.persister.consumer; import com.google.inject.Inject; + import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; + import kafka.consumer.KafkaStream; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public abstract class KafkaConsumer { private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + private static final int WAIT_TIME = 10; + protected final MonPersisterConfiguration configuration; private final Integer numThreads; @@ -38,7 +44,7 @@ public abstract class KafkaConsumer { @Inject private KafkaStreams kafkaStreams; - protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber); + protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber); protected abstract String getStreamName(); @Inject @@ -55,15 +61,23 @@ public abstract class KafkaConsumer { executorService = Executors.newFixedThreadPool(numThreads); int threadNumber = 0; - for (final KafkaStream stream : streams) { + for (final KafkaStream stream : streams) { executorService.submit(createRunnable(stream, threadNumber)); threadNumber++; } } public void stop() { + kafkaStreams.stop(); if (executorService != null) { executorService.shutdown(); + try { + if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) { + logger.warn("Did not shut down in %d seconds", WAIT_TIME); + } + } catch (InterruptedException e) { + logger.info("awaitTerminiation interrupted", e); + } } } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java index ba2f02ac..594ad7bb 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java @@ -89,4 +89,8 @@ public class KafkaStreams { return properties; } + + public void stop() { + consumerConnector.shutdown(); + } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java index be57f5e0..cb73e381 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java @@ -16,13 +16,16 @@ */ package com.hpcloud.mon.persister.consumer; -import com.google.inject.Inject; import com.hpcloud.mon.persister.disruptor.MetricDisruptor; +import com.hpcloud.mon.persister.disruptor.event.MetricHolder; -public class MetricsConsumer extends Consumer { +import com.google.inject.Inject; + +public class MetricsConsumer extends Consumer { @Inject public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) { super(kafkaConsumer, disruptor); } + } diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java b/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java index dd5118ab..2769380f 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java @@ -16,13 +16,15 @@ */ package com.hpcloud.mon.persister.disruptor; -import com.google.inject.Inject; -import com.google.inject.Provider; import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventFactory; +import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler; import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory; -import com.lmax.disruptor.EventHandler; + +import com.google.inject.Inject; +import com.google.inject.Provider; import com.lmax.disruptor.ExceptionHandler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,13 +69,14 @@ public class AlarmHistoryDisruptorProvider implements Provider { - public AlarmStateHistoryDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { +public class AlarmStateHistoryDisruptor extends ManagedDisruptor { + public AlarmStateHistoryDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { super(eventFactory, ringBufferSize, executor); } - public AlarmStateHistoryDisruptor(final EventFactory eventFactory, + public AlarmStateHistoryDisruptor(final EventFactory eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java b/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java new file mode 100644 index 00000000..b6e46f57 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpcloud.mon.persister.disruptor; + +import com.hpcloud.mon.persister.disruptor.event.FlushableHandler; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +import java.util.concurrent.Executor; + +public class ManagedDisruptor extends Disruptor{ + private FlushableHandler[] handlers = new FlushableHandler[0]; + + public ManagedDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { + super(eventFactory, ringBufferSize, executor); + } + + public ManagedDisruptor(final EventFactory eventFactory, + int ringBufferSize, + Executor executor, + ProducerType producerType, + WaitStrategy waitStrategy) { + super(eventFactory, ringBufferSize, executor, producerType, waitStrategy); + } + + @Override + public void shutdown() { + for (FlushableHandler handler : handlers) { + handler.flush(); + } + super.shutdown(); + } + + public void setHandlers(FlushableHandler[] handlers) { + this.handlers = handlers; + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java index d41dc749..7398c713 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java @@ -17,19 +17,20 @@ package com.hpcloud.mon.persister.disruptor; import com.hpcloud.mon.persister.disruptor.event.MetricHolder; + import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.Executor; -public class MetricDisruptor extends Disruptor { - public MetricDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { +public class MetricDisruptor extends ManagedDisruptor { + + public MetricDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { super(eventFactory, ringBufferSize, executor); } - public MetricDisruptor(final EventFactory eventFactory, + public MetricDisruptor(final EventFactory eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java index 3fe3e3a6..386e5e80 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java @@ -18,11 +18,14 @@ package com.hpcloud.mon.persister.disruptor; import com.google.inject.Inject; import com.google.inject.Provider; + import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; import com.hpcloud.mon.persister.disruptor.event.MetricFactory; +import com.hpcloud.mon.persister.disruptor.event.MetricHandler; import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory; -import com.lmax.disruptor.EventHandler; + import com.lmax.disruptor.ExceptionHandler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,13 +71,14 @@ public class MetricDisruptorProvider implements Provider { int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors(); logger.debug("Number of output processors [" + numOutputProcessors + "]"); - EventHandler[] eventHandlers = new EventHandler[numOutputProcessors]; + MetricHandler[] metricHandlers = new MetricHandler[numOutputProcessors]; for (int i = 0; i < numOutputProcessors; ++i) { - eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize); + metricHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize); } - disruptor.handleEventsWith(eventHandlers); + disruptor.handleEventsWith(metricHandlers); + disruptor.setHandlers(metricHandlers); disruptor.start(); logger.debug("Instance of disruptor successfully started"); diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java b/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java index 57d24378..6bdbb143 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java +++ b/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java @@ -16,21 +16,22 @@ */ package com.hpcloud.mon.persister.disruptor.event; -import com.codahale.metrics.Counter; +import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; +import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.hpcloud.mon.persister.repository.AlarmRepository; + import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; -import com.hpcloud.mon.persister.repository.AlarmRepository; -import com.hpcloud.mon.persister.repository.VerticaAlarmRepository; import com.lmax.disruptor.EventHandler; -import io.dropwizard.setup.Environment; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AlarmStateTransitionedEventHandler implements EventHandler { +import io.dropwizard.setup.Environment; + +public class AlarmStateTransitionedEventHandler implements EventHandler, FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class); private final int ordinal; @@ -42,10 +43,8 @@ public class AlarmStateTransitionedEventHandler implements EventHandler { +public class MetricHandler implements EventHandler, FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class); private static final String TENANT_ID = "tenantId"; @@ -56,7 +59,6 @@ public class MetricHandler implements EventHandler { private final int secondsBetweenFlushes; private final MetricRepository verticaMetricRepository; - private final MonPersisterConfiguration configuration; private final Environment environment; private final Counter metricCounter; @@ -76,7 +78,6 @@ public class MetricHandler implements EventHandler { @Assisted("batchSize") int batchSize) { this.verticaMetricRepository = metricRepository; - this.configuration = configuration; this.environment = environment; this.metricCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metrics-added-to-batch-counter"); this.definitionCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metric-definitions-added-to-batch-counter"); @@ -215,7 +216,8 @@ public class MetricHandler implements EventHandler { } - private void flush() { + @Override + public void flush() { verticaMetricRepository.flush(); millisSinceLastFlush = System.currentTimeMillis(); } diff --git a/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java b/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java index 84e920b5..040d950a 100644 --- a/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java +++ b/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java @@ -29,16 +29,10 @@ import org.slf4j.LoggerFactory; public class RepositoryCommitHeartbeat implements Managed { - private static Logger logger = LoggerFactory.getLogger(RepositoryCommitHeartbeat.class); - - private final MetricDisruptor metricDisruptor; - private final AlarmStateHistoryDisruptor alarmHistoryDisruptor; private final HeartbeatRunnable deduperRunnable; @Inject public RepositoryCommitHeartbeat(MetricDisruptor metricDisruptor, AlarmStateHistoryDisruptor alarmHistoryDisruptor) { - this.metricDisruptor = metricDisruptor; - this.alarmHistoryDisruptor = alarmHistoryDisruptor; this.deduperRunnable = new HeartbeatRunnable(metricDisruptor, alarmHistoryDisruptor); } @@ -51,15 +45,19 @@ public class RepositoryCommitHeartbeat implements Managed { @Override public void stop() throws Exception { + this.deduperRunnable.stop(); } private static class HeartbeatRunnable implements Runnable { private static final Logger logger = LoggerFactory.getLogger(HeartbeatRunnable.class); - private final Disruptor metricDisruptor; - private final Disruptor alarmHistoryDisruptor; + private final Disruptor metricDisruptor; + private final Disruptor alarmHistoryDisruptor; - private HeartbeatRunnable(Disruptor metricDisruptor, Disruptor alarmHistoryDisruptor) { + private boolean stop = false; + + private HeartbeatRunnable(MetricDisruptor metricDisruptor, + AlarmStateHistoryDisruptor alarmHistoryDisruptor) { this.metricDisruptor = metricDisruptor; this.alarmHistoryDisruptor = alarmHistoryDisruptor; } @@ -69,7 +67,13 @@ public class RepositoryCommitHeartbeat implements Managed { for (; ; ) { try { // Send a heartbeat every second. - Thread.sleep(1000); + synchronized (this) { + this.wait(1000); + if (stop) { + logger.debug("Heartbeat thread is exiting"); + break; + } + } logger.debug("Waking up after sleeping 1 seconds, yawn..."); // Send heartbeat @@ -96,5 +100,10 @@ public class RepositoryCommitHeartbeat implements Managed { } } + + public synchronized void stop() { + stop = true; + this.notify(); + } } }