Add Event methods to db api.

Start of adding the ability to store raw events in Ceilometer.
Query Filter mechanism is crude for now. Need to find a more expressive
grammar than this EventFilter thing.

Change-Id: I3c64e3858df756d93dd5bf3c6c9651be25cd2b55
Blueprint: add-event-table
This commit is contained in:
Sandy Walsh 2013-04-29 17:45:32 -03:00
parent a031f7b779
commit 2b15389317
13 changed files with 626 additions and 16 deletions

View File

@ -19,14 +19,13 @@
"""
import datetime
import urlparse
from oslo.config import cfg
from stevedore import driver
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
from ceilometer import utils
LOG = log.getLogger(__name__)
@ -85,17 +84,31 @@ class SampleFilter(object):
resource=None, meter=None, source=None, metaquery={}):
self.user = user
self.project = project
self.start = self._sanitize_timestamp(start)
self.end = self._sanitize_timestamp(end)
self.start = utils.sanitize_timestamp(start)
self.end = utils.sanitize_timestamp(end)
self.resource = resource
self.meter = meter
self.source = source
self.metaquery = metaquery
def _sanitize_timestamp(self, timestamp):
"""Return a naive utc datetime object."""
if not timestamp:
return timestamp
if not isinstance(timestamp, datetime.datetime):
timestamp = timeutils.parse_isotime(timestamp)
return timeutils.normalize_time(timestamp)
class EventFilter(object):
"""Properties for building an Event query.
:param start: UTC start datetime (mandatory)
:param end: UTC end datetime (mandatory)
:param event_name: the name of the event. None for all.
:param traits: the trait filter dict, all of which are optional
{'key': <key>,
't_string': <value>,
't_int': <value>,
't_datetime': <value>
't_float': <value>}
currently, only one trait dict is supported.
"""
def __init__(self, start, end, event_name=None, traits={}):
self.start = utils.sanitize_timestamp(start)
self.end = utils.sanitize_timestamp(end)
self.event_name = event_name
self.traits = traits

View File

@ -156,3 +156,15 @@ class Connection(object):
@abc.abstractmethod
def clear(self):
"""Clear database."""
@abc.abstractmethod
def record_events(self, events):
"""Write the events to the backend storage system.
:param events: a list of model.Event objects.
"""
@abc.abstractmethod
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
"""

View File

@ -508,6 +508,20 @@ class Connection(base.Connection):
"""
raise NotImplementedError('Alarms not implemented')
def record_events(self, events):
"""Write the events.
:param events: a list of model.Event objects.
"""
raise NotImplementedError('Events not implemented.')
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
:param event_filter: EventFilter instance
"""
raise NotImplementedError('Events not implemented.')
###############
# This is a very crude version of "in-memory HBase", which implements just

View File

@ -160,3 +160,17 @@ class Connection(base.Connection):
def delete_alarm(self, alarm_id):
"""Delete a alarm
"""
def record_events(self, events):
"""Write the events.
:param events: a list of model.Event objects.
"""
raise NotImplementedError('Events not implemented.')
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
:param event_filter: EventFilter instance
"""
raise NotImplementedError('Events not implemented.')

View File

@ -551,6 +551,20 @@ class Connection(base.Connection):
"""
self.db.alarm.remove({'alarm_id': alarm_id})
def record_events(self, events):
"""Write the events.
:param events: a list of model.Event objects.
"""
raise NotImplementedError('Events not implemented.')
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
:param event_filter: EventFilter instance
"""
raise NotImplementedError('Events not implemented.')
def require_map_reduce(conn):
"""Raises SkipTest if the connection is using mim.

View File

