91 lines
2.6 KiB
Python
91 lines
2.6 KiB
Python
import datetime
|
|
import os
|
|
import sys
|
|
import unittest
|
|
|
|
TENANT_ID_1 = 'testtenantid1'
|
|
|
|
from stacktach import datetime_to_decimal as dt
|
|
|
|
INSTANCE_ID_1 = 'testinstanceid1'
|
|
INSTANCE_ID_2 = 'testinstanceid2'
|
|
|
|
MESSAGE_ID_1 = 'testmessageid1'
|
|
MESSAGE_ID_2 = 'testmessageid2'
|
|
|
|
REQUEST_ID_1 = 'testrequestid1'
|
|
REQUEST_ID_2 = 'testrequestid2'
|
|
REQUEST_ID_3 = 'testrequestid3'
|
|
|
|
|
|
def decimal_utc(t = datetime.datetime.utcnow()):
|
|
return dt.dt_to_decimal(t)
|
|
|
|
|
|
def create_nova_notif(request_id=None, instance=INSTANCE_ID_1, type_id='1',
|
|
launched=None, deleted = None, new_type_id=None,
|
|
message_id=MESSAGE_ID_1):
|
|
notif = ['', {
|
|
'message_id': message_id,
|
|
'payload': {
|
|
'instance_id': instance,
|
|
'instance_type_id': type_id,
|
|
}
|
|
}]
|
|
|
|
if request_id:
|
|
notif[1]['_context_request_id'] = request_id
|
|
if launched:
|
|
notif[1]['payload']['launched_at'] = launched
|
|
if deleted:
|
|
notif[1]['payload']['deleted_at'] = deleted
|
|
if new_type_id:
|
|
notif[1]['payload']['new_instance_type_id'] = new_type_id
|
|
|
|
return notif
|
|
|
|
|
|
def create_raw(mox, when, event, instance=INSTANCE_ID_1,
|
|
request_id=REQUEST_ID_1, state='active', old_task='',
|
|
host='c.example.com', service='compute', json_str=''):
|
|
raw = mox.CreateMockAnything()
|
|
raw.host = host
|
|
raw.service = service
|
|
raw.instance = instance
|
|
raw.event = event
|
|
raw.when = when
|
|
raw.state = state
|
|
raw.old_task = old_task
|
|
raw.request_id = request_id
|
|
raw.json = json_str
|
|
return raw
|
|
|
|
def create_lifecycle(mox, instance, last_state, last_task_state, last_raw):
|
|
lifecycle = mox.CreateMockAnything()
|
|
lifecycle.instance = instance
|
|
lifecycle.last_state = last_state
|
|
lifecycle.last_task_state = last_task_state
|
|
lifecycle.last_raw = last_raw
|
|
return lifecycle
|
|
|
|
def create_timing(mox, name, lifecycle, start_raw=None, start_when=None,
|
|
end_raw=None, end_when=None, diff=None):
|
|
timing = mox.CreateMockAnything()
|
|
timing.name = name
|
|
timing.lifecycle = lifecycle
|
|
timing.start_raw = start_raw
|
|
timing.start_when = start_when
|
|
timing.end_raw = end_raw
|
|
timing.end_when = end_when
|
|
timing.diff = diff
|
|
return timing
|
|
|
|
def create_tracker(mox, request_id, lifecycle, start, last_timing=None,
|
|
duration=str(0.0)):
|
|
tracker = mox.CreateMockAnything()
|
|
tracker.request_id=request_id
|
|
tracker.lifecycle=lifecycle
|
|
tracker.start=start
|
|
tracker.last_timing=last_timing
|
|
tracker.duration=duration
|
|
return tracker |