added /streams support

This commit is contained in:
Sandy Walsh 2014-09-04 15:38:45 +00:00
parent f176d68ad8
commit d9c1f5b778
4 changed files with 118 additions and 35 deletions

View File

@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ConfigParser
import traceback
import falcon import falcon
import simport import simport
@ -24,11 +28,11 @@ class NotImplemented(Exception):
pass pass
def _load_implementations(impl_map, versions, config): def _load_implementations(impl_map, versions, config, scratchpad):
for version in versions: for version in versions:
target = config.get('v%d_impl' % version) target = config.get('global', 'v%d_impl' % version)
klass = simport.load(target) klass = simport.load(target)
impl_map[version] = klass() impl_map[version] = klass(config, scratchpad)
def _initialize(enabled_versions, implementation_map): def _initialize(enabled_versions, implementation_map):
@ -45,32 +49,58 @@ def _initialize(enabled_versions, implementation_map):
if not impl: if not impl:
raise NotImplemented("No implementation available for Quincy" raise NotImplemented("No implementation available for Quincy"
" version %d" % version) " version %d" % version)
print "Version %d using %s" % (version, impl)
routes.append(klass(version, api, impl)) routes.append(klass(version, api, impl))
# TODO(sandy): We need to create the /v1 # TODO(sandy): We need to create the top-level /v1, ... /vN
# ...
# /vN
# resources here too. # resources here too.
return api return api
# There may have been prior versions def _get_api(config_location=None):
# but they could be deprecated and dropped. print "Using config_location=%s (None means default impl)" % config_location
# Only the versions specified here define
# the currently supported StackTach.v3 API.
enabled_versions = [1, 2]
# The default implementation is internal and works with # The default implementation is internal and works with
# a fake/static set of data. # a fake/static set of data.
local_config = {'v1_impl': 'v1_impl:Impl', local_config = ConfigParser.ConfigParser()
'v2_impl': 'v2_impl:Impl'} local_config.add_section('global')
local_config.set('global', 'v1_impl', 'v1_impl:Impl')
local_config.set('global', 'v2_impl', 'v2_impl:Impl')
impl_map = {} # There may have been prior versions
_load_implementations(impl_map, enabled_versions, local_config) # but they could be deprecated and dropped.
# Only the versions specified here define
# the currently supported StackTach.v3 API.
enabled_versions = [1, 2]
# TODO(sandy): Overlay the impl_map with the implementations if config_location:
# specified in the config file. config = ConfigParser.ConfigParser()
# config = ... config.read(config_location)
# _load_implementations(impl_map, enabled_versions, config) enabled_versions = [int(x) for x in
config.get('global', 'enabled_versions')
.split(',')]
api = _initialize(enabled_versions, impl_map) # Rather than every implementation duplicate resources, the
# scratchpad is a shared storage area all the implementations
# can use to share things (like pipeline drivers, etc).
scratchpad = {}
impl_map = {}
_load_implementations(impl_map, enabled_versions, local_config,
scratchpad)
if config_location:
# Overlay the impl_map with the implementations
# specified in the config file.
_load_implementations(impl_map, enabled_versions, config,
scratchpad)
return _initialize(enabled_versions, impl_map)
def get_api(config_location=None):
try:
return _get_api(config_location)
except Exception as e:
print "Error getting API:", traceback.format_exc()
return None

View File

@ -15,6 +15,8 @@
import json import json
from dateutil import parser
import common import common
@ -29,20 +31,45 @@ class StreamCollection(common.FalconBase):
# younger_than # younger_than
# state # state
# trigger_name # trigger_name
# id
# distinquishing_traits - find stream by dtrait values. # distinquishing_traits - find stream by dtrait values.
# #
# Actions on a Stream: # Actions on a Stream:
# details - get full details on stream (including distriquishing traits) # details - get full details on stream (including events &
# events - get the events collected for this stream. # distriquishing traits)
def on_get(self, req, resp): def on_get(self, req, resp):
streams = self.impl.get_streams(resp) older_than = req.get_param('older_than')
dicts = [stream.to_dict() for stream in streams] younger_than = req.get_param('younger_than')
resp.body = json.dumps(dicts) state = req.get_param('state')
trigger = req.get_param('trigger_name')
traits = req.get_param('distinquishing_traits')
if older_than:
older_than = parser.parse(older_than)
if younger_than:
younger_than = parser.parse(younger_than)
streams = self.impl.get_streams(older_than=older_than,
younger_than=younger_than,
state=state,
trigger_name=trigger,
distinquishing_traits=traits)
resp.body = json.dumps(streams)
class StreamItem(common.FalconBase): class StreamItem(common.FalconBase):
pass def on_get(self, req, resp, stream_id, action=None):
details = action == 'details'
stream = self.impl.get_stream(stream_id, details)
resp.body = json.dumps(stream.to_dict())
def on_delete(self, req, resp, stream_id):
self.impl.delete_stream(stream_id)
def on_put(self, req, resp, stream_id, action=None):
reset = action == 'reset'
self.impl.reset_stream(stream_id)
class Schema(object): class Schema(object):
@ -61,3 +88,5 @@ class Schema(object):
self.stream_collection) self.stream_collection)
self.api.add_route('%s/streams/{stream_id}' % self._v(), self.api.add_route('%s/streams/{stream_id}' % self._v(),
self.stream_item) self.stream_item)
self.api.add_route('%s/streams/{stream_id}/{action}' % self._v(),
self.stream_item)

View File

@ -23,17 +23,40 @@ class Stream(object):
self.stream_id = stream_id self.stream_id = stream_id
self.trigger_name = trigger_name self.trigger_name = trigger_name
self.state = state self.state = state
self.distinquishing_traits = []
def to_dict(self): def to_dict(self):
return {"last_updated": str(self.last_updated), return {"last_updated": str(self.last_updated),
"stream_id": self.stream_id, "stream_id": self.stream_id,
"trigger_name": self.trigger_name, "trigger_name": self.trigger_name,
"state": self.state} "state": self.state,
"distinquishing_traits": self.distinquishing_traits}
class Impl(object): class Impl(object):
def get_stream(self, resp): def __init__(self, config, scratchpad):
sid = str(uuid.uuid4()) self.config = config
return [Stream(sid, "EOD-Exists", "Collecting") self.scratchpad = scratchpad
Stream(sid, "EOD-Exists", "Error")
Stream(sid, "Request-ID", "Ready")] def get_streams(self, **kwargs):
"""kwargs may be:
older_than
younger_than
state
trigger_name
distinquishing_traits
"""
x = [Stream(str(uuid.uuid4()), "EOD-Exists", "Collecting"),
Stream(str(uuid.uuid4()), "EOD-Exists", "Error"),
Stream(str(uuid.uuid4()), "Request-ID", "Ready")]
return [stream.to_dict() for stream in x]
def get_stream(self, stream_id, details):
return Stream(str(uuid.uuid4()), "Request-ID", "Ready")
def delete_stream(self, stream_id):
pass
def reset_stream(self, stream_id):
pass

View File

@ -1,2 +1,3 @@
python-dateutil
falcon falcon
simport >= 0.0.dev0 simport >= 0.0.dev0