Add Cassandra DB support

Added Cassandra DB plugins to the persister, including the Java and
Python versions.

Change-Id: I4bbe48f8fe550385de6ed3e14120850a8542c7c9
story: 2001231
task: 5758
Authored by James Gu on 2017-08-31 15:17:11 -07:00; committed by Witold Bedyk
parent e73c693c51
commit 20337572e3
28 changed files with 2271 additions and 286 deletions

pom.xml

@ -49,6 +49,26 @@
<artifactId>monasca-common-influxdb</artifactId> <artifactId>monasca-common-influxdb</artifactId>
<version>${mon.common.version}</version> <version>${mon.common.version}</version>
</dependency> </dependency>
<dependency>
<groupId>monasca-common</groupId>
<artifactId>monasca-common-cassandra</artifactId>
<version>${mon.common.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.1.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId> <artifactId>kafka_2.11</artifactId>
@ -72,6 +92,12 @@
<groupId>io.dropwizard</groupId> <groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId> <artifactId>dropwizard-core</artifactId>
<version>0.7.0</version> <version>0.7.0</version>
<exclusions>
<exclusion>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.dropwizard</groupId> <groupId>io.dropwizard</groupId>
@ -88,6 +114,11 @@
<artifactId>guice-assistedinject</artifactId> <artifactId>guice-assistedinject</artifactId>
<version>3.0</version> <version>3.0</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>17.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>

PersisterModule.java

@ -1,6 +1,8 @@
/* /*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P. * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* *
* Copyright (c) 2017 SUSE LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -46,6 +48,9 @@ import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory;
import monasca.persister.pipeline.event.MetricHandler; import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.pipeline.event.MetricHandlerFactory; import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.repository.Repo; import monasca.persister.repository.Repo;
import monasca.persister.repository.cassandra.CassandraAlarmRepo;
import monasca.persister.repository.cassandra.CassandraCluster;
import monasca.persister.repository.cassandra.CassandraMetricRepo;
import monasca.persister.repository.influxdb.InfluxV9AlarmRepo; import monasca.persister.repository.influxdb.InfluxV9AlarmRepo;
import monasca.persister.repository.influxdb.InfluxV9MetricRepo; import monasca.persister.repository.influxdb.InfluxV9MetricRepo;
import monasca.persister.repository.influxdb.InfluxV9RepoWriter; import monasca.persister.repository.influxdb.InfluxV9RepoWriter;
@ -56,7 +61,7 @@ public class PersisterModule extends AbstractModule {
private static final String VERTICA = "vertica"; private static final String VERTICA = "vertica";
private static final String INFLUXDB = "influxdb"; private static final String INFLUXDB = "influxdb";
private static final String CASSANDRA = "cassandra";
private static final String INFLUXDB_V9 = "v9"; private static final String INFLUXDB_V9 = "v9";
private final PersisterConfig config; private final PersisterConfig config;
@ -168,6 +173,13 @@ public class PersisterModule extends AbstractModule {
bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>> () {}) bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>> () {})
.to(InfluxV9AlarmRepo.class); .to(InfluxV9AlarmRepo.class);
} else if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(CASSANDRA)) {
bind(CassandraCluster.class).in(Singleton.class);
bind(new TypeLiteral<Repo<MetricEnvelope>>() {}).to(CassandraMetricRepo.class);
bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>>() {}).to(CassandraAlarmRepo.class);
} else { } else {
System.err.println( System.err.println(

PersisterConfig.java

@ -1,6 +1,8 @@
/* /*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P. * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* *
* Copyright (c) 2017 SUSE LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -19,6 +21,7 @@ package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import monasca.common.configuration.CassandraDbConfiguration;
import monasca.common.configuration.DatabaseConfiguration; import monasca.common.configuration.DatabaseConfiguration;
import monasca.common.configuration.InfluxDbConfiguration; import monasca.common.configuration.InfluxDbConfiguration;
import io.dropwizard.Configuration; import io.dropwizard.Configuration;
@ -97,4 +100,12 @@ public class PersisterConfig extends Configuration {
public InfluxDbConfiguration getInfluxDBConfiguration() { public InfluxDbConfiguration getInfluxDBConfiguration() {
return influxDbConfiguration; return influxDbConfiguration;
} }
@Valid
@JsonProperty
private final CassandraDbConfiguration cassandraDbConfiguration = new CassandraDbConfiguration();
public CassandraDbConfiguration getCassandraDbConfiguration() {
return cassandraDbConfiguration;
}
} }
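
A minimal sketch (illustration only, not part of the diff; config is the injected PersisterConfig) of how the Cassandra repositories added in this commit read these settings; every accessor used below appears in the Cassandra classes later in the diff:

CassandraDbConfiguration dbConfig = config.getCassandraDbConfiguration();
String[] contactPoints = dbConfig.getContactPoints();       // Cassandra hosts to connect to
int writeRetries = dbConfig.getMaxWriteRetries();           // retry limit for the driver retry policy and flushes
int ttlSeconds = dbConfig.getRetentionPolicy() * 24 * 3600; // retention in days, bound as the TTL of each insert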

PipelineConfig.java

@ -42,6 +42,17 @@ public class PipelineConfig {
@JsonProperty @JsonProperty
Integer maxBatchTime; Integer maxBatchTime;
@JsonProperty
Integer commitBatchTime;
public Integer getCommitBatchTime() {
return commitBatchTime;
}
public void setCommitBatchTime(Integer commitBatchTime) {
this.commitBatchTime = commitBatchTime;
}
public String getTopic() { public String getTopic() {
return topic; return topic;
} }

KafkaChannel.java

@ -1,6 +1,8 @@
/* /*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P. * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* *
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -44,33 +46,50 @@ public class KafkaChannel {
private final String topic; private final String topic;
private final ConsumerConnector consumerConnector; private final ConsumerConnector consumerConnector;
private final String threadId; private final String threadId;
private final int commitBatchtimeInMills;
private long nextCommitTime;
private boolean commitDirty = false;
@Inject @Inject
public KafkaChannel( public KafkaChannel(PersisterConfig configuration, @Assisted PipelineConfig pipelineConfig,
PersisterConfig configuration,
@Assisted PipelineConfig pipelineConfig,
@Assisted String threadId) { @Assisted String threadId) {
this.topic = pipelineConfig.getTopic(); this.topic = pipelineConfig.getTopic();
this.threadId = threadId; this.threadId = threadId;
Properties kafkaProperties = this.commitBatchtimeInMills = pipelineConfig.getCommitBatchTime();
createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig); nextCommitTime = System.currentTimeMillis() + commitBatchtimeInMills;
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig);
consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties)); consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties));
} }
public final void markRead() { public final void markRead() {
this.consumerConnector.commitOffsets(); if (commitBatchtimeInMills <= 0) {
consumerConnector.commitOffsets();
} else if (nextCommitTime <= System.currentTimeMillis()) {
consumerConnector.commitOffsets();
nextCommitTime = System.currentTimeMillis() + commitBatchtimeInMills;
commitDirty = false;
} else {
commitDirty = true;
}
}
public final void markReadIfDirty() {
if (commitDirty) {
this.consumerConnector.commitOffsets();
commitDirty = false;
}
} }
public KafkaStream<byte[], byte[]> getKafkaStream() { public KafkaStream<byte[], byte[]> getKafkaStream() {
final Map<String, Integer> topicCountMap = new HashMap<>(); final Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(this.topic, 1); topicCountMap.put(this.topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = this.consumerConnector
this.consumerConnector.createMessageStreams(topicCountMap); .createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = streamMap.values().iterator().next(); List<KafkaStream<byte[], byte[]>> streams = streamMap.values().iterator().next();
if (streams.size() != 1) { if (streams.size() != 1) {
throw new IllegalStateException(String.format( throw new IllegalStateException(
"Expected only one stream but instead there are %d", streams.size())); String.format("Expected only one stream but instead there are %d", streams.size()));
} }
return streams.get(0); return streams.get(0);
} }
@ -92,32 +111,28 @@ public class KafkaChannel {
properties.put("consumer.id", properties.put("consumer.id",
String.format("%s_%s", pipelineConfig.getConsumerId(), this.threadId)); String.format("%s_%s", pipelineConfig.getConsumerId(), this.threadId));
properties.put("socket.timeout.ms", kafkaConfig.getSocketTimeoutMs().toString()); properties.put("socket.timeout.ms", kafkaConfig.getSocketTimeoutMs().toString());
properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes() properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes().toString());
.toString()); properties.put("fetch.message.max.bytes", kafkaConfig.getFetchMessageMaxBytes().toString());
properties.put("fetch.message.max.bytes", kafkaConfig.getFetchMessageMaxBytes()
.toString());
// Set auto commit to false because the persister is going to explicitly commit // Set auto commit to false because the persister is going to explicitly commit
properties.put("auto.commit.enable", "false"); properties.put("auto.commit.enable", "false");
properties.put("queued.max.message.chunks", kafkaConfig.getQueuedMaxMessageChunks() properties.put("queued.max.message.chunks", kafkaConfig.getQueuedMaxMessageChunks().toString());
.toString());
properties.put("rebalance.max.retries", kafkaConfig.getRebalanceMaxRetries().toString()); properties.put("rebalance.max.retries", kafkaConfig.getRebalanceMaxRetries().toString());
properties.put("fetch.min.bytes", kafkaConfig.getFetchMinBytes().toString()); properties.put("fetch.min.bytes", kafkaConfig.getFetchMinBytes().toString());
properties.put("fetch.wait.max.ms", kafkaConfig.getFetchWaitMaxMs().toString()); properties.put("fetch.wait.max.ms", kafkaConfig.getFetchWaitMaxMs().toString());
properties.put("rebalance.backoff.ms", kafkaConfig.getRebalanceBackoffMs().toString()); properties.put("rebalance.backoff.ms", kafkaConfig.getRebalanceBackoffMs().toString());
properties.put("refresh.leader.backoff.ms", kafkaConfig.getRefreshLeaderBackoffMs() properties.put("refresh.leader.backoff.ms", kafkaConfig.getRefreshLeaderBackoffMs().toString());
.toString());
properties.put("auto.offset.reset", kafkaConfig.getAutoOffsetReset()); properties.put("auto.offset.reset", kafkaConfig.getAutoOffsetReset());
properties.put("consumer.timeout.ms", kafkaConfig.getConsumerTimeoutMs().toString()); properties.put("consumer.timeout.ms", kafkaConfig.getConsumerTimeoutMs().toString());
properties.put("client.id", String.format("%s_%s", pipelineConfig.getClientId(), threadId)); properties.put("client.id", String.format("%s_%s", pipelineConfig.getClientId(), threadId));
properties.put("zookeeper.session.timeout.ms", kafkaConfig properties.put("zookeeper.session.timeout.ms",
.getZookeeperSessionTimeoutMs().toString()); kafkaConfig.getZookeeperSessionTimeoutMs().toString());
properties.put("zookeeper.connection.timeout.ms", kafkaConfig properties.put("zookeeper.connection.timeout.ms",
.getZookeeperConnectionTimeoutMs().toString()); kafkaConfig.getZookeeperConnectionTimeoutMs().toString());
properties properties.put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
.put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
for (String key : properties.stringPropertyNames()) { for (String key : properties.stringPropertyNames()) {
logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key), threadId); logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key),
threadId);
} }
return properties; return properties;
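
The new commitBatchTime value from PipelineConfig changes offset handling from commit-on-every-markRead to time-batched commits, with markReadIfDirty() catching any progress that is still uncommitted when the consumer shuts down. A standalone sketch of the same rule (illustrative helper, not part of the commit):

final class OffsetCommitPolicy {
  private final long commitBatchTimeMillis;   // <= 0 restores the old commit-every-time behaviour
  private long nextCommitTime;
  private boolean dirty;

  OffsetCommitPolicy(long commitBatchTimeMillis) {
    this.commitBatchTimeMillis = commitBatchTimeMillis;
    this.nextCommitTime = System.currentTimeMillis() + commitBatchTimeMillis;
  }

  /** True when offsets should be committed for this markRead call. */
  boolean shouldCommitOnMarkRead() {
    if (commitBatchTimeMillis <= 0) {
      return true;
    }
    if (nextCommitTime <= System.currentTimeMillis()) {
      nextCommitTime = System.currentTimeMillis() + commitBatchTimeMillis;
      dirty = false;
      return true;
    }
    dirty = true;
    return false;
  }

  /** True when a final commit is still owed, e.g. at shutdown (markReadIfDirty). */
  boolean shouldCommitOnShutdown() {
    return dirty;
  }
}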

KafkaConsumerRunnableBasic.java

@ -1,6 +1,8 @@
/* /*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P. * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* *
* Copyright (c) 2017 SUSE LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -38,14 +40,13 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
private final String threadId; private final String threadId;
private final ManagedPipeline<T> pipeline; private final ManagedPipeline<T> pipeline;
private volatile boolean stop = false; private volatile boolean stop = false;
private boolean active = false;
private ExecutorService executorService; private ExecutorService executorService;
@Inject @Inject
public KafkaConsumerRunnableBasic( public KafkaConsumerRunnableBasic(@Assisted KafkaChannel kafkaChannel,
@Assisted KafkaChannel kafkaChannel, @Assisted ManagedPipeline<T> pipeline, @Assisted String threadId) {
@Assisted ManagedPipeline<T> pipeline,
@Assisted String threadId) {
this.kafkaChannel = kafkaChannel; this.kafkaChannel = kafkaChannel;
this.pipeline = pipeline; this.pipeline = pipeline;
@ -67,8 +68,9 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
} }
private void markRead() { private void markRead() {
if (logger.isDebugEnabled()) {
logger.debug("[{}]: marking read", this.threadId); logger.debug("[{}]: marking read", this.threadId);
}
this.kafkaChannel.markRead(); this.kafkaChannel.markRead();
@ -80,12 +82,30 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
this.stop = true; this.stop = true;
int count = 0;
while (active) {
if (count++ >= 20) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
logger.error("interrupted while waiting for the run loop to stop", e);
break;
}
}
if (!active) {
this.kafkaChannel.markReadIfDirty();
}
} }
public void run() { public void run() {
logger.info("[{}]: run", this.threadId); logger.info("[{}]: run", this.threadId);
active = true;
final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator(); final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator();
logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId); logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId);
@ -121,7 +141,9 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
final String msg = new String(it.next().message()); final String msg = new String(it.next().message());
logger.debug("[{}]: {}", this.threadId, msg); if (logger.isDebugEnabled()) {
logger.debug("[{}]: {}", this.threadId, msg);
}
publishEvent(msg); publishEvent(msg);
@ -149,22 +171,24 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
} catch (Throwable e) { } catch (Throwable e) {
logger.error( logger
"[{}]: caught fatal exception while publishing msg. Shutting entire persister down " .error("[{}]: caught fatal exception while publishing msg. Shutting entire persister down "
+ "now!", this.threadId, e); + "now!", this.threadId, e);
logger.error("[{}]: calling shutdown on executor service", this.threadId); logger.error("[{}]: calling shutdown on executor service", this.threadId);
this.executorService.shutdownNow(); this.executorService.shutdownNow();
logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId); logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId);
System.exit(1); System.exit(1);
} }
} }
logger.info("[{}]: calling stop on kafka channel", this.threadId); logger.info("[{}]: calling stop on kafka channel", this.threadId);
active = false;
this.kafkaChannel.stop(); this.kafkaChannel.stop();
logger.debug("[{}]: exiting main run loop", this.threadId); logger.debug("[{}]: exiting main run loop", this.threadId);
@ -183,9 +207,10 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
private boolean isInterrupted() { private boolean isInterrupted() {
if (Thread.currentThread().interrupted()) { if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId); logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
}
return true; return true;
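
The stop path now waits for the run loop (tracked by the new active flag) to exit before committing leftover offsets, polling for at most 20 * 100 ms = 2 seconds. A compact sketch of that pattern (hypothetical names, not the persister's API):

final class ShutdownWait {
  /** Poll the active flag for up to 2 s, then commit only if the run loop really exited. */
  static void awaitAndCommit(java.util.concurrent.atomic.AtomicBoolean active, Runnable commitIfDirty)
      throws InterruptedException {
    int count = 0;
    while (active.get() && count++ < 20) {
      Thread.sleep(100);     // same 100 ms poll interval as the persister's stop path
    }
    if (!active.get()) {
      commitIfDirty.run();   // corresponds to kafkaChannel.markReadIfDirty()
    }
  }
}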

FlushableHandler.java

@ -1,6 +1,8 @@
/* /*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P. * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* *
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -51,28 +53,18 @@ public abstract class FlushableHandler<T> {
protected final String handlerName; protected final String handlerName;
protected FlushableHandler( protected FlushableHandler(PipelineConfig configuration, Environment environment, String threadId,
PipelineConfig configuration,
Environment environment,
String threadId,
int batchSize) { int batchSize) {
this.threadId = threadId; this.threadId = threadId;
this.handlerName = this.handlerName = String.format("%s[%s]", this.getClass().getName(), threadId);
String.format(
"%s[%s]",
this.getClass().getName(),
threadId);
this.processedMeter = this.processedMeter = environment.metrics().meter(handlerName + "." + "events-processed-meter");
environment.metrics().meter(handlerName + "." + "events-processed-meter");
this.flushMeter = this.flushMeter = environment.metrics().meter(handlerName + "." + "flush-meter");
environment.metrics().meter(handlerName + "." + "flush-meter");
this.flushTimer = this.flushTimer = environment.metrics().timer(handlerName + "." + "flush-timer");
environment.metrics().timer(handlerName + "." + "flush-timer");
this.secondsBetweenFlushes = configuration.getMaxBatchTime(); this.secondsBetweenFlushes = configuration.getMaxBatchTime();
@ -102,7 +94,7 @@ public abstract class FlushableHandler<T> {
} else { } else {
return false; return false;
} }
} }
@ -126,20 +118,26 @@ public abstract class FlushableHandler<T> {
private boolean isBatchSize() { private boolean isBatchSize() {
logger.debug("[{}]: checking batch size", this.threadId); if (logger.isDebugEnabled()) {
logger.debug("[{}]: checking batch size", this.threadId);
}
if (this.msgCount >= this.batchSize) { if (this.msgCount >= this.batchSize) {
logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize); if (logger.isDebugEnabled()) {
logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
}
return true; return true;
} else { } else {
logger.debug("[{}]: batch size now at {}, batch size {} not attained", if (logger.isDebugEnabled()) {
this.threadId, logger.debug("[{}]: batch size now at {}, batch size {} not attained", this.threadId,
this.msgCount, this.msgCount, this.batchSize);
this.batchSize); }
return false; return false;
@ -147,28 +145,26 @@ public abstract class FlushableHandler<T> {
} }
private boolean isFlushTime() { private boolean isFlushTime() {
if (logger.isDebugEnabled()) {
logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.", logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.",
this.threadId, this.threadId, this.secondsBetweenFlushes);
this.secondsBetweenFlushes); }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if (this.flushTimeMillis <= now ) { if (this.flushTimeMillis <= now) {
if (logger.isDebugEnabled()) {
logger.debug( logger.debug("[{}]: {} ms past flush time. flushing to repository now.", this.threadId,
"[{}]: {} ms past flush time. flushing to repository now.", now - this.flushTimeMillis);
this.threadId, }
now - this.flushTimeMillis);
return true; return true;
} else { } else {
if (logger.isDebugEnabled()) {
logger.debug( logger.debug("[{}]: {} ms to next flush time. no need to flush at this time.", this.threadId,
"[{}]: {} ms to next flush time. no need to flush at this time.", this.flushTimeMillis - now);
this.threadId, }
this.flushTimeMillis - now);
return false; return false;
@ -176,8 +172,9 @@ public abstract class FlushableHandler<T> {
} }
public int flush() throws RepoException { public int flush() throws RepoException {
if (logger.isDebugEnabled()) {
logger.debug("[{}]: flushing", this.threadId); logger.debug("[{}]: flushing", this.threadId);
}
Timer.Context context = this.flushTimer.time(); Timer.Context context = this.flushTimer.time();
@ -185,13 +182,15 @@ public abstract class FlushableHandler<T> {
context.stop(); context.stop();
this.flushMeter.mark(); this.flushMeter.mark(msgFlushCnt);
this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes; this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt); if (logger.isDebugEnabled()) {
logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt);
}
this.msgCount = 0; this.msgCount -= msgFlushCnt;
this.batchCount++; this.batchCount++;
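
Two accounting changes above are easy to miss: the flush meter is now marked with the number of messages actually flushed (flushMeter.mark(msgFlushCnt)) rather than bumped by one, and msgCount is decremented by that number instead of being reset to zero, so anything the repository did not flush stays counted toward the next batch. A worked example with assumed numbers:

int msgCount = 1000;      // messages counted by the handler (assumed)
int msgFlushCnt = 980;    // messages the repository reports as flushed (assumed)
msgCount -= msgFlushCnt;  // msgCount == 20: the unflushed messages roll into the next batch
                          // (the previous code set msgCount = 0, dropping them from the count)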

MetricHandler.java

@ -1,6 +1,8 @@
/* /*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P. * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* *
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -38,28 +40,23 @@ import monasca.persister.repository.RepoException;
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> { public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private static final Logger logger = private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
LoggerFactory.getLogger(MetricHandler.class);
private final Repo<MetricEnvelope> metricRepo; private final Repo<MetricEnvelope> metricRepo;
private final Counter metricCounter; private final Counter metricCounter;
@Inject @Inject
public MetricHandler( public MetricHandler(Repo<MetricEnvelope> metricRepo, Environment environment,
Repo<MetricEnvelope> metricRepo, @Assisted PipelineConfig configuration, @Assisted("threadId") String threadId,
Environment environment,
@Assisted PipelineConfig configuration,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize) { @Assisted("batchSize") int batchSize) {
super(configuration, environment, threadId, batchSize); super(configuration, environment, threadId, batchSize);
this.metricRepo = metricRepo; this.metricRepo = metricRepo;
this.metricCounter = this.metricCounter = environment.metrics()
environment.metrics() .counter(this.handlerName + "." + "metrics-added-to-batch-counter");
.counter(this.handlerName + "." + "metrics-added-to-batch-counter");
} }
@ -89,12 +86,10 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
} }
private void processEnvelope(MetricEnvelope metricEnvelope) { private void processEnvelope(MetricEnvelope metricEnvelope) {
if (logger.isDebugEnabled()) {
logger.debug("[{}]: [{}:{}] {}", logger.debug("[{}]: [{}:{}] {}", this.threadId, this.getBatchCount(), this.getMsgCount(),
this.threadId, metricEnvelope);
this.getBatchCount(), }
this.getMsgCount(),
metricEnvelope);
this.metricRepo.addToBatch(metricEnvelope, this.threadId); this.metricRepo.addToBatch(metricEnvelope, this.threadId);
@ -109,8 +104,8 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY); this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
this.objectMapper.setPropertyNamingStrategy( this.objectMapper
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); .setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
} }

Sha1HashId.java

@ -1,63 +1,73 @@
/* /*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P. * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Copyright (c) 2017 SUSE LLC.
* you may not use this file except in compliance with the License. *
* You may obtain a copy of the License at * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* http://www.apache.org/licenses/LICENSE-2.0 * You may obtain a copy of the License at
* *
* Unless required by applicable law or agreed to in writing, software * http://www.apache.org/licenses/LICENSE-2.0
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * Unless required by applicable law or agreed to in writing, software
* implied. * distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* limitations under the License. * implied.
*/ * See the License for the specific language governing permissions and
* limitations under the License.
package monasca.persister.repository.vertica; */
import org.apache.commons.codec.binary.Hex; package monasca.persister.repository;
import java.util.Arrays; import org.apache.commons.codec.binary.Hex;
public class Sha1HashId { import java.nio.ByteBuffer;
private final byte[] sha1Hash; import java.util.Arrays;
public Sha1HashId(byte[] sha1Hash) { public class Sha1HashId {
this.sha1Hash = sha1Hash; private final byte[] sha1Hash;
}
private final String hex;
@Override
public String toString() { public Sha1HashId(byte[] sha1Hash) {
return "Sha1HashId{" + "sha1Hash=" + Hex.encodeHexString(sha1Hash) + "}"; this.sha1Hash = sha1Hash;
} hex = Hex.encodeHexString(sha1Hash);
}
@Override
public boolean equals(Object o) { @Override
if (this == o) public String toString() {
return true; return "Sha1HashId{" + "sha1Hash=" + hex + "}";
if (!(o instanceof Sha1HashId)) }
return false;
@Override
Sha1HashId that = (Sha1HashId) o; public boolean equals(Object o) {
if (this == o)
if (!Arrays.equals(sha1Hash, that.sha1Hash)) return true;
return false; if (!(o instanceof Sha1HashId))
return false;
return true;
} Sha1HashId that = (Sha1HashId) o;
@Override if (!Arrays.equals(sha1Hash, that.sha1Hash))
public int hashCode() { return false;
return Arrays.hashCode(sha1Hash);
} return true;
}
public byte[] getSha1Hash() {
return sha1Hash; @Override
} public int hashCode() {
return Arrays.hashCode(sha1Hash);
public String toHexString() { }
return Hex.encodeHexString(sha1Hash);
} public byte[] getSha1Hash() {
} return sha1Hash;
}
public ByteBuffer getSha1HashByteBuffer() {
return ByteBuffer.wrap(sha1Hash);
}
public String toHexString() {
return hex;
}
}
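
Sha1HashId moves from the Vertica package to the shared repository package, pre-computes the hex string, and gains getSha1HashByteBuffer() so the id can be bound directly into CQL statements. A sketch with example values of how the metric repository builds such an id, mirroring CassandraMetricRepo.addToBatch later in this diff:

java.util.TreeMap<String, String> dims = new java.util.TreeMap<>();
dims.put("hostname", "devstack");                              // example dimension
StringBuilder sb = new StringBuilder("region-a").append("example-tenant").append("cpu.idle_perc");
for (java.util.Map.Entry<String, String> e : dims.entrySet()) {
  sb.append(e.getKey()).append(e.getValue());                  // sorted dimension names and values
}
Sha1HashId defId = new Sha1HashId(org.apache.commons.codec.digest.DigestUtils.sha(sb.toString()));
String cacheKey = defId.toHexString();                         // cached hex form, used as a cache key
java.nio.ByteBuffer boundId = defId.getSha1HashByteBuffer();   // bound into prepared CQL statements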

CassandraAlarmRepo.java

@ -0,0 +1,113 @@
/*
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.cassandra;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.sql.Timestamp;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.Repo;
import monasca.persister.repository.RepoException;
/**
* This class is not thread safe.
*
*/
public class CassandraAlarmRepo extends CassandraRepo implements Repo<AlarmStateTransitionedEvent> {
private static final Logger logger = LoggerFactory.getLogger(CassandraAlarmRepo.class);
private static String EMPTY_REASON_DATA = "{}";
private static final int MAX_BYTES_PER_CHAR = 4;
private static final int MAX_LENGTH_VARCHAR = 65000;
private int retention;
private ObjectMapper objectMapper = new ObjectMapper();
@Inject
public CassandraAlarmRepo(CassandraCluster cluster, PersisterConfig config, Environment environment)
throws NoSuchAlgorithmException, SQLException {
super(cluster, environment, config.getCassandraDbConfiguration().getMaxWriteRetries(),
config.getAlarmHistoryConfiguration().getBatchSize());
this.retention = config.getCassandraDbConfiguration().getRetentionPolicy() * 24 * 3600;
logger.debug("Instantiating " + this.getClass().getName());
this.objectMapper
.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
session = cluster.getAlarmsSession();
logger.debug(this.getClass().getName() + " is fully instantiated");
}
public void addToBatch(AlarmStateTransitionedEvent message, String id) {
String metricsString = getSerializedString(message.metrics, id);
// Validate metricsString does not exceed a sufficient maximum upper bound
if (metricsString.length() * MAX_BYTES_PER_CHAR >= MAX_LENGTH_VARCHAR) {
metricsString = "[]";
logger.warn("length of metricsString for alarm ID {} exceeds max length of {}", message.alarmId,
MAX_LENGTH_VARCHAR);
}
String subAlarmsString = getSerializedString(message.subAlarms, id);
if (subAlarmsString.length() * MAX_BYTES_PER_CHAR >= MAX_LENGTH_VARCHAR) {
subAlarmsString = "[]";
logger.warn("length of subAlarmsString for alarm ID {} exceeds max length of {}", message.alarmId,
MAX_LENGTH_VARCHAR);
}
queue.offerLast(cluster.getAlarmHistoryInsertStmt().bind(retention, metricsString, message.oldState.name(),
message.newState.name(), subAlarmsString, message.stateChangeReason, EMPTY_REASON_DATA,
message.tenantId, message.alarmId, new Timestamp(message.timestamp)));
}
private String getSerializedString(Object o, String id) {
try {
return this.objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
logger.error("[[}]: failed to serialize object {}", id, o, e);
return "";
}
}
@Override
public int flush(String id) throws RepoException {
return handleFlush(id);
}
}
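
The size guard above is conservative: it multiplies the serialized string length by MAX_BYTES_PER_CHAR (4) and compares that against MAX_LENGTH_VARCHAR (65000), so any string of 16250 characters or more is replaced with "[]" rather than written. A worked example with a stand-in payload:

String metricsString = new String(new char[17000]).replace('\0', 'x'); // stand-in for a 17000-char JSON string
if (metricsString.length() * 4 >= 65000) {  // 17000 * 4 = 68000, over the 65000 ceiling
  metricsString = "[]";                     // the oversized value is dropped and a warning is logged
}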

CassandraCluster.java

@ -0,0 +1,427 @@
/*
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.cassandra;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.TokenRange;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.utils.Bytes;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
import monasca.common.configuration.CassandraDbConfiguration;
import monasca.persister.configuration.PersisterConfig;
public class CassandraCluster {
private static final Logger logger = LoggerFactory.getLogger(CassandraCluster.class);
private static final String MEASUREMENT_INSERT_CQL = "update monasca.measurements USING TTL ? "
+ "set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? "
+ "where metric_id = ? and time_stamp = ?";
private static final String MEASUREMENT_UPDATE_CQL = "update monasca.measurements USING TTL ? "
+ "set value = ?, value_meta = ? " + "where metric_id = ? and time_stamp = ?";
private static final String METRICS_INSERT_CQL = "update monasca.metrics USING TTL ? "
+ "set metric_id = ?, created_at = ?, updated_at = ? "
+ "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
private static final String METRICS_UPDATE_CQL = "update monasca.metrics USING TTL ? "
+ "set updated_at = ? "
+ "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
private static final String DIMENSION_INSERT_CQL = "insert into monasca.dimensions "
+ "(region, tenant_id, name, value) values (?, ?, ?, ?)";
private static final String DIMENSION_METRIC_INSERT_CQL = "insert into monasca.dimensions_metrics "
+ " (region, tenant_id, dimension_name, dimension_value, metric_name) values (?, ?, ?, ?, ?)";
private static final String METRIC_DIMENSION_INSERT_CQL = "insert into monasca.metrics_dimensions "
+ " (region, tenant_id, metric_name, dimension_name, dimension_value) values (?, ?, ?, ?, ?)";
private static final String INSERT_ALARM_STATE_HISTORY_SQL = "update monasca.alarm_state_history USING TTL ? "
+ " set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ?"
+ " where tenant_id = ? and alarm_id = ? and time_stamp = ?";
private static final String RETRIEVE_METRIC_DIMENSION_CQL = "select region, tenant_id, metric_name, "
+ "dimension_name, dimension_value from metrics_dimensions "
+ "WHERE token(region, tenant_id, metric_name) > ? and token(region, tenant_id, metric_name) <= ? ";
private static final String RETRIEVE_METRIC_ID_CQL = "select distinct metric_id from measurements WHERE token(metric_id) > ? and token(metric_id) <= ?";
private static final String RETRIEVE_DIMENSION_CQL = "select region, tenant_id, name, value from dimensions";
private static final String NAME = "name";
private static final String VALUE = "value";
private static final String METRIC_ID = "metric_id";
private static final String TENANT_ID_COLUMN = "tenant_id";
private static final String METRIC_NAME = "metric_name";
private static final String DIMENSION_NAME = "dimension_name";
private static final String DIMENSION_VALUE = "dimension_value";
private static final String REGION = "region";
private CassandraDbConfiguration dbConfig;
private Cluster cluster;
private Session metricsSession;
private Session alarmsSession;
private TokenAwarePolicy lbPolicy;
private PreparedStatement measurementInsertStmt;
private PreparedStatement measurementUpdateStmt;
private PreparedStatement metricInsertStmt;
private PreparedStatement metricUpdateStmt;
private PreparedStatement dimensionStmt;
private PreparedStatement dimensionMetricStmt;
private PreparedStatement metricDimensionStmt;
private PreparedStatement retrieveMetricDimensionStmt;
private PreparedStatement retrieveMetricIdStmt;
private PreparedStatement alarmHistoryInsertStmt;
public Cache<String, Boolean> getMetricIdCache() {
return metricIdCache;
}
public Cache<String, Boolean> getDimensionCache() {
return dimensionCache;
}
public Cache<String, Boolean> getMetricDimensionCache() {
return metricDimensionCache;
}
private final Cache<String, Boolean> metricIdCache;
private final Cache<String, Boolean> dimensionCache;
private final Cache<String, Boolean> metricDimensionCache;
@Inject
public CassandraCluster(final PersisterConfig config) {
this.dbConfig = config.getCassandraDbConfiguration();
QueryOptions qo = new QueryOptions();
qo.setConsistencyLevel(ConsistencyLevel.valueOf(dbConfig.getConsistencyLevel()));
qo.setDefaultIdempotence(true);
String[] contactPoints = dbConfig.getContactPoints();
int retries = dbConfig.getMaxWriteRetries();
Builder builder = Cluster.builder().addContactPoints(contactPoints).withPort(dbConfig.getPort());
builder
.withSocketOptions(new SocketOptions().setConnectTimeoutMillis(dbConfig.getConnectionTimeout())
.setReadTimeoutMillis(dbConfig.getReadTimeout()));
builder.withQueryOptions(qo).withRetryPolicy(new MonascaRetryPolicy(retries, retries, retries));
lbPolicy = new TokenAwarePolicy(
DCAwareRoundRobinPolicy.builder().withLocalDc(dbConfig.getLocalDataCenter()).build());
builder.withLoadBalancingPolicy(lbPolicy);
String user = dbConfig.getUser();
if (user != null && !user.isEmpty()) {
builder.withAuthProvider(new PlainTextAuthProvider(dbConfig.getUser(), dbConfig.getPassword()));
}
cluster = builder.build();
PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, dbConfig.getMaxConnections(),
dbConfig.getMaxConnections()).setConnectionsPerHost(HostDistance.REMOTE,
dbConfig.getMaxConnections(), dbConfig.getMaxConnections());
poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, dbConfig.getMaxRequests())
.setMaxRequestsPerConnection(HostDistance.REMOTE, dbConfig.getMaxRequests());
metricsSession = cluster.connect(dbConfig.getKeySpace());
measurementInsertStmt = metricsSession.prepare(MEASUREMENT_INSERT_CQL).setIdempotent(true);
measurementUpdateStmt = metricsSession.prepare(MEASUREMENT_UPDATE_CQL).setIdempotent(true);
metricInsertStmt = metricsSession.prepare(METRICS_INSERT_CQL).setIdempotent(true);
metricUpdateStmt = metricsSession.prepare(METRICS_UPDATE_CQL).setIdempotent(true);
dimensionStmt = metricsSession.prepare(DIMENSION_INSERT_CQL).setIdempotent(true);
dimensionMetricStmt = metricsSession.prepare(DIMENSION_METRIC_INSERT_CQL).setIdempotent(true);
metricDimensionStmt = metricsSession.prepare(METRIC_DIMENSION_INSERT_CQL).setIdempotent(true);
retrieveMetricIdStmt = metricsSession.prepare(RETRIEVE_METRIC_ID_CQL).setIdempotent(true);
retrieveMetricDimensionStmt = metricsSession.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
.setIdempotent(true);
alarmsSession = cluster.connect(dbConfig.getKeySpace());
alarmHistoryInsertStmt = alarmsSession.prepare(INSERT_ALARM_STATE_HISTORY_SQL).setIdempotent(true);
metricIdCache = CacheBuilder.newBuilder()
.maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
dimensionCache = CacheBuilder.newBuilder()
.maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
metricDimensionCache = CacheBuilder.newBuilder()
.maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
logger.info("loading cached definitions from db");
ExecutorService executor = Executors.newFixedThreadPool(250);
//a majority of the ids are for metrics not actively receiving msgs anymore
//loadMetricIdCache(executor);
loadDimensionCache();
loadMetricDimensionCache(executor);
executor.shutdown();
}
public Session getMetricsSession() {
return metricsSession;
}
public Session getAlarmsSession() {
return alarmsSession;
}
public PreparedStatement getMeasurementInsertStmt() {
return measurementInsertStmt;
}
public PreparedStatement getMeasurementUpdateStmt() {
return measurementUpdateStmt;
}
public PreparedStatement getMetricInsertStmt() {
return metricInsertStmt;
}
public PreparedStatement getMetricUpdateStmt() {
return metricUpdateStmt;
}
public PreparedStatement getDimensionStmt() {
return dimensionStmt;
}
public PreparedStatement getDimensionMetricStmt() {
return dimensionMetricStmt;
}
public PreparedStatement getMetricDimensionStmt() {
return metricDimensionStmt;
}
public PreparedStatement getAlarmHistoryInsertStmt() {
return alarmHistoryInsertStmt;
}
public ProtocolOptions getProtocolOptions() {
return cluster.getConfiguration().getProtocolOptions();
}
public CodecRegistry getCodecRegistry() {
return cluster.getConfiguration().getCodecRegistry();
}
public Metadata getMetaData() {
return cluster.getMetadata();
}
public TokenAwarePolicy getLoadBalancePolicy() {
return lbPolicy;
}
private void loadMetricIdCache(ExecutorService executor) {
final AtomicInteger tasks = new AtomicInteger(0);
logger.info("Found token ranges: " + cluster.getMetadata().getTokenRanges().size());
for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
List<BoundStatement> queries = rangeQuery(retrieveMetricIdStmt, range);
for (BoundStatement query : queries) {
tasks.incrementAndGet();
logger.info("adding a metric id reading task, total: " + tasks.get());
ResultSetFuture future = metricsSession.executeAsync(query);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
for (Row row : result) {
String id = Bytes.toHexString(row.getBytes(METRIC_ID));
if (id != null) {
//remove '0x'
metricIdCache.put(id.substring(2), Boolean.TRUE);
}
}
tasks.decrementAndGet();
logger.info("completed a metric id read task. Remaining tasks: " + tasks.get());
}
@Override
public void onFailure(Throwable t) {
logger.error("Failed to execute query to load metric id cache.", t);
tasks.decrementAndGet();
logger.info("Failed a metric id read task. Remaining tasks: " + tasks.get());
}
}, executor);
}
}
while (tasks.get() > 0) {
logger.debug("waiting for more metric id load tasks: " + tasks.get());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
logger.warn("load metric cache was interrupted", e);
}
}
logger.info("loaded metric id cache from database: " + metricIdCache.size());
}
private List<BoundStatement> rangeQuery(PreparedStatement rangeStmt, TokenRange range) {
List<BoundStatement> res = Lists.newArrayList();
for (TokenRange subRange : range.unwrap()) {
res.add(rangeStmt.bind(subRange.getStart(), subRange.getEnd()));
}
return res;
}
private void loadDimensionCache() {
ResultSet results = metricsSession.execute(RETRIEVE_DIMENSION_CQL);
for (Row row : results) {
String key = getDimnesionEntryKey(row.getString(REGION), row.getString(TENANT_ID_COLUMN),
row.getString(NAME), row.getString(VALUE));
dimensionCache.put(key, Boolean.TRUE);
}
logger.info("loaded dimension cache from database: " + dimensionCache.size());
}
public String getDimnesionEntryKey(String region, String tenantId, String name, String value) {
StringBuilder sb = new StringBuilder();
sb.append(region).append('\0');
sb.append(tenantId).append('\0');
sb.append(name).append('\0');
sb.append(value);
return sb.toString();
}
private void loadMetricDimensionCache(ExecutorService executor) {
final AtomicInteger tasks = new AtomicInteger(0);
for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
List<BoundStatement> queries = rangeQuery(retrieveMetricDimensionStmt, range);
for (BoundStatement query : queries) {
tasks.incrementAndGet();
logger.info("Adding a metric dimnesion read task, total: " + tasks.get());
ResultSetFuture future = metricsSession.executeAsync(query);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
for (Row row : result) {
String key = getMetricDimnesionEntryKey(row.getString(REGION),
row.getString(TENANT_ID_COLUMN), row.getString(METRIC_NAME),
row.getString(DIMENSION_NAME), row.getString(DIMENSION_VALUE));
metricDimensionCache.put(key, Boolean.TRUE);
}
tasks.decrementAndGet();
logger.info("Completed a metric dimension read task. Remaining tasks: " + tasks.get());
}
@Override
public void onFailure(Throwable t) {
logger.error("Failed to execute query to load metric id cache.", t);
tasks.decrementAndGet();
logger.info("Failed a metric dimension read task. Remaining tasks: " + tasks.get());
}
}, executor);
}
}
while (tasks.get() > 0) {
logger.debug("waiting for metric dimension cache to load ...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.warn("load metric dimension cache was interrupted", e);
}
}
logger.info("loaded metric dimension cache from database: " + metricDimensionCache.size());
}
public String getMetricDimnesionEntryKey(String region, String tenantId, String metricName,
String dimensionName, String dimensionValue) {
StringBuilder sb = new StringBuilder();
sb.append(region).append('\0');
sb.append(tenantId).append('\0');
sb.append(metricName).append('\0');
sb.append(dimensionName).append('\0');
sb.append(dimensionValue);
return sb.toString();
}
}
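
Of the three Guava caches, only the dimension and metric-dimension caches are warmed at startup (loading the metric id cache is commented out because most stored ids belong to metrics that no longer receive data). Keys are simply the column values joined with '\0', so the write path can skip definition rows it has already seen. A sketch (example values; cluster is an injected CassandraCluster) of how such a lookup might be used:

String key = cluster.getDimnesionEntryKey("region-a", "example-tenant", "hostname", "devstack");
if (cluster.getDimensionCache().getIfPresent(key) == null) {
  // not cached yet: queue cluster.getDimensionStmt().bind(...) for this dimension,
  // then remember it so the row is not rewritten on the next sample
  cluster.getDimensionCache().put(key, Boolean.TRUE);
}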

CassandraMetricBatch.java

@ -0,0 +1,207 @@
/*
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.cassandra;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BatchStatement.Type;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.Token;
import com.datastax.driver.core.policies.TokenAwarePolicy;
public class CassandraMetricBatch {
private static Logger logger = LoggerFactory.getLogger(CassandraMetricBatch.class);
ProtocolOptions protocol;
CodecRegistry codec;
Metadata metadata;
TokenAwarePolicy policy;
int batchLimit;
Map<Token, Deque<BatchStatement>> metricQueries;
Map<Token, Deque<BatchStatement>> dimensionQueries;
Map<Token, Deque<BatchStatement>> dimensionMetricQueries;
Map<Token, Deque<BatchStatement>> metricDimensionQueries;
Map<Set<Host>, Deque<BatchStatement>> measurementQueries;
public CassandraMetricBatch(Metadata metadata, ProtocolOptions protocol, CodecRegistry codec,
TokenAwarePolicy lbPolicy, int batchLimit) {
this.protocol = protocol;
this.codec = codec;
this.metadata = metadata;
this.policy = lbPolicy;
metricQueries = new HashMap<>();
this.batchLimit = batchLimit;
metricQueries = new HashMap<>();
dimensionQueries = new HashMap<>();
dimensionMetricQueries = new HashMap<>();
metricDimensionQueries = new HashMap<>();
measurementQueries = new HashMap<>();
}
public void addMetricQuery(BoundStatement s) {
batchQueryByToken(s, metricQueries);
}
public void addDimensionQuery(BoundStatement s) {
batchQueryByToken(s, dimensionQueries);
}
public void addDimensionMetricQuery(BoundStatement s) {
batchQueryByToken(s, dimensionMetricQueries);
}
public void addMetricDimensionQuery(BoundStatement s) {
batchQueryByToken(s, metricDimensionQueries);
}
public void addMeasurementQuery(BoundStatement s) {
batchQueryByReplica(s, measurementQueries);
}
private void batchQueryByToken(BoundStatement s, Map<Token, Deque<BatchStatement>> batchedQueries) {
ByteBuffer b = s.getRoutingKey(protocol.getProtocolVersion(), codec);
Token token = metadata.newToken(b);
Deque<BatchStatement> queue = batchedQueries.get(token);
if (queue == null) {
queue = new ArrayDeque<BatchStatement>();
BatchStatement bs = new BatchStatement(Type.UNLOGGED);
bs.add(s);
queue.offer(bs);
batchedQueries.put(token, queue);
} else {
BatchStatement bs = queue.getLast();
if (bs.size() < batchLimit) {
bs.add(s);
} else {
bs = new BatchStatement(Type.UNLOGGED);
bs.add(s);
queue.offerLast(bs);
}
}
}
private void batchQueryByReplica(BoundStatement s,
Map<Set<Host>, Deque<BatchStatement>> batchedQueries) {
Iterator<Host> it = policy.newQueryPlan(s.getKeyspace(), s);
Set<Host> hosts = new HashSet<>();
while (it.hasNext()) {
hosts.add(it.next());
}
Deque<BatchStatement> queue = batchedQueries.get(hosts);
if (queue == null) {
queue = new ArrayDeque<BatchStatement>();
BatchStatement bs = new BatchStatement(Type.UNLOGGED);
bs.add(s);
queue.offer(bs);
batchedQueries.put(hosts, queue);
} else {
BatchStatement bs = queue.getLast();
if (bs.size() < 30) {
bs.add(s);
} else {
bs = new BatchStatement(Type.UNLOGGED);
bs.add(s);
queue.offerLast(bs);
}
}
}
public void clear() {
metricQueries.clear();
dimensionQueries.clear();
dimensionMetricQueries.clear();
metricDimensionQueries.clear();
measurementQueries.clear();
}
public List<Deque<BatchStatement>> getAllBatches() {
logTokenBatchMap("metric batches", metricQueries);
logTokenBatchMap("dimension batches", dimensionQueries);
logTokenBatchMap("dimension metric batches", dimensionMetricQueries);
logTokenBatchMap("metric dimension batches", metricDimensionQueries);
logReplicaBatchMap("measurement batches", measurementQueries);
ArrayList<Deque<BatchStatement>> list = new ArrayList<>();
list.addAll(metricQueries.values());
list.addAll(dimensionQueries.values());
list.addAll(dimensionMetricQueries.values());
list.addAll(metricDimensionQueries.values());
list.addAll(measurementQueries.values());
return list;
}
private void logTokenBatchMap(String name, Map<Token, Deque<BatchStatement>> map) {
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder(name);
sb.append(": Size: ").append(map.size());
sb.append("; Tokens: |");
for (Entry<Token, Deque<BatchStatement>> entry : map.entrySet()) {
sb.append(entry.getKey().toString()).append(":");
for (BatchStatement bs : entry.getValue()) {
sb.append(bs.size()).append(",");
}
sb.append("|.");
}
logger.debug(sb.toString());
}
}
private void logReplicaBatchMap(String name, Map<Set<Host>, Deque<BatchStatement>> map) {
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder(name);
sb.append(": Size: ").append(map.size());
sb.append(". Replicas: |");
for (Entry<Set<Host>, Deque<BatchStatement>> entry : map.entrySet()) {
for (Host host : entry.getKey()) {
sb.append(host.getAddress().toString()).append(",");
}
sb.append(":");
for (BatchStatement bs : entry.getValue()) {
sb.append(bs.size()).append(",");
}
sb.append("|");
}
logger.debug(sb.toString());
}
}
}
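
CassandraMetricBatch groups bound statements into UNLOGGED batches keyed by routing token (or, for measurements, by the replica set computed from the token-aware policy), so each batch stays local to one partition owner; metric and dimension batches are capped by the configured limit and measurement batches at 30 statements. A usage sketch (cluster, session and the bound statements are assumed to come from CassandraCluster; 50 is an assumed limit):

CassandraMetricBatch batch = new CassandraMetricBatch(cluster.getMetaData(),
    cluster.getProtocolOptions(), cluster.getCodecRegistry(),
    cluster.getLoadBalancePolicy(), 50);
batch.addMetricQuery(boundMetricStatement);            // grouped by partition token
batch.addMeasurementQuery(boundMeasurementStatement);  // grouped by replica set
for (Deque<BatchStatement> queue : batch.getAllBatches()) {
  for (BatchStatement bs : queue) {
    session.execute(bs);                               // the persister drains these in its flush path
  }
}
batch.clear();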

CassandraMetricRepo.java

@ -0,0 +1,336 @@
/*
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.cassandra;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.dropwizard.setup.Environment;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.Repo;
import monasca.persister.repository.RepoException;
import monasca.persister.repository.Sha1HashId;
public class CassandraMetricRepo extends CassandraRepo implements Repo<MetricEnvelope> {
private static final Logger logger = LoggerFactory.getLogger(CassandraMetricRepo.class);
public static final int MAX_COLUMN_LENGTH = 255;
public static final int MAX_VALUE_META_LENGTH = 2048;
private static final String TENANT_ID = "tenantId";
private static final String REGION = "region";
private static final String EMPTY_STR = "";
private int retention;
private CassandraMetricBatch batches;
private int metricCount;
private final ObjectMapper objectMapper = new ObjectMapper();
public final Meter measurementMeter;
public final Meter metricCacheMissMeter;
public final Meter metricCacheHitMeter;
public final Meter dimensionCacheMissMeter;
public final Meter dimensionCacheHitMeter;
public final Meter metricDimensionCacheMissMeter;
public final Meter metricDimensionCacheHitMeter;
@Inject
public CassandraMetricRepo(CassandraCluster cluster, PersisterConfig config, Environment environment)
throws NoSuchAlgorithmException, SQLException {
super(cluster, environment, config.getCassandraDbConfiguration().getMaxWriteRetries(),
config.getMetricConfiguration().getBatchSize());
logger.debug("Instantiating " + this.getClass().getName());
this.retention = config.getCassandraDbConfiguration().getRetentionPolicy() * 24 * 3600;
this.measurementMeter = this.environment.metrics()
.meter(this.getClass().getName() + "." + "measurement-meter");
this.metricCacheMissMeter = this.environment.metrics()
.meter(this.getClass().getName() + "." + "definition-cache-miss-meter");
this.metricCacheHitMeter = this.environment.metrics()
.meter(this.getClass().getName() + "." + "definition-cache-hit-meter");
this.dimensionCacheMissMeter = this.environment.metrics()
.meter(this.getClass().getName() + "." + "dimension-cache-miss-meter");
this.dimensionCacheHitMeter = this.environment.metrics()
.meter(this.getClass().getName() + "." + "dimension-cache-hit-meter");
this.metricDimensionCacheMissMeter = this.environment.metrics()
.meter(this.getClass().getName() + "." + "metric-dimension-cache-miss-meter");
this.metricDimensionCacheHitMeter = this.environment.metrics()
.meter(this.getClass().getName() + "." + "metric-dimension-cache-hit-meter");
session = cluster.getMetricsSession();
metricCount = 0;
batches = new CassandraMetricBatch(cluster.getMetaData(), cluster.getProtocolOptions(),
cluster.getCodecRegistry(), cluster.getLoadBalancePolicy(),
config.getCassandraDbConfiguration().getMaxBatches());
logger.debug(this.getClass().getName() + " is fully instantiated");
}
@Override
public void addToBatch(MetricEnvelope metricEnvelope, String id) {
Metric metric = metricEnvelope.metric;
Map<String, Object> metaMap = metricEnvelope.meta;
String tenantId = getMeta(TENANT_ID, metric, metaMap, id);
String region = getMeta(REGION, metric, metaMap, id);
String metricName = metric.getName();
TreeMap<String, String> dimensions = metric.getDimensions() == null ? new TreeMap<String, String>()
: new TreeMap<>(metric.getDimensions());
StringBuilder sb = new StringBuilder(region).append(tenantId).append(metricName);
Iterator<String> it = dimensions.keySet().iterator();
while (it.hasNext()) {
String k = it.next();
sb.append(k).append(dimensions.get(k));
}
byte[] defIdSha = DigestUtils.sha(sb.toString());
Sha1HashId defIdShaHash = new Sha1HashId(defIdSha);
if (cluster.getMetricIdCache().getIfPresent(defIdShaHash.toHexString()) == null) {
addDefinitionToBatch(defIdShaHash, metricName, dimensions, tenantId, region, id,
metric.getTimestamp());
batches.addMeasurementQuery(buildMeasurementInsertQuery(defIdShaHash, metric.getTimestamp(),
metric.getValue(), metric.getValueMeta(), region, tenantId, metricName, dimensions, id));
} else {
metricCacheHitMeter.mark();
batches.addMetricQuery(cluster.getMetricUpdateStmt().bind(retention,
new Timestamp(metric.getTimestamp()), region, tenantId, metricName,
getDimensionList(dimensions), new ArrayList<>(dimensions.keySet())));
batches.addMeasurementQuery(buildMeasurementUpdateQuery(defIdShaHash, metric.getTimestamp(),
metric.getValue(), metric.getValueMeta(), id));
}
metricCount++;
}
private String getMeta(String name, Metric metric, Map<String, Object> meta, String id) {
if (meta.containsKey(name)) {
return (String) meta.get(name);
} else {
logger.warn(
"[{}]: failed to find {} in message envelope meta data. metric message may be malformed. "
+ "setting {} to empty string.",
id, name, name);
logger.warn("[{}]: metric: {}", id, metric.toString());
logger.warn("[{}]: meta: {}", id, meta.toString());
return EMPTY_STR;
}
}
private BoundStatement buildMeasurementUpdateQuery(Sha1HashId defId, long timeStamp, double value,
Map<String, String> valueMeta, String id) {
String valueMetaString = getValueMetaString(valueMeta, id);
if (logger.isDebugEnabled()) {
logger.debug("[{}]: adding metric to batch: metric id: {}, time: {}, value: {}, value meta {}",
id, defId.toHexString(), timeStamp, value, valueMetaString);
}
return cluster.getMeasurementUpdateStmt().bind(retention, value, valueMetaString,
defId.getSha1HashByteBuffer(), new Timestamp(timeStamp));
}
private BoundStatement buildMeasurementInsertQuery(Sha1HashId defId, long timeStamp, double value,
Map<String, String> valueMeta, String region, String tenantId, String metricName,
Map<String, String> dimensions, String id) {
String valueMetaString = getValueMetaString(valueMeta, id);
if (logger.isDebugEnabled()) {
logger.debug("[{}]: adding metric to batch: metric id: {}, time: {}, value: {}, value meta {}",
id, defId.toHexString(), timeStamp, value, valueMetaString);
}
measurementMeter.mark();
return cluster.getMeasurementInsertStmt().bind(retention, value, valueMetaString, region, tenantId,
metricName, getDimensionList(dimensions), defId.getSha1HashByteBuffer(),
new Timestamp(timeStamp));
}
private String getValueMetaString(Map<String, String> valueMeta, String id) {
String valueMetaString = "";
if (valueMeta != null && !valueMeta.isEmpty()) {
try {
valueMetaString = this.objectMapper.writeValueAsString(valueMeta);
if (valueMetaString.length() > MAX_VALUE_META_LENGTH) {
logger.error("[{}]: Value meta length {} longer than maximum {}, dropping value meta", id,
valueMetaString.length(), MAX_VALUE_META_LENGTH);
return "";
}
} catch (JsonProcessingException e) {
logger.error("[{}]: Failed to serialize value meta {}, dropping value meta from measurement",
id, valueMeta);
}
}
return valueMetaString;
}
private void addDefinitionToBatch(Sha1HashId defId, String metricName, Map<String, String> dimensions,
String tenantId, String region, String id, long timestamp) {
metricCacheMissMeter.mark();
if (logger.isDebugEnabled()) {
logger.debug("[{}]: adding definition to batch: defId: {}, name: {}, tenantId: {}, region: {}",
id, defId.toHexString(), metricName, tenantId, region);
}
Timestamp ts = new Timestamp(timestamp);
batches.addMetricQuery(
cluster.getMetricInsertStmt().bind(retention, defId.getSha1HashByteBuffer(), ts, ts, region,
tenantId, metricName, getDimensionList(dimensions), new ArrayList<>(dimensions.keySet())));
for (Map.Entry<String, String> entry : dimensions.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
String dimensionKey = cluster.getDimnesionEntryKey(region, tenantId, name, value);
if (cluster.getDimensionCache().getIfPresent(dimensionKey) != null) {
dimensionCacheHitMeter.mark();
} else {
dimensionCacheMissMeter.mark();
if (logger.isDebugEnabled()) {
logger.debug("[{}]: adding dimension to batch: defId: {}, name: {}, value: {}", id,
defId.toHexString(), name, value);
}
batches.addDimensionQuery(cluster.getDimensionStmt().bind(region, tenantId, name, value));
cluster.getDimensionCache().put(dimensionKey, Boolean.TRUE);
}
String metricDimensionKey = cluster.getMetricDimnesionEntryKey(region, tenantId, metricName, name, value);
if (cluster.getMetricDimensionCache().getIfPresent(metricDimensionKey) != null) {
metricDimensionCacheHitMeter.mark();
} else {
metricDimensionCacheMissMeter.mark();
batches.addDimensionMetricQuery(
cluster.getDimensionMetricStmt().bind(region, tenantId, name, value, metricName));
batches.addMetricDimensionQuery(
cluster.getMetricDimensionStmt().bind(region, tenantId, metricName, name, value));
cluster.getMetricDimensionCache().put(metricDimensionKey, Boolean.TRUE);
}
}
String metricId = defId.toHexString();
cluster.getMetricIdCache().put(metricId, Boolean.TRUE);
}
public List<String> getDimensionList(Map<String, String> dimensions) {
List<String> list = new ArrayList<>(dimensions.size());
for (Entry<String, String> dim : dimensions.entrySet()) {
list.add(new StringBuffer(dim.getKey()).append('\t').append(dim.getValue()).toString());
}
return list;
}
@Override
public int flush(String id) throws RepoException {
long startTime = System.nanoTime();
List<ResultSetFuture> results = new ArrayList<>();
List<Deque<BatchStatement>> list = batches.getAllBatches();
for (Deque<BatchStatement> q : list) {
BatchStatement b;
while ((b = q.poll()) != null) {
results.add(session.executeAsync(b));
}
}
List<ListenableFuture<ResultSet>> futures = Futures.inCompletionOrder(results);
boolean cancel = false;
Exception ex = null;
for (ListenableFuture<ResultSet> future : futures) {
if (cancel) {
future.cancel(false);
continue;
}
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
cancel = true;
ex = e;
}
}
this.commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (ex != null) {
metricFailed.inc(metricCount);
throw new RepoException(ex);
}
batches.clear();
int flushCnt = metricCount;
metricCount = 0;
metricCompleted.inc(flushCnt);
return flushCnt;
}
}

View File

@ -0,0 +1,210 @@
/*
* Copyright (c) 2017 SUSE LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.cassandra;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BatchStatement.Type;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.BootstrappingException;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.datastax.driver.core.exceptions.OverloadedException;
import com.datastax.driver.core.exceptions.QueryConsistencyException;
import com.datastax.driver.core.exceptions.UnavailableException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.RepoException;
public abstract class CassandraRepo {
private static Logger logger = LoggerFactory.getLogger(CassandraRepo.class);
final Environment environment;
final Timer commitTimer;
CassandraCluster cluster;
Session session;
int maxWriteRetries;
int batchSize;
long lastFlushTimeStamp;
Deque<Statement> queue;
Counter metricCompleted;
Counter metricFailed;
public CassandraRepo(CassandraCluster cluster, Environment env, int maxWriteRetries, int batchSize) {
this.cluster = cluster;
this.maxWriteRetries = maxWriteRetries;
this.batchSize = batchSize;
this.environment = env;
this.commitTimer = this.environment.metrics().timer(getClass().getName() + "." + "commit-timer");
lastFlushTimeStamp = System.currentTimeMillis();
queue = new ArrayDeque<>(batchSize);
this.metricCompleted = environment.metrics()
.counter(getClass().getName() + "." + "metrics-persisted-counter");
this.metricFailed = environment.metrics()
.counter(getClass().getName() + "." + "metrics-failed-counter");
}
protected void executeQuery(String id, Statement query, long startTime) throws DriverException {
_executeQuery(id, query, startTime, 0);
}
private void _executeQuery(final String id, final Statement query, final long startTime,
final int retryCount) {
try {
session.execute(query);
commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
// ResultSetFuture future = session.executeAsync(query);
// Futures.addCallback(future, new FutureCallback<ResultSet>() {
// @Override
// public void onSuccess(ResultSet result) {
// metricCompleted.inc();
// commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
// }
//
// @Override
// public void onFailure(Throwable t) {
// if (t instanceof NoHostAvailableException | t instanceof
// BootstrappingException
// | t instanceof OverloadedException | t instanceof QueryConsistencyException
// | t instanceof UnavailableException) {
// retryQuery(id, query, startTime, retryCount, (DriverException) t);
// } else {
// metricFailed.inc();
// commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
// logger.error("Failed to execute query.", t);
// }
// }
// }, MoreExecutors.sameThreadExecutor());
} catch (NoHostAvailableException | BootstrappingException | OverloadedException
| QueryConsistencyException | UnavailableException | OperationTimedOutException e) {
retryQuery(id, query, startTime, retryCount, e);
} catch (DriverException e) {
metricFailed.inc(((BatchStatement) query).size());
commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
throw e;
}
}
private void retryQuery(String id, Statement query, final long startTime, int retryCount,
DriverException e) throws DriverException {
if (retryCount >= maxWriteRetries) {
logger.error("[{}]: Query aborted after {} retry: ", id, retryCount, e.getMessage());
metricFailed.inc(((BatchStatement) query).size());
commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
throw e;
} else {
logger.warn("[{}]: Query failed, retrying {} of {}: {} ", id, retryCount, maxWriteRetries,
e.getMessage());
try {
Thread.sleep(1000 * (1 << retryCount));
} catch (InterruptedException ie) {
logger.debug("[{}]: Interrupted: {}", id, ie);
}
_executeQuery(id, query, startTime, retryCount + 1);
}
}
public int handleFlush_batch(String id) {
Statement query;
int flushedCount = 0;
BatchStatement batch = new BatchStatement(Type.UNLOGGED);
while ((query = queue.poll()) != null) {
flushedCount++;
batch.add(query);
}
executeQuery(id, batch, System.nanoTime());
metricCompleted.inc(flushedCount);
return flushedCount;
}
public int handleFlush(String id) throws RepoException {
long startTime = System.nanoTime();
int flushedCount = 0;
List<ResultSetFuture> results = new ArrayList<>(queue.size());
Statement query;
while ((query = queue.poll()) != null) {
flushedCount++;
results.add(session.executeAsync(query));
}
List<ListenableFuture<ResultSet>> futures = Futures.inCompletionOrder(results);
boolean cancel = false;
Exception ex = null;
for (ListenableFuture<ResultSet> future : futures) {
if (cancel) {
future.cancel(false);
continue;
}
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
cancel = true;
ex = e;
}
}
commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (ex != null) {
throw new RepoException(ex);
}
return flushedCount;
}
}

View File

@ -0,0 +1,77 @@
package monasca.persister.repository.cassandra;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.policies.RetryPolicy;
public class MonascaRetryPolicy implements RetryPolicy {
private final int readAttempts;
private final int writeAttempts;
private final int unavailableAttempts;
public MonascaRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
super();
this.readAttempts = readAttempts;
this.writeAttempts = writeAttempts;
this.unavailableAttempts = unavailableAttempts;
}
@Override
public RetryDecision onReadTimeout(Statement stmnt, ConsistencyLevel cl, int requiredResponses,
int receivedResponses, boolean dataReceived, int rTime) {
if (dataReceived) {
return RetryDecision.ignore();
} else if (rTime < readAttempts) {
return receivedResponses >= requiredResponses ? RetryDecision.retry(cl)
: RetryDecision.rethrow();
} else {
return RetryDecision.rethrow();
}
}
@Override
public RetryDecision onWriteTimeout(Statement stmnt, ConsistencyLevel cl, WriteType wt,
int requiredResponses, int receivedResponses, int wTime) {
if (wTime >= writeAttempts)
return RetryDecision.rethrow();
return RetryDecision.retry(cl);
}
@Override
public RetryDecision onUnavailable(Statement stmnt, ConsistencyLevel cl, int requiredResponses,
int receivedResponses, int uTime) {
if (uTime == 0) {
return RetryDecision.tryNextHost(cl);
} else if (uTime <= unavailableAttempts) {
return RetryDecision.retry(cl);
} else {
return RetryDecision.rethrow();
}
}
/**
* {@inheritDoc}
*/
@Override
public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e,
int nbRetry) {
return RetryDecision.tryNextHost(cl);
}
@Override
public void init(Cluster cluster) {
// nothing to do
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -1,6 +1,8 @@
/* /*
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP * (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
* *
* (C) Copyright 2017 SUSE LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
@ -50,6 +52,7 @@ import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig; import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.Repo; import monasca.persister.repository.Repo;
import monasca.persister.repository.RepoException; import monasca.persister.repository.RepoException;
import monasca.persister.repository.Sha1HashId;
public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelope> { public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelope> {

View File

@ -1,6 +1,7 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# Copyright 2017 FUJITSU LIMITED # Copyright 2017 FUJITSU LIMITED
# # (C) Copyright 2017 SUSE LLC
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
# You may obtain a copy of the License at # You may obtain a copy of the License at
@ -17,13 +18,47 @@
from oslo_config import cfg from oslo_config import cfg
cassandra_opts = [ cassandra_opts = [
cfg.ListOpt('cluster_ip_addresses', cfg.ListOpt('contact_points',
help='Comma separated list of Cassandra node IP addresses', help='Comma separated list of Cassandra node IP addresses',
default=['127.0.0.1'], default=['127.0.0.1'],
item_type=cfg.IPOpt), item_type=cfg.IPOpt),
cfg.IntOpt('port',
help='Cassandra port number',
default=9042),
cfg.StrOpt('keyspace', cfg.StrOpt('keyspace',
help='keyspace where metric are stored', help='Keyspace name where metrics are stored',
default='monasca')] default='monasca'),
cfg.StrOpt('user',
help='Cassandra user name',
default=''),
cfg.StrOpt('password',
help='Cassandra password',
secret=True,
default=''),
cfg.IntOpt('connection_timeout',
help='Cassandra timeout in seconds when creating a new connection',
default=5),
cfg.IntOpt('read_timeout',
help='Cassandra read timeout in seconds',
default=60),
cfg.IntOpt('max_write_retries',
help='Maximum number of retries in write ops',
default=1),
cfg.IntOpt('max_definition_cache_size',
help='Maximum number of cached metric definition entries in memory',
default=20000000),
cfg.IntOpt('retention_policy',
help='Data retention period in days',
default=45),
cfg.StrOpt('consistency_level',
help='Cassandra default consistency level',
default='ONE'),
cfg.StrOpt('local_data_center',
help='Cassandra local data center name'),
cfg.IntOpt('max_batches',
help='Maximum batch size in Cassandra',
default=250),
]
cassandra_group = cfg.OptGroup(name='cassandra') cassandra_group = cfg.OptGroup(name='cassandra')
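For reference, a minimal sketch of one way these [cassandra] options might be registered and read with oslo.config; the module path monasca_persister.conf.cassandra, the config file path, and the printed values are assumptions for illustration only.

from oslo_config import cfg

from monasca_persister.conf import cassandra  # hypothetical module path for the options above

# Hedged usage sketch: register the [cassandra] group and read a few options
# back after parsing an (illustrative) config file.
cfg.CONF.register_opts(cassandra.cassandra_opts, group=cassandra.cassandra_group)
cfg.CONF(['--config-file', '/etc/monasca/persister.conf'])
print('%s:%s keyspace=%s' % (cfg.CONF.cassandra.contact_points,
                             cfg.CONF.cassandra.port,
                             cfg.CONF.cassandra.keyspace))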

View File

@ -1,5 +1,6 @@
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# Copyright 2017 FUJITSU LIMITED # Copyright 2017 FUJITSU LIMITED
# (C) Copyright 2017 SUSE LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -40,7 +41,10 @@ kafka_alarm_history_opts = [
default='alarm-state-transitions'), default='alarm-state-transitions'),
cfg.StrOpt('zookeeper_path', cfg.StrOpt('zookeeper_path',
help='Path in zookeeper for kafka consumer group partitioning algorithm', help='Path in zookeeper for kafka consumer group partitioning algorithm',
default='/persister_partitions/$kafka_alarm_history.topic') default='/persister_partitions/$kafka_alarm_history.topic'),
cfg.IntOpt('batch_size',
help='Maximum number of alarm state history messages to buffer before writing to database',
default=1),
] ]

View File

@ -1,5 +1,6 @@
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# Copyright 2017 FUJITSU LIMITED # Copyright 2017 FUJITSU LIMITED
# (C) Copyright 2017 SUSE LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -26,9 +27,6 @@ kafka_common_opts = [
help='id of persister kafka client', help='id of persister kafka client',
advanced=True, advanced=True,
default='monasca-persister'), default='monasca-persister'),
cfg.IntOpt('database_batch_size',
help='Maximum number of metric to buffer before writing to database',
default=1000),
cfg.IntOpt('max_wait_time_seconds', cfg.IntOpt('max_wait_time_seconds',
help='Maximum wait time for write batch to database', help='Maximum wait time for write batch to database',
default=30), default=30),

View File

@ -41,6 +41,9 @@ kafka_metrics_opts = [
cfg.StrOpt('zookeeper_path', cfg.StrOpt('zookeeper_path',
help='Path in zookeeper for kafka consumer group partitioning algorithm', help='Path in zookeeper for kafka consumer group partitioning algorithm',
default='/persister_partitions/$kafka_metrics.topic'), default='/persister_partitions/$kafka_metrics.topic'),
cfg.IntOpt('batch_size',
help='Maximum number of metrics to buffer before writing to database',
default=20000),
] ]
# Replace Default OPt with reference to kafka group option # Replace Default OPt with reference to kafka group option

View File

@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017 SUSE LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,26 +13,23 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import abc import abc
from cassandra import cluster
from cassandra import query
from oslo_config import cfg from oslo_config import cfg
import six import six
from monasca_persister.repositories import abstract_repository from monasca_persister.repositories import abstract_repository
from monasca_persister.repositories.cassandra import connection_util
conf = cfg.CONF
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class AbstractCassandraRepository(abstract_repository.AbstractRepository): class AbstractCassandraRepository(abstract_repository.AbstractRepository):
def __init__(self): def __init__(self):
super(AbstractCassandraRepository, self).__init__() super(AbstractCassandraRepository, self).__init__()
self.conf = cfg.CONF
self._cassandra_cluster = cluster.Cluster( self._cluster = connection_util.create_cluster()
self.conf.cassandra.cluster_ip_addresses.split(',')) self._session = connection_util.create_session(self._cluster)
self._retention = conf.cassandra.retention_policy * 24 * 3600
self.cassandra_session = self._cassandra_cluster.connect( self._cache_size = conf.cassandra.max_definition_cache_size
self.conf.cassandra.keyspace) self._max_batches = conf.cassandra.max_batches
self._batch_stmt = query.BatchStatement()

View File

@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017 SUSE LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,9 +13,11 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ujson as json import ujson as json
from cassandra import query from cassandra.concurrent import execute_concurrent_with_args
from oslo_config import cfg
from oslo_log import log from oslo_log import log
from monasca_persister.repositories.cassandra import abstract_repository from monasca_persister.repositories.cassandra import abstract_repository
@ -22,51 +25,38 @@ from monasca_persister.repositories.utils import parse_alarm_state_hist_message
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
UPSERT_CQL = ('update monasca.alarm_state_history USING TTL ? '
'set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ? '
'where tenant_id = ? and alarm_id = ? and time_stamp = ?')
class AlarmStateHistCassandraRepository(
abstract_repository.AbstractCassandraRepository):
class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRepository):
def __init__(self): def __init__(self):
super(AlarmStateHistCassandraRepository, self).__init__() super(AlarmStateHistCassandraRepository, self).__init__()
self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare( self._upsert_stmt = self._session.prepare(UPSERT_CQL)
'insert into alarm_state_history (tenant_id, alarm_id, '
'metrics, new_state, '
'old_state, reason, reason_data, '
'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)')
def process_message(self, message): def process_message(self, message):
(alarm_id, metrics, new_state, old_state, link, lifecycle_state, state_change_reason,
sub_alarms_json_snake_case,
tenant_id, time_stamp) = parse_alarm_state_hist_message(message)
(alarm_id, metrics, new_state, old_state, link, alarm_state_hist = (self._retention,
lifecycle_state, state_change_reason, json.dumps(metrics, ensure_ascii=False).encode('utf8'),
sub_alarms_json_snake_case, tenant_id, old_state.encode('utf8'),
time_stamp) = parse_alarm_state_hist_message( new_state.encode('utf8'),
message) sub_alarms_json_snake_case.encode('utf8'),
state_change_reason.encode('utf8'),
alarm_state_hist = ( "{}".encode('utf8'),
tenant_id.encode('utf8'), tenant_id.encode('utf8'),
alarm_id.encode('utf8'), alarm_id.encode('utf8'),
json.dumps(metrics, ensure_ascii=False).encode( time_stamp)
'utf8'),
new_state.encode('utf8'),
old_state.encode('utf8'),
state_change_reason.encode('utf8'),
"{}".encode('utf8'),
sub_alarms_json_snake_case.encode('utf8'),
time_stamp
)
LOG.debug(alarm_state_hist)
return alarm_state_hist return alarm_state_hist
def write_batch(self, alarm_state_hists): def write_batch(self, alarm_state_hists):
while alarm_state_hists:
for alarm_state_hist in alarm_state_hists: num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
self._batch_stmt.add(self._insert_alarm_state_hist_stmt, batch = alarm_state_hists[:num_rows]
alarm_state_hist) execute_concurrent_with_args(self._session, self._upsert_stmt, batch)
alarm_state_hists = alarm_state_hists[num_rows:]
self.cassandra_session.execute(self._batch_stmt)
self._batch_stmt = query.BatchStatement()
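The new write path drains the buffered alarm state history rows in slices of at most kafka_alarm_history.batch_size and hands each slice to execute_concurrent_with_args. Below is a small standalone sketch of just that chunking loop, using plain lists so it runs without a Cassandra connection.

def chunked(rows, batch_size):
    # Same slicing pattern as write_batch above: consume the buffered rows in
    # slices of at most batch_size until nothing is left.
    while rows:
        num_rows = min(len(rows), batch_size)
        yield rows[:num_rows]
        rows = rows[num_rows:]

assert list(chunked(list(range(5)), 2)) == [[0, 1], [2, 3], [4]]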

View File

@ -0,0 +1,51 @@
# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.cluster import ConsistencyLevel
from cassandra.cluster import DCAwareRoundRobinPolicy
from cassandra.cluster import TokenAwarePolicy
from oslo_config import cfg
from monasca_persister.repositories.cassandra.retry_policy import MonascaRetryPolicy
conf = cfg.CONF
def create_cluster():
user = conf.cassandra.user
if user:
auth_provider = PlainTextAuthProvider(username=user, password=conf.cassandra.password)
else:
auth_provider = None
contact_points = [str(ip) for ip in conf.cassandra.contact_points]
cluster = Cluster(contact_points,
port=conf.cassandra.port,
auth_provider=auth_provider,
connect_timeout=conf.cassandra.connection_timeout,
load_balancing_policy=TokenAwarePolicy(
DCAwareRoundRobinPolicy(local_dc=conf.cassandra.local_data_center)),
)
cluster.default_retry_policy = MonascaRetryPolicy(1, conf.cassandra.max_write_retries,
conf.cassandra.max_write_retries)
return cluster
def create_session(cluster):
session = cluster.connect(conf.cassandra.keyspace)
session.default_timeout = conf.cassandra.read_timeout
session.default_consistency_level = ConsistencyLevel.name_to_value[conf.cassandra.consistency_level]
return session
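A minimal usage sketch for these helpers, assuming the [cassandra] options have already been parsed into cfg.CONF and a cluster with the configured keyspace is reachable; the query is illustrative only.

from monasca_persister.repositories.cassandra import connection_util

cluster = connection_util.create_cluster()
session = connection_util.create_session(cluster)
try:
    # Simple liveness check against the connected cluster (illustrative query).
    row = session.execute('select release_version from system.local').one()
    print(row.release_version)
finally:
    cluster.shutdown()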

View File

@ -0,0 +1,150 @@
# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cassandra.query import BatchStatement
from cassandra.query import BatchType
from oslo_log import log
LOG = log.getLogger(__name__)
class MetricBatch(object):
def __init__(self, metadata, load_balance_policy, batch_limit):
self.metadata = metadata
self.batch_limit = batch_limit
self.lb_policy = load_balance_policy
self.metric_queries = dict()
self.dimension_queries = dict()
self.dimension_metric_queries = dict()
self.metric_dimension_queries = dict()
self.measurement_queries = dict()
def batch_query_by_token(self, bound_stmt, query_map):
token = self.metadata.token_map.token_class.from_key(bound_stmt.routing_key)
queue = query_map.get(token, None)
if not queue:
queue = []
batch = BatchStatement(BatchType.UNLOGGED)
batch.add(bound_stmt)
queue.append((batch, Counter(1)))
query_map[token] = queue
else:
(batch, counter) = queue[-1]
if counter.value() < self.batch_limit:
batch.add(bound_stmt)
counter.increment()
else:
batch = BatchStatement(BatchType.UNLOGGED)
batch.add(bound_stmt)
queue.append((batch, Counter(1)))
def add_metric_query(self, bound_stmt):
self.batch_query_by_token(bound_stmt, self.metric_queries)
def add_dimension_query(self, bound_stmt):
self.batch_query_by_token(bound_stmt, self.dimension_queries)
def add_dimension_metric_query(self, bound_stmt):
self.batch_query_by_token(bound_stmt, self.dimension_metric_queries)
def add_metric_dimension_query(self, bound_stmt):
self.batch_query_by_token(bound_stmt, self.metric_dimension_queries)
def add_measurement_query(self, bound_stmt):
self.batch_query_by_replicas(bound_stmt, self.measurement_queries)
def batch_query_by_replicas(self, bound_stmt, query_map):
hosts = tuple(self.lb_policy.make_query_plan(working_keyspace=bound_stmt.keyspace, query=bound_stmt))
queue = query_map.get(hosts, None)
if not queue:
queue = []
batch = BatchStatement(BatchType.UNLOGGED)
batch.add(bound_stmt)
queue.append((batch, Counter(1)))
query_map[hosts] = queue
else:
(batch, counter) = queue[-1]
if counter.value() < 30:
batch.add(bound_stmt)
counter.increment()
else:
batch = BatchStatement(BatchType.UNLOGGED)
batch.add(bound_stmt)
queue.append((batch, Counter(1)))
def clear(self):
self.metric_queries.clear()
self.dimension_queries.clear()
self.dimension_metric_queries.clear()
self.metric_dimension_queries.clear()
self.measurement_queries.clear()
@staticmethod
def log_token_batch_map(name, query_map):
LOG.info('%s : Size: %s; Tokens: |%s|' % (name, len(query_map),
'|'.join(['%s: %s' % (
token,
','.join([str(counter.value()) for (batch, counter) in queue]))
for token, queue in query_map.items()])))
@staticmethod
def log_replica_batch_map(name, query_map):
LOG.info('%s : Size: %s; Replicas: |%s|' % (name, len(query_map), '|'.join([
'%s: %s' % (
','.join([h.address for h in hosts]), ','.join([str(counter.value()) for (batch, counter) in queue]))
for hosts, queue in query_map.items()])))
def get_all_batches(self):
self.log_token_batch_map("metric batches", self.metric_queries)
self.log_token_batch_map("dimension batches", self.dimension_queries)
self.log_token_batch_map("dimension metric batches", self.dimension_metric_queries)
self.log_token_batch_map("metric dimension batches", self.metric_dimension_queries)
self.log_replica_batch_map("measurement batches", self.measurement_queries)
result_list = []
for q in self.measurement_queries.values():
result_list.extend(q)
for q in self.metric_queries.values():
result_list.extend(q)
for q in self.dimension_queries.values():
result_list.extend(q)
for q in self.dimension_metric_queries.values():
result_list.extend(q)
for q in self.metric_dimension_queries.values():
result_list.extend(q)
return result_list
class Counter(object):
def __init__(self, init_value=0):
self._count = init_value
def increment(self):
self._count += 1
def increment_by(self, increment):
self._count += increment
def value(self):
return self._count
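MetricBatch groups bound statements by routing token (or by replica set for measurements) and keeps appending to the most recent BatchStatement for that key until the size limit is reached, then opens a new one. A standalone sketch of that fill-until-limit pattern, with plain lists standing in for BatchStatement so it runs without a cluster:

def add_to_queue(queue, item, batch_limit):
    # Mirrors batch_query_by_token/batch_query_by_replicas above: append to the
    # last batch for this key until it reaches batch_limit, then start a new one.
    if not queue or len(queue[-1]) >= batch_limit:
        queue.append([item])
    else:
        queue[-1].append(item)

queue = []
for stmt in range(7):
    add_to_queue(queue, stmt, batch_limit=3)
assert [len(batch) for batch in queue] == [3, 3, 1]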

View File

@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017 SUSE LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,105 +13,261 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from __future__ import with_statement
from cachetools import LRUCache
from collections import namedtuple
import hashlib import hashlib
import threading
import ujson as json import ujson as json
from cassandra import query from cassandra.concurrent import execute_concurrent
from oslo_log import log from oslo_log import log
import urllib
from monasca_persister.repositories.cassandra import abstract_repository from monasca_persister.repositories.cassandra import abstract_repository
from monasca_persister.repositories.cassandra import token_range_query_manager
from monasca_persister.repositories.cassandra.metric_batch import MetricBatch
from monasca_persister.repositories.utils import parse_measurement_message from monasca_persister.repositories.utils import parse_measurement_message
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
MEASUREMENT_INSERT_CQL = ('update monasca.measurements USING TTL ? '
'set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? '
'where metric_id = ? and time_stamp = ?')
class MetricCassandraRepository( MEASUREMENT_UPDATE_CQL = ('update monasca.measurements USING TTL ? '
abstract_repository.AbstractCassandraRepository): 'set value = ?, value_meta = ? where metric_id = ? and time_stamp = ?')
METRICS_INSERT_CQL = ('update monasca.metrics USING TTL ? '
'set metric_id = ?, created_at = ?, updated_at = ? '
'where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? '
'and dimension_names = ?')
METRICS_UPDATE_CQL = ('update monasca.metrics USING TTL ? '
'set updated_at = ? '
'where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? '
'and dimension_names = ?')
DIMENSION_INSERT_CQL = ('insert into monasca.dimensions '
'(region, tenant_id, name, value) values (?, ?, ?, ?)')
DIMENSION_METRIC_INSERT_CQL = ('insert into monasca.dimensions_metrics '
'(region, tenant_id, dimension_name, dimension_value, metric_name) '
'values (?, ?, ?, ?, ?)')
METRIC_DIMENSION_INSERT_CQL = ('insert into monasca.metrics_dimensions '
'(region, tenant_id, metric_name, dimension_name, dimension_value) '
'values (?, ?, ?, ?, ?)')
RETRIEVE_DIMENSION_CQL = 'select region, tenant_id, name, value from dimensions'
RETRIEVE_METRIC_DIMENSION_CQL = ('select region, tenant_id, metric_name, '
'dimension_name, dimension_value from metrics_dimensions '
'WHERE token(region, tenant_id, metric_name) > ? '
'and token(region, tenant_id, metric_name) <= ? ')
Metric = namedtuple('Metric', ['id', 'region', 'tenant_id', 'name', 'dimension_list', 'dimension_names',
'time_stamp', 'value', 'value_meta'])
class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository):
def __init__(self): def __init__(self):
super(MetricCassandraRepository, self).__init__() super(MetricCassandraRepository, self).__init__()
self._insert_measurement_stmt = self.cassandra_session.prepare( self._lock = threading.RLock()
'insert into measurements (tenant_id,'
'region, metric_hash, time_stamp, value,'
'value_meta) values (?, ?, ?, ?, ?, ?)')
self._insert_metric_map_stmt = self.cassandra_session.prepare( LOG.debug("prepare cql statements...")
'insert into metric_map (tenant_id,'
'region, metric_hash, ' self._measurement_insert_stmt = self._session.prepare(MEASUREMENT_INSERT_CQL)
'metric_map) values' self._measurement_insert_stmt.is_idempotent = True
'(?,?,?,?)')
self._measurement_update_stmt = self._session.prepare(MEASUREMENT_UPDATE_CQL)
self._measurement_update_stmt.is_idempotent = True
self._metric_insert_stmt = self._session.prepare(METRICS_INSERT_CQL)
self._metric_insert_stmt.is_idempotent = True
self._metric_update_stmt = self._session.prepare(METRICS_UPDATE_CQL)
self._metric_update_stmt.is_idempotent = True
self._dimension_stmt = self._session.prepare(DIMENSION_INSERT_CQL)
self._dimension_stmt.is_idempotent = True
self._dimension_metric_stmt = self._session.prepare(DIMENSION_METRIC_INSERT_CQL)
self._dimension_metric_stmt.is_idempotent = True
self._metric_dimension_stmt = self._session.prepare(METRIC_DIMENSION_INSERT_CQL)
self._metric_dimension_stmt.is_idempotent = True
self._retrieve_metric_dimension_stmt = self._session.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
self._metric_batch = MetricBatch(self._cluster.metadata, self._cluster.load_balancing_policy, self._max_batches)
self._metric_id_cache = LRUCache(self._cache_size)
self._dimension_cache = LRUCache(self._cache_size)
self._metric_dimension_cache = LRUCache(self._cache_size)
self._load_dimension_cache()
self._load_metric_dimension_cache()
def process_message(self, message): def process_message(self, message):
(dimensions, metric_name, region, tenant_id, time_stamp, value, (dimensions, metric_name, region, tenant_id, time_stamp, value,
value_meta) = parse_measurement_message(message) value_meta) = parse_measurement_message(message)
metric_hash, metric_map = create_metric_hash(metric_name, with self._lock:
dimensions) dim_names = []
dim_list = []
for name in sorted(dimensions.iterkeys()):
dim_list.append('%s\t%s' % (name, dimensions[name]))
dim_names.append(name)
measurement = (tenant_id.encode('utf8'), hash_string = '%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, '\0'.join(dim_list))
region.encode('utf8'), metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest()
metric_hash,
time_stamp,
value,
json.dumps(value_meta, ensure_ascii=False).encode(
'utf8'))
LOG.debug(measurement) metric = Metric(id=metric_id,
region=region,
tenant_id=tenant_id,
name=metric_name,
dimension_list=dim_list,
dimension_names=dim_names,
time_stamp=time_stamp,
value=value,
value_meta=json.dumps(value_meta, ensure_ascii=False))
return MetricMeasurementInfo( id_bytes = bytearray.fromhex(metric.id)
tenant_id.encode('utf8'), if self._metric_id_cache.get(metric.id, None):
region.encode('utf8'), measurement_bound_stmt = self._measurement_update_stmt.bind((self._retention,
metric_hash, metric.value,
metric_map, metric.value_meta,
measurement) id_bytes,
metric.time_stamp))
self._metric_batch.add_measurement_query(measurement_bound_stmt)
def write_batch(self, metric_measurement_infos): metric_update_bound_stmt = self._metric_update_stmt.bind((self._retention,
metric.time_stamp,
metric.region,
metric.tenant_id,
metric.name,
metric.dimension_list,
metric.dimension_names))
self._metric_batch.add_metric_query(metric_update_bound_stmt)
for metric_measurement_info in metric_measurement_infos: return metric
self._batch_stmt.add(self._insert_measurement_stmt, self._metric_id_cache[metric.id] = metric.id
metric_measurement_info.measurement)
metric_map = (metric_measurement_info.tenant_id, metric_insert_bound_stmt = self._metric_insert_stmt.bind((self._retention,
metric_measurement_info.region, id_bytes,
metric_measurement_info.metric_hash, metric.time_stamp,
metric_measurement_info.metric_map) metric.time_stamp,
metric.region,
metric.tenant_id,
metric.name,
metric.dimension_list,
metric.dimension_names))
self._metric_batch.add_metric_query(metric_insert_bound_stmt)
self._batch_stmt.add(self._insert_metric_map_stmt, for dim in metric.dimension_list:
metric_map) (name, value) = dim.split('\t')
dim_key = self._get_dimnesion_key(metric.region, metric.tenant_id, name, value)
if not self._dimension_cache.get(dim_key, None):
dimension_bound_stmt = self._dimension_stmt.bind((metric.region,
metric.tenant_id,
name,
value))
self._metric_batch.add_dimension_query(dimension_bound_stmt)
self._dimension_cache[dim_key] = dim_key
self.cassandra_session.execute(self._batch_stmt) metric_dim_key = self._get_metric_dimnesion_key(metric.region, metric.tenant_id, metric.name, name,
value)
if not self._metric_dimension_cache.get(metric_dim_key, None):
dimension_metric_bound_stmt = self._dimension_metric_stmt.bind((metric.region,
metric.tenant_id,
name,
value,
metric.name))
self._metric_batch.add_dimension_metric_query(dimension_metric_bound_stmt)
self._batch_stmt = query.BatchStatement() metric_dimension_bound_stmt = self._metric_dimension_stmt.bind((metric.region,
metric.tenant_id,
metric.name,
name,
value))
self._metric_batch.add_metric_dimension_query(metric_dimension_bound_stmt)
self._metric_dimension_cache[metric_dim_key] = metric_dim_key
class MetricMeasurementInfo(object): measurement_insert_bound_stmt = self._measurement_insert_stmt.bind((self._retention,
metric.value,
metric.value_meta,
metric.region,
metric.tenant_id,
metric.name,
metric.dimension_list,
id_bytes,
metric.time_stamp))
self._metric_batch.add_measurement_query(measurement_insert_bound_stmt)
def __init__(self, tenant_id, region, metric_hash, metric_map, return metric
measurement):
self.tenant_id = tenant_id def write_batch(self, metrics):
self.region = region
self.metric_hash = metric_hash
self.metric_map = metric_map
self.measurement = measurement
with self._lock:
batch_list = self._metric_batch.get_all_batches()
def create_metric_hash(metric_name, dimensions): results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
dimensions['__name__'] = urllib.quote_plus(metric_name) self._handle_results(results)
hash_string = '' self._metric_batch.clear()
for dim_name in sorted(dimensions.iterkeys()): LOG.info("flushed %s metrics", len(metrics))
dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
dimensions[dim_name]))
hash_string += dimension
sha1_hash = hashlib.sha1(hash_string).hexdigest() @staticmethod
def _handle_results(results):
for (success, result) in results:
if not success:
raise result
return bytearray.fromhex(sha1_hash), dimensions def _load_dimension_cache(self):
rows = self._session.execute(RETRIEVE_DIMENSION_CQL)
if not rows:
return
for row in rows:
key = self._get_dimnesion_key(row.region, row.tenant_id, row.name, row.value)
self._dimension_cache[key] = key
LOG.info("loaded %s dimension entries cache from database into cache." % self._dimension_cache.currsize)
@staticmethod
def _get_dimnesion_key(region, tenant_id, name, value):
return '%s\0%s\0%s\0%s' % (region, tenant_id, name, value)
def _load_metric_dimension_cache(self):
qm = token_range_query_manager.TokenRangeQueryManager(RETRIEVE_METRIC_DIMENSION_CQL,
self._process_metric_dimension_query)
token_ring = self._cluster.metadata.token_map.ring
qm.query(token_ring)
def _process_metric_dimension_query(self, rows):
cnt = 0
for row in rows:
key = self._get_metric_dimnesion_key(row.region, row.tenant_id, row.metric_name, row.dimension_name,
row.dimension_value)
self._metric_dimension_cache[key] = key
cnt += 1
LOG.info("loaded %s metric dimension entries from database into cache." % cnt)
LOG.info(
"total loaded %s metric dimension entries in cache." % self._metric_dimension_cache.currsize)
@staticmethod
def _get_metric_dimnesion_key(region, tenant_id, metric_name, dimension_name, dimension_value):
return '%s\0%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, dimension_name, dimension_value)
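The metric id used throughout process_message is a SHA-1 over region, tenant, metric name and the sorted "name\tvalue" dimension list, joined with NUL separators. A standalone sketch of that computation with illustrative values:

import hashlib

dimensions = {'hostname': 'host-01', 'service': 'monitoring'}  # illustrative values
dim_list = ['%s\t%s' % (name, dimensions[name]) for name in sorted(dimensions)]
hash_string = '%s\0%s\0%s\0%s' % ('region-a', 'tenant-1', 'cpu.idle_perc', '\0'.join(dim_list))
metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest()
id_bytes = bytearray.fromhex(metric_id)  # the form bound into the prepared statements above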

View File

@ -0,0 +1,49 @@
# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cassandra.policies import RetryPolicy
class MonascaRetryPolicy(RetryPolicy):
def __init__(self, read_attempts, write_attempts, unavailable_attempts):
super(MonascaRetryPolicy, self).__init__()
self.read_attempts = read_attempts
self.write_attempts = write_attempts
self.unavailable_attempts = unavailable_attempts
def on_read_timeout(self, query, consistency, required_responses,
received_responses, data_retrieved, retry_num):
if retry_num >= self.read_attempts:
return self.RETHROW, None
elif received_responses >= required_responses and not data_retrieved:
return self.RETRY, consistency
else:
return self.RETHROW, None
def on_write_timeout(self, query, consistency, write_type,
required_responses, received_responses, retry_num):
if retry_num >= self.write_attempts:
return self.RETHROW, None
else:
return self.RETRY, consistency
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
return (self.RETRY_NEXT_HOST, consistency) if retry_num < self.unavailable_attempts else (self.RETHROW, None)
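A small, connection-free sketch of the decisions this policy makes, using illustrative attempt counts; RetryPolicy.RETRY and RETHROW are the decision constants defined by the driver.

from cassandra import ConsistencyLevel
from cassandra.policies import RetryPolicy, WriteType

from monasca_persister.repositories.cassandra.retry_policy import MonascaRetryPolicy

policy = MonascaRetryPolicy(1, 2, 2)  # read, write, unavailable attempts (illustrative)

# The first write timeouts are retried at the same consistency level...
decision, consistency = policy.on_write_timeout(None, ConsistencyLevel.QUORUM,
                                                WriteType.SIMPLE, 2, 1, retry_num=0)
assert decision == RetryPolicy.RETRY and consistency == ConsistencyLevel.QUORUM

# ...but once retry_num reaches write_attempts the error is rethrown.
decision, _ = policy.on_write_timeout(None, ConsistencyLevel.QUORUM,
                                      WriteType.SIMPLE, 2, 1, retry_num=2)
assert decision == RetryPolicy.RETHROW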

View File

@ -0,0 +1,67 @@
# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
from oslo_config import cfg
from oslo_log import log
from monasca_persister.repositories.cassandra import connection_util
LOG = log.getLogger(__name__)
conf = cfg.CONF
class TokenRangeQueryManager(object):
def __init__(self, cql, result_handler, process_count=None):
if process_count:
self._process_count = process_count
else:
self._process_count = multiprocessing.cpu_count()
self._pool = multiprocessing.Pool(processes=self._process_count, initializer=self._setup,
initargs=(cql, result_handler,))
@classmethod
def _setup(cls, cql, result_handler):
cls.cluster = connection_util.create_cluster()
cls.session = connection_util.create_session(cls.cluster)
cls.prepared = cls.session.prepare(cql)
cls.result_handler = result_handler
def close_pool(self):
self._pool.close()
self._pool.join()
def query(self, token_ring):
range_size = len(token_ring) // self._process_count + 1
start_index = 0
params = []
while start_index < len(token_ring):
end_index = start_index + range_size - 1
if end_index >= len(token_ring):
end_index = len(token_ring) - 1
params.append((token_ring[start_index].value, token_ring[end_index].value))
start_index = end_index + 1
self._pool.map(execute_query_token_range, params, 1)
def execute_query_token_range(token_range):
results = TokenRangeQueryManager.session.execute(TokenRangeQueryManager.prepared.bind(token_range))
TokenRangeQueryManager.result_handler(results)
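query() above splits the cluster's token ring into contiguous index ranges, one chunk of work per pool process. A standalone sketch of the same splitting arithmetic on a plain list, runnable without a cluster:

def split_ranges(ring, process_count):
    # Same arithmetic as query() above, applied to a plain list of values.
    range_size = len(ring) // process_count + 1
    params, start = [], 0
    while start < len(ring):
        end = min(start + range_size - 1, len(ring) - 1)
        params.append((ring[start], ring[end]))
        start = end + 1
    return params

assert split_ranges(list(range(10)), 4) == [(0, 2), (3, 5), (6, 8), (9, 9)]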

View File

@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017 SUSE LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -29,7 +30,7 @@ class Persister(object):
self._kafka_topic = kafka_conf.topic self._kafka_topic = kafka_conf.topic
self._database_batch_size = kafka_conf.database_batch_size self._batch_size = kafka_conf.batch_size
self._consumer = consumer.KafkaConsumer( self._consumer = consumer.KafkaConsumer(
kafka_conf.uri, kafka_conf.uri,
@ -71,7 +72,7 @@ class Persister(object):
LOG.exception('Error processing message. Message is ' LOG.exception('Error processing message. Message is '
'being dropped. {}'.format(message)) 'being dropped. {}'.format(message))
if len(self._data_points) >= self._database_batch_size: if len(self._data_points) >= self._batch_size:
self._flush() self._flush()
except Exception: except Exception:
LOG.exception( LOG.exception(