Merge "utils: move code where it's actually used and remove"

This commit is contained in:
Jenkins 2015-11-19 19:10:56 +00:00 committed by Gerrit Code Review
commit fce1988d36
16 changed files with 304 additions and 375 deletions

View File

@ -49,7 +49,6 @@ from aodh import keystone_client
from aodh import messaging
from aodh import notifier
from aodh.storage import models
from aodh import utils
LOG = log.getLogger(__name__)
@ -492,6 +491,13 @@ def _send_notification(event, payload):
notifier.info(context.RequestContext(), notification, payload)
def stringify_timestamps(data):
"""Stringify any datetimes in given dict."""
return dict((k, v.isoformat()
if isinstance(v, datetime.datetime) else v)
for (k, v) in six.iteritems(data))
class AlarmController(rest.RestController):
"""Manages operations on a single alarm."""
@ -518,7 +524,7 @@ class AlarmController(rest.RestController):
if not pecan.request.cfg.record_history:
return
type = type or models.AlarmChange.RULE_CHANGE
scrubbed_data = utils.stringify_timestamps(data)
scrubbed_data = stringify_timestamps(data)
detail = json.dumps(scrubbed_data)
user_id = pecan.request.headers.get('X-User-Id')
project_id = pecan.request.headers.get('X-Project-Id')
@ -698,7 +704,7 @@ class AlarmsController(rest.RestController):
if not pecan.request.cfg.record_history:
return
type = models.AlarmChange.CREATION
scrubbed_data = utils.stringify_timestamps(data)
scrubbed_data = stringify_timestamps(data)
detail = json.dumps(scrubbed_data)
user_id = pecan.request.headers.get('X-User-Id')
project_id = pecan.request.headers.get('X-Project-Id')

View File

@ -20,15 +20,49 @@
import pecan
from pecan import rest
import six
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from aodh.api.controllers.v2 import base
from aodh import utils
def _decode_unicode(input):
"""Decode the unicode of the message, and encode it into utf-8."""
if isinstance(input, dict):
temp = {}
# If the input data is a dict, create an equivalent dict with a
# predictable insertion order to avoid inconsistencies in the
# message signature computation for equivalent payloads modulo
# ordering
for key, value in sorted(six.iteritems(input)):
temp[_decode_unicode(key)] = _decode_unicode(value)
return temp
elif isinstance(input, (tuple, list)):
# When doing a pair of JSON encode/decode operations to the tuple,
# the tuple would become list. So we have to generate the value as
# list here.
return [_decode_unicode(element) for element in input]
elif isinstance(input, six.text_type):
return input.encode('utf-8')
else:
return input
def _recursive_keypairs(d, separator=':'):
"""Generator that produces sequence of keypairs for nested dictionaries."""
for name, value in sorted(six.iteritems(d)):
if isinstance(value, dict):
for subname, subvalue in _recursive_keypairs(value, separator):
yield ('%s%s%s' % (name, separator, subname), subvalue)
elif isinstance(value, (tuple, list)):
yield name, _decode_unicode(value)
else:
yield name, value
def _flatten_capabilities(capabilities):
return dict((k, v) for k, v in utils.recursive_keypairs(capabilities))
return dict((k, v) for k, v in _recursive_keypairs(capabilities))
class Capabilities(base.Base):

View File

@ -33,7 +33,6 @@ from aodh.api.controllers.v2 import base
from aodh.api import rbac
from aodh.i18n import _
from aodh.storage import models
from aodh import utils
LOG = log.getLogger(__name__)
@ -228,9 +227,15 @@ class ValidatedComplexQuery(object):
raise base.ClientSideError(msg)
@staticmethod
def _convert_orderby_to_lower_case(orderby):
def lowercase_values(mapping):
"""Converts the values in the mapping dict to lowercase."""
items = mapping.items()
for key, value in items:
mapping[key] = value.lower()
def _convert_orderby_to_lower_case(self, orderby):
for orderby_field in orderby:
utils.lowercase_values(orderby_field)
self.lowercase_values(orderby_field)
def _normalize_field_names_in_orderby(self, orderby):
for orderby_field in orderby:
@ -307,8 +312,16 @@ class ValidatedComplexQuery(object):
del subfilter[field]
subfilter["resource_" + field] = value
@staticmethod
def lowercase_keys(mapping):
"""Converts the values of the keys in mapping to lowercase."""
items = mapping.items()
for key, value in items:
del mapping[key]
mapping[key.lower()] = value
def _convert_operator_to_lower_case(self, filter_expr):
self._traverse_postorder(filter_expr, utils.lowercase_keys)
self._traverse_postorder(filter_expr, self.lowercase_keys)
@staticmethod
def _convert_to_datetime(isotime):

