Fail fast

Change-Id: I9e3fb53de7e28fa083d91afd2bf62ed4e39238b4
This commit is contained in:
Deklan Dieterly 2015-04-29 09:15:38 -06:00
parent 3f5597c9d3
commit b829a972ce
17 changed files with 222 additions and 103 deletions

View File

@ -42,7 +42,7 @@ import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.healthcheck.SimpleHealthCheck;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.ManagedPipelineFactory;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.resource.Resource;
@ -203,8 +203,8 @@ public class PersisterApplication extends Application<PersisterConfig> {
int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize();
logger.debug("Batch size for each AlarmStateHistoryPipeline [{}]", batchSize);
AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory =
injector.getInstance(AlarmStateTransitionedEventHandlerFactory.class);
AlarmStateTransitionHandlerFactory alarmHistoryEventHandlerFactory =
injector.getInstance(AlarmStateTransitionHandlerFactory.class);
ManagedPipelineFactory<AlarmStateTransitionedEvent> alarmStateTransitionPipelineFactory =
injector.getInstance(new Key<ManagedPipelineFactory<AlarmStateTransitionedEvent>>(){});

View File

@ -41,8 +41,8 @@ import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.dbi.DBIProvider;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.ManagedPipelineFactory;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandler;
import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import monasca.persister.pipeline.event.AlarmStateTransitionHandler;
import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory;
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.repository.Repo;
@ -81,9 +81,9 @@ public class PersisterModule extends AbstractModule {
install(
new FactoryModuleBuilder().implement(
AlarmStateTransitionedEventHandler.class,
AlarmStateTransitionedEventHandler.class)
.build(AlarmStateTransitionedEventHandlerFactory.class));
AlarmStateTransitionHandler.class,
AlarmStateTransitionHandler.class)
.build(AlarmStateTransitionHandlerFactory.class));
install(
new FactoryModuleBuilder().implement(

View File

@ -17,6 +17,7 @@
package monasca.persister.consumer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@ -25,6 +26,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class KafkaConsumer<T> {
@ -52,9 +54,14 @@ public class KafkaConsumer<T> {
logger.info("[{}]: start", this.threadId);
executorService = Executors.newFixedThreadPool(1);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadId + "-%d")
.setDaemon(true)
.build();
executorService.submit(kafkaConsumerRunnableBasic);
executorService = Executors.newSingleThreadExecutor(threadFactory);
executorService.submit(kafkaConsumerRunnableBasic.setExecutorService(executorService));
}

View File

@ -17,14 +17,18 @@
package monasca.persister.consumer;
import monasca.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import kafka.consumer.ConsumerIterator;
import monasca.persister.pipeline.ManagedPipeline;
public class KafkaConsumerRunnableBasic<T> implements Runnable {
@ -35,6 +39,7 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
private final ManagedPipeline<T> pipeline;
private volatile boolean stop = false;
private ExecutorService executorService;
@Inject
public KafkaConsumerRunnableBasic(
@ -47,6 +52,14 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
this.threadId = threadId;
}
public KafkaConsumerRunnableBasic<T> setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
protected void publishHeartbeat() {
publishEvent(null);
@ -67,9 +80,17 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
this.stop = true;
if (pipeline.shutdown()) {
try {
markRead();
if (pipeline.shutdown()) {
markRead();
}
} catch (Exception e) {
logger.error("caught fatal exception while shutting down", e);
}
}
@ -82,37 +103,61 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId);
while (!this.stop) {
while (!this.stop) {
try {
try {
if (it.hasNext()) {
if (it.hasNext()) {
final String msg = new String(it.next().message());
final String msg = new String(it.next().message());
logger.debug("[{}]: {}", this.threadId, msg);
logger.debug("[{}]: {}", this.threadId, msg);
publishEvent(msg);
publishEvent(msg);
}
} catch (kafka.consumer.ConsumerTimeoutException cte) {
publishHeartbeat();
}
} catch (kafka.consumer.ConsumerTimeoutException cte) {
if (Thread.currentThread().isInterrupted()) {
publishHeartbeat();
logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
break;
}
}
logger.info("[{}]: shutting down", this.threadId);
this.kafkaChannel.stop();
}
logger.info("[{}]: shutting down", this.threadId);
this.kafkaChannel.stop();
}
protected void publishEvent(final String msg) {
if (pipeline.publishEvent(msg)) {
try {
markRead();
if (pipeline.publishEvent(msg)) {
markRead();
}
} catch (Exception e) {
logger.error("caught fatal exception while publishing msg. Shutting entire persister down now!");
this.executorService.shutdownNow();
LogManager.shutdown();
System.exit(-1);
}
}

View File

@ -21,6 +21,7 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import monasca.persister.pipeline.event.FlushableHandler;
import monasca.persister.repository.RepoException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,7 +43,7 @@ public class ManagedPipeline<T> {
}
public boolean shutdown() {
public boolean shutdown() throws RepoException {
logger.info("[{}]: shutdown", this.threadId);
@ -52,26 +53,31 @@ public class ManagedPipeline<T> {
return msgFlushCnt > 0 ? true : false;
} catch (Exception e) {
} catch (RepoException e) {
logger.error("[{}}: failed to flush repo on shutdown", this.threadId, e);
logger.error(
"[{}]: pipeline broken. repo unavailable. check that database is running. shutting pipeline down now!",
this.threadId);
throw e;
return false;
}
}
public boolean publishEvent(String msg) {
public boolean publishEvent(String msg) throws RepoException {
try {
return this.handler.onEvent(msg);
} catch (Exception e) {
} catch (RepoException e) {
logger.error("[{}]: failed to handle msg: {}", this.threadId, msg, e);
logger.error("[{}]: pipeline broken. repo unavailable. check that database is running. shutting pipeline down now!", this.threadId);
return false;
throw e;
}
}

View File

@ -28,29 +28,29 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.Repo;
import monasca.persister.repository.RepoException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class AlarmStateTransitionedEventHandler extends
public class AlarmStateTransitionHandler extends
FlushableHandler<AlarmStateTransitionedEvent> {
private static final Logger logger =
LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class);
LoggerFactory.getLogger(AlarmStateTransitionHandler.class);
private final Repo<AlarmStateTransitionedEvent> alarmRepo;
private final Counter alarmStateTransitionCounter;
@Inject
public AlarmStateTransitionedEventHandler(
Repo<AlarmStateTransitionedEvent> alarmRepo,
Environment environment,
@Assisted PipelineConfig configuration,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize) {
public AlarmStateTransitionHandler(Repo<AlarmStateTransitionedEvent> alarmRepo,
Environment environment,
@Assisted PipelineConfig configuration,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize) {
super(configuration, environment, threadId, batchSize);
@ -104,7 +104,7 @@ public class AlarmStateTransitionedEventHandler extends
}
@Override
protected int flushRepository() throws Exception {
protected int flushRepository() throws RepoException {
return this.alarmRepo.flush(this.threadId);

View File

@ -21,9 +21,9 @@ import monasca.persister.configuration.PipelineConfig;
import com.google.inject.assistedinject.Assisted;
public interface AlarmStateTransitionedEventHandlerFactory {
public interface AlarmStateTransitionHandlerFactory {
AlarmStateTransitionedEventHandler create(
AlarmStateTransitionHandler create(
PipelineConfig configuration,
@Assisted("threadId") String threadId,
@Assisted("batchSize") int batchSize);

View File

@ -24,6 +24,7 @@ import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.RepoException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -85,11 +86,11 @@ public abstract class FlushableHandler<T> {
protected abstract void initObjectMapper();
protected abstract int flushRepository() throws Exception;
protected abstract int flushRepository() throws RepoException;
protected abstract int process(String msg);
public boolean onEvent(final String msg) throws Exception {
public boolean onEvent(final String msg) throws RepoException {
if (msg == null) {
@ -174,7 +175,7 @@ public abstract class FlushableHandler<T> {
}
}
public int flush() throws Exception {
public int flush() throws RepoException {
logger.debug("[{}]: flushing", this.threadId);

View File

@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.RepoException;
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
@ -114,7 +115,7 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
}
@Override
public int flushRepository() throws Exception {
public int flushRepository() throws RepoException {
return this.metricRepo.flush(this.threadId);
}

View File

@ -20,6 +20,6 @@ public interface Repo<T> {
void addToBatch(final T msg, String id);
int flush(String id) throws Exception;
int flush(String id) throws RepoException;
}

View File

@ -0,0 +1,30 @@
package monasca.persister.repository;
public class RepoException extends Exception {
public RepoException() {
super();
}
public RepoException(String message) {
super(message);
}
public RepoException(String message, Throwable cause) {
super(message, cause);
}
public RepoException(Throwable cause) {
super(cause);
}
protected RepoException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.Repo;
import monasca.persister.repository.RepoException;
public abstract class InfluxRepo<T> implements Repo<T> {
@ -38,7 +39,7 @@ public abstract class InfluxRepo<T> implements Repo<T> {
}
@Override
public int flush(String id) throws Exception {
public int flush(String id) throws RepoException {
if (isBufferEmpty()) {
@ -55,7 +56,7 @@ public abstract class InfluxRepo<T> implements Repo<T> {
}
}
private int writeToRepo(String id) throws Exception {
private int writeToRepo(String id) throws RepoException {
try {
@ -87,7 +88,7 @@ public abstract class InfluxRepo<T> implements Repo<T> {
protected abstract boolean isBufferEmpty();
protected abstract int write(String id) throws Exception;
protected abstract int write(String id) throws RepoException;
protected abstract void clearBuffers();
}

View File

@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.RepoException;
public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
@ -63,13 +64,13 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
}
@Override
protected int write(String id) throws Exception {
protected int write(String id) throws RepoException {
return this.influxV9RepoWriter.write(getInfluxPointArry(id), id);
}
private InfluxPoint[] getInfluxPointArry(String id) throws Exception {
private InfluxPoint[] getInfluxPointArry(String id) {
List<InfluxPoint> influxPointList = new LinkedList<>();

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.RepoException;
public class InfluxV9MetricRepo extends InfluxMetricRepo {
@ -43,13 +44,13 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
}
@Override
protected int write(String id) throws Exception {
protected int write(String id) throws RepoException {
return this.influxV9RepoWriter.write(getInfluxPointArry(), id);
}
private InfluxPoint[] getInfluxPointArry() throws Exception {
private InfluxPoint[] getInfluxPointArry() {
List<InfluxPoint> influxPointList = new LinkedList<>();

View File

@ -18,9 +18,11 @@
package monasca.persister.repository.influxdb;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.RepoException;
import com.google.inject.Inject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
@ -123,7 +125,7 @@ public class InfluxV9RepoWriter {
}
}
protected int write(final InfluxPoint[] influxPointArry, String id) throws Exception {
protected int write(final InfluxPoint[] influxPointArry, String id) throws RepoException {
HttpPost request = new HttpPost(this.influxUrl);
@ -135,7 +137,7 @@ public class InfluxV9RepoWriter {
new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry,
new HashMap<String, String>());
String json = this.objectMapper.writeValueAsString(influxWrite);
String jsonBody = getJsonBody(influxWrite);
if (this.gzip) {
@ -145,7 +147,7 @@ public class InfluxV9RepoWriter {
requestEntity =
EntityBuilder
.create()
.setText(json)
.setText(jsonBody)
.setContentType(ContentType.APPLICATION_JSON)
.setContentEncoding("UTF-8")
.gzipCompress()
@ -159,7 +161,7 @@ public class InfluxV9RepoWriter {
logger.debug("[{}]: gzip set to false. sending non-gzip msg", id);
StringEntity stringEntity = new StringEntity(json, "UTF-8");
StringEntity stringEntity = new StringEntity(jsonBody, "UTF-8");
request.setEntity(stringEntity);
@ -170,34 +172,71 @@ public class InfluxV9RepoWriter {
logger.debug("[{}]: sending {} points to influxdb {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
HttpResponse response = this.httpClient.execute(request);
HttpResponse response = null;
try {
response = this.httpClient.execute(request);
} catch (IOException e) {
throw new RepoException("failed to execute http request", e);
}
int rc = response.getStatusLine().getStatusCode();
if (rc != HttpStatus.SC_OK) {
HttpEntity responseEntity = response.getEntity();
String responseString = EntityUtils.toString(responseEntity, "UTF-8");
logger.error("[{}]: failed to send data to influxdb {} at {}: {}", id,
this.influxName, this.influxUrl, String.valueOf(rc));
HttpEntity responseEntity = response.getEntity();
String responseString = null;
try {
responseString = EntityUtils.toString(responseEntity, "UTF-8");
} catch (IOException e) {
throw new RepoException("failed to read http response for non ok return code " + rc, e);
}
logger.error("[{}]: http response: {}", id, responseString);
throw new Exception(rc + ":" + responseString);
throw new RepoException("failed to execute http request to influxdb " + rc + " - " + responseString);
} else {
logger.debug("[{}]: successfully sent {} points to influxdb {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
return influxPointArry.length;
}
logger
.debug("[{}]: successfully sent {} points to influxdb {} at {}", id,
influxPointArry.length, this.influxName, this.influxUrl);
return influxPointArry.length;
} finally {
request.releaseConnection();
}
}
private String getJsonBody(InfluxWrite influxWrite) throws RepoException {
String json = null;
try {
json = this.objectMapper.writeValueAsString(influxWrite);
} catch (JsonProcessingException e) {
throw new RepoException("failed to serialize json", e);
}
return json;
}
}

View File

@ -39,6 +39,7 @@ import java.util.TimeZone;
import javax.inject.Inject;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.RepoException;
public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTransitionedEvent> {
@ -105,6 +106,8 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTran
.bind("reason", message.stateChangeReason)
.bind("reason_data", "{}")
.bind("time_stamp", timeStamp);
this.msgCnt++;
}
private String getSerializedString(Object o, String id) {
@ -122,30 +125,23 @@ public class VerticaAlarmRepo extends VerticaRepo implements Repo<AlarmStateTran
}
}
public int flush(String id) {
public int flush(String id) throws RepoException {
try {
commitBatch(id);
int flushCnt = msgCnt;
int commitCnt = this.msgCnt;
this.msgCnt = 0;
return flushCnt;
return commitCnt;
} catch (Exception e) {
logger.error("[{}]: failed to write alarms to database", id, e);
logger.error("[{}]: failed to write alarms to vertica", id, e);
if (handle.isInTransaction()) {
handle.rollback();
}
handle.begin();
return this.msgCnt = 0;
throw new RepoException("failed to commit batch to vertica", e);
}
}

View File

@ -48,6 +48,7 @@ import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.Repo;
import monasca.persister.repository.RepoException;
public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelope> {
@ -246,11 +247,11 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
public void addToBatch(MetricEnvelope metricEnvelope, String id) {
Metric metric = metricEnvelope.metric;
Map<String, Object> meta = metricEnvelope.meta;
Map<String, Object> metaMap = metricEnvelope.meta;
String tenantId = getMeta(TENANT_ID, metric, meta, id);
String tenantId = getMeta(TENANT_ID, metric, metaMap, id);
String region = getMeta(REGION, metric, meta, id);
String region = getMeta(REGION, metric, metaMap, id);
// Add the definition to the batch.
StringBuilder definitionIdStringToHash =
@ -298,8 +299,8 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
this.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId, id);
addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId, id);
// Add the measurement to the batch.
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp()));
@ -459,7 +460,7 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
}
@Override
public int flush(String id) {
public int flush(String id) throws RepoException {
try {
@ -492,20 +493,10 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelop
} catch (Exception e) {
logger.error("[{}]: failed to write measurements, definitions, or dimensions to vertica",
id, e);
logger.error("[{}]: failed to write measurements, definitions, and dimensions to vertica", id,
e);
if (handle.isInTransaction()) {
handle.rollback();
}
clearTempCaches();
handle.begin();
return this.measurementCnt = 0;
throw new RepoException("failed to commit batch to vertica", e);
}
}