254 lines
9.9 KiB
Python
254 lines
9.9 KiB
Python
# Copyright (c) 2014 Dark Secret Software Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""OpenStack-like Notification Generation Library
|
|
|
|
Originally built from work done in
|
|
https://github.com/SandyWalsh/twobillion
|
|
"""
|
|
|
|
import datetime
|
|
import heapq
|
|
import json
|
|
import operator
|
|
import os
|
|
import os.path
|
|
import uuid as uuidlib
|
|
import random
|
|
import sys
|
|
import time
|
|
import uuid
|
|
|
|
import dateutil.parser
|
|
|
|
|
|
class EventGenerator(object):
|
|
def __init__(self, template_dir, operations_per_hour=1, exists_hours=24):
|
|
self.exists_hours = exists_hours # num hours between .exists
|
|
self.instances = {} # { uuid: compute_node }
|
|
|
|
# Many actions can be performed concurrently.
|
|
# We might start working on instance #1 and then, while that
|
|
# effort is still underway, start doing something with instances
|
|
# #2, 3 and 4. They need to interleave each other.
|
|
#
|
|
# An "action", below, is a list of each of the steps necessary
|
|
# to perform that operation, but with a time component relative to
|
|
# the starting time passed in.
|
|
# It's our responsibility to fire off the events when sufficient "time"
|
|
# has passed.
|
|
#
|
|
# The thing we don't want to have to deal with is overlapping commands
|
|
# (like instance.delete starting while instance.create is still underway)
|
|
# That's too much headache.
|
|
|
|
operations_per_second = float(operations_per_hour) / 3600.0
|
|
|
|
# An operation will happen every so many milliseconds to
|
|
# get our operations/sec. We call this a Tick.
|
|
self.millisecond_per_tick = 1000.0 / float(operations_per_second)
|
|
#print "Operation every %d ms (%.1f/sec)" % (self.millisecond_per_tick,
|
|
# operations_per_second)
|
|
self.next_events = [] # priority queue
|
|
self.instances_in_use = set()
|
|
|
|
now = datetime.datetime.utcnow()
|
|
self.tick = now + datetime.timedelta(
|
|
milliseconds=self.millisecond_per_tick)
|
|
|
|
# When were last .exists sent?
|
|
self.last_exists = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
# Load all the templates ...
|
|
template_filenames = [f for f in os.listdir(template_dir)
|
|
if os.path.isfile(os.path.join(
|
|
template_dir, f)) and
|
|
f[-5:] == ".json"]
|
|
|
|
self.templates = []
|
|
for filename in template_filenames:
|
|
with open(os.path.join(template_dir, filename), "r") as f:
|
|
print filename
|
|
template = json.load(f)
|
|
# Keep it as a raw string to make replacements easier ...
|
|
raw = json.dumps(template[1:], sort_keys=True, indent=4)
|
|
if filename == "eod_exists.json":
|
|
self.exists_template = (template[0], raw)
|
|
else:
|
|
operation = filename[:filename.index('_')]
|
|
self.templates.append((operation, template[0], raw))
|
|
|
|
|
|
def move_to_next_tick(self, now):
|
|
return now + datetime.timedelta(milliseconds=self.millisecond_per_tick)
|
|
|
|
def generate(self, now):
|
|
self._add_new_sequence(now)
|
|
return self._get_ready_events(now)
|
|
|
|
def _add_new_sequence(self, now):
|
|
"""Add a new operation to the queue.
|
|
This is the entire sequence of events going into
|
|
the future. They will be interwoven with other
|
|
future events and pumped out in proper (interleaving)
|
|
order."""
|
|
if now >= self.tick:
|
|
context, sequence = self._get_sequence(now)
|
|
for idx, when_event in enumerate(sequence):
|
|
when, event = when_event
|
|
event['____context____'] = context # delete before returning
|
|
# (when, is_first_event, is_last_event)
|
|
heapq.heappush(self.next_events,
|
|
(when, event, idx==0, idx==len(sequence)-1))
|
|
self.tick = self.move_to_next_tick(now)
|
|
return now
|
|
|
|
def _get_ready_events(self, now):
|
|
"""Pump out the events that are due now."""
|
|
ready = []
|
|
while True:
|
|
if not self.next_events:
|
|
return ready
|
|
when, event, start, end = self.next_events[0] # peek
|
|
if when > now:
|
|
break
|
|
when, event, start, end = heapq.heappop(self.next_events)
|
|
context = event['____context____']
|
|
operation = context['operation']
|
|
uuid = context['instance_id']
|
|
if end:
|
|
if operation == 'compute.instance.delete.start':
|
|
try:
|
|
self.instances_in_use.remove(uuid)
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
self.instances_in_use.add(uuid)
|
|
|
|
del event['____context____']
|
|
ready.append(event)
|
|
|
|
if (now - self.last_exists).days > 0:
|
|
flattened = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
self.last_exists = flattened
|
|
|
|
audit_period_start = flattened - datetime.timedelta(days=1)
|
|
audit_period_end = flattened
|
|
operation = "eod-exists"
|
|
context_hints, template = self.exists_template
|
|
for instance in self.instances_in_use:
|
|
context, sequence = self._make_sequence_from_template(now,
|
|
operation, context_hints, template)
|
|
for when, event in sequence:
|
|
payload = event['payload']
|
|
payload['audit_period_beginning'] = str(audit_period_start)
|
|
payload['audit_period_ending'] = str(audit_period_end)
|
|
payload['instance_id'] = instance
|
|
ready.append(event)
|
|
self.instances_in_use = set()
|
|
return ready
|
|
|
|
def _get_sequence(self, now):
|
|
"""Grab a template and make a sequence from it.
|
|
"""
|
|
|
|
operation, context_hints, template = random.choice(self.templates)
|
|
return self._make_sequence_from_template(now, operation,
|
|
context_hints, template)
|
|
|
|
def _make_sequence_from_template(self, now, operation, context_hints,
|
|
template):
|
|
sequence = []
|
|
context = {}
|
|
time_map = context_hints['time_map']
|
|
for key, values in time_map.iteritems():
|
|
context[key] = str(now + datetime.timedelta(days=values[0],
|
|
seconds=values[1],
|
|
milliseconds=values[2]))
|
|
for num in range(context_hints['uuid']):
|
|
context["[[[[UUID_%d]]]]" % num] = str(uuid.uuid4())
|
|
|
|
for num in range(context_hints['xuuid']):
|
|
u = str(uuid.uuid4()).replace("-", "")
|
|
context["[[[[XUUID_%d]]]]" % num] = u
|
|
|
|
for num in range(context_hints['v4']):
|
|
nums = [127 + random.randrange(127) for x in range(4)]
|
|
v4 = "%d.%d.%d.%d" % tuple(nums)
|
|
context["[[[[V4_%d]]]]" % num] = v4
|
|
|
|
for num in range(context_hints['v6']):
|
|
res = [hex(random.randint(0, 65535))[2:].zfill(4)
|
|
for i in range(0, 8)]
|
|
v6 = ":".join(res)
|
|
context["[[[[V6_%d]]]]" % num] = v6
|
|
|
|
# The rest of the context ...
|
|
context["[[[[tenant_id]]]]"] = str(100000 + random.randrange(899999))
|
|
context["[[[[user_id]]]]"] = str(100000 + random.randrange(899999))
|
|
context["[[[[display_name]]]]"] = "Instance_%d" % random.randrange(
|
|
899999)
|
|
context["[[[[host]]]]"] = "host-%d" % random.randrange(899999)
|
|
context["[[[[hostname]]]]"] = "server-%d" % random.randrange(899999)
|
|
context["[[[[node]]]]"] = "node-%d" % random.randrange(899999)
|
|
context["[[[[reservation_id]]]]"] = "res-%d" % random.randrange(899999)
|
|
context["[[[[image_name]]]]"] = "image-%d" % random.randrange(899999)
|
|
context["[[[[device_name]]]]"] = "device-%d" % random.randrange(899999)
|
|
context["[[[[publisher_id]]]]"] = "publisher-%d" % random.randrange(
|
|
899999)
|
|
|
|
for key, value in context.iteritems():
|
|
template = template.replace(key, value)
|
|
|
|
struct = json.loads(template)
|
|
|
|
instance_id = None
|
|
for event in struct:
|
|
inst_id = event['payload'].get('instance_id')
|
|
if inst_id:
|
|
if instance_id and instance_id != inst_id:
|
|
print "changing instance id", instance_id, inst_id
|
|
instance_id = inst_id
|
|
when = dateutil.parser.parse(event['timestamp'])
|
|
sequence.append((when, event))
|
|
|
|
context['instance_id'] = instance_id
|
|
context['operation'] = operation
|
|
return context, sorted(sequence)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
real_time = False
|
|
|
|
g = EventGenerator("templates", 1)
|
|
now = datetime.datetime.utcnow()
|
|
print "starting at", now
|
|
end = now + datetime.timedelta(days=1) # ensure .exists get generated
|
|
start = now
|
|
nevents = 0
|
|
while now < end:
|
|
e = g.generate(now)
|
|
if e:
|
|
nevents += len(e)
|
|
|
|
if real_time:
|
|
now = datetime.datetime.utcnow()
|
|
else:
|
|
now = g.move_to_next_tick(now)
|
|
|
|
print "Elapsed: ", datetime.datetime.utcnow() - start
|