stacktach-timex/timex/expression.py
Levi Blackstone fb16776f9d Add PEP8 check and fix related issues
- Add PEP8 section to tox.ini
- Add hacking to requirements to enforce OpenStack style requirements
- Change setup.py to use PBR
- Add setup.cfg
- Fix formatting issues flagged by flake8 check
- Add copyright notices to all remaining files
- Update .gitignore file
- Fix bug in expression.py where a variable was set incorrectly

Change-Id: I634adba3a44b2bcebb4d8c5620cbade28c6c489a
2015-05-05 10:19:53 -05:00

499 lines
14 KiB
Python

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright © 2014 Rackspace Hosting.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import datetime
import logging
import six
logger = logging.getLogger(__name__)
class TimexError(Exception):
pass
class TimexLexerError(TimexError):
pass
class TimexParserError(TimexError):
pass
class TimexExpressionError(TimexError):
pass
@six.add_metaclass(abc.ABCMeta)
class TimeMatcher(object):
_allow_ambig_duration = False
@abc.abstractmethod
def match(self, dt):
"""Does a specific datetime match this?"""
@abc.abstractmethod
def __add__(self, other):
"""Add a duration"""
@abc.abstractmethod
def __sub__(self, other):
"""Subtract a duration"""
@abc.abstractmethod
def __mod__(self, other):
"""Implements the replace operation with a duration"""
def total_seconds(self):
return 0
@property
def is_range(self):
return False
def __nonzero__(self):
return True
def __contains__(self, other):
return self.match(other)
def _check_duration(self, duration):
if isinstance(duration, Duration):
if ((duration.ambiguous and self._allow_ambig_duration)
or not duration.ambiguous):
return True
raise TimexExpressionError("Invalid duration for time operation")
def _dt_replace(self, dt, duration):
return dt.replace(**duration.as_dict)
def _dt_add(self, dt, duration):
d = duration.as_dict
months = d.pop('month', 0)
years = d.pop('year', 0)
if d:
delta = datetime.timedelta(
**dict((k + "s", val) for k, val in d.items()))
dt = dt + delta
if months:
newmonth = dt.month + months
years += (newmonth - 1) // 12
newmonth = ((newmonth - 1) % 12) + 1
dt = dt.replace(month=newmonth)
if years:
dt = dt.replace(year=(dt.year + years))
return dt
def _dt_sub(self, dt, duration):
d = duration.as_dict
months = d.pop('month', 0)
years = d.pop('year', 0)
if d:
delta = datetime.timedelta(
**dict((k + "s", val) for k, val in d.items()))
dt = dt - delta
if months:
newmonth = dt.month - months
years -= (newmonth - 1) // 12
newmonth = ((newmonth - 1) % 12) + 1
dt = dt.replace(month=newmonth)
if years:
dt = dt.replace(year=(dt.year - years))
return dt
class Timestamp(TimeMatcher):
"""Wrapper on a datetime with same interface as TimeRange"""
def __init__(self, dt):
self.timestamp = dt
@property
def begin(self):
return self.timestamp
@property
def end(self):
return self.timestamp
def match(self, dt):
return self.timestamp == dt
def __add__(self, other):
self._check_duration(other)
return Timestamp(self._dt_add(self.timestamp, other))
def __sub__(self, other):
self._check_duration(other)
return Timestamp(self._dt_sub(self.timestamp, other))
def __mod__(self, other):
self._check_duration(other)
return Timestamp(self._dt_replace(self.timestamp, other))
def __repr__(self):
return "Timestamp for %r" % self.timestamp
class TimeRange(TimeMatcher):
_allow_ambig_duration = True
def __init__(self, begin, end):
self.begin = begin
self.end = end
@property
def timestamp(self):
return self.begin
def total_seconds(self):
delta = self.end - self.begin
return (delta.seconds + (delta.days * 24 * 3600) +
(delta.microseconds * 1e-6))
def __nonzero__(self):
return self.total_seconds() > 0
@property
def is_range(self):
return True
def match(self, dt):
"""Match datetimes
TimeRanges match datetimes from begin (inclusive) to end (exclusive)
"""
return dt >= self.begin and dt < self.end
def __add__(self, other):
self._check_duration(other)
duration = other.in_context(self)
begin = self._dt_add(self.begin, duration)
end = self._dt_add(self.end, duration)
return TimeRange(begin, end)
def __sub__(self, other):
self._check_duration(other)
duration = other.in_context(self)
begin = self._dt_sub(self.begin, duration)
end = self._dt_sub(self.end, duration)
return TimeRange(begin, end)
def __mod__(self, other):
self._check_duration(other)
duration = other.in_context(self)
begin = self._dt_replace(self.begin, duration)
end = self._dt_replace(self.end, duration)
return TimeRange(begin, end)
def next(self):
begin = self.end
end = self._dt_add(begin, Duration(second=self.total_seconds()))
return TimeRange(begin, end)
def prev(self):
end = self.begin
begin = self._dt_sub(end, Duration(second=self.total_seconds()))
return TimeRange(begin, end)
def __repr__(self):
return "TimeRange from %r to %r" % (self.begin, self.end)
def pin(self, dt, unit):
return PinnedTimeRange(self.begin, self.end, dt, unit)
class PinnedTimeRange(TimeRange):
def __init__(self, begin, end, pinned_to, unit):
super(PinnedTimeRange, self).__init__(begin, end)
self.pinned_to = pinned_to
self.unit = unit
self.duration = Duration(**{unit: 1})
def _pin_adjust(self, time_range):
if self.pinned_to in time_range:
return time_range.pin(self.pinned_to, self.unit)
while time_range.begin > self.pinned_to:
time_range = time_range.prev()
while time_range.end <= self.pinned_to:
time_range = time_range.next()
return time_range.pin(self.pinned_to, self.unit)
def __add__(self, other):
time_range = super(PinnedTimeRange, self).__add__(other)
if other < self.duration:
return self._pin_adjust(time_range)
return self.time_range
def __sub__(self, other):
time_range = super(PinnedTimeRange, self).__sub__(other)
if other < self.duration:
return self._pin_adjust(time_range)
return self.time_range
def __mod__(self, other):
time_range = super(PinnedTimeRange, self).__mod__(other)
if other < self.duration:
return self._pin_adjust(time_range)
return self.time_range
def __repr__(self):
return ("PinnedTimeRange from %r to %r. Pinned to %s(%r)"
% (self.begin, self.end, self.unit, self.pinned_to))
class Environment(dict):
def time_func_hour(self, timestamp):
dt = timestamp.timestamp
begin = dt.replace(minute=0, second=0, microsecond=0)
end = begin + datetime.timedelta(hours=1)
return PinnedTimeRange(begin, end, dt, 'hour')
def time_func_day(self, timestamp):
dt = timestamp.timestamp
begin = dt.replace(hour=0, minute=0, second=0, microsecond=0)
end = begin + datetime.timedelta(days=1)
return PinnedTimeRange(begin, end, dt, 'day')
def time_func_month(self, timestamp):
dt = timestamp.timestamp
begin = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
end = Timestamp(begin) + Duration(month=1)
return PinnedTimeRange(begin, end.timestamp, dt, 'month')
def time_func_year(self, timestamp):
dt = timestamp.timestamp
begin = dt.replace(month=1, day=1, hour=0, minute=0, second=0,
microsecond=0)
end = Timestamp(begin) + Duration(year=1)
return PinnedTimeRange(begin, end.timestamp, dt, 'year')
@six.add_metaclass(abc.ABCMeta)
class TimeExpression(object):
@abc.abstractmethod
def apply(self, env):
"""Apply the expression to a given set of arguments.
:param env: a dictionary-like object. expression functions should
be methods on this object with names beginning with 'time_func_'
returns: TimeMatcher instance
"""
def __call__(self, **kw):
env = Environment()
env.update(kw)
if 'timestamp' not in env:
env['timestamp'] = datetime.datetime.utcnow()
return self.apply(env)
class TimeRangeExpression(TimeExpression):
def __init__(self, begin, end):
self.begin = begin
self.end = end
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.begin, self.end)
def apply(self, env):
begin = self.begin.apply(env)
end = self.end.apply(env)
return TimeRange(begin.timestamp, end.timestamp)
class TimeRangeFunction(TimeRangeExpression):
def __init__(self, func_name, expr=None):
self.func_name = func_name
if expr is None:
expr = Variable('timestamp')
self.expr = expr
def __repr__(self):
return ('%s %s(%r)'
% (self.__class__.__name__, self.func_name, self.expr))
def apply(self, env):
arg = self.expr.apply(env)
try:
func = getattr(env, "time_func_%s" % self.func_name)
except AttributeError:
raise TimexExpressionError("Unknown Function %s" % self.func_name)
return func(arg)
class Variable(TimeExpression):
def __init__(self, name):
self.name = name
def __repr__(self):
return "%s (%s)" % (self.__class__.__name__, self.name)
def apply(self, env):
if self.name not in env:
raise TimexExpressionError("Variable %s not defined" % self.name)
return Timestamp(env[self.name])
class Operation(TimeExpression):
def __init__(self, expr, duration):
if duration.ambiguous and not isinstance(expr, TimeRangeExpression):
raise TimexParserError("Durations must have unit for "
"TimestampExpressions")
self.expr = expr
self.duration = duration
def __repr__(self):
return ('%s(%r, %r)'
% (self.__class__.__name__, self.expr, self.duration))
class Replace(Operation):
def apply(self, env):
val = self.expr.apply(env)
return val % self.duration
class Plus(Operation):
def apply(self, env):
val = self.expr.apply(env)
return val + self.duration
class Minus(Operation):
def apply(self, env):
val = self.expr.apply(env)
return val - self.duration
class Duration(object):
UNIT_SIZES = {'year': 365 * 24 * 60 * 60,
'month': 28 * 24 * 60 * 60,
'day': 24 * 60 * 60,
'hour': 60 * 60,
'minute': 60,
'second': 1,
'microsecond': 1e-6}
UNITS = ('year',
'month',
'day',
'hour',
'minute',
'second',
'microsecond',)
def __init__(self, year=None, month=None, day=None, hour=None,
minute=None, second=None, microsecond=None, unknown=None):
self.year = year
self.month = month
self.day = day
self.hour = hour
self.minute = minute
self.second = second
self.microsecond = microsecond
self.unknown = unknown
@property
def ambiguous(self):
return self.unknown is not None
@property
def as_dict(self):
d = dict()
for unit in self.UNITS:
val = getattr(self, unit)
if val is not None:
d[unit] = val
if self.ambiguous:
d['unknown'] = self.unknown
return d
def in_context(self, timerange):
if not self.ambiguous:
return self
d = abs(timerange.total_seconds())
if d >= self.UNIT_SIZES['year']:
unit = 'month'
elif d >= self.UNIT_SIZES['month']:
unit = 'day'
elif d >= self.UNIT_SIZES['day']:
unit = 'hour'
elif d >= self.UNIT_SIZES['hour']:
unit = 'minute'
elif d >= self.UNIT_SIZES['minute']:
unit = 'second'
else:
unit = 'microsecond'
vals = self.as_dict
del vals['unknown']
if unit in vals:
vals[unit] += self.unknown
else:
vals[unit] = self.unknown
return Duration(**vals)
def __add__(self, other):
result = self.as_dict
o = other.as_dict
for unit in o:
if unit in result:
result[unit] += o[unit]
else:
result[unit] = o[unit]
return Duration(**result)
def __repr__(self):
return '%s %s' % (self.__class__.__name__, str(self.as_dict))
def __gt__(self, other):
for unit in self.UNITS:
our_val = getattr(self, unit)
other_val = getattr(other, unit)
if our_val is not None and other_val is not None:
return our_val > other_val
elif our_val is not None and other_val is None:
return True
elif our_val is None and other_val is not None:
return False
return False
def __lt__(self, other):
for unit in self.UNITS:
our_val = getattr(self, unit)
other_val = getattr(other, unit)
if our_val is not None and other_val is not None:
return our_val < other_val
elif our_val is not None and other_val is None:
return False
elif our_val is None and other_val is not None:
return True
return False
def __eq__(self, other):
for unit in self.UNITS:
our_val = getattr(self, unit)
other_val = getattr(other, unit)
if our_val != other_val:
return False
return True