Improve Music query and ORM performance
From the early days of Valet, there has been a known O(n) inefficiency concerning Music query filtering. This is further exacerbated by the instantiation of Music-backed ORM objects, leading to a O(n*m) inefficiency. This commit is a mitigation that greatly increases the likelihood of O(1) filtering performance. valet-engine job status check efficiency is also improved as a result. This commit also fixes a condition where valet-api may create multiple valet-engine placement requests for a stack id that already has a request in process (e.g., if nova-scheduler calls valet-api multiple times due to a retry by nova-controller). IMPORTANT: When applying this commit, perform the following changes. 1: In valet.conf, ensure "tries" and "interval" are set to 100 and 0.1, respectively. (If they're commented out, change the commented versions. There is no need to uncomment them.) [music] tries = 100 interval = 0.1 2: Manually create secondary keys in the valet-api cassandra tables. For instance, given a keyspace of "valet_aic", execute these commands in cqlsh on the cassandra server used by valet-api. CREATE INDEX ON valet_aic.plans (stack_id); CREATE INDEX ON valet_aic.placements (plan_id); CREATE INDEX ON valet_aic.placements (orchestration_id); CREATE INDEX ON valet_aic.placements (resource_id); Until Music is updated to handle this automatically, these commands must be executed every time the tables are dropped/re-added. 3: Determine the realistically expected number of simultaneous plan requests for valet-api. Ensure the server running valet-api is sized appropriately, set the httpd configuration's thread count to match this number (perhaps a few extra), and restart the daemon. For example, to specify 10 threads in Apache2 httpd, edit the WSGIDaemonProcess directive for valet-api (this is one line): WSGIDaemonProcess valet user=ubuntu group=ubuntu threads=10 python-home=/opt/stack/heat.venv 4: To start with a clean slate, clear out any residual placement requests and responses in the valet-engine cassandra tables. For instance, given a keyspace of "valet_aic", execute these commands in cqlsh on the cassandra server used by valet-engine. TRUNCATE valet_aic.placement_requests; TRUNCATE valet_aic.placement_results; Change-Id: I76528c9b81dc451241ecc547cadc18cc4b1284df
This commit is contained in:
parent
543ef67ed1
commit
0ff46ffb36
@ -36,8 +36,8 @@ host = music_host
|
||||
port = 8080
|
||||
keyspace = valet_keyspace
|
||||
replication_factor = 3
|
||||
# tries = 10
|
||||
# interval = 1
|
||||
# tries = 100
|
||||
# interval = 0.1
|
||||
# request_table = placement_requests
|
||||
# response_table = placement_results
|
||||
# event_table = oslo_messages
|
||||
@ -54,35 +54,16 @@ replication_factor = 3
|
||||
#
|
||||
|
||||
[engine]
|
||||
# Used for Ostro active/passive selection
|
||||
priority = 1
|
||||
|
||||
# Set the location of daemon process id
|
||||
pid = /var/run/valet/ostro-daemon.pid
|
||||
|
||||
# Set IP of this Ostro
|
||||
# ip = localhost
|
||||
|
||||
# Used for Ostro active/passive selection
|
||||
priority = 1
|
||||
|
||||
|
||||
#------------------------------------------------------------------------------------------------------------
|
||||
# Logging configuration
|
||||
#------------------------------------------------------------------------------------------------------------
|
||||
# Set logging parameters
|
||||
# logger_name = test
|
||||
# logging level = [debug|info]
|
||||
# logging_level = debug
|
||||
|
||||
# Set the directory to locate the log file
|
||||
# logging_dir = /var/log/valet/engine/
|
||||
|
||||
# Set the maximum size of the main logger as Byte
|
||||
# max_main_log_size = 5000000
|
||||
|
||||
# Set the maximum logfile size as Byte for time-series log files
|
||||
# max_log_size = 1000000
|
||||
|
||||
# Set the maximum number of time-series log files
|
||||
# max_num_of_logs = 20
|
||||
# health_timeout = 10
|
||||
|
||||
#------------------------------------------------------------------------------------------------------------
|
||||
# Management configuration
|
||||
|
@ -1,16 +1,30 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
if [ -z $VALET_KEYSPACE ]; then
|
||||
echo "ERR: VALET_KEYSPACE is not defined."
|
||||
exit
|
||||
else
|
||||
sed -ie "s/#VALET_KEYSPACE#/${VALET_KEYSPACE}/g" ./populate.cql
|
||||
fi
|
||||
|
||||
if [ -z $CASSANDRA_BIN ]; then
|
||||
echo "ERR: CASSANDRA_BIN is not defined."
|
||||
exit
|
||||
fi
|
||||
|
||||
# drop keyspace
|
||||
echo "drop valet keyspace"
|
||||
/opt/app/apache-cassandra-2.1.1/bin/cqlsh -e "DROP KEYSPACE valet_test;"
|
||||
echo "Drop Valet keyspace - ${VALET_KEYSPACE}"
|
||||
${CASSANDRA_BIN}cqlsh -e "DROP KEYSPACE ${VALET_KEYSPACE};"
|
||||
|
||||
sleep 5
|
||||
|
||||
# populate tables
|
||||
echo "populate valet tables"
|
||||
# /opt/app/apache-cassandra-2.1.1/bin/cqlsh -f ./populate.cql
|
||||
echo "Populate Valet Api tables"
|
||||
pecan populate /var/www/valet/config.py
|
||||
|
||||
/opt/app/apache-cassandra-2.1.1/bin/cqlsh -e "DESCRIBE KEYSPACE valet_test;"
|
||||
echo "Populate Valet Engine tables + Api indexes"
|
||||
${CASSANDRA_BIN}cqlsh -f ./populate.cql
|
||||
|
||||
echo "Done populating"
|
||||
${CASSANDRA_BIN}cqlsh -e "DESCRIBE KEYSPACE ${VALET_KEYSPACE};"
|
||||
|
||||
echo "Done populating - ${VALET_KEYSPACE}"
|
||||
|
@ -1,23 +1,33 @@
|
||||
CREATE KEYSPACE IF NOT EXISTS valet_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '3' } AND durable_writes = true;
|
||||
CREATE KEYSPACE IF NOT EXISTS #VALET_KEYSPACE# WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '3' } AND durable_writes = true;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.placements(id text PRIMARY KEY, name text, orchestration_id text, resource_id text, location text, reserved boolean, plan_id text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.placements(id text PRIMARY KEY, name text, orchestration_id text, resource_id text, location text, reserved boolean, plan_id text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.groups(id text PRIMARY KEY, name text, description text, type text, members text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.groups(id text PRIMARY KEY, name text, description text, type text, members text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.placement_requests(stack_id text PRIMARY KEY, request text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.placement_requests(stack_id text PRIMARY KEY, request text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.placement_results(stack_id text PRIMARY KEY, placement text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.placement_results(stack_id text PRIMARY KEY, placement text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.oslo_messages ("timestamp" text PRIMARY KEY, args text, exchange text, method text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.oslo_messages ("timestamp" text PRIMARY KEY, args text, exchange text, method text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.plans (id text PRIMARY KEY, name text, stack_id text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.plans (id text PRIMARY KEY, name text, stack_id text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.uuid_map (uuid text PRIMARY KEY, h_uuid text, s_uuid text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.uuid_map (uuid text PRIMARY KEY, h_uuid text, s_uuid text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.app (stack_id text PRIMARY KEY, app text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.app (stack_id text PRIMARY KEY, app text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.resource_status (site_name text PRIMARY KEY, resource text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.resource_status (site_name text PRIMARY KEY, resource text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.resource_log_index (site_name text PRIMARY KEY, resource_log_index text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.resource_log_index (site_name text PRIMARY KEY, resource_log_index text);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS valet_test.app_log_index ( site_name text PRIMARY KEY, app_log_index text);
|
||||
CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.app_log_index ( site_name text PRIMARY KEY, app_log_index text);
|
||||
|
||||
|
||||
|
||||
CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.plans (stack_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.placements (plan_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.placements (orchestration_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.placements (resource_id);
|
||||
|
@ -94,8 +94,8 @@ class Ostro(object):
|
||||
|
||||
def __init__(self):
|
||||
"""Initializer."""
|
||||
self.tries = conf.music.get('tries', 10)
|
||||
self.interval = conf.music.get('interval', 1)
|
||||
self.tries = conf.music.get('tries', 100)
|
||||
self.interval = conf.music.get('interval', 0.1)
|
||||
|
||||
def _map_names_to_uuids(self, mapping, data):
|
||||
"""Map resource names to their UUID equivalents."""
|
||||
@ -127,21 +127,31 @@ class Ostro(object):
|
||||
|
||||
# TODO(JD): This really belongs in valet-engine once it exists.
|
||||
def _send(self, stack_id, request):
|
||||
"""Send request."""
|
||||
# Creating the placement request effectively enqueues it.
|
||||
PlacementRequest(stack_id=stack_id, request=request) # pylint: disable=W0612
|
||||
|
||||
# Wait for a response.
|
||||
# TODO(JD): This is a blocking operation at the moment.
|
||||
"""Send request."""
|
||||
request_query = Query(PlacementRequest)
|
||||
result_query = Query(PlacementResult)
|
||||
requested = False
|
||||
|
||||
for __ in range(self.tries, 0, -1): # pylint: disable=W0612
|
||||
query = Query(PlacementResult)
|
||||
placement_result = query.filter_by(stack_id=stack_id).first()
|
||||
if placement_result:
|
||||
placement = placement_result.placement
|
||||
placement_result.delete()
|
||||
# Take a breather in between checks.
|
||||
# TODO(JD): This is a blocking operation at the moment.
|
||||
time.sleep(self.interval)
|
||||
|
||||
# First, check to see if there's already a response.
|
||||
result = result_query.filter_by(stack_id=stack_id).first()
|
||||
if result:
|
||||
placement = result.placement
|
||||
result.delete()
|
||||
return placement
|
||||
else:
|
||||
time.sleep(self.interval)
|
||||
elif not requested:
|
||||
# Next, check to see if there's already a request.
|
||||
prior_request = request_query.filter_by(
|
||||
stack_id=stack_id).first()
|
||||
if not prior_request:
|
||||
# No request? Make one! Creating it enqueues it.
|
||||
PlacementRequest(stack_id=stack_id, request=request) # pylint: disable=W0612
|
||||
requested = True
|
||||
|
||||
self.error_uri = '/errors/server_error'
|
||||
message = "Timed out waiting for a response."
|
||||
|
@ -231,9 +231,39 @@ class Query(object):
|
||||
rows = conf.music.engine.read_all_rows(**kwargs)
|
||||
return self.__rows_to_objects(rows)
|
||||
|
||||
def all_matching_key(self, key=None, value=None):
|
||||
'''Return all objects matching a particular key/value'''
|
||||
if not key:
|
||||
key = self.model.pk_name()
|
||||
kwargs = self.__kwargs()
|
||||
rows = conf.music.engine.read_row(
|
||||
pk_name=key, pk_value=value, **kwargs)
|
||||
return self.__rows_to_objects(rows)
|
||||
|
||||
def filter_by(self, **kwargs):
|
||||
"""Filter objects"""
|
||||
|
||||
# Music doesn't allow filtering on anything but the primary key.
|
||||
# This leads to a default O(n) filtering algorithm. However, we
|
||||
# can make it O(1) in some cases.
|
||||
if len(kwargs) == 1:
|
||||
# *Iff* a secondary key has been manually made via cql, e.g.:
|
||||
# CREATE INDEX ON keyspace.table (field);
|
||||
# and that field/value is the only one in kwargs, we'll try it.
|
||||
key = kwargs.keys()[0]
|
||||
value = kwargs[key]
|
||||
try:
|
||||
filtered_items = self.all_matching_key(key=key, value=value)
|
||||
return filtered_items
|
||||
except Exception:
|
||||
# If there's any kind of exception, we will take that
|
||||
# to mean there was no primary/secondary key (though
|
||||
# there can be other reasons). In this case, passthrough
|
||||
# and use the original O(n) filtering method.
|
||||
#
|
||||
# Not logging in this module just yet. (Never use print()!)
|
||||
pass
|
||||
|
||||
# We need to get all items and then go looking for what we want.
|
||||
all_items = self.all()
|
||||
filtered_items = Results([])
|
||||
|
@ -64,12 +64,9 @@ class Plan(Base):
|
||||
|
||||
def placements(self):
|
||||
"""Return list of placements."""
|
||||
# TODO(UNKNOWN): Make this a property?
|
||||
all_results = Query("Placement").all()
|
||||
results = []
|
||||
for placement in all_results:
|
||||
if placement.plan_id == self.id:
|
||||
results.append(placement)
|
||||
|
||||
# TODO(JD): Make this a property?
|
||||
results = Query("Placement").filter_by(plan_id=self.id)
|
||||
return results
|
||||
|
||||
@property
|
||||
|
@ -36,7 +36,7 @@ engine_opts = [
|
||||
cfg.BoolOpt('network_control', default=False, help='whether network controller (i.e., Tegu) has been deployed'),
|
||||
cfg.StrOpt('network_control_url', default='http://network_control:29444/tegu/api'),
|
||||
cfg.StrOpt('ip', default='localhost'),
|
||||
cfg.IntOpt('health_timeout', default=6, help='health check grace period (seconds, default=5)'),
|
||||
cfg.IntOpt('health_timeout', default=10, help='health check grace period (seconds, default=10)'),
|
||||
cfg.IntOpt('priority', default=1, help='this instance priority (master=1)'),
|
||||
cfg.StrOpt('rpc_server_ip', default='localhost',
|
||||
help='Set RPC server ip and port if used. Otherwise, ignore these parameters'),
|
||||
|
@ -17,7 +17,7 @@ class HealthCheck(object):
|
||||
|
||||
def __init__(self, hosts=[], port='8080', keyspace='valet'):
|
||||
|
||||
self.tries = CONF.engine.health_timeout * 2
|
||||
self.tries = CONF.engine.health_timeout * 2 # default health_timeout=10
|
||||
self.uuid = str(uuid.uuid4())
|
||||
|
||||
kwargs = {
|
||||
@ -52,7 +52,7 @@ class HealthCheck(object):
|
||||
'table': CONF.music.request_table,
|
||||
}
|
||||
response = self.rest.request(method='post', path=path, data=data)
|
||||
# print "SEND response: " + str(response.status_code)
|
||||
|
||||
return response.status_code == 204 if response else False
|
||||
|
||||
def _read_response(self, my_id):
|
||||
@ -64,23 +64,23 @@ class HealthCheck(object):
|
||||
'uid': self.uuid,
|
||||
}
|
||||
|
||||
for i in range(self.tries): # default: 12 * 0.5 = 6 sec.
|
||||
for i in range(self.tries): # default 20 tries * 0.5 sec = 10 sec. timeout
|
||||
time.sleep(0.5)
|
||||
response = self.rest.request(method='get', path=path)
|
||||
try:
|
||||
response = self.rest.request(method='get', path=path)
|
||||
|
||||
# logger.debug("READ respons body text: " + str(response.text))
|
||||
if response.status_code == 200 and len(response.text) > 3:
|
||||
|
||||
if response.status_code == 200 and len(response.text) > 3:
|
||||
j = json.loads(response.text)
|
||||
stack_id = j['row 0']['stack_id']
|
||||
placement = json.loads(j['row 0']['placement'])
|
||||
engine_id = placement['resources']['id']
|
||||
|
||||
j = json.loads(response.text)
|
||||
stack_id = j['row 0']['stack_id']
|
||||
placement = json.loads(j['row 0']['placement'])
|
||||
engine_id = placement['resources']['id']
|
||||
|
||||
# logger.debug("health 'stack_id': " + stack_id + ", engine_id=" + str(engine_id))
|
||||
if stack_id == self.uuid and engine_id == my_id:
|
||||
found = True
|
||||
break
|
||||
if stack_id == self.uuid and engine_id == my_id:
|
||||
found = True
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return found
|
||||
|
||||
@ -103,8 +103,6 @@ class HealthCheck(object):
|
||||
'uid': self.uuid
|
||||
}
|
||||
self.rest.request(method='delete', path=path, data=data)
|
||||
|
||||
# print "DELETE response: " + str(response.status_code)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
@ -60,7 +60,7 @@ priority=1
|
||||
host=valet1
|
||||
stand_by_list=valet2
|
||||
user=m04060
|
||||
start="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl start'" % (user, host)
|
||||
start="ssh -o ConnectTimeout=1 %s@%s 'sudo service apache2 restart'" % (user, host)
|
||||
stop="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl stop'" % (user, host)
|
||||
test="exit $(wget -T 1 -t 1 -qO- http://%s:8090/v1 | grep CURRENT | wc -l)" % (host)
|
||||
|
||||
|
@ -73,7 +73,7 @@ MAX_QUICK_STARTS = 10 # we stop if there are > 10 restart in quick succession
|
||||
QUICK_RESTART_SEC = 150 # we consider it a quick restart if less than this
|
||||
|
||||
# HA Configuration
|
||||
HEARTBEAT_SEC = 10 # Heartbeat interval in seconds
|
||||
HEARTBEAT_SEC = 20 # Heartbeat interval in seconds
|
||||
|
||||
|
||||
NAME = 'name'
|
||||
|
@ -60,7 +60,7 @@ priority=2
|
||||
host=valet2
|
||||
stand_by_list=valet1
|
||||
user=m04060
|
||||
start="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl start'" % (user, host)
|
||||
start="ssh -o ConnectTimeout=1 %s@%s 'sudo service apache2 restart'" % (user, host)
|
||||
stop="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl stop'" % (user, host)
|
||||
test="exit $(wget -T 1 -t 1 -qO- http://%s:8090/v1 | grep CURRENT | wc -l)" % (host)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user