From d9c1f5b7784254e76c6867d26d27794d348727a5 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Thu, 4 Sep 2014 15:38:45 +0000 Subject: [PATCH] added /streams support --- quincy/api.py | 74 +++++++++++++++++++++++++++++++++-------------- quincy/v1_api.py | 43 ++++++++++++++++++++++----- quincy/v1_impl.py | 35 ++++++++++++++++++---- requirements.txt | 1 + 4 files changed, 118 insertions(+), 35 deletions(-) diff --git a/quincy/api.py b/quincy/api.py index d9db894..64d2e47 100644 --- a/quincy/api.py +++ b/quincy/api.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import ConfigParser +import traceback + + import falcon import simport @@ -24,11 +28,11 @@ class NotImplemented(Exception): pass -def _load_implementations(impl_map, versions, config): +def _load_implementations(impl_map, versions, config, scratchpad): for version in versions: - target = config.get('v%d_impl' % version) + target = config.get('global', 'v%d_impl' % version) klass = simport.load(target) - impl_map[version] = klass() + impl_map[version] = klass(config, scratchpad) def _initialize(enabled_versions, implementation_map): @@ -45,32 +49,58 @@ def _initialize(enabled_versions, implementation_map): if not impl: raise NotImplemented("No implementation available for Quincy" " version %d" % version) + print "Version %d using %s" % (version, impl) routes.append(klass(version, api, impl)) - # TODO(sandy): We need to create the /v1 - # ... - # /vN + # TODO(sandy): We need to create the top-level /v1, ... /vN # resources here too. return api -# There may have been prior versions -# but they could be deprecated and dropped. -# Only the versions specified here define -# the currently supported StackTach.v3 API. -enabled_versions = [1, 2] +def _get_api(config_location=None): + print "Using config_location=%s (None means default impl)" % config_location -# The default implementation is internal and works with -# a fake/static set of data. -local_config = {'v1_impl': 'v1_impl:Impl', - 'v2_impl': 'v2_impl:Impl'} + # The default implementation is internal and works with + # a fake/static set of data. + local_config = ConfigParser.ConfigParser() + local_config.add_section('global') + local_config.set('global', 'v1_impl', 'v1_impl:Impl') + local_config.set('global', 'v2_impl', 'v2_impl:Impl') -impl_map = {} -_load_implementations(impl_map, enabled_versions, local_config) + # There may have been prior versions + # but they could be deprecated and dropped. + # Only the versions specified here define + # the currently supported StackTach.v3 API. + enabled_versions = [1, 2] -# TODO(sandy): Overlay the impl_map with the implementations -# specified in the config file. -# config = ... -# _load_implementations(impl_map, enabled_versions, config) + if config_location: + config = ConfigParser.ConfigParser() + config.read(config_location) + enabled_versions = [int(x) for x in + config.get('global', 'enabled_versions') + .split(',')] -api = _initialize(enabled_versions, impl_map) + # Rather than every implementation duplicate resources, the + # scratchpad is a shared storage area all the implementations + # can use to share things (like pipeline drivers, etc). + scratchpad = {} + impl_map = {} + _load_implementations(impl_map, enabled_versions, local_config, + scratchpad) + + if config_location: + # Overlay the impl_map with the implementations + # specified in the config file. + _load_implementations(impl_map, enabled_versions, config, + scratchpad) + + + return _initialize(enabled_versions, impl_map) + + +def get_api(config_location=None): + try: + return _get_api(config_location) + except Exception as e: + print "Error getting API:", traceback.format_exc() + return None diff --git a/quincy/v1_api.py b/quincy/v1_api.py index 9b79f37..c04679e 100644 --- a/quincy/v1_api.py +++ b/quincy/v1_api.py @@ -15,6 +15,8 @@ import json +from dateutil import parser + import common @@ -29,20 +31,45 @@ class StreamCollection(common.FalconBase): # younger_than # state # trigger_name - # id # distinquishing_traits - find stream by dtrait values. # # Actions on a Stream: - # details - get full details on stream (including distriquishing traits) - # events - get the events collected for this stream. + # details - get full details on stream (including events & + # distriquishing traits) + def on_get(self, req, resp): - streams = self.impl.get_streams(resp) - dicts = [stream.to_dict() for stream in streams] - resp.body = json.dumps(dicts) + older_than = req.get_param('older_than') + younger_than = req.get_param('younger_than') + state = req.get_param('state') + trigger = req.get_param('trigger_name') + traits = req.get_param('distinquishing_traits') + + if older_than: + older_than = parser.parse(older_than) + + if younger_than: + younger_than = parser.parse(younger_than) + + streams = self.impl.get_streams(older_than=older_than, + younger_than=younger_than, + state=state, + trigger_name=trigger, + distinquishing_traits=traits) + resp.body = json.dumps(streams) class StreamItem(common.FalconBase): - pass + def on_get(self, req, resp, stream_id, action=None): + details = action == 'details' + stream = self.impl.get_stream(stream_id, details) + resp.body = json.dumps(stream.to_dict()) + + def on_delete(self, req, resp, stream_id): + self.impl.delete_stream(stream_id) + + def on_put(self, req, resp, stream_id, action=None): + reset = action == 'reset' + self.impl.reset_stream(stream_id) class Schema(object): @@ -61,3 +88,5 @@ class Schema(object): self.stream_collection) self.api.add_route('%s/streams/{stream_id}' % self._v(), self.stream_item) + self.api.add_route('%s/streams/{stream_id}/{action}' % self._v(), + self.stream_item) diff --git a/quincy/v1_impl.py b/quincy/v1_impl.py index 96b8016..8234e77 100644 --- a/quincy/v1_impl.py +++ b/quincy/v1_impl.py @@ -23,17 +23,40 @@ class Stream(object): self.stream_id = stream_id self.trigger_name = trigger_name self.state = state + self.distinquishing_traits = [] def to_dict(self): return {"last_updated": str(self.last_updated), "stream_id": self.stream_id, "trigger_name": self.trigger_name, - "state": self.state} + "state": self.state, + "distinquishing_traits": self.distinquishing_traits} class Impl(object): - def get_stream(self, resp): - sid = str(uuid.uuid4()) - return [Stream(sid, "EOD-Exists", "Collecting") - Stream(sid, "EOD-Exists", "Error") - Stream(sid, "Request-ID", "Ready")] + def __init__(self, config, scratchpad): + self.config = config + self.scratchpad = scratchpad + + def get_streams(self, **kwargs): + """kwargs may be: + older_than + younger_than + state + trigger_name + distinquishing_traits + """ + x = [Stream(str(uuid.uuid4()), "EOD-Exists", "Collecting"), + Stream(str(uuid.uuid4()), "EOD-Exists", "Error"), + Stream(str(uuid.uuid4()), "Request-ID", "Ready")] + + return [stream.to_dict() for stream in x] + + def get_stream(self, stream_id, details): + return Stream(str(uuid.uuid4()), "Request-ID", "Ready") + + def delete_stream(self, stream_id): + pass + + def reset_stream(self, stream_id): + pass diff --git a/requirements.txt b/requirements.txt index b3e647d..d82fa8e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ +python-dateutil falcon simport >= 0.0.dev0