Merge "Introduce the Topic resource into Zaqar-1"
This commit is contained in:
commit
cf01152b3a
@ -0,0 +1,8 @@
|
||||
---
|
||||
features:
|
||||
- Introduce a new resource called Topic into Zaqar.
|
||||
Topic is a concept from AWS Simple Notification Service (SNS), it will has
|
||||
relevance with subscriptions. User can send message to a topic, and then
|
||||
the subscribers will get the message according to different protocols,
|
||||
like http, email, sms, etc. This feature will help Zaqar to split
|
||||
Messaging Queue Service and Notification Service clearly.
|
@ -61,6 +61,15 @@ zaqar.storage.redis.driver.queue.stages =
|
||||
zaqar.storage.swift.driver.queue.stages =
|
||||
message_queue_handler = zaqar.storage.swift.messages:MessageQueueHandler
|
||||
|
||||
zaqar.storage.mongodb.driver.topic.stages =
|
||||
message_queue_handler = zaqar.storage.mongodb.topic_messages:MessageTopicHandler
|
||||
|
||||
zaqar.storage.redis.driver.topic.stages =
|
||||
message_queue_handler = zaqar.storage.redis.messages:MessageTopicHandler
|
||||
|
||||
zaqar.storage.swift.driver.topic.stages =
|
||||
message_queue_handler = zaqar.storage.swift.messages:MessageTopicHandler
|
||||
|
||||
zaqar.notification.tasks =
|
||||
http = zaqar.notification.tasks.webhook:WebhookTask
|
||||
https = zaqar.notification.tasks.webhook:WebhookTask
|
||||
|
@ -20,6 +20,7 @@ from zaqar.common.policies import messages
|
||||
from zaqar.common.policies import pools
|
||||
from zaqar.common.policies import queues
|
||||
from zaqar.common.policies import subscription
|
||||
from zaqar.common.policies import topics
|
||||
|
||||
|
||||
def list_rules():
|
||||
@ -31,5 +32,6 @@ def list_rules():
|
||||
messages.list_rules(),
|
||||
pools.list_rules(),
|
||||
queues.list_rules(),
|
||||
subscription.list_rules()
|
||||
subscription.list_rules(),
|
||||
topics.list_rules(),
|
||||
)
|
||||
|
101
zaqar/common/policies/topics.py
Normal file
101
zaqar/common/policies/topics.py
Normal file
@ -0,0 +1,101 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_policy import policy
|
||||
|
||||
from zaqar.common.policies import base
|
||||
|
||||
TOPICS = 'topics:%s'
|
||||
|
||||
rules = [
|
||||
policy.DocumentedRuleDefault(
|
||||
name=TOPICS % 'get_all',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='List all topics.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v2/topics',
|
||||
'method': 'GET'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=TOPICS % 'create',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='Create a topic.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v2/topics/{topic_name}',
|
||||
'method': 'PUT'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=TOPICS % 'get',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='Get details about a specific topic.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v2/topics/{topic_name}',
|
||||
'method': 'GET'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=TOPICS % 'delete',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='Delete a topic.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v2/topics/{topic_name}',
|
||||
'method': 'DELETE'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=TOPICS % 'update',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='Update a topic.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v2/topics/{topic_name}',
|
||||
'method': 'PATCH'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=TOPICS % 'stats',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='Get statistics about a specific topic.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v2/topics/{topic_name}/stats',
|
||||
'method': 'GET'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=TOPICS % 'purge',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='Purge resources from a particular topic.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v2/topic/{topic_name}/purge',
|
||||
'method': 'POST'
|
||||
}
|
||||
]
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def list_rules():
|
||||
return rules
|
@ -277,3 +277,44 @@ def inject_context(req, resp, params):
|
||||
project_domain_id=project_domain_id,
|
||||
user_domain_id=user_domain_id)
|
||||
req.env['zaqar.context'] = ctxt
|
||||
|
||||
|
||||
def validate_topic_identification(validate, req, resp, params):
|
||||
"""Hook for validating the topic name and project id in requests.
|
||||
|
||||
The queue name validation is short-circuited if 'topic_name' does
|
||||
not exist in `params`.
|
||||
|
||||
This hook depends on the `get_project` hook, which must be
|
||||
installed upstream.
|
||||
|
||||
|
||||
:param validate: A validator function that will
|
||||
be used to check the topic name against configured
|
||||
limits. functools.partial or a closure must be used to
|
||||
set this first arg, and expose the remaining ones as
|
||||
a Falcon hook interface.
|
||||
:param req: Falcon request object
|
||||
:param resp: Falcon response object
|
||||
:param params: Responder params dict
|
||||
"""
|
||||
|
||||
try:
|
||||
validate(params['topic_name'],
|
||||
params['project_id'])
|
||||
except KeyError:
|
||||
# NOTE(kgriffs): topic not in params, so nothing to do
|
||||
pass
|
||||
except validation.ValidationFailed:
|
||||
project = params['project_id']
|
||||
queue = params['topic_name']
|
||||
if six.PY2:
|
||||
queue = queue.decode('utf-8', 'replace')
|
||||
|
||||
LOG.debug(u'Invalid topic name "%(topic)s" submitted for '
|
||||
u'project: %(project)s',
|
||||
{'topic': queue, 'project': project})
|
||||
|
||||
raise falcon.HTTPBadRequest(_(u'Invalid topic identification'),
|
||||
_(u'The format of the submitted topic '
|
||||
u'name or project id is not valid.'))
|
||||
|
@ -44,12 +44,20 @@ subscription_pipeline = cfg.ListOpt(
|
||||
'controller methods.'))
|
||||
|
||||
|
||||
topic_pipeline = cfg.ListOpt(
|
||||
'topic_pipeline', default=[],
|
||||
help=_('Pipeline to use for processing topic operations. This '
|
||||
'pipeline will be consumed before calling the storage driver\'s '
|
||||
'controller methods.'))
|
||||
|
||||
|
||||
GROUP_NAME = 'storage'
|
||||
ALL_OPTS = [
|
||||
queue_pipeline,
|
||||
message_pipeline,
|
||||
claim_pipeline,
|
||||
subscription_pipeline
|
||||
subscription_pipeline,
|
||||
topic_pipeline
|
||||
]
|
||||
|
||||
|
||||
|
@ -27,10 +27,12 @@ Queue = base.Queue
|
||||
Subscription = base.Subscription
|
||||
PoolsBase = base.PoolsBase
|
||||
FlavorsBase = base.FlavorsBase
|
||||
Topic = base.Topic
|
||||
|
||||
DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
|
||||
DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
|
||||
DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE
|
||||
DEFAULT_SUBSCRIPTIONS_PER_PAGE = base.DEFAULT_SUBSCRIPTIONS_PER_PAGE
|
||||
DEFAULT_TOPICS_PER_PAGE = base.DEFAULT_TOPICS_PER_PAGE
|
||||
|
||||
DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM
|
||||
|
@ -35,6 +35,7 @@ DEFAULT_QUEUES_PER_PAGE = 10
|
||||
DEFAULT_MESSAGES_PER_PAGE = 10
|
||||
DEFAULT_POOLS_PER_PAGE = 10
|
||||
DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10
|
||||
DEFAULT_TOPICS_PER_PAGE = 10
|
||||
|
||||
DEFAULT_MESSAGES_PER_CLAIM = 10
|
||||
|
||||
@ -242,6 +243,11 @@ class DataDriverBase(DriverBase):
|
||||
"""Returns the driver's subscription controller."""
|
||||
raise NotImplementedError
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def topic_controller(self):
|
||||
"""Returns the driver's topic controller."""
|
||||
return self.control_driver.topic_controller
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ControlDriverBase(DriverBase):
|
||||
@ -281,6 +287,11 @@ class ControlDriverBase(DriverBase):
|
||||
"""Returns the driver's queue controller."""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractproperty
|
||||
def topic_controller(self):
|
||||
"""Returns the driver's topic controller."""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def close(self):
|
||||
"""Close connections to the backend."""
|
||||
@ -1094,3 +1105,113 @@ class FlavorsBase(ControllerBase):
|
||||
"""Deletes all flavors from storage."""
|
||||
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Topic(ControllerBase):
|
||||
"""This class is responsible for managing topics.
|
||||
|
||||
Topic operations include CRUD, etc.
|
||||
|
||||
Storage driver implementations of this class should
|
||||
be capable of handling high workloads and huge
|
||||
numbers of topics.
|
||||
"""
|
||||
|
||||
def list(self, project=None, kfilter={}, marker=None,
|
||||
limit=DEFAULT_TOPICS_PER_PAGE, detailed=False, name=None):
|
||||
"""Base method for listing topics.
|
||||
|
||||
:param project: Project id
|
||||
:param kfilter: The key-value of metadata which user want to filter
|
||||
:param marker: The last topic name
|
||||
:param limit: (Default 10) Max number of topics to return
|
||||
:param detailed: Whether metadata is included
|
||||
:param name: The topic name which user want to filter
|
||||
|
||||
:returns: An iterator giving a sequence of topics
|
||||
and the marker of the next page.
|
||||
"""
|
||||
return self._list(project, kfilter, marker, limit, detailed, name)
|
||||
|
||||
_list = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def get(self, name, project=None):
|
||||
"""Base method for topic metadata retrieval.
|
||||
|
||||
:param name: The topic name
|
||||
:param project: Project id
|
||||
|
||||
:returns: Dictionary containing topic metadata
|
||||
:raises DoesNotExist: if topic metadata does not exist
|
||||
"""
|
||||
return self._get(name, project)
|
||||
|
||||
_get = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def get_metadata(self, name, project=None):
|
||||
"""Base method for topic metadata retrieval.
|
||||
|
||||
:param name: The topic name
|
||||
:param project: Project id
|
||||
|
||||
:returns: Dictionary containing topic metadata
|
||||
:raises DoesNotExist: if topic metadata does not exist
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
"""Base method for updating a topic metadata.
|
||||
|
||||
:param name: The topic name
|
||||
:param metadata: Topic metadata as a dict
|
||||
:param project: Project id
|
||||
:raises DoesNotExist: if topic metadata can not be updated
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def create(self, name, metadata=None, project=None):
|
||||
"""Base method for topic creation.
|
||||
|
||||
:param name: The topic name
|
||||
:param project: Project id
|
||||
:returns: True if a topic was created and False
|
||||
if it was updated.
|
||||
"""
|
||||
return self._create(name, metadata, project)
|
||||
|
||||
_create = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def exists(self, name, project=None):
|
||||
"""Base method for testing topic existence.
|
||||
|
||||
:param name: The topic name
|
||||
:param project: Project id
|
||||
:returns: True if a topic exists and False
|
||||
if it does not.
|
||||
"""
|
||||
return self._exists(name, project)
|
||||
|
||||
_exists = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def delete(self, name, project=None):
|
||||
"""Base method for deleting a topic.
|
||||
|
||||
:param name: The topic name
|
||||
:param project: Project id
|
||||
"""
|
||||
return self._delete(name, project)
|
||||
|
||||
_delete = abc.abstractmethod(lambda x: None)
|
||||
|
||||
def stats(self, name, project=None):
|
||||
"""Base method for topic stats.
|
||||
|
||||
:param name: The topic name
|
||||
:param project: Project id
|
||||
:returns: Dictionary with the
|
||||
queue stats
|
||||
"""
|
||||
return self._stats(name, project)
|
||||
|
||||
_stats = abc.abstractmethod(lambda x: None)
|
||||
|
@ -225,3 +225,19 @@ class SubscriptionAlreadyExists(Conflict):
|
||||
|
||||
msg_format = (u'Such subscription already exists. Subscriptions '
|
||||
u'are unique by project + queue + subscriber URI.')
|
||||
|
||||
|
||||
class TopicDoesNotExist(DoesNotExist):
|
||||
|
||||
msg_format = u'Topic {name} does not exist for project {project}'
|
||||
|
||||
def __init__(self, name, project):
|
||||
super(TopicDoesNotExist, self).__init__(name=name, project=project)
|
||||
|
||||
|
||||
class TopicIsEmpty(ExceptionBase):
|
||||
|
||||
msg_format = u'Topic {name} in project {project} is empty'
|
||||
|
||||
def __init__(self, name, project):
|
||||
super(TopicIsEmpty, self).__init__(name=name, project=project)
|
||||
|
@ -29,6 +29,7 @@ from zaqar.storage.mongodb import messages
|
||||
from zaqar.storage.mongodb import pools
|
||||
from zaqar.storage.mongodb import queues
|
||||
from zaqar.storage.mongodb import subscriptions
|
||||
from zaqar.storage.mongodb import topics
|
||||
|
||||
|
||||
CatalogueController = catalogue.CatalogueController
|
||||
@ -39,3 +40,4 @@ FIFOMessageController = messages.FIFOMessageController
|
||||
QueueController = queues.QueueController
|
||||
PoolsController = pools.PoolsController
|
||||
SubscriptionController = subscriptions.SubscriptionController
|
||||
TopicController = topics.TopicController
|
||||
|
@ -271,6 +271,17 @@ class ControlDriver(storage.ControlDriverBase):
|
||||
name = self.mongodb_conf.database + '_queues'
|
||||
return self.connection[name]
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def topics_database(self):
|
||||
"""Database dedicated to the "topics" collection.
|
||||
|
||||
The topics collection is separated out into its own database
|
||||
to avoid writer lock contention with the messages collections.
|
||||
"""
|
||||
|
||||
name = self.mongodb_conf.database + '_topics'
|
||||
return self.connection[name]
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def queue_controller(self):
|
||||
controller = controllers.QueueController(self)
|
||||
@ -308,3 +319,13 @@ class ControlDriver(storage.ControlDriverBase):
|
||||
return profiler.trace_cls("mongodb_flavors_controller")(controller)
|
||||
else:
|
||||
return controller
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def topic_controller(self):
|
||||
controller = controllers.TopicController(self)
|
||||
if (self.conf.profiler.enabled and
|
||||
(self.conf.profiler.trace_message_store or
|
||||
self.conf.profiler.trace_management_store)):
|
||||
return profiler.trace_cls("mongodb_topics_controller")(controller)
|
||||
else:
|
||||
return controller
|
||||
|
976
zaqar/storage/mongodb/topic_messages.py
Normal file
976
zaqar/storage/mongodb/topic_messages.py
Normal file
@ -0,0 +1,976 @@
|
||||
# Copyright (c) 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Implements MongoDB the storage controller for messages.
|
||||
|
||||
Field Mappings:
|
||||
In order to reduce the disk / memory space used,
|
||||
field names will be, most of the time, the first
|
||||
letter of their long name.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from bson import errors as bsonerror
|
||||
from bson import objectid
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
import pymongo.errors
|
||||
import pymongo.read_preferences
|
||||
|
||||
from zaqar.i18n import _
|
||||
from zaqar import storage
|
||||
from zaqar.storage import errors
|
||||
from zaqar.storage.mongodb import utils
|
||||
from zaqar.storage import utils as s_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# NOTE(kgriffs): This value, in seconds, should be at least less than the
|
||||
# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
|
||||
# some fudge room.
|
||||
MAX_RETRY_POST_DURATION = 45
|
||||
|
||||
# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang
|
||||
# for more than 5 seconds, without a single one being able to succeed in
|
||||
# posting some messages and incrementing the counter, thus allowing the other
|
||||
# producers to succeed in turn.
|
||||
COUNTER_STALL_WINDOW = 5
|
||||
|
||||
# For hinting
|
||||
ID_INDEX_FIELDS = [('_id', 1)]
|
||||
|
||||
# For removing expired messages
|
||||
TTL_INDEX_FIELDS = [
|
||||
('e', 1),
|
||||
]
|
||||
|
||||
# to unify use of project/topic across mongodb
|
||||
# storage impls.
|
||||
PROJ_TOPIC = utils.PROJ_TOPIC_KEY
|
||||
|
||||
# NOTE(kgriffs): This index is for listing messages, usually
|
||||
# filtering out claimed ones.
|
||||
ACTIVE_INDEX_FIELDS = [
|
||||
(PROJ_TOPIC, 1), # Project will be unique, so put first
|
||||
('k', 1), # Used for sorting and paging, must come before range queries
|
||||
]
|
||||
|
||||
# For counting
|
||||
COUNTING_INDEX_FIELDS = [
|
||||
(PROJ_TOPIC, 1), # Project will be unique, so put first
|
||||
]
|
||||
|
||||
# This index is meant to be used as a shard-key and to ensure
|
||||
# uniqueness for markers.
|
||||
#
|
||||
# As for other compound indexes, order matters. The marker `k`
|
||||
# gives enough cardinality to ensure chunks are evenly distributed,
|
||||
# whereas the `p_q` field helps keeping chunks from the same project
|
||||
# and queue together.
|
||||
#
|
||||
# In a sharded environment, uniqueness of this index is still guaranteed
|
||||
# because it's used as a shard key.
|
||||
MARKER_INDEX_FIELDS = [
|
||||
('k', 1),
|
||||
(PROJ_TOPIC, 1),
|
||||
]
|
||||
|
||||
TRANSACTION_INDEX_FIELDS = [
|
||||
('tx', 1),
|
||||
]
|
||||
|
||||
|
||||
class MessageController(storage.Message):
|
||||
"""Implements message resource operations using MongoDB.
|
||||
|
||||
Messages are scoped by project + topic.
|
||||
|
||||
::
|
||||
|
||||
Messages:
|
||||
Name Field
|
||||
-------------------------
|
||||
scope -> p_t
|
||||
ttl -> t
|
||||
expires -> e
|
||||
marker -> k
|
||||
body -> b
|
||||
client uuid -> u
|
||||
transaction -> tx
|
||||
delay -> d
|
||||
checksum -> cs
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(MessageController, self).__init__(*args, **kwargs)
|
||||
|
||||
# Cache for convenience and performance
|
||||
self._num_partitions = self.driver.mongodb_conf.partitions
|
||||
self._topic_ctrl = self.driver.topic_controller
|
||||
self._retry_range = range(self.driver.mongodb_conf.max_attempts)
|
||||
|
||||
# Create a list of 'messages' collections, one for each database
|
||||
# partition, ordered by partition number.
|
||||
#
|
||||
# NOTE(kgriffs): Order matters, since it is used to lookup the
|
||||
# collection by partition number. For example, self._collections[2]
|
||||
# would provide access to zaqar_p2.messages (partition numbers are
|
||||
# zero-based).
|
||||
self._collections = [db.messages
|
||||
for db in self.driver.message_databases]
|
||||
|
||||
# Ensure indexes are initialized before any queries are performed
|
||||
for collection in self._collections:
|
||||
self._ensure_indexes(collection)
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
def _ensure_indexes(self, collection):
|
||||
"""Ensures that all indexes are created."""
|
||||
|
||||
collection.ensure_index(TTL_INDEX_FIELDS,
|
||||
name='ttl',
|
||||
expireAfterSeconds=0,
|
||||
background=True)
|
||||
|
||||
collection.ensure_index(ACTIVE_INDEX_FIELDS,
|
||||
name='active',
|
||||
background=True)
|
||||
|
||||
collection.ensure_index(COUNTING_INDEX_FIELDS,
|
||||
name='counting',
|
||||
background=True)
|
||||
|
||||
collection.ensure_index(MARKER_INDEX_FIELDS,
|
||||
name='queue_marker',
|
||||
background=True)
|
||||
|
||||
collection.ensure_index(TRANSACTION_INDEX_FIELDS,
|
||||
name='transaction',
|
||||
background=True)
|
||||
|
||||
def _collection(self, topic_name, project=None):
|
||||
"""Get a partitioned collection instance."""
|
||||
return self._collections[utils.get_partition(self._num_partitions,
|
||||
topic_name, project)]
|
||||
|
||||
def _backoff_sleep(self, attempt):
|
||||
"""Sleep between retries using a jitter algorithm.
|
||||
|
||||
Mitigates thrashing between multiple parallel requests, and
|
||||
creates backpressure on clients to slow down the rate
|
||||
at which they submit requests.
|
||||
|
||||
:param attempt: current attempt number, zero-based
|
||||
"""
|
||||
conf = self.driver.mongodb_conf
|
||||
seconds = utils.calculate_backoff(attempt, conf.max_attempts,
|
||||
conf.max_retry_sleep,
|
||||
conf.max_retry_jitter)
|
||||
|
||||
time.sleep(seconds)
|
||||
|
||||
def _purge_topic(self, topic_name, project=None):
|
||||
"""Removes all messages from the queue.
|
||||
|
||||
Warning: Only use this when deleting the queue; otherwise
|
||||
you can cause a side-effect of reseting the marker counter
|
||||
which can cause clients to miss tons of messages.
|
||||
|
||||
If the queue does not exist, this method fails silently.
|
||||
|
||||
:param topic_name: name of the queue to purge
|
||||
:param project: ID of the project to which the queue belongs
|
||||
"""
|
||||
scope = utils.scope_queue_name(topic_name, project)
|
||||
collection = self._collection(topic_name, project)
|
||||
collection.delete_many({PROJ_TOPIC: scope})
|
||||
|
||||
def _list(self, topic_name, project=None, marker=None,
|
||||
echo=False, client_uuid=None, projection=None,
|
||||
include_claimed=False, include_delayed=False,
|
||||
sort=1, limit=None):
|
||||
"""Message document listing helper.
|
||||
|
||||
:param topic_name: Name of the topic to list
|
||||
:param project: (Default None) Project `topic_name` belongs to. If
|
||||
not specified, queries the "global" namespace/project.
|
||||
:param marker: (Default None) Message marker from which to start
|
||||
iterating. If not specified, starts with the first message
|
||||
available in the topic.
|
||||
:param echo: (Default False) Whether to return messages that match
|
||||
client_uuid
|
||||
:param client_uuid: (Default None) UUID for the client that
|
||||
originated this request
|
||||
:param projection: (Default None) a list of field names that should be
|
||||
returned in the result set or a dict specifying the fields to
|
||||
include or exclude
|
||||
:param include_claimed: (Default False) Whether to include
|
||||
claimed messages, not just active ones
|
||||
:param include_delayed: (Default False) Whether to include
|
||||
delayed messages, not just active ones
|
||||
:param sort: (Default 1) Sort order for the listing. Pass 1 for
|
||||
ascending (oldest message first), or -1 for descending (newest
|
||||
message first).
|
||||
:param limit: (Default None) The maximum number of messages
|
||||
to list. The results may include fewer messages than the
|
||||
requested `limit` if not enough are available. If limit is
|
||||
not specified
|
||||
|
||||
:returns: Generator yielding up to `limit` messages.
|
||||
"""
|
||||
|
||||
if sort not in (1, -1):
|
||||
raise ValueError(u'sort must be either 1 (ascending) '
|
||||
u'or -1 (descending)')
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
query = {
|
||||
# Messages must belong to this topic and project.
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
|
||||
# NOTE(kgriffs): Messages must be finalized (i.e., must not
|
||||
# be part of an unfinalized transaction).
|
||||
#
|
||||
# See also the note wrt 'tx' within the definition
|
||||
# of ACTIVE_INDEX_FIELDS.
|
||||
'tx': None,
|
||||
}
|
||||
|
||||
if not echo:
|
||||
query['u'] = {'$ne': client_uuid}
|
||||
|
||||
if marker is not None:
|
||||
query['k'] = {'$gt': marker}
|
||||
|
||||
collection = self._collection(topic_name, project)
|
||||
|
||||
if not include_delayed:
|
||||
# NOTE(cdyangzhenyu): Only include messages that are not
|
||||
# part of any delay, or are part of an expired delay. if
|
||||
# the message has no attribute 'd', it will also be obtained.
|
||||
# This is for compatibility with old data.
|
||||
query['$or'] = [{'d': {'$lte': now}},
|
||||
{'d': {'$exists': False}}]
|
||||
|
||||
# Construct the request
|
||||
cursor = collection.find(query,
|
||||
projection=projection,
|
||||
sort=[('k', sort)])
|
||||
|
||||
if limit is not None:
|
||||
cursor.limit(limit)
|
||||
|
||||
# NOTE(flaper87): Suggest the index to use for this query to
|
||||
# ensure the most performant one is chosen.
|
||||
return cursor.hint(ACTIVE_INDEX_FIELDS)
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# "Friends" interface
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
def _count(self, topic_name, project=None, include_claimed=False):
|
||||
"""Return total number of messages in a topic.
|
||||
|
||||
This method is designed to very quickly count the number
|
||||
of messages in a given topic. Expired messages are not
|
||||
counted, of course. If the queue does not exist, the
|
||||
count will always be 0.
|
||||
|
||||
Note: Some expired messages may be included in the count if
|
||||
they haven't been GC'd yet. This is done for performance.
|
||||
"""
|
||||
query = {
|
||||
# Messages must belong to this queue and project.
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
|
||||
# NOTE(kgriffs): Messages must be finalized (i.e., must not
|
||||
# be part of an unfinalized transaction).
|
||||
#
|
||||
# See also the note wrt 'tx' within the definition
|
||||
# of ACTIVE_INDEX_FIELDS.
|
||||
'tx': None,
|
||||
}
|
||||
|
||||
collection = self._collection(topic_name, project)
|
||||
return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS)
|
||||
|
||||
def _active(self, topic_name, marker=None, echo=False,
|
||||
client_uuid=None, projection=None, project=None,
|
||||
limit=None, include_delayed=False):
|
||||
|
||||
return self._list(topic_name, project=project, marker=marker,
|
||||
echo=echo, client_uuid=client_uuid,
|
||||
projection=projection, include_claimed=False,
|
||||
include_delayed=include_delayed, limit=limit)
|
||||
|
||||
def _inc_counter(self, topic_name, project=None, amount=1, window=None):
|
||||
"""Increments the message counter and returns the new value.
|
||||
|
||||
:param topic_name: Name of the topic to which the counter is scoped
|
||||
:param project: Queue's project name
|
||||
:param amount: (Default 1) Amount by which to increment the counter
|
||||
:param window: (Default None) A time window, in seconds, that
|
||||
must have elapsed since the counter was last updated, in
|
||||
order to increment the counter.
|
||||
|
||||
:returns: Updated message counter value, or None if window
|
||||
was specified, and the counter has already been updated
|
||||
within the specified time period.
|
||||
|
||||
:raises QueueDoesNotExist: if not found
|
||||
"""
|
||||
|
||||
# NOTE(flaper87): If this `if` is True, it means we're
|
||||
# using a mongodb in the control plane. To avoid breaking
|
||||
# environments doing so already, we'll keep using the counter
|
||||
# in the mongodb topic_controller rather than the one in the
|
||||
# message_controller. This should go away, eventually
|
||||
if hasattr(self._topic_ctrl, '_inc_counter'):
|
||||
return self._topic_ctrl._inc_counter(topic_name, project,
|
||||
amount, window)
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
|
||||
query = _get_scoped_query(topic_name, project)
|
||||
if window is not None:
|
||||
threshold = now - window
|
||||
query['c.t'] = {'$lt': threshold}
|
||||
|
||||
while True:
|
||||
try:
|
||||
collection = self._collection(topic_name, project).stats
|
||||
doc = collection.find_one_and_update(
|
||||
query, update,
|
||||
return_document=pymongo.ReturnDocument.AFTER,
|
||||
projection={'c.v': 1, '_id': 0})
|
||||
|
||||
break
|
||||
except pymongo.errors.AutoReconnect as ex:
|
||||
LOG.exception(ex)
|
||||
|
||||
if doc is None:
|
||||
if window is None:
|
||||
# NOTE(kgriffs): Since we did not filter by a time window,
|
||||
# the topic should have been found and updated. Perhaps
|
||||
# the topic has been deleted?
|
||||
message = (u'Failed to increment the message '
|
||||
u'counter for topic %(name)s and '
|
||||
u'project %(project)s')
|
||||
message %= dict(name=topic_name, project=project)
|
||||
|
||||
LOG.warning(message)
|
||||
|
||||
raise errors.TopicDoesNotExist(topic_name, project)
|
||||
|
||||
# NOTE(kgriffs): Assume the queue existed, but the counter
|
||||
# was recently updated, causing the range query on 'c.t' to
|
||||
# exclude the record.
|
||||
return None
|
||||
|
||||
return doc['c']['v']
|
||||
|
||||
def _get_counter(self, topic_name, project=None):
|
||||
"""Retrieves the current message counter value for a given topic.
|
||||
|
||||
This helper is used to generate monotonic pagination
|
||||
markers that are saved as part of the message
|
||||
document.
|
||||
|
||||
Note 1: Markers are scoped per-queue and so are *not*
|
||||
globally unique or globally ordered.
|
||||
|
||||
Note 2: If two or more requests to this method are made
|
||||
in parallel, this method will return the same counter
|
||||
value. This is done intentionally so that the caller
|
||||
can detect a parallel message post, allowing it to
|
||||
mitigate race conditions between producer and
|
||||
observer clients.
|
||||
|
||||
:param topic_name: Name of the topic to which the counter is scoped
|
||||
:param project: Topic's project
|
||||
:returns: current message counter as an integer
|
||||
"""
|
||||
|
||||
# NOTE(flaper87): If this `if` is True, it means we're
|
||||
# using a mongodb in the control plane. To avoid breaking
|
||||
# environments doing so already, we'll keep using the counter
|
||||
# in the mongodb queue_controller rather than the one in the
|
||||
# message_controller. This should go away, eventually
|
||||
if hasattr(self._topic_ctrl, '_get_counter'):
|
||||
return self._topic_ctrl._get_counter(topic_name, project)
|
||||
|
||||
update = {'$inc': {'c.v': 0, 'c.t': 0}}
|
||||
query = _get_scoped_query(topic_name, project)
|
||||
|
||||
try:
|
||||
collection = self._collection(topic_name, project).stats
|
||||
doc = collection.find_one_and_update(
|
||||
query, update, upsert=True,
|
||||
return_document=pymongo.ReturnDocument.AFTER,
|
||||
projection={'c.v': 1, '_id': 0})
|
||||
|
||||
return doc['c']['v']
|
||||
except pymongo.errors.AutoReconnect as ex:
|
||||
LOG.exception(ex)
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Public interface
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
def list(self, topic_name, project=None, marker=None,
|
||||
limit=storage.DEFAULT_MESSAGES_PER_PAGE,
|
||||
echo=False, client_uuid=None, include_claimed=False,
|
||||
include_delayed=False):
|
||||
|
||||
if marker is not None:
|
||||
try:
|
||||
marker = int(marker)
|
||||
except ValueError:
|
||||
yield iter([])
|
||||
|
||||
messages = self._list(topic_name, project=project, marker=marker,
|
||||
client_uuid=client_uuid, echo=echo,
|
||||
include_claimed=include_claimed,
|
||||
include_delayed=include_delayed, limit=limit)
|
||||
|
||||
marker_id = {}
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
# NOTE (kgriffs) @utils.raises_conn_error not needed on this
|
||||
# function, since utils.HookedCursor already has it.
|
||||
def denormalizer(msg):
|
||||
marker_id['next'] = msg['k']
|
||||
|
||||
return _basic_message(msg, now)
|
||||
|
||||
yield utils.HookedCursor(messages, denormalizer)
|
||||
yield str(marker_id['next'])
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def first(self, topic_name, project=None, sort=1):
|
||||
cursor = self._list(topic_name, project=project,
|
||||
include_claimed=True, sort=sort,
|
||||
limit=1)
|
||||
try:
|
||||
message = next(cursor)
|
||||
except StopIteration:
|
||||
raise errors.TopicIsEmpty(topic_name, project)
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
return _basic_message(message, now)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def get(self, topic_name, message_id, project=None):
|
||||
mid = utils.to_oid(message_id)
|
||||
if mid is None:
|
||||
raise errors.MessageDoesNotExist(message_id, topic_name,
|
||||
project)
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
query = {
|
||||
'_id': mid,
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
}
|
||||
|
||||
collection = self._collection(topic_name, project)
|
||||
message = list(collection.find(query).limit(1).hint(ID_INDEX_FIELDS))
|
||||
|
||||
if not message:
|
||||
raise errors.MessageDoesNotExist(message_id, topic_name,
|
||||
project)
|
||||
|
||||
return _basic_message(message[0], now)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def bulk_get(self, topic_name, message_ids, project=None):
|
||||
message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
|
||||
if not message_ids:
|
||||
return iter([])
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
# Base query, always check expire time
|
||||
query = {
|
||||
'_id': {'$in': message_ids},
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
}
|
||||
|
||||
collection = self._collection(topic_name, project)
|
||||
|
||||
# NOTE(flaper87): Should this query
|
||||
# be sorted?
|
||||
messages = collection.find(query).hint(ID_INDEX_FIELDS)
|
||||
|
||||
def denormalizer(msg):
|
||||
return _basic_message(msg, now)
|
||||
|
||||
return utils.HookedCursor(messages, denormalizer)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def post(self, topic_name, messages, client_uuid, project=None):
|
||||
# NOTE(flaper87): This method should be safe to retry on
|
||||
# autoreconnect, since we've a 2-step insert for messages.
|
||||
# The worst-case scenario is that we'll increase the counter
|
||||
# several times and we'd end up with some non-active messages.
|
||||
|
||||
if not self._topic_ctrl.exists(topic_name, project):
|
||||
raise errors.TopicDoesNotExist(topic_name, project)
|
||||
|
||||
# NOTE(flaper87): Make sure the counter exists. This method
|
||||
# is an upsert.
|
||||
self._get_counter(topic_name, project)
|
||||
now = timeutils.utcnow_ts()
|
||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||
collection = self._collection(topic_name, project)
|
||||
|
||||
messages = list(messages)
|
||||
msgs_n = len(messages)
|
||||
next_marker = self._inc_counter(topic_name,
|
||||
project,
|
||||
amount=msgs_n) - msgs_n
|
||||
|
||||
prepared_messages = []
|
||||
for index, message in enumerate(messages):
|
||||
msg = {
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
't': message['ttl'],
|
||||
'e': now_dt + datetime.timedelta(seconds=message['ttl']),
|
||||
'u': client_uuid,
|
||||
'd': now + message.get('delay', 0),
|
||||
'b': message['body'] if 'body' in message else {},
|
||||
'k': next_marker + index,
|
||||
'tx': None
|
||||
}
|
||||
if self.driver.conf.enable_checksum:
|
||||
msg['cs'] = s_utils.get_checksum(message.get('body', None))
|
||||
|
||||
prepared_messages.append(msg)
|
||||
|
||||
res = collection.insert_many(prepared_messages,
|
||||
bypass_document_validation=True)
|
||||
|
||||
return [str(id_) for id_ in res.inserted_ids]
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def delete(self, topic_name, message_id, project=None, claim=None):
|
||||
# NOTE(cpp-cabrera): return early - this is an invalid message
|
||||
# id so we won't be able to find it any way
|
||||
mid = utils.to_oid(message_id)
|
||||
if mid is None:
|
||||
return
|
||||
|
||||
collection = self._collection(topic_name, project)
|
||||
|
||||
query = {
|
||||
'_id': mid,
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
}
|
||||
|
||||
cid = utils.to_oid(claim)
|
||||
if cid is None:
|
||||
raise errors.ClaimDoesNotExist(claim, topic_name, project)
|
||||
|
||||
now = timeutils.utcnow_ts()
|
||||
cursor = collection.find(query).hint(ID_INDEX_FIELDS)
|
||||
|
||||
try:
|
||||
message = next(cursor)
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
if claim is None:
|
||||
if _is_claimed(message, now):
|
||||
raise errors.MessageIsClaimed(message_id)
|
||||
|
||||
else:
|
||||
if message['c']['id'] != cid:
|
||||
kwargs = {}
|
||||
# NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
|
||||
# `read_preference` is read only. We'd need to set it when the
|
||||
# client is created.
|
||||
# NOTE(kgriffs): Read from primary in case the message
|
||||
# was just barely claimed, and claim hasn't made it to
|
||||
# the secondary.
|
||||
message = collection.find_one(query, **kwargs)
|
||||
|
||||
if message['c']['id'] != cid:
|
||||
if _is_claimed(message, now):
|
||||
raise errors.MessageNotClaimedBy(message_id, claim)
|
||||
|
||||
raise errors.MessageNotClaimed(message_id)
|
||||
|
||||
collection.delete_one(query)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def bulk_delete(self, topic_name, message_ids, project=None,
|
||||
claim_ids=None):
|
||||
message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
|
||||
if claim_ids:
|
||||
claim_ids = [cid for cid in map(utils.to_oid, claim_ids) if cid]
|
||||
query = {
|
||||
'_id': {'$in': message_ids},
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
}
|
||||
|
||||
collection = self._collection(topic_name, project)
|
||||
if claim_ids:
|
||||
message_claim_ids = []
|
||||
messages = collection.find(query).hint(ID_INDEX_FIELDS)
|
||||
for message in messages:
|
||||
message_claim_ids.append(message['c']['id'])
|
||||
for cid in claim_ids:
|
||||
if cid not in message_claim_ids:
|
||||
raise errors.ClaimDoesNotExist(cid, topic_name, project)
|
||||
|
||||
collection.delete_many(query)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def pop(self, topic_name, limit, project=None):
|
||||
query = {
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
}
|
||||
|
||||
# Only include messages that are not part of
|
||||
# any claim, or are part of an expired claim.
|
||||
now = timeutils.utcnow_ts()
|
||||
query['c.e'] = {'$lte': now}
|
||||
|
||||
collection = self._collection(topic_name, project)
|
||||
projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1}
|
||||
|
||||
messages = (collection.find_one_and_delete(query,
|
||||
projection=projection)
|
||||
for _ in range(limit))
|
||||
|
||||
final_messages = [_basic_message(message, now)
|
||||
for message in messages
|
||||
if message]
|
||||
|
||||
return final_messages
|
||||
|
||||
|
||||
class FIFOMessageController(MessageController):
|
||||
|
||||
def _ensure_indexes(self, collection):
|
||||
"""Ensures that all indexes are created."""
|
||||
|
||||
collection.ensure_index(TTL_INDEX_FIELDS,
|
||||
name='ttl',
|
||||
expireAfterSeconds=0,
|
||||
background=True)
|
||||
|
||||
collection.ensure_index(ACTIVE_INDEX_FIELDS,
|
||||
name='active',
|
||||
background=True)
|
||||
|
||||
collection.ensure_index(COUNTING_INDEX_FIELDS,
|
||||
name='counting',
|
||||
background=True)
|
||||
|
||||
# NOTE(kgriffs): This index must be unique so that
|
||||
# inserting a message with the same marker to the
|
||||
# same queue will fail; this is used to detect a
|
||||
# race condition which can cause an observer client
|
||||
# to miss a message when there is more than one
|
||||
# producer posting messages to the same queue, in
|
||||
# parallel.
|
||||
collection.ensure_index(MARKER_INDEX_FIELDS,
|
||||
name='queue_marker',
|
||||
unique=True,
|
||||
background=True)
|
||||
|
||||
collection.ensure_index(TRANSACTION_INDEX_FIELDS,
|
||||
name='transaction',
|
||||
background=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def post(self, topic_name, messages, client_uuid, project=None):
|
||||
# NOTE(flaper87): This method should be safe to retry on
|
||||
# autoreconnect, since we've a 2-step insert for messages.
|
||||
# The worst-case scenario is that we'll increase the counter
|
||||
# several times and we'd end up with some non-active messages.
|
||||
|
||||
if not self._topic_ctrl.exists(topic_name, project):
|
||||
raise errors.TopicDoesNotExist(topic_name, project)
|
||||
|
||||
# NOTE(flaper87): Make sure the counter exists. This method
|
||||
# is an upsert.
|
||||
self._get_counter(topic_name, project)
|
||||
now = timeutils.utcnow_ts()
|
||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||
collection = self._collection(topic_name, project)
|
||||
|
||||
# Set the next basis marker for the first attempt.
|
||||
#
|
||||
# Note that we don't increment the counter right away because
|
||||
# if 2 concurrent posts happen and the one with the higher counter
|
||||
# ends before the one with the lower counter, there's a window
|
||||
# where a client paging through the queue may get the messages
|
||||
# with the higher counter and skip the previous ones. This would
|
||||
# make our FIFO guarantee unsound.
|
||||
next_marker = self._get_counter(topic_name, project)
|
||||
|
||||
# Unique transaction ID to facilitate atomic batch inserts
|
||||
transaction = objectid.ObjectId()
|
||||
|
||||
prepared_messages = []
|
||||
for index, message in enumerate(messages):
|
||||
msg = {
|
||||
PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
|
||||
't': message['ttl'],
|
||||
'e': now_dt + datetime.timedelta(seconds=message['ttl']),
|
||||
'u': client_uuid,
|
||||
'd': now + message.get('delay', 0),
|
||||
'b': message['body'] if 'body' in message else {},
|
||||
'k': next_marker + index,
|
||||
'tx': None
|
||||
}
|
||||
if self.driver.conf.enable_checksum:
|
||||
msg['cs'] = s_utils.get_checksum(message.get('body', None))
|
||||
|
||||
prepared_messages.append(msg)
|
||||
|
||||
# NOTE(kgriffs): Don't take the time to do a 2-phase insert
|
||||
# if there is no way for it to partially succeed.
|
||||
if len(prepared_messages) == 1:
|
||||
transaction = None
|
||||
prepared_messages[0]['tx'] = None
|
||||
|
||||
# Use a retry range for sanity, although we expect
|
||||
# to rarely, if ever, reach the maximum number of
|
||||
# retries.
|
||||
#
|
||||
# NOTE(kgriffs): With the default configuration (100 ms
|
||||
# max sleep, 1000 max attempts), the max stall time
|
||||
# before the operation is abandoned is 49.95 seconds.
|
||||
for attempt in self._retry_range:
|
||||
try:
|
||||
res = collection.insert_many(prepared_messages,
|
||||
bypass_document_validation=True)
|
||||
|
||||
# Log a message if we retried, for debugging perf issues
|
||||
if attempt != 0:
|
||||
msgtmpl = _(u'%(attempts)d attempt(s) required to post '
|
||||
u'%(num_messages)d messages to queue '
|
||||
u'"%(topic)s" under project %(project)s')
|
||||
|
||||
LOG.debug(msgtmpl,
|
||||
dict(topic=topic_name,
|
||||
attempts=attempt + 1,
|
||||
num_messages=len(res.inserted_ids),
|
||||
project=project))
|
||||
|
||||
# Update the counter in preparation for the next batch
|
||||
#
|
||||
# NOTE(kgriffs): Due to the unique index on the messages
|
||||
# collection, competing inserts will fail as a whole,
|
||||
# and keep retrying until the counter is incremented
|
||||
# such that the competing marker's will start at a
|
||||
# unique number, 1 past the max of the messages just
|
||||
# inserted above.
|
||||
self._inc_counter(topic_name, project,
|
||||
amount=len(res.inserted_ids))
|
||||
|
||||
# NOTE(kgriffs): Finalize the insert once we can say that
|
||||
# all the messages made it. This makes bulk inserts
|
||||
# atomic, assuming queries filter out any non-finalized
|
||||
# messages.
|
||||
if transaction is not None:
|
||||
collection.update_many({'tx': transaction},
|
||||
{'$set': {'tx': None}},
|
||||
upsert=False)
|
||||
|
||||
return [str(id_) for id_ in res.inserted_ids]
|
||||
|
||||
except (pymongo.errors.DuplicateKeyError,
|
||||
pymongo.errors.BulkWriteError) as ex:
|
||||
# TODO(kgriffs): Record stats of how often retries happen,
|
||||
# and how many attempts, on average, are required to insert
|
||||
# messages.
|
||||
|
||||
# NOTE(kgriffs): This can be used in conjunction with the
|
||||
# log line, above, that is emitted after all messages have
|
||||
# been posted, to gauge how long it is taking for messages
|
||||
# to be posted to a given topic, or overall.
|
||||
#
|
||||
# TODO(kgriffs): Add transaction ID to help match up loglines
|
||||
if attempt == 0:
|
||||
msgtmpl = _(u'First attempt failed while '
|
||||
u'adding messages to topic '
|
||||
u'"%(topic)s" under project %(project)s')
|
||||
|
||||
LOG.debug(msgtmpl, dict(topic=topic_name, project=project))
|
||||
|
||||
# NOTE(kgriffs): Never retry past the point that competing
|
||||
# messages expire and are GC'd, since once they are gone,
|
||||
# the unique index no longer protects us from getting out
|
||||
# of order, which could cause an observer to miss this
|
||||
# message. The code below provides a sanity-check to ensure
|
||||
# this situation can not happen.
|
||||
elapsed = timeutils.utcnow_ts() - now
|
||||
if elapsed > MAX_RETRY_POST_DURATION:
|
||||
msgtmpl = (u'Exceeded maximum retry duration for topic '
|
||||
u'"%(topic)s" under project %(project)s')
|
||||
|
||||
LOG.warning(msgtmpl,
|
||||
dict(topic=topic_name, project=project))
|
||||
break
|
||||
|
||||
# Chill out for a moment to mitigate thrashing/thundering
|
||||
self._backoff_sleep(attempt)
|
||||
|
||||
# NOTE(kgriffs): Perhaps we failed because a worker crashed
|
||||
# after inserting messages, but before incrementing the
|
||||
# counter; that would cause all future requests to stall,
|
||||
# since they would keep getting the same base marker that is
|
||||
# conflicting with existing messages, until the messages that
|
||||
# "won" expire, at which time we would end up reusing markers,
|
||||
# and that could make some messages invisible to an observer
|
||||
# that is querying with a marker that is large than the ones
|
||||
# being reused.
|
||||
#
|
||||
# To mitigate this, we apply a heuristic to determine whether
|
||||
# a counter has stalled. We attempt to increment the counter,
|
||||
# but only if it hasn't been updated for a few seconds, which
|
||||
# should mean that nobody is left to update it!
|
||||
#
|
||||
# Note that we increment one at a time until the logjam is
|
||||
# broken, since we don't know how many messages were posted
|
||||
# by the worker before it crashed.
|
||||
next_marker = self._inc_counter(
|
||||
topic_name, project, window=COUNTER_STALL_WINDOW)
|
||||
|
||||
# Retry the entire batch with a new sequence of markers.
|
||||
#
|
||||
# NOTE(kgriffs): Due to the unique index, and how
|
||||
# MongoDB works with batch requests, we will never
|
||||
# end up with a partially-successful update. The first
|
||||
# document in the batch will fail to insert, and the
|
||||
# remainder of the documents will not be attempted.
|
||||
if next_marker is None:
|
||||
# NOTE(kgriffs): Usually we will end up here, since
|
||||
# it should be rare that a counter becomes stalled.
|
||||
next_marker = self._get_counter(
|
||||
topic_name, project)
|
||||
else:
|
||||
msgtmpl = (u'Detected a stalled message counter '
|
||||
u'for topic "%(topic)s" under '
|
||||
u'project %(project)s.'
|
||||
u'The counter was incremented to %(value)d.')
|
||||
|
||||
LOG.warning(msgtmpl,
|
||||
dict(topic=topic_name,
|
||||
project=project,
|
||||
value=next_marker))
|
||||
|
||||
for index, message in enumerate(prepared_messages):
|
||||
message['k'] = next_marker + index
|
||||
except bsonerror.InvalidDocument as ex:
|
||||
LOG.exception(ex)
|
||||
raise
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
raise
|
||||
|
||||
msgtmpl = (u'Hit maximum number of attempts (%(max)s) for topic '
|
||||
u'"%(topic)s" under project %(project)s')
|
||||
|
||||
LOG.warning(msgtmpl,
|
||||
dict(max=self.driver.mongodb_conf.max_attempts,
|
||||
topic=topic_name,
|
||||
project=project))
|
||||
|
||||
raise errors.MessageConflict(topic_name, project)
|
||||
|
||||
|
||||
def _is_claimed(msg, now):
|
||||
return (msg['c']['id'] is not None and
|
||||
msg['c']['e'] > now)
|
||||
|
||||
|
||||
def _basic_message(msg, now):
|
||||
oid = msg['_id']
|
||||
age = now - utils.oid_ts(oid)
|
||||
res = {
|
||||
'id': str(oid),
|
||||
'age': int(age),
|
||||
'ttl': msg['t'],
|
||||
'body': msg['b']
|
||||
}
|
||||
if msg.get('cs'):
|
||||
res['checksum'] = msg.get('cs')
|
||||
|
||||
return res
|
||||
|
||||
|
||||
class MessageTopicHandler(object):
|
||||
|
||||
def __init__(self, driver, control_driver):
|
||||
self.driver = driver
|
||||
self._cache = self.driver.cache
|
||||
self.topic_controller = self.driver.topic_controller
|
||||
self.message_controller = self.driver.message_controller
|
||||
|
||||
def delete(self, topic_name, project=None):
|
||||
self.message_controller._purge_queue(topic_name, project)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def stats(self, name, project=None):
|
||||
if not self.topic_controller.exists(name, project=project):
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
||||
controller = self.message_controller
|
||||
|
||||
total = controller._count(name, project=project,
|
||||
include_claimed=True)
|
||||
|
||||
message_stats = {
|
||||
'total': total,
|
||||
}
|
||||
|
||||
try:
|
||||
oldest = controller.first(name, project=project, sort=1)
|
||||
newest = controller.first(name, project=project, sort=-1)
|
||||
except errors.QueueIsEmpty:
|
||||
pass
|
||||
else:
|
||||
now = timeutils.utcnow_ts()
|
||||
message_stats['oldest'] = utils.stat_message(oldest, now)
|
||||
message_stats['newest'] = utils.stat_message(newest, now)
|
||||
|
||||
return {'messages': message_stats}
|
||||
|
||||
|
||||
def _get_scoped_query(name, project):
|
||||
return {'p_t': utils.scope_queue_name(name, project)}
|
279
zaqar/storage/mongodb/topics.py
Normal file
279
zaqar/storage/mongodb/topics.py
Normal file
@ -0,0 +1,279 @@
|
||||
# Copyright (c) 2019 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Implements the MongoDB storage controller for topics.
|
||||
|
||||
Field Mappings:
|
||||
In order to reduce the disk / memory space used,
|
||||
field names will be, most of the time, the first
|
||||
letter of their long name.
|
||||
"""
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
from pymongo.collection import ReturnDocument
|
||||
import pymongo.errors
|
||||
|
||||
from zaqar.common import decorators
|
||||
from zaqar.i18n import _
|
||||
from zaqar import storage
|
||||
from zaqar.storage import errors
|
||||
from zaqar.storage.mongodb import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# NOTE(wanghao): Keep this as same as queues'
|
||||
_TOPIC_CACHE_PREFIX = 'topiccontroller:'
|
||||
_TOPIC_CACHE_TTL = 5
|
||||
|
||||
|
||||
def _topic_exists_key(topic, project=None):
|
||||
# NOTE(kgriffs): Use string concatenation for performance,
|
||||
# also put project first since it is guaranteed to be
|
||||
# unique, which should reduce lookup time.
|
||||
return _TOPIC_CACHE_PREFIX + 'exists:' + str(project) + '/' + topic
|
||||
|
||||
|
||||
class TopicController(storage.Topic):
|
||||
"""Implements Topic resource operations using MongoDB.
|
||||
|
||||
Topics are scoped by project, which is prefixed to the
|
||||
topic name.
|
||||
|
||||
::
|
||||
|
||||
Topic:
|
||||
|
||||
Name Field
|
||||
---------------------
|
||||
name -> p_t
|
||||
msg counter -> c
|
||||
metadata -> m
|
||||
|
||||
Message Counter:
|
||||
|
||||
Name Field
|
||||
-------------------
|
||||
value -> v
|
||||
modified ts -> t
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(TopicController, self).__init__(*args, **kwargs)
|
||||
|
||||
self._cache = self.driver.cache
|
||||
self._collection = self.driver.topics_database.topics
|
||||
|
||||
# NOTE(flaper87): This creates a unique index for
|
||||
# project and name. Using project as the prefix
|
||||
# allows for querying by project and project+name.
|
||||
# This is also useful for retrieving the queues list for
|
||||
# a specific project, for example. Order matters!
|
||||
self._collection.ensure_index([('p_t', 1)], unique=True)
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
def _get_counter(self, name, project=None):
|
||||
"""Retrieves the current message counter value for a given topic.
|
||||
|
||||
This helper is used to generate monotonic pagination
|
||||
markers that are saved as part of the message
|
||||
document.
|
||||
|
||||
Note 1: Markers are scoped per-topic and so are *not*
|
||||
globally unique or globally ordered.
|
||||
|
||||
Note 2: If two or more requests to this method are made
|
||||
in parallel, this method will return the same counter
|
||||
value. This is done intentionally so that the caller
|
||||
can detect a parallel message post, allowing it to
|
||||
mitigate race conditions between producer and
|
||||
observer clients.
|
||||
|
||||
:param name: Name of the queue to which the counter is scoped
|
||||
:param project: Topic's project
|
||||
:returns: current message counter as an integer
|
||||
"""
|
||||
|
||||
doc = self._collection.find_one(_get_scoped_query(name, project),
|
||||
projection={'c.v': 1, '_id': 0})
|
||||
|
||||
if doc is None:
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
||||
return doc['c']['v']
|
||||
|
||||
def _inc_counter(self, name, project=None, amount=1, window=None):
|
||||
"""Increments the message counter and returns the new value.
|
||||
|
||||
:param name: Name of the topic to which the counter is scoped
|
||||
:param project: Topic's project name
|
||||
:param amount: (Default 1) Amount by which to increment the counter
|
||||
:param window: (Default None) A time window, in seconds, that
|
||||
must have elapsed since the counter was last updated, in
|
||||
order to increment the counter.
|
||||
|
||||
:returns: Updated message counter value, or None if window
|
||||
was specified, and the counter has already been updated
|
||||
within the specified time period.
|
||||
|
||||
:raises TopicDoesNotExist: if not found
|
||||
"""
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
|
||||
query = _get_scoped_query(name, project)
|
||||
if window is not None:
|
||||
threshold = now - window
|
||||
query['c.t'] = {'$lt': threshold}
|
||||
|
||||
while True:
|
||||
try:
|
||||
doc = self._collection.find_one_and_update(
|
||||
query, update, return_document=ReturnDocument.AFTER,
|
||||
projection={'c.v': 1, '_id': 0})
|
||||
|
||||
break
|
||||
except pymongo.errors.AutoReconnect as ex:
|
||||
LOG.exception(ex)
|
||||
|
||||
if doc is None:
|
||||
if window is None:
|
||||
# NOTE(kgriffs): Since we did not filter by a time window,
|
||||
# the topic should have been found and updated. Perhaps
|
||||
# the topic has been deleted?
|
||||
message = _(u'Failed to increment the message '
|
||||
u'counter for topic %(name)s and '
|
||||
u'project %(project)s')
|
||||
message %= dict(name=name, project=project)
|
||||
|
||||
LOG.warning(message)
|
||||
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
||||
# NOTE(kgriffs): Assume the topic existed, but the counter
|
||||
# was recently updated, causing the range topic on 'c.t' to
|
||||
# exclude the record.
|
||||
return None
|
||||
|
||||
return doc['c']['v']
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Interface
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
def _get(self, name, project=None):
|
||||
try:
|
||||
return self.get_metadata(name, project)
|
||||
except errors.TopicDoesNotExist:
|
||||
return {}
|
||||
|
||||
def _list(self, project=None, kfilter={}, marker=None,
|
||||
limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False,
|
||||
name=None):
|
||||
|
||||
query = utils.scoped_query(marker, project, name, kfilter,
|
||||
key_value='p_t')
|
||||
|
||||
projection = {'p_t': 1, '_id': 0}
|
||||
if detailed:
|
||||
projection['m'] = 1
|
||||
|
||||
cursor = self._collection.find(query, projection=projection)
|
||||
cursor = cursor.limit(limit).sort('p_t')
|
||||
marker_name = {}
|
||||
|
||||
def normalizer(record):
|
||||
topic = {'name': utils.descope_queue_name(record['p_t'])}
|
||||
marker_name['next'] = topic['name']
|
||||
if detailed:
|
||||
topic['metadata'] = record['m']
|
||||
return topic
|
||||
|
||||
yield utils.HookedCursor(cursor, normalizer)
|
||||
yield marker_name and marker_name['next']
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def get_metadata(self, name, project=None):
|
||||
queue = self._collection.find_one(_get_scoped_query(name, project),
|
||||
projection={'m': 1, '_id': 0})
|
||||
if queue is None:
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
||||
return queue.get('m', {})
|
||||
|
||||
@utils.raises_conn_error
|
||||
# @utils.retries_on_autoreconnect
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
# NOTE(flaper87): If the connection fails after it was called
|
||||
# and we retry to insert the topic, we could end up returning
|
||||
# `False` because of the `DuplicatedKeyError` although the
|
||||
# topic was indeed created by this API call.
|
||||
#
|
||||
# TODO(kgriffs): Commented out `retries_on_autoreconnect` for
|
||||
# now due to the above issue, since creating a topic is less
|
||||
# important to make super HA.
|
||||
|
||||
try:
|
||||
# NOTE(kgriffs): Start counting at 1, and assume the first
|
||||
# message ever posted will succeed and set t to a UNIX
|
||||
# "modified at" timestamp.
|
||||
counter = {'v': 1, 't': 0}
|
||||
|
||||
scoped_name = utils.scope_queue_name(name, project)
|
||||
self._collection.insert_one(
|
||||
{'p_t': scoped_name, 'm': metadata or {},
|
||||
'c': counter})
|
||||
|
||||
except pymongo.errors.DuplicateKeyError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
# NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and
|
||||
# someone creates it, we want it to be immediately visible.
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
@decorators.caches(_topic_exists_key, _TOPIC_CACHE_TTL, lambda v: v)
|
||||
def _exists(self, name, project=None):
|
||||
query = _get_scoped_query(name, project)
|
||||
return self._collection.find_one(query) is not None
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
rst = self._collection.update_one(_get_scoped_query(name, project),
|
||||
{'$set': {'m': metadata}})
|
||||
|
||||
if rst.matched_count == 0:
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
@_exists.purges
|
||||
def _delete(self, name, project=None):
|
||||
self._collection.delete_one(_get_scoped_query(name, project))
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
def _stats(self, name, project=None):
|
||||
pass
|
||||
|
||||
|
||||
def _get_scoped_query(name, project):
|
||||
return {'p_t': utils.scope_queue_name(name, project)}
|
@ -38,6 +38,8 @@ EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc)
|
||||
# NOTE(cpp-cabrera): the authoritative form of project/queue keys.
|
||||
PROJ_QUEUE_KEY = 'p_q'
|
||||
|
||||
PROJ_TOPIC_KEY = 'p_t'
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -191,7 +193,8 @@ def parse_scoped_project_queue(scoped_name):
|
||||
return scoped_name.split('/')
|
||||
|
||||
|
||||
def scoped_query(queue, project, name=None, kfilter={}):
|
||||
def scoped_query(queue, project, name=None, kfilter={},
|
||||
key_value=PROJ_QUEUE_KEY):
|
||||
"""Returns a dict usable for querying for scoped project/queues.
|
||||
|
||||
:param queue: name of queue to seek
|
||||
@ -201,7 +204,7 @@ def scoped_query(queue, project, name=None, kfilter={}):
|
||||
:returns: query to issue
|
||||
:rtype: dict
|
||||
"""
|
||||
key = PROJ_QUEUE_KEY
|
||||
key = key_value
|
||||
query = {}
|
||||
scoped_name = scope_queue_name(queue, project)
|
||||
|
||||
|
@ -159,3 +159,11 @@ class DataDriver(base.DataDriverBase):
|
||||
stages.extend(_get_storage_pipeline('subscription', self.conf))
|
||||
stages.append(self._storage.subscription_controller)
|
||||
return common.Pipeline(stages)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def topic_controller(self):
|
||||
stages = _get_builtin_entry_points('topic', self._storage,
|
||||
self.control_driver, self.conf)
|
||||
stages.extend(_get_storage_pipeline('topic', self.conf))
|
||||
stages.append(self._storage.topic_controller)
|
||||
return common.Pipeline(stages)
|
||||
|
@ -147,6 +147,14 @@ class DataDriver(storage.DataDriverBase):
|
||||
else:
|
||||
return controller
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def topic_controller(self):
|
||||
controller = TopicController(self._pool_catalog)
|
||||
if self.conf.profiler.enabled:
|
||||
return profiler.trace_cls("pooling_topic_controller")(controller)
|
||||
else:
|
||||
return controller
|
||||
|
||||
|
||||
class QueueController(storage.Queue):
|
||||
"""Routes operations to get the appropriate queue controller.
|
||||
@ -635,6 +643,20 @@ class Catalog(object):
|
||||
target = self.lookup(queue, project)
|
||||
return target and target.subscription_controller
|
||||
|
||||
def get_topic_controller(self, topic, project=None):
|
||||
"""Lookup the topic controller for the given queue and project.
|
||||
|
||||
:param topic: Name of the topic for which to find a pool
|
||||
:param project: Project to which the topic belongs, or
|
||||
None to specify the "global" or "generic" project.
|
||||
|
||||
:returns: The topic controller associated with the data driver for
|
||||
the pool containing (queue, project) or None if this doesn't exist.
|
||||
:rtype: Maybe TopicController
|
||||
"""
|
||||
target = self.lookup(topic, project)
|
||||
return target and target.topic_controller
|
||||
|
||||
def get_default_pool(self, use_listing=True):
|
||||
if use_listing:
|
||||
cursor = self._pools_ctrl.list(limit=0)
|
||||
@ -714,3 +736,112 @@ class Catalog(object):
|
||||
self._drivers[pool_id] = self._init_driver(pool_id, pool_conf)
|
||||
|
||||
return self._drivers[pool_id]
|
||||
|
||||
|
||||
class TopicController(storage.Topic):
|
||||
"""Routes operations to get the appropriate topic controller.
|
||||
|
||||
:param pool_catalog: a catalog of available pools
|
||||
:type pool_catalog: queues.pooling.base.Catalog
|
||||
"""
|
||||
|
||||
def __init__(self, pool_catalog):
|
||||
super(TopicController, self).__init__(None)
|
||||
self._pool_catalog = pool_catalog
|
||||
self._mgt_topic_ctrl = self._pool_catalog.control.topic_controller
|
||||
self._get_controller = self._pool_catalog.get_topic_controller
|
||||
|
||||
def _list(self, project=None, kfilter={}, marker=None,
|
||||
limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False,
|
||||
name=None):
|
||||
|
||||
def all_pages():
|
||||
yield next(self._mgt_topic_ctrl.list(
|
||||
project=project,
|
||||
kfilter=kfilter,
|
||||
marker=marker,
|
||||
limit=limit,
|
||||
detailed=detailed,
|
||||
name=name))
|
||||
|
||||
# make a heap compared with 'name'
|
||||
ls = heapq.merge(*[
|
||||
utils.keyify('name', page)
|
||||
for page in all_pages()
|
||||
])
|
||||
|
||||
marker_name = {}
|
||||
|
||||
# limit the iterator and strip out the comparison wrapper
|
||||
def it():
|
||||
for topic_cmp in itertools.islice(ls, limit):
|
||||
marker_name['next'] = topic_cmp.obj['name']
|
||||
yield topic_cmp.obj
|
||||
|
||||
yield it()
|
||||
yield marker_name and marker_name['next']
|
||||
|
||||
def _get(self, name, project=None):
|
||||
try:
|
||||
return self.get_metadata(name, project)
|
||||
except errors.TopicDoesNotExist:
|
||||
return {}
|
||||
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
flavor = None
|
||||
if isinstance(metadata, dict):
|
||||
flavor = metadata.get('_flavor')
|
||||
|
||||
self._pool_catalog.register(name, project=project, flavor=flavor)
|
||||
|
||||
# NOTE(cpp-cabrera): This should always succeed since we just
|
||||
# registered the project/topic. There is a race condition,
|
||||
# however. If between the time we register a topic and go to
|
||||
# look it up, the topic is deleted, then this assertion will
|
||||
# fail.
|
||||
pool = self._pool_catalog.lookup(name, project)
|
||||
if not pool:
|
||||
raise RuntimeError('Failed to register topic')
|
||||
return self._mgt_topic_ctrl.create(name, metadata=metadata,
|
||||
project=project)
|
||||
|
||||
def _delete(self, name, project=None):
|
||||
mtHandler = self._get_controller(name, project)
|
||||
if mtHandler:
|
||||
# NOTE(cpp-cabrera): delete from the catalogue first. If
|
||||
# zaqar crashes in the middle of these two operations,
|
||||
# it is desirable that the entry be missing from the
|
||||
# catalogue and present in storage, rather than the
|
||||
# reverse. The former case leads to all operations
|
||||
# behaving as expected: 404s across the board, and a
|
||||
# functionally equivalent 204 on a create queue. The
|
||||
# latter case is more difficult to reason about, and may
|
||||
# yield 500s in some operations.
|
||||
self._pool_catalog.deregister(name, project)
|
||||
mtHandler.delete(name, project)
|
||||
|
||||
return self._mgt_topic_ctrl.delete(name, project)
|
||||
|
||||
def _exists(self, name, project=None):
|
||||
return self._mgt_topic_ctrl.exists(name, project=project)
|
||||
|
||||
def get_metadata(self, name, project=None):
|
||||
return self._mgt_topic_ctrl.get_metadata(name, project=project)
|
||||
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
# NOTE(gengchc2): If flavor metadata is modified in topic,
|
||||
# The topic needs to be re-registered to pools, otherwise
|
||||
# the topic flavor parameter is not consistent with the pool.
|
||||
flavor = None
|
||||
if isinstance(metadata, dict):
|
||||
flavor = metadata.get('_flavor')
|
||||
self._pool_catalog.register(name, project=project, flavor=flavor)
|
||||
|
||||
return self._mgt_topic_ctrl.set_metadata(name, metadata=metadata,
|
||||
project=project)
|
||||
|
||||
def _stats(self, name, project=None):
|
||||
mtHandler = self._get_controller(name, project)
|
||||
if mtHandler:
|
||||
return mtHandler.stats(name, project=project)
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
@ -296,6 +296,10 @@ class ControlDriver(storage.ControlDriverBase):
|
||||
else:
|
||||
return controller
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def topic_controller(self):
|
||||
pass
|
||||
|
||||
|
||||
def _get_redis_client(driver):
|
||||
conf = driver.redis_conf
|
||||
|
@ -637,3 +637,53 @@ class MessageQueueHandler(object):
|
||||
message_stats['oldest'] = oldest
|
||||
|
||||
return {'messages': message_stats}
|
||||
|
||||
|
||||
class MessageTopicHandler(object):
|
||||
def __init__(self, driver, control_driver):
|
||||
self.driver = driver
|
||||
self._client = self.driver.connection
|
||||
self._topic_ctrl = self.driver.topic_controller
|
||||
self._message_ctrl = self.driver.message_controller
|
||||
|
||||
@utils.raises_conn_error
|
||||
def create(self, name, metadata=None, project=None):
|
||||
with self._client.pipeline() as pipe:
|
||||
self._message_ctrl._create_msgset(name, project, pipe)
|
||||
|
||||
try:
|
||||
pipe.execute()
|
||||
except redis.exceptions.ResponseError:
|
||||
return False
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def delete(self, name, project=None):
|
||||
with self._client.pipeline() as pipe:
|
||||
self._message_ctrl._delete_msgset(name, project, pipe)
|
||||
self._message_ctrl._delete_queue_messages(name, project, pipe)
|
||||
pipe.execute()
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def stats(self, name, project=None):
|
||||
if not self._topic_ctrl.exists(name, project=project):
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
||||
total = self._message_ctrl._count(name, project)
|
||||
|
||||
message_stats = {
|
||||
'total': total
|
||||
}
|
||||
|
||||
if total:
|
||||
try:
|
||||
newest = self._message_ctrl.first(name, project, -1)
|
||||
oldest = self._message_ctrl.first(name, project, 1)
|
||||
except errors.QueueIsEmpty:
|
||||
pass
|
||||
else:
|
||||
message_stats['newest'] = newest
|
||||
message_stats['oldest'] = oldest
|
||||
|
||||
return {'messages': message_stats}
|
||||
|
@ -110,3 +110,7 @@ class ControlDriver(storage.ControlDriverBase):
|
||||
"controller")(controller)
|
||||
else:
|
||||
return controller
|
||||
|
||||
@property
|
||||
def topic_controller(self):
|
||||
pass
|
||||
|
@ -386,3 +386,89 @@ class MessageQueueHandler(object):
|
||||
raise
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
class MessageTopicHandler(object):
|
||||
def __init__(self, driver, control_driver):
|
||||
self.driver = driver
|
||||
self._client = self.driver.connection
|
||||
self._topic_ctrl = self.driver.topic_controller
|
||||
self._message_ctrl = self.driver.message_controller
|
||||
|
||||
def create(self, name, metadata=None, project=None):
|
||||
self._client.put_container(utils._message_container(name, project))
|
||||
|
||||
def delete(self, name, project=None):
|
||||
for container in [utils._message_container(name, project)]:
|
||||
try:
|
||||
headers, objects = self._client.get_container(container)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status != 404:
|
||||
raise
|
||||
else:
|
||||
for obj in objects:
|
||||
try:
|
||||
self._client.delete_object(container, obj['name'])
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status != 404:
|
||||
raise
|
||||
try:
|
||||
self._client.delete_container(container)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status not in (404, 409):
|
||||
raise
|
||||
|
||||
def stats(self, name, project=None):
|
||||
if not self._topic_ctrl.exists(name, project=project):
|
||||
raise errors.TopicDoesNotExist(name, project)
|
||||
|
||||
total = 0
|
||||
container = utils._message_container(name, project)
|
||||
|
||||
try:
|
||||
_, objects = self._client.get_container(container)
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status == 404:
|
||||
raise errors.QueueIsEmpty(name, project)
|
||||
|
||||
newest = None
|
||||
oldest = None
|
||||
now = timeutils.utcnow_ts(True)
|
||||
for obj in objects:
|
||||
try:
|
||||
headers = self._client.head_object(container, obj['name'])
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status != 404:
|
||||
raise
|
||||
else:
|
||||
created = float(headers['x-timestamp'])
|
||||
created_iso = datetime.datetime.utcfromtimestamp(
|
||||
created).strftime('%Y-%m-%dT%H:%M:%SZ')
|
||||
newest = {
|
||||
'id': obj['name'],
|
||||
'age': now - created,
|
||||
'created': created_iso}
|
||||
if oldest is None:
|
||||
oldest = copy.deepcopy(newest)
|
||||
total += 1
|
||||
|
||||
msg_stats = {
|
||||
'total': total,
|
||||
}
|
||||
if newest is not None:
|
||||
msg_stats['newest'] = newest
|
||||
msg_stats['oldest'] = oldest
|
||||
|
||||
return {'messages': msg_stats}
|
||||
|
||||
def exists(self, topic, project=None):
|
||||
try:
|
||||
self._client.head_container(utils._message_container(topic,
|
||||
project))
|
||||
|
||||
except swiftclient.ClientException as exc:
|
||||
if exc.http_status == 404:
|
||||
return False
|
||||
raise
|
||||
else:
|
||||
return True
|
||||
|
@ -61,6 +61,10 @@ class DataDriver(storage.DataDriverBase):
|
||||
def subscription_controller(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def topic_controller(self):
|
||||
return self.control_driver.topic_controller
|
||||
|
||||
|
||||
class ControlDriver(storage.ControlDriverBase):
|
||||
|
||||
@ -86,6 +90,10 @@ class ControlDriver(storage.ControlDriverBase):
|
||||
def flavors_controller(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def topic_controller(self):
|
||||
return TopicController(self)
|
||||
|
||||
|
||||
class QueueController(storage.Queue):
|
||||
def __init__(self, driver):
|
||||
@ -144,3 +152,32 @@ class MessageController(storage.Message):
|
||||
|
||||
def bulk_delete(self, queue, message_ids, project=None, claim_ids=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class TopicController(storage.Topic):
|
||||
def __init__(self, driver):
|
||||
pass
|
||||
|
||||
def _list(self, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _get(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_metadata(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _create(self, name, metadata=None, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _exists(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def set_metadata(self, name, metadata, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _delete(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _stats(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
608
zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py
Normal file
608
zaqar/tests/unit/transport/wsgi/v2_0/test_topic_lifecycle.py
Normal file
@ -0,0 +1,608 @@
|
||||
# Copyright (c) 2019 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
# use this file except in compliance with the License. You may obtain a copy
|
||||
# of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
|
||||
|
||||
import ddt
|
||||
import falcon
|
||||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
|
||||
from zaqar.storage import errors as storage_errors
|
||||
from zaqar import tests as testing
|
||||
from zaqar.tests.unit.transport.wsgi import base
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestTopicLifecycleMongoDB(base.V2Base):
|
||||
|
||||
config_file = 'wsgi_mongodb.conf'
|
||||
|
||||
@testing.requires_mongodb
|
||||
def setUp(self):
|
||||
super(TestTopicLifecycleMongoDB, self).setUp()
|
||||
|
||||
self.topic_path = self.url_prefix + '/topics'
|
||||
self.mars_topic_path = self.topic_path + '/mars'
|
||||
self.venus_topic_path = self.topic_path + '/venus'
|
||||
|
||||
self.headers = {
|
||||
'Client-ID': uuidutils.generate_uuid(),
|
||||
'X-Project-ID': '3387309841abc_'
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
control = self.boot.control
|
||||
storage = self.boot.storage._storage
|
||||
connection = storage.connection
|
||||
|
||||
connection.drop_database(control.topics_database)
|
||||
|
||||
for db in storage.message_databases:
|
||||
connection.drop_database(db)
|
||||
|
||||
super(TestTopicLifecycleMongoDB, self).tearDown()
|
||||
|
||||
def test_without_project_id(self):
|
||||
headers = {
|
||||
'Client-ID': uuidutils.generate_uuid(),
|
||||
}
|
||||
|
||||
self.simulate_put(self.mars_topic_path, headers=headers,
|
||||
need_project_id=False)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
self.simulate_delete(self.mars_topic_path, headers=headers,
|
||||
need_project_id=False)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
def test_empty_project_id(self):
|
||||
headers = {
|
||||
'Client-ID': uuidutils.generate_uuid(),
|
||||
'X-Project-ID': ''
|
||||
}
|
||||
|
||||
self.simulate_put(self.mars_topic_path, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
self.simulate_delete(self.mars_topic_path, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
@ddt.data('480924', 'foo')
|
||||
def test_basics_thoroughly(self, project_id):
|
||||
headers = {
|
||||
'Client-ID': uuidutils.generate_uuid(),
|
||||
'X-Project-ID': project_id
|
||||
}
|
||||
mars_topic_path_stats = self.mars_topic_path + '/stats'
|
||||
|
||||
# Stats are empty - topic not created yet
|
||||
self.simulate_get(mars_topic_path_stats, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# Create
|
||||
doc = '{"messages": {"ttl": 600}}'
|
||||
self.simulate_put(self.mars_topic_path,
|
||||
headers=headers, body=doc)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
location = self.srmock.headers_dict['Location']
|
||||
self.assertEqual(location, self.mars_topic_path)
|
||||
|
||||
# Fetch metadata
|
||||
result = self.simulate_get(self.mars_topic_path,
|
||||
headers=headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
ref_doc = jsonutils.loads(doc)
|
||||
ref_doc['_default_message_ttl'] = 3600
|
||||
ref_doc['_max_messages_post_size'] = 262144
|
||||
ref_doc['_default_message_delay'] = 0
|
||||
self.assertEqual(ref_doc, result_doc)
|
||||
|
||||
# Stats empty topic
|
||||
self.simulate_get(mars_topic_path_stats, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# Delete
|
||||
self.simulate_delete(self.mars_topic_path, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_204, self.srmock.status)
|
||||
|
||||
# Get non-existent stats
|
||||
self.simulate_get(mars_topic_path_stats, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
@ddt.data('1234567890', '11111111111111111111111111111111111')
|
||||
def test_basics_thoroughly_with_different_client_id(self, client_id):
|
||||
self.conf.set_override('client_id_uuid_safe', 'off', 'transport')
|
||||
headers = {
|
||||
'Client-ID': client_id,
|
||||
'X-Project-ID': '480924'
|
||||
}
|
||||
mars_topic_path_stats = self.mars_topic_path + '/stats'
|
||||
|
||||
# Stats are empty - topic not created yet
|
||||
self.simulate_get(mars_topic_path_stats, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# Create
|
||||
doc = '{"messages": {"ttl": 600}}'
|
||||
self.simulate_put(self.mars_topic_path,
|
||||
headers=headers, body=doc)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
location = self.srmock.headers_dict['Location']
|
||||
self.assertEqual(location, self.mars_topic_path)
|
||||
|
||||
# Fetch metadata
|
||||
result = self.simulate_get(self.mars_topic_path,
|
||||
headers=headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
ref_doc = jsonutils.loads(doc)
|
||||
ref_doc['_default_message_ttl'] = 3600
|
||||
ref_doc['_max_messages_post_size'] = 262144
|
||||
ref_doc['_default_message_delay'] = 0
|
||||
self.assertEqual(ref_doc, result_doc)
|
||||
|
||||
# Stats empty topic
|
||||
self.simulate_get(mars_topic_path_stats, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# Delete
|
||||
self.simulate_delete(self.mars_topic_path, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_204, self.srmock.status)
|
||||
|
||||
# Get non-existent stats
|
||||
self.simulate_get(mars_topic_path_stats, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
def test_name_restrictions(self):
|
||||
self.simulate_put(self.topic_path + '/Nice-Boat_2',
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
self.simulate_put(self.topic_path + '/Nice-Bo@t',
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
self.simulate_put(self.topic_path + '/_' + 'niceboat' * 8,
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
self.simulate_put(self.topic_path + '/Service.test_topic',
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
def test_project_id_restriction(self):
|
||||
muvluv_topic_path = self.topic_path + '/Muv-Luv'
|
||||
|
||||
self.simulate_put(muvluv_topic_path,
|
||||
headers={'Client-ID': uuidutils.generate_uuid(),
|
||||
'X-Project-ID': 'JAM Project' * 24})
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
# no charset restrictions
|
||||
self.simulate_put(muvluv_topic_path,
|
||||
headers={'Client-ID': uuidutils.generate_uuid(),
|
||||
'X-Project-ID': 'JAM Project'})
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
def test_non_ascii_name(self):
|
||||
test_params = ((u'/topics/non-ascii-n\u0153me', 'utf-8'),
|
||||
(u'/topics/non-ascii-n\xc4me', 'iso8859-1'))
|
||||
|
||||
for uri, enc in test_params:
|
||||
uri = self.url_prefix + uri
|
||||
|
||||
if six.PY2:
|
||||
uri = uri.encode(enc)
|
||||
|
||||
self.simulate_put(uri, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
self.simulate_delete(uri, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
def test_no_metadata(self):
|
||||
self.simulate_put(self.venus_topic_path,
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
self.simulate_put(self.venus_topic_path, body='',
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_204, self.srmock.status)
|
||||
|
||||
result = self.simulate_get(self.venus_topic_path,
|
||||
headers=self.headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(256 * 1024,
|
||||
result_doc.get('_max_messages_post_size'))
|
||||
self.assertEqual(3600,
|
||||
result_doc.get('_default_message_ttl'))
|
||||
self.assertEqual(0,
|
||||
result_doc.get('_default_message_delay'))
|
||||
|
||||
@ddt.data('{', '[]', '.', ' ')
|
||||
def test_bad_metadata(self, document):
|
||||
self.simulate_put(self.venus_topic_path,
|
||||
headers=self.headers,
|
||||
body=document)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
def test_too_much_metadata(self):
|
||||
self.simulate_put(self.venus_topic_path, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
|
||||
|
||||
max_size = self.transport_cfg.max_queue_metadata
|
||||
padding_len = max_size - (len(doc) - 10) + 1
|
||||
|
||||
doc = doc.format(pad='x' * padding_len)
|
||||
|
||||
self.simulate_put(self.venus_topic_path,
|
||||
headers=self.headers,
|
||||
body=doc)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
def test_way_too_much_metadata(self):
|
||||
self.simulate_put(self.venus_topic_path, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
|
||||
|
||||
max_size = self.transport_cfg.max_queue_metadata
|
||||
padding_len = max_size * 100
|
||||
|
||||
doc = doc.format(pad='x' * padding_len)
|
||||
|
||||
self.simulate_put(self.venus_topic_path,
|
||||
headers=self.headers, body=doc)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
def test_custom_metadata(self):
|
||||
# Set
|
||||
doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'
|
||||
|
||||
max_size = self.transport_cfg.max_queue_metadata
|
||||
padding_len = max_size - (len(doc) - 2)
|
||||
|
||||
doc = doc.format(pad='x' * padding_len)
|
||||
self.simulate_put(self.venus_topic_path,
|
||||
headers=self.headers,
|
||||
body=doc)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
# Get
|
||||
result = self.simulate_get(self.venus_topic_path,
|
||||
headers=self.headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
ref_doc = jsonutils.loads(doc)
|
||||
ref_doc['_default_message_ttl'] = 3600
|
||||
ref_doc['_max_messages_post_size'] = 262144
|
||||
ref_doc['_default_message_delay'] = 0
|
||||
self.assertEqual(ref_doc, result_doc)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
def test_update_metadata(self):
|
||||
xyz_topic_path = self.url_prefix + '/topics/xyz'
|
||||
xyz_topic_path_metadata = xyz_topic_path
|
||||
headers = {
|
||||
'Client-ID': uuidutils.generate_uuid(),
|
||||
'X-Project-ID': uuidutils.generate_uuid()
|
||||
}
|
||||
# Create
|
||||
self.simulate_put(xyz_topic_path, headers=headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
headers.update({'Content-Type':
|
||||
"application/openstack-messaging-v2.0-json-patch"})
|
||||
# add metadata
|
||||
doc1 = ('[{"op":"add", "path": "/metadata/key1", "value": 1},'
|
||||
'{"op":"add", "path": "/metadata/key2", "value": 1}]')
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc1)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# remove reserved metadata, zaqar will do nothing and return 200,
|
||||
# because
|
||||
doc3 = '[{"op":"remove", "path": "/metadata/_default_message_ttl"}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc3)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# replace metadata
|
||||
doc2 = '[{"op":"replace", "path": "/metadata/key1", "value": 2}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc2)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# replace reserved metadata, zaqar will store the reserved metadata
|
||||
doc2 = ('[{"op":"replace", "path": "/metadata/_default_message_ttl",'
|
||||
'"value": 300}]')
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc2)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# Get
|
||||
result = self.simulate_get(xyz_topic_path_metadata,
|
||||
headers=headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual({'key1': 2, 'key2': 1,
|
||||
'_default_message_ttl': 300,
|
||||
'_max_messages_post_size': 262144,
|
||||
'_default_message_delay': 0}, result_doc)
|
||||
|
||||
# remove metadata
|
||||
doc3 = '[{"op":"remove", "path": "/metadata/key1"}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc3)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# remove reserved metadata
|
||||
doc3 = '[{"op":"remove", "path": "/metadata/_default_message_ttl"}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc3)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# Get
|
||||
result = self.simulate_get(xyz_topic_path_metadata,
|
||||
headers=headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual({'key2': 1, '_default_message_ttl': 3600,
|
||||
'_max_messages_post_size': 262144,
|
||||
'_default_message_delay': 0}, result_doc)
|
||||
|
||||
# replace non-existent metadata
|
||||
doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc4)
|
||||
self.assertEqual(falcon.HTTP_409, self.srmock.status)
|
||||
|
||||
# remove non-existent metadata
|
||||
doc5 = '[{"op":"remove", "path": "/metadata/key3"}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc5)
|
||||
self.assertEqual(falcon.HTTP_409, self.srmock.status)
|
||||
|
||||
self.simulate_delete(xyz_topic_path, headers=headers)
|
||||
|
||||
# add metadata to non-existent topic
|
||||
doc1 = ('[{"op":"add", "path": "/metadata/key1", "value": 1},'
|
||||
'{"op":"add", "path": "/metadata/key2", "value": 1}]')
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc1)
|
||||
self.assertEqual(falcon.HTTP_404, self.srmock.status)
|
||||
|
||||
# replace metadata in non-existent topic
|
||||
doc4 = '[{"op":"replace", "path": "/metadata/key3", "value":2}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc4)
|
||||
self.assertEqual(falcon.HTTP_404, self.srmock.status)
|
||||
|
||||
# remove metadata from non-existent topic
|
||||
doc5 = '[{"op":"remove", "path": "/metadata/key3"}]'
|
||||
self.simulate_patch(xyz_topic_path_metadata,
|
||||
headers=headers,
|
||||
body=doc5)
|
||||
self.assertEqual(falcon.HTTP_404, self.srmock.status)
|
||||
|
||||
def test_list(self):
|
||||
arbitrary_number = 644079696574693
|
||||
project_id = str(arbitrary_number)
|
||||
client_id = uuidutils.generate_uuid()
|
||||
header = {
|
||||
'X-Project-ID': project_id,
|
||||
'Client-ID': client_id
|
||||
}
|
||||
|
||||
# NOTE(kgriffs): It's important that this one sort after the one
|
||||
# above. This is in order to prove that bug/1236605 is fixed, and
|
||||
# stays fixed!
|
||||
alt_project_id = str(arbitrary_number + 1)
|
||||
|
||||
# List empty
|
||||
result = self.simulate_get(self.topic_path, headers=header)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
results = jsonutils.loads(result[0])
|
||||
self.assertEqual([], results['topics'])
|
||||
self.assertIn('links', results)
|
||||
self.assertEqual(0, len(results['links']))
|
||||
|
||||
# Payload exceeded
|
||||
self.simulate_get(self.topic_path, headers=header,
|
||||
query_string='limit=21')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
||||
# Create some
|
||||
def create_topic(name, project_id, body):
|
||||
altheader = {'Client-ID': client_id}
|
||||
if project_id is not None:
|
||||
altheader['X-Project-ID'] = project_id
|
||||
uri = self.topic_path + '/' + name
|
||||
self.simulate_put(uri, headers=altheader, body=body)
|
||||
|
||||
create_topic('q1', project_id, '{"node": 31}')
|
||||
create_topic('q2', project_id, '{"node": 32}')
|
||||
create_topic('q3', project_id, '{"node": 33}')
|
||||
|
||||
create_topic('q3', alt_project_id, '{"alt": 1}')
|
||||
|
||||
# List (limit)
|
||||
result = self.simulate_get(self.topic_path, headers=header,
|
||||
query_string='limit=2')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(2, len(result_doc['topics']))
|
||||
|
||||
# List (no metadata, get all)
|
||||
result = self.simulate_get(self.topic_path,
|
||||
headers=header, query_string='limit=5')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
[target, params] = result_doc['links'][0]['href'].split('?')
|
||||
self.simulate_get(target, headers=header, query_string=params)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# Ensure we didn't pick up the topic from the alt project.
|
||||
topics = result_doc['topics']
|
||||
self.assertEqual(3, len(topics))
|
||||
|
||||
# List with metadata
|
||||
result = self.simulate_get(self.topic_path, headers=header,
|
||||
query_string='detailed=true')
|
||||
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
[target, params] = result_doc['links'][0]['href'].split('?')
|
||||
|
||||
topic = result_doc['topics'][0]
|
||||
result = self.simulate_get(topic['href'], headers=header)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(topic['metadata'], result_doc)
|
||||
self.assertEqual({'node': 31, '_default_message_ttl': 3600,
|
||||
'_max_messages_post_size': 262144,
|
||||
'_default_message_delay': 0}, result_doc)
|
||||
|
||||
# topic filter
|
||||
result = self.simulate_get(self.topic_path, headers=header,
|
||||
query_string='node=34')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(0, len(result_doc['topics']))
|
||||
|
||||
# List tail
|
||||
self.simulate_get(target, headers=header, query_string=params)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
# List manually-constructed tail
|
||||
self.simulate_get(target, headers=header, query_string='marker=zzz')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
|
||||
def test_list_returns_503_on_nopoolfound_exception(self):
|
||||
arbitrary_number = 644079696574693
|
||||
project_id = str(arbitrary_number)
|
||||
client_id = uuidutils.generate_uuid()
|
||||
header = {
|
||||
'X-Project-ID': project_id,
|
||||
'Client-ID': client_id
|
||||
}
|
||||
|
||||
topic_controller = self.boot.storage.topic_controller
|
||||
|
||||
with mock.patch.object(topic_controller, 'list') as mock_topic_list:
|
||||
|
||||
def topic_generator():
|
||||
raise storage_errors.NoPoolFound()
|
||||
|
||||
# This generator tries to be like topic controller list generator
|
||||
# in some ways.
|
||||
def fake_generator():
|
||||
yield topic_generator()
|
||||
yield {}
|
||||
mock_topic_list.return_value = fake_generator()
|
||||
self.simulate_get(self.topic_path, headers=header)
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
||||
|
||||
def test_list_with_filter(self):
|
||||
arbitrary_number = 644079696574693
|
||||
project_id = str(arbitrary_number)
|
||||
client_id = uuidutils.generate_uuid()
|
||||
header = {
|
||||
'X-Project-ID': project_id,
|
||||
'Client-ID': client_id
|
||||
}
|
||||
|
||||
# Create some
|
||||
def create_topic(name, project_id, body):
|
||||
altheader = {'Client-ID': client_id}
|
||||
if project_id is not None:
|
||||
altheader['X-Project-ID'] = project_id
|
||||
uri = self.topic_path + '/' + name
|
||||
self.simulate_put(uri, headers=altheader, body=body)
|
||||
|
||||
create_topic('q1', project_id, '{"test_metadata_key1": "value1"}')
|
||||
create_topic('q2', project_id, '{"_max_messages_post_size": 2000}')
|
||||
create_topic('q3', project_id, '{"test_metadata_key2": 30}')
|
||||
|
||||
# List (filter query)
|
||||
result = self.simulate_get(self.topic_path, headers=header,
|
||||
query_string='name=q&test_metadata_key2=30')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(1, len(result_doc['topics']))
|
||||
self.assertEqual('q3', result_doc['topics'][0]['name'])
|
||||
|
||||
# List (filter query)
|
||||
result = self.simulate_get(self.topic_path, headers=header,
|
||||
query_string='_max_messages_post_size=2000')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(1, len(result_doc['topics']))
|
||||
self.assertEqual('q2', result_doc['topics'][0]['name'])
|
||||
|
||||
# List (filter query)
|
||||
result = self.simulate_get(self.topic_path, headers=header,
|
||||
query_string='name=q')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(3, len(result_doc['topics']))
|
||||
|
||||
|
||||
class TestTopicLifecycleFaultyDriver(base.V2BaseFaulty):
|
||||
|
||||
config_file = 'wsgi_faulty.conf'
|
||||
|
||||
def test_simple(self):
|
||||
self.headers = {
|
||||
'Client-ID': uuidutils.generate_uuid(),
|
||||
'X-Project-ID': '338730984abc_1'
|
||||
}
|
||||
|
||||
mars_topic_path = self.url_prefix + '/topics/mars'
|
||||
doc = '{"messages": {"ttl": 600}}'
|
||||
self.simulate_put(mars_topic_path,
|
||||
headers=self.headers,
|
||||
body=doc)
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
||||
|
||||
location = ('Location', mars_topic_path)
|
||||
self.assertNotIn(location, self.srmock.headers)
|
||||
|
||||
result = self.simulate_get(mars_topic_path,
|
||||
headers=self.headers)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
||||
self.assertNotEqual(result_doc, jsonutils.loads(doc))
|
||||
|
||||
self.simulate_get(mars_topic_path + '/stats',
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
||||
|
||||
self.simulate_get(self.url_prefix + '/topics',
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
||||
|
||||
self.simulate_delete(mars_topic_path, headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
@ -676,3 +676,27 @@ class Validator(object):
|
||||
self._limits_conf.max_length_client_id)
|
||||
if self._limits_conf.client_id_uuid_safe == 'strict':
|
||||
uuid.UUID(client_id)
|
||||
|
||||
def topic_identification(self, topic, project):
|
||||
"""Restrictions on a project id & topic name pair.
|
||||
|
||||
:param queue: Name of the topic
|
||||
:param project: Project id
|
||||
:raises ValidationFailed: if the `name` is longer than 64
|
||||
characters or contains anything other than ASCII digits and
|
||||
letters, underscores, and dashes. Also raises if `project`
|
||||
is not None but longer than 256 characters.
|
||||
"""
|
||||
|
||||
if project is not None and len(project) > PROJECT_ID_MAX_LEN:
|
||||
msg = _(u'Project ids may not be more than {0} characters long.')
|
||||
raise ValidationFailed(msg, PROJECT_ID_MAX_LEN)
|
||||
|
||||
if len(topic) > QUEUE_NAME_MAX_LEN:
|
||||
msg = _(u'Topic names may not be more than {0} characters long.')
|
||||
raise ValidationFailed(msg, QUEUE_NAME_MAX_LEN)
|
||||
|
||||
if not QUEUE_NAME_REGEX.match(topic):
|
||||
raise ValidationFailed(
|
||||
_(u'Topic names may only contain ASCII letters, digits, '
|
||||
'underscores, and dashes.'))
|
||||
|
@ -72,6 +72,10 @@ class Driver(transport.DriverBase):
|
||||
return helpers.validate_queue_identification(
|
||||
self._validate.queue_identification, req, resp, params)
|
||||
|
||||
def _validate_topic_identification(self, req, resp, params):
|
||||
return helpers.validate_topic_identification(
|
||||
self._validate.topic_identification, req, resp, params)
|
||||
|
||||
def _require_client_id(self, req, resp, params):
|
||||
return helpers.require_client_id(
|
||||
self._validate.client_id_uuid_safe, req, resp, params)
|
||||
@ -91,7 +95,10 @@ class Driver(transport.DriverBase):
|
||||
helpers.inject_context,
|
||||
|
||||
# NOTE(kgriffs): Depends on project_id being extracted, above
|
||||
self._validate_queue_identification
|
||||
self._validate_queue_identification,
|
||||
|
||||
# NOTE(kgriffs): Depends on project_id being extracted, above
|
||||
self._validate_topic_identification
|
||||
]
|
||||
|
||||
def _init_routes(self):
|
||||
|
@ -24,6 +24,9 @@ from zaqar.transport.wsgi.v2_0 import purge
|
||||
from zaqar.transport.wsgi.v2_0 import queues
|
||||
from zaqar.transport.wsgi.v2_0 import stats
|
||||
from zaqar.transport.wsgi.v2_0 import subscriptions
|
||||
from zaqar.transport.wsgi.v2_0 import topic
|
||||
from zaqar.transport.wsgi.v2_0 import topic_purge
|
||||
from zaqar.transport.wsgi.v2_0 import topic_stats
|
||||
from zaqar.transport.wsgi.v2_0 import urls
|
||||
|
||||
|
||||
@ -52,6 +55,7 @@ def public_endpoints(driver, conf):
|
||||
message_controller = driver._storage.message_controller
|
||||
claim_controller = driver._storage.claim_controller
|
||||
subscription_controller = driver._storage.subscription_controller
|
||||
topic_controller = driver._storage.topic_controller
|
||||
|
||||
defaults = driver._defaults
|
||||
|
||||
@ -119,6 +123,42 @@ def public_endpoints(driver, conf):
|
||||
|
||||
# Pre-Signed URL Endpoint
|
||||
('/queues/{queue_name}/share', urls.Resource(driver)),
|
||||
|
||||
# Topics Endpoints
|
||||
('/topics',
|
||||
topic.CollectionResource(driver._validate, topic_controller)),
|
||||
('/topics/{topic_name}',
|
||||
topic.ItemResource(driver._validate, topic_controller,
|
||||
message_controller)),
|
||||
('/topics/{topic_name}/stats',
|
||||
topic_stats.Resource(topic_controller)),
|
||||
('/topics/{topic_name}/purge',
|
||||
topic_purge.Resource(driver)),
|
||||
# Topic Messages Endpoints
|
||||
('/topics/{topic_name}/messages',
|
||||
messages.CollectionResource(driver._wsgi_conf,
|
||||
driver._validate,
|
||||
message_controller,
|
||||
topic_controller,
|
||||
defaults.message_ttl)),
|
||||
('/topics/{topic_name}/messages/{message_id}',
|
||||
messages.ItemResource(message_controller)),
|
||||
# Topic Subscription Endpoints
|
||||
('/topics/{topic_name}/subscriptions',
|
||||
subscriptions.CollectionResource(driver._validate,
|
||||
subscription_controller,
|
||||
defaults.subscription_ttl,
|
||||
topic_controller,
|
||||
conf)),
|
||||
|
||||
('/topics/{topic_name}/subscriptions/{subscription_id}',
|
||||
subscriptions.ItemResource(driver._validate,
|
||||
subscription_controller)),
|
||||
|
||||
('/topics/{topic_name}/subscriptions/{subscription_id}/confirm',
|
||||
subscriptions.ConfirmResource(driver._validate,
|
||||
subscription_controller,
|
||||
conf)),
|
||||
]
|
||||
|
||||
|
||||
|
333
zaqar/transport/wsgi/v2_0/topic.py
Normal file
333
zaqar/transport/wsgi/v2_0/topic.py
Normal file
@ -0,0 +1,333 @@
|
||||
# Copyright (c) 2019 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import falcon
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from zaqar.common import decorators
|
||||
from zaqar.i18n import _
|
||||
from zaqar.storage import errors as storage_errors
|
||||
from zaqar.transport import acl
|
||||
from zaqar.transport import utils
|
||||
from zaqar.transport import validation
|
||||
from zaqar.transport.wsgi import errors as wsgi_errors
|
||||
from zaqar.transport.wsgi import utils as wsgi_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_reserved_metadata(validate):
|
||||
_reserved_metadata = ['max_messages_post_size', 'default_message_ttl',
|
||||
'default_message_delay']
|
||||
reserved_metadata = {
|
||||
'_%s' % meta:
|
||||
validate.get_limit_conf_value(meta)
|
||||
for meta in _reserved_metadata
|
||||
}
|
||||
|
||||
return reserved_metadata
|
||||
|
||||
|
||||
class ItemResource(object):
|
||||
|
||||
__slots__ = ('_validate', '_topic_controller', '_message_controller',
|
||||
'_reserved_metadata')
|
||||
|
||||
def __init__(self, validate, topic_controller, message_controller):
|
||||
self._validate = validate
|
||||
self._topic_controller = topic_controller
|
||||
self._message_controller = message_controller
|
||||
|
||||
@decorators.TransportLog("Topics item")
|
||||
@acl.enforce("topics:get")
|
||||
def on_get(self, req, resp, project_id, topic_name):
|
||||
try:
|
||||
resp_dict = self._topic_controller.get(topic_name,
|
||||
project=project_id)
|
||||
for meta, value in _get_reserved_metadata(self._validate).items():
|
||||
if not resp_dict.get(meta):
|
||||
resp_dict[meta] = value
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Topic metadata could not be retrieved.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
# status defaults to 200
|
||||
|
||||
@decorators.TransportLog("Topics item")
|
||||
@acl.enforce("topics:create")
|
||||
def on_put(self, req, resp, project_id, topic_name):
|
||||
try:
|
||||
# Place JSON size restriction before parsing
|
||||
self._validate.queue_metadata_length(req.content_length)
|
||||
# Deserialize Topic metadata
|
||||
metadata = None
|
||||
if req.content_length:
|
||||
document = wsgi_utils.deserialize(req.stream,
|
||||
req.content_length)
|
||||
metadata = wsgi_utils.sanitize(document)
|
||||
self._validate.queue_metadata_putting(metadata)
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
try:
|
||||
created = self._topic_controller.create(topic_name,
|
||||
metadata=metadata,
|
||||
project=project_id)
|
||||
|
||||
except storage_errors.FlavorDoesNotExist as ex:
|
||||
LOG.exception(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Topic could not be created.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
|
||||
resp.location = req.path
|
||||
|
||||
@decorators.TransportLog("Topics item")
|
||||
@acl.enforce("topics:delete")
|
||||
def on_delete(self, req, resp, project_id, topic_name):
|
||||
LOG.debug(u'Topic item DELETE - topic: %(topic)s, '
|
||||
u'project: %(project)s',
|
||||
{'topic': topic_name, 'project': project_id})
|
||||
try:
|
||||
self._topic_controller.delete(topic_name, project=project_id)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Topic could not be deleted.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.status = falcon.HTTP_204
|
||||
|
||||
@decorators.TransportLog("Topics item")
|
||||
@acl.enforce("topics:update")
|
||||
def on_patch(self, req, resp, project_id, topic_name):
|
||||
"""Allows one to update a topic's metadata.
|
||||
|
||||
This method expects the user to submit a JSON object. There is also
|
||||
strict format checking through the use of
|
||||
jsonschema. Appropriate errors are returned in each case for
|
||||
badly formatted input.
|
||||
|
||||
:returns: HTTP | 200,400,409,503
|
||||
"""
|
||||
LOG.debug(u'PATCH topic - name: %s', topic_name)
|
||||
|
||||
try:
|
||||
# Place JSON size restriction before parsing
|
||||
self._validate.queue_metadata_length(req.content_length)
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
|
||||
|
||||
# NOTE(flwang): See below link to get more details about draft 10,
|
||||
# tools.ietf.org/html/draft-ietf-appsawg-json-patch-10
|
||||
content_types = {
|
||||
'application/openstack-messaging-v2.0-json-patch': 10,
|
||||
}
|
||||
|
||||
if req.content_type not in content_types:
|
||||
headers = {'Accept-Patch':
|
||||
', '.join(sorted(content_types.keys()))}
|
||||
msg = _("Accepted media type for PATCH: %s.")
|
||||
LOG.debug(msg, headers)
|
||||
raise wsgi_errors.HTTPUnsupportedMediaType(msg % headers)
|
||||
|
||||
if req.content_length:
|
||||
try:
|
||||
changes = utils.read_json(req.stream, req.content_length)
|
||||
changes = wsgi_utils.sanitize(changes, doctype=list)
|
||||
except utils.MalformedJSON as ex:
|
||||
LOG.debug(ex)
|
||||
description = _(u'Request body could not be parsed.')
|
||||
raise wsgi_errors.HTTPBadRequestBody(description)
|
||||
|
||||
except utils.OverflowedJSONInteger as ex:
|
||||
LOG.debug(ex)
|
||||
description = _(u'JSON contains integer that is too large.')
|
||||
raise wsgi_errors.HTTPBadRequestBody(description)
|
||||
|
||||
except Exception as ex:
|
||||
# Error while reading from the network/server
|
||||
LOG.exception(ex)
|
||||
description = _(u'Request body could not be read.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
else:
|
||||
msg = _("PATCH body could not be empty for update.")
|
||||
LOG.debug(msg)
|
||||
raise wsgi_errors.HTTPBadRequestBody(msg)
|
||||
|
||||
try:
|
||||
changes = self._validate.queue_patching(req, changes)
|
||||
|
||||
# NOTE(Eva-i): using 'get_metadata' instead of 'get', so
|
||||
# QueueDoesNotExist error will be thrown in case of non-existent
|
||||
# queue.
|
||||
metadata = self._topic_controller.get_metadata(topic_name,
|
||||
project=project_id)
|
||||
reserved_metadata = _get_reserved_metadata(self._validate)
|
||||
for change in changes:
|
||||
change_method_name = '_do_%s' % change['op']
|
||||
change_method = getattr(self, change_method_name)
|
||||
change_method(req, metadata, reserved_metadata, change)
|
||||
|
||||
self._validate.queue_metadata_putting(metadata)
|
||||
|
||||
self._topic_controller.set_metadata(topic_name,
|
||||
metadata,
|
||||
project_id)
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
|
||||
except wsgi_errors.HTTPConflict as ex:
|
||||
raise ex
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Topic could not be updated.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
for meta, value in _get_reserved_metadata(self._validate).items():
|
||||
if not metadata.get(meta):
|
||||
metadata[meta] = value
|
||||
resp.body = utils.to_json(metadata)
|
||||
|
||||
def _do_replace(self, req, metadata, reserved_metadata, change):
|
||||
path = change['path']
|
||||
path_child = path[1]
|
||||
value = change['value']
|
||||
if path_child in metadata or path_child in reserved_metadata:
|
||||
metadata[path_child] = value
|
||||
else:
|
||||
msg = _("Can't replace non-existent object %s.")
|
||||
raise wsgi_errors.HTTPConflict(msg % path_child)
|
||||
|
||||
def _do_add(self, req, metadata, reserved_metadata, change):
|
||||
path = change['path']
|
||||
path_child = path[1]
|
||||
value = change['value']
|
||||
metadata[path_child] = value
|
||||
|
||||
def _do_remove(self, req, metadata, reserved_metadata, change):
|
||||
path = change['path']
|
||||
path_child = path[1]
|
||||
if path_child in metadata:
|
||||
metadata.pop(path_child)
|
||||
elif path_child not in reserved_metadata:
|
||||
msg = _("Can't remove non-existent object %s.")
|
||||
raise wsgi_errors.HTTPConflict(msg % path_child)
|
||||
|
||||
|
||||
class CollectionResource(object):
|
||||
|
||||
__slots__ = ('_topic_controller', '_validate', '_reserved_metadata')
|
||||
|
||||
def __init__(self, validate, topic_controller):
|
||||
self._topic_controller = topic_controller
|
||||
self._validate = validate
|
||||
|
||||
def _topic_list(self, project_id, path, kfilter, **kwargs):
|
||||
try:
|
||||
self._validate.queue_listing(**kwargs)
|
||||
results = self._topic_controller.list(project=project_id,
|
||||
kfilter=kfilter, **kwargs)
|
||||
|
||||
# Buffer list of topics
|
||||
topics = list(next(results))
|
||||
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Topics could not be listed.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
# Got some. Prepare the response.
|
||||
kwargs['marker'] = next(results) or kwargs.get('marker', '')
|
||||
reserved_metadata = _get_reserved_metadata(self._validate).items()
|
||||
for each_topic in topics:
|
||||
each_topic['href'] = path + '/' + each_topic['name']
|
||||
if kwargs.get('detailed'):
|
||||
for meta, value in reserved_metadata:
|
||||
if not each_topic.get('metadata', {}).get(meta):
|
||||
each_topic['metadata'][meta] = value
|
||||
|
||||
return topics, kwargs['marker']
|
||||
|
||||
def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}):
|
||||
kwargs = {}
|
||||
|
||||
# NOTE(kgriffs): This syntax ensures that
|
||||
# we don't clobber default values with None.
|
||||
req.get_param('marker', store=kwargs)
|
||||
req.get_param_as_int('limit', store=kwargs)
|
||||
req.get_param_as_bool('detailed', store=kwargs)
|
||||
req.get_param('name', store=kwargs)
|
||||
|
||||
topics, marker = self._topic_list(project_id,
|
||||
req.path, kfilter, **kwargs)
|
||||
|
||||
links = []
|
||||
kwargs['marker'] = marker
|
||||
if topics:
|
||||
links = [
|
||||
{
|
||||
'rel': 'next',
|
||||
'href': req.path + falcon.to_query_str(kwargs)
|
||||
}
|
||||
]
|
||||
|
||||
response_body = {
|
||||
'topics': topics,
|
||||
'links': links
|
||||
}
|
||||
|
||||
resp.body = utils.to_json(response_body)
|
||||
# status defaults to 200
|
||||
|
||||
@decorators.TransportLog("Topics collection")
|
||||
@acl.enforce("topics:get_all")
|
||||
def on_get(self, req, resp, project_id):
|
||||
field = ('marker', 'limit', 'detailed', 'name')
|
||||
kfilter = copy.deepcopy(req.params)
|
||||
|
||||
for key in req.params.keys():
|
||||
if key in field:
|
||||
kfilter.pop(key)
|
||||
|
||||
kfilter = kfilter if len(kfilter) > 0 else {}
|
||||
for key in kfilter.keys():
|
||||
# Since we get the filter value from URL, so need to
|
||||
# turn the string to integer if using integer filter value.
|
||||
try:
|
||||
kfilter[key] = int(kfilter[key])
|
||||
except ValueError:
|
||||
continue
|
||||
self._on_get_with_kfilter(req, resp, project_id, kfilter)
|
||||
# status defaults to 200
|
82
zaqar/transport/wsgi/v2_0/topic_purge.py
Normal file
82
zaqar/transport/wsgi/v2_0/topic_purge.py
Normal file
@ -0,0 +1,82 @@
|
||||
# Copyright 2019 Catalyst IT Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
# use this file except in compliance with the License. You may obtain a copy
|
||||
# of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
|
||||
import falcon
|
||||
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from zaqar.common import decorators
|
||||
from zaqar.i18n import _
|
||||
from zaqar.transport import acl
|
||||
from zaqar.transport import validation
|
||||
from zaqar.transport.wsgi import errors as wsgi_errors
|
||||
from zaqar.transport.wsgi import utils as wsgi_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Resource(object):
|
||||
|
||||
__slots__ = ('_driver', '_conf',
|
||||
'_message_ctrl', '_subscription_ctrl', '_validate')
|
||||
|
||||
def __init__(self, driver):
|
||||
self._driver = driver
|
||||
self._conf = driver._conf
|
||||
self._message_ctrl = driver._storage.message_controller
|
||||
self._subscription_ctrl = driver._storage.subscription_controller
|
||||
self._validate = driver._validate
|
||||
|
||||
@decorators.TransportLog("Topics item")
|
||||
@acl.enforce("topics:purge")
|
||||
def on_post(self, req, resp, project_id, topic_name):
|
||||
try:
|
||||
if req.content_length:
|
||||
document = wsgi_utils.deserialize(req.stream,
|
||||
req.content_length)
|
||||
self._validate.queue_purging(document)
|
||||
else:
|
||||
document = {'resource_types': ['messages', 'subscriptions']}
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
||||
try:
|
||||
if "messages" in document['resource_types']:
|
||||
pop_limit = 100
|
||||
LOG.debug("Purge all messages under topic %s", topic_name)
|
||||
messages = self._message_ctrl.pop(topic_name, pop_limit,
|
||||
project=project_id)
|
||||
while messages:
|
||||
messages = self._message_ctrl.pop(topic_name, pop_limit,
|
||||
project=project_id)
|
||||
|
||||
if "subscriptions" in document['resource_types']:
|
||||
LOG.debug("Purge all subscriptions under topic %s", topic_name)
|
||||
results = self._subscription_ctrl.list(topic_name,
|
||||
project=project_id)
|
||||
subscriptions = list(next(results))
|
||||
for sub in subscriptions:
|
||||
self._subscription_ctrl.delete(topic_name,
|
||||
sub['id'],
|
||||
project=project_id)
|
||||
except ValueError as err:
|
||||
raise wsgi_errors.HTTPBadRequestAPI(str(err))
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Topic could not be purged.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.status = falcon.HTTP_204
|
78
zaqar/transport/wsgi/v2_0/topic_stats.py
Normal file
78
zaqar/transport/wsgi/v2_0/topic_stats.py
Normal file
@ -0,0 +1,78 @@
|
||||
# Copyright (c) 2019 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from zaqar.common import decorators
|
||||
from zaqar.i18n import _
|
||||
from zaqar.storage import errors as storage_errors
|
||||
from zaqar.transport import acl
|
||||
from zaqar.transport import utils
|
||||
from zaqar.transport.wsgi import errors as wsgi_errors
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Resource(object):
|
||||
|
||||
__slots__ = '_topic_ctrl'
|
||||
|
||||
def __init__(self, topic_controller):
|
||||
self._topic_ctrl = topic_controller
|
||||
|
||||
@decorators.TransportLog("Topics stats item")
|
||||
@acl.enforce("topics:stats")
|
||||
def on_get(self, req, resp, project_id, topic_name):
|
||||
try:
|
||||
resp_dict = self._topic_ctrl.stats(topic_name,
|
||||
project=project_id)
|
||||
|
||||
message_stats = resp_dict['messages']
|
||||
|
||||
if message_stats['total'] != 0:
|
||||
base_path = req.path[:req.path.rindex('/')] + '/messages/'
|
||||
|
||||
newest = message_stats['newest']
|
||||
newest['href'] = base_path + newest['id']
|
||||
del newest['id']
|
||||
|
||||
oldest = message_stats['oldest']
|
||||
oldest['href'] = base_path + oldest['id']
|
||||
del oldest['id']
|
||||
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
# status defaults to 200
|
||||
|
||||
except (storage_errors.TopicDoesNotExist,
|
||||
storage_errors.TopicIsEmpty) as ex:
|
||||
resp_dict = {
|
||||
'messages': {
|
||||
'claimed': 0,
|
||||
'free': 0,
|
||||
'total': 0
|
||||
}
|
||||
}
|
||||
resp.body = utils.to_json(resp_dict)
|
||||
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _(u'Topic stats could not be read.')
|
||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
Loading…
Reference in New Issue
Block a user