# Copyright (c) 2014 Dark Secret Software Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OpenStack-like Notification Generation Library
Originally built from work done in
https://github.com/SandyWalsh/twobillion
"""
import datetime
import heapq
import json
import os
import os.path
import random
import uuid

import dateutil.parser


class EventGenerator(object):
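    """Generate OpenStack-style notification events from JSON templates.

    Operation sequences are scheduled on a priority queue so that events
    from different "operations" interleave, the way notifications from a
    real, concurrent cloud would.
    """
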
    def __init__(self, template_dir, operations_per_hour=1, exists_hours=24):
        self.exists_hours = exists_hours  # num hours between .exists
        self.instances = {}  # { uuid: compute_node }

        # Many actions can be performed concurrently. We might start
        # working on instance #1 and then, while that effort is still
        # underway, start doing something with instances #2, 3 and 4.
        # They need to interleave each other.
        #
        # An "action", below, is a list of each of the steps necessary
        # to perform that operation, but with a time component relative
        # to the starting time passed in. It's our responsibility to
        # fire off the events when sufficient "time" has passed.
        #
        # The thing we don't want to have to deal with is overlapping
        # commands (like instance.delete starting while instance.create
        # is still underway). That's too much headache.

        operations_per_second = float(operations_per_hour) / 3600.0
        # An operation will happen every so many milliseconds to
        # get our operations/sec. We call this a Tick.
        self.millisecond_per_tick = 1000.0 / operations_per_second
        # print "Operation every %d ms (%.1f/sec)" % (
        #     self.millisecond_per_tick, operations_per_second)

        self.next_events = []  # priority queue
        self.instances_in_use = set()

        now = datetime.datetime.utcnow()
        self.tick = now + datetime.timedelta(
            milliseconds=self.millisecond_per_tick)

        # When were the last .exists sent?
        self.last_exists = now.replace(hour=0, minute=0, second=0,
                                       microsecond=0)

        # Load all the templates ...
        template_filenames = [f for f in os.listdir(template_dir)
                              if os.path.isfile(os.path.join(template_dir, f))
                              and f.endswith(".json")]
        self.templates = []
        for filename in template_filenames:
            with open(os.path.join(template_dir, filename), "r") as f:
                print filename
                template = json.load(f)
            # Keep it as a raw string to make replacements easier ...
            raw = json.dumps(template[1:], sort_keys=True, indent=4)
            if filename == "eod_exists.json":
                self.exists_template = (template[0], raw)
            else:
                operation = filename[:filename.index('_')]
                self.templates.append((operation, template[0], raw))

    def move_to_next_tick(self, now):
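        """Return the time one tick interval after `now`."""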
        return now + datetime.timedelta(
            milliseconds=self.millisecond_per_tick)

    def generate(self, now):
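        """Advance the generator to `now`.

        Possibly queues a new operation sequence (if a tick boundary has
        been crossed) and returns every queued event due at or before
        `now`.
        """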
        self._add_new_sequence(now)
        return self._get_ready_events(now)

    def _add_new_sequence(self, now):
        """Add a new operation to the queue.

        This is the entire sequence of events going into the future.
        They will be interwoven with other future events and pumped out
        in proper (interleaved) order.
        """
        if now >= self.tick:
            context, sequence = self._get_sequence(now)
            for idx, (when, event) in enumerate(sequence):
                event['____context____'] = context  # deleted before returning
                # Queue entries are (when, event, is_first_event,
                # is_last_event) tuples.
                heapq.heappush(self.next_events,
                               (when, event, idx == 0,
                                idx == len(sequence) - 1))
            self.tick = self.move_to_next_tick(now)
        return now

    def _get_ready_events(self, now):
        """Pump out the events that are due now."""
        ready = []
        while True:
            if not self.next_events:
                return ready
            when, event, start, end = self.next_events[0]  # peek
            if when > now:
                break
            when, event, start, end = heapq.heappop(self.next_events)
            context = event['____context____']
            operation = context['operation']
            instance_uuid = context['instance_id']
            if end:
                if operation == 'compute.instance.delete.start':
                    self.instances_in_use.discard(instance_uuid)
                else:
                    self.instances_in_use.add(instance_uuid)
            del event['____context____']
            ready.append(event)
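
        # Roughly once a day, emit a .exists audit event for every
        # instance still in use, covering the previous audit period.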
        if (now - self.last_exists).days > 0:
            flattened = now.replace(hour=0, minute=0, second=0, microsecond=0)
            self.last_exists = flattened
            audit_period_start = flattened - datetime.timedelta(days=1)
            audit_period_end = flattened
            operation = "eod-exists"
            context_hints, template = self.exists_template
            for instance in self.instances_in_use:
                context, sequence = self._make_sequence_from_template(
                    now, operation, context_hints, template)
                for when, event in sequence:
                    payload = event['payload']
                    payload['audit_period_beginning'] = str(audit_period_start)
                    payload['audit_period_ending'] = str(audit_period_end)
                    payload['instance_id'] = instance
                    ready.append(event)
            self.instances_in_use = set()

        return ready

    def _get_sequence(self, now):
        """Grab a template and make a sequence from it."""
        operation, context_hints, template = random.choice(self.templates)
        return self._make_sequence_from_template(now, operation,
                                                 context_hints, template)

    def _make_sequence_from_template(self, now, operation, context_hints,
                                     template):
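        """Fill in a template's placeholders and build its event sequence.

        Placeholders (e.g. [[[[UUID_0]]]], [[[[tenant_id]]]]) are replaced
        with freshly generated values, the time_map entries become
        timestamps offset from `now`, and the events are returned as a
        time-ordered list of (when, event) tuples along with the
        substitution context.
        """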
        sequence = []
        context = {}

        time_map = context_hints['time_map']
        for key, values in time_map.iteritems():
            context[key] = str(now + datetime.timedelta(
                days=values[0], seconds=values[1], milliseconds=values[2]))

        for num in range(context_hints['uuid']):
            context["[[[[UUID_%d]]]]" % num] = str(uuid.uuid4())

        for num in range(context_hints['xuuid']):
            u = str(uuid.uuid4()).replace("-", "")
            context["[[[[XUUID_%d]]]]" % num] = u

        for num in range(context_hints['v4']):
            nums = [127 + random.randrange(127) for x in range(4)]
            context["[[[[V4_%d]]]]" % num] = "%d.%d.%d.%d" % tuple(nums)

        for num in range(context_hints['v6']):
            res = [hex(random.randint(0, 65535))[2:].zfill(4)
                   for i in range(8)]
            context["[[[[V6_%d]]]]" % num] = ":".join(res)

        # The rest of the context ...
        context["[[[[tenant_id]]]]"] = str(100000 + random.randrange(899999))
        context["[[[[user_id]]]]"] = str(100000 + random.randrange(899999))
        context["[[[[display_name]]]]"] = "Instance_%d" % random.randrange(
            899999)
        context["[[[[host]]]]"] = "host-%d" % random.randrange(899999)
        context["[[[[hostname]]]]"] = "server-%d" % random.randrange(899999)
        context["[[[[node]]]]"] = "node-%d" % random.randrange(899999)
        context["[[[[reservation_id]]]]"] = "res-%d" % random.randrange(899999)
        context["[[[[image_name]]]]"] = "image-%d" % random.randrange(899999)
        context["[[[[device_name]]]]"] = "device-%d" % random.randrange(899999)
        context["[[[[publisher_id]]]]"] = "publisher-%d" % random.randrange(
            899999)

        for key, value in context.iteritems():
            template = template.replace(key, value)
        struct = json.loads(template)

        instance_id = None
        for event in struct:
            inst_id = event['payload'].get('instance_id')
            if inst_id:
                if instance_id and instance_id != inst_id:
                    print "changing instance id", instance_id, inst_id
                instance_id = inst_id
            when = dateutil.parser.parse(event['timestamp'])
            sequence.append((when, event))

        context['instance_id'] = instance_id
        context['operation'] = operation
        return context, sorted(sequence)


if __name__ == '__main__':
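    # Quick smoke test: simulate one day of activity at one operation per
    # hour, advancing the clock one tick at a time (set real_time = True
    # to follow the wall clock instead).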
    real_time = False
    g = EventGenerator("templates", 1)
    now = datetime.datetime.utcnow()
    print "starting at", now
    end = now + datetime.timedelta(days=1)  # ensure .exists get generated
    start = now
    nevents = 0
    while now < end:
        e = g.generate(now)
        if e:
            nevents += len(e)
        if real_time:
            now = datetime.datetime.utcnow()
        else:
            now = g.move_to_next_tick(now)
    print "Generated %d events" % nevents
    print "Elapsed: ", datetime.datetime.utcnow() - start