View File

@ -12,15 +12,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import bisect
import hashlib
import struct
import uuid
from oslo_config import cfg
from oslo_log import log
import six
import tooz.coordination
from aodh.i18n import _LE, _LI
from aodh import utils
LOG = log.getLogger(__name__)
@ -43,6 +45,36 @@ OPTS = [
]
class HashRing(object):
def __init__(self, nodes, replicas=100):
self._ring = dict()
self._sorted_keys = []
for node in nodes:
for r in six.moves.range(replicas):
hashed_key = self._hash('%s-%s' % (node, r))
self._ring[hashed_key] = node
self._sorted_keys.append(hashed_key)
self._sorted_keys.sort()
@staticmethod
def _hash(key):
return struct.unpack_from('>I',
hashlib.md5(str(key).encode()).digest())[0]
def _get_position_on_ring(self, key):
hashed_key = self._hash(key)
position = bisect.bisect(self._sorted_keys, hashed_key)
return position if position < len(self._sorted_keys) else 0
def get_node(self, key):
if not self._ring:
return None
pos = self._get_position_on_ring(key)
return self._ring[self._sorted_keys[pos]]
class PartitionCoordinator(object):
"""Workload partitioning coordinator.
@ -166,7 +198,7 @@ class PartitionCoordinator(object):
try:
members = self._get_members(group_id)
LOG.debug('Members of group: %s', members)
hr = utils.HashRing(members)
hr = HashRing(members)
filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id]
LOG.debug('My subset: %s', filtered)

View File

@ -14,15 +14,15 @@
# under the License.
"""Storage backend management
"""
import datetime
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import retrying
import six.moves.urllib.parse as urlparse
from stevedore import driver
from aodh import utils
_NAMESPACE = 'aodh.storage'
@ -90,9 +90,9 @@ class SampleFilter(object):
metaquery=None):
self.user = user
self.project = project
self.start_timestamp = utils.sanitize_timestamp(start_timestamp)
self.start_timestamp = self.sanitize_timestamp(start_timestamp)
self.start_timestamp_op = start_timestamp_op
self.end_timestamp = utils.sanitize_timestamp(end_timestamp)
self.end_timestamp = self.sanitize_timestamp(end_timestamp)
self.end_timestamp_op = end_timestamp_op
self.resource = resource
self.meter = meter
@ -100,6 +100,15 @@ class SampleFilter(object):
self.metaquery = metaquery or {}
self.message_id = message_id
@staticmethod
def sanitize_timestamp(timestamp):
"""Return a naive utc datetime object."""
if not timestamp:
return timestamp
if not isinstance(timestamp, datetime.datetime):
timestamp = timeutils.parse_isotime(timestamp)
return timeutils.normalize_time(timestamp)
def __repr__(self):
return ("<SampleFilter(user: %s,"
" project: %s,"

View File

@ -14,7 +14,7 @@
# under the License.
"""Base classes for storage engines
"""
import copy
import inspect
import six
@ -22,6 +22,45 @@ import six
import aodh
def dict_to_keyval(value, key_base=None):
"""Expand a given dict to its corresponding key-value pairs.
Generated keys are fully qualified, delimited using dot notation.
ie. key = 'key.child_key.grandchild_key[0]'
"""
val_iter, key_func = None, None
if isinstance(value, dict):
val_iter = six.iteritems(value)
key_func = lambda k: key_base + '.' + k if key_base else k
elif isinstance(value, (tuple, list)):
val_iter = enumerate(value)
key_func = lambda k: key_base + '[%d]' % k
if val_iter:
for k, v in val_iter:
key_gen = key_func(k)
if isinstance(v, dict) or isinstance(v, (tuple, list)):
for key_gen, v in dict_to_keyval(v, key_gen):
yield key_gen, v
else:
yield key_gen, v
def update_nested(original_dict, updates):
"""Updates the leaf nodes in a nest dict.
Updates occur without replacing entire sub-dicts.
"""
dict_to_update = copy.deepcopy(original_dict)
for key, value in six.iteritems(updates):
if isinstance(value, dict):
sub_dict = update_nested(dict_to_update.get(key, {}), value)
dict_to_update[key] = sub_dict
else:
dict_to_update[key] = updates[key]
return dict_to_update
class Model(object):
"""Base class for storage API models."""

View File

@ -22,7 +22,6 @@ from aodh.storage.hbase import base as hbase_base
from aodh.storage.hbase import migration as hbase_migration
from aodh.storage.hbase import utils as hbase_utils
from aodh.storage import models
from aodh import utils
LOG = log.getLogger(__name__)
@ -61,9 +60,9 @@ class Connection(hbase_base.Connection, base.Connection):
if not determined
"""
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = utils.update_nested(
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = base.update_nested(
base.Connection.STORAGE_CAPABILITIES,
AVAILABLE_STORAGE_CAPABILITIES,
)

View File

@ -30,7 +30,6 @@ from aodh.storage import base
from aodh.storage import models as alarm_api_models
from aodh.storage.sqlalchemy import models
from aodh.storage.sqlalchemy import utils as sql_utils
from aodh import utils
LOG = log.getLogger(__name__)
@ -49,9 +48,9 @@ AVAILABLE_STORAGE_CAPABILITIES = {
class Connection(base.Connection):
"""Put the data into a SQLAlchemy database. """
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = utils.update_nested(
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = base.update_nested(
base.Connection.STORAGE_CAPABILITIES,
AVAILABLE_STORAGE_CAPABILITIES,
)

View File

@ -25,7 +25,6 @@ import six
from aodh.storage import base
from aodh.storage import models
from aodh.storage.mongo import utils as pymongo_utils
from aodh import utils
LOG = log.getLogger(__name__)
@ -45,10 +44,10 @@ AVAILABLE_STORAGE_CAPABILITIES = {
class Connection(base.Connection):
"""Base Alarm Connection class for MongoDB driver."""
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
COMMON_AVAILABLE_CAPABILITIES)
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
COMMON_AVAILABLE_CAPABILITIES)
STORAGE_CAPABILITIES = utils.update_nested(
STORAGE_CAPABILITIES = base.update_nested(
base.Connection.STORAGE_CAPABILITIES,
AVAILABLE_STORAGE_CAPABILITIES,
)

View File

@ -13,17 +13,19 @@
"""
SQLAlchemy models for aodh data.
"""
import calendar
import datetime
import decimal
import json
from oslo_utils import timeutils
from oslo_utils import units
import six
from sqlalchemy import Column, String, Index, Boolean, Text, DateTime
from sqlalchemy.dialects.mysql import DECIMAL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import TypeDecorator
from aodh import utils
class JSONEncodedDict(TypeDecorator):
"""Represents an immutable structure as a json-encoded string."""
@ -60,7 +62,13 @@ class PreciseTimestamp(TypeDecorator):
if value is None:
return value
elif dialect.name == 'mysql':
return utils.dt_to_decimal(value)
decimal.getcontext().prec = 30
return (
decimal.Decimal(
str(calendar.timegm(value.utctimetuple()))) +
(decimal.Decimal(str(value.microsecond)) /
decimal.Decimal("1000000.0"))
)
return value
def compare_against_backend(self, dialect, conn_type):
@ -73,7 +81,11 @@ class PreciseTimestamp(TypeDecorator):
if value is None:
return value
elif dialect.name == 'mysql':
return utils.decimal_to_dt(value)
integer = int(value)
micro = (value
- decimal.Decimal(integer)) * decimal.Decimal(units.M)
daittyme = datetime.datetime.utcfromtimestamp(integer)
return daittyme.replace(microsecond=int(round(micro)))
return value

View File

@ -22,7 +22,6 @@ from sqlalchemy.dialects.mysql import DECIMAL
from sqlalchemy.types import NUMERIC
from aodh.storage.sqlalchemy import models
from aodh import utils
class PreciseTimestampTest(base.BaseTestCase):
@ -57,11 +56,6 @@ class PreciseTimestampTest(base.BaseTestCase):
result = self._type.load_dialect_impl(self._postgres_dialect)
self.assertEqual(sqlalchemy.DateTime, type(result))
def test_process_bind_param_store_decimal_mysql(self):
expected = utils.dt_to_decimal(self._date)
result = self._type.process_bind_param(self._date, self._mysql_dialect)
self.assertEqual(expected, result)
def test_process_bind_param_store_datetime_postgres(self):
result = self._type.process_bind_param(self._date,
self._postgres_dialect)
@ -76,12 +70,6 @@ class PreciseTimestampTest(base.BaseTestCase):
self._postgres_dialect)
self.assertIsNone(result)
def test_process_result_value_datetime_mysql(self):
dec_value = utils.dt_to_decimal(self._date)
result = self._type.process_result_value(dec_value,
self._mysql_dialect)
self.assertEqual(self._date, result)
def test_process_result_value_datetime_postgres(self):
result = self._type.process_result_value(self._date,
self._postgres_dialect)

View File

@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
from aodh.api.controllers.v2 import capabilities
class TestCapabilities(base.BaseTestCase):
def test_recursive_keypairs(self):
data = {'a': 'A', 'b': 'B',
'nested': {'a': 'A', 'b': 'B'}}
pairs = list(capabilities._recursive_keypairs(data))
self.assertEqual([('a', 'A'), ('b', 'B'),
('nested:a', 'A'), ('nested:b', 'B')],
pairs)
def test_recursive_keypairs_with_separator(self):
data = {'a': 'A',
'b': 'B',
'nested': {'a': 'A',
'b': 'B',
},
}
separator = '.'
pairs = list(capabilities._recursive_keypairs(data, separator))
self.assertEqual([('a', 'A'),
('b', 'B'),
('nested.a', 'A'),
('nested.b', 'B')],
pairs)
def test_recursive_keypairs_with_list_of_dict(self):
small = 1
big = 1 << 64
expected = [('a', 'A'),
('b', 'B'),
('nested:list', [{small: 99, big: 42}])]
data = {'a': 'A',
'b': 'B',
'nested': {'list': [{small: 99, big: 42}]}}
pairs = list(capabilities._recursive_keypairs(data))
self.assertEqual(len(expected), len(pairs))
for k, v in pairs:
# the keys 1 and 1<<64 cause a hash collision on 64bit platforms
if k == 'nested:list':
self.assertIn(v,
[[{small: 99, big: 42}],
[{big: 42, small: 99}]])
else:
self.assertIn((k, v), expected)

View File

@ -20,7 +20,6 @@ import tooz.coordination
from aodh import coordination
from aodh import service
from aodh.tests import base
from aodh import utils
class MockToozCoordinator(object):
@ -105,6 +104,40 @@ class MockAsyncError(tooz.coordination.CoordAsyncResult):
return True
class TestHashRing(base.BaseTestCase):
def test_hash_ring(self):
num_nodes = 10
num_keys = 1000
nodes = [str(x) for x in range(num_nodes)]
hr = coordination.HashRing(nodes)
buckets = [0] * num_nodes
assignments = [-1] * num_keys
for k in range(num_keys):
n = int(hr.get_node(str(k)))
self.assertTrue(0 <= n <= num_nodes)
buckets[n] += 1
assignments[k] = n
# at least something in each bucket
self.assertTrue(all((c > 0 for c in buckets)))
# approximately even distribution
diff = max(buckets) - min(buckets)
self.assertTrue(diff < 0.3 * (num_keys / num_nodes))
# consistency
num_nodes += 1
nodes.append(str(num_nodes + 1))
hr = coordination.HashRing(nodes)
for k in range(num_keys):
n = int(hr.get_node(str(k)))
assignments[k] -= n
reassigned = len([c for c in assignments if c != 0])
self.assertTrue(reassigned < num_keys / num_nodes)
class TestPartitioning(base.BaseTestCase):
def setUp(self):
@ -164,7 +197,7 @@ class TestPartitioning(base.BaseTestCase):
agents = ['agent_%s' % i for i in range(10)]
expected_resources = [list() for _ in range(len(agents))]
hr = utils.HashRing(agents)
hr = coordination.HashRing(agents)
for r in all_resources:
key = agents.index(hr.get_node(r))
expected_resources[key].append(r)

View File

@ -0,0 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
from aodh.storage import base as storage_base
class TestUtils(base.BaseTestCase):
def test_dict_to_kv(self):
data = {'a': 'A',
'b': 'B',
'nested': {'a': 'A',
'b': 'B',
},
'nested2': [{'c': 'A'}, {'c': 'B'}]
}
pairs = list(storage_base.dict_to_keyval(data))
self.assertEqual([('a', 'A'),
('b', 'B'),
('nested.a', 'A'),
('nested.b', 'B'),
('nested2[0].c', 'A'),
('nested2[1].c', 'B')],
sorted(pairs, key=lambda x: x[0]))

View File

@ -1,137 +0,0 @@
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for aodh/utils.py
"""
import datetime
import decimal
from oslotest import base
from aodh import utils
class TestUtils(base.BaseTestCase):
def test_datetime_to_decimal(self):
expected = 1356093296.12
utc_datetime = datetime.datetime.utcfromtimestamp(expected)
actual = utils.dt_to_decimal(utc_datetime)
self.assertAlmostEqual(expected, float(actual), places=5)
def test_decimal_to_datetime(self):
expected = 1356093296.12
dexpected = decimal.Decimal(str(expected)) # Python 2.6 wants str()
expected_datetime = datetime.datetime.utcfromtimestamp(expected)
actual_datetime = utils.decimal_to_dt(dexpected)
# Python 3 have rounding issue on this, so use float
self.assertAlmostEqual(utils.dt_to_decimal(expected_datetime),
utils.dt_to_decimal(actual_datetime),
places=5)
def test_recursive_keypairs(self):
data = {'a': 'A', 'b': 'B',
'nested': {'a': 'A', 'b': 'B'}}
pairs = list(utils.recursive_keypairs(data))
self.assertEqual([('a', 'A'), ('b', 'B'),
('nested:a', 'A'), ('nested:b', 'B')],
pairs)
def test_recursive_keypairs_with_separator(self):
data = {'a': 'A',
'b': 'B',
'nested': {'a': 'A',
'b': 'B',
},
}
separator = '.'
pairs = list(utils.recursive_keypairs(data, separator))
self.assertEqual([('a', 'A'),
('b', 'B'),
('nested.a', 'A'),
('nested.b', 'B')],
pairs)
def test_recursive_keypairs_with_list_of_dict(self):
small = 1
big = 1 << 64
expected = [('a', 'A'),
('b', 'B'),
('nested:list', [{small: 99, big: 42}])]
data = {'a': 'A',
'b': 'B',
'nested': {'list': [{small: 99, big: 42}]}}
pairs = list(utils.recursive_keypairs(data))
self.assertEqual(len(expected), len(pairs))
for k, v in pairs:
# the keys 1 and 1<<64 cause a hash collision on 64bit platforms
if k == 'nested:list':
self.assertIn(v,
[[{small: 99, big: 42}],
[{big: 42, small: 99}]])
else:
self.assertIn((k, v), expected)
def test_decimal_to_dt_with_none_parameter(self):
self.assertIsNone(utils.decimal_to_dt(None))
def test_dict_to_kv(self):
data = {'a': 'A',
'b': 'B',
'nested': {'a': 'A',
'b': 'B',
},
'nested2': [{'c': 'A'}, {'c': 'B'}]
}
pairs = list(utils.dict_to_keyval(data))
self.assertEqual([('a', 'A'),
('b', 'B'),
('nested.a', 'A'),
('nested.b', 'B'),
('nested2[0].c', 'A'),
('nested2[1].c', 'B')],
sorted(pairs, key=lambda x: x[0]))
def test_hash_ring(self):
num_nodes = 10
num_keys = 1000
nodes = [str(x) for x in range(num_nodes)]
hr = utils.HashRing(nodes)
buckets = [0] * num_nodes
assignments = [-1] * num_keys
for k in range(num_keys):
n = int(hr.get_node(str(k)))
self.assertTrue(0 <= n <= num_nodes)
buckets[n] += 1
assignments[k] = n
# at least something in each bucket
self.assertTrue(all((c > 0 for c in buckets)))
# approximately even distribution
diff = max(buckets) - min(buckets)
self.assertTrue(diff < 0.3 * (num_keys / num_nodes))
# consistency
num_nodes += 1
nodes.append(str(num_nodes + 1))
hr = utils.HashRing(nodes)
for k in range(num_keys):
n = int(hr.get_node(str(k)))
assignments[k] -= n
reassigned = len([c for c in assignments if c != 0])
self.assertTrue(reassigned < num_keys / num_nodes)

View File

@ -1,191 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Utilities and helper functions."""
import bisect
import calendar
import copy
import datetime
import decimal
import hashlib
import struct
from oslo_utils import timeutils
from oslo_utils import units
import six
def decode_unicode(input):
"""Decode the unicode of the message, and encode it into utf-8."""
if isinstance(input, dict):
temp = {}
# If the input data is a dict, create an equivalent dict with a
# predictable insertion order to avoid inconsistencies in the
# message signature computation for equivalent payloads modulo
# ordering
for key, value in sorted(six.iteritems(input)):
temp[decode_unicode(key)] = decode_unicode(value)
return temp
elif isinstance(input, (tuple, list)):
# When doing a pair of JSON encode/decode operations to the tuple,
# the tuple would become list. So we have to generate the value as
# list here.
return [decode_unicode(element) for element in input]
elif isinstance(input, six.text_type):
return input.encode('utf-8')
else:
return input
def recursive_keypairs(d, separator=':'):
"""Generator that produces sequence of keypairs for nested dictionaries."""
for name, value in sorted(six.iteritems(d)):
if isinstance(value, dict):
for subname, subvalue in recursive_keypairs(value, separator):
yield ('%s%s%s' % (name, separator, subname), subvalue)
elif isinstance(value, (tuple, list)):
yield name, decode_unicode(value)
else:
yield name, value
def dt_to_decimal(utc):
"""Datetime to Decimal.
Some databases don't store microseconds in datetime
so we always store as Decimal unixtime.
"""
if utc is None:
return None
decimal.getcontext().prec = 30
return (decimal.Decimal(str(calendar.timegm(utc.utctimetuple()))) +
(decimal.Decimal(str(utc.microsecond)) /
decimal.Decimal("1000000.0")))
def decimal_to_dt(dec):
"""Return a datetime from Decimal unixtime format."""
if dec is None:
return None
integer = int(dec)
micro = (dec - decimal.Decimal(integer)) * decimal.Decimal(units.M)
daittyme = datetime.datetime.utcfromtimestamp(integer)
return daittyme.replace(microsecond=int(round(micro)))
def sanitize_timestamp(timestamp):
"""Return a naive utc datetime object."""
if not timestamp:
return timestamp
if not isinstance(timestamp, datetime.datetime):
timestamp = timeutils.parse_isotime(timestamp)
return timeutils.normalize_time(timestamp)
def stringify_timestamps(data):
"""Stringify any datetimes in given dict."""
isa_timestamp = lambda v: isinstance(v, datetime.datetime)
return dict((k, v.isoformat() if isa_timestamp(v) else v)
for (k, v) in six.iteritems(data))
def dict_to_keyval(value, key_base=None):
"""Expand a given dict to its corresponding key-value pairs.
Generated keys are fully qualified, delimited using dot notation.
ie. key = 'key.child_key.grandchild_key[0]'
"""
val_iter, key_func = None, None
if isinstance(value, dict):
val_iter = six.iteritems(value)
key_func = lambda k: key_base + '.' + k if key_base else k
elif isinstance(value, (tuple, list)):
val_iter = enumerate(value)
key_func = lambda k: key_base + '[%d]' % k
if val_iter:
for k, v in val_iter:
key_gen = key_func(k)
if isinstance(v, dict) or isinstance(v, (tuple, list)):
for key_gen, v in dict_to_keyval(v, key_gen):
yield key_gen, v
else:
yield key_gen, v
def lowercase_keys(mapping):
"""Converts the values of the keys in mapping to lowercase."""
items = mapping.items()
for key, value in items:
del mapping[key]
mapping[key.lower()] = value
def lowercase_values(mapping):
"""Converts the values in the mapping dict to lowercase."""
items = mapping.items()
for key, value in items:
mapping[key] = value.lower()
def update_nested(original_dict, updates):
"""Updates the leaf nodes in a nest dict.
Updates occur without replacing entire sub-dicts.
"""
dict_to_update = copy.deepcopy(original_dict)
for key, value in six.iteritems(updates):
if isinstance(value, dict):
sub_dict = update_nested(dict_to_update.get(key, {}), value)
dict_to_update[key] = sub_dict
else:
dict_to_update[key] = updates[key]
return dict_to_update
class HashRing(object):
def __init__(self, nodes, replicas=100):
self._ring = dict()
self._sorted_keys = []
for node in nodes:
for r in six.moves.range(replicas):
hashed_key = self._hash('%s-%s' % (node, r))
self._ring[hashed_key] = node
self._sorted_keys.append(hashed_key)
self._sorted_keys.sort()
@staticmethod
def _hash(key):
return struct.unpack_from('>I',
hashlib.md5(str(key).encode()).digest())[0]
def _get_position_on_ring(self, key):
hashed_key = self._hash(key)
position = bisect.bisect(self._sorted_keys, hashed_key)
return position if position < len(self._sorted_keys) else 0
def get_node(self, key):
if not self._ring:
return None
pos = self._get_position_on_ring(key)
return self._ring[self._sorted_keys[pos]]