Merge "Fix a race condition in agents status update code"
This commit is contained in:
commit
0a522b53d1
@ -25,6 +25,7 @@ from neutron.db import model_base
|
||||
from neutron.db import models_v2
|
||||
from neutron.extensions import agent as ext_agent
|
||||
from neutron import manager
|
||||
from neutron.openstack.common.db import exception as db_exc
|
||||
from neutron.openstack.common import jsonutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import timeutils
|
||||
@ -40,6 +41,11 @@ cfg.CONF.register_opt(
|
||||
class Agent(model_base.BASEV2, models_v2.HasId):
|
||||
"""Represents agents running in neutron deployments."""
|
||||
|
||||
__table_args__ = (
|
||||
sa.UniqueConstraint('agent_type', 'host',
|
||||
name='uniq_agents0agent_type0host'),
|
||||
)
|
||||
|
||||
# L3 agent, DHCP agent, OVS agent, LinuxBridge
|
||||
agent_type = sa.Column(sa.String(255), nullable=False)
|
||||
binary = sa.Column(sa.String(255), nullable=False)
|
||||
@ -135,8 +141,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
||||
agent = self._get_agent(context, id)
|
||||
return self._make_agent_dict(agent, fields)
|
||||
|
||||
def create_or_update_agent(self, context, agent):
|
||||
"""Create or update agent according to report."""
|
||||
def _create_or_update_agent(self, context, agent):
|
||||
with context.session.begin(subtransactions=True):
|
||||
res_keys = ['agent_type', 'binary', 'host', 'topic']
|
||||
res = dict((k, agent[k]) for k in res_keys)
|
||||
@ -163,6 +168,28 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
||||
context.session.add(agent_db)
|
||||
greenthread.sleep(0)
|
||||
|
||||
def create_or_update_agent(self, context, agent):
|
||||
"""Create or update agent according to report."""
|
||||
|
||||
try:
|
||||
return self._create_or_update_agent(context, agent)
|
||||
except db_exc.DBDuplicateEntry as e:
|
||||
if e.columns == ['agent_type', 'host']:
|
||||
# It might happen that two or more concurrent transactions are
|
||||
# trying to insert new rows having the same value of
|
||||
# (agent_type, host) pair at the same time (if there has been
|
||||
# no such entry in the table and multiple agent status updates
|
||||
# are being processed at the moment). In this case having a
|
||||
# unique constraint on (agent_type, host) columns guarantees
|
||||
# that only one transaction will succeed and insert a new agent
|
||||
# entry, others will fail and be rolled back. That means we
|
||||
# must retry them one more time: no INSERTs will be issued,
|
||||
# because _get_agent_by_type_and_host() will return the
|
||||
# existing agent entry, which will be updated multiple times
|
||||
return self._create_or_update_agent(context, agent)
|
||||
|
||||
raise
|
||||
|
||||
|
||||
class AgentExtRpcCallback(object):
|
||||
"""Processes the rpc report in plugin implementations."""
|
||||
|
@ -0,0 +1,62 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""Add a unique constraint on (agent_type, host) columns to prevent a race
|
||||
condition when an agent entry is 'upserted'.
|
||||
|
||||
Revision ID: 1fcfc149aca4
|
||||
Revises: e197124d4b9
|
||||
Create Date: 2013-11-27 18:35:28.148680
|
||||
|
||||
"""
|
||||
|
||||
revision = '1fcfc149aca4'
|
||||
down_revision = 'e197124d4b9'
|
||||
|
||||
migration_for_plugins = [
|
||||
'*'
|
||||
]
|
||||
|
||||
from alembic import op
|
||||
|
||||
from neutron.db import migration
|
||||
|
||||
|
||||
TABLE_NAME = 'agents'
|
||||
UC_NAME = 'uniq_agents0agent_type0host'
|
||||
|
||||
|
||||
def upgrade(active_plugins=None, options=None):
|
||||
if not migration.should_run(active_plugins, migration_for_plugins):
|
||||
return
|
||||
|
||||
op.create_unique_constraint(
|
||||
name=UC_NAME,
|
||||
source=TABLE_NAME,
|
||||
local_cols=['agent_type', 'host']
|
||||
)
|
||||
|
||||
|
||||
def downgrade(active_plugins=None, options=None):
|
||||
if not migration.should_run(active_plugins, migration_for_plugins):
|
||||
return
|
||||
|
||||
op.drop_constraint(
|
||||
name=UC_NAME,
|
||||
table_name=TABLE_NAME,
|
||||
type_='unique'
|
||||
)
|
86
neutron/tests/unit/db/test_agent_db.py
Normal file
86
neutron/tests/unit/db/test_agent_db.py
Normal file
@ -0,0 +1,86 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright (c) 2013 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import api as db
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.openstack.common.db import exception as exc
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class FakePlugin(base_plugin.NeutronDbPluginV2, agents_db.AgentDbMixin):
|
||||
"""A fake plugin class containing all DB methods."""
|
||||
|
||||
|
||||
class TestAgentsDbMixin(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestAgentsDbMixin, self).setUp()
|
||||
|
||||
self.context = context.get_admin_context()
|
||||
self.plugin = FakePlugin()
|
||||
self.addCleanup(db.clear_db)
|
||||
|
||||
self.agent_status = {
|
||||
'agent_type': 'Open vSwitch agent',
|
||||
'binary': 'neutron-openvswitch-agent',
|
||||
'host': 'overcloud-notcompute',
|
||||
'topic': 'N/A'
|
||||
}
|
||||
|
||||
def _assert_ref_fields_are_equal(self, reference, result):
|
||||
"""Compare (key, value) pairs of a reference dict with the result
|
||||
|
||||
Note: the result MAY have additional keys
|
||||
"""
|
||||
|
||||
for field, value in reference.items():
|
||||
self.assertEqual(value, result[field], field)
|
||||
|
||||
def test_create_or_update_agent_new_entry(self):
|
||||
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||
|
||||
agent = self.plugin.get_agents(self.context)[0]
|
||||
self._assert_ref_fields_are_equal(self.agent_status, agent)
|
||||
|
||||
def test_create_or_update_agent_existing_entry(self):
|
||||
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||
|
||||
agents = self.plugin.get_agents(self.context)
|
||||
self.assertEqual(len(agents), 1)
|
||||
|
||||
agent = agents[0]
|
||||
self._assert_ref_fields_are_equal(self.agent_status, agent)
|
||||
|
||||
def test_create_or_update_agent_concurrent_insert(self):
|
||||
# NOTE(rpodolyaka): emulate violation of the unique constraint caused
|
||||
# by a concurrent insert. Ensure we make another
|
||||
# attempt on fail
|
||||
with mock.patch('sqlalchemy.orm.Session.add') as add_mock:
|
||||
add_mock.side_effect = [
|
||||
exc.DBDuplicateEntry(columns=['agent_type', 'host']),
|
||||
None
|
||||
]
|
||||
|
||||
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||
|
||||
self.assertEqual(add_mock.call_count, 2,
|
||||
"Agent entry creation hasn't been retried")
|
Loading…
Reference in New Issue
Block a user