Fix: Transaction rollback following DB creation error
This patch set employs a simple pattern to ensure that all database objects created within the scope of documents_create (which corresponds to the PUT /api/v1.0/buckets/{bucket_name/documents endpoint) fall underneath the same session transaction, such that any exception raised during that transaction results in all in-flight database objects getting rolled back. This fixes an issue where a revision could be created (and automatically committed) even if documents failed to be created immediately afterward (due to data conflict issues, for example), leading to a junk revision existing in the database. Now, the revision will no longer be created in the above scenario. This is achieved by using with session.begin() and placing all database operations underneath that transaction. Nested helper functions such as bucket_get_or_create (which is called from within documents_create) no longer uses its own session.begin() because there is no need to create a subtransaction: it still falls underneath the parent transaction as all we care about is idempotence with respect to document/revision/bucket DB object creation. A unit test is added to validate the correct behavior to avoid regression in behavior. Change-Id: Ifd19b1404a7f932cf4e045ca47acf364ce992c11
This commit is contained in:
parent
c3def624fb
commit
a342b5e7a2
@ -62,7 +62,11 @@ class BucketsResource(api_base.BaseResource):
|
|||||||
documents = self._encrypt_secret_documents(documents)
|
documents = self._encrypt_secret_documents(documents)
|
||||||
|
|
||||||
created_documents = self._create_revision_documents(
|
created_documents = self._create_revision_documents(
|
||||||
bucket_name, documents, validations)
|
bucket_name, documents)
|
||||||
|
|
||||||
|
if created_documents:
|
||||||
|
revision_id = created_documents[0]['revision_id']
|
||||||
|
self._create_revision_validations(revision_id, validations)
|
||||||
|
|
||||||
resp.body = self.view_builder.list(created_documents)
|
resp.body = self.view_builder.list(created_documents)
|
||||||
resp.status = falcon.HTTP_200
|
resp.status = falcon.HTTP_200
|
||||||
@ -75,14 +79,17 @@ class BucketsResource(api_base.BaseResource):
|
|||||||
document['data'] = secret_ref
|
document['data'] = secret_ref
|
||||||
return documents
|
return documents
|
||||||
|
|
||||||
def _create_revision_documents(self, bucket_name, documents,
|
def _create_revision_documents(self, bucket_name, documents):
|
||||||
validations):
|
|
||||||
try:
|
try:
|
||||||
created_documents = db_api.documents_create(
|
created_documents = db_api.documents_create(bucket_name, documents)
|
||||||
bucket_name, documents, validations=validations)
|
|
||||||
except (deckhand_errors.DuplicateDocumentExists,
|
except (deckhand_errors.DuplicateDocumentExists,
|
||||||
deckhand_errors.SingletonDocumentConflict) as e:
|
deckhand_errors.SingletonDocumentConflict) as e:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.exception(e.format_message())
|
LOG.exception(e.format_message())
|
||||||
|
|
||||||
return created_documents
|
return created_documents
|
||||||
|
|
||||||
|
def _create_revision_validations(self, revision_id, validations):
|
||||||
|
for validation in validations:
|
||||||
|
db_api.validation_create(revision_id, validation['name'],
|
||||||
|
validation)
|
||||||
|
@ -149,8 +149,7 @@ def require_unique_document_schema(schema=None):
|
|||||||
|
|
||||||
|
|
||||||
@require_unique_document_schema(types.LAYERING_POLICY_SCHEMA)
|
@require_unique_document_schema(types.LAYERING_POLICY_SCHEMA)
|
||||||
def documents_create(bucket_name, documents, validations=None,
|
def documents_create(bucket_name, documents, session=None):
|
||||||
session=None):
|
|
||||||
"""Create a set of documents and associated bucket.
|
"""Create a set of documents and associated bucket.
|
||||||
|
|
||||||
If no changes are detected, a new revision will not be created. This
|
If no changes are detected, a new revision will not be created. This
|
||||||
@ -160,45 +159,43 @@ def documents_create(bucket_name, documents, validations=None,
|
|||||||
:param bucket_name: The name of the bucket with which to associate created
|
:param bucket_name: The name of the bucket with which to associate created
|
||||||
documents.
|
documents.
|
||||||
:param documents: List of documents to be created.
|
:param documents: List of documents to be created.
|
||||||
:param validation_policies: List of validation policies to be created.
|
|
||||||
:param session: Database session object.
|
:param session: Database session object.
|
||||||
:returns: List of created documents in dictionary format.
|
:returns: List of created documents in dictionary format.
|
||||||
:raises DocumentExists: If the document already exists in the DB for any
|
:raises DocumentExists: If the document already exists in the DB for any
|
||||||
bucket.
|
bucket.
|
||||||
"""
|
"""
|
||||||
session = session or get_session()
|
session = session or get_session()
|
||||||
documents_to_create = _documents_create(bucket_name, documents, session)
|
|
||||||
|
|
||||||
resp = []
|
resp = []
|
||||||
|
|
||||||
# The documents to be deleted are computed by comparing the documents for
|
with session.begin():
|
||||||
# the previous revision (if it exists) that belong to `bucket_name` with
|
documents_to_create = _documents_create(bucket_name, documents,
|
||||||
# `documents`: the difference between the former and the latter.
|
session=session)
|
||||||
document_history = [
|
|
||||||
d for d in revision_documents_get(bucket_name=bucket_name)
|
|
||||||
]
|
|
||||||
documents_to_delete = [
|
|
||||||
h for h in document_history if _meta(h) not in [
|
|
||||||
_meta(d) for d in documents]
|
|
||||||
]
|
|
||||||
|
|
||||||
# Only create a revision if any docs have been created, changed or deleted.
|
# The documents to be deleted are computed by comparing the documents
|
||||||
if any([documents_to_create, documents_to_delete]):
|
# for the previous revision (if it exists) that belong to `bucket_name`
|
||||||
bucket = bucket_get_or_create(bucket_name)
|
# with `documents`: the difference between the former and the latter.
|
||||||
revision = revision_create()
|
document_history = [
|
||||||
if validations:
|
d for d in revision_documents_get(bucket_name=bucket_name,
|
||||||
for validation in validations:
|
session=session)
|
||||||
validation_create(revision['id'], validation['name'],
|
]
|
||||||
validation)
|
documents_to_delete = [
|
||||||
|
h for h in document_history if _meta(h) not in [
|
||||||
|
_meta(d) for d in documents]
|
||||||
|
]
|
||||||
|
|
||||||
if documents_to_delete:
|
# Only create a revision if any docs have been created, changed or
|
||||||
LOG.debug('Deleting documents: %s.',
|
# deleted.
|
||||||
[_meta(d) for d in documents_to_delete])
|
if any([documents_to_create, documents_to_delete]):
|
||||||
deleted_documents = []
|
revision = revision_create(session=session)
|
||||||
|
bucket = bucket_get_or_create(bucket_name, session=session)
|
||||||
|
|
||||||
for d in documents_to_delete:
|
if documents_to_delete:
|
||||||
doc = models.Document()
|
LOG.debug('Deleting documents: %s.',
|
||||||
with session.begin():
|
[_meta(d) for d in documents_to_delete])
|
||||||
|
deleted_documents = []
|
||||||
|
|
||||||
|
for d in documents_to_delete:
|
||||||
|
doc = models.Document()
|
||||||
# Store bare minimum information about the document.
|
# Store bare minimum information about the document.
|
||||||
doc['schema'] = d['schema']
|
doc['schema'] = d['schema']
|
||||||
doc['name'] = d['name']
|
doc['name'] = d['name']
|
||||||
@ -219,14 +216,16 @@ def documents_create(bucket_name, documents, validations=None,
|
|||||||
name=doc['name'], bucket=bucket['name'])
|
name=doc['name'], bucket=bucket['name'])
|
||||||
doc.safe_delete(session=session)
|
doc.safe_delete(session=session)
|
||||||
deleted_documents.append(doc)
|
deleted_documents.append(doc)
|
||||||
resp.append(doc.to_dict())
|
resp.append(doc.to_dict())
|
||||||
|
|
||||||
if documents_to_create:
|
if documents_to_create:
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
'Creating documents: %s.', [(d['schema'], d['layer'], d['name'])
|
'Creating documents: %s.', [
|
||||||
for d in documents_to_create])
|
(d['schema'], d['layer'], d['name'])
|
||||||
for doc in documents_to_create:
|
for d in documents_to_create
|
||||||
with session.begin():
|
]
|
||||||
|
)
|
||||||
|
for doc in documents_to_create:
|
||||||
doc['bucket_id'] = bucket['id']
|
doc['bucket_id'] = bucket['id']
|
||||||
doc['revision_id'] = revision['id']
|
doc['revision_id'] = revision['id']
|
||||||
if not doc.get('orig_revision_id'):
|
if not doc.get('orig_revision_id'):
|
||||||
@ -239,7 +238,7 @@ def documents_create(bucket_name, documents, validations=None,
|
|||||||
schema=doc['schema'], layer=doc['layer'],
|
schema=doc['schema'], layer=doc['layer'],
|
||||||
name=doc['name'], bucket=bucket['name'])
|
name=doc['name'], bucket=bucket['name'])
|
||||||
|
|
||||||
resp.append(doc.to_dict())
|
resp.append(doc.to_dict())
|
||||||
# NOTE(fmontei): The orig_revision_id is not copied into the
|
# NOTE(fmontei): The orig_revision_id is not copied into the
|
||||||
# revision_id for each created document, because the revision_id here
|
# revision_id for each created document, because the revision_id here
|
||||||
# should reference the just-created revision. In case the user needs
|
# should reference the just-created revision. In case the user needs
|
||||||
@ -256,8 +255,7 @@ def _documents_create(bucket_name, documents, session=None):
|
|||||||
|
|
||||||
def _document_create(document):
|
def _document_create(document):
|
||||||
model = models.Document()
|
model = models.Document()
|
||||||
with session.begin():
|
model.update(document)
|
||||||
model.update(document)
|
|
||||||
return model
|
return model
|
||||||
|
|
||||||
for document in documents:
|
for document in documents:
|
||||||
@ -457,9 +455,8 @@ def bucket_get_or_create(bucket_name, session=None):
|
|||||||
.one()
|
.one()
|
||||||
except sa_orm.exc.NoResultFound:
|
except sa_orm.exc.NoResultFound:
|
||||||
bucket = models.Bucket()
|
bucket = models.Bucket()
|
||||||
with session.begin():
|
bucket.update({'name': bucket_name})
|
||||||
bucket.update({'name': bucket_name})
|
bucket.save(session=session)
|
||||||
bucket.save(session=session)
|
|
||||||
|
|
||||||
return bucket.to_dict()
|
return bucket.to_dict()
|
||||||
|
|
||||||
@ -476,8 +473,7 @@ def revision_create(session=None):
|
|||||||
session = session or get_session()
|
session = session or get_session()
|
||||||
|
|
||||||
revision = models.Revision()
|
revision = models.Revision()
|
||||||
with session.begin():
|
revision.save(session=session)
|
||||||
revision.save(session=session)
|
|
||||||
|
|
||||||
return revision.to_dict()
|
return revision.to_dict()
|
||||||
|
|
||||||
|
@ -12,6 +12,11 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
import testtools
|
||||||
|
|
||||||
|
from deckhand.db.sqlalchemy import api as db_api
|
||||||
|
from deckhand import errors
|
||||||
from deckhand import factories
|
from deckhand import factories
|
||||||
from deckhand.tests import test_utils
|
from deckhand.tests import test_utils
|
||||||
from deckhand.tests.unit.db import base
|
from deckhand.tests.unit.db import base
|
||||||
@ -299,3 +304,34 @@ class TestDocuments(base.TestDbBase):
|
|||||||
sorted(orig_documents, key=lambda d: d['created_at']),
|
sorted(orig_documents, key=lambda d: d['created_at']),
|
||||||
sorted(duplicate_documents, key=lambda d: d['created_at']),
|
sorted(duplicate_documents, key=lambda d: d['created_at']),
|
||||||
ignore=['created_at', 'updated_at', 'revision_id', 'id'])
|
ignore=['created_at', 'updated_at', 'revision_id', 'id'])
|
||||||
|
|
||||||
|
def test_document_creation_failure_rolls_back_in_flight_revision(self):
|
||||||
|
"""Regression test that an exception that occurs between creation of
|
||||||
|
a revision and creation of all bucket documents results in the
|
||||||
|
in-flight database objects getting rolled back.
|
||||||
|
"""
|
||||||
|
bucket_name = test_utils.rand_name('bucket')
|
||||||
|
payload = base.DocumentFixture.get_minimal_fixture()
|
||||||
|
|
||||||
|
original_revision_create = db_api.revision_create
|
||||||
|
|
||||||
|
# Mock the revision_create function so we can assert whether it was
|
||||||
|
# called, but still call the real function.
|
||||||
|
with mock.patch.object(
|
||||||
|
db_api, 'revision_create', autospec=True,
|
||||||
|
side_effect=original_revision_create) as m_revision_create:
|
||||||
|
# Raise any exception on a function call following the creation of
|
||||||
|
# a revision. The transaction will not complete so the result will
|
||||||
|
# not be committed to the database.
|
||||||
|
with mock.patch.object(db_api, 'bucket_get_or_create',
|
||||||
|
autospec=True,
|
||||||
|
side_effect=errors.DuplicateDocumentExists):
|
||||||
|
with testtools.ExpectedException(
|
||||||
|
errors.DuplicateDocumentExists):
|
||||||
|
self.create_documents(bucket_name, [payload])
|
||||||
|
|
||||||
|
# Validate that the actual revision_create call was indeed invoked.
|
||||||
|
self.assertTrue(m_revision_create.called)
|
||||||
|
# Finally validate that the revision doesn't exist.
|
||||||
|
with testtools.ExpectedException(errors.RevisionNotFound):
|
||||||
|
self.show_revision(1)
|
||||||
|
Loading…
Reference in New Issue
Block a user