
Fix memory usage for verifiers. Events to verify were being loaded from the DB into an in-memory FIFO queue to spool to worker processes. The queue size was not limited, resulting in a large amount of memory being used if events were read from the DB faster than they were being processed. This change pauses the loading of events if the in-memory queue grows larger than the specified batchsize. Also, verifier child processes were not handling signals (like SIGTERM) properly, so they did not shut down cleanly. Added proper signal handling. Change-Id: Ife25ca07398acf111f4388071b5f2e4eafeecb05
214 lines
7.1 KiB
Python
214 lines
7.1 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
import datetime
|
|
import decimal
|
|
|
|
# Tenant identifiers used by the test fixtures.
TENANT_ID_1 = "testtenantid1"
TENANT_ID_2 = "testtenantid2"
|
|
|
|
from stacktach import datetime_to_decimal as dt
|
|
|
|
# Sample image identifier.
IMAGE_UUID_1 = "1111aaaa-6352-4dbc-8271-96cc54bf14cd"

# Sample instance identifiers.
INSTANCE_ID_1 = "2222bbbb-6352-4dbc-8271-96cc54bf14cd"
INSTANCE_ID_2 = "3333cccc-41d3-b86d-5467-e584edc61dab"

# Sample flavor identifiers (kept as strings).
INSTANCE_FLAVOR_ID_1 = "1"
INSTANCE_FLAVOR_ID_2 = "performance2-120"

# Sample instance type identifiers (kept as strings).
INSTANCE_TYPE_ID_1 = "12345"
INSTANCE_TYPE_ID_2 = "54321"
|
|
|
|
# Reference timestamp taken once when this module is imported, together
# with values exactly one hour before and after it.
one_hr = datetime.timedelta(hours=1)
DUMMY_TIME = datetime.datetime.utcnow()
EARLIER_DUMMY_TIME = DUMMY_TIME - one_hr
LATER_DUMMY_TIME = DUMMY_TIME + one_hr
|
|
# DUMMY_TIME passed through dt.dt_to_decimal (stacktach.datetime_to_decimal).
DECIMAL_DUMMY_TIME = dt.dt_to_decimal(DUMMY_TIME)
|
|
|
|
# Sample notification message identifiers.
MESSAGE_ID_1 = "4444dddd-29a2-43f2-9ba1-ccb3e53ab6c8"
MESSAGE_ID_2 = "5555eeee-0f04-4329-865f-7b9a7bd69bcf"
MESSAGE_ID_3 = "6666ffff-0f04-4329-865f-797387adf45c"
MESSAGE_ID_4 = "7777aaaa-0f04-4329-865f-797387adf45e"

BANDWIDTH_PUBLIC_OUTBOUND = 1697240969

# Sample request identifiers.
REQUEST_ID_1 = "req-8888bbbb-9e47-4b27-a95e-27996cc40c06"
REQUEST_ID_2 = "req-9999cccc-52ee-425d-9f56-d68bd1ad00ac"
REQUEST_ID_3 = "req-0000dddd-5849-4406-8166-4db8cd085f52"

# Sample image-metadata values.
RAX_OPTIONS_1 = "1"
RAX_OPTIONS_2 = "2"

OS_DISTRO_1 = "linux"
OS_DISTRO_2 = "selinux"

OS_ARCH_1 = "x86"
OS_ARCH_2 = "x64"

OS_VERSION_1 = "1"
OS_VERSION_2 = "2"

# Sample decimal timestamps.
LAUNCHED_AT_1 = decimal.Decimal("1.1")
LAUNCHED_AT_2 = decimal.Decimal("2.1")

DELETED_AT_1 = decimal.Decimal("3.1")
DELETED_AT_2 = decimal.Decimal("4.1")

SIZE_1 = 1234
SIZE_2 = 4567

CREATED_AT_1 = decimal.Decimal("10.1")
CREATED_AT_2 = decimal.Decimal("11.1")

IMAGE_OWNER_1 = "owner_1"
IMAGE_OWNER_2 = "owner_2"

TIMESTAMP_1 = "2013-06-20 17:31:57.939614"

