diff --git a/neutron/db/agents_db.py b/neutron/db/agents_db.py index e095a4c2a7..9a574a3f27 100644 --- a/neutron/db/agents_db.py +++ b/neutron/db/agents_db.py @@ -25,6 +25,7 @@ from neutron.db import model_base from neutron.db import models_v2 from neutron.extensions import agent as ext_agent from neutron import manager +from neutron.openstack.common.db import exception as db_exc from neutron.openstack.common import jsonutils from neutron.openstack.common import log as logging from neutron.openstack.common import timeutils @@ -40,6 +41,11 @@ cfg.CONF.register_opt( class Agent(model_base.BASEV2, models_v2.HasId): """Represents agents running in neutron deployments.""" + __table_args__ = ( + sa.UniqueConstraint('agent_type', 'host', + name='uniq_agents0agent_type0host'), + ) + # L3 agent, DHCP agent, OVS agent, LinuxBridge agent_type = sa.Column(sa.String(255), nullable=False) binary = sa.Column(sa.String(255), nullable=False) @@ -135,8 +141,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase): agent = self._get_agent(context, id) return self._make_agent_dict(agent, fields) - def create_or_update_agent(self, context, agent): - """Create or update agent according to report.""" + def _create_or_update_agent(self, context, agent): with context.session.begin(subtransactions=True): res_keys = ['agent_type', 'binary', 'host', 'topic'] res = dict((k, agent[k]) for k in res_keys) @@ -163,6 +168,28 @@ class AgentDbMixin(ext_agent.AgentPluginBase): context.session.add(agent_db) greenthread.sleep(0) + def create_or_update_agent(self, context, agent): + """Create or update agent according to report.""" + + try: + return self._create_or_update_agent(context, agent) + except db_exc.DBDuplicateEntry as e: + if e.columns == ['agent_type', 'host']: + # It might happen that two or more concurrent transactions are + # trying to insert new rows having the same value of + # (agent_type, host) pair at the same time (if there has been + # no such entry in the table and multiple agent status updates + # are being processed at the moment). In this case having a + # unique constraint on (agent_type, host) columns guarantees + # that only one transaction will succeed and insert a new agent + # entry, others will fail and be rolled back. That means we + # must retry them one more time: no INSERTs will be issued, + # because _get_agent_by_type_and_host() will return the + # existing agent entry, which will be updated multiple times + return self._create_or_update_agent(context, agent) + + raise + class AgentExtRpcCallback(object): """Processes the rpc report in plugin implementations.""" diff --git a/neutron/db/migration/alembic_migrations/versions/1fcfc149aca4_agents_unique_by_type_and_host.py b/neutron/db/migration/alembic_migrations/versions/1fcfc149aca4_agents_unique_by_type_and_host.py new file mode 100644 index 0000000000..12d5287758 --- /dev/null +++ b/neutron/db/migration/alembic_migrations/versions/1fcfc149aca4_agents_unique_by_type_and_host.py @@ -0,0 +1,62 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Add a unique constraint on (agent_type, host) columns to prevent a race +condition when an agent entry is 'upserted'. + +Revision ID: 1fcfc149aca4 +Revises: e197124d4b9 +Create Date: 2013-11-27 18:35:28.148680 + +""" + +revision = '1fcfc149aca4' +down_revision = 'e197124d4b9' + +migration_for_plugins = [ + '*' +] + +from alembic import op + +from neutron.db import migration + + +TABLE_NAME = 'agents' +UC_NAME = 'uniq_agents0agent_type0host' + + +def upgrade(active_plugins=None, options=None): + if not migration.should_run(active_plugins, migration_for_plugins): + return + + op.create_unique_constraint( + name=UC_NAME, + source=TABLE_NAME, + local_cols=['agent_type', 'host'] + ) + + +def downgrade(active_plugins=None, options=None): + if not migration.should_run(active_plugins, migration_for_plugins): + return + + op.drop_constraint( + name=UC_NAME, + table_name=TABLE_NAME, + type_='unique' + ) diff --git a/neutron/tests/unit/db/test_agent_db.py b/neutron/tests/unit/db/test_agent_db.py new file mode 100644 index 0000000000..e3dc5ee8fc --- /dev/null +++ b/neutron/tests/unit/db/test_agent_db.py @@ -0,0 +1,86 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (c) 2013 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from neutron import context +from neutron.db import agents_db +from neutron.db import api as db +from neutron.db import db_base_plugin_v2 as base_plugin +from neutron.openstack.common.db import exception as exc +from neutron.tests import base + + +class FakePlugin(base_plugin.NeutronDbPluginV2, agents_db.AgentDbMixin): + """A fake plugin class containing all DB methods.""" + + +class TestAgentsDbMixin(base.BaseTestCase): + def setUp(self): + super(TestAgentsDbMixin, self).setUp() + + self.context = context.get_admin_context() + self.plugin = FakePlugin() + self.addCleanup(db.clear_db) + + self.agent_status = { + 'agent_type': 'Open vSwitch agent', + 'binary': 'neutron-openvswitch-agent', + 'host': 'overcloud-notcompute', + 'topic': 'N/A' + } + + def _assert_ref_fields_are_equal(self, reference, result): + """Compare (key, value) pairs of a reference dict with the result + + Note: the result MAY have additional keys + """ + + for field, value in reference.items(): + self.assertEqual(value, result[field], field) + + def test_create_or_update_agent_new_entry(self): + self.plugin.create_or_update_agent(self.context, self.agent_status) + + agent = self.plugin.get_agents(self.context)[0] + self._assert_ref_fields_are_equal(self.agent_status, agent) + + def test_create_or_update_agent_existing_entry(self): + self.plugin.create_or_update_agent(self.context, self.agent_status) + self.plugin.create_or_update_agent(self.context, self.agent_status) + self.plugin.create_or_update_agent(self.context, self.agent_status) + + agents = self.plugin.get_agents(self.context) + self.assertEqual(len(agents), 1) + + agent = agents[0] + self._assert_ref_fields_are_equal(self.agent_status, agent) + + def test_create_or_update_agent_concurrent_insert(self): + # NOTE(rpodolyaka): emulate violation of the unique constraint caused + # by a concurrent insert. Ensure we make another + # attempt on fail + with mock.patch('sqlalchemy.orm.Session.add') as add_mock: + add_mock.side_effect = [ + exc.DBDuplicateEntry(columns=['agent_type', 'host']), + None + ] + + self.plugin.create_or_update_agent(self.context, self.agent_status) + + self.assertEqual(add_mock.call_count, 2, + "Agent entry creation hasn't been retried")