From dbab337d762b60199f995653131e33c41113cf16 Mon Sep 17 00:00:00 2001
From: Craig Bryant
Date: Fri, 18 Jul 2014 16:09:36 -0600
Subject: [PATCH] Commit Kafka reads once the item is persisted

In order to continue using the Kafka high-level consumer API, the disruptor
was removed. This allows a direct call to Kafka to commit the offsets when
items are flushed. Separate ConsumerConnectors had to be created for Metrics
and Alarms so their offsets can be committed independently.

Changed the configuration to match the new model and removed configuration
parameters that are no longer needed.

Renamed Disruptor to Pipeline.

Allow only one EventHandler per pipeline.

Added code to flush the Metrics and Alarms, and to shut down the Kafka
ConsumerConnectors on a normal shutdown. This keeps the persister from losing
Metrics and Alarms.

Made measurementTimeStampSimpleDateFormat not static since SimpleDateFormat is
not thread-safe.

Changed some debug logging statements to use parameterized messages so Strings
are not created when debug logging is disabled.

Created FlushableHandler as a base class and moved duplicate code into it from
MetricHandler and AlarmStateTransitionHistoryHandler.

Change-Id: Id31a1d148f8e796f5be483dd02544be49c009b18

Changed MetricHandler to take MetricEnvelope[].

Change-Id: Ifabbe253cc0163f150ada2252a41a5d9fb9ab423
---
 .gitignore | 6 +
 pom.xml | 6 -
 .../persister/MonPersisterApplication.java | 100 ++++++++++++--
 .../mon/persister/MonPersisterModule.java | 70 ++++++----
 .../configuration/DeduperConfiguration.java | 33 -----
 .../configuration/DisruptorConfiguration.java | 40 ------
 .../configuration/KafkaConfiguration.java | 42 ------
 .../MonPersisterConfiguration.java | 38 +-----
 .../OutputProcessorConfiguration.java | 33 -----
 .../configuration/PipelineConfiguration.java | 96 ++++++++++++++
 ...java => AlarmStateTransitionConsumer.java} | 13 +-
 .../AlarmStateTransitionConsumerFactory.java | 25 ++++
 .../mon/persister/consumer/Consumer.java | 14 +-
 .../KafkaAlarmStateTransitionConsumer.java | 26 ++--
 ...aAlarmStateTransitionConsumerFactory.java} | 15 +--
 ...mStateTransitionConsumerRunnableBasic.java | 55 +++-----
 ...ransitionConsumerRunnableBasicFactory.java | 4 +-
 .../mon/persister/consumer/KafkaChannel.java | 122 +++++++++++++++++
 ...Provider.java => KafkaChannelFactory.java} | 22 +---
 .../mon/persister/consumer/KafkaConsumer.java | 79 +++++------
 .../consumer/KafkaConsumerRunnableBasic.java | 81 ++++++++++++
 .../consumer/KafkaMetricsConsumer.java | 26 ++--
 .../KafkaMetricsConsumerFactory.java} | 15 +--
 .../KafkaMetricsConsumerRunnableBasic.java | 59 +++------
 ...kaMetricsConsumerRunnableBasicFactory.java | 4 +-
 .../mon/persister/consumer/KafkaStreams.java | 112 ----------------
 .../persister/consumer/MetricsConsumer.java | 11 +-
 .../consumer/MetricsConsumerFactory.java | 21 +++
 .../AlarmHistoryDisruptorProvider.java | 94 -------------
 .../disruptor/AlarmStateHistoryDisruptor.java | 39 ------
 .../disruptor/DisruptorExceptionHandler.java | 49 -------
 .../persister/disruptor/ManagedDisruptor.java | 52 --------
 .../persister/disruptor/MetricDisruptor.java | 39 ------
 .../disruptor/MetricDisruptorProvider.java | 92 -------------
 .../AlarmStateTransitionedEventFactory.java | 29 ----
 .../AlarmStateTransitionedEventHandler.java | 124 ------------------
 .../AlarmStateTransitionPipeline.java} | 17 ++-
 .../AlarmStateTransitionPipelineFactory.java} | 14 +-
 .../persister/pipeline/ManagedPipeline.java | 46 +++++++
 .../MetricPipeline.java} | 17 ++-
 .../MetricPipelineFactory.java} | 8 +-
 .../AlarmStateTransitionedEventHandler.java | 64 +++++++++
 ...mStateTransitionedEventHandlerFactory.java | 8 +-
 .../pipeline/event/FlushableHandler.java | 116 ++++++++++++++++
 .../event/MetricHandler.java | 114 +++++-----------
 .../event/MetricHandlerFactory.java | 8 +-
 .../repository/InfluxDBAlarmRepository.java | 8 +-
 .../repository/InfluxDBMetricRepository.java | 30 ++---
 .../repository/RepositoryCommitHeartbeat.java | 115 ----------------
 src/main/resources/mon-persister-config.yml | 37 +++---
 .../persister/MonPersisterConsumerTest.java | 38 ++----
 .../java/com/hpcloud/mon/persister/Test.java | 74 -----------
 52 files changed, 971 insertions(+), 1429 deletions(-)
 create mode 100644 .gitignore
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/configuration/DeduperConfiguration.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/configuration/DisruptorConfiguration.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/configuration/OutputProcessorConfiguration.java
 create mode 100644 src/main/java/com/hpcloud/mon/persister/configuration/PipelineConfiguration.java
 rename src/main/java/com/hpcloud/mon/persister/consumer/{AlarmStateTransitionsConsumer.java => AlarmStateTransitionConsumer.java} (60%)
 create mode 100644 src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionConsumerFactory.java
 rename src/main/java/com/hpcloud/mon/persister/{disruptor/event/MetricFactory.java => consumer/KafkaAlarmStateTransitionConsumerFactory.java} (67%)
 create mode 100644 src/main/java/com/hpcloud/mon/persister/consumer/KafkaChannel.java
 rename src/main/java/com/hpcloud/mon/persister/consumer/{KafkaStreamsProvider.java => KafkaChannelFactory.java} (64%)
 create mode 100644 src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumerRunnableBasic.java
 rename src/main/java/com/hpcloud/mon/persister/{configuration/AlarmHistoryConfiguration.java => consumer/KafkaMetricsConsumerFactory.java} (70%)
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java
 create mode 100644 src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumerFactory.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/AlarmStateHistoryDisruptor.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/DisruptorExceptionHandler.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventFactory.java
 delete mode 100644 src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java
 rename src/main/java/com/hpcloud/mon/persister/{disruptor/event/AlarmStateTransitionedEventHolder.java => pipeline/AlarmStateTransitionPipeline.java} (62%)
 rename src/main/java/com/hpcloud/mon/persister/{configuration/MetricConfiguration.java => pipeline/AlarmStateTransitionPipelineFactory.java} (70%)
 create mode 100644 src/main/java/com/hpcloud/mon/persister/pipeline/ManagedPipeline.java
 rename src/main/java/com/hpcloud/mon/persister/{disruptor/event/MetricHolder.java => pipeline/MetricPipeline.java} (65%)
 rename src/main/java/com/hpcloud/mon/persister/{disruptor/event/FlushableHandler.java => pipeline/MetricPipelineFactory.java} (75%)
 create mode
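The thread running through all of these changes is that Kafka offsets are now committed only after the pipeline has flushed the corresponding items to the repository. Condensed into a single illustrative class (the names mirror KafkaConsumerRunnableBasic, ManagedPipeline and KafkaChannel from the diff below, but this wrapper itself is not part of the patch):

    import kafka.javaapi.consumer.ConsumerConnector;

    // Illustrative sketch only: condenses how the consumer runnable, pipeline and
    // channel introduced by this patch cooperate to commit offsets after a flush.
    class CommitAfterPersistSketch<T> {

      /** Stand-in for ManagedPipeline: returns true when the batch was flushed. */
      interface Pipeline<E> {
        boolean publishEvent(E event);
      }

      private final ConsumerConnector consumerConnector; // one connector per pipeline
      private final Pipeline<T> pipeline;

      CommitAfterPersistSketch(ConsumerConnector consumerConnector, Pipeline<T> pipeline) {
        this.consumerConnector = consumerConnector;
        this.pipeline = pipeline;
      }

      void onMessage(T event) {
        // Commit only after the event's batch has been persisted; a crash before
        // the flush replays the uncommitted messages instead of losing them.
        if (pipeline.publishEvent(event)) {
          consumerConnector.commitOffsets();
        }
      }
    }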
100644 src/main/java/com/hpcloud/mon/persister/pipeline/event/AlarmStateTransitionedEventHandler.java rename src/main/java/com/hpcloud/mon/persister/{disruptor => pipeline}/event/AlarmStateTransitionedEventHandlerFactory.java (72%) create mode 100644 src/main/java/com/hpcloud/mon/persister/pipeline/event/FlushableHandler.java rename src/main/java/com/hpcloud/mon/persister/{disruptor => pipeline}/event/MetricHandler.java (66%) rename src/main/java/com/hpcloud/mon/persister/{disruptor => pipeline}/event/MetricHandlerFactory.java (73%) delete mode 100644 src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java delete mode 100644 src/test/java/com/hpcloud/mon/persister/Test.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..0d252954 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +target/ +*.classpath +*.project +*.settings/ +debs/ +logs/ diff --git a/pom.xml b/pom.xml index a15315eb..678de22f 100644 --- a/pom.xml +++ b/pom.xml @@ -100,12 +100,6 @@ 4.11 test - - com.lmax - disruptor - 3.2.0 - - com.vertica vertica-jdbc diff --git a/src/main/java/com/hpcloud/mon/persister/MonPersisterApplication.java b/src/main/java/com/hpcloud/mon/persister/MonPersisterApplication.java index fc1e90ca..5634c185 100644 --- a/src/main/java/com/hpcloud/mon/persister/MonPersisterApplication.java +++ b/src/main/java/com/hpcloud/mon/persister/MonPersisterApplication.java @@ -18,10 +18,23 @@ package com.hpcloud.mon.persister; import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; -import com.hpcloud.mon.persister.consumer.AlarmStateTransitionsConsumer; +import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumer; +import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumerFactory; +import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumer; +import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerFactory; +import com.hpcloud.mon.persister.consumer.KafkaChannel; +import com.hpcloud.mon.persister.consumer.KafkaChannelFactory; +import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer; +import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerFactory; import com.hpcloud.mon.persister.consumer.MetricsConsumer; +import com.hpcloud.mon.persister.consumer.MetricsConsumerFactory; import com.hpcloud.mon.persister.healthcheck.SimpleHealthCheck; -import com.hpcloud.mon.persister.repository.RepositoryCommitHeartbeat; +import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline; +import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipelineFactory; +import com.hpcloud.mon.persister.pipeline.MetricPipeline; +import com.hpcloud.mon.persister.pipeline.MetricPipelineFactory; +import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory; +import com.hpcloud.mon.persister.pipeline.event.MetricHandlerFactory; import com.hpcloud.mon.persister.resource.Resource; import com.google.inject.Guice; @@ -31,7 +44,11 @@ import io.dropwizard.Application; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class MonPersisterApplication extends Application { + private static final Logger logger = LoggerFactory.getLogger(MonPersisterApplication.class); public static void main(String[] args) throws Exception { new MonPersisterApplication().run(args); @@ -58,16 +75,79 @@ public class MonPersisterApplication extends Application { +public class AlarmStateTransitionConsumer 
extends Consumer { @Inject - public AlarmStateTransitionsConsumer(KafkaAlarmStateTransitionConsumer kafkaConsumer, - AlarmStateHistoryDisruptor disruptor) { - super(kafkaConsumer, disruptor); + public AlarmStateTransitionConsumer(@Assisted KafkaAlarmStateTransitionConsumer kafkaConsumer, + @Assisted AlarmStateTransitionPipeline pipeline) { + super(kafkaConsumer, pipeline); } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionConsumerFactory.java b/src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionConsumerFactory.java new file mode 100644 index 00000000..c57990d4 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/consumer/AlarmStateTransitionConsumerFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hpcloud.mon.persister.consumer; + +import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline; + +public interface AlarmStateTransitionConsumerFactory { + AlarmStateTransitionConsumer create(KafkaAlarmStateTransitionConsumer kafkaConsumer, + AlarmStateTransitionPipeline pipeline); +} diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java index 371b416b..4f69934a 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/Consumer.java @@ -17,7 +17,7 @@ package com.hpcloud.mon.persister.consumer; -import com.hpcloud.mon.persister.disruptor.ManagedDisruptor; +import com.hpcloud.mon.persister.pipeline.ManagedPipeline; import com.google.inject.Inject; @@ -29,25 +29,25 @@ import org.slf4j.LoggerFactory; public class Consumer implements Managed { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); - private final KafkaConsumer consumer; - private final ManagedDisruptor disruptor; + private final KafkaConsumer consumer; + private final ManagedPipeline pipeline; @Inject - public Consumer(KafkaConsumer kafkaConsumer, ManagedDisruptor disruptor) { + public Consumer(KafkaConsumer kafkaConsumer, ManagedPipeline pipeline) { this.consumer = kafkaConsumer; - this.disruptor = disruptor; + this.pipeline = pipeline; } @Override public void start() throws Exception { logger.debug("start"); - consumer.run(); + consumer.start(); } @Override public void stop() throws Exception { logger.debug("stop"); consumer.stop(); - disruptor.shutdown(); + pipeline.shutdown(); } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java index 954e5b1f..fad04a78 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumer.java @@ -17,29 +17,29 @@ package com.hpcloud.mon.persister.consumer; -import 
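The @Assisted constructor parameters above pair with small factory interfaces such as AlarmStateTransitionConsumerFactory, so Guice supplies the injected dependencies while the caller supplies the runtime ones. The bindings themselves live in MonPersisterModule, whose diff is not shown in this excerpt; they are presumably installed with FactoryModuleBuilder, roughly as in this sketch:

    import com.google.inject.AbstractModule;
    import com.google.inject.assistedinject.FactoryModuleBuilder;
    import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumerFactory;
    import com.hpcloud.mon.persister.consumer.KafkaChannelFactory;

    // Hypothetical module sketch; the real bindings are in MonPersisterModule.
    class AssistedFactoryBindingsSketch extends AbstractModule {
      @Override
      protected void configure() {
        // Guice generates factory implementations whose create(...) arguments are
        // matched against the @Assisted constructor parameters of the products.
        install(new FactoryModuleBuilder().build(AlarmStateTransitionConsumerFactory.class));
        install(new FactoryModuleBuilder().build(KafkaChannelFactory.class));
      }
    }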
com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; +import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline; import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; -import kafka.consumer.KafkaStream; - -public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer { +public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer { @Inject private KafkaAlarmStateTransitionConsumerRunnableBasicFactory factory; + private final AlarmStateTransitionPipeline pipeline; + @Inject - public KafkaAlarmStateTransitionConsumer(MonPersisterConfiguration configuration) { - super(configuration); + public KafkaAlarmStateTransitionConsumer(@Assisted KafkaChannel kafkaChannel, + @Assisted int threadNum, @Assisted final AlarmStateTransitionPipeline pipeline) { + super(kafkaChannel, threadNum); + this.pipeline = pipeline; } @Override - protected Runnable createRunnable(KafkaStream stream, int threadNumber) { - return factory.create(stream, threadNumber); - } - - @Override - protected String getStreamName() { - return this.configuration.getAlarmHistoryConfiguration().getTopic(); + protected KafkaConsumerRunnableBasic createRunnable( + KafkaChannel kafkaChannel, int threadNumber) { + return factory.create(pipeline, kafkaChannel, threadNumber); } } diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricFactory.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerFactory.java similarity index 67% rename from src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricFactory.java rename to src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerFactory.java index ece7fa32..2c2b554f 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricFactory.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerFactory.java @@ -15,16 +15,11 @@ * limitations under the License. 
*/ -package com.hpcloud.mon.persister.disruptor.event; +package com.hpcloud.mon.persister.consumer; -import com.lmax.disruptor.EventFactory; +import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline; -public class MetricFactory implements EventFactory { - - public static final MetricFactory INSTANCE = new MetricFactory(); - - @Override - public MetricHolder newInstance() { - return new MetricHolder(); - } +public interface KafkaAlarmStateTransitionConsumerFactory { + KafkaAlarmStateTransitionConsumer create(KafkaChannel kafkaChannel, int threadNum, + final AlarmStateTransitionPipeline pipeline); } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasic.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasic.java index cd47c572..7d415206 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasic.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasic.java @@ -18,67 +18,50 @@ package com.hpcloud.mon.persister.consumer; import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; -import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor; -import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder; +import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import com.lmax.disruptor.EventTranslator; - -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaAlarmStateTransitionConsumerRunnableBasic implements Runnable { +public class KafkaAlarmStateTransitionConsumerRunnableBasic extends + KafkaConsumerRunnableBasic { private static final Logger logger = LoggerFactory .getLogger(KafkaAlarmStateTransitionConsumerRunnableBasic.class); - private final KafkaStream stream; - private final int threadNumber; - private final AlarmStateHistoryDisruptor disruptor; private final ObjectMapper objectMapper; @Inject - public KafkaAlarmStateTransitionConsumerRunnableBasic(AlarmStateHistoryDisruptor disruptor, - @Assisted KafkaStream stream, @Assisted int threadNumber) { - this.stream = stream; - this.threadNumber = threadNumber; - this.disruptor = disruptor; + public KafkaAlarmStateTransitionConsumerRunnableBasic(@Assisted AlarmStateTransitionPipeline pipeline, + @Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) { + super(kafkaChannel, pipeline, threadNumber); this.objectMapper = new ObjectMapper(); objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY); objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE); } - public void run() { - ConsumerIterator it = stream.iterator(); - while (it.hasNext()) { + @Override + protected void publishHeartbeat() { + publishEvent(null); + } - final String s = new String(it.next().message()); + @Override + protected void handleMessage(String message) { + try { + final AlarmStateTransitionedEvent event = + objectMapper.readValue(message, AlarmStateTransitionedEvent.class); - logger.debug("Thread " + threadNumber + ": " + s); + logger.debug(event.toString()); - try { - final AlarmStateTransitionedEvent event = - 
objectMapper.readValue(s, AlarmStateTransitionedEvent.class); - - logger.debug(event.toString()); - - disruptor.publishEvent(new EventTranslator() { - @Override - public void translateTo(AlarmStateTransitionedEventHolder eventHolder, long sequence) { - eventHolder.setEvent(event); - } - }); - } catch (Exception e) { - logger.error("Failed to deserialize JSON message and place on disruptor queue: " + s, e); - } + publishEvent(event); + } catch (Exception e) { + logger.error("Failed to deserialize JSON message and send to handler: " + message, e); } - logger.debug("Shutting down Thread: " + threadNumber); } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java index 91b60fbc..eebb2243 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaAlarmStateTransitionConsumerRunnableBasicFactory.java @@ -17,9 +17,9 @@ package com.hpcloud.mon.persister.consumer; -import kafka.consumer.KafkaStream; +import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline; public interface KafkaAlarmStateTransitionConsumerRunnableBasicFactory { - KafkaAlarmStateTransitionConsumerRunnableBasic create(KafkaStream stream, + KafkaAlarmStateTransitionConsumerRunnableBasic create(AlarmStateTransitionPipeline pipeline, KafkaChannel kafkaChannel, int threadNumber); } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaChannel.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaChannel.java new file mode 100644 index 00000000..71618c27 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaChannel.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.hpcloud.mon.persister.consumer; + +import com.hpcloud.mon.persister.configuration.KafkaConfiguration; +import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.hpcloud.mon.persister.configuration.PipelineConfiguration; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class KafkaChannel { + private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; + private static final Logger logger = LoggerFactory.getLogger(KafkaChannel.class); + + private final String topic; + private final ConsumerConnector consumerConnector; + private final int threadNum; + + @Inject + public KafkaChannel(@Assisted MonPersisterConfiguration configuration, + @Assisted PipelineConfiguration pipelineConfiguration, @Assisted int threadNum) { + this.topic = pipelineConfiguration.getTopic(); + this.threadNum = threadNum; + Properties kafkaProperties = + createKafkaProperties(configuration.getKafkaConfiguration(), pipelineConfiguration); + consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties)); + } + + public final void markRead() { + this.consumerConnector.commitOffsets(); + } + + public KafkaStream getKafkaStream() { + final Map topicCountMap = new HashMap<>(); + topicCountMap.put(this.topic, 1); + Map>> streamMap = + this.consumerConnector.createMessageStreams(topicCountMap); + List> streams = streamMap.values().iterator().next(); + if (streams.size() != 1) { + throw new IllegalStateException(String.format( + "Expected only one stream but instead there are %d", streams.size())); + } + return streams.get(0); + } + + public void stop() { + this.consumerConnector.shutdown(); + } + + private ConsumerConfig createConsumerConfig(Properties kafkaProperties) { + return new ConsumerConfig(kafkaProperties); + } + + private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration, + final PipelineConfiguration pipelineConfiguration) { + Properties properties = new Properties(); + + properties.put("group.id", pipelineConfiguration.getGroupId()); + properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect()); + properties.put("consumer.id", + String.format("%s_%d", pipelineConfiguration.getConsumerId(), this.threadNum)); + properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString()); + properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes() + .toString()); + properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes() + .toString()); + // Set auto commit to false because the persister is going to explicitly commit + properties.put("auto.commit.enable", "false"); + properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks() + .toString()); + properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString()); + properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString()); + properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString()); + properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString()); + 
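Note why the channel is per-pipeline: ConsumerConnector.commitOffsets() commits the offsets of every stream owned by that connector, so metrics and alarm-state history each need their own KafkaChannel (and consumer group) to be committed independently, as the commit message states. A sketch of the wiring using the KafkaChannelFactory signature from this patch; the PipelineConfiguration instances come from mon-persister-config.yml, and the exact accessor names on MonPersisterConfiguration are not shown in this excerpt:

    import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
    import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
    import com.hpcloud.mon.persister.consumer.KafkaChannel;
    import com.hpcloud.mon.persister.consumer.KafkaChannelFactory;

    class ChannelPerPipelineSketch {
      // One KafkaChannel (and therefore one ConsumerConnector) per pipeline, so
      // committing metric offsets can never commit alarm-history offsets.
      void wire(KafkaChannelFactory channelFactory, MonPersisterConfiguration config,
          PipelineConfiguration metricPipeline, PipelineConfiguration alarmHistoryPipeline) {
        KafkaChannel metricChannel = channelFactory.create(config, metricPipeline, 0);
        KafkaChannel alarmChannel = channelFactory.create(config, alarmHistoryPipeline, 0);

        metricChannel.markRead(); // commits only the metric consumer group's offsets
        alarmChannel.markRead();  // commits only the alarm-history group's offsets
      }
    }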
properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs() + .toString()); + properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset()); + properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString()); + properties.put("client.id", String.format("%s_%d", pipelineConfiguration.getClientId(), threadNum)); + properties.put("zookeeper.session.timeout.ms", kafkaConfiguration + .getZookeeperSessionTimeoutMs().toString()); + properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration + .getZookeeperConnectionTimeoutMs().toString()); + properties + .put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString()); + + for (String key : properties.stringPropertyNames()) { + logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key)); + } + + return properties; + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreamsProvider.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaChannelFactory.java similarity index 64% rename from src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreamsProvider.java rename to src/main/java/com/hpcloud/mon/persister/consumer/KafkaChannelFactory.java index fdbab298..35c568da 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreamsProvider.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaChannelFactory.java @@ -18,21 +18,9 @@ package com.hpcloud.mon.persister.consumer; import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.hpcloud.mon.persister.configuration.PipelineConfiguration; -import javax.inject.Inject; -import javax.inject.Provider; - -public class KafkaStreamsProvider implements Provider { - - private final MonPersisterConfiguration configuration; - - @Inject - public KafkaStreamsProvider(MonPersisterConfiguration configuration) { - this.configuration = configuration; - } - - @Override - public KafkaStreams get() { - return new KafkaStreams(configuration); - } -} \ No newline at end of file +public interface KafkaChannelFactory { + KafkaChannel create(MonPersisterConfiguration configuration, + PipelineConfiguration pipelineConfiguration, int threadNum); +} diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java index 5a044c32..d085f5a6 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumer.java @@ -14,70 +14,53 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package com.hpcloud.mon.persister.consumer; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; - -import com.google.inject.Inject; - -import kafka.consumer.KafkaStream; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -public abstract class KafkaConsumer { +public abstract class KafkaConsumer { - private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; - private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); - private static final int WAIT_TIME = 10; + private static final int WAIT_TIME = 10; - protected final MonPersisterConfiguration configuration; + private ExecutorService executorService; + private final KafkaChannel kafkaChannel; + private final int threadNum; + private KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic; - private final Integer numThreads; - private ExecutorService executorService; - @Inject - private KafkaStreams kafkaStreams; + public KafkaConsumer(KafkaChannel kafkaChannel, int threadNum) { + this.kafkaChannel = kafkaChannel; + this.threadNum = threadNum; + } - protected abstract Runnable createRunnable(KafkaStream stream, int threadNumber); - protected abstract String getStreamName(); + protected abstract KafkaConsumerRunnableBasic createRunnable(KafkaChannel kafkaChannel, + int threadNumber); - @Inject - public KafkaConsumer(MonPersisterConfiguration configuration) { + public void start() { + executorService = Executors.newFixedThreadPool(1); + KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic = + createRunnable(kafkaChannel, this.threadNum); + executorService.submit(kafkaConsumerRunnableBasic); + } - this.configuration = configuration; - - this.numThreads = configuration.getKafkaConfiguration().getNumThreads(); - logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads); - } - - public void run() { - List> streams = kafkaStreams.getStreams().get(getStreamName()); - executorService = Executors.newFixedThreadPool(numThreads); - - int threadNumber = 0; - for (final KafkaStream stream : streams) { - executorService.submit(createRunnable(stream, threadNumber)); - threadNumber++; - } - } - - public void stop() { - kafkaStreams.stop(); - if (executorService != null) { - executorService.shutdown(); - try { - if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) { - logger.warn("Did not shut down in %d seconds", WAIT_TIME); - } - } catch (InterruptedException e) { - logger.info("awaitTerminiation interrupted", e); - } + public void stop() { + kafkaConsumerRunnableBasic.stop(); + if (executorService != null) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) { + logger.warn("Did not shut down in {} seconds", WAIT_TIME); } + } catch (InterruptedException e) { + logger.info("awaitTerminiation interrupted", e); + } } + } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumerRunnableBasic.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumerRunnableBasic.java new file mode 100644 index 00000000..aa14655e --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaConsumerRunnableBasic.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hpcloud.mon.persister.consumer; + +import com.hpcloud.mon.persister.pipeline.ManagedPipeline; + +import kafka.consumer.ConsumerIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class KafkaConsumerRunnableBasic implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class); + private final KafkaChannel kafkaChannel; + private final int threadNumber; + private final ManagedPipeline pipeline; + private volatile boolean stop = false; + + public KafkaConsumerRunnableBasic(KafkaChannel kafkaChannel, + ManagedPipeline pipeline, + int threadNumber) { + this.kafkaChannel = kafkaChannel; + this.pipeline = pipeline; + this.threadNumber = threadNumber; + } + + abstract protected void publishHeartbeat(); + + abstract protected void handleMessage(String message); + + protected void markRead() { + this.kafkaChannel.markRead(); + } + + public void stop() { + this.stop = true; + } + + public void run() { + final ConsumerIterator it = kafkaChannel.getKafkaStream().iterator(); + logger.debug("KafkaChannel {} has stream", this.threadNumber); + while (!this.stop) { + try { + if (it.hasNext()) { + final String s = new String(it.next().message()); + + logger.debug("Thread {}: {}", threadNumber, s); + + handleMessage(s); + } + } catch (kafka.consumer.ConsumerTimeoutException cte) { + publishHeartbeat(); + continue; + } + } + logger.debug("Shutting down Thread: {}", threadNumber); + this.kafkaChannel.stop(); + } + + protected void publishEvent(final T event) { + if (pipeline.publishEvent(event)) { + markRead(); + } + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java index ac1834ca..7f88594f 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumer.java @@ -17,29 +17,29 @@ package com.hpcloud.mon.persister.consumer; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.hpcloud.mon.common.model.metric.MetricEnvelope; +import com.hpcloud.mon.persister.pipeline.MetricPipeline; import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; -import kafka.consumer.KafkaStream; - -public class KafkaMetricsConsumer extends KafkaConsumer { +public class KafkaMetricsConsumer extends KafkaConsumer { @Inject private KafkaMetricsConsumerRunnableBasicFactory factory; + private final MetricPipeline pipeline; + @Inject - public KafkaMetricsConsumer(MonPersisterConfiguration configuration) { - super(configuration); + public KafkaMetricsConsumer(@Assisted KafkaChannel kafkaChannel, @Assisted int threadNum, + @Assisted MetricPipeline pipeline) { + super(kafkaChannel, threadNum); + this.pipeline = pipeline; } @Override - protected Runnable createRunnable(KafkaStream stream, int 
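The run() loop above depends on consumer.timeout.ms (set in KafkaChannel), so an idle topic periodically raises kafka.consumer.ConsumerTimeoutException, which the runnable turns into publishHeartbeat()/publishEvent(null). How the pipeline treats that null is defined in ManagedPipeline and FlushableHandler, which fall outside this excerpt; the following is only a guess at the batching behaviour they imply:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical batching pipeline: a null event is just a heartbeat that gives
    // the handler a chance to flush an aging batch. Sizes and deadlines are made up.
    class BatchingPipelineSketch<T> {
      private final List<T> batch = new ArrayList<>();
      private final int batchSize;
      private long lastFlushMillis = System.currentTimeMillis();

      BatchingPipelineSketch(int batchSize) {
        this.batchSize = batchSize;
      }

      /** Returns true when the batch was flushed, so the caller may commit offsets. */
      synchronized boolean publishEvent(T event) {
        if (event != null) {
          batch.add(event);
        }
        boolean deadlinePassed = System.currentTimeMillis() - lastFlushMillis > 30_000L;
        if (!batch.isEmpty() && (batch.size() >= batchSize || deadlinePassed)) {
          flush();
          return true;
        }
        return false;
      }

      private void flush() {
        // In the persister this is where the repository (e.g. InfluxDB) write happens.
        batch.clear();
        lastFlushMillis = System.currentTimeMillis();
      }
    }

When publishEvent() returns true, KafkaConsumerRunnableBasic.publishEvent() calls markRead(), which is what finally commits the offsets for the batch.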
threadNumber) { - return factory.create(stream, threadNumber); - } - - @Override - protected String getStreamName() { - return this.configuration.getMetricConfiguration().getTopic(); + protected KafkaConsumerRunnableBasic createRunnable(KafkaChannel kafkaChannel, + int threadNumber) { + return factory.create(pipeline, kafkaChannel, threadNumber); } } diff --git a/src/main/java/com/hpcloud/mon/persister/configuration/AlarmHistoryConfiguration.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerFactory.java similarity index 70% rename from src/main/java/com/hpcloud/mon/persister/configuration/AlarmHistoryConfiguration.java rename to src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerFactory.java index 41786e08..630ec21d 100644 --- a/src/main/java/com/hpcloud/mon/persister/configuration/AlarmHistoryConfiguration.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerFactory.java @@ -15,16 +15,11 @@ * limitations under the License. */ -package com.hpcloud.mon.persister.configuration; +package com.hpcloud.mon.persister.consumer; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.hpcloud.mon.persister.pipeline.MetricPipeline; -public class AlarmHistoryConfiguration { - - @JsonProperty - String topic; - - public String getTopic() { - return topic; - } +public interface KafkaMetricsConsumerFactory { + public KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum, + MetricPipeline pipeline); } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasic.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasic.java index 7a783a9f..960fcc11 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasic.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasic.java @@ -18,68 +18,49 @@ package com.hpcloud.mon.persister.consumer; import com.hpcloud.mon.common.model.metric.MetricEnvelope; -import com.hpcloud.mon.persister.disruptor.MetricDisruptor; -import com.hpcloud.mon.persister.disruptor.event.MetricHolder; +import com.hpcloud.mon.persister.pipeline.MetricPipeline; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import com.lmax.disruptor.EventTranslator; - -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaMetricsConsumerRunnableBasic implements Runnable { +public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasic { private static final Logger logger = LoggerFactory .getLogger(KafkaMetricsConsumerRunnableBasic.class); - private final KafkaStream stream; - private final int threadNumber; - private final MetricDisruptor disruptor; private final ObjectMapper objectMapper; @Inject - public KafkaMetricsConsumerRunnableBasic(MetricDisruptor disruptor, - @Assisted KafkaStream stream, @Assisted int threadNumber) { - this.stream = stream; - this.threadNumber = threadNumber; - this.disruptor = disruptor; + public KafkaMetricsConsumerRunnableBasic(@Assisted MetricPipeline pipeline, + @Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) { + super(kafkaChannel, pipeline, threadNumber); this.objectMapper = new ObjectMapper(); objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); 
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY); } - public void run() { - ConsumerIterator it = stream.iterator(); - while (it.hasNext()) { + @Override + protected void publishHeartbeat() { + publishEvent(null); + } - final String s = new String(it.next().message()); + @Override + protected void handleMessage(String message) { + try { + final MetricEnvelope[] envelopes = objectMapper.readValue(message, MetricEnvelope[].class); - logger.debug("Thread {}: {}", threadNumber, s); - - try { - final MetricEnvelope[] envelopes = objectMapper.readValue(s, MetricEnvelope[].class); - - for (final MetricEnvelope envelope : envelopes) { - - logger.debug("{}", envelope); - - disruptor.publishEvent(new EventTranslator() { - @Override - public void translateTo(MetricHolder event, long sequence) { - event.setEnvelope(envelope); - } - - }); - } - } catch (Exception e) { - logger.error("Failed to deserialize JSON message and place on disruptor queue: {}", e); + for (final MetricEnvelope envelope : envelopes) { + logger.debug("{}", envelope); } + + publishEvent(envelopes); + } catch (Exception e) { + logger.error("Failed to deserialize JSON message and place on pipeline queue: " + message, + e); } - logger.debug("Shutting down Thread: {}", threadNumber); } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java index 72dec0ae..83310f93 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaMetricsConsumerRunnableBasicFactory.java @@ -17,8 +17,8 @@ package com.hpcloud.mon.persister.consumer; -import kafka.consumer.KafkaStream; +import com.hpcloud.mon.persister.pipeline.MetricPipeline; public interface KafkaMetricsConsumerRunnableBasicFactory { - KafkaMetricsConsumerRunnableBasic create(KafkaStream stream, int threadNumber); + KafkaMetricsConsumerRunnableBasic create(MetricPipeline pipeline, KafkaChannel kafkaChannel, int threadNumber); } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java b/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java deleted file mode 100644 index ce9c04d5..00000000 --- a/src/main/java/com/hpcloud/mon/persister/consumer/KafkaStreams.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
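The logging change called out in the commit message is visible in these runnables: debug statements now use SLF4J's {} placeholders instead of string concatenation, so the message String is only built when DEBUG is actually enabled. For comparison (illustrative class, not part of the patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingStyleSketch {
      private static final Logger logger = LoggerFactory.getLogger(LoggingStyleSketch.class);

      void onMessage(int threadNumber, String message) {
        // Old style: the concatenated String is built even when DEBUG is off.
        logger.debug("Thread " + threadNumber + ": " + message);

        // New style: SLF4J substitutes the arguments only if DEBUG is enabled,
        // avoiding throwaway Strings on the hot consume path.
        logger.debug("Thread {}: {}", threadNumber, message);
      }
    }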
- */ - -package com.hpcloud.mon.persister.consumer; - -import com.hpcloud.mon.persister.configuration.KafkaConfiguration; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -public class KafkaStreams { - private static final String KAFKA_CONFIGURATION = "Kafka configuration:"; - private static final Logger logger = LoggerFactory.getLogger(KafkaStreams.class); - - private final MonPersisterConfiguration configuration; - - private final ConsumerConnector consumerConnector; - private final Map>> consumerMap; - - public KafkaStreams(MonPersisterConfiguration configuration) { - this.configuration = configuration; - Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration()); - ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties); - consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); - Map topicCountMap = new HashMap<>(); - Integer numThreads = configuration.getKafkaConfiguration().getNumThreads(); - topicCountMap.put(this.configuration.getMetricConfiguration().getTopic(), (int) numThreads); - topicCountMap.put(this.configuration.getAlarmHistoryConfiguration().getTopic(), - (int) numThreads); - consumerMap = consumerConnector.createMessageStreams(topicCountMap); - } - - public final Map>> getStreams() { - return consumerMap; - } - - private ConsumerConfig createConsumerConfig(Properties kafkaProperties) { - return new ConsumerConfig(kafkaProperties); - } - - private Properties createKafkaProperties(KafkaConfiguration metricsKafkaConfiguration) { - Properties properties = new Properties(); - - properties.put("group.id", metricsKafkaConfiguration.getGroupId()); - properties.put("zookeeper.connect", metricsKafkaConfiguration.getZookeeperConnect()); - properties.put("consumer.id", metricsKafkaConfiguration.getConsumerId()); - properties.put("socket.timeout.ms", metricsKafkaConfiguration.getSocketTimeoutMs().toString()); - properties.put("socket.receive.buffer.bytes", metricsKafkaConfiguration - .getSocketReceiveBufferBytes().toString()); - properties.put("fetch.message.max.bytes", metricsKafkaConfiguration.getFetchMessageMaxBytes() - .toString()); - properties - .put("auto.commit.enable", metricsKafkaConfiguration.getAutoCommitEnable().toString()); - properties.put("auto.commit.interval.ms", metricsKafkaConfiguration.getAutoCommitIntervalMs() - .toString()); - properties.put("queued.max.message.chunks", metricsKafkaConfiguration - .getQueuedMaxMessageChunks().toString()); - properties.put("rebalance.max.retries", metricsKafkaConfiguration.getRebalanceMaxRetries() - .toString()); - properties.put("fetch.min.bytes", metricsKafkaConfiguration.getFetchMinBytes().toString()); - properties.put("fetch.wait.max.ms", metricsKafkaConfiguration.getFetchWaitMaxMs().toString()); - properties.put("rebalance.backoff.ms", metricsKafkaConfiguration.getRebalanceBackoffMs() - .toString()); - properties.put("refresh.leader.backoff.ms", metricsKafkaConfiguration - .getRefreshLeaderBackoffMs().toString()); - properties.put("auto.offset.reset", metricsKafkaConfiguration.getAutoOffsetReset()); - properties.put("consumer.timeout.ms", metricsKafkaConfiguration.getConsumerTimeoutMs() - .toString()); - 
properties.put("client.id", metricsKafkaConfiguration.getClientId()); - properties.put("zookeeper.session.timeout.ms", metricsKafkaConfiguration - .getZookeeperSessionTimeoutMs().toString()); - properties.put("zookeeper.connection.timeout.ms", metricsKafkaConfiguration - .getZookeeperConnectionTimeoutMs().toString()); - properties.put("zookeeper.sync.time.ms", metricsKafkaConfiguration.getZookeeperSyncTimeMs() - .toString()); - - for (String key : properties.stringPropertyNames()) { - logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key)); - } - - return properties; - } - - public void stop() { - consumerConnector.shutdown(); - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java index 74fd81e9..858e1252 100644 --- a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java +++ b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumer.java @@ -17,15 +17,16 @@ package com.hpcloud.mon.persister.consumer; -import com.hpcloud.mon.persister.disruptor.MetricDisruptor; -import com.hpcloud.mon.persister.disruptor.event.MetricHolder; +import com.hpcloud.mon.common.model.metric.MetricEnvelope; +import com.hpcloud.mon.persister.pipeline.MetricPipeline; import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; -public class MetricsConsumer extends Consumer { +public class MetricsConsumer extends Consumer { @Inject - public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) { - super(kafkaConsumer, disruptor); + public MetricsConsumer(@Assisted KafkaMetricsConsumer kafkaConsumer, @Assisted MetricPipeline pipeline) { + super(kafkaConsumer, pipeline); } } diff --git a/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumerFactory.java b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumerFactory.java new file mode 100644 index 00000000..c8899cf0 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/consumer/MetricsConsumerFactory.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.hpcloud.mon.persister.consumer; + +import com.hpcloud.mon.persister.pipeline.MetricPipeline; + +public interface MetricsConsumerFactory { + MetricsConsumer create(KafkaMetricsConsumer kafkaConsumer, MetricPipeline pipeline); +} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java b/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java deleted file mode 100644 index cbe2d010..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmHistoryDisruptorProvider.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hpcloud.mon.persister.disruptor; - -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; -import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventFactory; -import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler; -import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory; - -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.lmax.disruptor.ExceptionHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -public class AlarmHistoryDisruptorProvider implements Provider { - - private static final Logger logger = LoggerFactory.getLogger(AlarmHistoryDisruptorProvider.class); - - private final MonPersisterConfiguration configuration; - private final AlarmStateTransitionedEventHandlerFactory eventHandlerFactory; - private final ExceptionHandler exceptionHandler; - private final AlarmStateHistoryDisruptor instance; - - @Inject - public AlarmHistoryDisruptorProvider(MonPersisterConfiguration configuration, - AlarmStateTransitionedEventHandlerFactory eventHandlerFactory, - ExceptionHandler exceptionHandler) { - this.configuration = configuration; - this.eventHandlerFactory = eventHandlerFactory; - this.exceptionHandler = exceptionHandler; - this.instance = createInstance(); - } - - private AlarmStateHistoryDisruptor createInstance() { - - logger.debug("Creating disruptor..."); - - Executor executor = Executors.newCachedThreadPool(); - AlarmStateTransitionedEventFactory eventFactory = new AlarmStateTransitionedEventFactory(); - - int bufferSize = configuration.getDisruptorConfiguration().getBufferSize(); - logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]"); - - AlarmStateHistoryDisruptor disruptor = - new AlarmStateHistoryDisruptor(eventFactory, bufferSize, executor); - disruptor.handleExceptionsWith(exceptionHandler); - - int batchSize = configuration.getOutputProcessorConfiguration().getBatchSize(); - logger.debug("Batch size for each output processor [" + batchSize + "]"); - - int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors(); - logger.debug("Number of output processors [" + numOutputProcessors + "]"); - - AlarmStateTransitionedEventHandler[] eventHandlers = - new AlarmStateTransitionedEventHandler[numOutputProcessors]; - - for (int i = 0; i < numOutputProcessors; ++i) { - eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize); - } - - disruptor.handleEventsWith(eventHandlers); - disruptor.setHandlers(eventHandlers); - disruptor.start(); - - logger.debug("Instance of disruptor successfully started"); - logger.debug("Instance of disruptor fully created"); - - return disruptor; - } - - public AlarmStateHistoryDisruptor get() { - return instance; - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmStateHistoryDisruptor.java b/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmStateHistoryDisruptor.java deleted file mode 
100644 index c49e7596..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/AlarmStateHistoryDisruptor.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hpcloud.mon.persister.disruptor; - -import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder; - -import com.lmax.disruptor.EventFactory; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.ProducerType; - -import java.util.concurrent.Executor; - -public class AlarmStateHistoryDisruptor extends ManagedDisruptor { - public AlarmStateHistoryDisruptor(EventFactory eventFactory, - int ringBufferSize, Executor executor) { - super(eventFactory, ringBufferSize, executor); - } - - public AlarmStateHistoryDisruptor( - final EventFactory eventFactory, int ringBufferSize, - Executor executor, ProducerType producerType, WaitStrategy waitStrategy) { - super(eventFactory, ringBufferSize, executor, producerType, waitStrategy); - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/DisruptorExceptionHandler.java b/src/main/java/com/hpcloud/mon/persister/disruptor/DisruptorExceptionHandler.java deleted file mode 100644 index 01228c0d..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/DisruptorExceptionHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.hpcloud.mon.persister.disruptor; - -import com.lmax.disruptor.ExceptionHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DisruptorExceptionHandler implements ExceptionHandler { - - private static final Logger logger = LoggerFactory.getLogger(DisruptorExceptionHandler.class); - - @Override - public void handleEventException(Throwable ex, long sequence, Object event) { - - logger.error("Disruptor encountered an exception during normal operation", ex); - throw new RuntimeException(ex); - } - - @Override - public void handleOnStartException(Throwable ex) { - - logger.error("Disruptor encountered an exception during startup", ex); - throw new RuntimeException(ex); - } - - @Override - public void handleOnShutdownException(Throwable ex) { - - logger.error("Disruptor encountered an exception during shutdown", ex); - throw new RuntimeException(ex); - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java b/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java deleted file mode 100644 index 8d4e9ff2..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/ManagedDisruptor.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hpcloud.mon.persister.disruptor; - -import com.hpcloud.mon.persister.disruptor.event.FlushableHandler; - -import com.lmax.disruptor.EventFactory; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; - -import java.util.concurrent.Executor; - -public class ManagedDisruptor extends Disruptor { - private FlushableHandler[] handlers = new FlushableHandler[0]; - - public ManagedDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) { - super(eventFactory, ringBufferSize, executor); - } - - public ManagedDisruptor(final EventFactory eventFactory, int ringBufferSize, - Executor executor, ProducerType producerType, WaitStrategy waitStrategy) { - super(eventFactory, ringBufferSize, executor, producerType, waitStrategy); - } - - @Override - public void shutdown() { - for (FlushableHandler handler : handlers) { - handler.flush(); - } - super.shutdown(); - } - - public void setHandlers(FlushableHandler[] handlers) { - this.handlers = handlers; - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java deleted file mode 100644 index ef028da1..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptor.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hpcloud.mon.persister.disruptor; - -import com.hpcloud.mon.persister.disruptor.event.MetricHolder; - -import com.lmax.disruptor.EventFactory; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.ProducerType; - -import java.util.concurrent.Executor; - -public class MetricDisruptor extends ManagedDisruptor { - - public MetricDisruptor(EventFactory eventFactory, int ringBufferSize, - Executor executor) { - super(eventFactory, ringBufferSize, executor); - } - - public MetricDisruptor(final EventFactory eventFactory, int ringBufferSize, - Executor executor, ProducerType producerType, WaitStrategy waitStrategy) { - super(eventFactory, ringBufferSize, executor, producerType, waitStrategy); - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java b/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java deleted file mode 100644 index 1d0c3098..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/MetricDisruptorProvider.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.hpcloud.mon.persister.disruptor; - -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; -import com.hpcloud.mon.persister.disruptor.event.MetricFactory; -import com.hpcloud.mon.persister.disruptor.event.MetricHandler; -import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory; - -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.lmax.disruptor.ExceptionHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -public class MetricDisruptorProvider implements Provider { - - private static final Logger logger = LoggerFactory.getLogger(MetricDisruptorProvider.class); - - private final MonPersisterConfiguration configuration; - private final MetricHandlerFactory eventHandlerFactory; - private final ExceptionHandler exceptionHandler; - private final MetricDisruptor instance; - - @Inject - public MetricDisruptorProvider(MonPersisterConfiguration configuration, - MetricHandlerFactory eventHandlerFactory, ExceptionHandler exceptionHandler) { - - this.configuration = configuration; - this.eventHandlerFactory = eventHandlerFactory; - this.exceptionHandler = exceptionHandler; - this.instance = createInstance(); - } - - private MetricDisruptor createInstance() { - - logger.debug("Creating disruptor..."); - - Executor executor = Executors.newCachedThreadPool(); - MetricFactory eventFactory = new MetricFactory(); - - int bufferSize = configuration.getDisruptorConfiguration().getBufferSize(); - logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]"); - - MetricDisruptor disruptor = new MetricDisruptor(eventFactory, bufferSize, executor); - disruptor.handleExceptionsWith(exceptionHandler); - - int batchSize = configuration.getOutputProcessorConfiguration().getBatchSize(); - logger.debug("Batch size for each output processor [" + batchSize + "]"); - - int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors(); - logger.debug("Number of output processors [" + numOutputProcessors + "]"); - - MetricHandler[] metricHandlers = new MetricHandler[numOutputProcessors]; - - for (int i = 0; i < numOutputProcessors; ++i) { - metricHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize); - } - - disruptor.handleEventsWith(metricHandlers); - disruptor.setHandlers(metricHandlers); - disruptor.start(); - - logger.debug("Instance of disruptor successfully started"); - logger.debug("Instance of disruptor fully created"); - - return disruptor; - } - - public MetricDisruptor get() { - return instance; - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventFactory.java b/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventFactory.java deleted file mode 100644 index 809c8849..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hpcloud.mon.persister.disruptor.event; - -import com.lmax.disruptor.EventFactory; - -public class AlarmStateTransitionedEventFactory implements - EventFactory { - - @Override - public AlarmStateTransitionedEventHolder newInstance() { - return new AlarmStateTransitionedEventHolder(); - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java b/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java deleted file mode 100644 index 2dd7376b..00000000 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandler.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hpcloud.mon.persister.disruptor.event; - -import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; -import com.hpcloud.mon.persister.repository.AlarmRepository; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import com.lmax.disruptor.EventHandler; - -import io.dropwizard.setup.Environment; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AlarmStateTransitionedEventHandler implements - EventHandler, FlushableHandler { - - private static final Logger logger = LoggerFactory - .getLogger(AlarmStateTransitionedEventHandler.class); - private final int ordinal; - private final int numProcessors; - private final int batchSize; - - private long millisSinceLastFlush = System.currentTimeMillis(); - private final long millisBetweenFlushes; - private final int secondsBetweenFlushes; - - private final AlarmRepository repository; - private final Environment environment; - - private final Meter processedMeter; - private final Meter commitMeter; - private final Timer commitTimer; - - @Inject - public AlarmStateTransitionedEventHandler(AlarmRepository repository, - MonPersisterConfiguration configuration, Environment environment, - @Assisted("ordinal") int ordinal, @Assisted("numProcessors") int numProcessors, - @Assisted("batchSize") int batchSize) { - - this.repository = repository; - this.environment = environment; - this.processedMeter = - this.environment.metrics().meter( - this.getClass().getName() + "." 
+ "alarm-messages-processed-processedMeter"); - this.commitMeter = - this.environment.metrics().meter( - this.getClass().getName() + "." + "commits-executed-processedMeter"); - this.commitTimer = - this.environment.metrics().timer( - this.getClass().getName() + "." + "total-commit-and-flush-timer"); - - this.secondsBetweenFlushes = - configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds(); - this.millisBetweenFlushes = secondsBetweenFlushes * 1000; - - this.ordinal = ordinal; - this.numProcessors = numProcessors; - this.batchSize = batchSize; - } - - @Override - public void onEvent(AlarmStateTransitionedEventHolder eventHolder, long sequence, boolean b) - throws Exception { - - if (eventHolder.getEvent() == null) { - logger.debug("Received heartbeat message. Checking last flush time."); - if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) { - logger.debug("It's been more than " + secondsBetweenFlushes - + " seconds since last flush. Flushing staging tables now..."); - flush(); - } else { - logger.debug("It has not been more than " + secondsBetweenFlushes - + " seconds since last flush. No need to perform flush at this time."); - } - return; - } - - if (((sequence / batchSize) % this.numProcessors) != this.ordinal) { - return; - } - - processedMeter.mark(); - - logger.debug("Sequence number: " + sequence + " Ordinal: " + ordinal + " Event: " - + eventHolder.getEvent()); - - AlarmStateTransitionedEvent event = eventHolder.getEvent(); - repository.addToBatch(event); - - if (sequence % batchSize == (batchSize - 1)) { - Timer.Context context = commitTimer.time(); - flush(); - context.stop(); - commitMeter.mark(); - } - } - - @Override - public void flush() { - repository.flush(); - millisSinceLastFlush = System.currentTimeMillis(); - } -} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHolder.java b/src/main/java/com/hpcloud/mon/persister/pipeline/AlarmStateTransitionPipeline.java similarity index 62% rename from src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHolder.java rename to src/main/java/com/hpcloud/mon/persister/pipeline/AlarmStateTransitionPipeline.java index bed54199..aa9c3bd9 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHolder.java +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/AlarmStateTransitionPipeline.java @@ -15,18 +15,17 @@ * limitations under the License. 
*/ -package com.hpcloud.mon.persister.disruptor.event; +package com.hpcloud.mon.persister.pipeline; import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; +import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler; -public class AlarmStateTransitionedEventHolder { - AlarmStateTransitionedEvent event; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; - public AlarmStateTransitionedEvent getEvent() { - return event; - } - - public void setEvent(AlarmStateTransitionedEvent event) { - this.event = event; +public class AlarmStateTransitionPipeline extends ManagedPipeline { + @Inject + public AlarmStateTransitionPipeline(@Assisted AlarmStateTransitionedEventHandler handler) { + super(handler); } } diff --git a/src/main/java/com/hpcloud/mon/persister/configuration/MetricConfiguration.java b/src/main/java/com/hpcloud/mon/persister/pipeline/AlarmStateTransitionPipelineFactory.java similarity index 70% rename from src/main/java/com/hpcloud/mon/persister/configuration/MetricConfiguration.java rename to src/main/java/com/hpcloud/mon/persister/pipeline/AlarmStateTransitionPipelineFactory.java index 35b3fdac..ce3ca57c 100644 --- a/src/main/java/com/hpcloud/mon/persister/configuration/MetricConfiguration.java +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/AlarmStateTransitionPipelineFactory.java @@ -15,16 +15,10 @@ * limitations under the License. */ -package com.hpcloud.mon.persister.configuration; +package com.hpcloud.mon.persister.pipeline; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler; -public class MetricConfiguration { - - @JsonProperty - String topic; - - public String getTopic() { - return topic; - } +public interface AlarmStateTransitionPipelineFactory { + AlarmStateTransitionPipeline create(AlarmStateTransitionedEventHandler handler); } diff --git a/src/main/java/com/hpcloud/mon/persister/pipeline/ManagedPipeline.java b/src/main/java/com/hpcloud/mon/persister/pipeline/ManagedPipeline.java new file mode 100644 index 00000000..336d0a99 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/ManagedPipeline.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.hpcloud.mon.persister.pipeline; + +import com.hpcloud.mon.persister.pipeline.event.FlushableHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ManagedPipeline { + private static final Logger logger = LoggerFactory.getLogger(ManagedPipeline.class); + + private final FlushableHandler eventHandler; + + public ManagedPipeline(FlushableHandler eventHandler) { + this.eventHandler = eventHandler; + } + + public void shutdown() { + eventHandler.flush(); + } + + public boolean publishEvent(T holder) { + try { + return this.eventHandler.onEvent(holder); + } catch (Exception e) { + logger.error("Failed to handle event", e); + return false; + } + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHolder.java b/src/main/java/com/hpcloud/mon/persister/pipeline/MetricPipeline.java similarity index 65% rename from src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHolder.java rename to src/main/java/com/hpcloud/mon/persister/pipeline/MetricPipeline.java index 0c531103..50b4fbb8 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHolder.java +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/MetricPipeline.java @@ -15,19 +15,18 @@ * limitations under the License. */ -package com.hpcloud.mon.persister.disruptor.event; +package com.hpcloud.mon.persister.pipeline; import com.hpcloud.mon.common.model.metric.MetricEnvelope; +import com.hpcloud.mon.persister.pipeline.event.MetricHandler; -public class MetricHolder { +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; - MetricEnvelope metricEnvelope; +public class MetricPipeline extends ManagedPipeline { - public MetricEnvelope getMetricEnvelope() { - return metricEnvelope; - } - - public void setEnvelope(MetricEnvelope metricEnvelope) { - this.metricEnvelope = metricEnvelope; + @Inject + public MetricPipeline(@Assisted MetricHandler metricHandler) { + super(metricHandler); } } diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/FlushableHandler.java b/src/main/java/com/hpcloud/mon/persister/pipeline/MetricPipelineFactory.java similarity index 75% rename from src/main/java/com/hpcloud/mon/persister/disruptor/event/FlushableHandler.java rename to src/main/java/com/hpcloud/mon/persister/pipeline/MetricPipelineFactory.java index b0fd8628..94df9d21 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/FlushableHandler.java +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/MetricPipelineFactory.java @@ -15,8 +15,10 @@ * limitations under the License. */ -package com.hpcloud.mon.persister.disruptor.event; +package com.hpcloud.mon.persister.pipeline; -public interface FlushableHandler { - public void flush(); +import com.hpcloud.mon.persister.pipeline.event.MetricHandler; + +public interface MetricPipelineFactory { + MetricPipeline create(MetricHandler metricHandler); } diff --git a/src/main/java/com/hpcloud/mon/persister/pipeline/event/AlarmStateTransitionedEventHandler.java b/src/main/java/com/hpcloud/mon/persister/pipeline/event/AlarmStateTransitionedEventHandler.java new file mode 100644 index 00000000..21871647 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/event/AlarmStateTransitionedEventHandler.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hpcloud.mon.persister.pipeline.event; + +import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; +import com.hpcloud.mon.persister.configuration.PipelineConfiguration; +import com.hpcloud.mon.persister.repository.AlarmRepository; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import io.dropwizard.setup.Environment; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AlarmStateTransitionedEventHandler extends + FlushableHandler { + + private static final Logger logger = LoggerFactory + .getLogger(AlarmStateTransitionedEventHandler.class); + + private final AlarmRepository repository; + private final int ordinal; + + @Inject + public AlarmStateTransitionedEventHandler(AlarmRepository repository, + @Assisted PipelineConfiguration configuration, Environment environment, + @Assisted("ordinal") int ordinal, + @Assisted("batchSize") int batchSize) { + super(configuration, environment, ordinal, batchSize, + AlarmStateTransitionedEventHandler.class.getName()); + this.repository = repository; + this.ordinal = ordinal; + } + + @Override + protected int process(AlarmStateTransitionedEvent event) throws Exception { + logger.debug("Ordinal: Event: {}", this.ordinal, event); + + repository.addToBatch(event); + return 1; + } + + @Override + protected void flushRepository() { + repository.flush(); + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandlerFactory.java b/src/main/java/com/hpcloud/mon/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java similarity index 72% rename from src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandlerFactory.java rename to src/main/java/com/hpcloud/mon/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java index 4e7bfc80..b08f785c 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/AlarmStateTransitionedEventHandlerFactory.java +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/event/AlarmStateTransitionedEventHandlerFactory.java @@ -15,11 +15,13 @@ * limitations under the License. 
*/ -package com.hpcloud.mon.persister.disruptor.event; +package com.hpcloud.mon.persister.pipeline.event; + +import com.hpcloud.mon.persister.configuration.PipelineConfiguration; import com.google.inject.assistedinject.Assisted; public interface AlarmStateTransitionedEventHandlerFactory { - AlarmStateTransitionedEventHandler create(@Assisted("ordinal") int ordinal, - @Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize); + AlarmStateTransitionedEventHandler create(PipelineConfiguration configuration, + @Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize); } diff --git a/src/main/java/com/hpcloud/mon/persister/pipeline/event/FlushableHandler.java b/src/main/java/com/hpcloud/mon/persister/pipeline/event/FlushableHandler.java new file mode 100644 index 00000000..675e76a8 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/event/FlushableHandler.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hpcloud.mon.persister.pipeline.event; + +import com.hpcloud.mon.persister.configuration.PipelineConfiguration; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; + +import io.dropwizard.setup.Environment; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FlushableHandler { + + private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class); + private final int ordinal; + private final int batchSize; + private final String handlerName; + + private long millisSinceLastFlush = System.currentTimeMillis(); + private final long millisBetweenFlushes; + private final int secondsBetweenFlushes; + private int eventCount = 0; + + private final Environment environment; + + private final Meter processedMeter; + private final Meter commitMeter; + private final Timer commitTimer; + + protected FlushableHandler(PipelineConfiguration configuration, Environment environment, + int ordinal, int batchSize, String baseName) { + + this.handlerName = String.format("%s[%d]", baseName, ordinal); + this.environment = environment; + this.processedMeter = + this.environment.metrics() + .meter(handlerName + "." + "events-processed-processedMeter"); + this.commitMeter = + this.environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter"); + this.commitTimer = + this.environment.metrics().timer(handlerName + "." 
+ "total-commit-and-flush-timer"); + + this.secondsBetweenFlushes = configuration.getMaxBatchTime(); + this.millisBetweenFlushes = secondsBetweenFlushes * 1000; + + this.ordinal = ordinal; + this.batchSize = batchSize; + } + + protected abstract void flushRepository(); + + protected abstract int process(T metricEvent) throws Exception; + + public boolean onEvent(final T event) throws Exception { + + if (event == null) { + long delta = millisSinceLastFlush + millisBetweenFlushes; + logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName, + this.secondsBetweenFlushes); + if (delta < System.currentTimeMillis()) { + logger.debug("{}: {} seconds since last flush. Flushing to repository now.", + this.handlerName, delta); + flush(); + return true; + } else { + logger.debug("{}: {} seconds since last flush. No need to flush at this time.", + this.handlerName, delta); + return false; + } + } + + processedMeter.mark(); + + logger.debug("Ordinal: Event: {}", ordinal, event); + + eventCount += process(event); + + if (eventCount >= batchSize) { + flush(); + return true; + } else { + return false; + } + } + + public void flush() { + if (eventCount == 0) { + logger.debug("{}: Nothing to flush", this.handlerName); + } + Timer.Context context = commitTimer.time(); + flushRepository(); + context.stop(); + commitMeter.mark(); + millisSinceLastFlush = System.currentTimeMillis(); + logger.debug("{}: Flushed {} events", this.handlerName, this.eventCount); + eventCount = 0; + } +} diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHandler.java b/src/main/java/com/hpcloud/mon/persister/pipeline/event/MetricHandler.java similarity index 66% rename from src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHandler.java rename to src/main/java/com/hpcloud/mon/persister/pipeline/event/MetricHandler.java index e9b1aeac..60792979 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHandler.java +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/event/MetricHandler.java @@ -15,21 +15,19 @@ * limitations under the License. 
*/ -package com.hpcloud.mon.persister.disruptor.event; +package com.hpcloud.mon.persister.pipeline.event; import static com.hpcloud.mon.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH; import com.hpcloud.mon.common.model.metric.Metric; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.hpcloud.mon.common.model.metric.MetricEnvelope; +import com.hpcloud.mon.persister.configuration.PipelineConfiguration; import com.hpcloud.mon.persister.repository.MetricRepository; import com.hpcloud.mon.persister.repository.Sha1HashId; import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import com.lmax.disruptor.EventHandler; import io.dropwizard.setup.Environment; @@ -43,106 +41,65 @@ import java.util.Map; import java.util.TimeZone; import java.util.TreeMap; -public class MetricHandler implements EventHandler, FlushableHandler { +public class MetricHandler extends FlushableHandler { private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class); private static final String TENANT_ID = "tenantId"; private static final String REGION = "region"; private final int ordinal; - private final int numProcessors; - private final int batchSize; private final SimpleDateFormat simpleDateFormat; - private long millisSinceLastFlush = System.currentTimeMillis(); - private final long millisBetweenFlushes; - private final int secondsBetweenFlushes; - private final MetricRepository verticaMetricRepository; - private final Environment environment; private final Counter metricCounter; private final Counter definitionCounter; private final Counter dimensionCounter; private final Counter definitionDimensionsCounter; - private final Meter metricMeter; - private final Meter commitMeter; - private final Timer commitTimer; @Inject - public MetricHandler(MetricRepository metricRepository, MonPersisterConfiguration configuration, + public MetricHandler(MetricRepository metricRepository, @Assisted PipelineConfiguration configuration, Environment environment, @Assisted("ordinal") int ordinal, - @Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize) { - + @Assisted("batchSize") int batchSize) { + super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName()); + final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal); this.verticaMetricRepository = metricRepository; - this.environment = environment; this.metricCounter = - this.environment.metrics().counter( - this.getClass().getName() + "." + "metrics-added-to-batch-counter"); + environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter"); this.definitionCounter = - this.environment.metrics().counter( - this.getClass().getName() + "." + "metric-definitions-added-to-batch-counter"); + environment.metrics().counter( + handlerName + "." + "metric-definitions-added-to-batch-counter"); this.dimensionCounter = - this.environment.metrics().counter( - this.getClass().getName() + "." + "metric-dimensions-added-to-batch-counter"); + environment.metrics().counter( + handlerName + "." + "metric-dimensions-added-to-batch-counter"); this.definitionDimensionsCounter = - this.environment.metrics() - .counter( - this.getClass().getName() + "." - + "metric-definition-dimensions-added-to-batch-counter"); - this.metricMeter = - this.environment.metrics().meter( - this.getClass().getName() + "." 
+ "metrics-messages-processed-meter"); - this.commitMeter = - this.environment.metrics() - .meter(this.getClass().getName() + "." + "commits-executed-meter"); - this.commitTimer = - this.environment.metrics().timer( - this.getClass().getName() + "." + "total-commit-and-flush-timer"); - - this.secondsBetweenFlushes = - configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds(); - this.millisBetweenFlushes = secondsBetweenFlushes * 1000; + environment.metrics().counter( + handlerName + "." + "metric-definition-dimensions-added-to-batch-counter"); this.ordinal = ordinal; - this.numProcessors = numProcessors; - this.batchSize = batchSize; simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0")); - } @Override - public void onEvent(MetricHolder metricEvent, long sequence, boolean b) throws Exception { - - if (metricEvent.getMetricEnvelope() == null) { - logger.debug("Received heartbeat message. Checking last flush time."); - if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) { - logger.debug("It's been more than " + secondsBetweenFlushes - + " seconds since last flush. Flushing staging tables now..."); - flush(); - } else { - logger.debug("It has not been more than " + secondsBetweenFlushes - + " seconds since last flush. No need to perform flush at this time."); - } - return; + public int process(MetricEnvelope[] metricEnvelopes) throws Exception { + int metricCount = 0; + for (final MetricEnvelope metricEnvelope : metricEnvelopes) { + metricCount += processEnvelope(metricEnvelope); } + return metricCount; + } - if (((sequence / batchSize) % this.numProcessors) != this.ordinal) { - return; - } + private int processEnvelope(MetricEnvelope metricEnvelope) { + int metricCount = 0; + Metric metric = metricEnvelope.metric; + Map meta = metricEnvelope.meta; - metricMeter.mark(); - - Metric metric = metricEvent.getMetricEnvelope().metric; - Map meta = metricEvent.getMetricEnvelope().meta; - - logger.debug("sequence number: " + sequence); - logger.debug("ordinal: " + ordinal); - logger.debug("metric: " + metric.toString()); - logger.debug("meta: " + meta.toString()); + logger.debug("ordinal: {}", ordinal); + logger.debug("metric: {}", metric); + logger.debug("meta: {}", meta); String tenantId = ""; if (meta.containsKey(TENANT_ID)) { @@ -225,27 +182,21 @@ public class MetricHandler implements EventHandler, FlushableHandl double value = timeValuePairs[1]; verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value); metricCounter.inc(); + metricCount++; } } else { String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000)); double value = metric.getValue(); verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value); metricCounter.inc(); + metricCount++; } - - if (sequence % batchSize == (batchSize - 1)) { - Timer.Context context = commitTimer.time(); - flush(); - context.stop(); - commitMeter.mark(); - } - + return metricCount; } @Override - public void flush() { + public void flushRepository() { verticaMetricRepository.flush(); - millisSinceLastFlush = System.currentTimeMillis(); } private String trunc(String s, int l) { @@ -261,6 +212,5 @@ public class MetricHandler implements EventHandler, FlushableHandl logger.warn("Resulting string {}", r); return r; } - } } diff --git a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHandlerFactory.java 
b/src/main/java/com/hpcloud/mon/persister/pipeline/event/MetricHandlerFactory.java similarity index 73% rename from src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHandlerFactory.java rename to src/main/java/com/hpcloud/mon/persister/pipeline/event/MetricHandlerFactory.java index df63c192..b9b42461 100644 --- a/src/main/java/com/hpcloud/mon/persister/disruptor/event/MetricHandlerFactory.java +++ b/src/main/java/com/hpcloud/mon/persister/pipeline/event/MetricHandlerFactory.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package com.hpcloud.mon.persister.disruptor.event; +package com.hpcloud.mon.persister.pipeline.event; + +import com.hpcloud.mon.persister.configuration.PipelineConfiguration; import com.google.inject.assistedinject.Assisted; public interface MetricHandlerFactory { - MetricHandler create(@Assisted("ordinal") int ordinal, - @Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize); + MetricHandler create(PipelineConfiguration pipelineConfiguration, + @Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize); } diff --git a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java index fce6162c..3f0a60fe 100644 --- a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java +++ b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBAlarmRepository.java @@ -95,7 +95,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository { Timer.Context context = flushTimer.time(); Serie serie = new Serie(ALARM_STATE_HISTORY_NAME); - logger.debug("Created serie: " + serie.getName()); + logger.debug("Created serie: {}", serie.getName()); serie.setColumns(this.colNamesStringArry); @@ -131,7 +131,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository { context.stop(); long endTime = System.currentTimeMillis(); - logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds"); + logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000); } catch (Exception e) { logger.error("Failed to write alarm state history to database", e); @@ -154,7 +154,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository { } sb.append(colVal); } - logger.debug("Array of column values[{}]: [" + sb.toString() + "]", outerIdx); + logger.debug("Array of column values[{}]: [{}]", outerIdx, sb); outerIdx++; } } @@ -171,6 +171,6 @@ public class InfluxDBAlarmRepository implements AlarmRepository { } sb.append(colName); } - logger.debug("Array of column names: [" + sb.toString() + "]"); + logger.debug("Array of column names: [{}]", sb); } } diff --git a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java index 1939555f..d9d9805b 100644 --- a/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java +++ b/src/main/java/com/hpcloud/mon/persister/repository/InfluxDBMetricRepository.java @@ -17,11 +17,13 @@ package com.hpcloud.mon.persister.repository; -import com.google.inject.Inject; +import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; +import com.google.inject.Inject; + +import io.dropwizard.setup.Environment; import org.apache.commons.codec.digest.DigestUtils; 
import org.influxdb.InfluxDB; @@ -41,8 +43,6 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import io.dropwizard.setup.Environment; - public class InfluxDBMetricRepository implements MetricRepository { private static final Logger logger = LoggerFactory.getLogger(InfluxDBMetricRepository.class); @@ -59,7 +59,7 @@ public class InfluxDBMetricRepository implements MetricRepository { private final com.codahale.metrics.Timer flushTimer; public final Meter measurementMeter; - private static final SimpleDateFormat measurementTimeStampSimpleDateFormat = new + private final SimpleDateFormat measurementTimeStampSimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz"); private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha("")); @@ -123,8 +123,8 @@ public class InfluxDBMetricRepository implements MetricRepository { TimeUnit.SECONDS); long endTime = System.currentTimeMillis(); context.stop(); - logger.debug("Writing measurements, definitions, and dimensions to database took " + - (endTime - startTime) / 1000 + " seconds"); + logger.debug("Writing measurements, definitions, and dimensions to database took {} seconds", + (endTime - startTime) / 1000); } catch (Exception e) { logger.error("Failed to write measurements to database", e); } @@ -148,7 +148,7 @@ public class InfluxDBMetricRepository implements MetricRepository { for (Set dimNameSet : dimNameSetMap.keySet()) { Serie serie = new Serie(definition.name); - logger.debug("Created serie: " + serie.getName()); + logger.debug("Created serie: {}", serie.getName()); // Add 4 for the tenant id, region, timestamp, and value. String[] colNameStringArry = new String[dimNameSet.size() + 4]; @@ -158,7 +158,7 @@ public class InfluxDBMetricRepository implements MetricRepository { colNameStringArry[1] = "region"; int j = 2; for (String dimName : dimNameSet) { - logger.debug("Adding column name[{}]: " + dimName, j); + logger.debug("Adding column name[{}]: {}", j, dimName); colNameStringArry[j++] = dimName; } logger.debug("Adding column name[{}]: time", j); @@ -181,9 +181,9 @@ public class InfluxDBMetricRepository implements MetricRepository { Object[][] colValsObjectArry = new Object[pointList.size()][dimNameSet.size() + 4]; int k = 0; for (Point point : pointList) { - logger.debug("Adding column value[{}][0]: " + definition.tenantId, k, 0); + logger.debug("Adding column value[{}][0]: {}", k, definition.tenantId); colValsObjectArry[k][0] = definition.tenantId; - logger.debug("Adding column value[{}][1]: " + definition.region, k, 1); + logger.debug("Adding column value[{}][1]: {}", k, definition.region); colValsObjectArry[k][1] = definition.region; int l = 2; for (String dimName : dimNameSet) { @@ -196,9 +196,9 @@ public class InfluxDBMetricRepository implements MetricRepository { } Date d = measurementTimeStampSimpleDateFormat.parse(point.measurement.timeStamp + " UTC"); Long time = d.getTime() / 1000; - logger.debug("Adding column value[{}][{}]: " + time, k, l); + logger.debug("Adding column value[{}][{}]: {}", k, l, time); colValsObjectArry[k][l++] = time; - logger.debug("Adding column value[{}][{}]: " + point.measurement.value, k, l); + logger.debug("Adding column value[{}][{}]: {}", k, l, point.measurement.value); colValsObjectArry[k][l++] = point.measurement.value; measurementMeter.mark(); k++; @@ -232,7 +232,7 @@ public class InfluxDBMetricRepository implements MetricRepository { } sb.append(colVal); } - logger.debug("Array of column values[{}]: [" + sb.toString() + "]", 
outerIdx); + logger.debug("Array of column values[{}]: [{}]", outerIdx, sb); outerIdx++; } } @@ -249,7 +249,7 @@ public class InfluxDBMetricRepository implements MetricRepository { } sb.append(colName); } - logger.debug("Array of column names: [" + sb.toString() + "]"); + logger.debug("Array of column names: [{}]", sb); } /** diff --git a/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java b/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java deleted file mode 100644 index 1fc36819..00000000 --- a/src/main/java/com/hpcloud/mon/persister/repository/RepositoryCommitHeartbeat.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hpcloud.mon.persister.repository; - -import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor; -import com.hpcloud.mon.persister.disruptor.MetricDisruptor; -import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder; -import com.hpcloud.mon.persister.disruptor.event.MetricHolder; - -import com.google.inject.Inject; -import com.lmax.disruptor.EventTranslator; -import com.lmax.disruptor.dsl.Disruptor; - -import io.dropwizard.lifecycle.Managed; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RepositoryCommitHeartbeat implements Managed { - - private final HeartbeatRunnable deduperRunnable; - - @Inject - public RepositoryCommitHeartbeat(MetricDisruptor metricDisruptor, - AlarmStateHistoryDisruptor alarmHistoryDisruptor) { - this.deduperRunnable = new HeartbeatRunnable(metricDisruptor, alarmHistoryDisruptor); - } - - @Override - public void start() throws Exception { - - Thread heartbeatThread = new Thread(deduperRunnable); - heartbeatThread.start(); - } - - @Override - public void stop() throws Exception { - this.deduperRunnable.stop(); - } - - private static class HeartbeatRunnable implements Runnable { - - private static final Logger logger = LoggerFactory.getLogger(HeartbeatRunnable.class); - private final Disruptor metricDisruptor; - private final Disruptor alarmHistoryDisruptor; - - private boolean stop = false; - - private HeartbeatRunnable(MetricDisruptor metricDisruptor, - AlarmStateHistoryDisruptor alarmHistoryDisruptor) { - this.metricDisruptor = metricDisruptor; - this.alarmHistoryDisruptor = alarmHistoryDisruptor; - } - - @Override - public void run() { - for (;;) { - try { - // Send a heartbeat every second. 
- synchronized (this) { - this.wait(1000); - if (stop) { - logger.debug("Heartbeat thread is exiting"); - break; - } - } - logger.debug("Waking up after sleeping 1 seconds, yawn..."); - - // Send heartbeat - logger.debug("Sending heartbeat message"); - metricDisruptor.publishEvent(new EventTranslator() { - - @Override - public void translateTo(MetricHolder event, long sequence) { - event.setEnvelope(null); - } - }); - alarmHistoryDisruptor - .publishEvent(new EventTranslator() { - - @Override - public void translateTo(AlarmStateTransitionedEventHolder event, long sequence) { - event.setEvent(null); - } - }); - - } catch (Exception e) { - logger.error("Failed to send heartbeat", e); - } - - } - - } - - public synchronized void stop() { - stop = true; - this.notify(); - } - } -} diff --git a/src/main/resources/mon-persister-config.yml b/src/main/resources/mon-persister-config.yml index b4e90bc1..69b0a9ec 100644 --- a/src/main/resources/mon-persister-config.yml +++ b/src/main/resources/mon-persister-config.yml @@ -1,24 +1,33 @@ name: mon-persister alarmHistoryConfiguration: + batchSize: 100 + numThreads: 1 + maxBatchTime: 15 +# See http://kafka.apache.org/documentation.html#api for semantics and defaults. topic: alarm-state-transitions + groupId: persister_alarms + consumerId: 1 + clientId: 1 metricConfiguration: + batchSize: 1000 + numThreads: 2 + maxBatchTime: 30 +# See http://kafka.apache.org/documentation.html#api for semantics and defaults. topic: metrics + groupId: persister_metrics + consumerId: 1 + clientId: 1 #Kafka settings. kafkaConfiguration: -# See http://kafka.apache.org/documentation.html#api for semantics and defaults. - numThreads: 1 - groupId: 1 #zookeeperConnect: localhost:2181 +# See http://kafka.apache.org/documentation.html#api for semantics and defaults. 
zookeeperConnect: 192.168.10.4:2181 - consumerId: 1 socketTimeoutMs: 30000 socketReceiveBufferBytes : 65536 fetchMessageMaxBytes: 1048576 - autoCommitEnable: true - autoCommitIntervalMs: 60000 queuedMaxMessageChunks: 10 rebalanceMaxRetries: 4 fetchMinBytes: 1 @@ -26,23 +35,11 @@ kafkaConfiguration: rebalanceBackoffMs: 2000 refreshLeaderBackoffMs: 200 autoOffsetReset: largest - consumerTimeoutMs: -1 - clientId: 1 + consumerTimeoutMs: 1000 zookeeperSessionTimeoutMs : 60000 zookeeperConnectionTimeoutMs : 6000 zookeeperSyncTimeMs: 2000 - -disruptorConfiguration: - bufferSize: 1048576 - numProcessors: 1 - -outputProcessorConfiguration: - batchSize: 100 - -monDeDuperConfiguration: - dedupeRunFrequencySeconds: 30 - verticaMetricRepositoryConfiguration: maxCacheSize: 2000000 @@ -54,7 +51,7 @@ databaseConfiguration: influxDbConfiguration: name: mon replicationFactor: 1 - url: http://127.0.0.1:8086 + url: http://192.168.10.4:8086 user: root password: root diff --git a/src/test/java/com/hpcloud/mon/persister/MonPersisterConsumerTest.java b/src/test/java/com/hpcloud/mon/persister/MonPersisterConsumerTest.java index 0c2b2cb3..feacff23 100644 --- a/src/test/java/com/hpcloud/mon/persister/MonPersisterConsumerTest.java +++ b/src/test/java/com/hpcloud/mon/persister/MonPersisterConsumerTest.java @@ -19,12 +19,13 @@ package com.hpcloud.mon.persister; import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer; import com.hpcloud.mon.persister.consumer.MetricsConsumer; +import com.hpcloud.mon.persister.pipeline.MetricPipeline; +import com.hpcloud.mon.persister.pipeline.event.MetricHandler; -import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; public class MonPersisterConsumerTest { @@ -35,33 +36,22 @@ public class MonPersisterConsumerTest { @Mock private MetricsConsumer monConsumer; + private MetricHandler metricHandler; + + private MetricPipeline metricPipeline; + @Before public void initMocks() { + metricHandler = Mockito.mock(MetricHandler.class); + metricPipeline = Mockito.spy(new MetricPipeline(metricHandler)); MockitoAnnotations.initMocks(this); } @Test - public void testKafkaConsumerStart() { - try { - monConsumer.start(); - } catch (Exception e) { - e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates. - } - - } - - @Test - public void testKafkaConsumerStop() { - try { - monConsumer.stop(); - } catch (Exception e) { - e.printStackTrace(); // To change body of catch statement use File | Settings | File - // Templates. 
- } - } - - @After - public void after() { - System.out.println("after"); + public void testKafkaConsumerLifecycle() throws Exception { + monConsumer.start(); + monConsumer.stop(); + metricPipeline.shutdown(); + Mockito.verify(metricHandler).flush(); } } diff --git a/src/test/java/com/hpcloud/mon/persister/Test.java b/src/test/java/com/hpcloud/mon/persister/Test.java deleted file mode 100644 index 13c3192c..00000000 --- a/src/test/java/com/hpcloud/mon/persister/Test.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.hpcloud.mon.persister; - -import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration; -import com.hpcloud.mon.persister.consumer.KafkaConsumer; -import com.hpcloud.util.config.ConfigurationException; -import com.hpcloud.util.config.ConfigurationFactory; - -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; - -public class Test extends KafkaConsumer { - private static final Logger logger = LoggerFactory.getLogger(Test.class); - - private static final String TOPIC = "Test"; - - public Test(MonPersisterConfiguration configuration) { - super(configuration); - } - - public static void main(String[] args) throws IOException, ConfigurationException { - final MonPersisterConfiguration config = createConfig(args[0]); - config.getKafkaConfiguration(); - final Test test = new Test(config); - test.run(); - } - - private static MonPersisterConfiguration createConfig(String configFileName) throws IOException, - ConfigurationException { - return ConfigurationFactory - .forClass(MonPersisterConfiguration.class).build( - new File(configFileName)); - } - - @Override - protected Runnable createRunnable(KafkaStream stream, int threadNumber) { - logger.info("Created KafkaReader for {}", threadNumber); - return new KafkaReader(stream, threadNumber); - } - - @Override - protected String getStreamName() { - return TOPIC; - } - - protected class KafkaReader implements Runnable { - - private final KafkaStream stream; - private final int threadNumber; - - public KafkaReader(KafkaStream stream, int threadNumber) { - this.threadNumber = threadNumber; - this.stream = stream; - } - - - public void run() { - ConsumerIterator it = stream.iterator(); - while (it.hasNext()) { - - final String s = new String(it.next().message()); - - logger.debug("Thread {}: {}", threadNumber, s); - } - logger.debug("Shutting down Thread: " + threadNumber); - } - } - -}