Fix rbac system

After investigation of the heat breakage, after
we update the policy.json. We discover
that the rbac have never worked as expected.
The only rule that works is role == admin. All other
just can't works.

So the policy file have been changed to, By default project
owner of the alarm should be able to change their alarms.
And anyone can create a new alarm.

Also the alarm definition is now passed to the policy enforcer
to allow it checks owner of the alarms.

Closes-bug: #1504495
Change-Id: I408d839eec84af46adb333a6ad5c49c890513fd0
This commit is contained in:
Mehdi Abaakouk 2015-12-07 09:52:57 +01:00
parent d3fed74a62
commit dde9712152
12 changed files with 173 additions and 119 deletions

View File

@ -510,15 +510,23 @@ class AlarmController(rest.RestController):
pecan.request.context['alarm_id'] = alarm_id
self._id = alarm_id
def _alarm(self):
def _alarm(self, rbac_directive):
self.conn = pecan.request.alarm_storage_conn
# TODO(sileht): We should be able to relax this since we
# pass the alarm object to the enforcer.
auth_project = rbac.get_limited_to_project(pecan.request.headers,
pecan.request.enforcer)
alarms = list(self.conn.get_alarms(alarm_id=self._id,
project=auth_project))
if not alarms:
raise base.AlarmNotFound(alarm=self._id, auth_project=auth_project)
return alarms[0]
alarm = alarms[0]
target = {'user_id': alarm.user_id,
'project_id': alarm.project_id}
rbac.enforce(rbac_directive, pecan.request.headers,
pecan.request.enforcer, target)
return alarm
def _record_change(self, data, now, on_behalf_of=None, type=None):
if not pecan.request.cfg.record_history:
@ -552,11 +560,7 @@ class AlarmController(rest.RestController):
@wsme_pecan.wsexpose(Alarm)
def get(self):
"""Return this alarm."""
rbac.enforce('get_alarm', pecan.request.headers,
pecan.request.enforcer)
return Alarm.from_db_model(self._alarm())
return Alarm.from_db_model(self._alarm('get_alarm'))
@wsme_pecan.wsexpose(Alarm, body=Alarm)
def put(self, data):
@ -565,11 +569,8 @@ class AlarmController(rest.RestController):
:param data: an alarm within the request body.
"""
rbac.enforce('change_alarm', pecan.request.headers,
pecan.request.enforcer)
# Ensure alarm exists
alarm_in = self._alarm()
alarm_in = self._alarm('change_alarm')
now = timeutils.utcnow()
@ -624,11 +625,8 @@ class AlarmController(rest.RestController):
def delete(self):
"""Delete this alarm."""
rbac.enforce('delete_alarm', pecan.request.headers,
pecan.request.enforcer)
# ensure alarm exists before deleting
alarm = self._alarm()
alarm = self._alarm('delete_alarm')
self.conn.delete_alarm(alarm.alarm_id)
alarm_object = Alarm.from_db_model(alarm)
alarm_object.delete_actions()
@ -640,8 +638,10 @@ class AlarmController(rest.RestController):
:param q: Filter rules for the changes to be described.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('alarm_history', pecan.request.headers,
pecan.request.enforcer)
pecan.request.enforcer, target)
q = q or []
# allow history to be returned for deleted alarms, but scope changes
@ -664,15 +664,13 @@ class AlarmController(rest.RestController):
:param state: an alarm state within the request body.
"""
rbac.enforce('change_alarm_state', pecan.request.headers,
pecan.request.enforcer)
alarm = self._alarm('change_alarm_state')
# note(sileht): body are not validated by wsme
# Workaround for https://bugs.launchpad.net/wsme/+bug/1227229
if state not in state_kind:
raise base.ClientSideError(_("state invalid"))
now = timeutils.utcnow()
alarm = self._alarm()
alarm.state = state
alarm.state_timestamp = now
alarm = self.conn.update_alarm(alarm)
@ -684,12 +682,7 @@ class AlarmController(rest.RestController):
@wsme_pecan.wsexpose(state_kind_enum)
def get_state(self):
"""Get the state of this alarm."""
rbac.enforce('get_alarm_state', pecan.request.headers,
pecan.request.enforcer)
alarm = self._alarm()
return alarm.state
return self._alarm('get_alarm_state').state
class AlarmsController(rest.RestController):
@ -735,7 +728,7 @@ class AlarmsController(rest.RestController):
:param data: an alarm within the request body.
"""
rbac.enforce('create_alarm', pecan.request.headers,
pecan.request.enforcer)
pecan.request.enforcer, {})
conn = pecan.request.alarm_storage_conn
now = timeutils.utcnow()
@ -797,9 +790,10 @@ class AlarmsController(rest.RestController):
:param q: Filter rules for the alarms to be returned.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('get_alarms', pecan.request.headers,
pecan.request.enforcer)
pecan.request.enforcer, target)
q = q or []
# Timestamp is not supported field for Simple Alarm queries

View File

@ -349,9 +349,10 @@ class QueryAlarmHistoryController(rest.RestController):
:param body: Query rules for the alarm history to be returned.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('query_alarm_history', pecan.request.headers,
pecan.request.enforcer)
pecan.request.enforcer, target)
query = ValidatedComplexQuery(body,
models.AlarmChange)
@ -374,8 +375,10 @@ class QueryAlarmsController(rest.RestController):
:param body: Query rules for the alarms to be returned.
"""
target = rbac.target_from_segregation_rule(
pecan.request.headers, pecan.request.enforcer)
rbac.enforce('query_alarm', pecan.request.headers,
pecan.request.enforcer)
pecan.request.enforcer, target)
query = ValidatedComplexQuery(body,
models.Alarm)

