fb16776f9d
- Add PEP8 section to tox.ini - Add hacking to requirements to enforce OpenStack style requirements - Change setup.py to use PBR - Add setup.cfg - Fix formatting issues flagged by flake8 check - Add copyright notices to all remaining files - Update .gitignore file - Fix bug in expression.py where a variable was set incorrectly Change-Id: I634adba3a44b2bcebb4d8c5620cbade28c6c489a
499 lines
14 KiB
Python
499 lines
14 KiB
Python
#!/usr/bin/env python
|
|
# -*- encoding: utf-8 -*-
|
|
#
|
|
# Copyright © 2014 Rackspace Hosting.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import abc
|
|
import datetime
|
|
import logging
|
|
import six
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TimexError(Exception):
|
|
pass
|
|
|
|
|
|
class TimexLexerError(TimexError):
|
|
pass
|
|
|
|
|
|
class TimexParserError(TimexError):
|
|
pass
|
|
|
|
|
|
class TimexExpressionError(TimexError):
|
|
pass
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
|
|
class TimeMatcher(object):
|
|
_allow_ambig_duration = False
|
|
|
|
@abc.abstractmethod
|
|
def match(self, dt):
|
|
"""Does a specific datetime match this?"""
|
|
|
|
@abc.abstractmethod
|
|
def __add__(self, other):
|
|
"""Add a duration"""
|
|
|
|
@abc.abstractmethod
|
|
def __sub__(self, other):
|
|
"""Subtract a duration"""
|
|
|
|
@abc.abstractmethod
|
|
def __mod__(self, other):
|
|
"""Implements the replace operation with a duration"""
|
|
|
|
def total_seconds(self):
|
|
return 0
|
|
|
|
@property
|
|
def is_range(self):
|
|
return False
|
|
|
|
def __nonzero__(self):
|
|
return True
|
|
|
|
def __contains__(self, other):
|
|
return self.match(other)
|
|
|
|
def _check_duration(self, duration):
|
|
if isinstance(duration, Duration):
|
|
if ((duration.ambiguous and self._allow_ambig_duration)
|
|
or not duration.ambiguous):
|
|
return True
|
|
raise TimexExpressionError("Invalid duration for time operation")
|
|
|
|
def _dt_replace(self, dt, duration):
|
|
return dt.replace(**duration.as_dict)
|
|
|
|
def _dt_add(self, dt, duration):
|
|
d = duration.as_dict
|
|
months = d.pop('month', 0)
|
|
years = d.pop('year', 0)
|
|
if d:
|
|
delta = datetime.timedelta(
|
|
**dict((k + "s", val) for k, val in d.items()))
|
|
dt = dt + delta
|
|
if months:
|
|
newmonth = dt.month + months
|
|
years += (newmonth - 1) // 12
|
|
newmonth = ((newmonth - 1) % 12) + 1
|
|
dt = dt.replace(month=newmonth)
|
|
if years:
|
|
dt = dt.replace(year=(dt.year + years))
|
|
return dt
|
|
|
|
def _dt_sub(self, dt, duration):
|
|
d = duration.as_dict
|
|
months = d.pop('month', 0)
|
|
years = d.pop('year', 0)
|
|
if d:
|
|
delta = datetime.timedelta(
|
|
**dict((k + "s", val) for k, val in d.items()))
|
|
dt = dt - delta
|
|
if months:
|
|
newmonth = dt.month - months
|
|
years -= (newmonth - 1) // 12
|
|
newmonth = ((newmonth - 1) % 12) + 1
|
|
dt = dt.replace(month=newmonth)
|
|
if years:
|
|
dt = dt.replace(year=(dt.year - years))
|
|
return dt
|
|
|
|
|
|
class Timestamp(TimeMatcher):
|
|
"""Wrapper on a datetime with same interface as TimeRange"""
|
|
|
|
def __init__(self, dt):
|
|
self.timestamp = dt
|
|
|
|
@property
|
|
def begin(self):
|
|
return self.timestamp
|
|
|
|
@property
|
|
def end(self):
|
|
return self.timestamp
|
|
|
|
def match(self, dt):
|
|
return self.timestamp == dt
|
|
|
|
def __add__(self, other):
|
|
self._check_duration(other)
|
|
return Timestamp(self._dt_add(self.timestamp, other))
|
|
|
|
def __sub__(self, other):
|
|
self._check_duration(other)
|
|
return Timestamp(self._dt_sub(self.timestamp, other))
|
|
|
|
def __mod__(self, other):
|
|
self._check_duration(other)
|
|
return Timestamp(self._dt_replace(self.timestamp, other))
|
|
|
|
def __repr__(self):
|
|
return "Timestamp for %r" % self.timestamp
|
|
|
|
|
|
class TimeRange(TimeMatcher):
|
|
_allow_ambig_duration = True
|
|
|
|
def __init__(self, begin, end):
|
|
self.begin = begin
|
|
self.end = end
|
|
|
|
@property
|
|
def timestamp(self):
|
|
return self.begin
|
|
|
|
def total_seconds(self):
|
|
delta = self.end - self.begin
|
|
return (delta.seconds + (delta.days * 24 * 3600) +
|
|
(delta.microseconds * 1e-6))
|
|
|
|
def __nonzero__(self):
|
|
return self.total_seconds() > 0
|
|
|
|
@property
|
|
def is_range(self):
|
|
return True
|
|
|
|
def match(self, dt):
|
|
"""Match datetimes
|
|
|
|
TimeRanges match datetimes from begin (inclusive) to end (exclusive)
|
|
"""
|
|
return dt >= self.begin and dt < self.end
|
|
|
|
def __add__(self, other):
|
|
self._check_duration(other)
|
|
duration = other.in_context(self)
|
|
begin = self._dt_add(self.begin, duration)
|
|
end = self._dt_add(self.end, duration)
|
|
return TimeRange(begin, end)
|
|
|
|
def __sub__(self, other):
|
|
self._check_duration(other)
|
|
duration = other.in_context(self)
|
|
begin = self._dt_sub(self.begin, duration)
|
|
end = self._dt_sub(self.end, duration)
|
|
return TimeRange(begin, end)
|
|
|
|
def __mod__(self, other):
|
|
self._check_duration(other)
|
|
duration = other.in_context(self)
|
|
begin = self._dt_replace(self.begin, duration)
|
|
end = self._dt_replace(self.end, duration)
|
|
return TimeRange(begin, end)
|
|
|
|
def next(self):
|
|
begin = self.end
|
|
end = self._dt_add(begin, Duration(second=self.total_seconds()))
|
|
return TimeRange(begin, end)
|
|
|
|
def prev(self):
|
|
end = self.begin
|
|
begin = self._dt_sub(end, Duration(second=self.total_seconds()))
|
|
return TimeRange(begin, end)
|
|
|
|
def __repr__(self):
|
|
return "TimeRange from %r to %r" % (self.begin, self.end)
|
|
|
|
def pin(self, dt, unit):
|
|
return PinnedTimeRange(self.begin, self.end, dt, unit)
|
|
|
|
|
|
class PinnedTimeRange(TimeRange):
|
|
|
|
def __init__(self, begin, end, pinned_to, unit):
|
|
super(PinnedTimeRange, self).__init__(begin, end)
|
|
self.pinned_to = pinned_to
|
|
self.unit = unit
|
|
self.duration = Duration(**{unit: 1})
|
|
|
|
def _pin_adjust(self, time_range):
|
|
if self.pinned_to in time_range:
|
|
return time_range.pin(self.pinned_to, self.unit)
|
|
while time_range.begin > self.pinned_to:
|
|
time_range = time_range.prev()
|
|
while time_range.end <= self.pinned_to:
|
|
time_range = time_range.next()
|
|
return time_range.pin(self.pinned_to, self.unit)
|
|
|
|
def __add__(self, other):
|
|
time_range = super(PinnedTimeRange, self).__add__(other)
|
|
if other < self.duration:
|
|
return self._pin_adjust(time_range)
|
|
return self.time_range
|
|
|
|
def __sub__(self, other):
|
|
time_range = super(PinnedTimeRange, self).__sub__(other)
|
|
if other < self.duration:
|
|
return self._pin_adjust(time_range)
|
|
return self.time_range
|
|
|
|
def __mod__(self, other):
|
|
time_range = super(PinnedTimeRange, self).__mod__(other)
|
|
if other < self.duration:
|
|
return self._pin_adjust(time_range)
|
|
return self.time_range
|
|
|
|
def __repr__(self):
|
|
return ("PinnedTimeRange from %r to %r. Pinned to %s(%r)"
|
|
% (self.begin, self.end, self.unit, self.pinned_to))
|
|
|
|
|
|
class Environment(dict):
|
|
def time_func_hour(self, timestamp):
|
|
dt = timestamp.timestamp
|
|
begin = dt.replace(minute=0, second=0, microsecond=0)
|
|
end = begin + datetime.timedelta(hours=1)
|
|
return PinnedTimeRange(begin, end, dt, 'hour')
|
|
|
|
def time_func_day(self, timestamp):
|
|
dt = timestamp.timestamp
|
|
begin = dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
end = begin + datetime.timedelta(days=1)
|
|
return PinnedTimeRange(begin, end, dt, 'day')
|
|
|
|
def time_func_month(self, timestamp):
|
|
dt = timestamp.timestamp
|
|
begin = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
|
end = Timestamp(begin) + Duration(month=1)
|
|
return PinnedTimeRange(begin, end.timestamp, dt, 'month')
|
|
|
|
def time_func_year(self, timestamp):
|
|
dt = timestamp.timestamp
|
|
begin = dt.replace(month=1, day=1, hour=0, minute=0, second=0,
|
|
microsecond=0)
|
|
end = Timestamp(begin) + Duration(year=1)
|
|
return PinnedTimeRange(begin, end.timestamp, dt, 'year')
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
|
|
class TimeExpression(object):
|
|
@abc.abstractmethod
|
|
def apply(self, env):
|
|
"""Apply the expression to a given set of arguments.
|
|
|
|
:param env: a dictionary-like object. expression functions should
|
|
be methods on this object with names beginning with 'time_func_'
|
|
returns: TimeMatcher instance
|
|
"""
|
|
|
|
def __call__(self, **kw):
|
|
env = Environment()
|
|
env.update(kw)
|
|
if 'timestamp' not in env:
|
|
env['timestamp'] = datetime.datetime.utcnow()
|
|
return self.apply(env)
|
|
|
|
|
|
class TimeRangeExpression(TimeExpression):
|
|
def __init__(self, begin, end):
|
|
self.begin = begin
|
|
self.end = end
|
|
|
|
def __repr__(self):
|
|
return '%s(%r, %r)' % (self.__class__.__name__, self.begin, self.end)
|
|
|
|
def apply(self, env):
|
|
begin = self.begin.apply(env)
|
|
end = self.end.apply(env)
|
|
return TimeRange(begin.timestamp, end.timestamp)
|
|
|
|
|
|
class TimeRangeFunction(TimeRangeExpression):
|
|
def __init__(self, func_name, expr=None):
|
|
self.func_name = func_name
|
|
if expr is None:
|
|
expr = Variable('timestamp')
|
|
self.expr = expr
|
|
|
|
def __repr__(self):
|
|
return ('%s %s(%r)'
|
|
% (self.__class__.__name__, self.func_name, self.expr))
|
|
|
|
def apply(self, env):
|
|
arg = self.expr.apply(env)
|
|
try:
|
|
func = getattr(env, "time_func_%s" % self.func_name)
|
|
except AttributeError:
|
|
raise TimexExpressionError("Unknown Function %s" % self.func_name)
|
|
return func(arg)
|
|
|
|
|
|
class Variable(TimeExpression):
|
|
def __init__(self, name):
|
|
self.name = name
|
|
|
|
def __repr__(self):
|
|
return "%s (%s)" % (self.__class__.__name__, self.name)
|
|
|
|
def apply(self, env):
|
|
if self.name not in env:
|
|
raise TimexExpressionError("Variable %s not defined" % self.name)
|
|
return Timestamp(env[self.name])
|
|
|
|
|
|
class Operation(TimeExpression):
|
|
def __init__(self, expr, duration):
|
|
if duration.ambiguous and not isinstance(expr, TimeRangeExpression):
|
|
raise TimexParserError("Durations must have unit for "
|
|
"TimestampExpressions")
|
|
self.expr = expr
|
|
self.duration = duration
|
|
|
|
def __repr__(self):
|
|
return ('%s(%r, %r)'
|
|
% (self.__class__.__name__, self.expr, self.duration))
|
|
|
|
|
|
class Replace(Operation):
|
|
|
|
def apply(self, env):
|
|
val = self.expr.apply(env)
|
|
return val % self.duration
|
|
|
|
|
|
class Plus(Operation):
|
|
|
|
def apply(self, env):
|
|
val = self.expr.apply(env)
|
|
return val + self.duration
|
|
|
|
|
|
class Minus(Operation):
|
|
|
|
def apply(self, env):
|
|
val = self.expr.apply(env)
|
|
return val - self.duration
|
|
|
|
|
|
class Duration(object):
|
|
UNIT_SIZES = {'year': 365 * 24 * 60 * 60,
|
|
'month': 28 * 24 * 60 * 60,
|
|
'day': 24 * 60 * 60,
|
|
'hour': 60 * 60,
|
|
'minute': 60,
|
|
'second': 1,
|
|
'microsecond': 1e-6}
|
|
UNITS = ('year',
|
|
'month',
|
|
'day',
|
|
'hour',
|
|
'minute',
|
|
'second',
|
|
'microsecond',)
|
|
|
|
def __init__(self, year=None, month=None, day=None, hour=None,
|
|
minute=None, second=None, microsecond=None, unknown=None):
|
|
self.year = year
|
|
self.month = month
|
|
self.day = day
|
|
self.hour = hour
|
|
self.minute = minute
|
|
self.second = second
|
|
self.microsecond = microsecond
|
|
self.unknown = unknown
|
|
|
|
@property
|
|
def ambiguous(self):
|
|
return self.unknown is not None
|
|
|
|
@property
|
|
def as_dict(self):
|
|
d = dict()
|
|
for unit in self.UNITS:
|
|
val = getattr(self, unit)
|
|
if val is not None:
|
|
d[unit] = val
|
|
if self.ambiguous:
|
|
d['unknown'] = self.unknown
|
|
return d
|
|
|
|
def in_context(self, timerange):
|
|
if not self.ambiguous:
|
|
return self
|
|
d = abs(timerange.total_seconds())
|
|
if d >= self.UNIT_SIZES['year']:
|
|
unit = 'month'
|
|
elif d >= self.UNIT_SIZES['month']:
|
|
unit = 'day'
|
|
elif d >= self.UNIT_SIZES['day']:
|
|
unit = 'hour'
|
|
elif d >= self.UNIT_SIZES['hour']:
|
|
unit = 'minute'
|
|
elif d >= self.UNIT_SIZES['minute']:
|
|
unit = 'second'
|
|
else:
|
|
unit = 'microsecond'
|
|
vals = self.as_dict
|
|
del vals['unknown']
|
|
if unit in vals:
|
|
vals[unit] += self.unknown
|
|
else:
|
|
vals[unit] = self.unknown
|
|
return Duration(**vals)
|
|
|
|
def __add__(self, other):
|
|
result = self.as_dict
|
|
o = other.as_dict
|
|
for unit in o:
|
|
if unit in result:
|
|
result[unit] += o[unit]
|
|
else:
|
|
result[unit] = o[unit]
|
|
return Duration(**result)
|
|
|
|
def __repr__(self):
|
|
return '%s %s' % (self.__class__.__name__, str(self.as_dict))
|
|
|
|
def __gt__(self, other):
|
|
for unit in self.UNITS:
|
|
our_val = getattr(self, unit)
|
|
other_val = getattr(other, unit)
|
|
if our_val is not None and other_val is not None:
|
|
return our_val > other_val
|
|
elif our_val is not None and other_val is None:
|
|
return True
|
|
elif our_val is None and other_val is not None:
|
|
return False
|
|
return False
|
|
|
|
def __lt__(self, other):
|
|
for unit in self.UNITS:
|
|
our_val = getattr(self, unit)
|
|
other_val = getattr(other, unit)
|
|
if our_val is not None and other_val is not None:
|
|
return our_val < other_val
|
|
elif our_val is not None and other_val is None:
|
|
return False
|
|
elif our_val is None and other_val is not None:
|
|
return True
|
|
return False
|
|
|
|
def __eq__(self, other):
|
|
for unit in self.UNITS:
|
|
our_val = getattr(self, unit)
|
|
other_val = getattr(other, unit)
|
|
if our_val != other_val:
|
|
return False
|
|
return True
|