# Values handed to FakeVerifierConfig by make_verifier_config().
SETTLE_TIME = 5
SETTLE_UNITS = "minutes"
TICK_TIME = 10
HOST = "10.0.0.1"
PORT = "5672"
VIRTUAL_HOST = "/"
USERID = "rabbit"
PASSWORD = "password"
NOVA_VERIFIER_EVENT_TYPE = "compute.instance.exists.verified.old"
GLANCE_VERIFIER_EVENT_TYPE = "image.exists.verified.old"
FLAVOR_FIELD_NAME = "flavor_field_name"
|
|
|
|
def decimal_utc(t=datetime.datetime.utcnow()):
    """Return ``t`` converted by ``dt.dt_to_decimal``.

    NOTE(review): the default for ``t`` is evaluated a single time, when
    this function is defined, so every call made without an argument
    converts that same timestamp rather than the time of the call.
    Pass ``t`` explicitly when the current time is wanted.
    """
    converted = dt.dt_to_decimal(t)
    return converted
|
|
|
|
|
|
def create_nova_notif(request_id=None, instance=INSTANCE_ID_1, type_id='1',
                      launched=None, deleted=None, new_type_id=None,
                      message_id=MESSAGE_ID_1, audit_period_beginning=None,
                      audit_period_ending=None, tenant_id=None,
                      rax_options=None, os_architecture=None,
                      os_version=None, os_distro=None):
    """Build a two-element notification list ``['', body]``.

    ``body`` is a dict holding ``message_id``, a ``payload`` dict and
    ``_context_request_id``.  Every keyword argument is copied into the
    body as-is (including ``None``); the four image properties go into
    ``payload['image_meta']``.
    """
    image_meta = {
        'com.rackspace__1__options': rax_options,
        'org.openstack__1__architecture': os_architecture,
        'org.openstack__1__os_distro': os_distro,
        'org.openstack__1__os_version': os_version,
    }
    payload = {
        'image_meta': image_meta,
        'instance_id': instance,
        'instance_type_id': type_id,
        'launched_at': launched,
        'deleted_at': deleted,
        'new_instance_type_id': new_type_id,
        'audit_period_beginning': audit_period_beginning,
        'audit_period_ending': audit_period_ending,
        'tenant_id': tenant_id,
    }
    body = {
        'message_id': message_id,
        'payload': payload,
        '_context_request_id': request_id,
    }
    return ['', body]
|
|
|
|
|
|
def create_raw(mox, when, event, instance=INSTANCE_ID_1,
               request_id=REQUEST_ID_1, state='active', old_task='',
               host='c.example.com', service='compute', json_str=''):
    """Return ``mox.CreateMockAnything()`` populated with raw-record fields.

    Each argument is stored on the created object under the attribute of
    the same name, except ``json_str`` which is stored as ``json``.
    """
    # Ordered (attribute, value) pairs; order matches the original
    # assignment sequence.
    fields = (
        ('host', host),
        ('service', service),
        ('instance', instance),
        ('event', event),
        ('when', when),
        ('state', state),
        ('old_task', old_task),
        ('request_id', request_id),
        ('json', json_str),
    )
    record = mox.CreateMockAnything()
    for attr, value in fields:
        setattr(record, attr, value)
    return record
|
|
|
|
|
|
def create_lifecycle(mox, instance, last_state, last_task_state, last_raw):
    """Return ``mox.CreateMockAnything()`` carrying the lifecycle fields.

    The four non-``mox`` arguments are stored on the created object
    under attributes of the same names.
    """
    fields = (
        ('instance', instance),
        ('last_state', last_state),
        ('last_task_state', last_task_state),
        ('last_raw', last_raw),
    )
    result = mox.CreateMockAnything()
    for attr, value in fields:
        setattr(result, attr, value)
    return result