@ -19,10 +19,10 @@
from __future__ import absolute_import
import operator
import os
import uuid
from sqlalchemy import func
from sqlalchemy.orm import exc
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
@ -32,6 +32,9 @@ from ceilometer.storage import models as api_models
from ceilometer.storage.sqlalchemy import migration
from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource
from ceilometer.storage.sqlalchemy.models import Source, User, Base, Alarm
from ceilometer.storage.sqlalchemy.models import UniqueName, Event, Trait
from ceilometer import utils
LOG = log.getLogger(__name__)
@ -95,7 +98,7 @@ class SQLAlchemyStorage(base.StorageEngine):
def make_query_from_filter(query, sample_filter, require_meter=True):
"""Return a query dictionary based on the settings in the filter.
:param filter: QueryFilter instance
:param filter: SampleFilter instance
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
@ -488,3 +491,128 @@ class Connection(base.Connection):
"""
self.session.query(Alarm).filter(Alarm.id == alarm_id).delete()
self.session.flush()
def _get_unique(self, key):
return self.session.query(UniqueName)\
.filter(UniqueName.key == key).first()
def _get_or_create_unique_name(self, key):
"""Find the UniqueName entry for a given key, creating
one if necessary.
This may result in a flush.
"""
unique = self._get_unique(key)
if not unique:
unique = UniqueName(key=key)
self.session.add(unique)
self.session.flush()
return unique
def _make_trait(self, trait_model, event):
"""Make a new Trait from a Trait model.
Doesn't flush or add to session.
"""
name = self._get_or_create_unique_name(trait_model.name)
value_map = Trait._value_map
values = {'t_string': None, 't_float': None,
't_int': None, 't_datetime': None}
value = trait_model.value
if trait_model.dtype == api_models.Trait.DATETIME_TYPE:
value = utils.dt_to_decimal(value)
values[value_map[trait_model.dtype]] = value
return Trait(name, event, trait_model.dtype, **values)
def _record_event(self, event_model):
"""Store a single Event, including related Traits.
"""
unique = self._get_or_create_unique_name(event_model.event_name)
generated = utils.dt_to_decimal(event_model.generated)
event = Event(unique, generated)
self.session.add(event)
new_traits = []
if event_model.traits:
for trait in event_model.traits:
t = self._make_trait(trait, event)
self.session.add(t)
new_traits.append(t)
# Note: we don't flush here, explicitly (unless a new uniquename
# does it). Otherwise, just wait until all the Events are staged.
return (event, new_traits)
def record_events(self, event_models):
"""Write the events to SQL database via sqlalchemy.
:param event_models: a list of model.Event objects.
Flush when they're all added, unless new UniqueNames are
added along the way.
"""
events = [self._record_event(event_model)
for event_model in event_models]
self.session.flush()
# Update the models with the underlying DB ID.
for model, actual in zip(event_models, events):
actual_event, actual_traits = actual
model.id = actual_event.id
if model.traits and actual_traits:
for trait, actual_trait in zip(model.traits, actual_traits):
trait.id = actual_trait.id
def get_events(self, event_filter):
"""Return an iterable of model.Event objects.
:param event_filter: EventFilter instance
"""
start = utils.dt_to_decimal(event_filter.start)
end = utils.dt_to_decimal(event_filter.end)
sub_query = self.session.query(Event.id)\
.join(Trait, Trait.event_id == Event.id)\
.filter(Event.generated >= start, Event.generated <= end)
if event_filter.event_name:
event_name = self._get_unique(event_filter.event_name)
sub_query = sub_query.filter(Event.unique_name == event_name)
if event_filter.traits:
for key, value in event_filter.traits.iteritems():
if key == 'key':
key = self._get_unique(value)
sub_query = sub_query.filter(Trait.name == key)
elif key == 't_string':
sub_query = sub_query.filter(Trait.t_string == value)
elif key == 't_int':
sub_query = sub_query.filter(Trait.t_int == value)
elif key == 't_datetime':
dt = utils.dt_to_decimal(value)
sub_query = sub_query.filter(Trait.t_datetime == dt)
elif key == 't_float':
sub_query = sub_query.filter(Trait.t_datetime == value)
sub_query = sub_query.subquery()
all_data = self.session.query(Trait)\
.join(sub_query, Trait.event_id == sub_query.c.id)
# Now convert the sqlalchemy objects back into Models ...
event_models_dict = {}
for trait in all_data.all():
event = event_models_dict.get(trait.event_id)
if not event:
generated = utils.decimal_to_dt(trait.event.generated)
event = api_models.Event(trait.event.unique_name.key,
generated, [])
event_models_dict[trait.event_id] = event
value = trait.get_value()
trait_model = api_models.Trait(trait.name.key, trait.t_type, value)
event.append_trait(trait_model)
event_models = event_models_dict.values()
return sorted(event_models, key=operator.attrgetter('generated'))

