diff --git a/etc/valet/valet.conf b/etc/valet/valet.conf index ed3c4f2..9f0edf3 100644 --- a/etc/valet/valet.conf +++ b/etc/valet/valet.conf @@ -36,8 +36,8 @@ host = music_host port = 8080 keyspace = valet_keyspace replication_factor = 3 -# tries = 10 -# interval = 1 +# tries = 100 +# interval = 0.1 # request_table = placement_requests # response_table = placement_results # event_table = oslo_messages @@ -54,35 +54,16 @@ replication_factor = 3 # [engine] +# Used for Ostro active/passive selection +priority = 1 + # Set the location of daemon process id pid = /var/run/valet/ostro-daemon.pid # Set IP of this Ostro # ip = localhost -# Used for Ostro active/passive selection -priority = 1 - - -#------------------------------------------------------------------------------------------------------------ -# Logging configuration -#------------------------------------------------------------------------------------------------------------ -# Set logging parameters -# logger_name = test -# logging level = [debug|info] -# logging_level = debug - -# Set the directory to locate the log file -# logging_dir = /var/log/valet/engine/ - -# Set the maximum size of the main logger as Byte -# max_main_log_size = 5000000 - -# Set the maximum logfile size as Byte for time-series log files -# max_log_size = 1000000 - -# Set the maximum number of time-series log files -# max_num_of_logs = 20 +# health_timeout = 10 #------------------------------------------------------------------------------------------------------------ # Management configuration diff --git a/tools/utils/cleandb.sh b/tools/utils/cleandb.sh index a291fc5..ca7cf5a 100644 --- a/tools/utils/cleandb.sh +++ b/tools/utils/cleandb.sh @@ -1,16 +1,30 @@ #!/usr/bin/env bash +if [ -z $VALET_KEYSPACE ]; then + echo "ERR: VALET_KEYSPACE is not defined." + exit +else + sed -ie "s/#VALET_KEYSPACE#/${VALET_KEYSPACE}/g" ./populate.cql +fi + +if [ -z $CASSANDRA_BIN ]; then + echo "ERR: CASSANDRA_BIN is not defined." + exit +fi + # drop keyspace -echo "drop valet keyspace" -/opt/app/apache-cassandra-2.1.1/bin/cqlsh -e "DROP KEYSPACE valet_test;" +echo "Drop Valet keyspace - ${VALET_KEYSPACE}" +${CASSANDRA_BIN}cqlsh -e "DROP KEYSPACE ${VALET_KEYSPACE};" sleep 5 # populate tables -echo "populate valet tables" -# /opt/app/apache-cassandra-2.1.1/bin/cqlsh -f ./populate.cql +echo "Populate Valet Api tables" pecan populate /var/www/valet/config.py -/opt/app/apache-cassandra-2.1.1/bin/cqlsh -e "DESCRIBE KEYSPACE valet_test;" +echo "Populate Valet Engine tables + Api indexes" +${CASSANDRA_BIN}cqlsh -f ./populate.cql -echo "Done populating" +${CASSANDRA_BIN}cqlsh -e "DESCRIBE KEYSPACE ${VALET_KEYSPACE};" + +echo "Done populating - ${VALET_KEYSPACE}" diff --git a/tools/utils/populate.cql b/tools/utils/populate.cql index 2af8642..4838d7b 100644 --- a/tools/utils/populate.cql +++ b/tools/utils/populate.cql @@ -1,23 +1,33 @@ -CREATE KEYSPACE IF NOT EXISTS valet_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '3' } AND durable_writes = true; +CREATE KEYSPACE IF NOT EXISTS #VALET_KEYSPACE# WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '3' } AND durable_writes = true; -CREATE TABLE IF NOT EXISTS valet_test.placements(id text PRIMARY KEY, name text, orchestration_id text, resource_id text, location text, reserved boolean, plan_id text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.placements(id text PRIMARY KEY, name text, orchestration_id text, resource_id text, location text, reserved boolean, plan_id text); -CREATE TABLE IF NOT EXISTS valet_test.groups(id text PRIMARY KEY, name text, description text, type text, members text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.groups(id text PRIMARY KEY, name text, description text, type text, members text); -CREATE TABLE IF NOT EXISTS valet_test.placement_requests(stack_id text PRIMARY KEY, request text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.placement_requests(stack_id text PRIMARY KEY, request text); -CREATE TABLE IF NOT EXISTS valet_test.placement_results(stack_id text PRIMARY KEY, placement text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.placement_results(stack_id text PRIMARY KEY, placement text); -CREATE TABLE IF NOT EXISTS valet_test.oslo_messages ("timestamp" text PRIMARY KEY, args text, exchange text, method text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.oslo_messages ("timestamp" text PRIMARY KEY, args text, exchange text, method text); -CREATE TABLE IF NOT EXISTS valet_test.plans (id text PRIMARY KEY, name text, stack_id text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.plans (id text PRIMARY KEY, name text, stack_id text); -CREATE TABLE IF NOT EXISTS valet_test.uuid_map (uuid text PRIMARY KEY, h_uuid text, s_uuid text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.uuid_map (uuid text PRIMARY KEY, h_uuid text, s_uuid text); -CREATE TABLE IF NOT EXISTS valet_test.app (stack_id text PRIMARY KEY, app text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.app (stack_id text PRIMARY KEY, app text); -CREATE TABLE IF NOT EXISTS valet_test.resource_status (site_name text PRIMARY KEY, resource text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.resource_status (site_name text PRIMARY KEY, resource text); -CREATE TABLE IF NOT EXISTS valet_test.resource_log_index (site_name text PRIMARY KEY, resource_log_index text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.resource_log_index (site_name text PRIMARY KEY, resource_log_index text); -CREATE TABLE IF NOT EXISTS valet_test.app_log_index ( site_name text PRIMARY KEY, app_log_index text); +CREATE TABLE IF NOT EXISTS #VALET_KEYSPACE#.app_log_index ( site_name text PRIMARY KEY, app_log_index text); + + + +CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.plans (stack_id); + +CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.placements (plan_id); + +CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.placements (orchestration_id); + +CREATE INDEX IF NOT EXISTS ON #VALET_KEYSPACE#.placements (resource_id); diff --git a/valet/api/common/ostro_helper.py b/valet/api/common/ostro_helper.py index 5d6f6d1..807028d 100644 --- a/valet/api/common/ostro_helper.py +++ b/valet/api/common/ostro_helper.py @@ -94,8 +94,8 @@ class Ostro(object): def __init__(self): """Initializer.""" - self.tries = conf.music.get('tries', 10) - self.interval = conf.music.get('interval', 1) + self.tries = conf.music.get('tries', 100) + self.interval = conf.music.get('interval', 0.1) def _map_names_to_uuids(self, mapping, data): """Map resource names to their UUID equivalents.""" @@ -127,21 +127,31 @@ class Ostro(object): # TODO(JD): This really belongs in valet-engine once it exists. def _send(self, stack_id, request): - """Send request.""" - # Creating the placement request effectively enqueues it. - PlacementRequest(stack_id=stack_id, request=request) # pylint: disable=W0612 - # Wait for a response. - # TODO(JD): This is a blocking operation at the moment. + """Send request.""" + request_query = Query(PlacementRequest) + result_query = Query(PlacementResult) + requested = False + for __ in range(self.tries, 0, -1): # pylint: disable=W0612 - query = Query(PlacementResult) - placement_result = query.filter_by(stack_id=stack_id).first() - if placement_result: - placement = placement_result.placement - placement_result.delete() + # Take a breather in between checks. + # TODO(JD): This is a blocking operation at the moment. + time.sleep(self.interval) + + # First, check to see if there's already a response. + result = result_query.filter_by(stack_id=stack_id).first() + if result: + placement = result.placement + result.delete() return placement - else: - time.sleep(self.interval) + elif not requested: + # Next, check to see if there's already a request. + prior_request = request_query.filter_by( + stack_id=stack_id).first() + if not prior_request: + # No request? Make one! Creating it enqueues it. + PlacementRequest(stack_id=stack_id, request=request) # pylint: disable=W0612 + requested = True self.error_uri = '/errors/server_error' message = "Timed out waiting for a response." diff --git a/valet/api/db/models/music/__init__.py b/valet/api/db/models/music/__init__.py index 82aff4a..b6e481b 100644 --- a/valet/api/db/models/music/__init__.py +++ b/valet/api/db/models/music/__init__.py @@ -231,9 +231,39 @@ class Query(object): rows = conf.music.engine.read_all_rows(**kwargs) return self.__rows_to_objects(rows) + def all_matching_key(self, key=None, value=None): + '''Return all objects matching a particular key/value''' + if not key: + key = self.model.pk_name() + kwargs = self.__kwargs() + rows = conf.music.engine.read_row( + pk_name=key, pk_value=value, **kwargs) + return self.__rows_to_objects(rows) + def filter_by(self, **kwargs): """Filter objects""" + # Music doesn't allow filtering on anything but the primary key. + # This leads to a default O(n) filtering algorithm. However, we + # can make it O(1) in some cases. + if len(kwargs) == 1: + # *Iff* a secondary key has been manually made via cql, e.g.: + # CREATE INDEX ON keyspace.table (field); + # and that field/value is the only one in kwargs, we'll try it. + key = kwargs.keys()[0] + value = kwargs[key] + try: + filtered_items = self.all_matching_key(key=key, value=value) + return filtered_items + except Exception: + # If there's any kind of exception, we will take that + # to mean there was no primary/secondary key (though + # there can be other reasons). In this case, passthrough + # and use the original O(n) filtering method. + # + # Not logging in this module just yet. (Never use print()!) + pass + # We need to get all items and then go looking for what we want. all_items = self.all() filtered_items = Results([]) diff --git a/valet/api/db/models/music/plans.py b/valet/api/db/models/music/plans.py index c1af6ff..3515134 100644 --- a/valet/api/db/models/music/plans.py +++ b/valet/api/db/models/music/plans.py @@ -64,12 +64,9 @@ class Plan(Base): def placements(self): """Return list of placements.""" - # TODO(UNKNOWN): Make this a property? - all_results = Query("Placement").all() - results = [] - for placement in all_results: - if placement.plan_id == self.id: - results.append(placement) + + # TODO(JD): Make this a property? + results = Query("Placement").filter_by(plan_id=self.id) return results @property diff --git a/valet/engine/conf.py b/valet/engine/conf.py index 6c9a38a..a44b932 100644 --- a/valet/engine/conf.py +++ b/valet/engine/conf.py @@ -36,7 +36,7 @@ engine_opts = [ cfg.BoolOpt('network_control', default=False, help='whether network controller (i.e., Tegu) has been deployed'), cfg.StrOpt('network_control_url', default='http://network_control:29444/tegu/api'), cfg.StrOpt('ip', default='localhost'), - cfg.IntOpt('health_timeout', default=6, help='health check grace period (seconds, default=5)'), + cfg.IntOpt('health_timeout', default=10, help='health check grace period (seconds, default=10)'), cfg.IntOpt('priority', default=1, help='this instance priority (master=1)'), cfg.StrOpt('rpc_server_ip', default='localhost', help='Set RPC server ip and port if used. Otherwise, ignore these parameters'), diff --git a/valet/engine/optimizer/ostro_server/health_checker.py b/valet/engine/optimizer/ostro_server/health_checker.py index a3a3939..5c978fa 100644 --- a/valet/engine/optimizer/ostro_server/health_checker.py +++ b/valet/engine/optimizer/ostro_server/health_checker.py @@ -17,7 +17,7 @@ class HealthCheck(object): def __init__(self, hosts=[], port='8080', keyspace='valet'): - self.tries = CONF.engine.health_timeout * 2 + self.tries = CONF.engine.health_timeout * 2 # default health_timeout=10 self.uuid = str(uuid.uuid4()) kwargs = { @@ -52,7 +52,7 @@ class HealthCheck(object): 'table': CONF.music.request_table, } response = self.rest.request(method='post', path=path, data=data) - # print "SEND response: " + str(response.status_code) + return response.status_code == 204 if response else False def _read_response(self, my_id): @@ -64,23 +64,23 @@ class HealthCheck(object): 'uid': self.uuid, } - for i in range(self.tries): # default: 12 * 0.5 = 6 sec. + for i in range(self.tries): # default 20 tries * 0.5 sec = 10 sec. timeout time.sleep(0.5) - response = self.rest.request(method='get', path=path) + try: + response = self.rest.request(method='get', path=path) - # logger.debug("READ respons body text: " + str(response.text)) + if response.status_code == 200 and len(response.text) > 3: - if response.status_code == 200 and len(response.text) > 3: + j = json.loads(response.text) + stack_id = j['row 0']['stack_id'] + placement = json.loads(j['row 0']['placement']) + engine_id = placement['resources']['id'] - j = json.loads(response.text) - stack_id = j['row 0']['stack_id'] - placement = json.loads(j['row 0']['placement']) - engine_id = placement['resources']['id'] - - # logger.debug("health 'stack_id': " + stack_id + ", engine_id=" + str(engine_id)) - if stack_id == self.uuid and engine_id == my_id: - found = True - break + if stack_id == self.uuid and engine_id == my_id: + found = True + break + except Exception: + pass return found @@ -103,8 +103,6 @@ class HealthCheck(object): 'uid': self.uuid } self.rest.request(method='delete', path=path, data=data) - - # print "DELETE response: " + str(response.status_code) except Exception: pass diff --git a/valet/ha/ha_valet.cfg b/valet/ha/ha_valet.cfg index 70e6a8a..7aaa39d 100644 --- a/valet/ha/ha_valet.cfg +++ b/valet/ha/ha_valet.cfg @@ -60,7 +60,7 @@ priority=1 host=valet1 stand_by_list=valet2 user=m04060 -start="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl start'" % (user, host) +start="ssh -o ConnectTimeout=1 %s@%s 'sudo service apache2 restart'" % (user, host) stop="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl stop'" % (user, host) test="exit $(wget -T 1 -t 1 -qO- http://%s:8090/v1 | grep CURRENT | wc -l)" % (host) diff --git a/valet/ha/ha_valet.py b/valet/ha/ha_valet.py index 0a1ec4b..954d9e4 100644 --- a/valet/ha/ha_valet.py +++ b/valet/ha/ha_valet.py @@ -73,7 +73,7 @@ MAX_QUICK_STARTS = 10 # we stop if there are > 10 restart in quick succession QUICK_RESTART_SEC = 150 # we consider it a quick restart if less than this # HA Configuration -HEARTBEAT_SEC = 10 # Heartbeat interval in seconds +HEARTBEAT_SEC = 20 # Heartbeat interval in seconds NAME = 'name' diff --git a/valet/ha/ha_valet2.cfg b/valet/ha/ha_valet2.cfg index a9de2f7..2fd6154 100644 --- a/valet/ha/ha_valet2.cfg +++ b/valet/ha/ha_valet2.cfg @@ -60,7 +60,7 @@ priority=2 host=valet2 stand_by_list=valet1 user=m04060 -start="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl start'" % (user, host) +start="ssh -o ConnectTimeout=1 %s@%s 'sudo service apache2 restart'" % (user, host) stop="ssh -o ConnectTimeout=1 %s@%s 'sudo apachectl stop'" % (user, host) test="exit $(wget -T 1 -t 1 -qO- http://%s:8090/v1 | grep CURRENT | wc -l)" % (host)