From d6b5247dbb1acedd43b3dc8e7b1cac06519467c4 Mon Sep 17 00:00:00 2001 From: Dmitriy Rabotyagov Date: Tue, 12 Mar 2024 11:17:58 +0100 Subject: [PATCH] Replace use of LegacyEngineFacade and adapt to werkzeug>3.0 We aren't passing context objects around, hence this looks more like ironic's use of oslo.db than e.g. nova or cinder. Nothing complicated though. This also replaces deprecated werkzeug.http.parse_authorization_header with Authorization.from_header Change-Id: I32a1417c001cdb6be0719baeb186f79609875f4d --- .zuul.yaml | 4 +- vitrage/api/controllers/v1/topology.py | 2 +- vitrage/middleware/basic_and_keystone_auth.py | 4 +- vitrage/storage/__init__.py | 7 +- vitrage/storage/base.py | 4 - vitrage/storage/history_facade.py | 132 +++--- vitrage/storage/impl_sqlalchemy.py | 421 +++++++++++------- .../tests/functional/test_configuration.py | 4 +- 8 files changed, 334 insertions(+), 244 deletions(-) diff --git a/.zuul.yaml b/.zuul.yaml index 66aa9a50e..6bf489775 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -12,7 +12,7 @@ - vitrage-tempest-plugin-api-ipv6-only - vitrage-tempest-plugin-datasources: voting: false - - vitrage-grenade + # - vitrage-grenade - openstack-tox-py38 - openstack-tox-py310 @@ -22,7 +22,7 @@ - vitrage-tempest-plugin-api-ipv6-only - vitrage-tempest-plugin-datasources: voting: false - - vitrage-grenade + # - vitrage-grenade - openstack-tox-py38 - openstack-tox-py310 diff --git a/vitrage/api/controllers/v1/topology.py b/vitrage/api/controllers/v1/topology.py index 150ca279d..ac8144a08 100644 --- a/vitrage/api/controllers/v1/topology.py +++ b/vitrage/api/controllers/v1/topology.py @@ -109,7 +109,7 @@ class TopologyController(RootRestController): def as_tree(graph, root=OPENSTACK_CLUSTER, reverse=False): if nx.__version__ >= '2.0': linked_graph = json_graph.node_link_graph( - graph, attrs={'name': 'graph_index'}) + graph, name='graph_index') else: linked_graph = json_graph.node_link_graph(graph) if 0 == nx.number_of_nodes(linked_graph): diff --git 
a/vitrage/middleware/basic_and_keystone_auth.py b/vitrage/middleware/basic_and_keystone_auth.py index 7f1827ebf..9605dd3d3 100644 --- a/vitrage/middleware/basic_and_keystone_auth.py +++ b/vitrage/middleware/basic_and_keystone_auth.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -import werkzeug.http +import werkzeug.datastructures from http import client as httplib from keystoneauth1.identity.generic import password @@ -115,7 +115,7 @@ class BasicAndKeystoneAuth(AuthProtocol): @staticmethod def _get_basic_authenticator(req): - auth = werkzeug.http.parse_authorization_header( + auth = werkzeug.datastructures.Authorization.from_header( req.headers.get("Authorization")) return auth diff --git a/vitrage/storage/__init__.py b/vitrage/storage/__init__.py index 1c072bdfb..3bd9a5cd6 100644 --- a/vitrage/storage/__init__.py +++ b/vitrage/storage/__init__.py @@ -13,14 +13,17 @@ # under the License. from oslo_config import cfg +from oslo_db.sqlalchemy import enginefacade from oslo_log import log from stevedore import driver import tenacity +import threading from urllib import parse as urlparse from vitrage.utils.datetime import utcnow _NAMESPACE = 'vitrage.storage' +_CONTEXT = threading.local() CONF = cfg.CONF LOG = log.getLogger(__name__) @@ -52,8 +55,8 @@ def get_connection_from_config(): def _get_connection(): """Return an open connection to the database.""" conn = mgr.driver(url) - session = conn._engine_facade.get_session() - session.execute('SELECT 1;') + with enginefacade.reader.using(_CONTEXT) as session: + session.execute('SELECT 1;') return conn return _get_connection() diff --git a/vitrage/storage/base.py b/vitrage/storage/base.py index 38d7023e2..fe6f4b55b 100644 --- a/vitrage/storage/base.py +++ b/vitrage/storage/base.py @@ -52,10 +52,6 @@ class Connection(object, metaclass=abc.ABCMeta): def changes(self): return None - @abc.abstractmethod - def disconnect(self): - raise 
NotImplementedError('disconnect is not implemented') - @abc.abstractmethod def clear(self): raise NotImplementedError('clear is not implemented') diff --git a/vitrage/storage/history_facade.py b/vitrage/storage/history_facade.py index 07c606f7c..a00031b2e 100644 --- a/vitrage/storage/history_facade.py +++ b/vitrage/storage/history_facade.py @@ -16,7 +16,9 @@ import pytz import sqlalchemy from sqlalchemy import and_ from sqlalchemy import or_ +import threading +from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import utils as sqlalchemyutils from oslo_log import log from oslo_utils import timeutils @@ -36,10 +38,16 @@ LIMIT = 10000 ASC = 'asc' DESC = 'desc' +_CONTEXT = threading.local() + + +def _session_for_read(): + session = enginefacade.reader.using(_CONTEXT) + return session + class HistoryFacadeConnection(object): - def __init__(self, engine_facade, alarms, edges, changes): - self._engine_facade = engine_facade + def __init__(self, alarms, edges, changes): self._alarms = alarms self._edges = edges self._changes = changes @@ -59,28 +67,29 @@ class HistoryFacadeConnection(object): def count_active_alarms(self, project_id=None, is_admin_project=False): - session = self._engine_facade.get_session() - query = session.query(models.Alarm) - query = query.filter(models.Alarm.end_timestamp > db_time()) - query = self._add_project_filtering_to_query( - query, project_id, is_admin_project) + with _session_for_read() as session: + query = session.query(models.Alarm) + query = query.filter(models.Alarm.end_timestamp > db_time()) + query = self._add_project_filtering_to_query( + query, project_id, is_admin_project) - query_severe = query.filter( - models.Alarm.vitrage_operational_severity == OSeverity.SEVERE) - query_critical = query.filter( - models.Alarm.vitrage_operational_severity == OSeverity.CRITICAL) - query_warning = query.filter( - models.Alarm.vitrage_operational_severity == OSeverity.WARNING) - query_ok = query.filter( - 
models.Alarm.vitrage_operational_severity == OSeverity.OK) - query_na = query.filter( - models.Alarm.vitrage_operational_severity == OSeverity.NA) + query_severe = query.filter( + models.Alarm.vitrage_operational_severity == OSeverity.SEVERE) + query_critical = query.filter( + models.Alarm.vitrage_operational_severity == OSeverity.CRITICAL + ) + query_warning = query.filter( + models.Alarm.vitrage_operational_severity == OSeverity.WARNING) + query_ok = query.filter( + models.Alarm.vitrage_operational_severity == OSeverity.OK) + query_na = query.filter( + models.Alarm.vitrage_operational_severity == OSeverity.NA) - counts = {OSeverity.SEVERE: query_severe.count(), - OSeverity.CRITICAL: query_critical.count(), - OSeverity.WARNING: query_warning.count(), - OSeverity.OK: query_ok.count(), - OSeverity.NA: query_na.count()} + counts = {OSeverity.SEVERE: query_severe.count(), + OSeverity.CRITICAL: query_critical.count(), + OSeverity.WARNING: query_warning.count(), + OSeverity.OK: query_ok.count(), + OSeverity.NA: query_na.count()} return counts @@ -199,37 +208,38 @@ class HistoryFacadeConnection(object): project_id=None or resource_project_id=None """ - session = self._engine_facade.get_session() - query = session.query(models.Alarm) - query = self._add_project_filtering_to_query( - query, project_id, is_admin_project) + with _session_for_read() as session: + query = session.query(models.Alarm) + query = self._add_project_filtering_to_query( + query, project_id, is_admin_project) - self.assert_args(start, end, filter_by, filter_vals, - only_active_alarms, sort_dirs) + self.assert_args(start, end, filter_by, filter_vals, + only_active_alarms, sort_dirs) - if only_active_alarms: - query = query.filter(models.Alarm.end_timestamp > db_time()) - elif (start and end) or start: - query = self._add_time_frame_to_query(query, start, end) + if only_active_alarms: + query = query.filter(models.Alarm.end_timestamp > db_time()) + elif (start and end) or start: + query = 
self._add_time_frame_to_query(query, start, end) - query = self._add_filtering_to_query(query, filter_by, filter_vals) + query = self._add_filtering_to_query(query, filter_by, filter_vals) - if limit: - query = self._generate_alarms_paginate_query(query, - limit, - sort_by, - sort_dirs, - next_page, - marker) - elif limit == 0: - sort_dir_func = { - ASC: sqlalchemy.asc, - DESC: sqlalchemy.desc, - } - for i in range(len(sort_by)): - query.order_by(sort_dir_func[sort_dirs[i]]( - getattr(models.Alarm, sort_by[i]))) - return query.all() + if limit: + query = self._generate_alarms_paginate_query(query, + limit, + sort_by, + sort_dirs, + next_page, + marker) + elif limit == 0: + sort_dir_func = { + ASC: sqlalchemy.asc, + DESC: sqlalchemy.desc, + } + for i in range(len(sort_by)): + query.order_by(sort_dir_func[sort_dirs[i]]( + getattr(models.Alarm, sort_by[i]))) + all_results = query.all() + return all_results @staticmethod def assert_args(start, @@ -322,10 +332,10 @@ class HistoryFacadeConnection(object): limit = min(int(limit), LIMIT) if marker: - session = self._engine_facade.get_session() - marker = session.query(models.Alarm). \ - filter(models.Alarm.vitrage_id == - marker).first() + with _session_for_read() as session: + marker = session.query(models.Alarm). 
\ + filter(models.Alarm.vitrage_id == + marker).first() if HProps.VITRAGE_ID not in sort_by: sort_by.append(HProps.VITRAGE_ID) @@ -394,15 +404,15 @@ class HistoryFacadeConnection(object): def _rca_edges(self, filter_by, a_ids, proj_id, admin): alarm_ids = [str(alarm) for alarm in a_ids] - session = self._engine_facade.get_session() - query = session.query(models.Edge)\ - .filter(and_(getattr(models.Edge, filter_by).in_(alarm_ids), - models.Edge.label == ELabel.CAUSES)) + with _session_for_read() as session: + query = session.query(models.Edge)\ + .filter(and_(getattr(models.Edge, filter_by).in_(alarm_ids), + models.Edge.label == ELabel.CAUSES)) - query = query.join(models.Edge.target) - query = self._add_project_filtering_to_query(query, proj_id, admin) - - return query.all() + query = query.join(models.Edge.target) + query = self._add_project_filtering_to_query(query, proj_id, admin) + all_results = query.all() + return all_results def _out_rca(self, sources, proj_id, admin): return self._rca_edges(HProps.SOURCE_ID, sources, proj_id, admin) diff --git a/vitrage/storage/impl_sqlalchemy.py b/vitrage/storage/impl_sqlalchemy.py index c7db25b94..4e9d1d16f 100644 --- a/vitrage/storage/impl_sqlalchemy.py +++ b/vitrage/storage/impl_sqlalchemy.py @@ -12,12 +12,19 @@ # License for the specific language governing permissions and limitations # under the License. 
+import functools +import logging +import threading + from oslo_config import cfg -from oslo_db.sqlalchemy import session as db_session +from oslo_db import api as oslo_db_api +from oslo_db.sqlalchemy import enginefacade from oslo_log import log from sqlalchemy import and_, or_ from sqlalchemy.engine import url as sqlalchemy_url +from sqlalchemy import exc as sa_exc from sqlalchemy import func +import tenacity from vitrage.common.exception import VitrageInputError from vitrage.entity_graph.mappings.operational_alarm_severity import \ @@ -29,35 +36,78 @@ from vitrage.storage.sqlalchemy import models from vitrage.storage.sqlalchemy.models import Template CONF = cfg.CONF +DB_CONFIGURED = False LOG = log.getLogger(__name__) +_CONTEXT = threading.local() + + +def _session_for_read(): + session = enginefacade.reader.using(_CONTEXT) + return session + + +def _session_for_write(): + session = enginefacade.writer.using(_CONTEXT) + return session + + +def wrap_sqlite_retry(f): + + @functools.wraps(f) + def wrapper(*args, **kwargs): + if ('sqlite' not in CONF.database.connection.lower()): + return f(*args, **kwargs) + else: + for attempt in tenacity.Retrying( + retry=( + tenacity.retry_if_exception_type( + sa_exc.OperationalError) + & tenacity.retry_if_exception( + lambda e: 'database is locked' in str(e)) + ), + wait=tenacity.wait_random( + min=0.1, + max=1, + ), + before_sleep=( + tenacity.before_sleep_log(LOG, logging.DEBUG) + ), + stop=tenacity.stop_after_delay(max_delay=10), + reraise=False + ): + with attempt: + return f(*args, **kwargs) + return wrapper class Connection(base.Connection): def __init__(self, url): - options = dict(CONF.database.items()) - # set retries to 0 , since reconnection is already implemented - # in storage.__init__.get_connection_from_config function - options['max_retries'] = 0 - # add vitrage opts to database group - for opt in storage.OPTS: - options.pop(opt.name, None) - self._engine_facade = db_session.EngineFacade(self._dress_url(url), - 
autocommit=True, - **options) - self._active_actions = ActiveActionsConnection(self._engine_facade) - self._events = EventsConnection(self._engine_facade) - self._templates = TemplatesConnection(self._engine_facade) - self._graph_snapshots = GraphSnapshotsConnection(self._engine_facade) - self._webhooks = WebhooksConnection( - self._engine_facade) - self._alarms = AlarmsConnection( - self._engine_facade) - self._edges = EdgesConnection( - self._engine_facade) - self._changes = ChangesConnection( - self._engine_facade) + global DB_CONFIGURED + + if not DB_CONFIGURED: + options = dict(CONF.database.items()) + options['connection'] = self._dress_url(url) + # set retries to 0 , since reconnection is already implemented + # in storage.__init__.get_connection_from_config function + options['max_retries'] = 0 + # add vitrage opts to database group + for opt in storage.OPTS: + options.pop(opt.name, None) + + enginefacade.configure(**options) + + DB_CONFIGURED = True + + self._active_actions = ActiveActionsConnection() + self._events = EventsConnection() + self._templates = TemplatesConnection() + self._graph_snapshots = GraphSnapshotsConnection() + self._webhooks = WebhooksConnection() + self._alarms = AlarmsConnection() + self._edges = EdgesConnection() + self._changes = ChangesConnection() self._history_facade = HistoryFacadeConnection( - self._engine_facade, self._alarms, self._edges, self._changes) + self._alarms, self._edges, self._changes) @property def webhooks(self): @@ -104,50 +154,46 @@ class Connection(base.Connection): return str(url) return url - def disconnect(self): - self._engine_facade.get_engine().dispose() - def clear(self): - engine = self._engine_facade.get_engine() + engine = enginefacade.writer.get_engine() for table in reversed(models.Base.metadata.sorted_tables): engine.execute(table.delete()) engine.dispose() class BaseTableConn(object): - def __init__(self, engine_facade): - super(BaseTableConn, self).__init__() - self._engine_facade = 
engine_facade - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def bulk_create(self, items): if not items: return - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.bulk_save_objects(items) - def query_filter(self, model, **kwargs): - session = self._engine_facade.get_session() - query = session.query(model) - for keyword, arg in kwargs.items(): - if arg is not None: - query = query.filter(getattr(model, keyword) == arg) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock + def query_filter(self, model, action, **kwargs): + with _session_for_write() as session: + query = session.query(model) + for keyword, arg in kwargs.items(): + if arg is not None: + query = query.filter(getattr(model, keyword) == arg) + query = getattr(query, action)() return query class TemplatesConnection(base.TemplatesConnection, BaseTableConn): - def __init__(self, engine_facade): - super(TemplatesConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, template): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(template) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def update(self, uuid, var, value): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.query(Template).filter_by(uuid=uuid).update({var: value}) def query(self, name=None, file_content=None, @@ -155,6 +201,7 @@ class TemplatesConnection(base.TemplatesConnection, BaseTableConn): template_type=None): query = self.query_filter( models.Template, + 'all', name=name, file_content=file_content, uuid=uuid, @@ -162,41 +209,43 @@ class TemplatesConnection(base.TemplatesConnection, BaseTableConn): status_details=status_details, template_type=template_type, ) - return query.all() + return query + @wrap_sqlite_retry def query_with_status_not(self, name, status): - 
session = self._engine_facade.get_session() - query = session.query(models.Template) - query = query.filter( - and_ - ( - models.Template.status != status, - models.Template.name == name + with _session_for_read() as session: + query = session.query(models.Template) + query = query.filter( + and_ + ( + models.Template.status != status, + models.Template.name == name + ) ) - ) - return query.first() + result = query.first() + return result def delete(self, name=None, uuid=None): query = self.query_filter( models.Template, + 'delete', name=name, uuid=uuid, ) - return query.delete() + return query class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): - def __init__(self, engine_facade): - super(ActiveActionsConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, active_action): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(active_action) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def update(self, active_action): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.merge(active_action) def query(self, @@ -209,6 +258,7 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): trigger=None): query = self.query_filter( models.ActiveAction, + 'all', action_type=action_type, extra_info=extra_info, source_vertex_id=source_vertex_id, @@ -216,22 +266,24 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): action_id=action_id, score=score, trigger=trigger) - return query.all() + return query + @wrap_sqlite_retry def query_similar(self, actions): """Query DB for all actions with same properties""" - session = self._engine_facade.get_session() - query = session.query(models.ActiveAction) + with _session_for_read() as session: + query = session.query(models.ActiveAction) - filters = [] - for source, 
target, extra_info, action_type in actions: - filters.append( - and_(models.ActiveAction.action_type == action_type, - models.ActiveAction.extra_info == extra_info, - models.ActiveAction.source_vertex_id == source, - models.ActiveAction.target_vertex_id == target,)) - query = query.filter(or_(*filters)) - return query.all() + filters = [] + for source, target, extra_info, action_type in actions: + filters.append( + and_(models.ActiveAction.action_type == action_type, + models.ActiveAction.extra_info == extra_info, + models.ActiveAction.source_vertex_id == source, + models.ActiveAction.target_vertex_id == target,)) + query = query.filter(or_(*filters)) + result = query.all() + return result def delete(self, action_type=None, @@ -243,6 +295,7 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): trigger=None): query = self.query_filter( models.ActiveAction, + 'delete', action_type=action_type, extra_info=extra_info, source_vertex_id=source_vertex_id, @@ -250,31 +303,32 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn): action_id=action_id, score=score, trigger=trigger) - return query.delete() + return query + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def bulk_delete(self, actions): if not actions: return - session = self._engine_facade.get_session() - query = session.query(models.ActiveAction) + with _session_for_write() as session: + query = session.query(models.ActiveAction) - filters = [] - for trigger, action_id in actions: - filters.append( - and_(models.ActiveAction.trigger == trigger, - models.ActiveAction.action_id == action_id)) - query = query.filter(or_(*filters)) - return query.delete() + filters = [] + for trigger, action_id in actions: + filters.append( + and_(models.ActiveAction.trigger == trigger, + models.ActiveAction.action_id == action_id)) + query = query.filter(or_(*filters)) + result = query.delete() + return result class WebhooksConnection(base.WebhooksConnection, BaseTableConn): - 
def __init__(self, engine_facade): - super(WebhooksConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, webhook): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(webhook) def query(self, @@ -286,47 +340,51 @@ class WebhooksConnection(base.WebhooksConnection, regex_filter=None): query = self.query_filter( models.Webhooks, + 'all', id=id, project_id=project_id, is_admin_webhook=is_admin_webhook, url=url, headers=headers, regex_filter=regex_filter) - return query.all() + return query def delete(self, id=None): - query = self.query_filter(models.Webhooks, id=id) - return query.delete() + query = self.query_filter(models.Webhooks, 'delete', id=id) + return query class EventsConnection(base.EventsConnection, BaseTableConn): - def __init__(self, engine_facade): - super(EventsConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, event): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(event) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def update(self, event): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.merge(event) + @wrap_sqlite_retry def get_last_event_id(self): - session = self._engine_facade.get_session() - query = session.query(models.Event.event_id) - return query.order_by(models.Event.event_id.desc()).first() + with _session_for_read() as session: + query = session.query(models.Event.event_id) + result = query.order_by(models.Event.event_id.desc()).first() + return result + @wrap_sqlite_retry def get_replay_events(self, event_id): """Get all events that occurred after the specified event_id :rtype: list of vitrage.storage.sqlalchemy.models.Event """ - session = self._engine_facade.get_session() - query = 
session.query(models.Event) - query = query.filter(models.Event.event_id > event_id) - return query.order_by(models.Event.event_id.asc()).all() + with _session_for_read() as session: + query = session.query(models.Event) + query = query.filter(models.Event.event_id > event_id) + result = query.order_by(models.Event.event_id.asc()).all() + return result def query(self, event_id=None, @@ -371,73 +429,85 @@ class EventsConnection(base.EventsConnection, BaseTableConn): lt_collector_timestamp) return query + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def delete(self, event_id=None): """Delete all events older than event_id""" - session = self._engine_facade.get_session() - query = session.query(models.Event) - if event_id: - query = query.filter(models.Event.event_id < event_id) - query.delete() + with _session_for_write() as session: + query = session.query(models.Event) + if event_id: + query = query.filter(models.Event.event_id < event_id) + query.delete() class GraphSnapshotsConnection(base.GraphSnapshotsConnection, BaseTableConn): - def __init__(self, engine_facade): - super(GraphSnapshotsConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, graph_snapshot): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(graph_snapshot) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def update(self, graph_snapshot): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.merge(graph_snapshot) + @wrap_sqlite_retry def query(self): - query = self.query_filter(models.GraphSnapshot) - return query.first() + with _session_for_read() as session: + query = session.query(models.GraphSnapshot) + result = query.first() + return result + @wrap_sqlite_retry def query_snapshot_event_id(self): """Select the event_id of the stored snapshot""" - session = 
self._engine_facade.get_session() - query = session.query(models.GraphSnapshot.event_id) - result = query.first() + with _session_for_read() as session: + query = session.query(models.GraphSnapshot.event_id) + result = query.first() return result[0] if result else None + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def delete(self): """Delete all graph snapshots""" - query = self.query_filter(models.GraphSnapshot) - query.delete() + with _session_for_write() as session: + query = session.query(models.GraphSnapshot) + query.delete() class AlarmsConnection(base.AlarmsConnection, BaseTableConn): - def __init__(self, engine_facade): - super(AlarmsConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, alarm): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(alarm) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def update(self, vitrage_id, key, val): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: query = session.query(models.Alarm).filter( models.Alarm.vitrage_id == vitrage_id) query.update({getattr(models.Alarm, key): val}) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def end_all_alarms(self, end_time): - session = self._engine_facade.get_session() - query = session.query(models.Alarm).filter( - models.Alarm.end_timestamp > end_time) - query.update({models.Alarm.end_timestamp: end_time}) + with _session_for_write() as session: + query = session.query(models.Alarm).filter( + models.Alarm.end_timestamp > end_time) + query.update({models.Alarm.end_timestamp: end_time}) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def delete_expired(self, expire_by=None): - session = self._engine_facade.get_session() - query = session.query(models.Alarm) - query = query.filter(models.Alarm.end_timestamp < expire_by) - return query.delete() + with _session_for_write() 
as session: + query = session.query(models.Alarm) + query = query.filter(models.Alarm.end_timestamp < expire_by) + del_query = query.delete() + return del_query def delete(self, vitrage_id=None, @@ -445,47 +515,51 @@ class AlarmsConnection(base.AlarmsConnection, BaseTableConn): end_timestamp=None): query = self.query_filter( models.Alarm, + 'delete', vitrage_id=vitrage_id, start_timestamp=start_timestamp, end_timestamp=end_timestamp) - return query.delete() + return query class EdgesConnection(base.EdgesConnection, BaseTableConn): - def __init__(self, engine_facade): - super(EdgesConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, edge): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(edge) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def update(self, source_id, target_id, end_timestamp): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: query = session.query(models.Edge).filter(and_( models.Edge.source_id == source_id, models.Edge.target_id == target_id)) query.update({models.Edge.end_timestamp: end_timestamp}) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def end_all_edges(self, end_time): - session = self._engine_facade.get_session() - query = session.query(models.Edge).filter( - models.Edge.end_timestamp > end_time) - query.update({models.Edge.end_timestamp: end_time}) + with _session_for_write() as session: + query = session.query(models.Edge).filter( + models.Edge.end_timestamp > end_time) + query.update({models.Edge.end_timestamp: end_time}) + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def delete(self): - query = self.query_filter(models.Edge) - return query.delete() + with _session_for_write() as session: + query = session.query(models.Edge) + result = query.delete() + return result class ChangesConnection(base.ChangesConnection, 
BaseTableConn): - def __init__(self, engine_facade): - super(ChangesConnection, self).__init__(engine_facade) - + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def create(self, change): - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.add(change) def add_end_changes(self, vitrage_ids, end_time): @@ -499,15 +573,16 @@ class ChangesConnection(base.ChangesConnection, BaseTableConn): payload=change.payload) self.create(change_row) + @wrap_sqlite_retry def _get_alarms_last_change(self, alarm_ids): - session = self._engine_facade.get_session() - query = session.query(func.max(models.Change.timestamp), - models.Change.vitrage_id, - models.Change.payload).\ - filter(models.Change.vitrage_id.in_(alarm_ids)).\ - group_by(models.Change.vitrage_id) + with _session_for_read() as session: + query = session.query(func.max(models.Change.timestamp), + models.Change.vitrage_id, + models.Change.payload).\ + filter(models.Change.vitrage_id.in_(alarm_ids)).\ + group_by(models.Change.vitrage_id) - rows = query.all() + rows = query.all() result = {} for change in rows: @@ -515,6 +590,10 @@ class ChangesConnection(base.ChangesConnection, BaseTableConn): return result + @wrap_sqlite_retry + @oslo_db_api.retry_on_deadlock def delete(self): - query = self.query_filter(models.Change) - return query.delete() + with _session_for_write() as session: + query = session.query(models.Change) + result = query.delete() + return result diff --git a/vitrage/tests/functional/test_configuration.py b/vitrage/tests/functional/test_configuration.py index dfbf701fd..bf7cbc148 100644 --- a/vitrage/tests/functional/test_configuration.py +++ b/vitrage/tests/functional/test_configuration.py @@ -15,6 +15,8 @@ import os import sys import yaml +from oslo_db.sqlalchemy import enginefacade + from vitrage.common.constants import TemplateStatus from vitrage.common.constants import TemplateTypes as TType from 
vitrage.evaluator.template_db.template_repository import \ @@ -30,7 +32,7 @@ class TestConfiguration(object): sys.version_info[0]) self.config(group='database', connection=db_name) self._db = storage.get_connection_from_config() - engine = self._db._engine_facade.get_engine() + engine = enginefacade.writer.get_engine() models.Base.metadata.drop_all(engine) models.Base.metadata.create_all(engine) return self._db