diff --git a/java/pom.xml b/java/pom.xml
index e7812aa3..b9ea7277 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -49,6 +49,26 @@
       <artifactId>monasca-common-influxdb</artifactId>
       <version>${mon.common.version}</version>
     </dependency>
+    <dependency>
+      <groupId>monasca-common</groupId>
+      <artifactId>monasca-common-cassandra</artifactId>
+      <version>${mon.common.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <version>3.1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-mapping</artifactId>
+      <version>3.1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-extras</artifactId>
+      <version>3.1.0</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
@@ -72,6 +92,12 @@
       <groupId>io.dropwizard</groupId>
       <artifactId>dropwizard-core</artifactId>
       <version>0.7.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>io.dropwizard</groupId>
@@ -88,6 +114,11 @@
       <artifactId>guice-assistedinject</artifactId>
       <version>3.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>17.0</version>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
diff --git a/java/src/main/java/monasca/persister/PersisterModule.java b/java/src/main/java/monasca/persister/PersisterModule.java
index 67e49539..54e362cf 100644
--- a/java/src/main/java/monasca/persister/PersisterModule.java
+++ b/java/src/main/java/monasca/persister/PersisterModule.java
@@ -1,6 +1,8 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
+ * Copyright (c) 2017 SUSE LLC.
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -46,6 +48,9 @@ import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory;
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.repository.Repo;
+import monasca.persister.repository.cassandra.CassandraAlarmRepo;
+import monasca.persister.repository.cassandra.CassandraCluster;
+import monasca.persister.repository.cassandra.CassandraMetricRepo;
import monasca.persister.repository.influxdb.InfluxV9AlarmRepo;
import monasca.persister.repository.influxdb.InfluxV9MetricRepo;
import monasca.persister.repository.influxdb.InfluxV9RepoWriter;
@@ -56,7 +61,7 @@ public class PersisterModule extends AbstractModule {
private static final String VERTICA = "vertica";
private static final String INFLUXDB = "influxdb";
-
+ private static final String CASSANDRA = "cassandra";
private static final String INFLUXDB_V9 = "v9";
private final PersisterConfig config;
@@ -168,6 +173,13 @@ public class PersisterModule extends AbstractModule {
      bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>> () {})
          .to(InfluxV9AlarmRepo.class);
+ } else if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(CASSANDRA)) {
+ bind(CassandraCluster.class).in(Singleton.class);
+
+      bind(new TypeLiteral<Repo<MetricEnvelope>>() {}).to(CassandraMetricRepo.class);
+
+      bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>>() {}).to(CassandraAlarmRepo.class);
+
} else {
System.err.println(
diff --git a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java
index 3abf8ab8..b8d09a68 100644
--- a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java
+++ b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java
@@ -1,6 +1,8 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
+ * Copyright (c) 2017 SUSE LLC.
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -19,6 +21,7 @@ package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
+import monasca.common.configuration.CassandraDbConfiguration;
import monasca.common.configuration.DatabaseConfiguration;
import monasca.common.configuration.InfluxDbConfiguration;
import io.dropwizard.Configuration;
@@ -97,4 +100,12 @@ public class PersisterConfig extends Configuration {
public InfluxDbConfiguration getInfluxDBConfiguration() {
return influxDbConfiguration;
}
+
+ @Valid
+ @JsonProperty
+ private final CassandraDbConfiguration cassandraDbConfiguration = new CassandraDbConfiguration();
+
+ public CassandraDbConfiguration getCassandraDbConfiguration() {
+ return cassandraDbConfiguration;
+ }
}
diff --git a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java
index c6adef15..b5f95016 100644
--- a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java
+++ b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java
@@ -42,6 +42,17 @@ public class PipelineConfig {
@JsonProperty
Integer maxBatchTime;
+ @JsonProperty
+ Integer commitBatchTime;
+
+ public Integer getCommitBatchTime() {
+ return commitBatchTime;
+ }
+
+ public void setCommitBatchTime(Integer commitBatchTime) {
+ this.commitBatchTime = commitBatchTime;
+ }
+
public String getTopic() {
return topic;
}
diff --git a/java/src/main/java/monasca/persister/consumer/KafkaChannel.java b/java/src/main/java/monasca/persister/consumer/KafkaChannel.java
index 1c208110..9c863f3d 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaChannel.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaChannel.java
@@ -1,6 +1,8 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
+ * Copyright (c) 2017 SUSE LLC
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -44,33 +46,50 @@ public class KafkaChannel {
private final String topic;
private final ConsumerConnector consumerConnector;
private final String threadId;
+ private final int commitBatchtimeInMills;
+ private long nextCommitTime;
+ private boolean commitDirty = false;
@Inject
- public KafkaChannel(
- PersisterConfig configuration,
- @Assisted PipelineConfig pipelineConfig,
+ public KafkaChannel(PersisterConfig configuration, @Assisted PipelineConfig pipelineConfig,
@Assisted String threadId) {
this.topic = pipelineConfig.getTopic();
this.threadId = threadId;
- Properties kafkaProperties =
- createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig);
+ this.commitBatchtimeInMills = pipelineConfig.getCommitBatchTime();
+ nextCommitTime = System.currentTimeMillis() + commitBatchtimeInMills;
+ Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig);
consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties));
}
public final void markRead() {
- this.consumerConnector.commitOffsets();
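+    // Commit immediately when commit batching is disabled; otherwise commit at most once per
+    // commit window and mark offsets as dirty in between.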
+ if (commitBatchtimeInMills <= 0) {
+ consumerConnector.commitOffsets();
+ } else if (nextCommitTime <= System.currentTimeMillis()) {
+ consumerConnector.commitOffsets();
+ nextCommitTime = System.currentTimeMillis() + commitBatchtimeInMills;
+ commitDirty = false;
+ } else {
+ commitDirty = true;
+ }
+ }
+
+ public final void markReadIfDirty() {
+ if (commitDirty) {
+ this.consumerConnector.commitOffsets();
+ commitDirty = false;
+ }
}
  public KafkaStream<byte[], byte[]> getKafkaStream() {
    final Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(this.topic, 1);
-    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
-        this.consumerConnector.createMessageStreams(topicCountMap);
+    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = this.consumerConnector
+        .createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = streamMap.values().iterator().next();
if (streams.size() != 1) {
- throw new IllegalStateException(String.format(
- "Expected only one stream but instead there are %d", streams.size()));
+ throw new IllegalStateException(
+ String.format("Expected only one stream but instead there are %d", streams.size()));
}
return streams.get(0);
}
@@ -92,32 +111,28 @@ public class KafkaChannel {
properties.put("consumer.id",
String.format("%s_%s", pipelineConfig.getConsumerId(), this.threadId));
properties.put("socket.timeout.ms", kafkaConfig.getSocketTimeoutMs().toString());
- properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes()
- .toString());
- properties.put("fetch.message.max.bytes", kafkaConfig.getFetchMessageMaxBytes()
- .toString());
+ properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes().toString());
+ properties.put("fetch.message.max.bytes", kafkaConfig.getFetchMessageMaxBytes().toString());
// Set auto commit to false because the persister is going to explicitly commit
properties.put("auto.commit.enable", "false");
- properties.put("queued.max.message.chunks", kafkaConfig.getQueuedMaxMessageChunks()
- .toString());
+ properties.put("queued.max.message.chunks", kafkaConfig.getQueuedMaxMessageChunks().toString());
properties.put("rebalance.max.retries", kafkaConfig.getRebalanceMaxRetries().toString());
properties.put("fetch.min.bytes", kafkaConfig.getFetchMinBytes().toString());
properties.put("fetch.wait.max.ms", kafkaConfig.getFetchWaitMaxMs().toString());
properties.put("rebalance.backoff.ms", kafkaConfig.getRebalanceBackoffMs().toString());
- properties.put("refresh.leader.backoff.ms", kafkaConfig.getRefreshLeaderBackoffMs()
- .toString());
+ properties.put("refresh.leader.backoff.ms", kafkaConfig.getRefreshLeaderBackoffMs().toString());
properties.put("auto.offset.reset", kafkaConfig.getAutoOffsetReset());
properties.put("consumer.timeout.ms", kafkaConfig.getConsumerTimeoutMs().toString());
properties.put("client.id", String.format("%s_%s", pipelineConfig.getClientId(), threadId));
- properties.put("zookeeper.session.timeout.ms", kafkaConfig
- .getZookeeperSessionTimeoutMs().toString());
- properties.put("zookeeper.connection.timeout.ms", kafkaConfig
- .getZookeeperConnectionTimeoutMs().toString());
- properties
- .put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
+ properties.put("zookeeper.session.timeout.ms",
+ kafkaConfig.getZookeeperSessionTimeoutMs().toString());
+ properties.put("zookeeper.connection.timeout.ms",
+ kafkaConfig.getZookeeperConnectionTimeoutMs().toString());
+ properties.put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
for (String key : properties.stringPropertyNames()) {
- logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key), threadId);
+ logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key),
+ threadId);
}
return properties;
diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
index 834a87eb..c5bb5c2a 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
@@ -1,6 +1,8 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
+ * Copyright (c) 2017 SUSE LLC.
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -38,14 +40,13 @@ public class KafkaConsumerRunnableBasic implements Runnable {
private final String threadId;
private final ManagedPipeline pipeline;
private volatile boolean stop = false;
+ private boolean active = false;
private ExecutorService executorService;
@Inject
- public KafkaConsumerRunnableBasic(
- @Assisted KafkaChannel kafkaChannel,
- @Assisted ManagedPipeline pipeline,
- @Assisted String threadId) {
+ public KafkaConsumerRunnableBasic(@Assisted KafkaChannel kafkaChannel,
+ @Assisted ManagedPipeline pipeline, @Assisted String threadId) {
this.kafkaChannel = kafkaChannel;
this.pipeline = pipeline;
@@ -67,8 +68,9 @@ public class KafkaConsumerRunnableBasic implements Runnable {
}
private void markRead() {
-
- logger.debug("[{}]: marking read", this.threadId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: marking read", this.threadId);
+ }
this.kafkaChannel.markRead();
@@ -80,12 +82,30 @@ public class KafkaConsumerRunnableBasic implements Runnable {
this.stop = true;
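+    // Wait up to two seconds (20 x 100 ms) for the run loop to finish; only commit any
+    // still-pending offsets once it has actually stopped.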
+ int count = 0;
+ while (active) {
+ if (count++ >= 20) {
+ break;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ logger.error("interrupted while waiting for the run loop to stop", e);
+ break;
+ }
+ }
+
+ if (!active) {
+ this.kafkaChannel.markReadIfDirty();
+ }
}
public void run() {
logger.info("[{}]: run", this.threadId);
+ active = true;
+
    final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator();
logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId);
@@ -121,7 +141,9 @@ public class KafkaConsumerRunnableBasic implements Runnable {
final String msg = new String(it.next().message());
- logger.debug("[{}]: {}", this.threadId, msg);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: {}", this.threadId, msg);
+ }
publishEvent(msg);
@@ -149,22 +171,24 @@ public class KafkaConsumerRunnableBasic implements Runnable {
} catch (Throwable e) {
- logger.error(
- "[{}]: caught fatal exception while publishing msg. Shutting entire persister down "
- + "now!", this.threadId, e);
+ logger
+ .error("[{}]: caught fatal exception while publishing msg. Shutting entire persister down "
+ + "now!", this.threadId, e);
- logger.error("[{}]: calling shutdown on executor service", this.threadId);
- this.executorService.shutdownNow();
+ logger.error("[{}]: calling shutdown on executor service", this.threadId);
+ this.executorService.shutdownNow();
- logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId);
- System.exit(1);
+ logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId);
+ System.exit(1);
- }
+ }
}
logger.info("[{}]: calling stop on kafka channel", this.threadId);
+ active = false;
+
this.kafkaChannel.stop();
logger.debug("[{}]: exiting main run loop", this.threadId);
@@ -183,9 +207,10 @@ public class KafkaConsumerRunnableBasic implements Runnable {
private boolean isInterrupted() {
- if (Thread.currentThread().interrupted()) {
-
- logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
+ if (Thread.interrupted()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
+ }
return true;
diff --git a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java
index 91c4b97d..f7697607 100644
--- a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java
+++ b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java
@@ -1,6 +1,8 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
+ * Copyright (c) 2017 SUSE LLC
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -51,28 +53,18 @@ public abstract class FlushableHandler<T> {
protected final String handlerName;
- protected FlushableHandler(
- PipelineConfig configuration,
- Environment environment,
- String threadId,
+ protected FlushableHandler(PipelineConfig configuration, Environment environment, String threadId,
int batchSize) {
this.threadId = threadId;
- this.handlerName =
- String.format(
- "%s[%s]",
- this.getClass().getName(),
- threadId);
+ this.handlerName = String.format("%s[%s]", this.getClass().getName(), threadId);
- this.processedMeter =
- environment.metrics().meter(handlerName + "." + "events-processed-meter");
+ this.processedMeter = environment.metrics().meter(handlerName + "." + "events-processed-meter");
- this.flushMeter =
- environment.metrics().meter(handlerName + "." + "flush-meter");
+ this.flushMeter = environment.metrics().meter(handlerName + "." + "flush-meter");
- this.flushTimer =
- environment.metrics().timer(handlerName + "." + "flush-timer");
+ this.flushTimer = environment.metrics().timer(handlerName + "." + "flush-timer");
this.secondsBetweenFlushes = configuration.getMaxBatchTime();
@@ -102,7 +94,7 @@ public abstract class FlushableHandler {
} else {
- return false;
+ return false;
}
}
@@ -126,20 +118,26 @@ public abstract class FlushableHandler {
private boolean isBatchSize() {
- logger.debug("[{}]: checking batch size", this.threadId);
+ if (logger.isDebugEnabled()) {
+
+ logger.debug("[{}]: checking batch size", this.threadId);
+
+ }
if (this.msgCount >= this.batchSize) {
- logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
+ }
return true;
} else {
- logger.debug("[{}]: batch size now at {}, batch size {} not attained",
- this.threadId,
- this.msgCount,
- this.batchSize);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: batch size now at {}, batch size {} not attained", this.threadId,
+ this.msgCount, this.batchSize);
+ }
return false;
@@ -147,28 +145,26 @@ public abstract class FlushableHandler {
}
private boolean isFlushTime() {
-
- logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.",
- this.threadId,
- this.secondsBetweenFlushes);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.",
+ this.threadId, this.secondsBetweenFlushes);
+ }
long now = System.currentTimeMillis();
- if (this.flushTimeMillis <= now ) {
-
- logger.debug(
- "[{}]: {} ms past flush time. flushing to repository now.",
- this.threadId,
- now - this.flushTimeMillis);
+ if (this.flushTimeMillis <= now) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: {} ms past flush time. flushing to repository now.", this.threadId,
+ now - this.flushTimeMillis);
+ }
return true;
} else {
-
- logger.debug(
- "[{}]: {} ms to next flush time. no need to flush at this time.",
- this.threadId,
- this.flushTimeMillis - now);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: {} ms to next flush time. no need to flush at this time.", this.threadId,
+ this.flushTimeMillis - now);
+ }
return false;
@@ -176,8 +172,9 @@ public abstract class FlushableHandler {
}
public int flush() throws RepoException {
-
- logger.debug("[{}]: flushing", this.threadId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: flushing", this.threadId);
+ }
Timer.Context context = this.flushTimer.time();
@@ -185,13 +182,15 @@ public abstract class FlushableHandler {
context.stop();
- this.flushMeter.mark();
+ this.flushMeter.mark(msgFlushCnt);
this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
- logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt);
+ }
- this.msgCount = 0;
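+    // Subtract only the number of messages actually flushed so anything still queued stays counted.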
+ this.msgCount -= msgFlushCnt;
this.batchCount++;
diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
index 76ae2697..75fe2f52 100644
--- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
+++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
@@ -1,6 +1,8 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
- *
+ *
+ * Copyright (c) 2017 SUSE LLC
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -38,28 +40,23 @@ import monasca.persister.repository.RepoException;
public class MetricHandler extends FlushableHandler<MetricEnvelope> {
-  private static final Logger logger =
-      LoggerFactory.getLogger(MetricHandler.class);
+  private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
  private final Repo<MetricEnvelope> metricRepo;
  private final Counter metricCounter;
@Inject
- public MetricHandler(
- Repo metricRepo,
- Environment environment,
- @Assisted PipelineConfig configuration,
- @Assisted("threadId") String threadId,
+  public MetricHandler(Repo<MetricEnvelope> metricRepo, Environment environment,
+ @Assisted PipelineConfig configuration, @Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize) {
super(configuration, environment, threadId, batchSize);
this.metricRepo = metricRepo;
- this.metricCounter =
- environment.metrics()
- .counter(this.handlerName + "." + "metrics-added-to-batch-counter");
+ this.metricCounter = environment.metrics()
+ .counter(this.handlerName + "." + "metrics-added-to-batch-counter");
}
@@ -89,12 +86,10 @@ public class MetricHandler extends FlushableHandler {
}
private void processEnvelope(MetricEnvelope metricEnvelope) {
-
- logger.debug("[{}]: [{}:{}] {}",
- this.threadId,
- this.getBatchCount(),
- this.getMsgCount(),
- metricEnvelope);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: [{}:{}] {}", this.threadId, this.getBatchCount(), this.getMsgCount(),
+ metricEnvelope);
+ }
this.metricRepo.addToBatch(metricEnvelope, this.threadId);
@@ -109,8 +104,8 @@ public class MetricHandler extends FlushableHandler {
this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
- this.objectMapper.setPropertyNamingStrategy(
- PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
+ this.objectMapper
+ .setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}
diff --git a/java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java b/java/src/main/java/monasca/persister/repository/Sha1HashId.java
similarity index 76%
rename from java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java
rename to java/src/main/java/monasca/persister/repository/Sha1HashId.java
index 899b073b..74c8ecba 100644
--- a/java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java
+++ b/java/src/main/java/monasca/persister/repository/Sha1HashId.java
@@ -1,63 +1,73 @@
-/*
- * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package monasca.persister.repository.vertica;
-
-import org.apache.commons.codec.binary.Hex;
-
-import java.util.Arrays;
-
-public class Sha1HashId {
- private final byte[] sha1Hash;
-
- public Sha1HashId(byte[] sha1Hash) {
- this.sha1Hash = sha1Hash;
- }
-
- @Override
- public String toString() {
- return "Sha1HashId{" + "sha1Hash=" + Hex.encodeHexString(sha1Hash) + "}";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (!(o instanceof Sha1HashId))
- return false;
-
- Sha1HashId that = (Sha1HashId) o;
-
- if (!Arrays.equals(sha1Hash, that.sha1Hash))
- return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(sha1Hash);
- }
-
- public byte[] getSha1Hash() {
- return sha1Hash;
- }
-
- public String toHexString() {
- return Hex.encodeHexString(sha1Hash);
- }
-}
+/*
+ * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ *
+ * Copyright (c) 2017 SUSE LLC.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository;
+
+import org.apache.commons.codec.binary.Hex;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class Sha1HashId {
+ private final byte[] sha1Hash;
+
+ private final String hex;
+
+ public Sha1HashId(byte[] sha1Hash) {
+ this.sha1Hash = sha1Hash;
+ hex = Hex.encodeHexString(sha1Hash);
+ }
+
+ @Override
+ public String toString() {
+ return "Sha1HashId{" + "sha1Hash=" + hex + "}";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof Sha1HashId))
+ return false;
+
+ Sha1HashId that = (Sha1HashId) o;
+
+ if (!Arrays.equals(sha1Hash, that.sha1Hash))
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(sha1Hash);
+ }
+
+ public byte[] getSha1Hash() {
+ return sha1Hash;
+ }
+
+ public ByteBuffer getSha1HashByteBuffer() {
+ return ByteBuffer.wrap(sha1Hash);
+ }
+
+ public String toHexString() {
+ return hex;
+ }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraAlarmRepo.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraAlarmRepo.java
new file mode 100644
index 00000000..96323099
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraAlarmRepo.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.security.NoSuchAlgorithmException;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import javax.inject.Inject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+import io.dropwizard.setup.Environment;
+import monasca.common.model.event.AlarmStateTransitionedEvent;
+import monasca.persister.configuration.PersisterConfig;
+import monasca.persister.repository.Repo;
+import monasca.persister.repository.RepoException;
+
+/**
+ * This class is not thread safe.
+ *
+ */
+public class CassandraAlarmRepo extends CassandraRepo implements Repo<AlarmStateTransitionedEvent> {
+
+ private static final Logger logger = LoggerFactory.getLogger(CassandraAlarmRepo.class);
+
+ private static String EMPTY_REASON_DATA = "{}";
+
+ private static final int MAX_BYTES_PER_CHAR = 4;
+ private static final int MAX_LENGTH_VARCHAR = 65000;
+
+ private int retention;
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ @Inject
+ public CassandraAlarmRepo(CassandraCluster cluster, PersisterConfig config, Environment environment)
+ throws NoSuchAlgorithmException, SQLException {
+ super(cluster, environment, config.getCassandraDbConfiguration().getMaxWriteRetries(),
+ config.getAlarmHistoryConfiguration().getBatchSize());
+
+ this.retention = config.getCassandraDbConfiguration().getRetentionPolicy() * 24 * 3600;
+
+ logger.debug("Instantiating " + this.getClass().getName());
+
+ this.objectMapper
+ .setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
+
+ session = cluster.getAlarmsSession();
+
+ logger.debug(this.getClass().getName() + " is fully instantiated");
+
+ }
+
+ public void addToBatch(AlarmStateTransitionedEvent message, String id) {
+
+ String metricsString = getSerializedString(message.metrics, id);
+
+    // Drop the metrics list if its worst-case encoded size would exceed the varchar limit
+ if (metricsString.length() * MAX_BYTES_PER_CHAR >= MAX_LENGTH_VARCHAR) {
+ metricsString = "[]";
+ logger.warn("length of metricsString for alarm ID {} exceeds max length of {}", message.alarmId,
+ MAX_LENGTH_VARCHAR);
+ }
+
+ String subAlarmsString = getSerializedString(message.subAlarms, id);
+
+ if (subAlarmsString.length() * MAX_BYTES_PER_CHAR >= MAX_LENGTH_VARCHAR) {
+ subAlarmsString = "[]";
+ logger.warn("length of subAlarmsString for alarm ID {} exceeds max length of {}", message.alarmId,
+ MAX_LENGTH_VARCHAR);
+ }
+
+ queue.offerLast(cluster.getAlarmHistoryInsertStmt().bind(retention, metricsString, message.oldState.name(),
+ message.newState.name(), subAlarmsString, message.stateChangeReason, EMPTY_REASON_DATA,
+ message.tenantId, message.alarmId, new Timestamp(message.timestamp)));
+ }
+
+ private String getSerializedString(Object o, String id) {
+
+ try {
+ return this.objectMapper.writeValueAsString(o);
+ } catch (JsonProcessingException e) {
+ logger.error("[[}]: failed to serialize object {}", id, o, e);
+ return "";
+ }
+ }
+
+ @Override
+ public int flush(String id) throws RepoException {
+ return handleFlush(id);
+ }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraCluster.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraCluster.java
new file mode 100644
index 00000000..ac20b098
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraCluster.java
@@ -0,0 +1,427 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.TokenRange;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import com.datastax.driver.core.utils.Bytes;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.inject.Inject;
+
+import monasca.common.configuration.CassandraDbConfiguration;
+import monasca.persister.configuration.PersisterConfig;
+
+public class CassandraCluster {
+
+ private static final Logger logger = LoggerFactory.getLogger(CassandraCluster.class);
+
+ private static final String MEASUREMENT_INSERT_CQL = "update monasca.measurements USING TTL ? "
+ + "set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? "
+ + "where metric_id = ? and time_stamp = ?";
+
+ private static final String MEASUREMENT_UPDATE_CQL = "update monasca.measurements USING TTL ? "
+ + "set value = ?, value_meta = ? " + "where metric_id = ? and time_stamp = ?";
+
+ private static final String METRICS_INSERT_CQL = "update monasca.metrics USING TTL ? "
+ + "set metric_id = ?, created_at = ?, updated_at = ? "
+ + "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
+
+ private static final String METRICS_UPDATE_CQL = "update monasca.metrics USING TTL ? "
+ + "set updated_at = ? "
+ + "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
+
+ private static final String DIMENSION_INSERT_CQL = "insert into monasca.dimensions "
+ + "(region, tenant_id, name, value) values (?, ?, ?, ?)";
+
+ private static final String DIMENSION_METRIC_INSERT_CQL = "insert into monasca.dimensions_metrics "
+ + " (region, tenant_id, dimension_name, dimension_value, metric_name) values (?, ?, ?, ?, ?)";
+
+ private static final String METRIC_DIMENSION_INSERT_CQL = "insert into monasca.metrics_dimensions "
+ + " (region, tenant_id, metric_name, dimension_name, dimension_value) values (?, ?, ?, ?, ?)";
+
+ private static final String INSERT_ALARM_STATE_HISTORY_SQL = "update monasca.alarm_state_history USING TTL ? "
+ + " set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ?"
+ + " where tenant_id = ? and alarm_id = ? and time_stamp = ?";
+
+ private static final String RETRIEVE_METRIC_DIMENSION_CQL = "select region, tenant_id, metric_name, "
+ + "dimension_name, dimension_value from metrics_dimensions "
+ + "WHERE token(region, tenant_id, metric_name) > ? and token(region, tenant_id, metric_name) <= ? ";
+
+ private static final String RETRIEVE_METRIC_ID_CQL = "select distinct metric_id from measurements WHERE token(metric_id) > ? and token(metric_id) <= ?";
+
+ private static final String RETRIEVE_DIMENSION_CQL = "select region, tenant_id, name, value from dimensions";
+
+ private static final String NAME = "name";
+ private static final String VALUE = "value";
+ private static final String METRIC_ID = "metric_id";
+ private static final String TENANT_ID_COLUMN = "tenant_id";
+ private static final String METRIC_NAME = "metric_name";
+ private static final String DIMENSION_NAME = "dimension_name";
+ private static final String DIMENSION_VALUE = "dimension_value";
+ private static final String REGION = "region";
+
+ private CassandraDbConfiguration dbConfig;
+ private Cluster cluster;
+ private Session metricsSession;
+ private Session alarmsSession;
+
+ private TokenAwarePolicy lbPolicy;
+
+ private PreparedStatement measurementInsertStmt;
+ private PreparedStatement measurementUpdateStmt;
+ private PreparedStatement metricInsertStmt;
+ private PreparedStatement metricUpdateStmt;
+ private PreparedStatement dimensionStmt;
+ private PreparedStatement dimensionMetricStmt;
+ private PreparedStatement metricDimensionStmt;
+
+ private PreparedStatement retrieveMetricDimensionStmt;
+ private PreparedStatement retrieveMetricIdStmt;
+
+ private PreparedStatement alarmHistoryInsertStmt;
+
+  public Cache<String, Boolean> getMetricIdCache() {
+    return metricIdCache;
+  }
+
+  public Cache<String, Boolean> getDimensionCache() {
+    return dimensionCache;
+  }
+
+  public Cache<String, Boolean> getMetricDimensionCache() {
+    return metricDimensionCache;
+  }
+
+  private final Cache<String, Boolean> metricIdCache;
+
+  private final Cache<String, Boolean> dimensionCache;
+
+  private final Cache<String, Boolean> metricDimensionCache;
+
+ @Inject
+ public CassandraCluster(final PersisterConfig config) {
+
+ this.dbConfig = config.getCassandraDbConfiguration();
+
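+    // Query options: use the configured consistency level and mark statements idempotent by
+    // default so the retry policy may safely retry writes.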
+ QueryOptions qo = new QueryOptions();
+ qo.setConsistencyLevel(ConsistencyLevel.valueOf(dbConfig.getConsistencyLevel()));
+ qo.setDefaultIdempotence(true);
+
+ String[] contactPoints = dbConfig.getContactPoints();
+ int retries = dbConfig.getMaxWriteRetries();
+ Builder builder = Cluster.builder().addContactPoints(contactPoints).withPort(dbConfig.getPort());
+ builder
+ .withSocketOptions(new SocketOptions().setConnectTimeoutMillis(dbConfig.getConnectionTimeout())
+ .setReadTimeoutMillis(dbConfig.getReadTimeout()));
+ builder.withQueryOptions(qo).withRetryPolicy(new MonascaRetryPolicy(retries, retries, retries));
+
+ lbPolicy = new TokenAwarePolicy(
+ DCAwareRoundRobinPolicy.builder().withLocalDc(dbConfig.getLocalDataCenter()).build());
+ builder.withLoadBalancingPolicy(lbPolicy);
+
+ String user = dbConfig.getUser();
+ if (user != null && !user.isEmpty()) {
+ builder.withAuthProvider(new PlainTextAuthProvider(dbConfig.getUser(), dbConfig.getPassword()));
+ }
+ cluster = builder.build();
+
+ PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
+
+ poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, dbConfig.getMaxConnections(),
+ dbConfig.getMaxConnections()).setConnectionsPerHost(HostDistance.REMOTE,
+ dbConfig.getMaxConnections(), dbConfig.getMaxConnections());
+
+ poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, dbConfig.getMaxRequests())
+ .setMaxRequestsPerConnection(HostDistance.REMOTE, dbConfig.getMaxRequests());
+
+ metricsSession = cluster.connect(dbConfig.getKeySpace());
+
+ measurementInsertStmt = metricsSession.prepare(MEASUREMENT_INSERT_CQL).setIdempotent(true);
+ measurementUpdateStmt = metricsSession.prepare(MEASUREMENT_UPDATE_CQL).setIdempotent(true);
+ metricInsertStmt = metricsSession.prepare(METRICS_INSERT_CQL).setIdempotent(true);
+ metricUpdateStmt = metricsSession.prepare(METRICS_UPDATE_CQL).setIdempotent(true);
+ dimensionStmt = metricsSession.prepare(DIMENSION_INSERT_CQL).setIdempotent(true);
+ dimensionMetricStmt = metricsSession.prepare(DIMENSION_METRIC_INSERT_CQL).setIdempotent(true);
+ metricDimensionStmt = metricsSession.prepare(METRIC_DIMENSION_INSERT_CQL).setIdempotent(true);
+
+ retrieveMetricIdStmt = metricsSession.prepare(RETRIEVE_METRIC_ID_CQL).setIdempotent(true);
+ retrieveMetricDimensionStmt = metricsSession.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
+ .setIdempotent(true);
+
+ alarmsSession = cluster.connect(dbConfig.getKeySpace());
+
+ alarmHistoryInsertStmt = alarmsSession.prepare(INSERT_ALARM_STATE_HISTORY_SQL).setIdempotent(true);
+
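+    // Caches of metric ids, dimensions and metric-dimension links already seen; cache hits
+    // let the persister skip re-writing definition rows.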
+ metricIdCache = CacheBuilder.newBuilder()
+ .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
+
+ dimensionCache = CacheBuilder.newBuilder()
+ .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
+
+ metricDimensionCache = CacheBuilder.newBuilder()
+ .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
+
+ logger.info("loading cached definitions from db");
+
+ ExecutorService executor = Executors.newFixedThreadPool(250);
+
+ //a majority of the ids are for metrics not actively receiving msgs anymore
+ //loadMetricIdCache(executor);
+
+ loadDimensionCache();
+
+ loadMetricDimensionCache(executor);
+
+ executor.shutdown();
+ }
+
+ public Session getMetricsSession() {
+ return metricsSession;
+ }
+
+ public Session getAlarmsSession() {
+ return alarmsSession;
+ }
+
+ public PreparedStatement getMeasurementInsertStmt() {
+ return measurementInsertStmt;
+ }
+
+ public PreparedStatement getMeasurementUpdateStmt() {
+ return measurementUpdateStmt;
+ }
+
+ public PreparedStatement getMetricInsertStmt() {
+ return metricInsertStmt;
+ }
+
+ public PreparedStatement getMetricUpdateStmt() {
+ return metricUpdateStmt;
+ }
+
+ public PreparedStatement getDimensionStmt() {
+ return dimensionStmt;
+ }
+
+ public PreparedStatement getDimensionMetricStmt() {
+ return dimensionMetricStmt;
+ }
+
+ public PreparedStatement getMetricDimensionStmt() {
+ return metricDimensionStmt;
+ }
+
+ public PreparedStatement getAlarmHistoryInsertStmt() {
+ return alarmHistoryInsertStmt;
+ }
+
+ public ProtocolOptions getProtocolOptions() {
+ return cluster.getConfiguration().getProtocolOptions();
+ }
+
+ public CodecRegistry getCodecRegistry() {
+ return cluster.getConfiguration().getCodecRegistry();
+ }
+
+ public Metadata getMetaData() {
+ return cluster.getMetadata();
+ }
+
+ public TokenAwarePolicy getLoadBalancePolicy() {
+ return lbPolicy;
+ }
+
+ private void loadMetricIdCache(ExecutorService executor) {
+ final AtomicInteger tasks = new AtomicInteger(0);
+ logger.info("Found token ranges: " + cluster.getMetadata().getTokenRanges().size());
+ for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
+      List<BoundStatement> queries = rangeQuery(retrieveMetricIdStmt, range);
+ for (BoundStatement query : queries) {
+ tasks.incrementAndGet();
+ logger.info("adding a metric id reading task, total: " + tasks.get());
+
+ ResultSetFuture future = metricsSession.executeAsync(query);
+
+        Futures.addCallback(future, new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet result) {
+ for (Row row : result) {
+ String id = Bytes.toHexString(row.getBytes(METRIC_ID));
+ if (id != null) {
+ //remove '0x'
+ metricIdCache.put(id.substring(2), Boolean.TRUE);
+ }
+ }
+
+ tasks.decrementAndGet();
+
+ logger.info("completed a metric id read task. Remaining tasks: " + tasks.get());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.error("Failed to execute query to load metric id cache.", t);
+
+ tasks.decrementAndGet();
+
+ logger.info("Failed a metric id read task. Remaining tasks: " + tasks.get());
+ }
+ }, executor);
+
+ }
+ }
+
+ while (tasks.get() > 0) {
+ logger.debug("waiting for more metric id load tasks: " + tasks.get());
+
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ logger.warn("load metric cache was interrupted", e);
+ }
+ }
+
+ logger.info("loaded metric id cache from database: " + metricIdCache.size());
+ }
+
+  private List<BoundStatement> rangeQuery(PreparedStatement rangeStmt, TokenRange range) {
+    List<BoundStatement> res = Lists.newArrayList();
+ for (TokenRange subRange : range.unwrap()) {
+ res.add(rangeStmt.bind(subRange.getStart(), subRange.getEnd()));
+ }
+ return res;
+ }
+
+ private void loadDimensionCache() {
+
+ ResultSet results = metricsSession.execute(RETRIEVE_DIMENSION_CQL);
+
+ for (Row row : results) {
+ String key = getDimnesionEntryKey(row.getString(REGION), row.getString(TENANT_ID_COLUMN),
+ row.getString(NAME), row.getString(VALUE));
+ dimensionCache.put(key, Boolean.TRUE);
+ }
+
+ logger.info("loaded dimension cache from database: " + dimensionCache.size());
+ }
+
+ public String getDimnesionEntryKey(String region, String tenantId, String name, String value) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(region).append('\0');
+ sb.append(tenantId).append('\0');
+ sb.append(name).append('\0');
+ sb.append(value);
+ return sb.toString();
+ }
+
+ private void loadMetricDimensionCache(ExecutorService executor) {
+
+ final AtomicInteger tasks = new AtomicInteger(0);
+
+ for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
+      List<BoundStatement> queries = rangeQuery(retrieveMetricDimensionStmt, range);
+ for (BoundStatement query : queries) {
+ tasks.incrementAndGet();
+
+ logger.info("Adding a metric dimnesion read task, total: " + tasks.get());
+
+ ResultSetFuture future = metricsSession.executeAsync(query);
+
+        Futures.addCallback(future, new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet result) {
+ for (Row row : result) {
+ String key = getMetricDimnesionEntryKey(row.getString(REGION),
+ row.getString(TENANT_ID_COLUMN), row.getString(METRIC_NAME),
+ row.getString(DIMENSION_NAME), row.getString(DIMENSION_VALUE));
+ metricDimensionCache.put(key, Boolean.TRUE);
+ }
+
+ tasks.decrementAndGet();
+
+ logger.info("Completed a metric dimension read task. Remaining tasks: " + tasks.get());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.error("Failed to execute query to load metric id cache.", t);
+
+ tasks.decrementAndGet();
+
+ logger.info("Failed a metric dimension read task. Remaining tasks: " + tasks.get());
+ }
+ }, executor);
+
+ }
+ }
+
+ while (tasks.get() > 0) {
+
+ logger.debug("waiting for metric dimension cache to load ...");
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.warn("load metric dimension cache was interrupted", e);
+ }
+ }
+
+ logger.info("loaded metric dimension cache from database: " + metricDimensionCache.size());
+ }
+
+ public String getMetricDimnesionEntryKey(String region, String tenantId, String metricName,
+ String dimensionName, String dimensionValue) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(region).append('\0');
+ sb.append(tenantId).append('\0');
+ sb.append(metricName).append('\0');
+ sb.append(dimensionName).append('\0');
+ sb.append(dimensionValue);
+ return sb.toString();
+ }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricBatch.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricBatch.java
new file mode 100644
index 00000000..4a275676
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricBatch.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BatchStatement.Type;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+
+public class CassandraMetricBatch {
+ private static Logger logger = LoggerFactory.getLogger(CassandraMetricBatch.class);
+
+ ProtocolOptions protocol;
+ CodecRegistry codec;
+ Metadata metadata;
+ TokenAwarePolicy policy;
+ int batchLimit;
+
+  Map<Token, Deque<BatchStatement>> metricQueries;
+  Map<Token, Deque<BatchStatement>> dimensionQueries;
+  Map<Token, Deque<BatchStatement>> dimensionMetricQueries;
+  Map<Token, Deque<BatchStatement>> metricDimensionQueries;
+  Map<Set<Host>, Deque<BatchStatement>> measurementQueries;
+
+ public CassandraMetricBatch(Metadata metadata, ProtocolOptions protocol, CodecRegistry codec,
+ TokenAwarePolicy lbPolicy, int batchLimit) {
+ this.protocol = protocol;
+ this.codec = codec;
+ this.metadata = metadata;
+ this.policy = lbPolicy;
+ metricQueries = new HashMap<>();
+ this.batchLimit = batchLimit;
+
+ metricQueries = new HashMap<>();
+ dimensionQueries = new HashMap<>();
+ dimensionMetricQueries = new HashMap<>();
+ metricDimensionQueries = new HashMap<>();
+ measurementQueries = new HashMap<>();
+ }
+
+ public void addMetricQuery(BoundStatement s) {
+ batchQueryByToken(s, metricQueries);
+ }
+
+ public void addDimensionQuery(BoundStatement s) {
+ batchQueryByToken(s, dimensionQueries);
+ }
+
+ public void addDimensionMetricQuery(BoundStatement s) {
+ batchQueryByToken(s, dimensionMetricQueries);
+ }
+
+ public void addMetricDimensionQuery(BoundStatement s) {
+ batchQueryByToken(s, metricDimensionQueries);
+ }
+
+ public void addMeasurementQuery(BoundStatement s) {
+ batchQueryByReplica(s, measurementQueries);
+ }
+
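+  // Groups a statement with others that share its partition token, so each unlogged batch
+  // targets a single partition.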
+  private void batchQueryByToken(BoundStatement s, Map<Token, Deque<BatchStatement>> batchedQueries) {
+    ByteBuffer b = s.getRoutingKey(protocol.getProtocolVersion(), codec);
+    Token token = metadata.newToken(b);
+    Deque<BatchStatement> queue = batchedQueries.get(token);
+    if (queue == null) {
+      queue = new ArrayDeque<>();
+ BatchStatement bs = new BatchStatement(Type.UNLOGGED);
+ bs.add(s);
+ queue.offer(bs);
+ batchedQueries.put(token, queue);
+ } else {
+ BatchStatement bs = queue.getLast();
+ if (bs.size() < batchLimit) {
+ bs.add(s);
+ } else {
+ bs = new BatchStatement(Type.UNLOGGED);
+ bs.add(s);
+ queue.offerLast(bs);
+ }
+ }
+ }
+
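+  // Groups a measurement statement by the set of replica hosts that own it, so statements
+  // sharing the same replicas end up in the same batch.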
+  private void batchQueryByReplica(BoundStatement s,
+      Map<Set<Host>, Deque<BatchStatement>> batchedQueries) {
+    Iterator<Host> it = policy.newQueryPlan(s.getKeyspace(), s);
+    Set<Host> hosts = new HashSet<>();
+
+ while (it.hasNext()) {
+ hosts.add(it.next());
+ }
+
+    Deque<BatchStatement> queue = batchedQueries.get(hosts);
+    if (queue == null) {
+      queue = new ArrayDeque<>();
+ BatchStatement bs = new BatchStatement(Type.UNLOGGED);
+ bs.add(s);
+ queue.offer(bs);
+ batchedQueries.put(hosts, queue);
+ } else {
+ BatchStatement bs = queue.getLast();
+ if (bs.size() < 30) {
+ bs.add(s);
+ } else {
+ bs = new BatchStatement(Type.UNLOGGED);
+ bs.add(s);
+ queue.offerLast(bs);
+ }
+ }
+ }
+
+ public void clear() {
+ metricQueries.clear();
+ dimensionQueries.clear();
+ dimensionMetricQueries.clear();
+ metricDimensionQueries.clear();
+ measurementQueries.clear();
+ }
+
+  public List<Deque<BatchStatement>> getAllBatches() {
+ logTokenBatchMap("metric batches", metricQueries);
+ logTokenBatchMap("dimension batches", dimensionQueries);
+ logTokenBatchMap("dimension metric batches", dimensionMetricQueries);
+ logTokenBatchMap("metric dimension batches", metricDimensionQueries);
+ logReplicaBatchMap("measurement batches", measurementQueries);
+
+    ArrayList<Deque<BatchStatement>> list = new ArrayList<>();
+ list.addAll(metricQueries.values());
+ list.addAll(dimensionQueries.values());
+ list.addAll(dimensionMetricQueries.values());
+ list.addAll(metricDimensionQueries.values());
+ list.addAll(measurementQueries.values());
+ return list;
+ }
+
+  private void logTokenBatchMap(String name, Map<Token, Deque<BatchStatement>> map) {
+ if (logger.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder(name);
+ sb.append(": Size: ").append(map.size());
+ sb.append("; Tokens: |");
+      for (Entry<Token, Deque<BatchStatement>> entry : map.entrySet()) {
+ sb.append(entry.getKey().toString()).append(":");
+ for (BatchStatement bs : entry.getValue()) {
+ sb.append(bs.size()).append(",");
+ }
+ sb.append("|.");
+ }
+
+ logger.debug(sb.toString());
+ }
+ }
+
+  private void logReplicaBatchMap(String name, Map<Set<Host>, Deque<BatchStatement>> map) {
+ if (logger.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder(name);
+ sb.append(": Size: ").append(map.size());
+ sb.append(". Replicas: |");
+      for (Entry<Set<Host>, Deque<BatchStatement>> entry : map.entrySet()) {
+ for (Host host : entry.getKey()) {
+ sb.append(host.getAddress().toString()).append(",");
+ }
+ sb.append(":");
+ for (BatchStatement bs : entry.getValue()) {
+ sb.append(bs.size()).append(",");
+ }
+
+ sb.append("|");
+
+ }
+ logger.debug(sb.toString());
+ }
+ }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricRepo.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricRepo.java
new file mode 100644
index 00000000..d096e784
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricRepo.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.security.NoSuchAlgorithmException;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Meter;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.dropwizard.setup.Environment;
+import monasca.common.model.metric.Metric;
+import monasca.common.model.metric.MetricEnvelope;
+import monasca.persister.configuration.PersisterConfig;
+import monasca.persister.repository.Repo;
+import monasca.persister.repository.RepoException;
+import monasca.persister.repository.Sha1HashId;
+
+public class CassandraMetricRepo extends CassandraRepo implements Repo<MetricEnvelope> {
+
+ private static final Logger logger = LoggerFactory.getLogger(CassandraMetricRepo.class);
+
+ public static final int MAX_COLUMN_LENGTH = 255;
+ public static final int MAX_VALUE_META_LENGTH = 2048;
+
+ private static final String TENANT_ID = "tenantId";
+ private static final String REGION = "region";
+ private static final String EMPTY_STR = "";
+
+ private int retention;
+
+ private CassandraMetricBatch batches;
+
+ private int metricCount;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public final Meter measurementMeter;
+ public final Meter metricCacheMissMeter;
+ public final Meter metricCacheHitMeter;
+ public final Meter dimensionCacheMissMeter;
+ public final Meter dimensionCacheHitMeter;
+ public final Meter metricDimensionCacheMissMeter;
+ public final Meter metricDimensionCacheHitMeter;
+
+ @Inject
+ public CassandraMetricRepo(CassandraCluster cluster, PersisterConfig config, Environment environment)
+ throws NoSuchAlgorithmException, SQLException {
+
+ super(cluster, environment, config.getCassandraDbConfiguration().getMaxWriteRetries(),
+ config.getMetricConfiguration().getBatchSize());
+
+ logger.debug("Instantiating " + this.getClass().getName());
+
+ this.retention = config.getCassandraDbConfiguration().getRetentionPolicy() * 24 * 3600;
+
+ this.measurementMeter = this.environment.metrics()
+ .meter(this.getClass().getName() + "." + "measurement-meter");
+
+ this.metricCacheMissMeter = this.environment.metrics()
+ .meter(this.getClass().getName() + "." + "definition-cache-miss-meter");
+
+ this.metricCacheHitMeter = this.environment.metrics()
+ .meter(this.getClass().getName() + "." + "definition-cache-hit-meter");
+
+ this.dimensionCacheMissMeter = this.environment.metrics()
+ .meter(this.getClass().getName() + "." + "dimension-cache-miss-meter");
+
+ this.dimensionCacheHitMeter = this.environment.metrics()
+ .meter(this.getClass().getName() + "." + "dimension-cache-hit-meter");
+
+ this.metricDimensionCacheMissMeter = this.environment.metrics()
+ .meter(this.getClass().getName() + "." + "metric-dimension-cache-miss-meter");
+
+ this.metricDimensionCacheHitMeter = this.environment.metrics()
+ .meter(this.getClass().getName() + "." + "metric-dimension-cache-hit-meter");
+
+ session = cluster.getMetricsSession();
+
+ metricCount = 0;
+
+ batches = new CassandraMetricBatch(cluster.getMetaData(), cluster.getProtocolOptions(),
+ cluster.getCodecRegistry(), cluster.getLoadBalancePolicy(),
+ config.getCassandraDbConfiguration().getMaxBatches());
+
+
+ }
+
+ @Override
+ public void addToBatch(MetricEnvelope metricEnvelope, String id) {
+ Metric metric = metricEnvelope.metric;
+    Map<String, Object> metaMap = metricEnvelope.meta;
+
+ String tenantId = getMeta(TENANT_ID, metric, metaMap, id);
+ String region = getMeta(REGION, metric, metaMap, id);
+ String metricName = metric.getName();
+    TreeMap<String, String> dimensions = metric.getDimensions() == null ? new TreeMap<String, String>()
+        : new TreeMap<>(metric.getDimensions());
+
+ StringBuilder sb = new StringBuilder(region).append(tenantId).append(metricName);
+
+    Iterator<String> it = dimensions.keySet().iterator();
+ while (it.hasNext()) {
+ String k = it.next();
+ sb.append(k).append(dimensions.get(k));
+ }
+
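+    // The metric definition id is the SHA-1 of region + tenant id + metric name + the sorted
+    // dimension name/value pairs.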
+ byte[] defIdSha = DigestUtils.sha(sb.toString());
+ Sha1HashId defIdShaHash = new Sha1HashId(defIdSha);
+
+ if (cluster.getMetricIdCache().getIfPresent(defIdShaHash.toHexString()) == null) {
+ addDefinitionToBatch(defIdShaHash, metricName, dimensions, tenantId, region, id,
+ metric.getTimestamp());
+ batches.addMeasurementQuery(buildMeasurementInsertQuery(defIdShaHash, metric.getTimestamp(),
+ metric.getValue(), metric.getValueMeta(), region, tenantId, metricName, dimensions, id));
+ } else {
+ metricCacheHitMeter.mark();
+ batches.addMetricQuery(cluster.getMetricUpdateStmt().bind(retention,
+ new Timestamp(metric.getTimestamp()), region, tenantId, metricName,
+ getDimensionList(dimensions), new ArrayList<>(dimensions.keySet())));
+ batches.addMeasurementQuery(buildMeasurementUpdateQuery(defIdShaHash, metric.getTimestamp(),
+ metric.getValue(), metric.getValueMeta(), id));
+ }
+
+ metricCount++;
+ }
+
+ private String getMeta(String name, Metric metric, Map<String, Object> meta, String id) {
+ if (meta.containsKey(name)) {
+ return (String) meta.get(name);
+ } else {
+ logger.warn(
+ "[{}]: failed to find {} in message envelope meta data. metric message may be malformed. "
+ + "setting {} to empty string.",
+ id, name, name);
+ logger.warn("[{}]: metric: {}", id, metric.toString());
+ logger.warn("[{}]: meta: {}", id, meta.toString());
+ return EMPTY_STR;
+ }
+ }
+
+ private BoundStatement buildMeasurementUpdateQuery(Sha1HashId defId, long timeStamp, double value,
+ Map<String, String> valueMeta, String id) {
+
+ String valueMetaString = getValueMetaString(valueMeta, id);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: adding metric to batch: metric id: {}, time: {}, value: {}, value meta {}",
+ id, defId.toHexString(), timeStamp, value, valueMetaString);
+ }
+
+ return cluster.getMeasurementUpdateStmt().bind(retention, value, valueMetaString,
+ defId.getSha1HashByteBuffer(), new Timestamp(timeStamp));
+ }
+
+ private BoundStatement buildMeasurementInsertQuery(Sha1HashId defId, long timeStamp, double value,
+ Map<String, String> valueMeta, String region, String tenantId, String metricName,
+ Map<String, String> dimensions, String id) {
+
+ String valueMetaString = getValueMetaString(valueMeta, id);
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: adding metric to batch: metric id: {}, time: {}, value: {}, value meta {}",
+ id, defId.toHexString(), timeStamp, value, valueMetaString);
+ }
+
+ measurementMeter.mark();
+ return cluster.getMeasurementInsertStmt().bind(retention, value, valueMetaString, region, tenantId,
+ metricName, getDimensionList(dimensions), defId.getSha1HashByteBuffer(),
+ new Timestamp(timeStamp));
+ }
+
+ private String getValueMetaString(Map<String, String> valueMeta, String id) {
+
+ String valueMetaString = "";
+
+ if (valueMeta != null && !valueMeta.isEmpty()) {
+
+ try {
+
+ valueMetaString = this.objectMapper.writeValueAsString(valueMeta);
+ if (valueMetaString.length() > MAX_VALUE_META_LENGTH) {
+ logger.error("[{}]: Value meta length {} longer than maximum {}, dropping value meta", id,
+ valueMetaString.length(), MAX_VALUE_META_LENGTH);
+ return "";
+ }
+
+ } catch (JsonProcessingException e) {
+
+ logger.error("[{}]: Failed to serialize value meta {}, dropping value meta from measurement",
+ id, valueMeta);
+ }
+ }
+
+ return valueMetaString;
+ }
+
+ private void addDefinitionToBatch(Sha1HashId defId, String metricName, Map<String, String> dimensions,
+ String tenantId, String region, String id, long timestamp) {
+
+ metricCacheMissMeter.mark();
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: adding definition to batch: defId: {}, name: {}, tenantId: {}, region: {}",
+ id, defId.toHexString(), metricName, tenantId, region);
+ }
+
+ Timestamp ts = new Timestamp(timestamp);
+ batches.addMetricQuery(
+ cluster.getMetricInsertStmt().bind(retention, defId.getSha1HashByteBuffer(), ts, ts, region,
+ tenantId, metricName, getDimensionList(dimensions), new ArrayList<>(dimensions.keySet())));
+
+ for (Map.Entry<String, String> entry : dimensions.entrySet()) {
+ String name = entry.getKey();
+ String value = entry.getValue();
+
+ String dimensionKey = cluster.getDimnesionEntryKey(region, tenantId, name, value);
+ if (cluster.getDimensionCache().getIfPresent(dimensionKey) != null) {
+ dimensionCacheHitMeter.mark();
+
+ } else {
+ dimensionCacheMissMeter.mark();
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}]: adding dimension to batch: defId: {}, name: {}, value: {}", id,
+ defId.toHexString(), name, value);
+ }
+ batches.addDimensionQuery(cluster.getDimensionStmt().bind(region, tenantId, name, value));
+ cluster.getDimensionCache().put(dimensionKey, Boolean.TRUE);
+ }
+
+ String metricDimensionKey = cluster.getMetricDimnesionEntryKey(region, tenantId, metricName, name, value);
+ if (cluster.getMetricDimensionCache().getIfPresent(metricDimensionKey) != null) {
+ metricDimensionCacheHitMeter.mark();
+ } else {
+ metricDimensionCacheMissMeter.mark();
+ batches.addDimensionMetricQuery(
+ cluster.getDimensionMetricStmt().bind(region, tenantId, name, value, metricName));
+
+ batches.addMetricDimensionQuery(
+ cluster.getMetricDimensionStmt().bind(region, tenantId, metricName, name, value));
+ cluster.getMetricDimensionCache().put(metricDimensionKey, Boolean.TRUE);
+ }
+ }
+
+ String metricId = defId.toHexString();
+ cluster.getMetricIdCache().put(metricId, Boolean.TRUE);
+ }
+
+ public List<String> getDimensionList(Map<String, String> dimensions) {
+ List<String> list = new ArrayList<>(dimensions.size());
+ for (Entry<String, String> dim : dimensions.entrySet()) {
+ list.add(new StringBuilder(dim.getKey()).append('\t').append(dim.getValue()).toString());
+ }
+ return list;
+ }
+
+ @Override
+ public int flush(String id) throws RepoException {
+ long startTime = System.nanoTime();
+ List<ResultSetFuture> results = new ArrayList<>();
+ List<Deque<BatchStatement>> list = batches.getAllBatches();
+ for (Deque<BatchStatement> q : list) {
+ BatchStatement b;
+ while ((b = q.poll()) != null) {
+ results.add(session.executeAsync(b));
+ }
+ }
+
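+ // Wait for every asynchronous write; after the first failure cancel the remaining
+ // futures and report the error as a RepoException.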
+ List<ListenableFuture<ResultSet>> futures = Futures.inCompletionOrder(results);
+
+ boolean cancel = false;
+ Exception ex = null;
+ for (ListenableFuture<ResultSet> future : futures) {
+ if (cancel) {
+ future.cancel(false);
+ continue;
+ }
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ cancel = true;
+ ex = e;
+ }
+ }
+
+ this.commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+ if (ex != null) {
+ metricFailed.inc(metricCount);
+ throw new RepoException(ex);
+ }
+
+ batches.clear();
+ int flushCnt = metricCount;
+ metricCount = 0;
+ metricCompleted.inc(flushCnt);
+ return flushCnt;
+ }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraRepo.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraRepo.java
new file mode 100644
index 00000000..e536d3e5
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraRepo.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BatchStatement.Type;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.BootstrappingException;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.OperationTimedOutException;
+import com.datastax.driver.core.exceptions.OverloadedException;
+import com.datastax.driver.core.exceptions.QueryConsistencyException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.dropwizard.setup.Environment;
+import monasca.persister.repository.RepoException;
+
+public abstract class CassandraRepo {
+ private static Logger logger = LoggerFactory.getLogger(CassandraRepo.class);
+
+ final Environment environment;
+
+ final Timer commitTimer;
+
+ CassandraCluster cluster;
+ Session session;
+
+ int maxWriteRetries;
+
+ int batchSize;
+
+ long lastFlushTimeStamp;
+
+ Deque<Statement> queue;
+
+ Counter metricCompleted;
+
+ Counter metricFailed;
+
+ public CassandraRepo(CassandraCluster cluster, Environment env, int maxWriteRetries, int batchSize) {
+ this.cluster = cluster;
+ this.maxWriteRetries = maxWriteRetries;
+ this.batchSize = batchSize;
+
+ this.environment = env;
+
+ this.commitTimer = this.environment.metrics().timer(getClass().getName() + "." + "commit-timer");
+
+ lastFlushTimeStamp = System.currentTimeMillis();
+
+ queue = new ArrayDeque<>(batchSize);
+
+ this.metricCompleted = environment.metrics()
+ .counter(getClass().getName() + "." + "metrics-persisted-counter");
+
+ this.metricFailed = environment.metrics()
+ .counter(getClass().getName() + "." + "metrics-failed-counter");
+ }
+
+ protected void executeQuery(String id, Statement query, long startTime) throws DriverException {
+ _executeQuery(id, query, startTime, 0);
+ }
+
+ private void _executeQuery(final String id, final Statement query, final long startTime,
+ final int retryCount) {
+ try {
+ session.execute(query);
+
+ commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+ // ResultSetFuture future = session.executeAsync(query);
+
+ // Futures.addCallback(future, new FutureCallback() {
+ // @Override
+ // public void onSuccess(ResultSet result) {
+ // metricCompleted.inc();
+ // commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ // }
+ //
+ // @Override
+ // public void onFailure(Throwable t) {
+ // if (t instanceof NoHostAvailableException | t instanceof
+ // BootstrappingException
+ // | t instanceof OverloadedException | t instanceof QueryConsistencyException
+ // | t instanceof UnavailableException) {
+ // retryQuery(id, query, startTime, retryCount, (DriverException) t);
+ // } else {
+ // metricFailed.inc();
+ // commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ // logger.error("Failed to execute query.", t);
+ // }
+ // }
+ // }, MoreExecutors.sameThreadExecutor());
+
+ } catch (NoHostAvailableException | BootstrappingException | OverloadedException
+ | QueryConsistencyException | UnavailableException | OperationTimedOutException e) {
+ retryQuery(id, query, startTime, retryCount, e);
+ } catch (DriverException e) {
+ metricFailed.inc(((BatchStatement) query).size());
+ commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ throw e;
+ }
+ }
+
+ private void retryQuery(String id, Statement query, final long startTime, int retryCount,
+ DriverException e) throws DriverException {
+ if (retryCount >= maxWriteRetries) {
+ logger.error("[{}]: Query aborted after {} retries: {}", id, retryCount, e.getMessage());
+ metricFailed.inc(((BatchStatement) query).size());
+ commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ throw e;
+ } else {
+ logger.warn("[{}]: Query failed, retrying {} of {}: {} ", id, retryCount, maxWriteRetries,
+ e.getMessage());
+
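+ // Back off exponentially before retrying: 1s, 2s, 4s, ... per attempt.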
+ try {
+ Thread.sleep(1000 * (1 << retryCount));
+ } catch (InterruptedException ie) {
+ logger.debug("[{}]: Interrupted: {}", id, ie);
+ }
+ _executeQuery(id, query, startTime, retryCount + 1);
+ }
+ }
+
+ public int handleFlush_batch(String id) {
+ Statement query;
+ int flushedCount = 0;
+
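+ // Drain the queued statements into a single unlogged batch and execute it synchronously.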
+ BatchStatement batch = new BatchStatement(Type.UNLOGGED);
+ while ((query = queue.poll()) != null) {
+ flushedCount++;
+ batch.add(query);
+ }
+
+ executeQuery(id, batch, System.nanoTime());
+
+ metricCompleted.inc(flushedCount);
+
+ return flushedCount;
+ }
+
+ public int handleFlush(String id) throws RepoException {
+ long startTime = System.nanoTime();
+
+ int flushedCount = 0;
+ List<ResultSetFuture> results = new ArrayList<>(queue.size());
+ Statement query;
+ while ((query = queue.poll()) != null) {
+ flushedCount++;
+ results.add(session.executeAsync(query));
+ }
+
+ List<ListenableFuture<ResultSet>> futures = Futures.inCompletionOrder(results);
+
+ boolean cancel = false;
+ Exception ex = null;
+ for (ListenableFuture<ResultSet> future : futures) {
+ if (cancel) {
+ future.cancel(false);
+ continue;
+ }
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ cancel = true;
+ ex = e;
+ }
+ }
+
+ commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+ if (ex != null) {
+ throw new RepoException(ex);
+ }
+ return flushedCount;
+ }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/MonascaRetryPolicy.java b/java/src/main/java/monasca/persister/repository/cassandra/MonascaRetryPolicy.java
new file mode 100644
index 00000000..f2d884aa
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/MonascaRetryPolicy.java
@@ -0,0 +1,77 @@
+package monasca.persister.repository.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.WriteType;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.policies.RetryPolicy;
+
+public class MonascaRetryPolicy implements RetryPolicy {
+
+ private final int readAttempts;
+ private final int writeAttempts;
+ private final int unavailableAttempts;
+
+ public MonascaRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
+ super();
+ this.readAttempts = readAttempts;
+ this.writeAttempts = writeAttempts;
+ this.unavailableAttempts = unavailableAttempts;
+ }
+
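+ // Ignore a read timeout if the requested data was already received; otherwise retry at
+ // the same consistency level while retries remain and enough replicas responded.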
+ @Override
+ public RetryDecision onReadTimeout(Statement stmnt, ConsistencyLevel cl, int requiredResponses,
+ int receivedResponses, boolean dataReceived, int rTime) {
+ if (dataReceived) {
+ return RetryDecision.ignore();
+ } else if (rTime < readAttempts) {
+ return receivedResponses >= requiredResponses ? RetryDecision.retry(cl)
+ : RetryDecision.rethrow();
+ } else {
+ return RetryDecision.rethrow();
+ }
+
+ }
+
+ @Override
+ public RetryDecision onWriteTimeout(Statement stmnt, ConsistencyLevel cl, WriteType wt,
+ int requiredResponses, int receivedResponses, int wTime) {
+ if (wTime >= writeAttempts)
+ return RetryDecision.rethrow();
+
+ return RetryDecision.retry(cl);
+ }
+
+ @Override
+ public RetryDecision onUnavailable(Statement stmnt, ConsistencyLevel cl, int requiredResponses,
+ int receivedResponses, int uTime) {
+ if (uTime == 0) {
+ return RetryDecision.tryNextHost(cl);
+ } else if (uTime <= unavailableAttempts) {
+ return RetryDecision.retry(cl);
+ } else {
+ return RetryDecision.rethrow();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e,
+ int nbRetry) {
+ return RetryDecision.tryNextHost(cl);
+ }
+
+ @Override
+ public void init(Cluster cluster) {
+ // nothing to do
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+
+}
diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
index 52baf7ee..a0c22e4b 100644
--- a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
@@ -1,6 +1,8 @@
/*
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
*
+ * (C) Copyright 2017 SUSE LLC.
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -50,6 +52,7 @@ import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.Repo;
import monasca.persister.repository.RepoException;
+import monasca.persister.repository.Sha1HashId;
public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelope> {
diff --git a/monasca_persister/conf/cassandra.py b/monasca_persister/conf/cassandra.py
index 93cdc325..09bbe18a 100644
--- a/monasca_persister/conf/cassandra.py
+++ b/monasca_persister/conf/cassandra.py
@@ -1,6 +1,7 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# Copyright 2017 FUJITSU LIMITED
-#
+# (C) Copyright 2017 SUSE LLC
+#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -17,13 +18,47 @@
from oslo_config import cfg
cassandra_opts = [
- cfg.ListOpt('cluster_ip_addresses',
+ cfg.ListOpt('contact_points',
help='Comma separated list of Cassandra node IP addresses',
default=['127.0.0.1'],
item_type=cfg.IPOpt),
+ cfg.IntOpt('port',
+ help='Cassandra port number',
+ default=9042),
cfg.StrOpt('keyspace',
- help='keyspace where metric are stored',
- default='monasca')]
+ help='Keyspace name where metrics are stored',
+ default='monasca'),
+ cfg.StrOpt('user',
+ help='Cassandra user name',
+ default=''),
+ cfg.StrOpt('password',
+ help='Cassandra password',
+ secret=True,
+ default=''),
+ cfg.IntOpt('connection_timeout',
+ help='Cassandra timeout in seconds when creating a new connection',
+ default=5),
+ cfg.IntOpt('read_timeout',
+ help='Cassandra read timeout in seconds',
+ default=60),
+ cfg.IntOpt('max_write_retries',
+ help='Maximum number of retries for write operations',
+ default=1),
+ cfg.IntOpt('max_definition_cache_size',
+ help='Maximum number of cached metric definition entries in memory',
+ default=20000000),
+ cfg.IntOpt('retention_policy',
+ help='Data retention period in days',
+ default=45),
+ cfg.StrOpt('consistency_level',
+ help='Cassandra default consistency level',
+ default='ONE'),
+ cfg.StrOpt('local_data_center',
+ help='Cassandra local data center name'),
+ cfg.IntOpt('max_batches',
+ help='Maximum batch size in Cassandra',
+ default=250),
+]
cassandra_group = cfg.OptGroup(name='cassandra')
diff --git a/monasca_persister/conf/kafka_alarm_history.py b/monasca_persister/conf/kafka_alarm_history.py
index 87ea58ff..77c24eea 100644
--- a/monasca_persister/conf/kafka_alarm_history.py
+++ b/monasca_persister/conf/kafka_alarm_history.py
@@ -1,5 +1,6 @@
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# Copyright 2017 FUJITSU LIMITED
+# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,7 +41,10 @@ kafka_alarm_history_opts = [
default='alarm-state-transitions'),
cfg.StrOpt('zookeeper_path',
help='Path in zookeeper for kafka consumer group partitioning algorithm',
- default='/persister_partitions/$kafka_alarm_history.topic')
+ default='/persister_partitions/$kafka_alarm_history.topic'),
+ cfg.IntOpt('batch_size',
+ help='Maximum number of alarm state history messages to buffer before writing to database',
+ default=1),
]
diff --git a/monasca_persister/conf/kafka_common.py b/monasca_persister/conf/kafka_common.py
index 0c2cfc76..534d5926 100644
--- a/monasca_persister/conf/kafka_common.py
+++ b/monasca_persister/conf/kafka_common.py
@@ -1,5 +1,6 @@
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# Copyright 2017 FUJITSU LIMITED
+# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,9 +27,6 @@ kafka_common_opts = [
help='id of persister kafka client',
advanced=True,
default='monasca-persister'),
- cfg.IntOpt('database_batch_size',
- help='Maximum number of metric to buffer before writing to database',
- default=1000),
cfg.IntOpt('max_wait_time_seconds',
help='Maximum wait time for write batch to database',
default=30),
diff --git a/monasca_persister/conf/kafka_metrics.py b/monasca_persister/conf/kafka_metrics.py
index 70c8f8f1..4382e52c 100644
--- a/monasca_persister/conf/kafka_metrics.py
+++ b/monasca_persister/conf/kafka_metrics.py
@@ -41,6 +41,9 @@ kafka_metrics_opts = [
cfg.StrOpt('zookeeper_path',
help='Path in zookeeper for kafka consumer group partitioning algorithm',
default='/persister_partitions/$kafka_metrics.topic'),
+ cfg.IntOpt('batch_size',
+ help='Maximum number of metrics to buffer before writing to database',
+ default=20000),
]
# Replace Default OPt with reference to kafka group option
diff --git a/monasca_persister/repositories/cassandra/abstract_repository.py b/monasca_persister/repositories/cassandra/abstract_repository.py
index a7257844..188f1e83 100644
--- a/monasca_persister/repositories/cassandra/abstract_repository.py
+++ b/monasca_persister/repositories/cassandra/abstract_repository.py
@@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,26 +13,23 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import abc
-from cassandra import cluster
-from cassandra import query
from oslo_config import cfg
import six
from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories.cassandra import connection_util
+conf = cfg.CONF
@six.add_metaclass(abc.ABCMeta)
class AbstractCassandraRepository(abstract_repository.AbstractRepository):
-
def __init__(self):
super(AbstractCassandraRepository, self).__init__()
- self.conf = cfg.CONF
- self._cassandra_cluster = cluster.Cluster(
- self.conf.cassandra.cluster_ip_addresses.split(','))
-
- self.cassandra_session = self._cassandra_cluster.connect(
- self.conf.cassandra.keyspace)
-
- self._batch_stmt = query.BatchStatement()
+ self._cluster = connection_util.create_cluster()
+ self._session = connection_util.create_session(self._cluster)
+ self._retention = conf.cassandra.retention_policy * 24 * 3600
+ self._cache_size = conf.cassandra.max_definition_cache_size
+ self._max_batches = conf.cassandra.max_batches
diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
index 7993e750..fe0c0a08 100644
--- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
+++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
@@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,9 +13,11 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import ujson as json
-from cassandra import query
+from cassandra.concurrent import execute_concurrent_with_args
+from oslo_config import cfg
from oslo_log import log
from monasca_persister.repositories.cassandra import abstract_repository
@@ -22,51 +25,38 @@ from monasca_persister.repositories.utils import parse_alarm_state_hist_message
LOG = log.getLogger(__name__)
+UPSERT_CQL = ('update monasca.alarm_state_history USING TTL ? '
+ 'set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ? '
+ 'where tenant_id = ? and alarm_id = ? and time_stamp = ?')
-class AlarmStateHistCassandraRepository(
- abstract_repository.AbstractCassandraRepository):
+class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRepository):
def __init__(self):
-
super(AlarmStateHistCassandraRepository, self).__init__()
- self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare(
- 'insert into alarm_state_history (tenant_id, alarm_id, '
- 'metrics, new_state, '
- 'old_state, reason, reason_data, '
- 'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)')
+ self._upsert_stmt = self._session.prepare(UPSERT_CQL)
def process_message(self, message):
+ (alarm_id, metrics, new_state, old_state, link, lifecycle_state, state_change_reason,
+ sub_alarms_json_snake_case,
+ tenant_id, time_stamp) = parse_alarm_state_hist_message(message)
- (alarm_id, metrics, new_state, old_state, link,
- lifecycle_state, state_change_reason,
- sub_alarms_json_snake_case, tenant_id,
- time_stamp) = parse_alarm_state_hist_message(
- message)
-
- alarm_state_hist = (
- tenant_id.encode('utf8'),
- alarm_id.encode('utf8'),
- json.dumps(metrics, ensure_ascii=False).encode(
- 'utf8'),
- new_state.encode('utf8'),
- old_state.encode('utf8'),
- state_change_reason.encode('utf8'),
- "{}".encode('utf8'),
- sub_alarms_json_snake_case.encode('utf8'),
- time_stamp
- )
-
- LOG.debug(alarm_state_hist)
+ alarm_state_hist = (self._retention,
+ json.dumps(metrics, ensure_ascii=False).encode('utf8'),
+ old_state.encode('utf8'),
+ new_state.encode('utf8'),
+ sub_alarms_json_snake_case.encode('utf8'),
+ state_change_reason.encode('utf8'),
+ "{}".encode('utf8'),
+ tenant_id.encode('utf8'),
+ alarm_id.encode('utf8'),
+ time_stamp)
return alarm_state_hist
def write_batch(self, alarm_state_hists):
-
- for alarm_state_hist in alarm_state_hists:
- self._batch_stmt.add(self._insert_alarm_state_hist_stmt,
- alarm_state_hist)
-
- self.cassandra_session.execute(self._batch_stmt)
-
- self._batch_stmt = query.BatchStatement()
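+ # Write the alarm state transitions in slices of at most kafka_alarm_history.batch_size
+ # rows, executing the prepared upsert concurrently for each slice.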
+ while alarm_state_hists:
+ num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
+ batch = alarm_state_hists[:num_rows]
+ execute_concurrent_with_args(self._session, self._upsert_stmt, batch)
+ alarm_state_hists = alarm_state_hists[num_rows:]
diff --git a/monasca_persister/repositories/cassandra/connection_util.py b/monasca_persister/repositories/cassandra/connection_util.py
new file mode 100644
index 00000000..fce085cb
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/connection_util.py
@@ -0,0 +1,51 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster
+from cassandra import ConsistencyLevel
+from cassandra.policies import DCAwareRoundRobinPolicy
+from cassandra.policies import TokenAwarePolicy
+from oslo_config import cfg
+
+from monasca_persister.repositories.cassandra.retry_policy import MonascaRetryPolicy
+
+conf = cfg.CONF
+
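+# Build a Cluster with optional plain-text authentication, a token- and data-center-aware
+# load balancing policy, and MonascaRetryPolicy as the default retry policy.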
+def create_cluster():
+ user = conf.cassandra.user
+ if user:
+ auth_provider = PlainTextAuthProvider(username=user, password=conf.cassandra.password)
+ else:
+ auth_provider = None
+
+ contact_points = [ip.dest for ip in conf.cassandra.contact_points]
+ cluster = Cluster(contact_points,
+ port=conf.cassandra.port,
+ auth_provider=auth_provider,
+ connect_timeout=conf.cassandra.connection_timeout,
+ load_balancing_policy=TokenAwarePolicy(
+ DCAwareRoundRobinPolicy(local_dc=conf.cassandra.local_data_center)),
+ )
+ cluster.default_retry_policy = MonascaRetryPolicy(1, conf.cassandra.max_write_retries,
+ conf.cassandra.max_write_retries)
+ return cluster
+
+
+def create_session(cluster):
+ session = cluster.connect(conf.cassandra.keyspace)
+ session.default_timeout = conf.cassandra.read_timeout
+ session.default_consistency_level = ConsistencyLevel.name_to_value[conf.cassandra.consistency_level]
+ return session
diff --git a/monasca_persister/repositories/cassandra/metric_batch.py b/monasca_persister/repositories/cassandra/metric_batch.py
new file mode 100644
index 00000000..04a6c60b
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/metric_batch.py
@@ -0,0 +1,150 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from cassandra.query import BatchStatement
+from cassandra.query import BatchType
+from oslo_log import log
+
+LOG = log.getLogger(__name__)
+
+
+class MetricBatch(object):
+ def __init__(self, metadata, load_balance_policy, batch_limit):
+
+ self.metadata = metadata
+ self.batch_limit = batch_limit
+ self.lb_policy = load_balance_policy
+ self.metric_queries = dict()
+ self.dimension_queries = dict()
+ self.dimension_metric_queries = dict()
+ self.metric_dimension_queries = dict()
+ self.measurement_queries = dict()
+
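+ # Group statements by the token of their routing key so every unlogged batch targets a
+ # single partition, appending to the last batch until it reaches the configured limit.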
+ def batch_query_by_token(self, bound_stmt, query_map):
+ token = self.metadata.token_map.token_class.from_key(bound_stmt.routing_key)
+
+ queue = query_map.get(token, None)
+ if not queue:
+ queue = []
+ batch = BatchStatement(BatchType.UNLOGGED)
+ batch.add(bound_stmt)
+ queue.append((batch, Counter(1)))
+ query_map[token] = queue
+ else:
+ (batch, counter) = queue[-1]
+ if counter.value() < self.batch_limit:
+ batch.add(bound_stmt)
+ counter.increment()
+ else:
+ batch = BatchStatement(BatchType.UNLOGGED)
+ batch.add(bound_stmt)
+ queue.append((batch, Counter(1)))
+
+ def add_metric_query(self, bound_stmt):
+ self.batch_query_by_token(bound_stmt, self.metric_queries)
+
+ def add_dimension_query(self, bound_stmt):
+ self.batch_query_by_token(bound_stmt, self.dimension_queries)
+
+ def add_dimension_metric_query(self, bound_stmt):
+ self.batch_query_by_token(bound_stmt, self.dimension_metric_queries)
+
+ def add_metric_dimension_query(self, bound_stmt):
+ self.batch_query_by_token(bound_stmt, self.metric_dimension_queries)
+
+ def add_measurement_query(self, bound_stmt):
+ self.batch_query_by_replicas(bound_stmt, self.measurement_queries)
+
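+ # Measurements are grouped by the replica set chosen by the load balancing policy,
+ # capping each unlogged batch at 30 statements before starting a new one.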
+ def batch_query_by_replicas(self, bound_stmt, query_map):
+ hosts = tuple(self.lb_policy.make_query_plan(working_keyspace=bound_stmt.keyspace, query=bound_stmt))
+
+ queue = query_map.get(hosts, None)
+ if not queue:
+ queue = []
+ batch = BatchStatement(BatchType.UNLOGGED)
+ batch.add(bound_stmt)
+ queue.append((batch, Counter(1)))
+ query_map[hosts] = queue
+ else:
+ (batch, counter) = queue[-1]
+ if counter.value() < 30:
+ batch.add(bound_stmt)
+ counter.increment()
+ else:
+ batch = BatchStatement(BatchType.UNLOGGED)
+ batch.add(bound_stmt)
+ queue.append((batch, Counter(1)))
+
+ def clear(self):
+ self.metric_queries.clear()
+ self.dimension_queries.clear()
+ self.dimension_metric_queries.clear()
+ self.metric_dimension_queries.clear()
+ self.measurement_queries.clear()
+
+ @staticmethod
+ def log_token_batch_map(name, query_map):
+ LOG.info('%s : Size: %s; Tokens: |%s|' % (name, len(query_map),
+ '|'.join(['%s: %s' % (
+ token,
+ ','.join([str(counter.value()) for (batch, counter) in queue]))
+ for token, queue in query_map.items()])))
+
+ @staticmethod
+ def log_replica_batch_map(name, query_map):
+ LOG.info('%s : Size: %s; Replicas: |%s|' % (name, len(query_map), '|'.join([
+ '%s: %s' % (
+ ','.join([h.address for h in hosts]), ','.join([str(counter.value()) for (batch, counter) in queue]))
+ for hosts, queue in query_map.items()])))
+
+ def get_all_batches(self):
+ self.log_token_batch_map("metric batches", self.metric_queries)
+ self.log_token_batch_map("dimension batches", self.dimension_queries)
+ self.log_token_batch_map("dimension metric batches", self.dimension_metric_queries)
+ self.log_token_batch_map("metric dimension batches", self.metric_dimension_queries)
+ self.log_replica_batch_map("measurement batches", self.measurement_queries)
+
+ result_list = []
+
+ for q in self.measurement_queries.values():
+ result_list.extend(q)
+
+ for q in self.metric_queries.values():
+ result_list.extend(q)
+
+ for q in self.dimension_queries.values():
+ result_list.extend(q)
+
+ for q in self.dimension_metric_queries.values():
+ result_list.extend(q)
+
+ for q in self.metric_dimension_queries.values():
+ result_list.extend(q)
+
+ return result_list
+
+
+class Counter(object):
+ def __init__(self, init_value=0):
+ self._count = init_value
+
+ def increment(self):
+ self._count += 1
+
+ def increment_by(self, increment):
+ self._count += increment
+
+ def value(self):
+ return self._count
diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py
index 5ec798d6..7eee8b35 100644
--- a/monasca_persister/repositories/cassandra/metrics_repository.py
+++ b/monasca_persister/repositories/cassandra/metrics_repository.py
@@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,105 +13,261 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from __future__ import with_statement
+from cachetools import LRUCache
+from collections import namedtuple
import hashlib
+import threading
import ujson as json
-from cassandra import query
+from cassandra.concurrent import execute_concurrent
from oslo_log import log
-import urllib
from monasca_persister.repositories.cassandra import abstract_repository
+from monasca_persister.repositories.cassandra import token_range_query_manager
+from monasca_persister.repositories.cassandra.metric_batch import MetricBatch
from monasca_persister.repositories.utils import parse_measurement_message
LOG = log.getLogger(__name__)
+MEASUREMENT_INSERT_CQL = ('update monasca.measurements USING TTL ? '
+ 'set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? '
+ 'where metric_id = ? and time_stamp = ?')
-class MetricCassandraRepository(
- abstract_repository.AbstractCassandraRepository):
+MEASUREMENT_UPDATE_CQL = ('update monasca.measurements USING TTL ? '
+ 'set value = ?, value_meta = ? where metric_id = ? and time_stamp = ?')
+METRICS_INSERT_CQL = ('update monasca.metrics USING TTL ? '
+ 'set metric_id = ?, created_at = ?, updated_at = ? '
+ 'where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? '
+ 'and dimension_names = ?')
+
+METRICS_UPDATE_CQL = ('update monasca.metrics USING TTL ? '
+ 'set updated_at = ? '
+ 'where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? '
+ 'and dimension_names = ?')
+
+DIMENSION_INSERT_CQL = ('insert into monasca.dimensions '
+ '(region, tenant_id, name, value) values (?, ?, ?, ?)')
+
+DIMENSION_METRIC_INSERT_CQL = ('insert into monasca.dimensions_metrics '
+ '(region, tenant_id, dimension_name, dimension_value, metric_name) '
+ 'values (?, ?, ?, ?, ?)')
+
+METRIC_DIMENSION_INSERT_CQL = ('insert into monasca.metrics_dimensions '
+ '(region, tenant_id, metric_name, dimension_name, dimension_value) '
+ 'values (?, ?, ?, ?, ?)')
+
+RETRIEVE_DIMENSION_CQL = 'select region, tenant_id, name, value from dimensions'
+
+RETRIEVE_METRIC_DIMENSION_CQL = ('select region, tenant_id, metric_name, '
+ 'dimension_name, dimension_value from metrics_dimensions '
+ 'WHERE token(region, tenant_id, metric_name) > ? '
+ 'and token(region, tenant_id, metric_name) <= ? ')
+
+Metric = namedtuple('Metric', ['id', 'region', 'tenant_id', 'name', 'dimension_list', 'dimension_names',
+ 'time_stamp', 'value', 'value_meta'])
+
+
+class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository):
def __init__(self):
-
super(MetricCassandraRepository, self).__init__()
- self._insert_measurement_stmt = self.cassandra_session.prepare(
- 'insert into measurements (tenant_id,'
- 'region, metric_hash, time_stamp, value,'
- 'value_meta) values (?, ?, ?, ?, ?, ?)')
+ self._lock = threading.RLock()
- self._insert_metric_map_stmt = self.cassandra_session.prepare(
- 'insert into metric_map (tenant_id,'
- 'region, metric_hash, '
- 'metric_map) values'
- '(?,?,?,?)')
+ LOG.debug("prepare cql statements...")
+
+ self._measurement_insert_stmt = self._session.prepare(MEASUREMENT_INSERT_CQL)
+ self._measurement_insert_stmt.is_idempotent = True
+
+ self._measurement_update_stmt = self._session.prepare(MEASUREMENT_UPDATE_CQL)
+ self._measurement_update_stmt.is_idempotent = True
+
+ self._metric_insert_stmt = self._session.prepare(METRICS_INSERT_CQL)
+ self._metric_insert_stmt.is_idempotent = True
+
+ self._metric_update_stmt = self._session.prepare(METRICS_UPDATE_CQL)
+ self._metric_update_stmt.is_idempotent = True
+
+ self._dimension_stmt = self._session.prepare(DIMENSION_INSERT_CQL)
+ self._dimension_stmt.is_idempotent = True
+
+ self._dimension_metric_stmt = self._session.prepare(DIMENSION_METRIC_INSERT_CQL)
+ self._dimension_metric_stmt.is_idempotent = True
+
+ self._metric_dimension_stmt = self._session.prepare(METRIC_DIMENSION_INSERT_CQL)
+ self._metric_dimension_stmt.is_idempotent = True
+
+ self._retrieve_metric_dimension_stmt = self._session.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
+
+ self._metric_batch = MetricBatch(self._cluster.metadata, self._cluster.load_balancing_policy, self._max_batches)
+
+ self._metric_id_cache = LRUCache(self._cache_size)
+ self._dimension_cache = LRUCache(self._cache_size)
+ self._metric_dimension_cache = LRUCache(self._cache_size)
+
+ self._load_dimension_cache()
+ self._load_metric_dimension_cache()
def process_message(self, message):
-
(dimensions, metric_name, region, tenant_id, time_stamp, value,
value_meta) = parse_measurement_message(message)
- metric_hash, metric_map = create_metric_hash(metric_name,
- dimensions)
+ with self._lock:
+ dim_names = []
+ dim_list = []
+ for name in sorted(dimensions.iterkeys()):
+ dim_list.append('%s\t%s' % (name, dimensions[name]))
+ dim_names.append(name)
- measurement = (tenant_id.encode('utf8'),
- region.encode('utf8'),
- metric_hash,
- time_stamp,
- value,
- json.dumps(value_meta, ensure_ascii=False).encode(
- 'utf8'))
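+ # The metric id is the SHA-1 hex digest of region, tenant id, metric name and the
+ # sorted dimension name/value pairs joined with null separators.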
+ hash_string = '%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, '\0'.join(dim_list))
+ metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest()
- LOG.debug(measurement)
+ metric = Metric(id=metric_id,
+ region=region,
+ tenant_id=tenant_id,
+ name=metric_name,
+ dimension_list=dim_list,
+ dimension_names=dim_names,
+ time_stamp=time_stamp,
+ value=value,
+ value_meta=json.dumps(value_meta, ensure_ascii=False))
- return MetricMeasurementInfo(
- tenant_id.encode('utf8'),
- region.encode('utf8'),
- metric_hash,
- metric_map,
- measurement)
+ id_bytes = bytearray.fromhex(metric.id)
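+ # On a metric id cache hit only the measurement and the metric's updated_at/TTL are
+ # refreshed; the definition, dimension and index rows are skipped.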
+ if self._metric_id_cache.get(metric.id, None):
+ measurement_bound_stmt = self._measurement_update_stmt.bind((self._retention,
+ metric.value,
+ metric.value_meta,
+ id_bytes,
+ metric.time_stamp))
+ self._metric_batch.add_measurement_query(measurement_bound_stmt)
- def write_batch(self, metric_measurement_infos):
+ metric_update_bound_stmt = self._metric_update_stmt.bind((self._retention,
+ metric.time_stamp,
+ metric.region,
+ metric.tenant_id,
+ metric.name,
+ metric.dimension_list,
+ metric.dimension_names))
+ self._metric_batch.add_metric_query(metric_update_bound_stmt)
- for metric_measurement_info in metric_measurement_infos:
+ return metric
- self._batch_stmt.add(self._insert_measurement_stmt,
- metric_measurement_info.measurement)
+ self._metric_id_cache[metric.id] = metric.id
- metric_map = (metric_measurement_info.tenant_id,
- metric_measurement_info.region,
- metric_measurement_info.metric_hash,
- metric_measurement_info.metric_map)
+ metric_insert_bound_stmt = self._metric_insert_stmt.bind((self._retention,
+ id_bytes,
+ metric.time_stamp,
+ metric.time_stamp,
+ metric.region,
+ metric.tenant_id,
+ metric.name,
+ metric.dimension_list,
+ metric.dimension_names))
+ self._metric_batch.add_metric_query(metric_insert_bound_stmt)
- self._batch_stmt.add(self._insert_metric_map_stmt,
- metric_map)
+ for dim in metric.dimension_list:
+ (name, value) = dim.split('\t')
+ dim_key = self._get_dimnesion_key(metric.region, metric.tenant_id, name, value)
+ if not self._dimension_cache.get(dim_key, None):
+ dimension_bound_stmt = self._dimension_stmt.bind((metric.region,
+ metric.tenant_id,
+ name,
+ value))
+ self._metric_batch.add_dimension_query(dimension_bound_stmt)
+ self._dimension_cache[dim_key] = dim_key
- self.cassandra_session.execute(self._batch_stmt)
+ metric_dim_key = self._get_metric_dimnesion_key(metric.region, metric.tenant_id, metric.name, name,
+ value)
+ if not self._metric_dimension_cache.get(metric_dim_key, None):
+ dimension_metric_bound_stmt = self._dimension_metric_stmt.bind((metric.region,
+ metric.tenant_id,
+ name,
+ value,
+ metric.name))
+ self._metric_batch.add_dimension_metric_query(dimension_metric_bound_stmt)
- self._batch_stmt = query.BatchStatement()
+ metric_dimension_bound_stmt = self._metric_dimension_stmt.bind((metric.region,
+ metric.tenant_id,
+ metric.name,
+ name,
+ value))
+ self._metric_batch.add_metric_dimension_query(metric_dimension_bound_stmt)
+ self._metric_dimension_cache[metric_dim_key] = metric_dim_key
-class MetricMeasurementInfo(object):
+ measurement_insert_bound_stmt = self._measurement_insert_stmt.bind((self._retention,
+ metric.value,
+ metric.value_meta,
+ metric.region,
+ metric.tenant_id,
+ metric.name,
+ metric.dimension_list,
+ id_bytes,
+ metric.time_stamp))
+ self._metric_batch.add_measurement_query(measurement_insert_bound_stmt)
- def __init__(self, tenant_id, region, metric_hash, metric_map,
- measurement):
+ return metric
- self.tenant_id = tenant_id
- self.region = region
- self.metric_hash = metric_hash
- self.metric_map = metric_map
- self.measurement = measurement
+ def write_batch(self, metrics):
+ with self._lock:
+ batch_list = self._metric_batch.get_all_batches()
-def create_metric_hash(metric_name, dimensions):
+ results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
- dimensions['__name__'] = urllib.quote_plus(metric_name)
+ self._handle_results(results)
- hash_string = ''
+ self._metric_batch.clear()
- for dim_name in sorted(dimensions.iterkeys()):
- dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
- dimensions[dim_name]))
- hash_string += dimension
+ LOG.info("flushed %s metrics", len(metrics))
- sha1_hash = hashlib.sha1(hash_string).hexdigest()
+ @staticmethod
+ def _handle_results(results):
+ for (success, result) in results:
+ if not success:
+ raise result
- return bytearray.fromhex(sha1_hash), dimensions
+ def _load_dimension_cache(self):
+
+ rows = self._session.execute(RETRIEVE_DIMENSION_CQL)
+
+ if not rows:
+ return
+
+ for row in rows:
+ key = self._get_dimnesion_key(row.region, row.tenant_id, row.name, row.value)
+ self._dimension_cache[key] = key
+
+ LOG.info("loaded %s dimension entries cache from database into cache." % self._dimension_cache.currsize)
+
+ @staticmethod
+ def _get_dimnesion_key(region, tenant_id, name, value):
+ return '%s\0%s\0%s\0%s' % (region, tenant_id, name, value)
+
+ def _load_metric_dimension_cache(self):
+ qm = token_range_query_manager.TokenRangeQueryManager(RETRIEVE_METRIC_DIMENSION_CQL,
+ self._process_metric_dimension_query)
+
+ token_ring = self._cluster.metadata.token_map.ring
+
+ qm.query(token_ring)
+
+ def _process_metric_dimension_query(self, rows):
+
+ cnt = 0
+ for row in rows:
+ key = self._get_metric_dimnesion_key(row.region, row.tenant_id, row.metric_name, row.dimension_name,
+ row.dimension_value)
+ self._metric_dimension_cache[key] = key
+ cnt += 1
+
+ LOG.info("loaded %s metric dimension entries from database into cache." % cnt)
+ LOG.info("total of %s metric dimension entries in cache." % self._metric_dimension_cache.currsize)
+
+ @staticmethod
+ def _get_metric_dimnesion_key(region, tenant_id, metric_name, dimension_name, dimension_value):
+
+ return '%s\0%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, dimension_name, dimension_value)
diff --git a/monasca_persister/repositories/cassandra/retry_policy.py b/monasca_persister/repositories/cassandra/retry_policy.py
new file mode 100644
index 00000000..163337d6
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/retry_policy.py
@@ -0,0 +1,49 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from cassandra.policies import RetryPolicy
+
+
+class MonascaRetryPolicy(RetryPolicy):
+
+ def __init__(self, read_attempts, write_attempts, unavailable_attempts):
+
+ super(MonascaRetryPolicy, self).__init__()
+
+ self.read_attempts = read_attempts
+ self.write_attempts = write_attempts
+ self.unavailable_attempts = unavailable_attempts
+
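+ # Retry a read at the same consistency level only while retries remain and enough
+ # replicas responded but the requested data had not yet been retrieved.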
+ def on_read_timeout(self, query, consistency, required_responses,
+ received_responses, data_retrieved, retry_num):
+
+ if retry_num >= self.read_attempts:
+ return self.RETHROW, None
+ elif received_responses >= required_responses and not data_retrieved:
+ return self.RETRY, consistency
+ else:
+ return self.RETHROW, None
+
+ def on_write_timeout(self, query, consistency, write_type,
+ required_responses, received_responses, retry_num):
+
+ if retry_num >= self.write_attempts:
+ return self.RETHROW, None
+ else:
+ return self.RETRY, consistency
+
+ def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
+
+ return (self.RETRY_NEXT_HOST, consistency) if retry_num < self.unavailable_attempts else (self.RETHROW, None)
diff --git a/monasca_persister/repositories/cassandra/token_range_query_manager.py b/monasca_persister/repositories/cassandra/token_range_query_manager.py
new file mode 100644
index 00000000..e90a47fa
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/token_range_query_manager.py
@@ -0,0 +1,67 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+
+from oslo_config import cfg
+from oslo_log import log
+
+from monasca_persister.repositories.cassandra import connection_util
+
+
+LOG = log.getLogger(__name__)
+
+conf = cfg.CONF
+
+
+class TokenRangeQueryManager(object):
+ def __init__(self, cql, result_handler, process_count=None):
+ if process_count:
+ self._process_count = process_count
+ else:
+ self._process_count = multiprocessing.cpu_count()
+
+ self._pool = multiprocessing.Pool(processes=self._process_count, initializer=self._setup,
+ initargs=(cql, result_handler,))
+
+ @classmethod
+ def _setup(cls, cql, result_handler):
+ cls.cluster = connection_util.create_cluster()
+ cls.session = connection_util.create_session(cls.cluster)
+ cls.prepared = cls.session.prepare(cql)
+ cls.result_handler = result_handler
+
+ def close_pool(self):
+ self._pool.close()
+ self._pool.join()
+
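+ # Split the token ring into contiguous ranges and run the prepared token-range query
+ # over each range in parallel across the worker pool.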
+ def query(self, token_ring):
+
+ range_size = len(token_ring) / self._process_count + 1
+ start_index = 0
+ params = []
+ while start_index < len(token_ring):
+ end_index = start_index + range_size - 1
+ if end_index >= len(token_ring):
+ end_index = len(token_ring) - 1
+ params.append((token_ring[start_index].value, token_ring[end_index].value))
+ start_index = end_index + 1
+
+ self._pool.map(execute_query_token_range, params, 1)
+
+
+def execute_query_token_range(token_range):
+ results = TokenRangeQueryManager.session.execute(TokenRangeQueryManager.prepared.bind(token_range))
+ TokenRangeQueryManager.result_handler(results)
diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py
index d1a49751..51c21523 100644
--- a/monasca_persister/repositories/persister.py
+++ b/monasca_persister/repositories/persister.py
@@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,7 +30,7 @@ class Persister(object):
self._kafka_topic = kafka_conf.topic
- self._database_batch_size = kafka_conf.database_batch_size
+ self._batch_size = kafka_conf.batch_size
self._consumer = consumer.KafkaConsumer(
kafka_conf.uri,
@@ -71,7 +72,7 @@ class Persister(object):
LOG.exception('Error processing message. Message is '
'being dropped. {}'.format(message))
- if len(self._data_points) >= self._database_batch_size:
+ if len(self._data_points) >= self._batch_size:
self._flush()
except Exception:
LOG.exception(