View File

@ -43,6 +43,46 @@ class Model(object):
return self.as_dict() == other.as_dict()
class Event(Model):
"""A raw event from the source system. Events have Traits.
Metrics will be derived from one or more Events.
"""
def __init__(self, event_name, generated, traits):
"""
:param event_name: Name of the event.
:param generated: UTC time for when the event occured.
:param traits: list of Traits on this Event.
"""
Model.__init__(self, event_name=event_name, generated=generated,
traits=traits)
def append_trait(self, trait_model):
self.traits.append(trait_model)
def __repr__(self):
trait_list = [str(trait) for trait in self.traits]
return "<Event: %s, %s %s>" % \
(self.event_name, self.generated, " ".join(trait_list))
class Trait(Model):
"""A Trait is a key/value pair of data on an Event. The value is variant
record of basic data types (int, date, float, etc).
"""
TEXT_TYPE = 1
INT_TYPE = 2
FLOAT_TYPE = 3
DATETIME_TYPE = 4
def __init__(self, name, dtype, value):
Model.__init__(self, name=name, dtype=dtype, value=value)
def __repr__(self):
return "<Trait: %s %d %s>" % (self.name, self.dtype, self.value)
class Resource(Model):
"""Something for which sample data has been collected.
"""

View File

@ -0,0 +1,61 @@
# -*- encoding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import *
meta = MetaData()
unique_name = Table(
'unique_name', meta,
Column('id', Integer, primary_key=True),
Column('key', String(32), index=True)
)
event = Table(
'event', meta,
Column('id', Integer, primary_key=True),
Column('generated', Float(asdecimal=True), index=True),
Column('unique_name_id', Integer, ForeignKey('unique_name.id'))
)
trait = Table(
'trait', meta,
Column('id', Integer, primary_key=True),
Column('name_id', Integer, ForeignKey('unique_name.id')),
Column('t_type', Integer, index=True),
Column('t_string', String(32), nullable=True, default=None, index=True),
Column('t_float', Float, nullable=True, default=None, index=True),
Column('t_int', Integer, nullable=True, default=None, index=True),
Column('t_datetime', Float(asdecimal=True), nullable=True, default=None,
index=True),
Column('event_id', Integer, ForeignKey('event.id'))
)
tables = [unique_name, event, trait]
def upgrade(migrate_engine):
meta.bind = migrate_engine
for i in sorted(tables):
i.create()
def downgrade(migrate_engine):
meta.bind = migrate_engine
for i in sorted(tables, reverse=True):
i.drop()

View File

