Merge "Fix a race condition in agents status update code"
This commit is contained in:
commit
22b4a54919
@ -25,6 +25,7 @@ from neutron.db import model_base
|
|||||||
from neutron.db import models_v2
|
from neutron.db import models_v2
|
||||||
from neutron.extensions import agent as ext_agent
|
from neutron.extensions import agent as ext_agent
|
||||||
from neutron import manager
|
from neutron import manager
|
||||||
|
from neutron.openstack.common.db import exception as db_exc
|
||||||
from neutron.openstack.common import jsonutils
|
from neutron.openstack.common import jsonutils
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
from neutron.openstack.common import timeutils
|
from neutron.openstack.common import timeutils
|
||||||
@ -40,6 +41,11 @@ cfg.CONF.register_opt(
|
|||||||
class Agent(model_base.BASEV2, models_v2.HasId):
|
class Agent(model_base.BASEV2, models_v2.HasId):
|
||||||
"""Represents agents running in neutron deployments."""
|
"""Represents agents running in neutron deployments."""
|
||||||
|
|
||||||
|
__table_args__ = (
|
||||||
|
sa.UniqueConstraint('agent_type', 'host',
|
||||||
|
name='uniq_agents0agent_type0host'),
|
||||||
|
)
|
||||||
|
|
||||||
# L3 agent, DHCP agent, OVS agent, LinuxBridge
|
# L3 agent, DHCP agent, OVS agent, LinuxBridge
|
||||||
agent_type = sa.Column(sa.String(255), nullable=False)
|
agent_type = sa.Column(sa.String(255), nullable=False)
|
||||||
binary = sa.Column(sa.String(255), nullable=False)
|
binary = sa.Column(sa.String(255), nullable=False)
|
||||||
@ -135,8 +141,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
|||||||
agent = self._get_agent(context, id)
|
agent = self._get_agent(context, id)
|
||||||
return self._make_agent_dict(agent, fields)
|
return self._make_agent_dict(agent, fields)
|
||||||
|
|
||||||
def create_or_update_agent(self, context, agent):
|
def _create_or_update_agent(self, context, agent):
|
||||||
"""Create or update agent according to report."""
|
|
||||||
with context.session.begin(subtransactions=True):
|
with context.session.begin(subtransactions=True):
|
||||||
res_keys = ['agent_type', 'binary', 'host', 'topic']
|
res_keys = ['agent_type', 'binary', 'host', 'topic']
|
||||||
res = dict((k, agent[k]) for k in res_keys)
|
res = dict((k, agent[k]) for k in res_keys)
|
||||||
@ -163,6 +168,28 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
|||||||
context.session.add(agent_db)
|
context.session.add(agent_db)
|
||||||
greenthread.sleep(0)
|
greenthread.sleep(0)
|
||||||
|
|
||||||
|
def create_or_update_agent(self, context, agent):
|
||||||
|
"""Create or update agent according to report."""
|
||||||
|
|
||||||
|
try:
|
||||||
|
return self._create_or_update_agent(context, agent)
|
||||||
|
except db_exc.DBDuplicateEntry as e:
|
||||||
|
if e.columns == ['agent_type', 'host']:
|
||||||
|
# It might happen that two or more concurrent transactions are
|
||||||
|
# trying to insert new rows having the same value of
|
||||||
|
# (agent_type, host) pair at the same time (if there has been
|
||||||
|
# no such entry in the table and multiple agent status updates
|
||||||
|
# are being processed at the moment). In this case having a
|
||||||
|
# unique constraint on (agent_type, host) columns guarantees
|
||||||
|
# that only one transaction will succeed and insert a new agent
|
||||||
|
# entry, others will fail and be rolled back. That means we
|
||||||
|
# must retry them one more time: no INSERTs will be issued,
|
||||||
|
# because _get_agent_by_type_and_host() will return the
|
||||||
|
# existing agent entry, which will be updated multiple times
|
||||||
|
return self._create_or_update_agent(context, agent)
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
class AgentExtRpcCallback(object):
|
class AgentExtRpcCallback(object):
|
||||||
"""Processes the rpc report in plugin implementations."""
|
"""Processes the rpc report in plugin implementations."""
|
||||||
|
@ -0,0 +1,62 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
#
|
||||||
|
# Copyright 2013 OpenStack Foundation
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Add a unique constraint on (agent_type, host) columns to prevent a race
|
||||||
|
condition when an agent entry is 'upserted'.
|
||||||
|
|
||||||
|
Revision ID: 1fcfc149aca4
|
||||||
|
Revises: e197124d4b9
|
||||||
|
Create Date: 2013-11-27 18:35:28.148680
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
revision = '1fcfc149aca4'
|
||||||
|
down_revision = 'e197124d4b9'
|
||||||
|
|
||||||
|
migration_for_plugins = [
|
||||||
|
'*'
|
||||||
|
]
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
from neutron.db import migration
|
||||||
|
|
||||||
|
|
||||||
|
TABLE_NAME = 'agents'
|
||||||
|
UC_NAME = 'uniq_agents0agent_type0host'
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade(active_plugins=None, options=None):
|
||||||
|
if not migration.should_run(active_plugins, migration_for_plugins):
|
||||||
|
return
|
||||||
|
|
||||||
|
op.create_unique_constraint(
|
||||||
|
name=UC_NAME,
|
||||||
|
source=TABLE_NAME,
|
||||||
|
local_cols=['agent_type', 'host']
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade(active_plugins=None, options=None):
|
||||||
|
if not migration.should_run(active_plugins, migration_for_plugins):
|
||||||
|
return
|
||||||
|
|
||||||
|
op.drop_constraint(
|
||||||
|
name=UC_NAME,
|
||||||
|
table_name=TABLE_NAME,
|
||||||
|
type_='unique'
|
||||||
|
)
|
86
neutron/tests/unit/db/test_agent_db.py
Normal file
86
neutron/tests/unit/db/test_agent_db.py
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
#
|
||||||
|
# Copyright (c) 2013 OpenStack Foundation.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from neutron import context
|
||||||
|
from neutron.db import agents_db
|
||||||
|
from neutron.db import api as db
|
||||||
|
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||||
|
from neutron.openstack.common.db import exception as exc
|
||||||
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
class FakePlugin(base_plugin.NeutronDbPluginV2, agents_db.AgentDbMixin):
|
||||||
|
"""A fake plugin class containing all DB methods."""
|
||||||
|
|
||||||
|
|
||||||
|
class TestAgentsDbMixin(base.BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestAgentsDbMixin, self).setUp()
|
||||||
|
|
||||||
|
self.context = context.get_admin_context()
|
||||||
|
self.plugin = FakePlugin()
|
||||||
|
self.addCleanup(db.clear_db)
|
||||||
|
|
||||||
|
self.agent_status = {
|
||||||
|
'agent_type': 'Open vSwitch agent',
|
||||||
|
'binary': 'neutron-openvswitch-agent',
|
||||||
|
'host': 'overcloud-notcompute',
|
||||||
|
'topic': 'N/A'
|
||||||
|
}
|
||||||
|
|
||||||
|
def _assert_ref_fields_are_equal(self, reference, result):
|
||||||
|
"""Compare (key, value) pairs of a reference dict with the result
|
||||||
|
|
||||||
|
Note: the result MAY have additional keys
|
||||||
|
"""
|
||||||
|
|
||||||
|
for field, value in reference.items():
|
||||||
|
self.assertEqual(value, result[field], field)
|
||||||
|
|
||||||
|
def test_create_or_update_agent_new_entry(self):
|
||||||
|
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||||
|
|
||||||
|
agent = self.plugin.get_agents(self.context)[0]
|
||||||
|
self._assert_ref_fields_are_equal(self.agent_status, agent)
|
||||||
|
|
||||||
|
def test_create_or_update_agent_existing_entry(self):
|
||||||
|
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||||
|
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||||
|
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||||
|
|
||||||
|
agents = self.plugin.get_agents(self.context)
|
||||||
|
self.assertEqual(len(agents), 1)
|
||||||
|
|
||||||
|
agent = agents[0]
|
||||||
|
self._assert_ref_fields_are_equal(self.agent_status, agent)
|
||||||
|
|
||||||
|
def test_create_or_update_agent_concurrent_insert(self):
|
||||||
|
# NOTE(rpodolyaka): emulate violation of the unique constraint caused
|
||||||
|
# by a concurrent insert. Ensure we make another
|
||||||
|
# attempt on fail
|
||||||
|
with mock.patch('sqlalchemy.orm.Session.add') as add_mock:
|
||||||
|
add_mock.side_effect = [
|
||||||
|
exc.DBDuplicateEntry(columns=['agent_type', 'host']),
|
||||||
|
None
|
||||||
|
]
|
||||||
|
|
||||||
|
self.plugin.create_or_update_agent(self.context, self.agent_status)
|
||||||
|
|
||||||
|
self.assertEqual(add_mock.call_count, 2,
|
||||||
|
"Agent entry creation hasn't been retried")
|
Loading…
Reference in New Issue
Block a user