diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 60b485e7..04a5662d 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -212,7 +212,8 @@ class AbstractPersister(threading.Thread): self._database_batch_size = kafka_conf.database_batch_size self._kafka_topic = kafka_conf.topic - self._json_body = [] + self._message_count = 0 + self._data_points = {} self._last_flush = datetime.now() self._last_partition_check = datetime.now() @@ -222,12 +223,19 @@ class AbstractPersister(threading.Thread): def _flush(self, partitions): - if self._json_body: - self._influxdb_client.write_points(self._json_body) + if self._data_points: + try: + self._influxdb_client.write_points(self._data_points.values()) + except Exception: + log.exception("Error writing to influxdb: {}" + .format(self._data_points.values())) + raise + self._consumer.commit(partitions=partitions) LOG.info("Processed {} messages from topic '{}'".format( - len(self._json_body), self._kafka_topic)) - self._json_body = [] + self._message_count, self._kafka_topic)) + self._data_points = {} + self._message_count = 0 self._last_flush = datetime.now() def _is_time_for_repartition_check(self): @@ -238,7 +246,6 @@ class AbstractPersister(threading.Thread): self._partition_interval_recheck_secs) def _process_messages(self, partitions): - while 1: if self._is_time_for_repartition_check(): @@ -252,7 +259,17 @@ class AbstractPersister(threading.Thread): try: - self._json_body.append(self.process_message(message)) + data_point = self.process_message(message) + + key = data_point['name'] + + if key in self._data_points: + points = data_point['points'] + self._data_points[key]['points'].extend(points) + else: + self._data_points[key] = data_point + + self._message_count += 1 if self._is_time_for_repartition_check(): return @@ -261,7 +278,7 @@ class AbstractPersister(threading.Thread): LOG.exception('Error processing message. Message is ' 'being dropped. {}'.format(message)) - if len(self._json_body) >= self._database_batch_size: + if self._message_count >= self._database_batch_size: self._flush(partitions) def _get_set_partitioner(self): diff --git a/openstack/__init__.pyc b/openstack/__init__.pyc deleted file mode 100644 index 7fbb220b..00000000 Binary files a/openstack/__init__.pyc and /dev/null differ diff --git a/openstack/common/__init__.pyc b/openstack/common/__init__.pyc deleted file mode 100644 index 64028bb9..00000000 Binary files a/openstack/common/__init__.pyc and /dev/null differ diff --git a/openstack/common/gettextutils.pyc b/openstack/common/gettextutils.pyc deleted file mode 100644 index e58b4657..00000000 Binary files a/openstack/common/gettextutils.pyc and /dev/null differ diff --git a/openstack/common/importutils.pyc b/openstack/common/importutils.pyc deleted file mode 100644 index c7b43e89..00000000 Binary files a/openstack/common/importutils.pyc and /dev/null differ diff --git a/openstack/common/jsonutils.pyc b/openstack/common/jsonutils.pyc deleted file mode 100644 index 032023b4..00000000 Binary files a/openstack/common/jsonutils.pyc and /dev/null differ diff --git a/openstack/common/local.pyc b/openstack/common/local.pyc deleted file mode 100644 index 2f9d7e77..00000000 Binary files a/openstack/common/local.pyc and /dev/null differ diff --git a/openstack/common/log.pyc b/openstack/common/log.pyc deleted file mode 100644 index d82e4958..00000000 Binary files a/openstack/common/log.pyc and /dev/null differ diff --git a/openstack/common/strutils.pyc b/openstack/common/strutils.pyc deleted file mode 100644 index f2c8b373..00000000 Binary files a/openstack/common/strutils.pyc and /dev/null differ diff --git a/openstack/common/timeutils.pyc b/openstack/common/timeutils.pyc deleted file mode 100644 index 8ef1e167..00000000 Binary files a/openstack/common/timeutils.pyc and /dev/null differ