Commit kafka reads once the item was persisted
In order to continue using the Kafka High Consumer API, the disruptor was removed. This allows a direct call to to kafka to commit the offsets when items are flushed. Different ConsumerConnectors had to be created for Metrics and Alarms so the offsets could be committed separately Changed configuration to match the new model. Remove configuration parameters that were no longer needed Changed the name Disruptor to Pipeline Allow only one EventHandler per pipeline Added code to flush the Metrics and Alarms, and shutdown the Kafka ConsumerConnections on a normal shutdown. This keeps the persister from losing Metrics and Alarms. Made measurementTimeStampSimpleDateFormat not static since SimpleDateFormat is not thread safe Changed some logging debug statements so Strings weren't created if debug not on Created FlushableHandler as a base class and moved duplicate code into it from MetricHandler and AlarmStateTransitionHistoryHandler Change-Id: Id31a1d148f8e796f5be483dd02544be49c009b18 Changed MetricHandler to take MetricEnvelope[] Change-Id: Ifabbe253cc0163f150ada2252a41a5d9fb9ab423
This commit is contained in:
parent
d8a5b87235
commit
dbab337d76
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@ -0,0 +1,6 @@
|
||||
target/
|
||||
*.classpath
|
||||
*.project
|
||||
*.settings/
|
||||
debs/
|
||||
logs/
|
6
pom.xml
6
pom.xml
@ -100,12 +100,6 @@
|
||||
<version>4.11</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.lmax</groupId>
|
||||
<artifactId>disruptor</artifactId>
|
||||
<version>3.2.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.vertica</groupId>
|
||||
<artifactId>vertica-jdbc</artifactId>
|
||||
|
@ -18,10 +18,23 @@
|
||||
package com.hpcloud.mon.persister;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionsConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumerFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaChannel;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaChannelFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerFactory;
|
||||
import com.hpcloud.mon.persister.consumer.MetricsConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.MetricsConsumerFactory;
|
||||
import com.hpcloud.mon.persister.healthcheck.SimpleHealthCheck;
|
||||
import com.hpcloud.mon.persister.repository.RepositoryCommitHeartbeat;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipelineFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipelineFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.event.MetricHandlerFactory;
|
||||
import com.hpcloud.mon.persister.resource.Resource;
|
||||
|
||||
import com.google.inject.Guice;
|
||||
@ -31,7 +44,11 @@ import io.dropwizard.Application;
|
||||
import io.dropwizard.setup.Bootstrap;
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MonPersisterApplication extends Application<MonPersisterConfiguration> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(MonPersisterApplication.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
new MonPersisterApplication().run(args);
|
||||
@ -58,16 +75,79 @@ public class MonPersisterApplication extends Application<MonPersisterConfigurati
|
||||
// Sample health check.
|
||||
environment.healthChecks().register("test-health-check", new SimpleHealthCheck());
|
||||
|
||||
MetricsConsumer metricsConsumer = injector.getInstance(MetricsConsumer.class);
|
||||
environment.lifecycle().manage(metricsConsumer);
|
||||
final KafkaChannelFactory kafkaChannelFactory = injector.getInstance(KafkaChannelFactory.class);
|
||||
final MetricsConsumerFactory metricsConsumerFactory =
|
||||
injector.getInstance(MetricsConsumerFactory.class);
|
||||
final KafkaMetricsConsumerFactory kafkaMetricsConsumerFactory =
|
||||
injector.getInstance(KafkaMetricsConsumerFactory.class);
|
||||
for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); i++) {
|
||||
final KafkaChannel kafkaChannel =
|
||||
kafkaChannelFactory.create(configuration, configuration.getMetricConfiguration(), i);
|
||||
final MetricPipeline metricPipeline = getMetricPipeline(configuration, i, injector);
|
||||
final KafkaMetricsConsumer kafkaMetricsConsumer =
|
||||
kafkaMetricsConsumerFactory.create(kafkaChannel, i, metricPipeline);
|
||||
MetricsConsumer metricsConsumer =
|
||||
metricsConsumerFactory.create(kafkaMetricsConsumer, metricPipeline);
|
||||
environment.lifecycle().manage(metricsConsumer);
|
||||
}
|
||||
|
||||
AlarmStateTransitionsConsumer alarmStateTransitionsConsumer =
|
||||
injector.getInstance(AlarmStateTransitionsConsumer.class);
|
||||
environment.lifecycle().manage(alarmStateTransitionsConsumer);
|
||||
final AlarmStateTransitionConsumerFactory alarmStateTransitionsConsumerFactory =
|
||||
injector.getInstance(AlarmStateTransitionConsumerFactory.class);
|
||||
final KafkaAlarmStateTransitionConsumerFactory kafkaAlarmStateTransitionConsumerFactory =
|
||||
injector.getInstance(KafkaAlarmStateTransitionConsumerFactory.class);
|
||||
for (int i = 0; i < configuration.getAlarmHistoryConfiguration().getNumThreads(); i++) {
|
||||
final KafkaChannel kafkaChannel =
|
||||
kafkaChannelFactory
|
||||
.create(configuration, configuration.getAlarmHistoryConfiguration(), i);
|
||||
final AlarmStateTransitionPipeline pipeline =
|
||||
getAlarmStateHistoryPipeline(configuration, i, injector);
|
||||
final KafkaAlarmStateTransitionConsumer kafkaAlarmStateTransitionConsumer =
|
||||
kafkaAlarmStateTransitionConsumerFactory.create(kafkaChannel, i, pipeline);
|
||||
AlarmStateTransitionConsumer alarmStateTransitionConsumer =
|
||||
alarmStateTransitionsConsumerFactory.create(kafkaAlarmStateTransitionConsumer, pipeline);
|
||||
environment.lifecycle().manage(alarmStateTransitionConsumer);
|
||||
}
|
||||
}
|
||||
|
||||
RepositoryCommitHeartbeat repositoryCommitHeartbeat =
|
||||
injector.getInstance(RepositoryCommitHeartbeat.class);
|
||||
environment.lifecycle().manage(repositoryCommitHeartbeat);
|
||||
private MetricPipeline getMetricPipeline(MonPersisterConfiguration configuration, int threadNum,
|
||||
Injector injector) {
|
||||
|
||||
logger.debug("Creating metric pipeline...");
|
||||
|
||||
final int batchSize = configuration.getMetricConfiguration().getBatchSize();
|
||||
logger.debug("Batch size for metric pipeline [" + batchSize + "]");
|
||||
|
||||
MetricHandlerFactory metricEventHandlerFactory =
|
||||
injector.getInstance(MetricHandlerFactory.class);
|
||||
MetricPipelineFactory metricPipelineFactory = injector.getInstance(MetricPipelineFactory.class);
|
||||
final MetricPipeline pipeline =
|
||||
metricPipelineFactory.create(metricEventHandlerFactory.create(
|
||||
configuration.getMetricConfiguration(), threadNum, batchSize));
|
||||
|
||||
logger.debug("Instance of metric pipeline fully created");
|
||||
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
public AlarmStateTransitionPipeline getAlarmStateHistoryPipeline(
|
||||
MonPersisterConfiguration configuration, int threadNum, Injector injector) {
|
||||
|
||||
logger.debug("Creating alarm state history pipeline...");
|
||||
|
||||
int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize();
|
||||
logger.debug("Batch size for each AlarmStateHistoryPipeline [" + batchSize + "]");
|
||||
AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory =
|
||||
injector.getInstance(AlarmStateTransitionedEventHandlerFactory.class);
|
||||
|
||||
AlarmStateTransitionPipelineFactory alarmStateTransitionPipelineFactory =
|
||||
injector.getInstance(AlarmStateTransitionPipelineFactory.class);
|
||||
|
||||
AlarmStateTransitionPipeline pipeline =
|
||||
alarmStateTransitionPipelineFactory.create(alarmHistoryEventHandlerFactory.create(
|
||||
configuration.getAlarmHistoryConfiguration(), threadNum, batchSize));
|
||||
|
||||
logger.debug("Instance of alarm state history pipeline fully created");
|
||||
|
||||
return pipeline;
|
||||
}
|
||||
}
|
||||
|
@ -18,36 +18,39 @@
|
||||
package com.hpcloud.mon.persister;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionsConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumerFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasic;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasicFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaChannel;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaChannelFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerRunnableBasic;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerRunnableBasicFactory;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaStreams;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaStreamsProvider;
|
||||
import com.hpcloud.mon.persister.consumer.MetricsConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.MetricsConsumerFactory;
|
||||
import com.hpcloud.mon.persister.dbi.DBIProvider;
|
||||
import com.hpcloud.mon.persister.disruptor.AlarmHistoryDisruptorProvider;
|
||||
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.DisruptorExceptionHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.MetricDisruptorProvider;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipelineFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipelineFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler;
|
||||
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
|
||||
import com.hpcloud.mon.persister.pipeline.event.MetricHandlerFactory;
|
||||
import com.hpcloud.mon.persister.repository.AlarmRepository;
|
||||
import com.hpcloud.mon.persister.repository.InfluxDBAlarmRepository;
|
||||
import com.hpcloud.mon.persister.repository.InfluxDBMetricRepository;
|
||||
import com.hpcloud.mon.persister.repository.MetricRepository;
|
||||
import com.hpcloud.mon.persister.repository.RepositoryCommitHeartbeat;
|
||||
import com.hpcloud.mon.persister.repository.VerticaAlarmRepository;
|
||||
import com.hpcloud.mon.persister.repository.VerticaMetricRepository;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Scopes;
|
||||
import com.google.inject.assistedinject.FactoryModuleBuilder;
|
||||
import com.lmax.disruptor.ExceptionHandler;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
@ -85,12 +88,38 @@ public class MonPersisterModule extends AbstractModule {
|
||||
KafkaAlarmStateTransitionConsumerRunnableBasic.class).build(
|
||||
KafkaAlarmStateTransitionConsumerRunnableBasicFactory.class));
|
||||
|
||||
bind(ExceptionHandler.class).to(DisruptorExceptionHandler.class);
|
||||
install(new FactoryModuleBuilder().implement(
|
||||
KafkaMetricsConsumer.class,
|
||||
KafkaMetricsConsumer.class).build(
|
||||
KafkaMetricsConsumerFactory.class));
|
||||
|
||||
bind(MetricDisruptor.class).toProvider(MetricDisruptorProvider.class).in(Scopes.SINGLETON);
|
||||
install(new FactoryModuleBuilder().implement(
|
||||
MetricPipeline.class,
|
||||
MetricPipeline.class).build(
|
||||
MetricPipelineFactory.class));
|
||||
|
||||
bind(AlarmStateHistoryDisruptor.class).toProvider(AlarmHistoryDisruptorProvider.class).in(
|
||||
Scopes.SINGLETON);
|
||||
install(new FactoryModuleBuilder().implement(
|
||||
AlarmStateTransitionPipeline.class,
|
||||
AlarmStateTransitionPipeline.class).build(
|
||||
AlarmStateTransitionPipelineFactory.class));
|
||||
|
||||
install(new FactoryModuleBuilder().implement(
|
||||
AlarmStateTransitionConsumer.class,
|
||||
AlarmStateTransitionConsumer.class).build(
|
||||
AlarmStateTransitionConsumerFactory.class));
|
||||
|
||||
install(new FactoryModuleBuilder().implement(
|
||||
KafkaAlarmStateTransitionConsumer.class,
|
||||
KafkaAlarmStateTransitionConsumer.class).build(
|
||||
KafkaAlarmStateTransitionConsumerFactory.class));
|
||||
|
||||
install(new FactoryModuleBuilder().implement(
|
||||
MetricsConsumer.class,
|
||||
MetricsConsumer.class).build(
|
||||
MetricsConsumerFactory.class));
|
||||
|
||||
install(new FactoryModuleBuilder().implement(KafkaChannel.class, KafkaChannel.class).build(
|
||||
KafkaChannelFactory.class));
|
||||
|
||||
if (configuration.getDatabaseConfiguration().getDatabaseType().equals("vertica")) {
|
||||
bind(DBI.class).toProvider(DBIProvider.class).in(Scopes.SINGLETON);
|
||||
@ -106,10 +135,5 @@ public class MonPersisterModule extends AbstractModule {
|
||||
System.out.println("Check your config file.");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
bind(KafkaStreams.class).toProvider(KafkaStreamsProvider.class).in(Scopes.SINGLETON);
|
||||
bind(MetricsConsumer.class);
|
||||
bind(AlarmStateTransitionsConsumer.class);
|
||||
bind(RepositoryCommitHeartbeat.class);
|
||||
}
|
||||
}
|
||||
|
@ -1,33 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class DeduperConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
Integer dedupeRunFrequencySeconds;
|
||||
|
||||
public Integer getDedupeRunFrequencySeconds() {
|
||||
return dedupeRunFrequencySeconds;
|
||||
}
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class DisruptorConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
Integer bufferSize;
|
||||
|
||||
public Integer getBufferSize() {
|
||||
return bufferSize;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
Integer numProcessors;
|
||||
|
||||
public Integer getNumProcessors() {
|
||||
return numProcessors;
|
||||
}
|
||||
}
|
@ -27,18 +27,9 @@ public class KafkaConfiguration {
|
||||
@JsonProperty
|
||||
String topic;
|
||||
|
||||
@JsonProperty
|
||||
Integer numThreads;
|
||||
|
||||
@JsonProperty
|
||||
String groupId;
|
||||
|
||||
@JsonProperty
|
||||
String zookeeperConnect;
|
||||
|
||||
@JsonProperty
|
||||
String consumerId;
|
||||
|
||||
@JsonProperty
|
||||
Integer socketTimeoutMs;
|
||||
|
||||
@ -48,12 +39,6 @@ public class KafkaConfiguration {
|
||||
@JsonProperty
|
||||
Integer fetchMessageMaxBytes;
|
||||
|
||||
@JsonProperty
|
||||
Boolean autoCommitEnable;
|
||||
|
||||
@JsonProperty
|
||||
Integer autoCommitIntervalMs;
|
||||
|
||||
@JsonProperty
|
||||
Integer queuedMaxMessageChunks;
|
||||
|
||||
@ -78,9 +63,6 @@ public class KafkaConfiguration {
|
||||
@JsonProperty
|
||||
Integer consumerTimeoutMs;
|
||||
|
||||
@JsonProperty
|
||||
String clientId;
|
||||
|
||||
@JsonProperty
|
||||
Integer zookeeperSessionTimeoutMs;
|
||||
|
||||
@ -94,22 +76,10 @@ public class KafkaConfiguration {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public Integer getNumThreads() {
|
||||
return numThreads;
|
||||
}
|
||||
|
||||
public String getGroupId() {
|
||||
return groupId;
|
||||
}
|
||||
|
||||
public String getZookeeperConnect() {
|
||||
return zookeeperConnect;
|
||||
}
|
||||
|
||||
public String getConsumerId() {
|
||||
return consumerId;
|
||||
}
|
||||
|
||||
public Integer getSocketTimeoutMs() {
|
||||
return socketTimeoutMs;
|
||||
}
|
||||
@ -122,14 +92,6 @@ public class KafkaConfiguration {
|
||||
return fetchMessageMaxBytes;
|
||||
}
|
||||
|
||||
public Boolean getAutoCommitEnable() {
|
||||
return autoCommitEnable;
|
||||
}
|
||||
|
||||
public Integer getAutoCommitIntervalMs() {
|
||||
return autoCommitIntervalMs;
|
||||
}
|
||||
|
||||
public Integer getQueuedMaxMessageChunks() {
|
||||
return queuedMaxMessageChunks;
|
||||
}
|
||||
@ -162,10 +124,6 @@ public class KafkaConfiguration {
|
||||
return consumerTimeoutMs;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
public Integer getZookeeperSessionTimeoutMs() {
|
||||
return zookeeperSessionTimeoutMs;
|
||||
}
|
||||
|
@ -42,19 +42,19 @@ public class MonPersisterConfiguration extends Configuration {
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
@Valid
|
||||
private final AlarmHistoryConfiguration alarmHistoryConfiguration =
|
||||
new AlarmHistoryConfiguration();
|
||||
private final PipelineConfiguration alarmHistoryConfiguration =
|
||||
new PipelineConfiguration();
|
||||
|
||||
public AlarmHistoryConfiguration getAlarmHistoryConfiguration() {
|
||||
public PipelineConfiguration getAlarmHistoryConfiguration() {
|
||||
return alarmHistoryConfiguration;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
@Valid
|
||||
private final MetricConfiguration metricConfiguration = new MetricConfiguration();
|
||||
private final PipelineConfiguration metricConfiguration = new PipelineConfiguration();
|
||||
|
||||
public MetricConfiguration getMetricConfiguration() {
|
||||
public PipelineConfiguration getMetricConfiguration() {
|
||||
return metricConfiguration;
|
||||
}
|
||||
|
||||
@ -67,25 +67,6 @@ public class MonPersisterConfiguration extends Configuration {
|
||||
return kafkaConfiguration;
|
||||
}
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private final DisruptorConfiguration disruptorConfiguration = new DisruptorConfiguration();
|
||||
|
||||
public DisruptorConfiguration getDisruptorConfiguration() {
|
||||
return disruptorConfiguration;
|
||||
}
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private final OutputProcessorConfiguration outputProcessorConfiguration =
|
||||
new OutputProcessorConfiguration();
|
||||
|
||||
public OutputProcessorConfiguration getOutputProcessorConfiguration() {
|
||||
return outputProcessorConfiguration;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
|
||||
|
||||
@ -93,15 +74,6 @@ public class MonPersisterConfiguration extends Configuration {
|
||||
return dataSourceFactory;
|
||||
}
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private final DeduperConfiguration monDeDuperConfiguration = new DeduperConfiguration();
|
||||
|
||||
public DeduperConfiguration getMonDeDuperConfiguration() {
|
||||
return monDeDuperConfiguration;
|
||||
}
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
|
@ -1,33 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class OutputProcessorConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
Integer batchSize;
|
||||
|
||||
public Integer getBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
}
|
@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class PipelineConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
String topic;
|
||||
|
||||
@JsonProperty
|
||||
String groupId;
|
||||
|
||||
@JsonProperty
|
||||
String consumerId;
|
||||
|
||||
@JsonProperty
|
||||
String clientId;
|
||||
|
||||
@JsonProperty
|
||||
Integer batchSize;
|
||||
|
||||
@JsonProperty
|
||||
Integer numThreads;
|
||||
|
||||
@JsonProperty
|
||||
Integer maxBatchTime;
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public String getGroupId() {
|
||||
return groupId;
|
||||
}
|
||||
|
||||
public void setGroupId(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
|
||||
public String getConsumerId() {
|
||||
return consumerId;
|
||||
}
|
||||
|
||||
public void setConsumerId(String consumerId) {
|
||||
this.consumerId = consumerId;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public void setBatchSize(Integer batchSize) {
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
public void setNumThreads(Integer numThreads) {
|
||||
this.numThreads = numThreads;
|
||||
}
|
||||
|
||||
public void setMaxBatchTime(Integer maxBatchTime) {
|
||||
this.maxBatchTime = maxBatchTime;
|
||||
}
|
||||
|
||||
public Integer getBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
public Integer getNumThreads() {
|
||||
return numThreads;
|
||||
}
|
||||
|
||||
public Integer getMaxBatchTime() {
|
||||
return maxBatchTime;
|
||||
}
|
||||
}
|
@ -17,16 +17,17 @@
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
public class AlarmStateTransitionsConsumer extends Consumer<AlarmStateTransitionedEventHolder> {
|
||||
public class AlarmStateTransitionConsumer extends Consumer<AlarmStateTransitionedEvent> {
|
||||
|
||||
@Inject
|
||||
public AlarmStateTransitionsConsumer(KafkaAlarmStateTransitionConsumer kafkaConsumer,
|
||||
AlarmStateHistoryDisruptor disruptor) {
|
||||
super(kafkaConsumer, disruptor);
|
||||
public AlarmStateTransitionConsumer(@Assisted KafkaAlarmStateTransitionConsumer kafkaConsumer,
|
||||
@Assisted AlarmStateTransitionPipeline pipeline) {
|
||||
super(kafkaConsumer, pipeline);
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
public interface AlarmStateTransitionConsumerFactory {
|
||||
AlarmStateTransitionConsumer create(KafkaAlarmStateTransitionConsumer kafkaConsumer,
|
||||
AlarmStateTransitionPipeline pipeline);
|
||||
}
|
@ -17,7 +17,7 @@
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.ManagedDisruptor;
|
||||
import com.hpcloud.mon.persister.pipeline.ManagedPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
@ -29,25 +29,25 @@ import org.slf4j.LoggerFactory;
|
||||
public class Consumer<T> implements Managed {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
|
||||
private final KafkaConsumer consumer;
|
||||
private final ManagedDisruptor<T> disruptor;
|
||||
private final KafkaConsumer<T> consumer;
|
||||
private final ManagedPipeline<T> pipeline;
|
||||
|
||||
@Inject
|
||||
public Consumer(KafkaConsumer kafkaConsumer, ManagedDisruptor<T> disruptor) {
|
||||
public Consumer(KafkaConsumer<T> kafkaConsumer, ManagedPipeline<T> pipeline) {
|
||||
this.consumer = kafkaConsumer;
|
||||
this.disruptor = disruptor;
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
logger.debug("start");
|
||||
consumer.run();
|
||||
consumer.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
logger.debug("stop");
|
||||
consumer.stop();
|
||||
disruptor.shutdown();
|
||||
pipeline.shutdown();
|
||||
}
|
||||
}
|
||||
|
@ -17,29 +17,29 @@
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
import kafka.consumer.KafkaStream;
|
||||
|
||||
public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer {
|
||||
public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer<AlarmStateTransitionedEvent> {
|
||||
|
||||
@Inject
|
||||
private KafkaAlarmStateTransitionConsumerRunnableBasicFactory factory;
|
||||
|
||||
private final AlarmStateTransitionPipeline pipeline;
|
||||
|
||||
@Inject
|
||||
public KafkaAlarmStateTransitionConsumer(MonPersisterConfiguration configuration) {
|
||||
super(configuration);
|
||||
public KafkaAlarmStateTransitionConsumer(@Assisted KafkaChannel kafkaChannel,
|
||||
@Assisted int threadNum, @Assisted final AlarmStateTransitionPipeline pipeline) {
|
||||
super(kafkaChannel, threadNum);
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber) {
|
||||
return factory.create(stream, threadNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getStreamName() {
|
||||
return this.configuration.getAlarmHistoryConfiguration().getTopic();
|
||||
protected KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> createRunnable(
|
||||
KafkaChannel kafkaChannel, int threadNumber) {
|
||||
return factory.create(pipeline, kafkaChannel, threadNumber);
|
||||
}
|
||||
}
|
||||
|
@ -15,16 +15,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
public class MetricFactory implements EventFactory<MetricHolder> {
|
||||
|
||||
public static final MetricFactory INSTANCE = new MetricFactory();
|
||||
|
||||
@Override
|
||||
public MetricHolder newInstance() {
|
||||
return new MetricHolder();
|
||||
}
|
||||
public interface KafkaAlarmStateTransitionConsumerFactory {
|
||||
KafkaAlarmStateTransitionConsumer create(KafkaChannel kafkaChannel, int threadNum,
|
||||
final AlarmStateTransitionPipeline pipeline);
|
||||
}
|
@ -18,67 +18,50 @@
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.lmax.disruptor.EventTranslator;
|
||||
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
import kafka.consumer.KafkaStream;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class KafkaAlarmStateTransitionConsumerRunnableBasic implements Runnable {
|
||||
public class KafkaAlarmStateTransitionConsumerRunnableBasic extends
|
||||
KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(KafkaAlarmStateTransitionConsumerRunnableBasic.class);
|
||||
|
||||
private final KafkaStream<byte[], byte[]> stream;
|
||||
private final int threadNumber;
|
||||
private final AlarmStateHistoryDisruptor disruptor;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Inject
|
||||
public KafkaAlarmStateTransitionConsumerRunnableBasic(AlarmStateHistoryDisruptor disruptor,
|
||||
@Assisted KafkaStream<byte[], byte[]> stream, @Assisted int threadNumber) {
|
||||
this.stream = stream;
|
||||
this.threadNumber = threadNumber;
|
||||
this.disruptor = disruptor;
|
||||
public KafkaAlarmStateTransitionConsumerRunnableBasic(@Assisted AlarmStateTransitionPipeline pipeline,
|
||||
@Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) {
|
||||
super(kafkaChannel, pipeline, threadNumber);
|
||||
this.objectMapper = new ObjectMapper();
|
||||
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
|
||||
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
|
||||
objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
ConsumerIterator<byte[], byte[]> it = stream.iterator();
|
||||
while (it.hasNext()) {
|
||||
@Override
|
||||
protected void publishHeartbeat() {
|
||||
publishEvent(null);
|
||||
}
|
||||
|
||||
final String s = new String(it.next().message());
|
||||
@Override
|
||||
protected void handleMessage(String message) {
|
||||
try {
|
||||
final AlarmStateTransitionedEvent event =
|
||||
objectMapper.readValue(message, AlarmStateTransitionedEvent.class);
|
||||
|
||||
logger.debug("Thread " + threadNumber + ": " + s);
|
||||
logger.debug(event.toString());
|
||||
|
||||
try {
|
||||
final AlarmStateTransitionedEvent event =
|
||||
objectMapper.readValue(s, AlarmStateTransitionedEvent.class);
|
||||
|
||||
logger.debug(event.toString());
|
||||
|
||||
disruptor.publishEvent(new EventTranslator<AlarmStateTransitionedEventHolder>() {
|
||||
@Override
|
||||
public void translateTo(AlarmStateTransitionedEventHolder eventHolder, long sequence) {
|
||||
eventHolder.setEvent(event);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to deserialize JSON message and place on disruptor queue: " + s, e);
|
||||
}
|
||||
publishEvent(event);
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to deserialize JSON message and send to handler: " + message, e);
|
||||
}
|
||||
logger.debug("Shutting down Thread: " + threadNumber);
|
||||
}
|
||||
}
|
||||
|
@ -17,9 +17,9 @@
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import kafka.consumer.KafkaStream;
|
||||
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
|
||||
|
||||
public interface KafkaAlarmStateTransitionConsumerRunnableBasicFactory {
|
||||
KafkaAlarmStateTransitionConsumerRunnableBasic create(KafkaStream<byte[], byte[]> stream,
|
||||
KafkaAlarmStateTransitionConsumerRunnableBasic create(AlarmStateTransitionPipeline pipeline, KafkaChannel kafkaChannel,
|
||||
int threadNumber);
|
||||
}
|
||||
|
@ -0,0 +1,122 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.KafkaConfiguration;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
import kafka.consumer.Consumer;
|
||||
import kafka.consumer.ConsumerConfig;
|
||||
import kafka.consumer.KafkaStream;
|
||||
import kafka.javaapi.consumer.ConsumerConnector;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
public class KafkaChannel {
|
||||
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaChannel.class);
|
||||
|
||||
private final String topic;
|
||||
private final ConsumerConnector consumerConnector;
|
||||
private final int threadNum;
|
||||
|
||||
@Inject
|
||||
public KafkaChannel(@Assisted MonPersisterConfiguration configuration,
|
||||
@Assisted PipelineConfiguration pipelineConfiguration, @Assisted int threadNum) {
|
||||
this.topic = pipelineConfiguration.getTopic();
|
||||
this.threadNum = threadNum;
|
||||
Properties kafkaProperties =
|
||||
createKafkaProperties(configuration.getKafkaConfiguration(), pipelineConfiguration);
|
||||
consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties));
|
||||
}
|
||||
|
||||
public final void markRead() {
|
||||
this.consumerConnector.commitOffsets();
|
||||
}
|
||||
|
||||
public KafkaStream<byte[], byte[]> getKafkaStream() {
|
||||
final Map<String, Integer> topicCountMap = new HashMap<>();
|
||||
topicCountMap.put(this.topic, 1);
|
||||
Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
|
||||
this.consumerConnector.createMessageStreams(topicCountMap);
|
||||
List<KafkaStream<byte[], byte[]>> streams = streamMap.values().iterator().next();
|
||||
if (streams.size() != 1) {
|
||||
throw new IllegalStateException(String.format(
|
||||
"Expected only one stream but instead there are %d", streams.size()));
|
||||
}
|
||||
return streams.get(0);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
this.consumerConnector.shutdown();
|
||||
}
|
||||
|
||||
private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
|
||||
return new ConsumerConfig(kafkaProperties);
|
||||
}
|
||||
|
||||
private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration,
|
||||
final PipelineConfiguration pipelineConfiguration) {
|
||||
Properties properties = new Properties();
|
||||
|
||||
properties.put("group.id", pipelineConfiguration.getGroupId());
|
||||
properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect());
|
||||
properties.put("consumer.id",
|
||||
String.format("%s_%d", pipelineConfiguration.getConsumerId(), this.threadNum));
|
||||
properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString());
|
||||
properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes()
|
||||
.toString());
|
||||
properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes()
|
||||
.toString());
|
||||
// Set auto commit to false because the persister is going to explicitly commit
|
||||
properties.put("auto.commit.enable", "false");
|
||||
properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks()
|
||||
.toString());
|
||||
properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString());
|
||||
properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString());
|
||||
properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString());
|
||||
properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString());
|
||||
properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs()
|
||||
.toString());
|
||||
properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset());
|
||||
properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString());
|
||||
properties.put("client.id", String.format("%s_%d", pipelineConfiguration.getClientId(), threadNum));
|
||||
properties.put("zookeeper.session.timeout.ms", kafkaConfiguration
|
||||
.getZookeeperSessionTimeoutMs().toString());
|
||||
properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration
|
||||
.getZookeeperConnectionTimeoutMs().toString());
|
||||
properties
|
||||
.put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString());
|
||||
|
||||
for (String key : properties.stringPropertyNames()) {
|
||||
logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key));
|
||||
}
|
||||
|
||||
return properties;
|
||||
}
|
||||
}
|
@ -18,21 +18,9 @@
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Provider;
|
||||
|
||||
public class KafkaStreamsProvider implements Provider<KafkaStreams> {
|
||||
|
||||
private final MonPersisterConfiguration configuration;
|
||||
|
||||
@Inject
|
||||
public KafkaStreamsProvider(MonPersisterConfiguration configuration) {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KafkaStreams get() {
|
||||
return new KafkaStreams(configuration);
|
||||
}
|
||||
}
|
||||
public interface KafkaChannelFactory {
|
||||
KafkaChannel create(MonPersisterConfiguration configuration,
|
||||
PipelineConfiguration pipelineConfiguration, int threadNum);
|
||||
}
|
@ -14,70 +14,53 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import kafka.consumer.KafkaStream;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class KafkaConsumer {
|
||||
public abstract class KafkaConsumer<T> {
|
||||
|
||||
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
|
||||
|
||||
private static final int WAIT_TIME = 10;
|
||||
private static final int WAIT_TIME = 10;
|
||||
|
||||
protected final MonPersisterConfiguration configuration;
|
||||
private ExecutorService executorService;
|
||||
private final KafkaChannel kafkaChannel;
|
||||
private final int threadNum;
|
||||
private KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;
|
||||
|
||||
private final Integer numThreads;
|
||||
private ExecutorService executorService;
|
||||
@Inject
|
||||
private KafkaStreams kafkaStreams;
|
||||
public KafkaConsumer(KafkaChannel kafkaChannel, int threadNum) {
|
||||
this.kafkaChannel = kafkaChannel;
|
||||
this.threadNum = threadNum;
|
||||
}
|
||||
|
||||
protected abstract Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber);
|
||||
protected abstract String getStreamName();
|
||||
protected abstract KafkaConsumerRunnableBasic<T> createRunnable(KafkaChannel kafkaChannel,
|
||||
int threadNumber);
|
||||
|
||||
@Inject
|
||||
public KafkaConsumer(MonPersisterConfiguration configuration) {
|
||||
public void start() {
|
||||
executorService = Executors.newFixedThreadPool(1);
|
||||
KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic =
|
||||
createRunnable(kafkaChannel, this.threadNum);
|
||||
executorService.submit(kafkaConsumerRunnableBasic);
|
||||
}
|
||||
|
||||
this.configuration = configuration;
|
||||
|
||||
this.numThreads = configuration.getKafkaConfiguration().getNumThreads();
|
||||
logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
List<KafkaStream<byte[], byte[]>> streams = kafkaStreams.getStreams().get(getStreamName());
|
||||
executorService = Executors.newFixedThreadPool(numThreads);
|
||||
|
||||
int threadNumber = 0;
|
||||
for (final KafkaStream<byte[], byte[]> stream : streams) {
|
||||
executorService.submit(createRunnable(stream, threadNumber));
|
||||
threadNumber++;
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
kafkaStreams.stop();
|
||||
if (executorService != null) {
|
||||
executorService.shutdown();
|
||||
try {
|
||||
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
|
||||
logger.warn("Did not shut down in %d seconds", WAIT_TIME);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
logger.info("awaitTerminiation interrupted", e);
|
||||
}
|
||||
public void stop() {
|
||||
kafkaConsumerRunnableBasic.stop();
|
||||
if (executorService != null) {
|
||||
executorService.shutdown();
|
||||
try {
|
||||
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
|
||||
logger.warn("Did not shut down in {} seconds", WAIT_TIME);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
logger.info("awaitTerminiation interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.pipeline.ManagedPipeline;
|
||||
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class);
|
||||
private final KafkaChannel kafkaChannel;
|
||||
private final int threadNumber;
|
||||
private final ManagedPipeline<T> pipeline;
|
||||
private volatile boolean stop = false;
|
||||
|
||||
public KafkaConsumerRunnableBasic(KafkaChannel kafkaChannel,
|
||||
ManagedPipeline<T> pipeline,
|
||||
int threadNumber) {
|
||||
this.kafkaChannel = kafkaChannel;
|
||||
this.pipeline = pipeline;
|
||||
this.threadNumber = threadNumber;
|
||||
}
|
||||
|
||||
abstract protected void publishHeartbeat();
|
||||
|
||||
abstract protected void handleMessage(String message);
|
||||
|
||||
protected void markRead() {
|
||||
this.kafkaChannel.markRead();
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
this.stop = true;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator();
|
||||
logger.debug("KafkaChannel {} has stream", this.threadNumber);
|
||||
while (!this.stop) {
|
||||
try {
|
||||
if (it.hasNext()) {
|
||||
final String s = new String(it.next().message());
|
||||
|
||||
logger.debug("Thread {}: {}", threadNumber, s);
|
||||
|
||||
handleMessage(s);
|
||||
}
|
||||
} catch (kafka.consumer.ConsumerTimeoutException cte) {
|
||||
publishHeartbeat();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
logger.debug("Shutting down Thread: {}", threadNumber);
|
||||
this.kafkaChannel.stop();
|
||||
}
|
||||
|
||||
protected void publishEvent(final T event) {
|
||||
if (pipeline.publishEvent(event)) {
|
||||
markRead();
|
||||
}
|
||||
}
|
||||
}
|
@ -17,29 +17,29 @@
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
import kafka.consumer.KafkaStream;
|
||||
|
||||
public class KafkaMetricsConsumer extends KafkaConsumer {
|
||||
public class KafkaMetricsConsumer extends KafkaConsumer<MetricEnvelope[]> {
|
||||
|
||||
@Inject
|
||||
private KafkaMetricsConsumerRunnableBasicFactory factory;
|
||||
|
||||
private final MetricPipeline pipeline;
|
||||
|
||||
@Inject
|
||||
public KafkaMetricsConsumer(MonPersisterConfiguration configuration) {
|
||||
super(configuration);
|
||||
public KafkaMetricsConsumer(@Assisted KafkaChannel kafkaChannel, @Assisted int threadNum,
|
||||
@Assisted MetricPipeline pipeline) {
|
||||
super(kafkaChannel, threadNum);
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber) {
|
||||
return factory.create(stream, threadNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getStreamName() {
|
||||
return this.configuration.getMetricConfiguration().getTopic();
|
||||
protected KafkaConsumerRunnableBasic<MetricEnvelope[]> createRunnable(KafkaChannel kafkaChannel,
|
||||
int threadNumber) {
|
||||
return factory.create(pipeline, kafkaChannel, threadNumber);
|
||||
}
|
||||
}
|
||||
|
@ -15,16 +15,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.configuration;
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
|
||||
public class AlarmHistoryConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
String topic;
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
public interface KafkaMetricsConsumerFactory {
|
||||
public KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum,
|
||||
MetricPipeline pipeline);
|
||||
}
|
@ -18,68 +18,49 @@
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
|
||||
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.lmax.disruptor.EventTranslator;
|
||||
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
import kafka.consumer.KafkaStream;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class KafkaMetricsConsumerRunnableBasic implements Runnable {
|
||||
public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasic<MetricEnvelope[]> {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(KafkaMetricsConsumerRunnableBasic.class);
|
||||
private final KafkaStream<byte[], byte[]> stream;
|
||||
private final int threadNumber;
|
||||
private final MetricDisruptor disruptor;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Inject
|
||||
public KafkaMetricsConsumerRunnableBasic(MetricDisruptor disruptor,
|
||||
@Assisted KafkaStream<byte[], byte[]> stream, @Assisted int threadNumber) {
|
||||
this.stream = stream;
|
||||
this.threadNumber = threadNumber;
|
||||
this.disruptor = disruptor;
|
||||
public KafkaMetricsConsumerRunnableBasic(@Assisted MetricPipeline pipeline,
|
||||
@Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) {
|
||||
super(kafkaChannel, pipeline, threadNumber);
|
||||
this.objectMapper = new ObjectMapper();
|
||||
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
|
||||
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
ConsumerIterator<byte[], byte[]> it = stream.iterator();
|
||||
while (it.hasNext()) {
|
||||
@Override
|
||||
protected void publishHeartbeat() {
|
||||
publishEvent(null);
|
||||
}
|
||||
|
||||
final String s = new String(it.next().message());
|
||||
@Override
|
||||
protected void handleMessage(String message) {
|
||||
try {
|
||||
final MetricEnvelope[] envelopes = objectMapper.readValue(message, MetricEnvelope[].class);
|
||||
|
||||
logger.debug("Thread {}: {}", threadNumber, s);
|
||||
|
||||
try {
|
||||
final MetricEnvelope[] envelopes = objectMapper.readValue(s, MetricEnvelope[].class);
|
||||
|
||||
for (final MetricEnvelope envelope : envelopes) {
|
||||
|
||||
logger.debug("{}", envelope);
|
||||
|
||||
disruptor.publishEvent(new EventTranslator<MetricHolder>() {
|
||||
@Override
|
||||
public void translateTo(MetricHolder event, long sequence) {
|
||||
event.setEnvelope(envelope);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to deserialize JSON message and place on disruptor queue: {}", e);
|
||||
for (final MetricEnvelope envelope : envelopes) {
|
||||
logger.debug("{}", envelope);
|
||||
}
|
||||
|
||||
publishEvent(envelopes);
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to deserialize JSON message and place on pipeline queue: " + message,
|
||||
e);
|
||||
}
|
||||
logger.debug("Shutting down Thread: {}", threadNumber);
|
||||
}
|
||||
}
|
||||
|
@ -17,8 +17,8 @@
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import kafka.consumer.KafkaStream;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
|
||||
public interface KafkaMetricsConsumerRunnableBasicFactory {
|
||||
KafkaMetricsConsumerRunnableBasic create(KafkaStream<byte[], byte[]> stream, int threadNumber);
|
||||
KafkaMetricsConsumerRunnableBasic create(MetricPipeline pipeline, KafkaChannel kafkaChannel, int threadNumber);
|
||||
}
|
||||
|
@ -1,112 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.KafkaConfiguration;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
|
||||
import kafka.consumer.Consumer;
|
||||
import kafka.consumer.ConsumerConfig;
|
||||
import kafka.consumer.KafkaStream;
|
||||
import kafka.javaapi.consumer.ConsumerConnector;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
public class KafkaStreams {
|
||||
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaStreams.class);
|
||||
|
||||
private final MonPersisterConfiguration configuration;
|
||||
|
||||
private final ConsumerConnector consumerConnector;
|
||||
private final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
|
||||
|
||||
public KafkaStreams(MonPersisterConfiguration configuration) {
|
||||
this.configuration = configuration;
|
||||
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
|
||||
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
|
||||
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
|
||||
Map<String, Integer> topicCountMap = new HashMap<>();
|
||||
Integer numThreads = configuration.getKafkaConfiguration().getNumThreads();
|
||||
topicCountMap.put(this.configuration.getMetricConfiguration().getTopic(), (int) numThreads);
|
||||
topicCountMap.put(this.configuration.getAlarmHistoryConfiguration().getTopic(),
|
||||
(int) numThreads);
|
||||
consumerMap = consumerConnector.createMessageStreams(topicCountMap);
|
||||
}
|
||||
|
||||
public final Map<String, List<KafkaStream<byte[], byte[]>>> getStreams() {
|
||||
return consumerMap;
|
||||
}
|
||||
|
||||
private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
|
||||
return new ConsumerConfig(kafkaProperties);
|
||||
}
|
||||
|
||||
private Properties createKafkaProperties(KafkaConfiguration metricsKafkaConfiguration) {
|
||||
Properties properties = new Properties();
|
||||
|
||||
properties.put("group.id", metricsKafkaConfiguration.getGroupId());
|
||||
properties.put("zookeeper.connect", metricsKafkaConfiguration.getZookeeperConnect());
|
||||
properties.put("consumer.id", metricsKafkaConfiguration.getConsumerId());
|
||||
properties.put("socket.timeout.ms", metricsKafkaConfiguration.getSocketTimeoutMs().toString());
|
||||
properties.put("socket.receive.buffer.bytes", metricsKafkaConfiguration
|
||||
.getSocketReceiveBufferBytes().toString());
|
||||
properties.put("fetch.message.max.bytes", metricsKafkaConfiguration.getFetchMessageMaxBytes()
|
||||
.toString());
|
||||
properties
|
||||
.put("auto.commit.enable", metricsKafkaConfiguration.getAutoCommitEnable().toString());
|
||||
properties.put("auto.commit.interval.ms", metricsKafkaConfiguration.getAutoCommitIntervalMs()
|
||||
.toString());
|
||||
properties.put("queued.max.message.chunks", metricsKafkaConfiguration
|
||||
.getQueuedMaxMessageChunks().toString());
|
||||
properties.put("rebalance.max.retries", metricsKafkaConfiguration.getRebalanceMaxRetries()
|
||||
.toString());
|
||||
properties.put("fetch.min.bytes", metricsKafkaConfiguration.getFetchMinBytes().toString());
|
||||
properties.put("fetch.wait.max.ms", metricsKafkaConfiguration.getFetchWaitMaxMs().toString());
|
||||
properties.put("rebalance.backoff.ms", metricsKafkaConfiguration.getRebalanceBackoffMs()
|
||||
.toString());
|
||||
properties.put("refresh.leader.backoff.ms", metricsKafkaConfiguration
|
||||
.getRefreshLeaderBackoffMs().toString());
|
||||
properties.put("auto.offset.reset", metricsKafkaConfiguration.getAutoOffsetReset());
|
||||
properties.put("consumer.timeout.ms", metricsKafkaConfiguration.getConsumerTimeoutMs()
|
||||
.toString());
|
||||
properties.put("client.id", metricsKafkaConfiguration.getClientId());
|
||||
properties.put("zookeeper.session.timeout.ms", metricsKafkaConfiguration
|
||||
.getZookeeperSessionTimeoutMs().toString());
|
||||
properties.put("zookeeper.connection.timeout.ms", metricsKafkaConfiguration
|
||||
.getZookeeperConnectionTimeoutMs().toString());
|
||||
properties.put("zookeeper.sync.time.ms", metricsKafkaConfiguration.getZookeeperSyncTimeMs()
|
||||
.toString());
|
||||
|
||||
for (String key : properties.stringPropertyNames()) {
|
||||
logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key));
|
||||
}
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
consumerConnector.shutdown();
|
||||
}
|
||||
}
|
@ -17,15 +17,16 @@
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
public class MetricsConsumer extends Consumer<MetricHolder> {
|
||||
public class MetricsConsumer extends Consumer<MetricEnvelope[]> {
|
||||
|
||||
@Inject
|
||||
public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) {
|
||||
super(kafkaConsumer, disruptor);
|
||||
public MetricsConsumer(@Assisted KafkaMetricsConsumer kafkaConsumer, @Assisted MetricPipeline pipeline) {
|
||||
super(kafkaConsumer, pipeline);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.consumer;
|
||||
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
|
||||
public interface MetricsConsumerFactory {
|
||||
MetricsConsumer create(KafkaMetricsConsumer kafkaConsumer, MetricPipeline pipeline);
|
||||
}
|
@ -1,94 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provider;
|
||||
import com.lmax.disruptor.ExceptionHandler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class AlarmHistoryDisruptorProvider implements Provider<AlarmStateHistoryDisruptor> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlarmHistoryDisruptorProvider.class);
|
||||
|
||||
private final MonPersisterConfiguration configuration;
|
||||
private final AlarmStateTransitionedEventHandlerFactory eventHandlerFactory;
|
||||
private final ExceptionHandler exceptionHandler;
|
||||
private final AlarmStateHistoryDisruptor instance;
|
||||
|
||||
@Inject
|
||||
public AlarmHistoryDisruptorProvider(MonPersisterConfiguration configuration,
|
||||
AlarmStateTransitionedEventHandlerFactory eventHandlerFactory,
|
||||
ExceptionHandler exceptionHandler) {
|
||||
this.configuration = configuration;
|
||||
this.eventHandlerFactory = eventHandlerFactory;
|
||||
this.exceptionHandler = exceptionHandler;
|
||||
this.instance = createInstance();
|
||||
}
|
||||
|
||||
private AlarmStateHistoryDisruptor createInstance() {
|
||||
|
||||
logger.debug("Creating disruptor...");
|
||||
|
||||
Executor executor = Executors.newCachedThreadPool();
|
||||
AlarmStateTransitionedEventFactory eventFactory = new AlarmStateTransitionedEventFactory();
|
||||
|
||||
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
|
||||
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
|
||||
|
||||
AlarmStateHistoryDisruptor disruptor =
|
||||
new AlarmStateHistoryDisruptor(eventFactory, bufferSize, executor);
|
||||
disruptor.handleExceptionsWith(exceptionHandler);
|
||||
|
||||
int batchSize = configuration.getOutputProcessorConfiguration().getBatchSize();
|
||||
logger.debug("Batch size for each output processor [" + batchSize + "]");
|
||||
|
||||
int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
|
||||
logger.debug("Number of output processors [" + numOutputProcessors + "]");
|
||||
|
||||
AlarmStateTransitionedEventHandler[] eventHandlers =
|
||||
new AlarmStateTransitionedEventHandler[numOutputProcessors];
|
||||
|
||||
for (int i = 0; i < numOutputProcessors; ++i) {
|
||||
eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
|
||||
}
|
||||
|
||||
disruptor.handleEventsWith(eventHandlers);
|
||||
disruptor.setHandlers(eventHandlers);
|
||||
disruptor.start();
|
||||
|
||||
logger.debug("Instance of disruptor successfully started");
|
||||
logger.debug("Instance of disruptor fully created");
|
||||
|
||||
return disruptor;
|
||||
}
|
||||
|
||||
public AlarmStateHistoryDisruptor get() {
|
||||
return instance;
|
||||
}
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
import com.lmax.disruptor.WaitStrategy;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public class AlarmStateHistoryDisruptor extends ManagedDisruptor<AlarmStateTransitionedEventHolder> {
|
||||
public AlarmStateHistoryDisruptor(EventFactory<AlarmStateTransitionedEventHolder> eventFactory,
|
||||
int ringBufferSize, Executor executor) {
|
||||
super(eventFactory, ringBufferSize, executor);
|
||||
}
|
||||
|
||||
public AlarmStateHistoryDisruptor(
|
||||
final EventFactory<AlarmStateTransitionedEventHolder> eventFactory, int ringBufferSize,
|
||||
Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
|
||||
super(eventFactory, ringBufferSize, executor, producerType, waitStrategy);
|
||||
}
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor;
|
||||
|
||||
import com.lmax.disruptor.ExceptionHandler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DisruptorExceptionHandler implements ExceptionHandler {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DisruptorExceptionHandler.class);
|
||||
|
||||
@Override
|
||||
public void handleEventException(Throwable ex, long sequence, Object event) {
|
||||
|
||||
logger.error("Disruptor encountered an exception during normal operation", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleOnStartException(Throwable ex) {
|
||||
|
||||
logger.error("Disruptor encountered an exception during startup", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleOnShutdownException(Throwable ex) {
|
||||
|
||||
logger.error("Disruptor encountered an exception during shutdown", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.event.FlushableHandler;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
import com.lmax.disruptor.WaitStrategy;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public class ManagedDisruptor<T> extends Disruptor<T> {
|
||||
private FlushableHandler[] handlers = new FlushableHandler[0];
|
||||
|
||||
public ManagedDisruptor(EventFactory<T> eventFactory, int ringBufferSize, Executor executor) {
|
||||
super(eventFactory, ringBufferSize, executor);
|
||||
}
|
||||
|
||||
public ManagedDisruptor(final EventFactory<T> eventFactory, int ringBufferSize,
|
||||
Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
|
||||
super(eventFactory, ringBufferSize, executor, producerType, waitStrategy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
for (FlushableHandler handler : handlers) {
|
||||
handler.flush();
|
||||
}
|
||||
super.shutdown();
|
||||
}
|
||||
|
||||
public void setHandlers(FlushableHandler[] handlers) {
|
||||
this.handlers = handlers;
|
||||
}
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
import com.lmax.disruptor.WaitStrategy;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public class MetricDisruptor extends ManagedDisruptor<MetricHolder> {
|
||||
|
||||
public MetricDisruptor(EventFactory<MetricHolder> eventFactory, int ringBufferSize,
|
||||
Executor executor) {
|
||||
super(eventFactory, ringBufferSize, executor);
|
||||
}
|
||||
|
||||
public MetricDisruptor(final EventFactory<MetricHolder> eventFactory, int ringBufferSize,
|
||||
Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
|
||||
super(eventFactory, ringBufferSize, executor, producerType, waitStrategy);
|
||||
}
|
||||
}
|
@ -1,92 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricFactory;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHandler;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provider;
|
||||
import com.lmax.disruptor.ExceptionHandler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class MetricDisruptorProvider implements Provider<MetricDisruptor> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MetricDisruptorProvider.class);
|
||||
|
||||
private final MonPersisterConfiguration configuration;
|
||||
private final MetricHandlerFactory eventHandlerFactory;
|
||||
private final ExceptionHandler exceptionHandler;
|
||||
private final MetricDisruptor instance;
|
||||
|
||||
@Inject
|
||||
public MetricDisruptorProvider(MonPersisterConfiguration configuration,
|
||||
MetricHandlerFactory eventHandlerFactory, ExceptionHandler exceptionHandler) {
|
||||
|
||||
this.configuration = configuration;
|
||||
this.eventHandlerFactory = eventHandlerFactory;
|
||||
this.exceptionHandler = exceptionHandler;
|
||||
this.instance = createInstance();
|
||||
}
|
||||
|
||||
private MetricDisruptor createInstance() {
|
||||
|
||||
logger.debug("Creating disruptor...");
|
||||
|
||||
Executor executor = Executors.newCachedThreadPool();
|
||||
MetricFactory eventFactory = new MetricFactory();
|
||||
|
||||
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
|
||||
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
|
||||
|
||||
MetricDisruptor disruptor = new MetricDisruptor(eventFactory, bufferSize, executor);
|
||||
disruptor.handleExceptionsWith(exceptionHandler);
|
||||
|
||||
int batchSize = configuration.getOutputProcessorConfiguration().getBatchSize();
|
||||
logger.debug("Batch size for each output processor [" + batchSize + "]");
|
||||
|
||||
int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
|
||||
logger.debug("Number of output processors [" + numOutputProcessors + "]");
|
||||
|
||||
MetricHandler[] metricHandlers = new MetricHandler[numOutputProcessors];
|
||||
|
||||
for (int i = 0; i < numOutputProcessors; ++i) {
|
||||
metricHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
|
||||
}
|
||||
|
||||
disruptor.handleEventsWith(metricHandlers);
|
||||
disruptor.setHandlers(metricHandlers);
|
||||
disruptor.start();
|
||||
|
||||
logger.debug("Instance of disruptor successfully started");
|
||||
logger.debug("Instance of disruptor fully created");
|
||||
|
||||
return disruptor;
|
||||
}
|
||||
|
||||
public MetricDisruptor get() {
|
||||
return instance;
|
||||
}
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
public class AlarmStateTransitionedEventFactory implements
|
||||
EventFactory<AlarmStateTransitionedEventHolder> {
|
||||
|
||||
@Override
|
||||
public AlarmStateTransitionedEventHolder newInstance() {
|
||||
return new AlarmStateTransitionedEventHolder();
|
||||
}
|
||||
}
|
@ -1,124 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.repository.AlarmRepository;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AlarmStateTransitionedEventHandler implements
|
||||
EventHandler<AlarmStateTransitionedEventHolder>, FlushableHandler {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(AlarmStateTransitionedEventHandler.class);
|
||||
private final int ordinal;
|
||||
private final int numProcessors;
|
||||
private final int batchSize;
|
||||
|
||||
private long millisSinceLastFlush = System.currentTimeMillis();
|
||||
private final long millisBetweenFlushes;
|
||||
private final int secondsBetweenFlushes;
|
||||
|
||||
private final AlarmRepository repository;
|
||||
private final Environment environment;
|
||||
|
||||
private final Meter processedMeter;
|
||||
private final Meter commitMeter;
|
||||
private final Timer commitTimer;
|
||||
|
||||
@Inject
|
||||
public AlarmStateTransitionedEventHandler(AlarmRepository repository,
|
||||
MonPersisterConfiguration configuration, Environment environment,
|
||||
@Assisted("ordinal") int ordinal, @Assisted("numProcessors") int numProcessors,
|
||||
@Assisted("batchSize") int batchSize) {
|
||||
|
||||
this.repository = repository;
|
||||
this.environment = environment;
|
||||
this.processedMeter =
|
||||
this.environment.metrics().meter(
|
||||
this.getClass().getName() + "." + "alarm-messages-processed-processedMeter");
|
||||
this.commitMeter =
|
||||
this.environment.metrics().meter(
|
||||
this.getClass().getName() + "." + "commits-executed-processedMeter");
|
||||
this.commitTimer =
|
||||
this.environment.metrics().timer(
|
||||
this.getClass().getName() + "." + "total-commit-and-flush-timer");
|
||||
|
||||
this.secondsBetweenFlushes =
|
||||
configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
|
||||
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
|
||||
|
||||
this.ordinal = ordinal;
|
||||
this.numProcessors = numProcessors;
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(AlarmStateTransitionedEventHolder eventHolder, long sequence, boolean b)
|
||||
throws Exception {
|
||||
|
||||
if (eventHolder.getEvent() == null) {
|
||||
logger.debug("Received heartbeat message. Checking last flush time.");
|
||||
if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) {
|
||||
logger.debug("It's been more than " + secondsBetweenFlushes
|
||||
+ " seconds since last flush. Flushing staging tables now...");
|
||||
flush();
|
||||
} else {
|
||||
logger.debug("It has not been more than " + secondsBetweenFlushes
|
||||
+ " seconds since last flush. No need to perform flush at this time.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (((sequence / batchSize) % this.numProcessors) != this.ordinal) {
|
||||
return;
|
||||
}
|
||||
|
||||
processedMeter.mark();
|
||||
|
||||
logger.debug("Sequence number: " + sequence + " Ordinal: " + ordinal + " Event: "
|
||||
+ eventHolder.getEvent());
|
||||
|
||||
AlarmStateTransitionedEvent event = eventHolder.getEvent();
|
||||
repository.addToBatch(event);
|
||||
|
||||
if (sequence % batchSize == (batchSize - 1)) {
|
||||
Timer.Context context = commitTimer.time();
|
||||
flush();
|
||||
context.stop();
|
||||
commitMeter.mark();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
repository.flush();
|
||||
millisSinceLastFlush = System.currentTimeMillis();
|
||||
}
|
||||
}
|
@ -15,18 +15,17 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
package com.hpcloud.mon.persister.pipeline;
|
||||
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler;
|
||||
|
||||
public class AlarmStateTransitionedEventHolder {
|
||||
AlarmStateTransitionedEvent event;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
public AlarmStateTransitionedEvent getEvent() {
|
||||
return event;
|
||||
}
|
||||
|
||||
public void setEvent(AlarmStateTransitionedEvent event) {
|
||||
this.event = event;
|
||||
public class AlarmStateTransitionPipeline extends ManagedPipeline<AlarmStateTransitionedEvent> {
|
||||
@Inject
|
||||
public AlarmStateTransitionPipeline(@Assisted AlarmStateTransitionedEventHandler handler) {
|
||||
super(handler);
|
||||
}
|
||||
}
|
@ -15,16 +15,10 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.configuration;
|
||||
package com.hpcloud.mon.persister.pipeline;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler;
|
||||
|
||||
public class MetricConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
String topic;
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
public interface AlarmStateTransitionPipelineFactory {
|
||||
AlarmStateTransitionPipeline create(AlarmStateTransitionedEventHandler handler);
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.pipeline;
|
||||
|
||||
import com.hpcloud.mon.persister.pipeline.event.FlushableHandler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ManagedPipeline<T> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ManagedPipeline.class);
|
||||
|
||||
private final FlushableHandler<T> eventHandler;
|
||||
|
||||
public ManagedPipeline(FlushableHandler<T> eventHandler) {
|
||||
this.eventHandler = eventHandler;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
eventHandler.flush();
|
||||
}
|
||||
|
||||
public boolean publishEvent(T holder) {
|
||||
try {
|
||||
return this.eventHandler.onEvent(holder);
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to handle event", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@ -15,19 +15,18 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
package com.hpcloud.mon.persister.pipeline;
|
||||
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
|
||||
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
|
||||
|
||||
public class MetricHolder {
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
MetricEnvelope metricEnvelope;
|
||||
public class MetricPipeline extends ManagedPipeline<MetricEnvelope[]> {
|
||||
|
||||
public MetricEnvelope getMetricEnvelope() {
|
||||
return metricEnvelope;
|
||||
}
|
||||
|
||||
public void setEnvelope(MetricEnvelope metricEnvelope) {
|
||||
this.metricEnvelope = metricEnvelope;
|
||||
@Inject
|
||||
public MetricPipeline(@Assisted MetricHandler metricHandler) {
|
||||
super(metricHandler);
|
||||
}
|
||||
}
|
@ -15,8 +15,10 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
package com.hpcloud.mon.persister.pipeline;
|
||||
|
||||
public interface FlushableHandler {
|
||||
public void flush();
|
||||
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
|
||||
|
||||
public interface MetricPipelineFactory {
|
||||
MetricPipeline create(MetricHandler metricHandler);
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.pipeline.event;
|
||||
|
||||
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
|
||||
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
|
||||
import com.hpcloud.mon.persister.repository.AlarmRepository;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AlarmStateTransitionedEventHandler extends
|
||||
FlushableHandler<AlarmStateTransitionedEvent> {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(AlarmStateTransitionedEventHandler.class);
|
||||
|
||||
private final AlarmRepository repository;
|
||||
private final int ordinal;
|
||||
|
||||
@Inject
|
||||
public AlarmStateTransitionedEventHandler(AlarmRepository repository,
|
||||
@Assisted PipelineConfiguration configuration, Environment environment,
|
||||
@Assisted("ordinal") int ordinal,
|
||||
@Assisted("batchSize") int batchSize) {
|
||||
super(configuration, environment, ordinal, batchSize,
|
||||
AlarmStateTransitionedEventHandler.class.getName());
|
||||
this.repository = repository;
|
||||
this.ordinal = ordinal;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int process(AlarmStateTransitionedEvent event) throws Exception {
|
||||
logger.debug("Ordinal: Event: {}", this.ordinal, event);
|
||||
|
||||
repository.addToBatch(event);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void flushRepository() {
|
||||
repository.flush();
|
||||
}
|
||||
}
|
@ -15,11 +15,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
package com.hpcloud.mon.persister.pipeline.event;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
public interface AlarmStateTransitionedEventHandlerFactory {
|
||||
AlarmStateTransitionedEventHandler create(@Assisted("ordinal") int ordinal,
|
||||
@Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize);
|
||||
AlarmStateTransitionedEventHandler create(PipelineConfiguration configuration,
|
||||
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
|
||||
}
|
@ -0,0 +1,116 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.pipeline.event;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.Timer;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class FlushableHandler<T> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);
|
||||
private final int ordinal;
|
||||
private final int batchSize;
|
||||
private final String handlerName;
|
||||
|
||||
private long millisSinceLastFlush = System.currentTimeMillis();
|
||||
private final long millisBetweenFlushes;
|
||||
private final int secondsBetweenFlushes;
|
||||
private int eventCount = 0;
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
private final Meter processedMeter;
|
||||
private final Meter commitMeter;
|
||||
private final Timer commitTimer;
|
||||
|
||||
protected FlushableHandler(PipelineConfiguration configuration, Environment environment,
|
||||
int ordinal, int batchSize, String baseName) {
|
||||
|
||||
this.handlerName = String.format("%s[%d]", baseName, ordinal);
|
||||
this.environment = environment;
|
||||
this.processedMeter =
|
||||
this.environment.metrics()
|
||||
.meter(handlerName + "." + "events-processed-processedMeter");
|
||||
this.commitMeter =
|
||||
this.environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter");
|
||||
this.commitTimer =
|
||||
this.environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer");
|
||||
|
||||
this.secondsBetweenFlushes = configuration.getMaxBatchTime();
|
||||
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
|
||||
|
||||
this.ordinal = ordinal;
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
protected abstract void flushRepository();
|
||||
|
||||
protected abstract int process(T metricEvent) throws Exception;
|
||||
|
||||
public boolean onEvent(final T event) throws Exception {
|
||||
|
||||
if (event == null) {
|
||||
long delta = millisSinceLastFlush + millisBetweenFlushes;
|
||||
logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName,
|
||||
this.secondsBetweenFlushes);
|
||||
if (delta < System.currentTimeMillis()) {
|
||||
logger.debug("{}: {} seconds since last flush. Flushing to repository now.",
|
||||
this.handlerName, delta);
|
||||
flush();
|
||||
return true;
|
||||
} else {
|
||||
logger.debug("{}: {} seconds since last flush. No need to flush at this time.",
|
||||
this.handlerName, delta);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
processedMeter.mark();
|
||||
|
||||
logger.debug("Ordinal: Event: {}", ordinal, event);
|
||||
|
||||
eventCount += process(event);
|
||||
|
||||
if (eventCount >= batchSize) {
|
||||
flush();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
if (eventCount == 0) {
|
||||
logger.debug("{}: Nothing to flush", this.handlerName);
|
||||
}
|
||||
Timer.Context context = commitTimer.time();
|
||||
flushRepository();
|
||||
context.stop();
|
||||
commitMeter.mark();
|
||||
millisSinceLastFlush = System.currentTimeMillis();
|
||||
logger.debug("{}: Flushed {} events", this.handlerName, this.eventCount);
|
||||
eventCount = 0;
|
||||
}
|
||||
}
|
@ -15,21 +15,19 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
package com.hpcloud.mon.persister.pipeline.event;
|
||||
|
||||
import static com.hpcloud.mon.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH;
|
||||
|
||||
import com.hpcloud.mon.common.model.metric.Metric;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
|
||||
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
|
||||
import com.hpcloud.mon.persister.repository.MetricRepository;
|
||||
import com.hpcloud.mon.persister.repository.Sha1HashId;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
@ -43,106 +41,65 @@ import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class MetricHandler implements EventHandler<MetricHolder>, FlushableHandler {
|
||||
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
|
||||
private static final String TENANT_ID = "tenantId";
|
||||
private static final String REGION = "region";
|
||||
|
||||
private final int ordinal;
|
||||
private final int numProcessors;
|
||||
private final int batchSize;
|
||||
|
||||
private final SimpleDateFormat simpleDateFormat;
|
||||
|
||||
private long millisSinceLastFlush = System.currentTimeMillis();
|
||||
private final long millisBetweenFlushes;
|
||||
private final int secondsBetweenFlushes;
|
||||
|
||||
private final MetricRepository verticaMetricRepository;
|
||||
private final Environment environment;
|
||||
|
||||
private final Counter metricCounter;
|
||||
private final Counter definitionCounter;
|
||||
private final Counter dimensionCounter;
|
||||
private final Counter definitionDimensionsCounter;
|
||||
private final Meter metricMeter;
|
||||
private final Meter commitMeter;
|
||||
private final Timer commitTimer;
|
||||
|
||||
@Inject
|
||||
public MetricHandler(MetricRepository metricRepository, MonPersisterConfiguration configuration,
|
||||
public MetricHandler(MetricRepository metricRepository, @Assisted PipelineConfiguration configuration,
|
||||
Environment environment, @Assisted("ordinal") int ordinal,
|
||||
@Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize) {
|
||||
|
||||
@Assisted("batchSize") int batchSize) {
|
||||
super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
|
||||
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
|
||||
this.verticaMetricRepository = metricRepository;
|
||||
this.environment = environment;
|
||||
this.metricCounter =
|
||||
this.environment.metrics().counter(
|
||||
this.getClass().getName() + "." + "metrics-added-to-batch-counter");
|
||||
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
|
||||
this.definitionCounter =
|
||||
this.environment.metrics().counter(
|
||||
this.getClass().getName() + "." + "metric-definitions-added-to-batch-counter");
|
||||
environment.metrics().counter(
|
||||
handlerName + "." + "metric-definitions-added-to-batch-counter");
|
||||
this.dimensionCounter =
|
||||
this.environment.metrics().counter(
|
||||
this.getClass().getName() + "." + "metric-dimensions-added-to-batch-counter");
|
||||
environment.metrics().counter(
|
||||
handlerName + "." + "metric-dimensions-added-to-batch-counter");
|
||||
this.definitionDimensionsCounter =
|
||||
this.environment.metrics()
|
||||
.counter(
|
||||
this.getClass().getName() + "."
|
||||
+ "metric-definition-dimensions-added-to-batch-counter");
|
||||
this.metricMeter =
|
||||
this.environment.metrics().meter(
|
||||
this.getClass().getName() + "." + "metrics-messages-processed-meter");
|
||||
this.commitMeter =
|
||||
this.environment.metrics()
|
||||
.meter(this.getClass().getName() + "." + "commits-executed-meter");
|
||||
this.commitTimer =
|
||||
this.environment.metrics().timer(
|
||||
this.getClass().getName() + "." + "total-commit-and-flush-timer");
|
||||
|
||||
this.secondsBetweenFlushes =
|
||||
configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
|
||||
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
|
||||
environment.metrics().counter(
|
||||
handlerName + "." + "metric-definition-dimensions-added-to-batch-counter");
|
||||
|
||||
this.ordinal = ordinal;
|
||||
this.numProcessors = numProcessors;
|
||||
this.batchSize = batchSize;
|
||||
|
||||
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(MetricHolder metricEvent, long sequence, boolean b) throws Exception {
|
||||
|
||||
if (metricEvent.getMetricEnvelope() == null) {
|
||||
logger.debug("Received heartbeat message. Checking last flush time.");
|
||||
if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) {
|
||||
logger.debug("It's been more than " + secondsBetweenFlushes
|
||||
+ " seconds since last flush. Flushing staging tables now...");
|
||||
flush();
|
||||
} else {
|
||||
logger.debug("It has not been more than " + secondsBetweenFlushes
|
||||
+ " seconds since last flush. No need to perform flush at this time.");
|
||||
}
|
||||
return;
|
||||
public int process(MetricEnvelope[] metricEnvelopes) throws Exception {
|
||||
int metricCount = 0;
|
||||
for (final MetricEnvelope metricEnvelope : metricEnvelopes) {
|
||||
metricCount += processEnvelope(metricEnvelope);
|
||||
}
|
||||
return metricCount;
|
||||
}
|
||||
|
||||
if (((sequence / batchSize) % this.numProcessors) != this.ordinal) {
|
||||
return;
|
||||
}
|
||||
private int processEnvelope(MetricEnvelope metricEnvelope) {
|
||||
int metricCount = 0;
|
||||
Metric metric = metricEnvelope.metric;
|
||||
Map<String, Object> meta = metricEnvelope.meta;
|
||||
|
||||
metricMeter.mark();
|
||||
|
||||
Metric metric = metricEvent.getMetricEnvelope().metric;
|
||||
Map<String, Object> meta = metricEvent.getMetricEnvelope().meta;
|
||||
|
||||
logger.debug("sequence number: " + sequence);
|
||||
logger.debug("ordinal: " + ordinal);
|
||||
logger.debug("metric: " + metric.toString());
|
||||
logger.debug("meta: " + meta.toString());
|
||||
logger.debug("ordinal: {}", ordinal);
|
||||
logger.debug("metric: {}", metric);
|
||||
logger.debug("meta: {}", meta);
|
||||
|
||||
String tenantId = "";
|
||||
if (meta.containsKey(TENANT_ID)) {
|
||||
@ -225,27 +182,21 @@ public class MetricHandler implements EventHandler<MetricHolder>, FlushableHandl
|
||||
double value = timeValuePairs[1];
|
||||
verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
|
||||
metricCounter.inc();
|
||||
metricCount++;
|
||||
}
|
||||
} else {
|
||||
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
|
||||
double value = metric.getValue();
|
||||
verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
|
||||
metricCounter.inc();
|
||||
metricCount++;
|
||||
}
|
||||
|
||||
if (sequence % batchSize == (batchSize - 1)) {
|
||||
Timer.Context context = commitTimer.time();
|
||||
flush();
|
||||
context.stop();
|
||||
commitMeter.mark();
|
||||
}
|
||||
|
||||
return metricCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
public void flushRepository() {
|
||||
verticaMetricRepository.flush();
|
||||
millisSinceLastFlush = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
private String trunc(String s, int l) {
|
||||
@ -261,6 +212,5 @@ public class MetricHandler implements EventHandler<MetricHolder>, FlushableHandl
|
||||
logger.warn("Resulting string {}", r);
|
||||
return r;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -15,11 +15,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.disruptor.event;
|
||||
package com.hpcloud.mon.persister.pipeline.event;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
public interface MetricHandlerFactory {
|
||||
MetricHandler create(@Assisted("ordinal") int ordinal,
|
||||
@Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize);
|
||||
MetricHandler create(PipelineConfiguration pipelineConfiguration,
|
||||
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
|
||||
}
|
@ -95,7 +95,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
|
||||
Timer.Context context = flushTimer.time();
|
||||
|
||||
Serie serie = new Serie(ALARM_STATE_HISTORY_NAME);
|
||||
logger.debug("Created serie: " + serie.getName());
|
||||
logger.debug("Created serie: {}", serie.getName());
|
||||
|
||||
serie.setColumns(this.colNamesStringArry);
|
||||
|
||||
@ -131,7 +131,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
|
||||
|
||||
context.stop();
|
||||
long endTime = System.currentTimeMillis();
|
||||
logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds");
|
||||
logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000);
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to write alarm state history to database", e);
|
||||
@ -154,7 +154,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
|
||||
}
|
||||
sb.append(colVal);
|
||||
}
|
||||
logger.debug("Array of column values[{}]: [" + sb.toString() + "]", outerIdx);
|
||||
logger.debug("Array of column values[{}]: [{}]", outerIdx, sb);
|
||||
outerIdx++;
|
||||
}
|
||||
}
|
||||
@ -171,6 +171,6 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
|
||||
}
|
||||
sb.append(colName);
|
||||
}
|
||||
logger.debug("Array of column names: [" + sb.toString() + "]");
|
||||
logger.debug("Array of column names: [{}]", sb);
|
||||
}
|
||||
}
|
||||
|
@ -17,11 +17,13 @@
|
||||
|
||||
package com.hpcloud.mon.persister.repository;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.influxdb.InfluxDB;
|
||||
@ -41,8 +43,6 @@ import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.setup.Environment;
|
||||
|
||||
public class InfluxDBMetricRepository implements MetricRepository {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(InfluxDBMetricRepository.class);
|
||||
@ -59,7 +59,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
private final com.codahale.metrics.Timer flushTimer;
|
||||
public final Meter measurementMeter;
|
||||
|
||||
private static final SimpleDateFormat measurementTimeStampSimpleDateFormat = new
|
||||
private final SimpleDateFormat measurementTimeStampSimpleDateFormat = new
|
||||
SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
|
||||
private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha(""));
|
||||
|
||||
@ -123,8 +123,8 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
TimeUnit.SECONDS);
|
||||
long endTime = System.currentTimeMillis();
|
||||
context.stop();
|
||||
logger.debug("Writing measurements, definitions, and dimensions to database took " +
|
||||
(endTime - startTime) / 1000 + " seconds");
|
||||
logger.debug("Writing measurements, definitions, and dimensions to database took {} seconds",
|
||||
(endTime - startTime) / 1000);
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to write measurements to database", e);
|
||||
}
|
||||
@ -148,7 +148,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
for (Set<String> dimNameSet : dimNameSetMap.keySet()) {
|
||||
|
||||
Serie serie = new Serie(definition.name);
|
||||
logger.debug("Created serie: " + serie.getName());
|
||||
logger.debug("Created serie: {}", serie.getName());
|
||||
|
||||
// Add 4 for the tenant id, region, timestamp, and value.
|
||||
String[] colNameStringArry = new String[dimNameSet.size() + 4];
|
||||
@ -158,7 +158,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
colNameStringArry[1] = "region";
|
||||
int j = 2;
|
||||
for (String dimName : dimNameSet) {
|
||||
logger.debug("Adding column name[{}]: " + dimName, j);
|
||||
logger.debug("Adding column name[{}]: {}", j, dimName);
|
||||
colNameStringArry[j++] = dimName;
|
||||
}
|
||||
logger.debug("Adding column name[{}]: time", j);
|
||||
@ -181,9 +181,9 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
Object[][] colValsObjectArry = new Object[pointList.size()][dimNameSet.size() + 4];
|
||||
int k = 0;
|
||||
for (Point point : pointList) {
|
||||
logger.debug("Adding column value[{}][0]: " + definition.tenantId, k, 0);
|
||||
logger.debug("Adding column value[{}][0]: {}", k, definition.tenantId);
|
||||
colValsObjectArry[k][0] = definition.tenantId;
|
||||
logger.debug("Adding column value[{}][1]: " + definition.region, k, 1);
|
||||
logger.debug("Adding column value[{}][1]: {}", k, definition.region);
|
||||
colValsObjectArry[k][1] = definition.region;
|
||||
int l = 2;
|
||||
for (String dimName : dimNameSet) {
|
||||
@ -196,9 +196,9 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
}
|
||||
Date d = measurementTimeStampSimpleDateFormat.parse(point.measurement.timeStamp + " UTC");
|
||||
Long time = d.getTime() / 1000;
|
||||
logger.debug("Adding column value[{}][{}]: " + time, k, l);
|
||||
logger.debug("Adding column value[{}][{}]: {}", k, l, time);
|
||||
colValsObjectArry[k][l++] = time;
|
||||
logger.debug("Adding column value[{}][{}]: " + point.measurement.value, k, l);
|
||||
logger.debug("Adding column value[{}][{}]: {}", k, l, point.measurement.value);
|
||||
colValsObjectArry[k][l++] = point.measurement.value;
|
||||
measurementMeter.mark();
|
||||
k++;
|
||||
@ -232,7 +232,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
}
|
||||
sb.append(colVal);
|
||||
}
|
||||
logger.debug("Array of column values[{}]: [" + sb.toString() + "]", outerIdx);
|
||||
logger.debug("Array of column values[{}]: [{}]", outerIdx, sb);
|
||||
outerIdx++;
|
||||
}
|
||||
}
|
||||
@ -249,7 +249,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
|
||||
}
|
||||
sb.append(colName);
|
||||
}
|
||||
logger.debug("Array of column names: [" + sb.toString() + "]");
|
||||
logger.debug("Array of column names: [{}]", sb);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,115 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.hpcloud.mon.persister.repository;
|
||||
|
||||
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
|
||||
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
|
||||
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.lmax.disruptor.EventTranslator;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class RepositoryCommitHeartbeat implements Managed {
|
||||
|
||||
private final HeartbeatRunnable deduperRunnable;
|
||||
|
||||
@Inject
|
||||
public RepositoryCommitHeartbeat(MetricDisruptor metricDisruptor,
|
||||
AlarmStateHistoryDisruptor alarmHistoryDisruptor) {
|
||||
this.deduperRunnable = new HeartbeatRunnable(metricDisruptor, alarmHistoryDisruptor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
|
||||
Thread heartbeatThread = new Thread(deduperRunnable);
|
||||
heartbeatThread.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
this.deduperRunnable.stop();
|
||||
}
|
||||
|
||||
private static class HeartbeatRunnable implements Runnable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(HeartbeatRunnable.class);
|
||||
private final Disruptor<MetricHolder> metricDisruptor;
|
||||
private final Disruptor<AlarmStateTransitionedEventHolder> alarmHistoryDisruptor;
|
||||
|
||||
private boolean stop = false;
|
||||
|
||||
private HeartbeatRunnable(MetricDisruptor metricDisruptor,
|
||||
AlarmStateHistoryDisruptor alarmHistoryDisruptor) {
|
||||
this.metricDisruptor = metricDisruptor;
|
||||
this.alarmHistoryDisruptor = alarmHistoryDisruptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (;;) {
|
||||
try {
|
||||
// Send a heartbeat every second.
|
||||
synchronized (this) {
|
||||
this.wait(1000);
|
||||
if (stop) {
|
||||
logger.debug("Heartbeat thread is exiting");
|
||||
break;
|
||||
}
|
||||
}
|
||||
logger.debug("Waking up after sleeping 1 seconds, yawn...");
|
||||
|
||||
// Send heartbeat
|
||||
logger.debug("Sending heartbeat message");
|
||||
metricDisruptor.publishEvent(new EventTranslator<MetricHolder>() {
|
||||
|
||||
@Override
|
||||
public void translateTo(MetricHolder event, long sequence) {
|
||||
event.setEnvelope(null);
|
||||
}
|
||||
});
|
||||
alarmHistoryDisruptor
|
||||
.publishEvent(new EventTranslator<AlarmStateTransitionedEventHolder>() {
|
||||
|
||||
@Override
|
||||
public void translateTo(AlarmStateTransitionedEventHolder event, long sequence) {
|
||||
event.setEvent(null);
|
||||
}
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to send heartbeat", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public synchronized void stop() {
|
||||
stop = true;
|
||||
this.notify();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,24 +1,33 @@
|
||||
name: mon-persister
|
||||
|
||||
alarmHistoryConfiguration:
|
||||
batchSize: 100
|
||||
numThreads: 1
|
||||
maxBatchTime: 15
|
||||
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
|
||||
topic: alarm-state-transitions
|
||||
groupId: persister_alarms
|
||||
consumerId: 1
|
||||
clientId: 1
|
||||
|
||||
metricConfiguration:
|
||||
batchSize: 1000
|
||||
numThreads: 2
|
||||
maxBatchTime: 30
|
||||
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
|
||||
topic: metrics
|
||||
groupId: persister_metrics
|
||||
consumerId: 1
|
||||
clientId: 1
|
||||
|
||||
#Kafka settings.
|
||||
kafkaConfiguration:
|
||||
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
|
||||
numThreads: 1
|
||||
groupId: 1
|
||||
#zookeeperConnect: localhost:2181
|
||||
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
|
||||
zookeeperConnect: 192.168.10.4:2181
|
||||
consumerId: 1
|
||||
socketTimeoutMs: 30000
|
||||
socketReceiveBufferBytes : 65536
|
||||
fetchMessageMaxBytes: 1048576
|
||||
autoCommitEnable: true
|
||||
autoCommitIntervalMs: 60000
|
||||
queuedMaxMessageChunks: 10
|
||||
rebalanceMaxRetries: 4
|
||||
fetchMinBytes: 1
|
||||
@ -26,23 +35,11 @@ kafkaConfiguration:
|
||||
rebalanceBackoffMs: 2000
|
||||
refreshLeaderBackoffMs: 200
|
||||
autoOffsetReset: largest
|
||||
consumerTimeoutMs: -1
|
||||
clientId: 1
|
||||
consumerTimeoutMs: 1000
|
||||
zookeeperSessionTimeoutMs : 60000
|
||||
zookeeperConnectionTimeoutMs : 6000
|
||||
zookeeperSyncTimeMs: 2000
|
||||
|
||||
|
||||
disruptorConfiguration:
|
||||
bufferSize: 1048576
|
||||
numProcessors: 1
|
||||
|
||||
outputProcessorConfiguration:
|
||||
batchSize: 100
|
||||
|
||||
monDeDuperConfiguration:
|
||||
dedupeRunFrequencySeconds: 30
|
||||
|
||||
verticaMetricRepositoryConfiguration:
|
||||
maxCacheSize: 2000000
|
||||
|
||||
@ -54,7 +51,7 @@ databaseConfiguration:
|
||||
influxDbConfiguration:
|
||||
name: mon
|
||||
replicationFactor: 1
|
||||
url: http://127.0.0.1:8086
|
||||
url: http://192.168.10.4:8086
|
||||
user: root
|
||||
password: root
|
||||
|
||||
|
@ -19,12 +19,13 @@ package com.hpcloud.mon.persister;
|
||||
|
||||
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer;
|
||||
import com.hpcloud.mon.persister.consumer.MetricsConsumer;
|
||||
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
|
||||
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
public class MonPersisterConsumerTest {
|
||||
@ -35,33 +36,22 @@ public class MonPersisterConsumerTest {
|
||||
@Mock
|
||||
private MetricsConsumer monConsumer;
|
||||
|
||||
private MetricHandler metricHandler;
|
||||
|
||||
private MetricPipeline metricPipeline;
|
||||
|
||||
@Before
|
||||
public void initMocks() {
|
||||
metricHandler = Mockito.mock(MetricHandler.class);
|
||||
metricPipeline = Mockito.spy(new MetricPipeline(metricHandler));
|
||||
MockitoAnnotations.initMocks(this);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKafkaConsumerStart() {
|
||||
try {
|
||||
monConsumer.start();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKafkaConsumerStop() {
|
||||
try {
|
||||
monConsumer.stop();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace(); // To change body of catch statement use File | Settings | File
|
||||
// Templates.
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
System.out.println("after");
|
||||
public void testKafkaConsumerLifecycle() throws Exception {
|
||||
monConsumer.start();
|
||||
monConsumer.stop();
|
||||
metricPipeline.shutdown();
|
||||
Mockito.verify(metricHandler).flush();
|
||||
}
|
||||
}
|
||||
|
@ -1,74 +0,0 @@
|
||||
package com.hpcloud.mon.persister;
|
||||
|
||||
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.mon.persister.consumer.KafkaConsumer;
|
||||
import com.hpcloud.util.config.ConfigurationException;
|
||||
import com.hpcloud.util.config.ConfigurationFactory;
|
||||
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
import kafka.consumer.KafkaStream;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
public class Test extends KafkaConsumer {
|
||||
private static final Logger logger = LoggerFactory.getLogger(Test.class);
|
||||
|
||||
private static final String TOPIC = "Test";
|
||||
|
||||
public Test(MonPersisterConfiguration configuration) {
|
||||
super(configuration);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException, ConfigurationException {
|
||||
final MonPersisterConfiguration config = createConfig(args[0]);
|
||||
config.getKafkaConfiguration();
|
||||
final Test test = new Test(config);
|
||||
test.run();
|
||||
}
|
||||
|
||||
private static MonPersisterConfiguration createConfig(String configFileName) throws IOException,
|
||||
ConfigurationException {
|
||||
return ConfigurationFactory
|
||||
.<MonPersisterConfiguration>forClass(MonPersisterConfiguration.class).build(
|
||||
new File(configFileName));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber) {
|
||||
logger.info("Created KafkaReader for {}", threadNumber);
|
||||
return new KafkaReader(stream, threadNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getStreamName() {
|
||||
return TOPIC;
|
||||
}
|
||||
|
||||
protected class KafkaReader implements Runnable {
|
||||
|
||||
private final KafkaStream<byte[], byte[]> stream;
|
||||
private final int threadNumber;
|
||||
|
||||
public KafkaReader(KafkaStream<byte[], byte[]> stream, int threadNumber) {
|
||||
this.threadNumber = threadNumber;
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
ConsumerIterator<byte[], byte[]> it = stream.iterator();
|
||||
while (it.hasNext()) {
|
||||
|
||||
final String s = new String(it.next().message());
|
||||
|
||||
logger.debug("Thread {}: {}", threadNumber, s);
|
||||
}
|
||||
logger.debug("Shutting down Thread: " + threadNumber);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user