Fixed a few bugs, added more logging.
Fixed timestamp bug, and streamstate issue missed in unittests. Added more logging for pipeline manager.
This commit is contained in:
parent
c2aa498beb
commit
a6f84d1603
@ -362,7 +362,7 @@ class Stream(ProxiedDictMixin, Base):
|
||||
self.fire_timestamp = fire_timestamp
|
||||
if state is None:
|
||||
state = StreamState.active
|
||||
self.state = state
|
||||
self.state = int(state)
|
||||
if state_serial_no is None:
|
||||
state_serial_no = 0
|
||||
self.state_serial_no = state_serial_no
|
||||
|
@ -80,7 +80,7 @@ class LoggingHandler(PipelineHandlerBase):
|
||||
def handle_events(self, events, env):
|
||||
emsg = ', '.join("%s: %s" % (event['event_type'], event['message_id'])
|
||||
for event in events)
|
||||
logger.info("Received %s events: \n%s" % (len(events)), emsg)
|
||||
logger.info("Received %s events: \n%s" % (len(events), emsg))
|
||||
return events
|
||||
|
||||
def commit(self):
|
||||
|
@ -217,6 +217,7 @@ class PipelineManager(object):
|
||||
except LockError:
|
||||
logger.debug("Stream %s locked. Moving on..." % stream.id)
|
||||
return False
|
||||
logger.debug("Firing Stream %s." % stream.id)
|
||||
trigger_def = self.trigger_map.get(stream.name)
|
||||
if trigger_def is None:
|
||||
logger.error("Stream %s has unknown trigger definition %s" % (
|
||||
@ -246,6 +247,7 @@ class PipelineManager(object):
|
||||
except LockError:
|
||||
logger.debug("Stream %s locked. Moving on..." % stream.id)
|
||||
return False
|
||||
logger.debug("Expiring Stream %s." % stream.id)
|
||||
trigger_def = self.trigger_map.get(stream.name)
|
||||
if trigger_def is None:
|
||||
logger.error("Stream %s has unknown trigger definition %s" % (
|
||||
|
@ -24,6 +24,8 @@ class EventCondenser(condenser.CondenserBase):
|
||||
self.timestamp = None
|
||||
|
||||
def add_trait(self, name, trait_type, value):
|
||||
if isinstance(value, datetime.datetime):
|
||||
value = self._fix_time(value)
|
||||
self.traits[name] = value
|
||||
|
||||
def add_envelope_info(self, event_type, message_id, when):
|
||||
@ -34,10 +36,20 @@ class EventCondenser(condenser.CondenserBase):
|
||||
def get_event(self):
|
||||
event = self.traits.copy()
|
||||
event['message_id'] = self.message_id
|
||||
event['timestamp'] = self.timestamp
|
||||
event['timestamp'] = self._fix_time(self.timestamp)
|
||||
event['event_type'] = self.event_type
|
||||
return event
|
||||
|
||||
def _fix_time(self, dt):
|
||||
"""Stackdistiller converts all times to utc.
|
||||
We store timestamps as utc datetime. However, the explicit
|
||||
UTC timezone on incoming datetimes causes comparison issues
|
||||
deep in sqlalchemy. We fix this by converting all datetimes
|
||||
to naive utc timestamps"""
|
||||
if dt.tzinfo is not None:
|
||||
dt = dt.replace(tzinfo=None)
|
||||
return dt
|
||||
|
||||
def validate(self):
|
||||
if self.event_type is None:
|
||||
return False
|
||||
|
Loading…
Reference in New Issue
Block a user