@ -25,10 +25,13 @@ from oslo.config import cfg
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime
from sqlalchemy import Float, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref
from sqlalchemy.orm import relationship
from sqlalchemy.types import TypeDecorator, VARCHAR
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models as api_models
from ceilometer import utils
sql_opts = [
cfg.StrOpt('mysql_engine',
@ -174,3 +177,84 @@ class Alarm(Base):
insufficient_data_actions = Column(JSONEncodedDict)
matching_metadata = Column(JSONEncodedDict)
class UniqueName(Base):
"""Key names should only be stored once.
"""
__tablename__ = 'unique_name'
id = Column(Integer, primary_key=True)
key = Column(String(32), index=True, unique=True)
def __init__(self, key):
self.key = key
def __repr__(self):
return "<UniqueName: %s>" % self.key
class Event(Base):
__tablename__ = 'event'
id = Column(Integer, primary_key=True)
generated = Column(Float(asdecimal=True), index=True)
unique_name_id = Column(Integer, ForeignKey('unique_name.id'))
unique_name = relationship("UniqueName", backref=backref('unique_name',
order_by=id))
def __init__(self, event, generated):
self.unique_name = event
self.generated = generated
def __repr__(self):
return "<Event %d('Event: %s, Generated: %s')>" % \
(self.id, self.unique_name, self.generated)
class Trait(Base):
__tablename__ = 'trait'
id = Column(Integer, primary_key=True)
name_id = Column(Integer, ForeignKey('unique_name.id'))
name = relationship("UniqueName", backref=backref('name', order_by=id))
t_type = Column(Integer, index=True)
t_string = Column(String(32), nullable=True, default=None, index=True)
t_float = Column(Float, nullable=True, default=None, index=True)
t_int = Column(Integer, nullable=True, default=None, index=True)
t_datetime = Column(Float(asdecimal=True), nullable=True, default=None,
index=True)
event_id = Column(Integer, ForeignKey('event.id'))
event = relationship("Event", backref=backref('event', order_by=id))
_value_map = {api_models.Trait.TEXT_TYPE: 't_string',
api_models.Trait.FLOAT_TYPE: 't_float',
api_models.Trait.INT_TYPE: 't_int',
api_models.Trait.DATETIME_TYPE: 't_datetime'}
def __init__(self, name, event, t_type, t_string=None, t_float=None,
t_int=None, t_datetime=None):
self.name = name
self.t_type = t_type
self.t_string = t_string
self.t_float = t_float
self.t_int = t_int
self.t_datetime = t_datetime
self.event = event
def get_value(self):
if self.t_type == api_models.Trait.INT_TYPE:
return self.t_int
if self.t_type == api_models.Trait.FLOAT_TYPE:
return self.t_float
if self.t_type == api_models.Trait.DATETIME_TYPE:
return utils.decimal_to_dt(self.t_datetime)
if self.t_type == api_models.Trait.TEXT_TYPE:
return self.t_string
return None
def __repr__(self):
return "<Trait(%s) %d=%s/%s/%s/%s on %s>" % (self.name, self.t_type,
self.t_string, self.t_float, self.t_int, self.t_datetime,
self.event)

View File

@ -1,9 +1,9 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -20,9 +20,13 @@
"""Utilities and helper functions."""
import calendar
import datetime
import decimal
import os
from ceilometer.openstack.common import timeutils
def read_cached_file(filename, cache_info, reload_func=None):
"""Read from a file if it has been modified.
@ -59,3 +63,35 @@ def recursive_keypairs(d):
value))
else:
yield name, value
def dt_to_decimal(utc):
"""Datetime to Decimal.
Some databases don't store microseconds in datetime
so we always store as Decimal unixtime.
"""
decimal.getcontext().prec = 30
return decimal.Decimal(str(calendar.timegm(utc.utctimetuple()))) + \
(decimal.Decimal(str(utc.microsecond)) /
decimal.Decimal("1000000.0"))
def decimal_to_dt(dec):
"""Return a datetime from Decimal unixtime format.
"""
if dec is None:
return None
integer = int(dec)
micro = (dec - decimal.Decimal(integer)) * decimal.Decimal(1000000)
daittyme = datetime.datetime.utcfromtimestamp(integer)
return daittyme.replace(microsecond=int(round(micro)))
def sanitize_timestamp(timestamp):
"""Return a naive utc datetime object."""
if not timestamp:
return timestamp
if not isinstance(timestamp, datetime.datetime):
timestamp = timeutils.parse_isotime(timestamp)
return timeutils.normalize_time(timestamp)

View File

@ -30,6 +30,7 @@ from ceilometer import counter
from ceilometer import storage
from ceilometer.tests import db as test_db
from ceilometer.storage import models
from ceilometer import utils
class DBTestBase(test_db.TestBase):
@ -666,3 +667,144 @@ class AlarmTest(DBTestBase):
self.assertEquals(len(survivors), 2)
for s in survivors:
self.assertNotEquals(victim.name, s.name)
class EventTestBase(test_db.TestBase):
"""Separate test base class because we don't want to
inherit all the Meter stuff.
"""
__metaclass__ = abc.ABCMeta
def setUp(self):
super(EventTestBase, self).setUp()
self.prepare_data()
def prepare_data(self):
# Add some data ...
pass
class EventTest(EventTestBase):
def test_save_events_no_traits(self):
now = datetime.datetime.utcnow()
m = [models.Event("Foo", now, None), models.Event("Zoo", now, [])]
self.conn.record_events(m)
for model in m:
self.assertTrue(model.id >= 0)
self.assertNotEqual(m[0].id, m[1].id)
def test_string_traits(self):
model = models.Trait("Foo", models.Trait.TEXT_TYPE, "my_text")
trait = self.conn._make_trait(model, None)
self.assertEquals(trait.t_type, models.Trait.TEXT_TYPE)
self.assertIsNone(trait.t_float)
self.assertIsNone(trait.t_int)
self.assertIsNone(trait.t_datetime)
self.assertEquals(trait.t_string, "my_text")
self.assertIsNotNone(trait.name)
def test_int_traits(self):
model = models.Trait("Foo", models.Trait.INT_TYPE, 100)
trait = self.conn._make_trait(model, None)
self.assertEquals(trait.t_type, models.Trait.INT_TYPE)
self.assertIsNone(trait.t_float)
self.assertIsNone(trait.t_string)
self.assertIsNone(trait.t_datetime)
self.assertEquals(trait.t_int, 100)
self.assertIsNotNone(trait.name)
def test_float_traits(self):
model = models.Trait("Foo", models.Trait.FLOAT_TYPE, 123.456)
trait = self.conn._make_trait(model, None)
self.assertEquals(trait.t_type, models.Trait.FLOAT_TYPE)
self.assertIsNone(trait.t_int)
self.assertIsNone(trait.t_string)
self.assertIsNone(trait.t_datetime)
self.assertEquals(trait.t_float, 123.456)
self.assertIsNotNone(trait.name)
def test_datetime_traits(self):
now = datetime.datetime.utcnow()
model = models.Trait("Foo", models.Trait.DATETIME_TYPE, now)
trait = self.conn._make_trait(model, None)
self.assertEquals(trait.t_type, models.Trait.DATETIME_TYPE)
self.assertIsNone(trait.t_int)
self.assertIsNone(trait.t_string)
self.assertIsNone(trait.t_float)
self.assertEquals(trait.t_datetime, utils.dt_to_decimal(now))
self.assertIsNotNone(trait.name)
def test_save_events_traits(self):
event_models = []
for event_name in ['Foo', 'Bar', 'Zoo']:
now = datetime.datetime.utcnow()
trait_models = \
[models.Trait(name, dtype, value)
for name, dtype, value in [
('trait_A', models.Trait.TEXT_TYPE, "my_text"),
('trait_B', models.Trait.INT_TYPE, 199),
('trait_C', models.Trait.FLOAT_TYPE, 1.23456),
('trait_D', models.Trait.DATETIME_TYPE, now)]]
event_models.append(
models.Event(event_name, now, trait_models))
self.conn.record_events(event_models)
for model in event_models:
for trait in model.traits:
self.assertTrue(trait.id >= 0)
class GetEventTest(EventTestBase):
def prepare_data(self):
event_models = []
base = 0
self.start = datetime.datetime(2013, 12, 31, 5, 0)
now = self.start
for event_name in ['Foo', 'Bar', 'Zoo']:
trait_models = \
[models.Trait(name, dtype, value)
for name, dtype, value in [
('trait_A', models.Trait.TEXT_TYPE,
"my_%s_text" % event_name),
('trait_B', models.Trait.INT_TYPE,
base + 1),
('trait_C', models.Trait.FLOAT_TYPE,
float(base) + 0.123456),
('trait_D', models.Trait.DATETIME_TYPE, now)]]
event_models.append(
models.Event(event_name, now, trait_models))
base += 100
now = now + datetime.timedelta(hours=1)
self.end = now
self.conn.record_events(event_models)
def test_simple_get(self):
event_filter = storage.EventFilter(self.start, self.end)
events = self.conn.get_events(event_filter)
self.assertEquals(3, len(events))
start_time = None
for i, name in enumerate(["Foo", "Bar", "Zoo"]):
self.assertEquals(events[i].event_name, name)
self.assertEquals(4, len(events[i].traits))
# Ensure sorted results ...
if start_time is not None:
# Python 2.6 has no assertLess :(
self.assertTrue(start_time < events[i].generated)
start_time = events[i].generated
def test_simple_get_event_name(self):
event_filter = storage.EventFilter(self.start, self.end, "Bar")
events = self.conn.get_events(event_filter)
self.assertEquals(1, len(events))
self.assertEquals(events[0].event_name, "Bar")
self.assertEquals(4, len(events[0].traits))
def test_get_event_trait_filter(self):
trait_filters = {'key': 'trait_B', 't_int': 101}
event_filter = storage.EventFilter(self.start, self.end,
traits=trait_filters)
events = self.conn.get_events(event_filter)
self.assertEquals(1, len(events))
self.assertEquals(events[0].event_name, "Bar")
self.assertEquals(4, len(events[0].traits))

View File

@ -25,9 +25,10 @@
from oslo.config import cfg
from tests.storage import base
from ceilometer.storage.sqlalchemy.models import table_args
from tests.storage import base
class SQLAlchemyEngineTestBase(base.DBTestBase):
database_connection = 'sqlite://'
@ -65,6 +66,37 @@ class AlarmTest(base.AlarmTest, SQLAlchemyEngineTestBase):
pass
class EventTestBase(base.EventTestBase):
# Note: Do not derive from SQLAlchemyEngineTestBase, since we
# don't want to automatically inherit all the Meter setup.
database_connection = 'sqlite://'
class UniqueNameTest(base.EventTest, EventTestBase):
# UniqueName is a construct specific to sqlalchemy.
# Not applicable to other drivers.
def test_unique_exists(self):
u1 = self.conn._get_or_create_unique_name("foo")
self.assertTrue(u1.id >= 0)
u2 = self.conn._get_or_create_unique_name("foo")
self.assertEqual(u1, u2)
def test_new_unique(self):
u1 = self.conn._get_or_create_unique_name("foo")
self.assertTrue(u1.id >= 0)
u2 = self.conn._get_or_create_unique_name("blah")
self.assertNotEqual(u1, u2)
class EventTest(base.EventTest, EventTestBase):
pass
class GetEventTest(base.GetEventTest, EventTestBase):
pass
def test_model_table_args():
cfg.CONF.database_connection = 'mysql://localhost'
assert table_args()

View File

@ -1,8 +1,10 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
# Copyright (c) 2013 OpenStack Foundation
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -17,7 +19,10 @@
# under the License.
"""Tests for ceilometer/utils.py
"""
import decimal
import datetime
from ceilometer.tests import base as tests_base
from ceilometer import utils
@ -34,3 +39,18 @@ def test_recursive_keypairs():
('nested:a', 'A'),
('nested:b', 'B'),
]
class TestUtils(tests_base.TestCase):
def test_datetime_to_decimal(self):
expected = 1356093296.12
utc_datetime = datetime.datetime.utcfromtimestamp(expected)
actual = utils.dt_to_decimal(utc_datetime)
self.assertEqual(float(actual), expected)
def test_decimal_to_datetime(self):
expected = 1356093296.12
dexpected = decimal.Decimal(str(expected)) # Python 2.6 wants str()
expected_datetime = datetime.datetime.utcfromtimestamp(expected)
actual_datetime = utils.decimal_to_dt(dexpected)
self.assertEqual(actual_datetime, expected_datetime)