make record_metering_data concurrency safe

Recording metering might happen in parallel, and if two sqlalchemy
session try to create a source, a user, a project or a resource at the
same time, one of them fail with a IntegrityError.

This patch ensure the record_metering_data sqlalchemy session always
have the needed source, user, project and a resource by creating missing
obj in an other sqlalchemy session.

Fixes bug: #1237671

Change-Id: Ie8761ce1615c3142ad6ad2a400537f465f3bbf0a
This commit is contained in:
Mehdi Abaakouk 2013-10-23 13:33:53 +02:00
parent fe158993bb
commit dd62f9ce0d

View File

@ -199,7 +199,42 @@ class Connection(base.Connection):
engine.execute(table.delete())
@staticmethod
def record_metering_data(data):
def _create_or_update(session, model_class, _id, source=None, **kwargs):
if not _id:
return None
try:
# create a nested session for the case of two call of
# record_metering_data run in parallel to not fail the
# record of this sample
# (except for sqlite, that doesn't support nested
# transaction and doesn't have concurrency problem)
nested = session.connection().dialect.name != 'sqlite'
# raise dbexc.DBDuplicateEntry manually for sqlite
# to not break the current session
if not nested and session.query(model_class).get(str(_id)):
raise dbexc.DBDuplicateEntry()
with session.begin(nested=nested,
subtransactions=not nested):
obj = model_class(id=str(_id))
session.add(obj)
except dbexc.DBDuplicateEntry:
# requery the object from the db if this is an other
# parallel/previous call of record_metering_data that
# have successfully created this object
obj = session.query(model_class).get(str(_id))
# update the object
if source and not filter(lambda x: x.id == source.id, obj.sources):
obj.sources.append(source)
for k in kwargs:
setattr(obj, k, kwargs[k])
return obj
@classmethod
def record_metering_data(cls, data):
"""Write the data to the backend storage system.
:param data: a dictionary such as returned by
@ -207,39 +242,17 @@ class Connection(base.Connection):
"""
session = sqlalchemy_session.get_session()
with session.begin():
if data['source']:
source = session.query(Source).get(data['source'])
if not source:
source = Source(id=data['source'])
session.add(source)
else:
source = None
# create/update user && project, add/update their sources list
if data['user_id']:
user = session.merge(User(id=str(data['user_id'])))
if not filter(lambda x: x.id == source.id, user.sources):
user.sources.append(source)
else:
user = None
if data['project_id']:
project = session.merge(Project(id=str(data['project_id'])))
if not filter(lambda x: x.id == source.id, project.sources):
project.sources.append(source)
else:
project = None
# Record the updated resource metadata
rmetadata = data['resource_metadata']
resource = session.merge(Resource(id=str(data['resource_id'])))
if not filter(lambda x: x.id == source.id, resource.sources):
resource.sources.append(source)
resource.project = project
resource.user = user
# Current metadata being used and when it was last updated.
resource.resource_metadata = rmetadata
source = cls._create_or_update(session, Source, data['source'])
user = cls._create_or_update(session, User, data['user_id'],
source)
project = cls._create_or_update(session, Project,
data['project_id'], source)
resource = cls._create_or_update(session, Resource,
data['resource_id'], source,
user=user, project=project,
resource_metadata=rmetadata)
# Record the raw data for the meter.
meter = Meter(counter_type=data['counter_type'],
@ -270,8 +283,6 @@ class Connection(base.Connection):
meta_key=key,
value=v))
session.flush()
@staticmethod
def clear_expired_metering_data(ttl):
"""Clear expired data from the backend storage system according to the