aodh/tests/api/v2/test_compute_duration_by_resource_scenarios.py
Terri Yu 3fa527f752 Add SQLAlchemy implementation of groupby
Implements: blueprint api-group-by

New class StatisticsGroupByTest contains the storage tests for group by
statistics and has its own test data

The storage tests check group by statistics for
 1) single field, "user-id"
 2) single field, "resource-id"
 3) single field, "project-id"
 4) single field, "source"
 5) single metadata field (not yet implemented)
 6) multiple fields
 7) multiple metadata fields (not yet implemented)
 8) multiple mixed fields, regular and metadata (not yet implemented)
 9) single field groupby with query filter
10) single metadata field groupby with query filter (not yet implemented)
11) multiple field group by with multiple query filters
12) multiple metadata field group by with multiple query filters
    (not yet implemented)
13) single field with period
14) single metadata field with period (not yet implemented)
15) single field with query filter and period
16) single metadata field with query filter and period (not yet implemented)

It also includes the implementation for the SQLAlchemy driver.

Change-Id: I902db657e424b9894c0382db4869b22317fc25da
2013-08-19 04:56:16 +00:00

205 lines
7.8 KiB
Python

# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test listing raw events.
"""
import datetime
import logging
import testscenarios
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models
from ceilometer.tests import db as tests_db
from .base import FunctionalTest
load_tests = testscenarios.load_tests_apply_scenarios
LOG = logging.getLogger(__name__)
class TestComputeDurationByResource(FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def setUp(self):
super(TestComputeDurationByResource, self).setUp()
# Create events relative to the range and pretend
# that the intervening events exist.
self.early1 = datetime.datetime(2012, 8, 27, 7, 0)
self.early2 = datetime.datetime(2012, 8, 27, 17, 0)
self.start = datetime.datetime(2012, 8, 28, 0, 0)
self.middle1 = datetime.datetime(2012, 8, 28, 8, 0)
self.middle2 = datetime.datetime(2012, 8, 28, 18, 0)
self.end = datetime.datetime(2012, 8, 28, 23, 59)
self.late1 = datetime.datetime(2012, 8, 29, 9, 0)
self.late2 = datetime.datetime(2012, 8, 29, 19, 0)
def _stub_interval_func(self, func):
self.stubs.Set(type(self.conn), 'get_meter_statistics', func)
def _set_interval(self, start, end):
def get_interval(ignore_self, event_filter, period):
assert event_filter.start
assert event_filter.end
if (event_filter.start > end or event_filter.end < start):
return []
duration_start = max(event_filter.start, start)
duration_end = min(event_filter.end, end)
duration = timeutils.delta_seconds(duration_start, duration_end)
return [
models.Statistics(
unit='',
min=0,
max=0,
avg=0,
sum=0,
count=0,
period=None,
period_start=None,
period_end=None,
duration=duration,
duration_start=duration_start,
duration_end=duration_end,
groupby=None,
)
]
self._stub_interval_func(get_interval)
def _invoke_api(self):
return self.get_json('/meters/instance:m1.tiny/statistics',
q=[{'field': 'timestamp',
'op': 'ge',
'value': self.start.isoformat()},
{'field': 'timestamp',
'op': 'le',
'value': self.end.isoformat()},
{'field': 'search_offset',
'value': 10}])
def test_before_range(self):
self._set_interval(self.early1, self.early2)
data = self._invoke_api()
self.assertEqual(data, [])
def _assert_times_match(self, actual, expected):
if actual:
actual = timeutils.parse_isotime(actual)
actual = actual.replace(tzinfo=None)
assert actual == expected
def test_overlap_range_start(self):
self._set_interval(self.early1, self.middle1)
data = self._invoke_api()
self._assert_times_match(data[0]['duration_start'], self.start)
self._assert_times_match(data[0]['duration_end'], self.middle1)
self.assertEqual(data[0]['duration'], 8 * 60 * 60)
def test_within_range(self):
self._set_interval(self.middle1, self.middle2)
data = self._invoke_api()
self._assert_times_match(data[0]['duration_start'], self.middle1)
self._assert_times_match(data[0]['duration_end'], self.middle2)
self.assertEqual(data[0]['duration'], 10 * 60 * 60)
def test_within_range_zero_duration(self):
self._set_interval(self.middle1, self.middle1)
data = self._invoke_api()
self._assert_times_match(data[0]['duration_start'], self.middle1)
self._assert_times_match(data[0]['duration_end'], self.middle1)
self.assertEqual(data[0]['duration'], 0)
def test_overlap_range_end(self):
self._set_interval(self.middle2, self.late1)
data = self._invoke_api()
self._assert_times_match(data[0]['duration_start'], self.middle2)
self._assert_times_match(data[0]['duration_end'], self.end)
self.assertEqual(data[0]['duration'], ((6 * 60) - 1) * 60)
def test_after_range(self):
self._set_interval(self.late1, self.late2)
data = self._invoke_api()
self.assertEqual(data, [])
def test_without_end_timestamp(self):
def get_interval(ignore_self, event_filter, period):
return [
models.Statistics(
unit=None,
count=0,
min=None,
max=None,
avg=None,
duration=None,
duration_start=self.late1,
duration_end=self.late2,
sum=0,
period=None,
period_start=None,
period_end=None,
groupby=None,
)
]
self._stub_interval_func(get_interval)
data = self.get_json('/meters/instance:m1.tiny/statistics',
q=[{'field': 'timestamp',
'op': 'ge',
'value': self.late1.isoformat()},
{'field': 'resource_id',
'value': 'resource-id'},
{'field': 'search_offset',
'value': 10}])
self._assert_times_match(data[0]['duration_start'], self.late1)
self._assert_times_match(data[0]['duration_end'], self.late2)
def test_without_start_timestamp(self):
def get_interval(ignore_self, event_filter, period):
return [
models.Statistics(
unit=None,
count=0,
min=None,
max=None,
avg=None,
duration=None,
duration_start=self.early1,
duration_end=self.early2,
sum=0,
period=None,
period_start=None,
period_end=None,
groupby=None,
)
]
return (self.early1, self.early2)
self._stub_interval_func(get_interval)
data = self.get_json('/meters/instance:m1.tiny/statistics',
q=[{'field': 'timestamp',
'op': 'le',
'value': self.early2.isoformat()},
{'field': 'resource_id',
'value': 'resource-id'},
{'field': 'search_offset',
'value': 10}])
self._assert_times_match(data[0]['duration_start'], self.early1)
self._assert_times_match(data[0]['duration_end'], self.early2)