View File

@ -25,7 +25,7 @@ class ConfigHook(hooks.PecanHook):
def __init__(self, conf):
self.conf = conf
self.enforcer = policy.Enforcer(conf)
self.enforcer = policy.Enforcer(conf, default_rule="default")
def before(self, state):
state.request.cfg = self.conf

View File

@ -20,28 +20,50 @@
import pecan
def enforce(policy_name, headers, enforcer):
def target_from_segregation_rule(headers, enforcer):
"""Return a target that corresponds of an alarm returned by segregation rule
This allows to use project_id: in an oslo_policy rule for query/listing.
:param headers: HTTP headers dictionary
:param enforcer: policy enforcer
:returns: target
"""
project_id = get_limited_to_project(headers, enforcer)
if project_id is not None:
return {'project_id': project_id}
return {}
def enforce(policy_name, headers, enforcer, target):
"""Return the user and project the request should be limited to.
:param policy_name: the policy name to validate authz against.
:param headers: HTTP headers dictionary
:param enforcer: policy enforcer
:param target: the alarm or "auto" to
"""
rule_method = "telemetry:" + policy_name
policy_dict = dict()
policy_dict['roles'] = headers.get('X-Roles', "").split(",")
policy_dict['target.user_id'] = (headers.get('X-User-Id'))
policy_dict['target.project_id'] = (headers.get('X-Project-Id'))
credentials = {
'roles': headers.get('X-Roles', "").split(","),
'user_id': headers.get('X-User-Id'),
'project_id': headers.get('X-Project-Id'),
}
# TODO(sileht): add deprecation warning to be able to remove this:
# maintain backward compat with Juno and previous by allowing the action if
# there is no rule defined for it
rules = enforcer.rules.keys()
if (('default' in rules or rule_method in rules) and
not enforcer.enforce(rule_method, {}, policy_dict)):
pecan.core.abort(status_code=403, detail='RBAC Authorization Failed')
if rule_method not in rules:
return
if not enforcer.enforce(rule_method, target, credentials):
pecan.core.abort(status_code=403,
detail='RBAC Authorization Failed')
# TODO(fabiog): these methods are still used because the scoping part is really
@ -56,18 +78,19 @@ def get_limited_to(headers, enforcer):
one of these.
"""
policy_dict = dict()
policy_dict['roles'] = headers.get('X-Roles', "").split(",")
policy_dict['target.user_id'] = (headers.get('X-User-Id'))
policy_dict['target.project_id'] = (headers.get('X-Project-Id'))
# TODO(sileht): Only filtering on role work currently for segregation
# oslo.policy expects the target to be the alarm. That will allow
# to create more enhanced rbac. But for now we enforce the
# scoping of request to the project-id, so...
target = {}
credentials = {
'roles': headers.get('X-Roles', "").split(","),
}
# maintain backward compat with Juno and previous by using context_is_admin
# rule if the segregation rule (added in Kilo) is not defined
rules = enforcer.rules.keys()
rule_name = 'segregation' if 'segregation' in rules else 'context_is_admin'
if not enforcer.enforce(rule_name,
{},
policy_dict):
if not enforcer.enforce(rule_name, target, credentials):
return headers.get('X-User-Id'), headers.get('X-Project-Id')
return None, None

View File

@ -16,13 +16,11 @@
"""Base classes for API tests.
"""
import json
import os
from oslo_config import fixture as fixture_config
from oslo_utils import fileutils
import pecan
import pecan.testing
import six
from aodh import service
from aodh.tests.functional import db as db_test_base
@ -43,21 +41,8 @@ class FunctionalTest(db_test_base.TestBase):
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
self.setup_messaging(self.CONF)
policies = json.dumps({
"context_is_admin": "role:admin",
"context_is_project": "project_id:%(target.project_id)s",
"context_is_owner": "user_id:%(target.user_id)s",
"segregation": "rule:context_is_admin",
"default": ""
})
if six.PY3:
policies = policies.encode('utf-8')
self.CONF.set_override("policy_file",
fileutils.write_to_tempfile(
content=policies,
prefix='policy',
suffix='.json'),
self.CONF.set_override('policy_file',
os.path.abspath('etc/aodh/policy.json'),
group='oslo_policy')
self.app = self._make_app()

View File

@ -0,0 +1,7 @@
{
"context_is_admin": "role:admin",
"segregation": "rule:context_is_admin",
"admin_or_owner": "rule:context_is_admin or project_id:%(project_id)s",
"default": "rule:admin_or_owner",
"telemetry:get_alarms": "role:admin"
}

View File

@ -15,6 +15,7 @@
"""Tests alarm operation."""
import datetime
import os
import uuid
import mock
@ -161,14 +162,15 @@ class TestAlarmsBase(v2.FunctionalTest):
storage_key = key
self.assertEqual(json[key], getattr(alarm, storage_key))
def _get_alarm(self, id):
data = self.get_json('/alarms')
def _get_alarm(self, id, auth_headers=None):
data = self.get_json('/alarms',
headers=auth_headers or self.auth_headers)
match = [a for a in data if a['alarm_id'] == id]
self.assertEqual(1, len(match), 'alarm %s not found' % id)
return match[0]
def _update_alarm(self, id, updated_data, auth_headers=None):
data = self._get_alarm(id)
data = self._get_alarm(id, auth_headers)
data.update(updated_data)
self.put_json('/alarms/%s' % id,
params=data,
@ -183,7 +185,7 @@ class TestAlarmsBase(v2.FunctionalTest):
class TestListEmptyAlarms(TestAlarmsBase):
def test_empty(self):
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual([], data)
@ -195,7 +197,7 @@ class TestAlarms(TestAlarmsBase):
self.alarm_conn.update_alarm(alarm)
def test_list_alarms(self):
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(4, len(data))
self.assertEqual(set(['name1', 'name2', 'name3', 'name4']),
set(r['name'] for r in data))
@ -210,6 +212,7 @@ class TestAlarms(TestAlarmsBase):
date_time = datetime.datetime(2012, 7, 2, 10, 41)
isotime = date_time.isoformat()
resp = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'timestamp',
'op': 'gt',
'value': isotime}],
@ -222,6 +225,7 @@ class TestAlarms(TestAlarmsBase):
def test_alarms_query_with_meter(self):
resp = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'meter',
'op': 'eq',
'value': 'meter.mine'}],
@ -254,6 +258,7 @@ class TestAlarms(TestAlarmsBase):
severity='critical')
self.alarm_conn.update_alarm(alarm)
resp = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'state',
'op': 'eq',
'value': 'ok'}],
@ -263,6 +268,7 @@ class TestAlarms(TestAlarmsBase):
def test_list_alarms_by_type(self):
alarms = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'type',
'op': 'eq',
'value': 'threshold'}])
@ -271,14 +277,18 @@ class TestAlarms(TestAlarmsBase):
set(alarm['type'] for alarm in alarms))
def test_get_not_existing_alarm(self):
resp = self.get_json('/alarms/alarm-id-3', expect_errors=True)
resp = self.get_json('/alarms/alarm-id-3',
headers=self.auth_headers,
expect_errors=True)
self.assertEqual(404, resp.status_code)
self.assertEqual('Alarm alarm-id-3 not found',
self.assertEqual('Alarm alarm-id-3 not found in project %s' %
self.auth_headers["X-Project-Id"],
jsonutils.loads(resp.body)['error_message']
['faultstring'])
def test_get_alarm(self):
alarms = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name1',
}])
@ -286,7 +296,8 @@ class TestAlarms(TestAlarmsBase):
self.assertEqual('meter.test',
alarms[0]['threshold_rule']['meter_name'])
one = self.get_json('/alarms/%s' % alarms[0]['alarm_id'])
one = self.get_json('/alarms/%s' % alarms[0]['alarm_id'],
headers=self.auth_headers)
self.assertEqual('name1', one['name'])
self.assertEqual('meter.test', one['threshold_rule']['meter_name'])
self.assertEqual(alarms[0]['alarm_id'], one['alarm_id'])
@ -315,12 +326,14 @@ class TestAlarms(TestAlarmsBase):
self.alarm_conn.update_alarm(alarm)
alarms = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'enabled',
'value': 'False'}])
self.assertEqual(1, len(alarms))
self.assertEqual('disabled', alarms[0]['name'])
one = self.get_json('/alarms/%s' % alarms[0]['alarm_id'])
one = self.get_json('/alarms/%s' % alarms[0]['alarm_id'],
headers=self.auth_headers)
self.assertEqual('disabled', one['name'])
def test_get_alarm_project_filter_wrong_op_normal_user(self):
@ -348,6 +361,7 @@ class TestAlarms(TestAlarmsBase):
def _test(field):
alarms = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': field,
'op': 'eq',
'value': project}])
@ -372,6 +386,20 @@ class TestAlarms(TestAlarmsBase):
_test('project')
_test('project_id')
def test_get_alarm_forbiden(self):
pf = os.path.abspath('aodh/tests/functional/api/v2/policy.json-test')
self.CONF.set_override('policy_file', pf, group='oslo_policy')
self.app = self._make_app()
response = self.get_json('/alarms',
expect_errors=True,
status=403,
headers=self.auth_headers)
faultstring = 'RBAC Authorization Failed'
self.assertEqual(403, response.status_code)
self.assertEqual(faultstring,
response.json['error_message']['faultstring'])
def test_post_alarm_wsme_workaround(self):
jsons = {
'type': {
@ -1343,6 +1371,7 @@ class TestAlarms(TestAlarmsBase):
}
}
data = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name1',
}])
@ -1434,6 +1463,7 @@ class TestAlarms(TestAlarmsBase):
}
}
data = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name1',
}])
@ -1472,6 +1502,7 @@ class TestAlarms(TestAlarmsBase):
}
}
data = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name2',
}])
@ -1511,6 +1542,7 @@ class TestAlarms(TestAlarmsBase):
}
}
data = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name2',
}])
@ -1559,7 +1591,7 @@ class TestAlarms(TestAlarmsBase):
['http://no-trust-something/ok'], data['ok_actions'])
def test_delete_alarm(self):
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(4, len(data))
resp = self.delete('/alarms/%s' % data[0]['alarm_id'],
@ -1570,7 +1602,7 @@ class TestAlarms(TestAlarmsBase):
self.assertEqual(3, len(alarms))
def test_get_state_alarm(self):
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(4, len(data))
resp = self.get_json('/alarms/%s/state' % data[0]['alarm_id'],
@ -1578,7 +1610,7 @@ class TestAlarms(TestAlarmsBase):
self.assertEqual(resp, data[0]['state'])
def test_set_state_alarm(self):
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(4, len(data))
resp = self.put_json('/alarms/%s/state' % data[0]['alarm_id'],
@ -1590,7 +1622,7 @@ class TestAlarms(TestAlarmsBase):
self.assertEqual('alarm', resp.json)
def test_set_invalid_state_alarm(self):
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(4, len(data))
self.put_json('/alarms/%s/state' % data[0]['alarm_id'],
@ -1783,6 +1815,7 @@ class TestAlarmsHistory(TestAlarmsBase):
headers=self.auth_headers)
alarms = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'new_alarm',
}])
@ -2039,7 +2072,7 @@ class TestAlarmsQuotas(TestAlarmsBase):
resp = self.post_json('/alarms', params=alarm,
headers=self.auth_headers)
self.assertEqual(201, resp.status_code)
alarms = self.get_json('/alarms')
alarms = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(1, len(alarms))
alarm['name'] = 'another_user_alarm'
@ -2051,7 +2084,7 @@ class TestAlarmsQuotas(TestAlarmsBase):
self.assertIn(faultstring,
resp.json['error_message']['faultstring'])
alarms = self.get_json('/alarms')
alarms = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(1, len(alarms))
def test_alarms_quotas(self):
@ -2087,7 +2120,8 @@ class TestAlarmsQuotas(TestAlarmsBase):
'op': 'eq',
'value': value
}]
alarms = self.get_json('/alarms', q=query)
alarms = self.get_json('/alarms', q=query,
headers=self.auth_headers)
self.assertEqual(1, len(alarms))
alarm = {
@ -2121,7 +2155,8 @@ class TestAlarmsQuotas(TestAlarmsBase):
self.assertEqual(201, resp.status_code)
_test('project_id', self.auth_headers['X-Project-Id'])
alarms = self.get_json('/alarms')
self.auth_headers["X-roles"] = "admin"
alarms = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(2, len(alarms))
@ -2333,6 +2368,7 @@ class TestAlarmsRuleCombination(TestAlarmsBase):
def test_get_alarm_combination(self):
alarms = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name4',
}])
@ -2341,7 +2377,8 @@ class TestAlarmsRuleCombination(TestAlarmsBase):
alarms[0]['combination_rule']['alarm_ids'])
self.assertEqual('or', alarms[0]['combination_rule']['operator'])
one = self.get_json('/alarms/%s' % alarms[0]['alarm_id'])
one = self.get_json('/alarms/%s' % alarms[0]['alarm_id'],
headers=self.auth_headers)
self.assertEqual('name4', one['name'])
self.assertEqual(['a', 'b'],
alarms[0]['combination_rule']['alarm_ids'])
@ -2621,6 +2658,7 @@ class TestAlarmsRuleCombination(TestAlarmsBase):
}
data = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name4',
}])
@ -2646,6 +2684,7 @@ class TestAlarmsRuleCombination(TestAlarmsBase):
}
data = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name4',
}])
@ -2673,6 +2712,7 @@ class TestAlarmsRuleCombination(TestAlarmsBase):
def test_put_combination_alarm_with_duplicate_ids(self):
"""Test combination alarm doesn't allow duplicate alarm ids."""
alarms = self.get_json('/alarms',
headers=self.auth_headers,
q=[{'field': 'name',
'value': 'name4',
}])
@ -2785,7 +2825,7 @@ class TestAlarmsRuleGnocchi(TestAlarmsBase):
self.alarm_conn.update_alarm(alarm)
def test_list_alarms(self):
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(3, len(data))
self.assertEqual(set(['name1', 'name2', 'name3']),
set(r['name'] for r in data))
@ -2981,7 +3021,7 @@ class TestAlarmsEvent(TestAlarmsBase):
)
self.alarm_conn.update_alarm(alarm)
data = self.get_json('/alarms')
data = self.get_json('/alarms', headers=self.auth_headers)
self.assertEqual(1, len(data))
self.assertEqual(set(['event.alarm.1']),
set(r['name'] for r in data))