|
|
|
|
|
|
def create_timing(mox, name, lifecycle, start_raw=None, start_when=None,
                  end_raw=None, end_when=None, diff=None):
    """Return ``mox.CreateMockAnything()`` carrying the timing fields.

    Every non-``mox`` argument is stored on the created object under an
    attribute of the same name; omitted optional ones are stored as
    ``None``.
    """
    fields = (
        ('name', name),
        ('lifecycle', lifecycle),
        ('start_raw', start_raw),
        ('start_when', start_when),
        ('end_raw', end_raw),
        ('end_when', end_when),
        ('diff', diff),
    )
    result = mox.CreateMockAnything()
    for attr, value in fields:
        setattr(result, attr, value)
    return result
|
|
|
|
|
|
def create_tracker(mox, request_id, lifecycle, start, last_timing=None,
                   duration=str(0.0)):
    """Return ``mox.CreateMockAnything()`` carrying the tracker fields.

    Every non-``mox`` argument is stored on the created object under an
    attribute of the same name.  ``duration`` defaults to the string
    form of ``0.0``.
    """
    fields = (
        ('request_id', request_id),
        ('lifecycle', lifecycle),
        ('start', start),
        ('last_timing', last_timing),
        ('duration', duration),
    )
    result = mox.CreateMockAnything()
    for attr, value in fields:
        setattr(result, attr, value)
    return result
|
|
|
|
|
|
class FakeVerifierConfig(object):
    """Stand-in verifier config whose settings are zero-argument callables.

    Each setting is an instance attribute that, when called, returns the
    value given to the constructor.  ``pool_size`` (5),
    ``validation_level`` ('all') and ``batchsize`` (1000) are fixed.
    """

    def __init__(self, host, port, virtual_host, userid, password, tick_time,
                 settle_time, settle_units, durable_queue, topics, notifs,
                 nova_event_type, glance_event_type, flavor_field_name):
        def constant(value):
            # Zero-argument callable that always returns ``value``.
            return lambda: value

        settings = (
            ('host', host),
            ('port', port),
            ('virtual_host', virtual_host),
            ('userid', userid),
            ('password', password),
            ('pool_size', 5),
            ('tick_time', tick_time),
            ('settle_time', settle_time),
            ('settle_units', settle_units),
            ('durable_queue', durable_queue),
            ('topics', topics),
            # The constructor argument is ``notifs``; the exposed getter
            # is named ``enable_notifications``.
            ('enable_notifications', notifs),
            ('validation_level', 'all'),
            ('nova_event_type', nova_event_type),
            ('glance_event_type', glance_event_type),
            ('flavor_field_name', flavor_field_name),
            ('batchsize', 1000),
        )
        for name, value in settings:
            setattr(self, name, constant(value))
|
|
|
|
def make_verifier_config(notifs):
    """Return a ``FakeVerifierConfig`` built from the module constants.

    ``notifs`` is forwarded as the config's ``notifs`` argument.  The
    queue is marked durable and the only topic is
    ``notifications.info`` under the ``exchange`` key.
    """
    return FakeVerifierConfig(
        host=HOST,
        port=PORT,
        virtual_host=VIRTUAL_HOST,
        userid=USERID,
        password=PASSWORD,
        tick_time=TICK_TIME,
        settle_time=SETTLE_TIME,
        settle_units=SETTLE_UNITS,
        durable_queue=True,
        topics={'exchange': ['notifications.info']},
        notifs=notifs,
        nova_event_type=NOVA_VERIFIER_EVENT_TYPE,
        glance_event_type=GLANCE_VERIFIER_EVENT_TYPE,
        flavor_field_name=FLAVOR_FIELD_NAME,
    )
|
|
|
|
|
|
def mock_datetime_utcnow(mox, time):
    """Stub ``datetime.datetime`` so that ``utcnow()`` yields ``time``.

    Calls ``mox.StubOutWithMock`` on the ``datetime`` module's
    ``datetime`` attribute, records one ``utcnow()`` call with ``time``
    as its return value, then calls ``mox.ReplayAll()``.
    """
    mox.StubOutWithMock(datetime, 'datetime')
    # After stubbing, ``datetime.datetime`` is the object mox installed.
    stubbed = datetime.datetime
    stubbed.utcnow().AndReturn(time)
    mox.ReplayAll()
|
|
|