diff --git a/releasenotes/notes/introduce-topic-resource-9b40674cac06bdc2.yaml b/releasenotes/notes/introduce-topic-resource-9b40674cac06bdc2.yaml new file mode 100644 index 000000000..c10c4ccde --- /dev/null +++ b/releasenotes/notes/introduce-topic-resource-9b40674cac06bdc2.yaml @@ -0,0 +1,8 @@ +--- +features: + - Introduce a new resource called Topic into Zaqar. + Topic is a concept from AWS Simple Notification Service (SNS), it will has + relevance with subscriptions. User can send message to a topic, and then + the subscribers will get the message according to different protocols, + like http, email, sms, etc. This feature will help Zaqar to split + Messaging Queue Service and Notification Service clearly. diff --git a/setup.cfg b/setup.cfg index a0681b267..98a79eb15 100644 --- a/setup.cfg +++ b/setup.cfg @@ -61,6 +61,15 @@ zaqar.storage.redis.driver.queue.stages = zaqar.storage.swift.driver.queue.stages = message_queue_handler = zaqar.storage.swift.messages:MessageQueueHandler +zaqar.storage.mongodb.driver.topic.stages = + message_queue_handler = zaqar.storage.mongodb.topic_messages:MessageTopicHandler + +zaqar.storage.redis.driver.topic.stages = + message_queue_handler = zaqar.storage.redis.messages:MessageTopicHandler + +zaqar.storage.swift.driver.topic.stages = + message_queue_handler = zaqar.storage.swift.messages:MessageTopicHandler + zaqar.notification.tasks = http = zaqar.notification.tasks.webhook:WebhookTask https = zaqar.notification.tasks.webhook:WebhookTask diff --git a/zaqar/common/policies/__init__.py b/zaqar/common/policies/__init__.py index b13e8829f..58c9e1fa3 100644 --- a/zaqar/common/policies/__init__.py +++ b/zaqar/common/policies/__init__.py @@ -20,6 +20,7 @@ from zaqar.common.policies import messages from zaqar.common.policies import pools from zaqar.common.policies import queues from zaqar.common.policies import subscription +from zaqar.common.policies import topics def list_rules(): @@ -31,5 +32,6 @@ def list_rules(): messages.list_rules(), pools.list_rules(), queues.list_rules(), - subscription.list_rules() + subscription.list_rules(), + topics.list_rules(), ) diff --git a/zaqar/common/policies/topics.py b/zaqar/common/policies/topics.py new file mode 100644 index 000000000..171bbf729 --- /dev/null +++ b/zaqar/common/policies/topics.py @@ -0,0 +1,101 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_policy import policy + +from zaqar.common.policies import base + +TOPICS = 'topics:%s' + +rules = [ + policy.DocumentedRuleDefault( + name=TOPICS % 'get_all', + check_str=base.UNPROTECTED, + description='List all topics.', + operations=[ + { + 'path': '/v2/topics', + 'method': 'GET' + } + ] + ), + policy.DocumentedRuleDefault( + name=TOPICS % 'create', + check_str=base.UNPROTECTED, + description='Create a topic.', + operations=[ + { + 'path': '/v2/topics/{topic_name}', + 'method': 'PUT' + } + ] + ), + policy.DocumentedRuleDefault( + name=TOPICS % 'get', + check_str=base.UNPROTECTED, + description='Get details about a specific topic.', + operations=[ + { + 'path': '/v2/topics/{topic_name}', + 'method': 'GET' + } + ] + ), + policy.DocumentedRuleDefault( + name=TOPICS % 'delete', + check_str=base.UNPROTECTED, + description='Delete a topic.', + operations=[ + { + 'path': '/v2/topics/{topic_name}', + 'method': 'DELETE' + } + ] + ), + policy.DocumentedRuleDefault( + name=TOPICS % 'update', + check_str=base.UNPROTECTED, + description='Update a topic.', + operations=[ + { + 'path': '/v2/topics/{topic_name}', + 'method': 'PATCH' + } + ] + ), + policy.DocumentedRuleDefault( + name=TOPICS % 'stats', + check_str=base.UNPROTECTED, + description='Get statistics about a specific topic.', + operations=[ + { + 'path': '/v2/topics/{topic_name}/stats', + 'method': 'GET' + } + ] + ), + policy.DocumentedRuleDefault( + name=TOPICS % 'purge', + check_str=base.UNPROTECTED, + description='Purge resources from a particular topic.', + operations=[ + { + 'path': '/v2/topic/{topic_name}/purge', + 'method': 'POST' + } + ] + ) +] + + +def list_rules(): + return rules diff --git a/zaqar/common/transport/wsgi/helpers.py b/zaqar/common/transport/wsgi/helpers.py index 9e9a2ecce..5d4448d42 100644 --- a/zaqar/common/transport/wsgi/helpers.py +++ b/zaqar/common/transport/wsgi/helpers.py @@ -277,3 +277,44 @@ def inject_context(req, resp, params): project_domain_id=project_domain_id, user_domain_id=user_domain_id) req.env['zaqar.context'] = ctxt + + +def validate_topic_identification(validate, req, resp, params): + """Hook for validating the topic name and project id in requests. + + The queue name validation is short-circuited if 'topic_name' does + not exist in `params`. + + This hook depends on the `get_project` hook, which must be + installed upstream. + + + :param validate: A validator function that will + be used to check the topic name against configured + limits. functools.partial or a closure must be used to + set this first arg, and expose the remaining ones as + a Falcon hook interface. + :param req: Falcon request object + :param resp: Falcon response object + :param params: Responder params dict + """ + + try: + validate(params['topic_name'], + params['project_id']) + except KeyError: + # NOTE(kgriffs): topic not in params, so nothing to do + pass + except validation.ValidationFailed: + project = params['project_id'] + queue = params['topic_name'] + if six.PY2: + queue = queue.decode('utf-8', 'replace') + + LOG.debug(u'Invalid topic name "%(topic)s" submitted for ' + u'project: %(project)s', + {'topic': queue, 'project': project}) + + raise falcon.HTTPBadRequest(_(u'Invalid topic identification'), + _(u'The format of the submitted topic ' + u'name or project id is not valid.')) diff --git a/zaqar/conf/storage.py b/zaqar/conf/storage.py index 2779e3c83..2db74a2c0 100644 --- a/zaqar/conf/storage.py +++ b/zaqar/conf/storage.py @@ -44,12 +44,20 @@ subscription_pipeline = cfg.ListOpt( 'controller methods.')) +topic_pipeline = cfg.ListOpt( + 'topic_pipeline', default=[], + help=_('Pipeline to use for processing topic operations. This ' + 'pipeline will be consumed before calling the storage driver\'s ' + 'controller methods.')) + + GROUP_NAME = 'storage' ALL_OPTS = [ queue_pipeline, message_pipeline, claim_pipeline, - subscription_pipeline + subscription_pipeline, + topic_pipeline ] diff --git a/zaqar/storage/__init__.py b/zaqar/storage/__init__.py index 795647b1d..e663df51d 100644 --- a/zaqar/storage/__init__.py +++ b/zaqar/storage/__init__.py @@ -27,10 +27,12 @@ Queue = base.Queue Subscription = base.Subscription PoolsBase = base.PoolsBase FlavorsBase = base.FlavorsBase +Topic = base.Topic DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE DEFAULT_SUBSCRIPTIONS_PER_PAGE = base.DEFAULT_SUBSCRIPTIONS_PER_PAGE +DEFAULT_TOPICS_PER_PAGE = base.DEFAULT_TOPICS_PER_PAGE DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index 271359049..80335c474 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -35,6 +35,7 @@ DEFAULT_QUEUES_PER_PAGE = 10 DEFAULT_MESSAGES_PER_PAGE = 10 DEFAULT_POOLS_PER_PAGE = 10 DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10 +DEFAULT_TOPICS_PER_PAGE = 10 DEFAULT_MESSAGES_PER_CLAIM = 10 @@ -242,6 +243,11 @@ class DataDriverBase(DriverBase): """Returns the driver's subscription controller.""" raise NotImplementedError + @decorators.lazy_property(write=False) + def topic_controller(self): + """Returns the driver's topic controller.""" + return self.control_driver.topic_controller + @six.add_metaclass(abc.ABCMeta) class ControlDriverBase(DriverBase): @@ -281,6 +287,11 @@ class ControlDriverBase(DriverBase): """Returns the driver's queue controller.""" raise NotImplementedError + @abc.abstractproperty + def topic_controller(self): + """Returns the driver's topic controller.""" + raise NotImplementedError + @abc.abstractmethod def close(self): """Close connections to the backend.""" @@ -1094,3 +1105,113 @@ class FlavorsBase(ControllerBase): """Deletes all flavors from storage.""" raise NotImplementedError + + +@six.add_metaclass(abc.ABCMeta) +class Topic(ControllerBase): + """This class is responsible for managing topics. + + Topic operations include CRUD, etc. + + Storage driver implementations of this class should + be capable of handling high workloads and huge + numbers of topics. + """ + + def list(self, project=None, kfilter={}, marker=None, + limit=DEFAULT_TOPICS_PER_PAGE, detailed=False, name=None): + """Base method for listing topics. + + :param project: Project id + :param kfilter: The key-value of metadata which user want to filter + :param marker: The last topic name + :param limit: (Default 10) Max number of topics to return + :param detailed: Whether metadata is included + :param name: The topic name which user want to filter + + :returns: An iterator giving a sequence of topics + and the marker of the next page. + """ + return self._list(project, kfilter, marker, limit, detailed, name) + + _list = abc.abstractmethod(lambda x: None) + + def get(self, name, project=None): + """Base method for topic metadata retrieval. + + :param name: The topic name + :param project: Project id + + :returns: Dictionary containing topic metadata + :raises DoesNotExist: if topic metadata does not exist + """ + return self._get(name, project) + + _get = abc.abstractmethod(lambda x: None) + + def get_metadata(self, name, project=None): + """Base method for topic metadata retrieval. + + :param name: The topic name + :param project: Project id + + :returns: Dictionary containing topic metadata + :raises DoesNotExist: if topic metadata does not exist + """ + raise NotImplementedError + + def set_metadata(self, name, metadata, project=None): + """Base method for updating a topic metadata. + + :param name: The topic name + :param metadata: Topic metadata as a dict + :param project: Project id + :raises DoesNotExist: if topic metadata can not be updated + """ + raise NotImplementedError + + def create(self, name, metadata=None, project=None): + """Base method for topic creation. + + :param name: The topic name + :param project: Project id + :returns: True if a topic was created and False + if it was updated. + """ + return self._create(name, metadata, project) + + _create = abc.abstractmethod(lambda x: None) + + def exists(self, name, project=None): + """Base method for testing topic existence. + + :param name: The topic name + :param project: Project id + :returns: True if a topic exists and False + if it does not. + """ + return self._exists(name, project) + + _exists = abc.abstractmethod(lambda x: None) + + def delete(self, name, project=None): + """Base method for deleting a topic. + + :param name: The topic name + :param project: Project id + """ + return self._delete(name, project) + + _delete = abc.abstractmethod(lambda x: None) + + def stats(self, name, project=None): + """Base method for topic stats. + + :param name: The topic name + :param project: Project id + :returns: Dictionary with the + queue stats + """ + return self._stats(name, project) + + _stats = abc.abstractmethod(lambda x: None) diff --git a/zaqar/storage/errors.py b/zaqar/storage/errors.py index 9ac908224..d35c8a282 100644 --- a/zaqar/storage/errors.py +++ b/zaqar/storage/errors.py @@ -225,3 +225,19 @@ class SubscriptionAlreadyExists(Conflict): msg_format = (u'Such subscription already exists. Subscriptions ' u'are unique by project + queue + subscriber URI.') + + +class TopicDoesNotExist(DoesNotExist): + + msg_format = u'Topic {name} does not exist for project {project}' + + def __init__(self, name, project): + super(TopicDoesNotExist, self).__init__(name=name, project=project) + + +class TopicIsEmpty(ExceptionBase): + + msg_format = u'Topic {name} in project {project} is empty' + + def __init__(self, name, project): + super(TopicIsEmpty, self).__init__(name=name, project=project) diff --git a/zaqar/storage/mongodb/controllers.py b/zaqar/storage/mongodb/controllers.py index 7706f1538..90021a585 100644 --- a/zaqar/storage/mongodb/controllers.py +++ b/zaqar/storage/mongodb/controllers.py @@ -29,6 +29,7 @@ from zaqar.storage.mongodb import messages from zaqar.storage.mongodb import pools from zaqar.storage.mongodb import queues from zaqar.storage.mongodb import subscriptions +from zaqar.storage.mongodb import topics CatalogueController = catalogue.CatalogueController @@ -39,3 +40,4 @@ FIFOMessageController = messages.FIFOMessageController QueueController = queues.QueueController PoolsController = pools.PoolsController SubscriptionController = subscriptions.SubscriptionController +TopicController = topics.TopicController diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py index 2e84c4afa..f1a677734 100644 --- a/zaqar/storage/mongodb/driver.py +++ b/zaqar/storage/mongodb/driver.py @@ -271,6 +271,17 @@ class ControlDriver(storage.ControlDriverBase): name = self.mongodb_conf.database + '_queues' return self.connection[name] + @decorators.lazy_property(write=False) + def topics_database(self): + """Database dedicated to the "topics" collection. + + The topics collection is separated out into its own database + to avoid writer lock contention with the messages collections. + """ + + name = self.mongodb_conf.database + '_topics' + return self.connection[name] + @decorators.lazy_property(write=False) def queue_controller(self): controller = controllers.QueueController(self) @@ -308,3 +319,13 @@ class ControlDriver(storage.ControlDriverBase): return profiler.trace_cls("mongodb_flavors_controller")(controller) else: return controller + + @decorators.lazy_property(write=False) + def topic_controller(self): + controller = controllers.TopicController(self) + if (self.conf.profiler.enabled and + (self.conf.profiler.trace_message_store or + self.conf.profiler.trace_management_store)): + return profiler.trace_cls("mongodb_topics_controller")(controller) + else: + return controller diff --git a/zaqar/storage/mongodb/topic_messages.py b/zaqar/storage/mongodb/topic_messages.py new file mode 100644 index 000000000..55bd2cc23 --- /dev/null +++ b/zaqar/storage/mongodb/topic_messages.py @@ -0,0 +1,976 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Implements MongoDB the storage controller for messages. + +Field Mappings: + In order to reduce the disk / memory space used, + field names will be, most of the time, the first + letter of their long name. +""" + +import datetime +import time + +from bson import errors as bsonerror +from bson import objectid +from oslo_log import log as logging +from oslo_utils import timeutils +import pymongo.errors +import pymongo.read_preferences + +from zaqar.i18n import _ +from zaqar import storage +from zaqar.storage import errors +from zaqar.storage.mongodb import utils +from zaqar.storage import utils as s_utils + + +LOG = logging.getLogger(__name__) + +# NOTE(kgriffs): This value, in seconds, should be at least less than the +# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for +# some fudge room. +MAX_RETRY_POST_DURATION = 45 + +# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang +# for more than 5 seconds, without a single one being able to succeed in +# posting some messages and incrementing the counter, thus allowing the other +# producers to succeed in turn. +COUNTER_STALL_WINDOW = 5 + +# For hinting +ID_INDEX_FIELDS = [('_id', 1)] + +# For removing expired messages +TTL_INDEX_FIELDS = [ + ('e', 1), +] + +# to unify use of project/topic across mongodb +# storage impls. +PROJ_TOPIC = utils.PROJ_TOPIC_KEY + +# NOTE(kgriffs): This index is for listing messages, usually +# filtering out claimed ones. +ACTIVE_INDEX_FIELDS = [ + (PROJ_TOPIC, 1), # Project will be unique, so put first + ('k', 1), # Used for sorting and paging, must come before range queries +] + +# For counting +COUNTING_INDEX_FIELDS = [ + (PROJ_TOPIC, 1), # Project will be unique, so put first +] + +# This index is meant to be used as a shard-key and to ensure +# uniqueness for markers. +# +# As for other compound indexes, order matters. The marker `k` +# gives enough cardinality to ensure chunks are evenly distributed, +# whereas the `p_q` field helps keeping chunks from the same project +# and queue together. +# +# In a sharded environment, uniqueness of this index is still guaranteed +# because it's used as a shard key. +MARKER_INDEX_FIELDS = [ + ('k', 1), + (PROJ_TOPIC, 1), +] + +TRANSACTION_INDEX_FIELDS = [ + ('tx', 1), +] + + +class MessageController(storage.Message): + """Implements message resource operations using MongoDB. + + Messages are scoped by project + topic. + + :: + + Messages: + Name Field + ------------------------- + scope -> p_t + ttl -> t + expires -> e + marker -> k + body -> b + client uuid -> u + transaction -> tx + delay -> d + checksum -> cs + """ + + def __init__(self, *args, **kwargs): + super(MessageController, self).__init__(*args, **kwargs) + + # Cache for convenience and performance + self._num_partitions = self.driver.mongodb_conf.partitions + self._topic_ctrl = self.driver.topic_controller + self._retry_range = range(self.driver.mongodb_conf.max_attempts) + + # Create a list of 'messages' collections, one for each database + # partition, ordered by partition number. + # + # NOTE(kgriffs): Order matters, since it is used to lookup the + # collection by partition number. For example, self._collections[2] + # would provide access to zaqar_p2.messages (partition numbers are + # zero-based). + self._collections = [db.messages + for db in self.driver.message_databases] + + # Ensure indexes are initialized before any queries are performed + for collection in self._collections: + self._ensure_indexes(collection) + + # ---------------------------------------------------------------------- + # Helpers + # ---------------------------------------------------------------------- + + def _ensure_indexes(self, collection): + """Ensures that all indexes are created.""" + + collection.ensure_index(TTL_INDEX_FIELDS, + name='ttl', + expireAfterSeconds=0, + background=True) + + collection.ensure_index(ACTIVE_INDEX_FIELDS, + name='active', + background=True) + + collection.ensure_index(COUNTING_INDEX_FIELDS, + name='counting', + background=True) + + collection.ensure_index(MARKER_INDEX_FIELDS, + name='queue_marker', + background=True) + + collection.ensure_index(TRANSACTION_INDEX_FIELDS, + name='transaction', + background=True) + + def _collection(self, topic_name, project=None): + """Get a partitioned collection instance.""" + return self._collections[utils.get_partition(self._num_partitions, + topic_name, project)] + + def _backoff_sleep(self, attempt): + """Sleep between retries using a jitter algorithm. + + Mitigates thrashing between multiple parallel requests, and + creates backpressure on clients to slow down the rate + at which they submit requests. + + :param attempt: current attempt number, zero-based + """ + conf = self.driver.mongodb_conf + seconds = utils.calculate_backoff(attempt, conf.max_attempts, + conf.max_retry_sleep, + conf.max_retry_jitter) + + time.sleep(seconds) + + def _purge_topic(self, topic_name, project=None): + """Removes all messages from the queue. + + Warning: Only use this when deleting the queue; otherwise + you can cause a side-effect of reseting the marker counter + which can cause clients to miss tons of messages. + + If the queue does not exist, this method fails silently. + + :param topic_name: name of the queue to purge + :param project: ID of the project to which the queue belongs + """ + scope = utils.scope_queue_name(topic_name, project) + collection = self._collection(topic_name, project) + collection.delete_many({PROJ_TOPIC: scope}) + + def _list(self, topic_name, project=None, marker=None, + echo=False, client_uuid=None, projection=None, + include_claimed=False, include_delayed=False, + sort=1, limit=None): + """Message document listing helper. + + :param topic_name: Name of the topic to list + :param project: (Default None) Project `topic_name` belongs to. If + not specified, queries the "global" namespace/project. + :param marker: (Default None) Message marker from which to start + iterating. If not specified, starts with the first message + available in the topic. + :param echo: (Default False) Whether to return messages that match + client_uuid + :param client_uuid: (Default None) UUID for the client that + originated this request + :param projection: (Default None) a list of field names that should be + returned in the result set or a dict specifying the fields to + include or exclude + :param include_claimed: (Default False) Whether to include + claimed messages, not just active ones + :param include_delayed: (Default False) Whether to include + delayed messages, not just active ones + :param sort: (Default 1) Sort order for the listing. Pass 1 for + ascending (oldest message first), or -1 for descending (newest + message first). + :param limit: (Default None) The maximum number of messages + to list. The results may include fewer messages than the + requested `limit` if not enough are available. If limit is + not specified + + :returns: Generator yielding up to `limit` messages. + """ + + if sort not in (1, -1): + raise ValueError(u'sort must be either 1 (ascending) ' + u'or -1 (descending)') + + now = timeutils.utcnow_ts() + + query = { + # Messages must belong to this topic and project. + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + + # NOTE(kgriffs): Messages must be finalized (i.e., must not + # be part of an unfinalized transaction). + # + # See also the note wrt 'tx' within the definition + # of ACTIVE_INDEX_FIELDS. + 'tx': None, + } + + if not echo: + query['u'] = {'$ne': client_uuid} + + if marker is not None: + query['k'] = {'$gt': marker} + + collection = self._collection(topic_name, project) + + if not include_delayed: + # NOTE(cdyangzhenyu): Only include messages that are not + # part of any delay, or are part of an expired delay. if + # the message has no attribute 'd', it will also be obtained. + # This is for compatibility with old data. + query['$or'] = [{'d': {'$lte': now}}, + {'d': {'$exists': False}}] + + # Construct the request + cursor = collection.find(query, + projection=projection, + sort=[('k', sort)]) + + if limit is not None: + cursor.limit(limit) + + # NOTE(flaper87): Suggest the index to use for this query to + # ensure the most performant one is chosen. + return cursor.hint(ACTIVE_INDEX_FIELDS) + + # ---------------------------------------------------------------------- + # "Friends" interface + # ---------------------------------------------------------------------- + + def _count(self, topic_name, project=None, include_claimed=False): + """Return total number of messages in a topic. + + This method is designed to very quickly count the number + of messages in a given topic. Expired messages are not + counted, of course. If the queue does not exist, the + count will always be 0. + + Note: Some expired messages may be included in the count if + they haven't been GC'd yet. This is done for performance. + """ + query = { + # Messages must belong to this queue and project. + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + + # NOTE(kgriffs): Messages must be finalized (i.e., must not + # be part of an unfinalized transaction). + # + # See also the note wrt 'tx' within the definition + # of ACTIVE_INDEX_FIELDS. + 'tx': None, + } + + collection = self._collection(topic_name, project) + return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS) + + def _active(self, topic_name, marker=None, echo=False, + client_uuid=None, projection=None, project=None, + limit=None, include_delayed=False): + + return self._list(topic_name, project=project, marker=marker, + echo=echo, client_uuid=client_uuid, + projection=projection, include_claimed=False, + include_delayed=include_delayed, limit=limit) + + def _inc_counter(self, topic_name, project=None, amount=1, window=None): + """Increments the message counter and returns the new value. + + :param topic_name: Name of the topic to which the counter is scoped + :param project: Queue's project name + :param amount: (Default 1) Amount by which to increment the counter + :param window: (Default None) A time window, in seconds, that + must have elapsed since the counter was last updated, in + order to increment the counter. + + :returns: Updated message counter value, or None if window + was specified, and the counter has already been updated + within the specified time period. + + :raises QueueDoesNotExist: if not found + """ + + # NOTE(flaper87): If this `if` is True, it means we're + # using a mongodb in the control plane. To avoid breaking + # environments doing so already, we'll keep using the counter + # in the mongodb topic_controller rather than the one in the + # message_controller. This should go away, eventually + if hasattr(self._topic_ctrl, '_inc_counter'): + return self._topic_ctrl._inc_counter(topic_name, project, + amount, window) + + now = timeutils.utcnow_ts() + + update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}} + query = _get_scoped_query(topic_name, project) + if window is not None: + threshold = now - window + query['c.t'] = {'$lt': threshold} + + while True: + try: + collection = self._collection(topic_name, project).stats + doc = collection.find_one_and_update( + query, update, + return_document=pymongo.ReturnDocument.AFTER, + projection={'c.v': 1, '_id': 0}) + + break + except pymongo.errors.AutoReconnect as ex: + LOG.exception(ex) + + if doc is None: + if window is None: + # NOTE(kgriffs): Since we did not filter by a time window, + # the topic should have been found and updated. Perhaps + # the topic has been deleted? + message = (u'Failed to increment the message ' + u'counter for topic %(name)s and ' + u'project %(project)s') + message %= dict(name=topic_name, project=project) + + LOG.warning(message) + + raise errors.TopicDoesNotExist(topic_name, project) + + # NOTE(kgriffs): Assume the queue existed, but the counter + # was recently updated, causing the range query on 'c.t' to + # exclude the record. + return None + + return doc['c']['v'] + + def _get_counter(self, topic_name, project=None): + """Retrieves the current message counter value for a given topic. + + This helper is used to generate monotonic pagination + markers that are saved as part of the message + document. + + Note 1: Markers are scoped per-queue and so are *not* + globally unique or globally ordered. + + Note 2: If two or more requests to this method are made + in parallel, this method will return the same counter + value. This is done intentionally so that the caller + can detect a parallel message post, allowing it to + mitigate race conditions between producer and + observer clients. + + :param topic_name: Name of the topic to which the counter is scoped + :param project: Topic's project + :returns: current message counter as an integer + """ + + # NOTE(flaper87): If this `if` is True, it means we're + # using a mongodb in the control plane. To avoid breaking + # environments doing so already, we'll keep using the counter + # in the mongodb queue_controller rather than the one in the + # message_controller. This should go away, eventually + if hasattr(self._topic_ctrl, '_get_counter'): + return self._topic_ctrl._get_counter(topic_name, project) + + update = {'$inc': {'c.v': 0, 'c.t': 0}} + query = _get_scoped_query(topic_name, project) + + try: + collection = self._collection(topic_name, project).stats + doc = collection.find_one_and_update( + query, update, upsert=True, + return_document=pymongo.ReturnDocument.AFTER, + projection={'c.v': 1, '_id': 0}) + + return doc['c']['v'] + except pymongo.errors.AutoReconnect as ex: + LOG.exception(ex) + + # ---------------------------------------------------------------------- + # Public interface + # ---------------------------------------------------------------------- + + def list(self, topic_name, project=None, marker=None, + limit=storage.DEFAULT_MESSAGES_PER_PAGE, + echo=False, client_uuid=None, include_claimed=False, + include_delayed=False): + + if marker is not None: + try: + marker = int(marker) + except ValueError: + yield iter([]) + + messages = self._list(topic_name, project=project, marker=marker, + client_uuid=client_uuid, echo=echo, + include_claimed=include_claimed, + include_delayed=include_delayed, limit=limit) + + marker_id = {} + + now = timeutils.utcnow_ts() + + # NOTE (kgriffs) @utils.raises_conn_error not needed on this + # function, since utils.HookedCursor already has it. + def denormalizer(msg): + marker_id['next'] = msg['k'] + + return _basic_message(msg, now) + + yield utils.HookedCursor(messages, denormalizer) + yield str(marker_id['next']) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def first(self, topic_name, project=None, sort=1): + cursor = self._list(topic_name, project=project, + include_claimed=True, sort=sort, + limit=1) + try: + message = next(cursor) + except StopIteration: + raise errors.TopicIsEmpty(topic_name, project) + + now = timeutils.utcnow_ts() + return _basic_message(message, now) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def get(self, topic_name, message_id, project=None): + mid = utils.to_oid(message_id) + if mid is None: + raise errors.MessageDoesNotExist(message_id, topic_name, + project) + + now = timeutils.utcnow_ts() + + query = { + '_id': mid, + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + } + + collection = self._collection(topic_name, project) + message = list(collection.find(query).limit(1).hint(ID_INDEX_FIELDS)) + + if not message: + raise errors.MessageDoesNotExist(message_id, topic_name, + project) + + return _basic_message(message[0], now) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def bulk_get(self, topic_name, message_ids, project=None): + message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] + if not message_ids: + return iter([]) + + now = timeutils.utcnow_ts() + + # Base query, always check expire time + query = { + '_id': {'$in': message_ids}, + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + } + + collection = self._collection(topic_name, project) + + # NOTE(flaper87): Should this query + # be sorted? + messages = collection.find(query).hint(ID_INDEX_FIELDS) + + def denormalizer(msg): + return _basic_message(msg, now) + + return utils.HookedCursor(messages, denormalizer) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def post(self, topic_name, messages, client_uuid, project=None): + # NOTE(flaper87): This method should be safe to retry on + # autoreconnect, since we've a 2-step insert for messages. + # The worst-case scenario is that we'll increase the counter + # several times and we'd end up with some non-active messages. + + if not self._topic_ctrl.exists(topic_name, project): + raise errors.TopicDoesNotExist(topic_name, project) + + # NOTE(flaper87): Make sure the counter exists. This method + # is an upsert. + self._get_counter(topic_name, project) + now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) + collection = self._collection(topic_name, project) + + messages = list(messages) + msgs_n = len(messages) + next_marker = self._inc_counter(topic_name, + project, + amount=msgs_n) - msgs_n + + prepared_messages = [] + for index, message in enumerate(messages): + msg = { + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + 't': message['ttl'], + 'e': now_dt + datetime.timedelta(seconds=message['ttl']), + 'u': client_uuid, + 'd': now + message.get('delay', 0), + 'b': message['body'] if 'body' in message else {}, + 'k': next_marker + index, + 'tx': None + } + if self.driver.conf.enable_checksum: + msg['cs'] = s_utils.get_checksum(message.get('body', None)) + + prepared_messages.append(msg) + + res = collection.insert_many(prepared_messages, + bypass_document_validation=True) + + return [str(id_) for id_ in res.inserted_ids] + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def delete(self, topic_name, message_id, project=None, claim=None): + # NOTE(cpp-cabrera): return early - this is an invalid message + # id so we won't be able to find it any way + mid = utils.to_oid(message_id) + if mid is None: + return + + collection = self._collection(topic_name, project) + + query = { + '_id': mid, + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + } + + cid = utils.to_oid(claim) + if cid is None: + raise errors.ClaimDoesNotExist(claim, topic_name, project) + + now = timeutils.utcnow_ts() + cursor = collection.find(query).hint(ID_INDEX_FIELDS) + + try: + message = next(cursor) + except StopIteration: + return + + if claim is None: + if _is_claimed(message, now): + raise errors.MessageIsClaimed(message_id) + + else: + if message['c']['id'] != cid: + kwargs = {} + # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and + # `read_preference` is read only. We'd need to set it when the + # client is created. + # NOTE(kgriffs): Read from primary in case the message + # was just barely claimed, and claim hasn't made it to + # the secondary. + message = collection.find_one(query, **kwargs) + + if message['c']['id'] != cid: + if _is_claimed(message, now): + raise errors.MessageNotClaimedBy(message_id, claim) + + raise errors.MessageNotClaimed(message_id) + + collection.delete_one(query) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def bulk_delete(self, topic_name, message_ids, project=None, + claim_ids=None): + message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] + if claim_ids: + claim_ids = [cid for cid in map(utils.to_oid, claim_ids) if cid] + query = { + '_id': {'$in': message_ids}, + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + } + + collection = self._collection(topic_name, project) + if claim_ids: + message_claim_ids = [] + messages = collection.find(query).hint(ID_INDEX_FIELDS) + for message in messages: + message_claim_ids.append(message['c']['id']) + for cid in claim_ids: + if cid not in message_claim_ids: + raise errors.ClaimDoesNotExist(cid, topic_name, project) + + collection.delete_many(query) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def pop(self, topic_name, limit, project=None): + query = { + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + } + + # Only include messages that are not part of + # any claim, or are part of an expired claim. + now = timeutils.utcnow_ts() + query['c.e'] = {'$lte': now} + + collection = self._collection(topic_name, project) + projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1} + + messages = (collection.find_one_and_delete(query, + projection=projection) + for _ in range(limit)) + + final_messages = [_basic_message(message, now) + for message in messages + if message] + + return final_messages + + +class FIFOMessageController(MessageController): + + def _ensure_indexes(self, collection): + """Ensures that all indexes are created.""" + + collection.ensure_index(TTL_INDEX_FIELDS, + name='ttl', + expireAfterSeconds=0, + background=True) + + collection.ensure_index(ACTIVE_INDEX_FIELDS, + name='active', + background=True) + + collection.ensure_index(COUNTING_INDEX_FIELDS, + name='counting', + background=True) + + # NOTE(kgriffs): This index must be unique so that + # inserting a message with the same marker to the + # same queue will fail; this is used to detect a + # race condition which can cause an observer client + # to miss a message when there is more than one + # producer posting messages to the same queue, in + # parallel. + collection.ensure_index(MARKER_INDEX_FIELDS, + name='queue_marker', + unique=True, + background=True) + + collection.ensure_index(TRANSACTION_INDEX_FIELDS, + name='transaction', + background=True) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def post(self, topic_name, messages, client_uuid, project=None): + # NOTE(flaper87): This method should be safe to retry on + # autoreconnect, since we've a 2-step insert for messages. + # The worst-case scenario is that we'll increase the counter + # several times and we'd end up with some non-active messages. + + if not self._topic_ctrl.exists(topic_name, project): + raise errors.TopicDoesNotExist(topic_name, project) + + # NOTE(flaper87): Make sure the counter exists. This method + # is an upsert. + self._get_counter(topic_name, project) + now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) + collection = self._collection(topic_name, project) + + # Set the next basis marker for the first attempt. + # + # Note that we don't increment the counter right away because + # if 2 concurrent posts happen and the one with the higher counter + # ends before the one with the lower counter, there's a window + # where a client paging through the queue may get the messages + # with the higher counter and skip the previous ones. This would + # make our FIFO guarantee unsound. + next_marker = self._get_counter(topic_name, project) + + # Unique transaction ID to facilitate atomic batch inserts + transaction = objectid.ObjectId() + + prepared_messages = [] + for index, message in enumerate(messages): + msg = { + PROJ_TOPIC: utils.scope_queue_name(topic_name, project), + 't': message['ttl'], + 'e': now_dt + datetime.timedelta(seconds=message['ttl']), + 'u': client_uuid, + 'd': now + message.get('delay', 0), + 'b': message['body'] if 'body' in message else {}, + 'k': next_marker + index, + 'tx': None + } + if self.driver.conf.enable_checksum: + msg['cs'] = s_utils.get_checksum(message.get('body', None)) + + prepared_messages.append(msg) + + # NOTE(kgriffs): Don't take the time to do a 2-phase insert + # if there is no way for it to partially succeed. + if len(prepared_messages) == 1: + transaction = None + prepared_messages[0]['tx'] = None + + # Use a retry range for sanity, although we expect + # to rarely, if ever, reach the maximum number of + # retries. + # + # NOTE(kgriffs): With the default configuration (100 ms + # max sleep, 1000 max attempts), the max stall time + # before the operation is abandoned is 49.95 seconds. + for attempt in self._retry_range: + try: + res = collection.insert_many(prepared_messages, + bypass_document_validation=True) + + # Log a message if we retried, for debugging perf issues + if attempt != 0: + msgtmpl = _(u'%(attempts)d attempt(s) required to post ' + u'%(num_messages)d messages to queue ' + u'"%(topic)s" under project %(project)s') + + LOG.debug(msgtmpl, + dict(topic=topic_name, + attempts=attempt + 1, + num_messages=len(res.inserted_ids), + project=project)) + + # Update the counter in preparation for the next batch + # + # NOTE(kgriffs): Due to the unique index on the messages + # collection, competing inserts will fail as a whole, + # and keep retrying until the counter is incremented + # such that the competing marker's will start at a + # unique number, 1 past the max of the messages just + # inserted above. + self._inc_counter(topic_name, project, + amount=len(res.inserted_ids)) + + # NOTE(kgriffs): Finalize the insert once we can say that + # all the messages made it. This makes bulk inserts + # atomic, assuming queries filter out any non-finalized + # messages. + if transaction is not None: + collection.update_many({'tx': transaction}, + {'$set': {'tx': None}}, + upsert=False) + + return [str(id_) for id_ in res.inserted_ids] + + except (pymongo.errors.DuplicateKeyError, + pymongo.errors.BulkWriteError) as ex: + # TODO(kgriffs): Record stats of how often retries happen, + # and how many attempts, on average, are required to insert + # messages. + + # NOTE(kgriffs): This can be used in conjunction with the + # log line, above, that is emitted after all messages have + # been posted, to gauge how long it is taking for messages + # to be posted to a given topic, or overall. + # + # TODO(kgriffs): Add transaction ID to help match up loglines + if attempt == 0: + msgtmpl = _(u'First attempt failed while ' + u'adding messages to topic ' + u'"%(topic)s" under project %(project)s') + + LOG.debug(msgtmpl, dict(topic=topic_name, project=project)) + + # NOTE(kgriffs): Never retry past the point that competing + # messages expire and are GC'd, since once they are gone, + # the unique index no longer protects us from getting out + # of order, which could cause an observer to miss this + # message. The code below provides a sanity-check to ensure + # this situation can not happen. + elapsed = timeutils.utcnow_ts() - now + if elapsed > MAX_RETRY_POST_DURATION: + msgtmpl = (u'Exceeded maximum retry duration for topic ' + u'"%(topic)s" under project %(project)s') + + LOG.warning(msgtmpl, + dict(topic=topic_name, project=project)) + break + + # Chill out for a moment to mitigate thrashing/thundering + self._backoff_sleep(attempt) + + # NOTE(kgriffs): Perhaps we failed because a worker crashed + # after inserting messages, but before incrementing the + # counter; that would cause all future requests to stall, + # since they would keep getting the same base marker that is + # conflicting with existing messages, until the messages that + # "won" expire, at which time we would end up reusing markers, + # and that could make some messages invisible to an observer + # that is querying with a marker that is large than the ones + # being reused. + # + # To mitigate this, we apply a heuristic to determine whether + # a counter has stalled. We attempt to increment the counter, + # but only if it hasn't been updated for a few seconds, which + # should mean that nobody is left to update it! + # + # Note that we increment one at a time until the logjam is + # broken, since we don't know how many messages were posted + # by the worker before it crashed. + next_marker = self._inc_counter( + topic_name, project, window=COUNTER_STALL_WINDOW) + + # Retry the entire batch with a new sequence of markers. + # + # NOTE(kgriffs): Due to the unique index, and how + # MongoDB works with batch requests, we will never + # end up with a partially-successful update. The first + # document in the batch will fail to insert, and the + # remainder of the documents will not be attempted. + if next_marker is None: + # NOTE(kgriffs): Usually we will end up here, since + # it should be rare that a counter becomes stalled. + next_marker = self._get_counter( + topic_name, project) + else: + msgtmpl = (u'Detected a stalled message counter ' + u'for topic "%(topic)s" under ' + u'project %(project)s.' + u'The counter was incremented to %(value)d.') + + LOG.warning(msgtmpl, + dict(topic=topic_name, + project=project, + value=next_marker)) + + for index, message in enumerate(prepared_messages): + message['k'] = next_marker + index + except bsonerror.InvalidDocument as ex: + LOG.exception(ex) + raise + except Exception as ex: + LOG.exception(ex) + raise + + msgtmpl = (u'Hit maximum number of attempts (%(max)s) for topic ' + u'"%(topic)s" under project %(project)s') + + LOG.warning(msgtmpl, + dict(max=self.driver.mongodb_conf.max_attempts, + topic=topic_name, + project=project)) + + raise errors.MessageConflict(topic_name, project) + + +def _is_claimed(msg, now): + return (msg['c']['id'] is not None and + msg['c']['e'] > now) + + +def _basic_message(msg, now): + oid = msg['_id'] + age = now - utils.oid_ts(oid) + res = { + 'id': str(oid), + 'age': int(age), + 'ttl': msg['t'], + 'body': msg['b'] + } + if msg.get('cs'): + res['checksum'] = msg.get('cs') + + return res + + +class MessageTopicHandler(object): + + def __init__(self, driver, control_driver): + self.driver = driver + self._cache = self.driver.cache + self.topic_controller = self.driver.topic_controller + self.message_controller = self.driver.message_controller + + def delete(self, topic_name, project=None): + self.message_controller._purge_queue(topic_name, project) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def stats(self, name, project=None): + if not self.topic_controller.exists(name, project=project): + raise errors.TopicDoesNotExist(name, project) + + controller = self.message_controller + + total = controller._count(name, project=project, + include_claimed=True) + + message_stats = { + 'total': total, + } + + try: + oldest = controller.first(name, project=project, sort=1) + newest = controller.first(name, project=project, sort=-1) + except errors.QueueIsEmpty: + pass + else: + now = timeutils.utcnow_ts() + message_stats['oldest'] = utils.stat_message(oldest, now) + message_stats['newest'] = utils.stat_message(newest, now) + + return {'messages': message_stats} + + +def _get_scoped_query(name, project): + return {'p_t': utils.scope_queue_name(name, project)} diff --git a/zaqar/storage/mongodb/topics.py b/zaqar/storage/mongodb/topics.py new file mode 100644 index 000000000..1dca69052 --- /dev/null +++ b/zaqar/storage/mongodb/topics.py @@ -0,0 +1,279 @@ +# Copyright (c) 2019 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Implements the MongoDB storage controller for topics. + +Field Mappings: + In order to reduce the disk / memory space used, + field names will be, most of the time, the first + letter of their long name. +""" + +from oslo_log import log as logging +from oslo_utils import timeutils +from pymongo.collection import ReturnDocument +import pymongo.errors + +from zaqar.common import decorators +from zaqar.i18n import _ +from zaqar import storage +from zaqar.storage import errors +from zaqar.storage.mongodb import utils + +LOG = logging.getLogger(__name__) + +# NOTE(wanghao): Keep this as same as queues' +_TOPIC_CACHE_PREFIX = 'topiccontroller:' +_TOPIC_CACHE_TTL = 5 + + +def _topic_exists_key(topic, project=None): + # NOTE(kgriffs): Use string concatenation for performance, + # also put project first since it is guaranteed to be + # unique, which should reduce lookup time. + return _TOPIC_CACHE_PREFIX + 'exists:' + str(project) + '/' + topic + + +class TopicController(storage.Topic): + """Implements Topic resource operations using MongoDB. + + Topics are scoped by project, which is prefixed to the + topic name. + + :: + + Topic: + + Name Field + --------------------- + name -> p_t + msg counter -> c + metadata -> m + + Message Counter: + + Name Field + ------------------- + value -> v + modified ts -> t + """ + + def __init__(self, *args, **kwargs): + super(TopicController, self).__init__(*args, **kwargs) + + self._cache = self.driver.cache + self._collection = self.driver.topics_database.topics + + # NOTE(flaper87): This creates a unique index for + # project and name. Using project as the prefix + # allows for querying by project and project+name. + # This is also useful for retrieving the queues list for + # a specific project, for example. Order matters! + self._collection.ensure_index([('p_t', 1)], unique=True) + + # ---------------------------------------------------------------------- + # Helpers + # ---------------------------------------------------------------------- + + def _get_counter(self, name, project=None): + """Retrieves the current message counter value for a given topic. + + This helper is used to generate monotonic pagination + markers that are saved as part of the message + document. + + Note 1: Markers are scoped per-topic and so are *not* + globally unique or globally ordered. + + Note 2: If two or more requests to this method are made + in parallel, this method will return the same counter + value. This is done intentionally so that the caller + can detect a parallel message post, allowing it to + mitigate race conditions between producer and + observer clients. + + :param name: Name of the queue to which the counter is scoped + :param project: Topic's project + :returns: current message counter as an integer + """ + + doc = self._collection.find_one(_get_scoped_query(name, project), + projection={'c.v': 1, '_id': 0}) + + if doc is None: + raise errors.TopicDoesNotExist(name, project) + + return doc['c']['v'] + + def _inc_counter(self, name, project=None, amount=1, window=None): + """Increments the message counter and returns the new value. + + :param name: Name of the topic to which the counter is scoped + :param project: Topic's project name + :param amount: (Default 1) Amount by which to increment the counter + :param window: (Default None) A time window, in seconds, that + must have elapsed since the counter was last updated, in + order to increment the counter. + + :returns: Updated message counter value, or None if window + was specified, and the counter has already been updated + within the specified time period. + + :raises TopicDoesNotExist: if not found + """ + now = timeutils.utcnow_ts() + + update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}} + query = _get_scoped_query(name, project) + if window is not None: + threshold = now - window + query['c.t'] = {'$lt': threshold} + + while True: + try: + doc = self._collection.find_one_and_update( + query, update, return_document=ReturnDocument.AFTER, + projection={'c.v': 1, '_id': 0}) + + break + except pymongo.errors.AutoReconnect as ex: + LOG.exception(ex) + + if doc is None: + if window is None: + # NOTE(kgriffs): Since we did not filter by a time window, + # the topic should have been found and updated. Perhaps + # the topic has been deleted? + message = _(u'Failed to increment the message ' + u'counter for topic %(name)s and ' + u'project %(project)s') + message %= dict(name=name, project=project) + + LOG.warning(message) + + raise errors.TopicDoesNotExist(name, project) + + # NOTE(kgriffs): Assume the topic existed, but the counter + # was recently updated, causing the range topic on 'c.t' to + # exclude the record. + return None + + return doc['c']['v'] + + # ---------------------------------------------------------------------- + # Interface + # ---------------------------------------------------------------------- + + def _get(self, name, project=None): + try: + return self.get_metadata(name, project) + except errors.TopicDoesNotExist: + return {} + + def _list(self, project=None, kfilter={}, marker=None, + limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False, + name=None): + + query = utils.scoped_query(marker, project, name, kfilter, + key_value='p_t') + + projection = {'p_t': 1, '_id': 0} + if detailed: + projection['m'] = 1 + + cursor = self._collection.find(query, projection=projection) + cursor = cursor.limit(limit).sort('p_t') + marker_name = {} + + def normalizer(record): + topic = {'name': utils.descope_queue_name(record['p_t'])} + marker_name['next'] = topic['name'] + if detailed: + topic['metadata'] = record['m'] + return topic + + yield utils.HookedCursor(cursor, normalizer) + yield marker_name and marker_name['next'] + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def get_metadata(self, name, project=None): + queue = self._collection.find_one(_get_scoped_query(name, project), + projection={'m': 1, '_id': 0}) + if queue is None: + raise errors.TopicDoesNotExist(name, project) + + return queue.get('m', {}) + + @utils.raises_conn_error + # @utils.retries_on_autoreconnect + def _create(self, name, metadata=None, project=None): + # NOTE(flaper87): If the connection fails after it was called + # and we retry to insert the topic, we could end up returning + # `False` because of the `DuplicatedKeyError` although the + # topic was indeed created by this API call. + # + # TODO(kgriffs): Commented out `retries_on_autoreconnect` for + # now due to the above issue, since creating a topic is less + # important to make super HA. + + try: + # NOTE(kgriffs): Start counting at 1, and assume the first + # message ever posted will succeed and set t to a UNIX + # "modified at" timestamp. + counter = {'v': 1, 't': 0} + + scoped_name = utils.scope_queue_name(name, project) + self._collection.insert_one( + {'p_t': scoped_name, 'm': metadata or {}, + 'c': counter}) + + except pymongo.errors.DuplicateKeyError: + return False + else: + return True + + # NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and + # someone creates it, we want it to be immediately visible. + @utils.raises_conn_error + @utils.retries_on_autoreconnect + @decorators.caches(_topic_exists_key, _TOPIC_CACHE_TTL, lambda v: v) + def _exists(self, name, project=None): + query = _get_scoped_query(name, project) + return self._collection.find_one(query) is not None + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def set_metadata(self, name, metadata, project=None): + rst = self._collection.update_one(_get_scoped_query(name, project), + {'$set': {'m': metadata}}) + + if rst.matched_count == 0: + raise errors.TopicDoesNotExist(name, project) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + @_exists.purges + def _delete(self, name, project=None): + self._collection.delete_one(_get_scoped_query(name, project)) + + @utils.raises_conn_error + @utils.retries_on_autoreconnect + def _stats(self, name, project=None): + pass + + +def _get_scoped_query(name, project): + return {'p_t': utils.scope_queue_name(name, project)} diff --git a/zaqar/storage/mongodb/utils.py b/zaqar/storage/mongodb/utils.py index 58c9682e1..2227b3385 100644 --- a/zaqar/storage/mongodb/utils.py +++ b/zaqar/storage/mongodb/utils.py @@ -38,6 +38,8 @@ EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc) # NOTE(cpp-cabrera): the authoritative form of project/queue keys. PROJ_QUEUE_KEY = 'p_q' +PROJ_TOPIC_KEY = 'p_t' + LOG = logging.getLogger(__name__) @@ -191,7 +193,8 @@ def parse_scoped_project_queue(scoped_name): return scoped_name.split('/') -def scoped_query(queue, project, name=None, kfilter={}): +def scoped_query(queue, project, name=None, kfilter={}, + key_value=PROJ_QUEUE_KEY): """Returns a dict usable for querying for scoped project/queues. :param queue: name of queue to seek @@ -201,7 +204,7 @@ def scoped_query(queue, project, name=None, kfilter={}): :returns: query to issue :rtype: dict """ - key = PROJ_QUEUE_KEY + key = key_value query = {} scoped_name = scope_queue_name(queue, project) diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index f49439608..3309f72f6 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -159,3 +159,11 @@ class DataDriver(base.DataDriverBase): stages.extend(_get_storage_pipeline('subscription', self.conf)) stages.append(self._storage.subscription_controller) return common.Pipeline(stages) + + @decorators.lazy_property(write=False) + def topic_controller(self): + stages = _get_builtin_entry_points('topic', self._storage, + self.control_driver, self.conf) + stages.extend(_get_storage_pipeline('topic', self.conf)) + stages.append(self._storage.topic_controller) + return common.Pipeline(stages) diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index 327324329..fae42eb60 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -147,6 +147,14 @@ class DataDriver(storage.DataDriverBase): else: return controller + @decorators.lazy_property(write=False) + def topic_controller(self): + controller = TopicController(self._pool_catalog) + if self.conf.profiler.enabled: + return profiler.trace_cls("pooling_topic_controller")(controller) + else: + return controller + class QueueController(storage.Queue): """Routes operations to get the appropriate queue controller. @@ -635,6 +643,20 @@ class Catalog(object): target = self.lookup(queue, project) return target and target.subscription_controller + def get_topic_controller(self, topic, project=None): + """Lookup the topic controller for the given queue and project. + + :param topic: Name of the topic for which to find a pool + :param project: Project to which the topic belongs, or + None to specify the "global" or "generic" project. + + :returns: The topic controller associated with the data driver for + the pool containing (queue, project) or None if this doesn't exist. + :rtype: Maybe TopicController + """ + target = self.lookup(topic, project) + return target and target.topic_controller + def get_default_pool(self, use_listing=True): if use_listing: cursor = self._pools_ctrl.list(limit=0) @@ -714,3 +736,112 @@ class Catalog(object): self._drivers[pool_id] = self._init_driver(pool_id, pool_conf) return self._drivers[pool_id] + + +class TopicController(storage.Topic): + """Routes operations to get the appropriate topic controller. + + :param pool_catalog: a catalog of available pools + :type pool_catalog: queues.pooling.base.Catalog + """ + + def __init__(self, pool_catalog): + super(TopicController, self).__init__(None) + self._pool_catalog = pool_catalog + self._mgt_topic_ctrl = self._pool_catalog.control.topic_controller + self._get_controller = self._pool_catalog.get_topic_controller + + def _list(self, project=None, kfilter={}, marker=None, + limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False, + name=None): + + def all_pages(): + yield next(self._mgt_topic_ctrl.list( + project=project, + kfilter=kfilter, + marker=marker, + limit=limit, + detailed=detailed, + name=name)) + + # make a heap compared with 'name' + ls = heapq.merge(*[ + utils.keyify('name', page) + for page in all_pages() + ]) + + marker_name = {} + + # limit the iterator and strip out the comparison wrapper + def it(): + for topic_cmp in itertools.islice(ls, limit): + marker_name['next'] = topic_cmp.obj['name'] + yield topic_cmp.obj + + yield it() + yield marker_name and marker_name['next'] + + def _get(self, name, project=None): + try: + return self.get_metadata(name, project) + except errors.TopicDoesNotExist: + return {} + + def _create(self, name, metadata=None, project=None): + flavor = None + if isinstance(metadata, dict): + flavor = metadata.get('_flavor') + + self._pool_catalog.register(name, project=project, flavor=flavor) + + # NOTE(cpp-cabrera): This should always succeed since we just + # registered the project/topic. There is a race condition, + # however. If between the time we register a topic and go to + # look it up, the topic is deleted, then this assertion will + # fail. + pool = self._pool_catalog.lookup(name, project) + if not pool: + raise RuntimeError('Failed to register topic') + return self._mgt_topic_ctrl.create(name, metadata=metadata, + project=project) + + def _delete(self, name, project=None): + mtHandler = self._get_controller(name, project) + if mtHandler: + # NOTE(cpp-cabrera): delete from the catalogue first. If + # zaqar crashes in the middle of these two operations, + # it is desirable that the entry be missing from the + # catalogue and present in storage, rather than the + # reverse. The former case leads to all operations + # behaving as expected: 404s across the board, and a + # functionally equivalent 204 on a create queue. The + # latter case is more difficult to reason about, and may + # yield 500s in some operations. + self._pool_catalog.deregister(name, project) + mtHandler.delete(name, project) + + return self._mgt_topic_ctrl.delete(name, project) + + def _exists(self, name, project=None): + return self._mgt_topic_ctrl.exists(name, project=project) + + def get_metadata(self, name, project=None): + return self._mgt_topic_ctrl.get_metadata(name, project=project) + + def set_metadata(self, name, metadata, project=None): + # NOTE(gengchc2): If flavor metadata is modified in topic, + # The topic needs to be re-registered to pools, otherwise + # the topic flavor parameter is not consistent with the pool. + flavor = None + if isinstance(metadata, dict): + flavor = metadata.get('_flavor') + self._pool_catalog.register(name, project=project, flavor=flavor) + + return self._mgt_topic_ctrl.set_metadata(name, metadata=metadata, + project=project) + + def _stats(self, name, project=None): + mtHandler = self._get_controller(name, project) + if mtHandler: + return mtHandler.stats(name, project=project) + raise errors.TopicDoesNotExist(name, project) diff --git a/zaqar/storage/redis/driver.py b/zaqar/storage/redis/driver.py index ef891ac51..e8eba6e49 100644 --- a/zaqar/storage/redis/driver.py +++ b/zaqar/storage/redis/driver.py @@ -296,6 +296,10 @@ class ControlDriver(storage.ControlDriverBase): else: return controller + @decorators.lazy_property(write=False) + def topic_controller(self): + pass + def _get_redis_client(driver): conf = driver.redis_conf diff --git a/zaqar/storage/redis/messages.py b/zaqar/storage/redis/messages.py index 9f35ad080..183f0fb51 100644 --- a/zaqar/storage/redis/messages.py +++ b/zaqar/storage/redis/messages.py @@ -637,3 +637,53 @@ class MessageQueueHandler(object): message_stats['oldest'] = oldest return {'messages': message_stats} + + +class MessageTopicHandler(object): + def __init__(self, driver, control_driver): + self.driver = driver + self._client = self.driver.connection + self._topic_ctrl = self.driver.topic_controller + self._message_ctrl = self.driver.message_controller + + @utils.raises_conn_error + def create(self, name, metadata=None, project=None): + with self._client.pipeline() as pipe: + self._message_ctrl._create_msgset(name, project, pipe) + + try: + pipe.execute() + except redis.exceptions.ResponseError: + return False + + @utils.raises_conn_error + @utils.retries_on_connection_error + def delete(self, name, project=None): + with self._client.pipeline() as pipe: + self._message_ctrl._delete_msgset(name, project, pipe) + self._message_ctrl._delete_queue_messages(name, project, pipe) + pipe.execute() + + @utils.raises_conn_error + @utils.retries_on_connection_error + def stats(self, name, project=None): + if not self._topic_ctrl.exists(name, project=project): + raise errors.TopicDoesNotExist(name, project) + + total = self._message_ctrl._count(name, project) + + message_stats = { + 'total': total + } + + if total: + try: + newest = self._message_ctrl.first(name, project, -1) + oldest = self._message_ctrl.first(name, project, 1) + except errors.QueueIsEmpty: + pass + else: + message_stats['newest'] = newest + message_stats['oldest'] = oldest + + return {'messages': message_stats} diff --git a/zaqar/storage/sqlalchemy/driver.py b/zaqar/storage/sqlalchemy/driver.py index 966d63540..b74acbae8 100644 --- a/zaqar/storage/sqlalchemy/driver.py +++ b/zaqar/storage/sqlalchemy/driver.py @@ -110,3 +110,7 @@ class ControlDriver(storage.ControlDriverBase): "controller")(controller) else: return controller + + @property + def topic_controller(self): + pass diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index c8ddae74e..833b85e07 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -386,3 +386,89 @@ class MessageQueueHandler(object): raise else: return True + + +class MessageTopicHandler(object): + def __init__(self, driver, control_driver): + self.driver = driver + self._client = self.driver.connection + self._topic_ctrl = self.driver.topic_controller + self._message_ctrl = self.driver.message_controller + + def create(self, name, metadata=None, project=None): + self._client.put_container(utils._message_container(name, project)) + + def delete(self, name, project=None): + for container in [utils._message_container(name, project)]: + try: + headers, objects = self._client.get_container(container) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + else: + for obj in objects: + try: + self._client.delete_object(container, obj['name']) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + try: + self._client.delete_container(container) + except swiftclient.ClientException as exc: + if exc.http_status not in (404, 409): + raise + + def stats(self, name, project=None): + if not self._topic_ctrl.exists(name, project=project): + raise errors.TopicDoesNotExist(name, project) + + total = 0 + container = utils._message_container(name, project) + + try: + _, objects = self._client.get_container(container) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.QueueIsEmpty(name, project) + + newest = None + oldest = None + now = timeutils.utcnow_ts(True) + for obj in objects: + try: + headers = self._client.head_object(container, obj['name']) + except swiftclient.ClientException as exc: + if exc.http_status != 404: + raise + else: + created = float(headers['x-timestamp']) + created_iso = datetime.datetime.utcfromtimestamp( + created).strftime('%Y-%m-%dT%H:%M:%SZ') + newest = { + 'id': obj['name'], + 'age': now - created, + 'created': created_iso} + if oldest is None: + oldest = copy.deepcopy(newest) + total += 1 + + msg_stats = { + 'total': total, + } + if newest is not None: + msg_stats['newest'] = newest + msg_stats['oldest'] = oldest + + return {'messages': msg_stats} + + def exists(self, topic, project=None): + try: + self._client.head_container(utils._message_container(topic, + project)) + + except swiftclient.ClientException as exc: + if exc.http_status == 404: + return False + raise + else: + return True diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py index b9bbea7a5..2be44faf4 100644 --- a/zaqar/tests/faulty_storage.py +++ b/zaqar/tests/faulty_storage.py @@ -61,6 +61,10 @@ class DataDriver(storage.DataDriverBase): def subscription_controller(self): return None + @property + def topic_controller(self): + return self.control_driver.topic_controller + class ControlDriver(storage.ControlDriverBase): @@ -86,6 +90,10 @@ class ControlDriver(storage.ControlDriverBase): def flavors_controller(self): return None + @property + def topic_controller(self): + return TopicController(self) + class QueueController(storage.Queue): def __init__(self, driver): @@ -144,3 +152,32 @@ class MessageController(storage.Message): def bulk_delete(self, queue, message_ids, project=None, claim_ids=None): raise NotImplementedError() + + +class TopicController(storage.Topic): + def __init__(self, driver): + pass + + def _list(self, project=None): + raise NotImplementedError() + + def _get(self, name, project=None): + raise NotImplementedError() + + def get_metadata(self, name, project=None): + raise NotImplementedError() + + def _create(self, name, metadata=None, project=None): + raise NotImplementedError() + + def _exists(self, name, project=None): + raise NotImplementedError() + + def set_metadata(self, name, metadata, project=None): + raise NotImplementedError() + + def _delete(self, name, project=None): + raise NotImplementedError() + + def _stats(self, name, project=None): + raise NotImplementedError() diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py new file mode 100644 index 000000000..99f0948f3 --- /dev/null +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py @@ -0,0 +1,608 @@ +# Copyright (c) 2019 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + + +import ddt +import falcon +import mock +from oslo_serialization import jsonutils +from oslo_utils import uuidutils +import six + +from zaqar.storage import errors as storage_errors +from zaqar import tests as testing +from zaqar.tests.unit.transport.wsgi import base + + +@ddt.ddt +class TestTopicLifecycleMongoDB(base.V2Base): + + config_file = 'wsgi_mongodb.conf' + + @testing.requires_mongodb + def setUp(self): + super(TestTopicLifecycleMongoDB, self).setUp() + + self.topic_path = self.url_prefix + '/topics' + self.mars_topic_path = self.topic_path + '/mars' + self.venus_topic_path = self.topic_path + '/venus' + + self.headers = { + 'Client-ID': uuidutils.generate_uuid(), + 'X-Project-ID': '3387309841abc_' + } + + def tearDown(self): + control = self.boot.control + storage = self.boot.storage._storage + connection = storage.connection + + connection.drop_database(control.topics_database) + + for db in storage.message_databases: + connection.drop_database(db) + + super(TestTopicLifecycleMongoDB, self).tearDown() + + def test_without_project_id(self): + headers = { + 'Client-ID': uuidutils.generate_uuid(), + } + + self.simulate_put(self.mars_topic_path, headers=headers, + need_project_id=False) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + self.simulate_delete(self.mars_topic_path, headers=headers, + need_project_id=False) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + def test_empty_project_id(self): + headers = { + 'Client-ID': uuidutils.generate_uuid(), + 'X-Project-ID': '' + } + + self.simulate_put(self.mars_topic_path, headers=headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + self.simulate_delete(self.mars_topic_path, headers=headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + @ddt.data('480924', 'foo') + def test_basics_thoroughly(self, project_id): + headers = { + 'Client-ID': uuidutils.generate_uuid(), + 'X-Project-ID': project_id + } + mars_topic_path_stats = self.mars_topic_path + '/stats' + + # Stats are empty - topic not created yet + self.simulate_get(mars_topic_path_stats, headers=headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Create + doc = '{"messages": {"ttl": 600}}' + self.simulate_put(self.mars_topic_path, + headers=headers, body=doc) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + location = self.srmock.headers_dict['Location'] + self.assertEqual(location, self.mars_topic_path) + + # Fetch metadata + result = self.simulate_get(self.mars_topic_path, + headers=headers) + result_doc = jsonutils.loads(result[0]) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + ref_doc = jsonutils.loads(doc) + ref_doc['_default_message_ttl'] = 3600 + ref_doc['_max_messages_post_size'] = 262144 + ref_doc['_default_message_delay'] = 0 + self.assertEqual(ref_doc, result_doc) + + # Stats empty topic + self.simulate_get(mars_topic_path_stats, headers=headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Delete + self.simulate_delete(self.mars_topic_path, headers=headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + # Get non-existent stats + self.simulate_get(mars_topic_path_stats, headers=headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + @ddt.data('1234567890', '11111111111111111111111111111111111') + def test_basics_thoroughly_with_different_client_id(self, client_id): + self.conf.set_override('client_id_uuid_safe', 'off', 'transport') + headers = { + 'Client-ID': client_id, + 'X-Project-ID': '480924' + } + mars_topic_path_stats = self.mars_topic_path + '/stats' + + # Stats are empty - topic not created yet + self.simulate_get(mars_topic_path_stats, headers=headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Create + doc = '{"messages": {"ttl": 600}}' + self.simulate_put(self.mars_topic_path, + headers=headers, body=doc) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + location = self.srmock.headers_dict['Location'] + self.assertEqual(location, self.mars_topic_path) + + # Fetch metadata + result = self.simulate_get(self.mars_topic_path, + headers=headers) + result_doc = jsonutils.loads(result[0]) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + ref_doc = jsonutils.loads(doc) + ref_doc['_default_message_ttl'] = 3600 + ref_doc['_max_messages_post_size'] = 262144 + ref_doc['_default_message_delay'] = 0 + self.assertEqual(ref_doc, result_doc) + + # Stats empty topic + self.simulate_get(mars_topic_path_stats, headers=headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Delete + self.simulate_delete(self.mars_topic_path, headers=headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + # Get non-existent stats + self.simulate_get(mars_topic_path_stats, headers=headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + def test_name_restrictions(self): + self.simulate_put(self.topic_path + '/Nice-Boat_2', + headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + self.simulate_put(self.topic_path + '/Nice-Bo@t', + headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + self.simulate_put(self.topic_path + '/_' + 'niceboat' * 8, + headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + self.simulate_put(self.topic_path + '/Service.test_topic', + headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + def test_project_id_restriction(self): + muvluv_topic_path = self.topic_path + '/Muv-Luv' + + self.simulate_put(muvluv_topic_path, + headers={'Client-ID': uuidutils.generate_uuid(), + 'X-Project-ID': 'JAM Project' * 24}) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + # no charset restrictions + self.simulate_put(muvluv_topic_path, + headers={'Client-ID': uuidutils.generate_uuid(), + 'X-Project-ID': 'JAM Project'}) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + def test_non_ascii_name(self): + test_params = ((u'/topics/non-ascii-n\u0153me', 'utf-8'), + (u'/topics/non-ascii-n\xc4me', 'iso8859-1')) + + for uri, enc in test_params: + uri = self.url_prefix + uri + + if six.PY2: + uri = uri.encode(enc) + + self.simulate_put(uri, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + self.simulate_delete(uri, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + def test_no_metadata(self): + self.simulate_put(self.venus_topic_path, + headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + self.simulate_put(self.venus_topic_path, body='', + headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + result = self.simulate_get(self.venus_topic_path, + headers=self.headers) + result_doc = jsonutils.loads(result[0]) + self.assertEqual(256 * 1024, + result_doc.get('_max_messages_post_size')) + self.assertEqual(3600, + result_doc.get('_default_message_ttl')) + self.assertEqual(0, + result_doc.get('_default_message_delay')) + + @ddt.data('{', '[]', '.', ' ') + def test_bad_metadata(self, document): + self.simulate_put(self.venus_topic_path, + headers=self.headers, + body=document) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + def test_too_much_metadata(self): + self.simulate_put(self.venus_topic_path, headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}' + + max_size = self.transport_cfg.max_queue_metadata + padding_len = max_size - (len(doc) - 10) + 1 + + doc = doc.format(pad='x' * padding_len) + + self.simulate_put(self.venus_topic_path, + headers=self.headers, + body=doc) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + def test_way_too_much_metadata(self): + self.simulate_put(self.venus_topic_path, headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}' + + max_size = self.transport_cfg.max_queue_metadata + padding_len = max_size * 100 + + doc = doc.format(pad='x' * padding_len) + + self.simulate_put(self.venus_topic_path, + headers=self.headers, body=doc) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + def test_custom_metadata(self): + # Set + doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}' + + max_size = self.transport_cfg.max_queue_metadata + padding_len = max_size - (len(doc) - 2) + + doc = doc.format(pad='x' * padding_len) + self.simulate_put(self.venus_topic_path, + headers=self.headers, + body=doc) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + # Get + result = self.simulate_get(self.venus_topic_path, + headers=self.headers) + result_doc = jsonutils.loads(result[0]) + ref_doc = jsonutils.loads(doc) + ref_doc['_default_message_ttl'] = 3600 + ref_doc['_max_messages_post_size'] = 262144 + ref_doc['_default_message_delay'] = 0 + self.assertEqual(ref_doc, result_doc) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + def test_update_metadata(self): + xyz_topic_path = self.url_prefix + '/topics/xyz' + xyz_topic_path_metadata = xyz_topic_path + headers = { + 'Client-ID': uuidutils.generate_uuid(), + 'X-Project-ID': uuidutils.generate_uuid() + } + # Create + self.simulate_put(xyz_topic_path, headers=headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + headers.update({'Content-Type': + "application/openstack-messaging-v2.0-json-patch"}) + # add metadata + doc1 = ('[{"op":"add", "path": "/metadata/key1", "value": 1},' + '{"op":"add", "path": "/metadata/key2", "value": 1}]') + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc1) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # remove reserved metadata, zaqar will do nothing and return 200, + # because + doc3 = '[{"op":"remove", "path": "/metadata/_default_message_ttl"}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc3) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # replace metadata + doc2 = '[{"op":"replace", "path": "/metadata/key1", "value": 2}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc2) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # replace reserved metadata, zaqar will store the reserved metadata + doc2 = ('[{"op":"replace", "path": "/metadata/_default_message_ttl",' + '"value": 300}]') + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc2) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Get + result = self.simulate_get(xyz_topic_path_metadata, + headers=headers) + result_doc = jsonutils.loads(result[0]) + self.assertEqual({'key1': 2, 'key2': 1, + '_default_message_ttl': 300, + '_max_messages_post_size': 262144, + '_default_message_delay': 0}, result_doc) + + # remove metadata + doc3 = '[{"op":"remove", "path": "/metadata/key1"}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc3) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # remove reserved metadata + doc3 = '[{"op":"remove", "path": "/metadata/_default_message_ttl"}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc3) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Get + result = self.simulate_get(xyz_topic_path_metadata, + headers=headers) + result_doc = jsonutils.loads(result[0]) + self.assertEqual({'key2': 1, '_default_message_ttl': 3600, + '_max_messages_post_size': 262144, + '_default_message_delay': 0}, result_doc) + + # replace non-existent metadata + doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc4) + self.assertEqual(falcon.HTTP_409, self.srmock.status) + + # remove non-existent metadata + doc5 = '[{"op":"remove", "path": "/metadata/key3"}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc5) + self.assertEqual(falcon.HTTP_409, self.srmock.status) + + self.simulate_delete(xyz_topic_path, headers=headers) + + # add metadata to non-existent topic + doc1 = ('[{"op":"add", "path": "/metadata/key1", "value": 1},' + '{"op":"add", "path": "/metadata/key2", "value": 1}]') + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc1) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # replace metadata in non-existent topic + doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc4) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # remove metadata from non-existent topic + doc5 = '[{"op":"remove", "path": "/metadata/key3"}]' + self.simulate_patch(xyz_topic_path_metadata, + headers=headers, + body=doc5) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + def test_list(self): + arbitrary_number = 644079696574693 + project_id = str(arbitrary_number) + client_id = uuidutils.generate_uuid() + header = { + 'X-Project-ID': project_id, + 'Client-ID': client_id + } + + # NOTE(kgriffs): It's important that this one sort after the one + # above. This is in order to prove that bug/1236605 is fixed, and + # stays fixed! + alt_project_id = str(arbitrary_number + 1) + + # List empty + result = self.simulate_get(self.topic_path, headers=header) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + results = jsonutils.loads(result[0]) + self.assertEqual([], results['topics']) + self.assertIn('links', results) + self.assertEqual(0, len(results['links'])) + + # Payload exceeded + self.simulate_get(self.topic_path, headers=header, + query_string='limit=21') + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + # Create some + def create_topic(name, project_id, body): + altheader = {'Client-ID': client_id} + if project_id is not None: + altheader['X-Project-ID'] = project_id + uri = self.topic_path + '/' + name + self.simulate_put(uri, headers=altheader, body=body) + + create_topic('q1', project_id, '{"node": 31}') + create_topic('q2', project_id, '{"node": 32}') + create_topic('q3', project_id, '{"node": 33}') + + create_topic('q3', alt_project_id, '{"alt": 1}') + + # List (limit) + result = self.simulate_get(self.topic_path, headers=header, + query_string='limit=2') + + result_doc = jsonutils.loads(result[0]) + self.assertEqual(2, len(result_doc['topics'])) + + # List (no metadata, get all) + result = self.simulate_get(self.topic_path, + headers=header, query_string='limit=5') + + result_doc = jsonutils.loads(result[0]) + [target, params] = result_doc['links'][0]['href'].split('?') + self.simulate_get(target, headers=header, query_string=params) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Ensure we didn't pick up the topic from the alt project. + topics = result_doc['topics'] + self.assertEqual(3, len(topics)) + + # List with metadata + result = self.simulate_get(self.topic_path, headers=header, + query_string='detailed=true') + + self.assertEqual(falcon.HTTP_200, self.srmock.status) + result_doc = jsonutils.loads(result[0]) + [target, params] = result_doc['links'][0]['href'].split('?') + + topic = result_doc['topics'][0] + result = self.simulate_get(topic['href'], headers=header) + result_doc = jsonutils.loads(result[0]) + self.assertEqual(topic['metadata'], result_doc) + self.assertEqual({'node': 31, '_default_message_ttl': 3600, + '_max_messages_post_size': 262144, + '_default_message_delay': 0}, result_doc) + + # topic filter + result = self.simulate_get(self.topic_path, headers=header, + query_string='node=34') + self.assertEqual(falcon.HTTP_200, self.srmock.status) + result_doc = jsonutils.loads(result[0]) + self.assertEqual(0, len(result_doc['topics'])) + + # List tail + self.simulate_get(target, headers=header, query_string=params) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # List manually-constructed tail + self.simulate_get(target, headers=header, query_string='marker=zzz') + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + def test_list_returns_503_on_nopoolfound_exception(self): + arbitrary_number = 644079696574693 + project_id = str(arbitrary_number) + client_id = uuidutils.generate_uuid() + header = { + 'X-Project-ID': project_id, + 'Client-ID': client_id + } + + topic_controller = self.boot.storage.topic_controller + + with mock.patch.object(topic_controller, 'list') as mock_topic_list: + + def topic_generator(): + raise storage_errors.NoPoolFound() + + # This generator tries to be like topic controller list generator + # in some ways. + def fake_generator(): + yield topic_generator() + yield {} + mock_topic_list.return_value = fake_generator() + self.simulate_get(self.topic_path, headers=header) + self.assertEqual(falcon.HTTP_503, self.srmock.status) + + def test_list_with_filter(self): + arbitrary_number = 644079696574693 + project_id = str(arbitrary_number) + client_id = uuidutils.generate_uuid() + header = { + 'X-Project-ID': project_id, + 'Client-ID': client_id + } + + # Create some + def create_topic(name, project_id, body): + altheader = {'Client-ID': client_id} + if project_id is not None: + altheader['X-Project-ID'] = project_id + uri = self.topic_path + '/' + name + self.simulate_put(uri, headers=altheader, body=body) + + create_topic('q1', project_id, '{"test_metadata_key1": "value1"}') + create_topic('q2', project_id, '{"_max_messages_post_size": 2000}') + create_topic('q3', project_id, '{"test_metadata_key2": 30}') + + # List (filter query) + result = self.simulate_get(self.topic_path, headers=header, + query_string='name=q&test_metadata_key2=30') + + result_doc = jsonutils.loads(result[0]) + self.assertEqual(1, len(result_doc['topics'])) + self.assertEqual('q3', result_doc['topics'][0]['name']) + + # List (filter query) + result = self.simulate_get(self.topic_path, headers=header, + query_string='_max_messages_post_size=2000') + + result_doc = jsonutils.loads(result[0]) + self.assertEqual(1, len(result_doc['topics'])) + self.assertEqual('q2', result_doc['topics'][0]['name']) + + # List (filter query) + result = self.simulate_get(self.topic_path, headers=header, + query_string='name=q') + + result_doc = jsonutils.loads(result[0]) + self.assertEqual(3, len(result_doc['topics'])) + + +class TestTopicLifecycleFaultyDriver(base.V2BaseFaulty): + + config_file = 'wsgi_faulty.conf' + + def test_simple(self): + self.headers = { + 'Client-ID': uuidutils.generate_uuid(), + 'X-Project-ID': '338730984abc_1' + } + + mars_topic_path = self.url_prefix + '/topics/mars' + doc = '{"messages": {"ttl": 600}}' + self.simulate_put(mars_topic_path, + headers=self.headers, + body=doc) + self.assertEqual(falcon.HTTP_503, self.srmock.status) + + location = ('Location', mars_topic_path) + self.assertNotIn(location, self.srmock.headers) + + result = self.simulate_get(mars_topic_path, + headers=self.headers) + result_doc = jsonutils.loads(result[0]) + self.assertEqual(falcon.HTTP_503, self.srmock.status) + self.assertNotEqual(result_doc, jsonutils.loads(doc)) + + self.simulate_get(mars_topic_path + '/stats', + headers=self.headers) + self.assertEqual(falcon.HTTP_503, self.srmock.status) + + self.simulate_get(self.url_prefix + '/topics', + headers=self.headers) + self.assertEqual(falcon.HTTP_503, self.srmock.status) + + self.simulate_delete(mars_topic_path, headers=self.headers) + self.assertEqual(falcon.HTTP_503, self.srmock.status) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 501f93610..e66784b3f 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -676,3 +676,27 @@ class Validator(object): self._limits_conf.max_length_client_id) if self._limits_conf.client_id_uuid_safe == 'strict': uuid.UUID(client_id) + + def topic_identification(self, topic, project): + """Restrictions on a project id & topic name pair. + + :param queue: Name of the topic + :param project: Project id + :raises ValidationFailed: if the `name` is longer than 64 + characters or contains anything other than ASCII digits and + letters, underscores, and dashes. Also raises if `project` + is not None but longer than 256 characters. + """ + + if project is not None and len(project) > PROJECT_ID_MAX_LEN: + msg = _(u'Project ids may not be more than {0} characters long.') + raise ValidationFailed(msg, PROJECT_ID_MAX_LEN) + + if len(topic) > QUEUE_NAME_MAX_LEN: + msg = _(u'Topic names may not be more than {0} characters long.') + raise ValidationFailed(msg, QUEUE_NAME_MAX_LEN) + + if not QUEUE_NAME_REGEX.match(topic): + raise ValidationFailed( + _(u'Topic names may only contain ASCII letters, digits, ' + 'underscores, and dashes.')) diff --git a/zaqar/transport/wsgi/driver.py b/zaqar/transport/wsgi/driver.py index afcc9cd5f..81516262b 100644 --- a/zaqar/transport/wsgi/driver.py +++ b/zaqar/transport/wsgi/driver.py @@ -72,6 +72,10 @@ class Driver(transport.DriverBase): return helpers.validate_queue_identification( self._validate.queue_identification, req, resp, params) + def _validate_topic_identification(self, req, resp, params): + return helpers.validate_topic_identification( + self._validate.topic_identification, req, resp, params) + def _require_client_id(self, req, resp, params): return helpers.require_client_id( self._validate.client_id_uuid_safe, req, resp, params) @@ -91,7 +95,10 @@ class Driver(transport.DriverBase): helpers.inject_context, # NOTE(kgriffs): Depends on project_id being extracted, above - self._validate_queue_identification + self._validate_queue_identification, + + # NOTE(kgriffs): Depends on project_id being extracted, above + self._validate_topic_identification ] def _init_routes(self): diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index cc4b5b0ce..266e27fea 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -24,6 +24,9 @@ from zaqar.transport.wsgi.v2_0 import purge from zaqar.transport.wsgi.v2_0 import queues from zaqar.transport.wsgi.v2_0 import stats from zaqar.transport.wsgi.v2_0 import subscriptions +from zaqar.transport.wsgi.v2_0 import topic +from zaqar.transport.wsgi.v2_0 import topic_purge +from zaqar.transport.wsgi.v2_0 import topic_stats from zaqar.transport.wsgi.v2_0 import urls @@ -52,6 +55,7 @@ def public_endpoints(driver, conf): message_controller = driver._storage.message_controller claim_controller = driver._storage.claim_controller subscription_controller = driver._storage.subscription_controller + topic_controller = driver._storage.topic_controller defaults = driver._defaults @@ -119,6 +123,42 @@ def public_endpoints(driver, conf): # Pre-Signed URL Endpoint ('/queues/{queue_name}/share', urls.Resource(driver)), + + # Topics Endpoints + ('/topics', + topic.CollectionResource(driver._validate, topic_controller)), + ('/topics/{topic_name}', + topic.ItemResource(driver._validate, topic_controller, + message_controller)), + ('/topics/{topic_name}/stats', + topic_stats.Resource(topic_controller)), + ('/topics/{topic_name}/purge', + topic_purge.Resource(driver)), + # Topic Messages Endpoints + ('/topics/{topic_name}/messages', + messages.CollectionResource(driver._wsgi_conf, + driver._validate, + message_controller, + topic_controller, + defaults.message_ttl)), + ('/topics/{topic_name}/messages/{message_id}', + messages.ItemResource(message_controller)), + # Topic Subscription Endpoints + ('/topics/{topic_name}/subscriptions', + subscriptions.CollectionResource(driver._validate, + subscription_controller, + defaults.subscription_ttl, + topic_controller, + conf)), + + ('/topics/{topic_name}/subscriptions/{subscription_id}', + subscriptions.ItemResource(driver._validate, + subscription_controller)), + + ('/topics/{topic_name}/subscriptions/{subscription_id}/confirm', + subscriptions.ConfirmResource(driver._validate, + subscription_controller, + conf)), ] diff --git a/zaqar/transport/wsgi/v2_0/topic.py b/zaqar/transport/wsgi/v2_0/topic.py new file mode 100644 index 000000000..c8171aad4 --- /dev/null +++ b/zaqar/transport/wsgi/v2_0/topic.py @@ -0,0 +1,333 @@ +# Copyright (c) 2019 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import falcon +from oslo_log import log as logging +import six + +from zaqar.common import decorators +from zaqar.i18n import _ +from zaqar.storage import errors as storage_errors +from zaqar.transport import acl +from zaqar.transport import utils +from zaqar.transport import validation +from zaqar.transport.wsgi import errors as wsgi_errors +from zaqar.transport.wsgi import utils as wsgi_utils + +LOG = logging.getLogger(__name__) + + +def _get_reserved_metadata(validate): + _reserved_metadata = ['max_messages_post_size', 'default_message_ttl', + 'default_message_delay'] + reserved_metadata = { + '_%s' % meta: + validate.get_limit_conf_value(meta) + for meta in _reserved_metadata + } + + return reserved_metadata + + +class ItemResource(object): + + __slots__ = ('_validate', '_topic_controller', '_message_controller', + '_reserved_metadata') + + def __init__(self, validate, topic_controller, message_controller): + self._validate = validate + self._topic_controller = topic_controller + self._message_controller = message_controller + + @decorators.TransportLog("Topics item") + @acl.enforce("topics:get") + def on_get(self, req, resp, project_id, topic_name): + try: + resp_dict = self._topic_controller.get(topic_name, + project=project_id) + for meta, value in _get_reserved_metadata(self._validate).items(): + if not resp_dict.get(meta): + resp_dict[meta] = value + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPNotFound(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Topic metadata could not be retrieved.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.body = utils.to_json(resp_dict) + # status defaults to 200 + + @decorators.TransportLog("Topics item") + @acl.enforce("topics:create") + def on_put(self, req, resp, project_id, topic_name): + try: + # Place JSON size restriction before parsing + self._validate.queue_metadata_length(req.content_length) + # Deserialize Topic metadata + metadata = None + if req.content_length: + document = wsgi_utils.deserialize(req.stream, + req.content_length) + metadata = wsgi_utils.sanitize(document) + self._validate.queue_metadata_putting(metadata) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + try: + created = self._topic_controller.create(topic_name, + metadata=metadata, + project=project_id) + + except storage_errors.FlavorDoesNotExist as ex: + LOG.exception(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + except Exception as ex: + LOG.exception(ex) + description = _(u'Topic could not be created.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 + resp.location = req.path + + @decorators.TransportLog("Topics item") + @acl.enforce("topics:delete") + def on_delete(self, req, resp, project_id, topic_name): + LOG.debug(u'Topic item DELETE - topic: %(topic)s, ' + u'project: %(project)s', + {'topic': topic_name, 'project': project_id}) + try: + self._topic_controller.delete(topic_name, project=project_id) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Topic could not be deleted.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 + + @decorators.TransportLog("Topics item") + @acl.enforce("topics:update") + def on_patch(self, req, resp, project_id, topic_name): + """Allows one to update a topic's metadata. + + This method expects the user to submit a JSON object. There is also + strict format checking through the use of + jsonschema. Appropriate errors are returned in each case for + badly formatted input. + + :returns: HTTP | 200,400,409,503 + """ + LOG.debug(u'PATCH topic - name: %s', topic_name) + + try: + # Place JSON size restriction before parsing + self._validate.queue_metadata_length(req.content_length) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex)) + + # NOTE(flwang): See below link to get more details about draft 10, + # tools.ietf.org/html/draft-ietf-appsawg-json-patch-10 + content_types = { + 'application/openstack-messaging-v2.0-json-patch': 10, + } + + if req.content_type not in content_types: + headers = {'Accept-Patch': + ', '.join(sorted(content_types.keys()))} + msg = _("Accepted media type for PATCH: %s.") + LOG.debug(msg, headers) + raise wsgi_errors.HTTPUnsupportedMediaType(msg % headers) + + if req.content_length: + try: + changes = utils.read_json(req.stream, req.content_length) + changes = wsgi_utils.sanitize(changes, doctype=list) + except utils.MalformedJSON as ex: + LOG.debug(ex) + description = _(u'Request body could not be parsed.') + raise wsgi_errors.HTTPBadRequestBody(description) + + except utils.OverflowedJSONInteger as ex: + LOG.debug(ex) + description = _(u'JSON contains integer that is too large.') + raise wsgi_errors.HTTPBadRequestBody(description) + + except Exception as ex: + # Error while reading from the network/server + LOG.exception(ex) + description = _(u'Request body could not be read.') + raise wsgi_errors.HTTPServiceUnavailable(description) + else: + msg = _("PATCH body could not be empty for update.") + LOG.debug(msg) + raise wsgi_errors.HTTPBadRequestBody(msg) + + try: + changes = self._validate.queue_patching(req, changes) + + # NOTE(Eva-i): using 'get_metadata' instead of 'get', so + # QueueDoesNotExist error will be thrown in case of non-existent + # queue. + metadata = self._topic_controller.get_metadata(topic_name, + project=project_id) + reserved_metadata = _get_reserved_metadata(self._validate) + for change in changes: + change_method_name = '_do_%s' % change['op'] + change_method = getattr(self, change_method_name) + change_method(req, metadata, reserved_metadata, change) + + self._validate.queue_metadata_putting(metadata) + + self._topic_controller.set_metadata(topic_name, + metadata, + project_id) + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPNotFound(six.text_type(ex)) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex)) + except wsgi_errors.HTTPConflict as ex: + raise ex + except Exception as ex: + LOG.exception(ex) + description = _(u'Topic could not be updated.') + raise wsgi_errors.HTTPServiceUnavailable(description) + for meta, value in _get_reserved_metadata(self._validate).items(): + if not metadata.get(meta): + metadata[meta] = value + resp.body = utils.to_json(metadata) + + def _do_replace(self, req, metadata, reserved_metadata, change): + path = change['path'] + path_child = path[1] + value = change['value'] + if path_child in metadata or path_child in reserved_metadata: + metadata[path_child] = value + else: + msg = _("Can't replace non-existent object %s.") + raise wsgi_errors.HTTPConflict(msg % path_child) + + def _do_add(self, req, metadata, reserved_metadata, change): + path = change['path'] + path_child = path[1] + value = change['value'] + metadata[path_child] = value + + def _do_remove(self, req, metadata, reserved_metadata, change): + path = change['path'] + path_child = path[1] + if path_child in metadata: + metadata.pop(path_child) + elif path_child not in reserved_metadata: + msg = _("Can't remove non-existent object %s.") + raise wsgi_errors.HTTPConflict(msg % path_child) + + +class CollectionResource(object): + + __slots__ = ('_topic_controller', '_validate', '_reserved_metadata') + + def __init__(self, validate, topic_controller): + self._topic_controller = topic_controller + self._validate = validate + + def _topic_list(self, project_id, path, kfilter, **kwargs): + try: + self._validate.queue_listing(**kwargs) + results = self._topic_controller.list(project=project_id, + kfilter=kfilter, **kwargs) + + # Buffer list of topics + topics = list(next(results)) + + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Topics could not be listed.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Got some. Prepare the response. + kwargs['marker'] = next(results) or kwargs.get('marker', '') + reserved_metadata = _get_reserved_metadata(self._validate).items() + for each_topic in topics: + each_topic['href'] = path + '/' + each_topic['name'] + if kwargs.get('detailed'): + for meta, value in reserved_metadata: + if not each_topic.get('metadata', {}).get(meta): + each_topic['metadata'][meta] = value + + return topics, kwargs['marker'] + + def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}): + kwargs = {} + + # NOTE(kgriffs): This syntax ensures that + # we don't clobber default values with None. + req.get_param('marker', store=kwargs) + req.get_param_as_int('limit', store=kwargs) + req.get_param_as_bool('detailed', store=kwargs) + req.get_param('name', store=kwargs) + + topics, marker = self._topic_list(project_id, + req.path, kfilter, **kwargs) + + links = [] + kwargs['marker'] = marker + if topics: + links = [ + { + 'rel': 'next', + 'href': req.path + falcon.to_query_str(kwargs) + } + ] + + response_body = { + 'topics': topics, + 'links': links + } + + resp.body = utils.to_json(response_body) + # status defaults to 200 + + @decorators.TransportLog("Topics collection") + @acl.enforce("topics:get_all") + def on_get(self, req, resp, project_id): + field = ('marker', 'limit', 'detailed', 'name') + kfilter = copy.deepcopy(req.params) + + for key in req.params.keys(): + if key in field: + kfilter.pop(key) + + kfilter = kfilter if len(kfilter) > 0 else {} + for key in kfilter.keys(): + # Since we get the filter value from URL, so need to + # turn the string to integer if using integer filter value. + try: + kfilter[key] = int(kfilter[key]) + except ValueError: + continue + self._on_get_with_kfilter(req, resp, project_id, kfilter) + # status defaults to 200 diff --git a/zaqar/transport/wsgi/v2_0/topic_purge.py b/zaqar/transport/wsgi/v2_0/topic_purge.py new file mode 100644 index 000000000..3869616bb --- /dev/null +++ b/zaqar/transport/wsgi/v2_0/topic_purge.py @@ -0,0 +1,82 @@ +# Copyright 2019 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import falcon + +from oslo_log import log as logging +import six + +from zaqar.common import decorators +from zaqar.i18n import _ +from zaqar.transport import acl +from zaqar.transport import validation +from zaqar.transport.wsgi import errors as wsgi_errors +from zaqar.transport.wsgi import utils as wsgi_utils + +LOG = logging.getLogger(__name__) + + +class Resource(object): + + __slots__ = ('_driver', '_conf', + '_message_ctrl', '_subscription_ctrl', '_validate') + + def __init__(self, driver): + self._driver = driver + self._conf = driver._conf + self._message_ctrl = driver._storage.message_controller + self._subscription_ctrl = driver._storage.subscription_controller + self._validate = driver._validate + + @decorators.TransportLog("Topics item") + @acl.enforce("topics:purge") + def on_post(self, req, resp, project_id, topic_name): + try: + if req.content_length: + document = wsgi_utils.deserialize(req.stream, + req.content_length) + self._validate.queue_purging(document) + else: + document = {'resource_types': ['messages', 'subscriptions']} + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + try: + if "messages" in document['resource_types']: + pop_limit = 100 + LOG.debug("Purge all messages under topic %s", topic_name) + messages = self._message_ctrl.pop(topic_name, pop_limit, + project=project_id) + while messages: + messages = self._message_ctrl.pop(topic_name, pop_limit, + project=project_id) + + if "subscriptions" in document['resource_types']: + LOG.debug("Purge all subscriptions under topic %s", topic_name) + results = self._subscription_ctrl.list(topic_name, + project=project_id) + subscriptions = list(next(results)) + for sub in subscriptions: + self._subscription_ctrl.delete(topic_name, + sub['id'], + project=project_id) + except ValueError as err: + raise wsgi_errors.HTTPBadRequestAPI(str(err)) + except Exception as ex: + LOG.exception(ex) + description = _(u'Topic could not be purged.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 diff --git a/zaqar/transport/wsgi/v2_0/topic_stats.py b/zaqar/transport/wsgi/v2_0/topic_stats.py new file mode 100644 index 000000000..89c6c4f5c --- /dev/null +++ b/zaqar/transport/wsgi/v2_0/topic_stats.py @@ -0,0 +1,78 @@ +# Copyright (c) 2019 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import six + +from zaqar.common import decorators +from zaqar.i18n import _ +from zaqar.storage import errors as storage_errors +from zaqar.transport import acl +from zaqar.transport import utils +from zaqar.transport.wsgi import errors as wsgi_errors + + +LOG = logging.getLogger(__name__) + + +class Resource(object): + + __slots__ = '_topic_ctrl' + + def __init__(self, topic_controller): + self._topic_ctrl = topic_controller + + @decorators.TransportLog("Topics stats item") + @acl.enforce("topics:stats") + def on_get(self, req, resp, project_id, topic_name): + try: + resp_dict = self._topic_ctrl.stats(topic_name, + project=project_id) + + message_stats = resp_dict['messages'] + + if message_stats['total'] != 0: + base_path = req.path[:req.path.rindex('/')] + '/messages/' + + newest = message_stats['newest'] + newest['href'] = base_path + newest['id'] + del newest['id'] + + oldest = message_stats['oldest'] + oldest['href'] = base_path + oldest['id'] + del oldest['id'] + + resp.body = utils.to_json(resp_dict) + # status defaults to 200 + + except (storage_errors.TopicDoesNotExist, + storage_errors.TopicIsEmpty) as ex: + resp_dict = { + 'messages': { + 'claimed': 0, + 'free': 0, + 'total': 0 + } + } + resp.body = utils.to_json(resp_dict) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPNotFound(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Topic stats could not be read.') + raise wsgi_errors.HTTPServiceUnavailable(description)