View File

@ -77,6 +77,7 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
def test_query_all(self):
data = self.post_json(self.alarm_url,
headers=admin_header,
params={})
self.assertEqual(12, len(data.json))
@ -86,6 +87,7 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
isotime = date_time.isoformat()
data = self.post_json(self.alarm_url,
headers=admin_header,
params={"filter":
'{">": {"timestamp": "'
+ isotime + '"}}'})
@ -101,6 +103,7 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
isotime = date_time.isoformat()
data = self.post_json(self.alarm_url,
headers=admin_header,
params={"filter":
'{">": {"state_timestamp": "'
+ isotime + '"}}'})
@ -160,6 +163,7 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
def test_query_with_field_project(self):
data = self.post_json(self.alarm_url,
headers=admin_header,
params={"filter":
'{"=": {"project": "project-id2"}}'})
@ -169,6 +173,7 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
def test_query_with_field_user_in_orderby(self):
data = self.post_json(self.alarm_url,
headers=admin_header,
params={"filter": '{"=": {"state": "alarm"}}',
"orderby": '[{"user": "DESC"}]'})
@ -179,6 +184,7 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
def test_query_with_filter_orderby_and_limit(self):
orderby = '[{"state_timestamp": "DESC"}]'
data = self.post_json(self.alarm_url,
headers=admin_header,
params={"filter": '{"=": {"state": "alarm"}}',
"orderby": orderby,
"limit": 3})
@ -193,6 +199,7 @@ class TestQueryAlarmsController(tests_api.FunctionalTest):
def test_limit_should_be_positive(self):
data = self.post_json(self.alarm_url,
headers=admin_header,
params={"limit": 0},
expect_errors=True)
@ -223,6 +230,7 @@ class TestQueryAlarmsHistoryController(tests_api.FunctionalTest):
def test_query_all(self):
data = self.post_json(self.url,
headers=admin_header,
params={})
self.assertEqual(8, len(data.json))
@ -232,6 +240,7 @@ class TestQueryAlarmsHistoryController(tests_api.FunctionalTest):
isotime = date_time.isoformat()
data = self.post_json(self.url,
headers=admin_header,
params={"filter":
'{">": {"timestamp":"'
+ isotime + '"}}'})
@ -283,6 +292,7 @@ class TestQueryAlarmsHistoryController(tests_api.FunctionalTest):
def test_query_with_filter_for_project_orderby_with_user(self):
data = self.post_json(self.url,
headers=admin_header,
params={"filter":
'{"=": {"project": "project-id1"}}',
"orderby": '[{"user": "DESC"}]',
@ -298,6 +308,7 @@ class TestQueryAlarmsHistoryController(tests_api.FunctionalTest):
def test_query_with_filter_orderby_and_limit(self):
data = self.post_json(self.url,
headers=admin_header,
params={"filter": '{"=": {"type": "creation"}}',
"orderby": '[{"timestamp": "DESC"}]',
"limit": 3})
@ -313,6 +324,7 @@ class TestQueryAlarmsHistoryController(tests_api.FunctionalTest):
def test_limit_should_be_positive(self):
data = self.post_json(self.url,
params={"limit": 0},
headers=admin_header,
expect_errors=True)
self.assertEqual(400, data.status_int)

View File

@ -15,7 +15,6 @@
"""Fixtures used during Gabbi-based test runs."""
import json
import os
from unittest import case
import uuid
@ -24,8 +23,6 @@ from gabbi import fixture
import mock
from oslo_config import fixture as fixture_config
from oslo_policy import opts
from oslo_utils import fileutils
import six
from six.moves.urllib import parse as urlparse
from aodh import service
@ -70,22 +67,10 @@ class ConfigFixture(fixture.GabbiFixture):
self.conf = conf
opts.set_defaults(self.conf)
policies = json.dumps({
"context_is_admin": "role:admin",
"context_is_project": "project_id:%(target.project_id)s",
"context_is_owner": "user_id:%(target.user_id)s",
"segregation": "rule:context_is_admin",
"default": ""
})
if six.PY3:
policies = policies.encode('utf-8')
self.conf.set_override("policy_file",
fileutils.write_to_tempfile(
content=policies,
prefix='policy',
suffix='.json'),
group='oslo_policy')
conf.set_override('policy_file',
os.path.abspath(
'aodh/tests/open-policy.json'),
group='oslo_policy')
database_name = '%s-%s' % (db_url, str(uuid.uuid4()))
conf.set_override('connection', database_name, group='database')

View File

@ -0,0 +1,5 @@
{
"context_is_admin": "role:admin",
"segregation": "rule:context_is_admin",
"default": ""
}

View File

@ -99,7 +99,7 @@ class BinApiTestCase(base.BaseTestCase):
# create aodh.conf file
self.api_port = random.randint(10000, 11000)
self.pipeline_cfg_file = self.path_get('etc/aodh/pipeline.yaml')
self.policy_file = self.path_get('etc/aodh/policy.json')
self.policy_file = self.path_get('aodh/tests/open-policy.json')
def tearDown(self):
super(BinApiTestCase, self).tearDown()

View File

@ -1,20 +1,20 @@
{
"context_is_admin": "role:admin",
"context_is_project": "project_id:%(target.project_id)s",
"context_is_owner": "user_id:%(target.user_id)s",
"segregation": "rule:context_is_admin",
"admin_or_owner": "rule:context_is_admin or project_id:%(project_id)s",
"default": "rule:admin_or_owner",
"telemetry:get_alarm": "rule:context_is_admin",
"telemetry:get_alarms": "rule:context_is_admin",
"telemetry:query_alarm": "rule:context_is_admin",
"telemetry:get_alarm": "rule:admin_or_owner",
"telemetry:get_alarms": "rule:admin_or_owner",
"telemetry:query_alarm": "rule:admin_or_owner",
"telemetry:create_alarm": "rule:context_is_admin",
"telemetry:change_alarm": "rule:context_is_admin",
"telemetry:delete_alarm": "rule:context_is_admin",
"telemetry:create_alarm": "",
"telemetry:change_alarm": "rule:admin_or_owner",
"telemetry:delete_alarm": "rule:admin_or_owner",
"telemetry:get_alarm_state": "rule:context_is_admin",
"telemetry:change_alarm_state": "rule:context_is_admin",
"telemetry:get_alarm_state": "rule:admin_or_owner",
"telemetry:change_alarm_state": "rule:admin_or_owner",
"telemetry:alarm_history": "rule:context_is_admin",
"telemetry:query_alarm_history": "rule:context_is_admin"
"telemetry:alarm_history": "rule:admin_or_owner",
"telemetry:query_alarm_history": "rule:admin_or_owner"
}