Fix for timestamp precision in SQLAlchemy

This fix adds the PreciseTimestamp custom type to address how
MySQL < 5.6.4 truncates temporal columns to the second:
http://dev.mysql.com/doc/refman/5.6/en/fractional-seconds.html

Change-Id: I9d324d1cb6867cf99e15e2ecdc566dc5f8fef536
Closes-Bug: #1215676
This commit is contained in:
Thomas Maddox 2013-09-11 17:02:00 +00:00
parent 42f02ab3c4
commit a86e7423f1
4 changed files with 204 additions and 2 deletions

View File

@ -0,0 +1,74 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Rackspace Hosting
#
# Author: Thomas Maddox <thomas.maddox@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from ceilometer.storage.sqlalchemy.models import PreciseTimestamp
_col = 'timestamp'
def _paged(query, size):
offset = 0
while True:
page = query.offset(offset).limit(size).execute()
if page.rowcount <= 0:
# There are no more rows
break
for row in page:
yield row
offset += size
def _convert_data_type(table, col, from_t, to_t, pk_attr='id', index=False):
temp_col_n = 'convert_data_type_temp_col'
# Override column we're going to convert with from_t, since the type we're
# replacing could be custom and we need to tell SQLALchemy how to perform
# CRUD operations with it.
table = sa.Table(table.name, table.metadata, sa.Column(col, from_t),
extend_existing=True)
sa.Column(temp_col_n, to_t).create(table)
key_attr = getattr(table.c, pk_attr)
orig_col = getattr(table.c, col)
new_col = getattr(table.c, temp_col_n)
query = sa.select([key_attr, orig_col])
for key, value in _paged(query, 1000):
table.update().where(key_attr == key)\
.values({temp_col_n: value}).execute()
orig_col.drop()
new_col.alter(name=col)
if index:
sa.Index('ix_%s_%s' % (table.name, col), new_col).create()
def upgrade(migrate_engine):
if migrate_engine.name == 'mysql':
meta = sa.MetaData(bind=migrate_engine)
meter = sa.Table('meter', meta, autoload=True)
_convert_data_type(meter, _col, sa.DateTime(), PreciseTimestamp(),
pk_attr='id', index=True)
def downgrade(migrate_engine):
if migrate_engine.name == 'mysql':
meta = sa.MetaData(bind=migrate_engine)
meter = sa.Table('meter', meta, autoload=True)
_convert_data_type(meter, _col, PreciseTimestamp(), sa.DateTime(),
pk_attr='id', index=True)

View File

@ -25,10 +25,11 @@ from oslo.config import cfg
from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, \ from sqlalchemy import Column, Integer, String, Table, ForeignKey, DateTime, \
Index, UniqueConstraint Index, UniqueConstraint
from sqlalchemy import Float, Boolean, Text from sqlalchemy import Float, Boolean, Text
from sqlalchemy.dialects.mysql import DECIMAL
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref from sqlalchemy.orm import backref
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.types import TypeDecorator from sqlalchemy.types import TypeDecorator, DATETIME
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer.storage import models as api_models from ceilometer.storage import models as api_models
@ -67,6 +68,33 @@ class JSONEncodedDict(TypeDecorator):
return value return value
class PreciseTimestamp(TypeDecorator):
"""Represents a timestamp precise to the microsecond."""
impl = DATETIME
def load_dialect_impl(self, dialect):
if dialect.name == 'mysql':
return dialect.type_descriptor(DECIMAL(precision=20,
scale=6,
asdecimal=True))
return dialect.type_descriptor(DATETIME())
def process_bind_param(self, value, dialect):
if value is None:
return value
elif dialect.name == 'mysql':
return utils.dt_to_decimal(value)
return value
def process_result_value(self, value, dialect):
if value is None:
return value
elif dialect.name == 'mysql':
return utils.decimal_to_dt(value)
return value
class CeilometerBase(object): class CeilometerBase(object):
"""Base class for Ceilometer Models.""" """Base class for Ceilometer Models."""
__table_args__ = table_args() __table_args__ = table_args()
@ -133,7 +161,7 @@ class Meter(Base):
counter_type = Column(String(255)) counter_type = Column(String(255))
counter_unit = Column(String(255)) counter_unit = Column(String(255))
counter_volume = Column(Float(53)) counter_volume = Column(Float(53))
timestamp = Column(DateTime, default=timeutils.utcnow) timestamp = Column(PreciseTimestamp(), default=timeutils.utcnow)
message_signature = Column(String(1000)) message_signature = Column(String(1000))
message_id = Column(String(1000)) message_id = Column(String(1000))

View File

View File

@ -0,0 +1,100 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Rackspace Hosting
#
# Author: Thomas Maddox <thomas.maddox@rackspace.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import mock
from sqlalchemy.types import DATETIME, NUMERIC
from sqlalchemy.dialects.mysql import DECIMAL
from ceilometer import utils
from ceilometer.storage.sqlalchemy import models
from ceilometer.tests import base
class PreciseTimestampTest(base.TestCase):
@staticmethod
def fake_dialect(name):
def _type_descriptor_mock(desc):
if type(desc) == DECIMAL:
return NUMERIC(precision=desc.precision, scale=desc.scale)
if type(desc) == DATETIME:
return DATETIME()
dialect = mock.MagicMock()
dialect.name = name
dialect.type_descriptor = _type_descriptor_mock
return dialect
def setUp(self):
super(PreciseTimestampTest, self).setUp()
self._mysql_dialect = self.fake_dialect('mysql')
self._postgres_dialect = self.fake_dialect('postgres')
self._type = models.PreciseTimestamp()
self._date = datetime(2012, 7, 2, 10, 44)
def test_load_dialect_impl_mysql(self):
result = self._type.load_dialect_impl(self._mysql_dialect)
self.assertEqual(type(result), NUMERIC)
self.assertEqual(result.precision, 20)
self.assertEqual(result.scale, 6)
self.assertTrue(result.asdecimal)
def test_load_dialect_impl_postgres(self):
result = self._type.load_dialect_impl(self._postgres_dialect)
self.assertEqual(type(result), DATETIME)
def test_process_bind_param_store_decimal_mysql(self):
expected = utils.dt_to_decimal(self._date)
result = self._type.process_bind_param(self._date, self._mysql_dialect)
self.assertEqual(result, expected)
def test_process_bind_param_store_datetime_postgres(self):
result = self._type.process_bind_param(self._date,
self._postgres_dialect)
self.assertEqual(result, self._date)
def test_process_bind_param_store_none_mysql(self):
result = self._type.process_bind_param(None, self._mysql_dialect)
self.assertEqual(result, None)
def test_process_bind_param_store_none_postgres(self):
result = self._type.process_bind_param(None,
self._postgres_dialect)
self.assertEqual(result, None)
def test_process_result_value_datetime_mysql(self):
dec_value = utils.dt_to_decimal(self._date)
result = self._type.process_result_value(dec_value,
self._mysql_dialect)
self.assertEqual(result, self._date)
def test_process_result_value_datetime_postgres(self):
result = self._type.process_result_value(self._date,
self._postgres_dialect)
self.assertEqual(result, self._date)
def test_process_result_value_none_mysql(self):
result = self._type.process_result_value(None,
self._mysql_dialect)
self.assertEqual(result, None)
def test_process_result_value_none_postgres(self):
result = self._type.process_result_value(None,
self._postgres_dialect)
self.assertEqual(result, None)