stacktach/verifier/base_verifier.py
2017-05-11 13:15:02 +05:30

279 lines
10 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import decimal
import os
import re
import signal
import sys
import time
import multiprocessing
import random
import librabbitmq
from django.db import transaction
from stacktach import message_service
# Compute the parent of the directory that contains the running script
# (sys.argv[0]/../..).  When that directory holds a 'stacktach' entry, put it
# at the front of sys.path so the imports that follow resolve against it.
# NOTE(review): `from stacktach import message_service` above runs *before*
# this path tweak, so it relies on stacktach already being importable --
# confirm that ordering is intentional.
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                   os.pardir, os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
    sys.path.insert(0, POSSIBLE_TOPDIR)
from django.db import close_connection
from django.db import reset_queries
from django.core import exceptions
from verifier import WrongTypeException
from stacktach import stacklog
# Set stacklog's default logger name for this process to 'verifier'
# (presumably used by stacklog calls that do not pass a name -- verify).
stacklog.set_default_logger_name('verifier')
def _get_child_logger():
    """Fetch the 'verifier' logger from stacklog with is_parent=False."""
    child = stacklog.get_logger('verifier', is_parent=False)
    return child
def _has_field(d1, d2, field1, field2=None):
if not field2:
field2 = field1
return d1.get(field1) is not None and d2.get(field2) is not None
def _verify_simple_field(d1, d2, field1, field2=None):
    """Compare one field across two mappings.

    ``field1`` is read from ``d1`` and ``field2`` from ``d2``; a falsy
    ``field2`` means "use ``field1`` for both".

    :returns: False if either side lacks a non-None value (per
        _has_field) or the two values compare unequal with ``!=``;
        True otherwise.
    """
    key2 = field2 or field1
    if not _has_field(d1, d2, field1, key2):
        return False
    mismatch = d1[field1] != d2[key2]
    return not mismatch
def _verify_date_field(d1, d2, same_second=False):
if d1 and d2:
if d1 == d2:
return True
elif same_second and int(d1) == int(d2):
return True
return False
def _is_like_uuid(attr_name, attr_value, exist_id):
if not re.match("[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}$",
attr_value):
raise WrongTypeException(attr_name, attr_value, exist_id, None)
def _is_like_date(attr_name, attr_value, exist_id, instance_uuid):
if not isinstance(attr_value, decimal.Decimal):
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
def _is_long(attr_name, attr_value, exist_id, instance_uuid):
    """Raise WrongTypeException unless ``attr_value`` is a Python 2 ``long``.

    Note that under Python 2 a plain ``int`` is not an instance of
    ``long`` and is therefore rejected.  All four arguments are forwarded
    unchanged to the exception.
    """
    if isinstance(attr_value, long):
        return
    raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
def _is_int_in_char(attr_name, attr_value, exist_id, instance_uuid):
try:
int(attr_value)
except ValueError:
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
def _is_hex_owner_id(attr_name, attr_value, exist_id, instance_uuid):
if not re.match("^[0-9a-fA-F]+$", attr_value):
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
def _is_alphanumeric(attr_name, attr_value, exist_id, instance_uuid):
if not re.match("[a-zA-Z0-9.]+$", attr_value):
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
class Verifier(object):
    """Base class for stacktach verifiers.

    Drives a loop that calls verify_for_range(), tracks the async results
    kept in ``self.results`` (objects exposing ready()/successful()/get()),
    logs progress, optionally hands unverified records to a reconciler and,
    when notifications are enabled, publishes a notification for every
    verified record.

    run_startup(), verify_for_range(), reconcile_failed() and exchange()
    are no-ops here, and send_verified_notification() (used by run()) is
    not defined in this class, so a concrete subclass presumably supplies
    them.
    """

    def __init__(self, config, pool=None, reconciler=None, stats=None):
        self.config = config
        # A multiprocessing.Pool is only built when the caller passes none.
        self.pool = pool or multiprocessing.Pool(config.pool_size())
        self.enable_notifications = config.enable_notifications()
        self.reconciler = reconciler
        # Outstanding async results, pruned by clean_results().
        self.results = []
        # `exists` objects whose verification failed; filled only when a
        # reconciler is configured.
        self.failed = []
        self.batchsize = config.batchsize()
        # Keep the caller's mapping even if it is empty, so the timestamp
        # and counters written here are visible through that same object.
        self.stats = {} if stats is None else stats
        self.update_interval = datetime.timedelta(seconds=30)
        self.next_update = datetime.datetime.utcnow() + self.update_interval
        # Cleared by handle_signal() to stop the _run() loop.
        self._do_run = True

    def clean_results(self):
        """Remove finished results from self.results and tally them.

        A finished, successful result yields a ``(verified, exists)`` pair;
        when a reconciler is configured and ``verified`` is falsy,
        ``exists`` is appended to self.failed.

        :returns: ``(pending, successful, errored)`` counts, where pending
            is the number of results still outstanding.
        """
        pending = []
        finished = 0
        successful = 0
        for result in self.results:
            if not result.ready():
                pending.append(result)
                continue
            finished += 1
            if result.successful():
                (verified, exists) = result.get()
                if self.reconciler and not verified:
                    self.failed.append(exists)
                successful += 1
        self.results = pending
        errored = finished - successful
        return len(self.results), successful, errored

    def check_results(self, new_added, force=False):
        """Log progress and apply back-pressure on outstanding results.

        Acts when the 30 s update interval has elapsed, when ``force`` is
        set, or when more than ``batchsize`` results are outstanding: it
        logs the clean_results() counts, then sleeps ``tick_time`` at a
        time until at most 75% of ``batchsize`` results remain.

        :param new_added: value reported as "N" in the progress line.
        :param force: act regardless of the interval and backlog size.
        """
        tick_time = self.config.tick_time()
        if ((datetime.datetime.utcnow() > self.next_update)
                or force or (len(self.results) > self.batchsize)):
            values = ((self.exchange(), new_added,) + self.clean_results())
            msg = "%s: N: %s, P: %s, S: %s, E: %s" % values
            _get_child_logger().info(msg)
            # Throttle: wait for the pool to drain before scheduling more.
            while len(self.results) > (self.batchsize * 0.75):
                msg = "%s: Waiting on event processing. Pending: %s" % (
                    self.exchange(), len(self.results))
                _get_child_logger().info(msg)
                time.sleep(tick_time)
                self.clean_results()
            self.next_update = datetime.datetime.utcnow() + self.update_interval

    def handle_signal(self, signal_number):
        """React to a process signal.

        SIGTERM clears the run flag so _run() exits after its current
        iteration; SIGUSR1 logs a status report (PID, parent PID, last
        watchdog timestamp, number of items processed).

        :param signal_number: the received signal number.
        """
        log = _get_child_logger()
        # SIGKILL can never be delivered to a handler; it stays in the
        # tuple only so direct callers passing it still request shutdown.
        if signal_number in (signal.SIGTERM, signal.SIGKILL):
            self._do_run = False
            log.info("%s verifier cleaning up for shutdown." % self.exchange())
        if signal_number == signal.SIGUSR1:
            # 'timestamp' is first written by the run loop, so a SIGUSR1
            # arriving earlier must not blow up with KeyError here.
            info = ("\n"
                    "%s verifier:\n"
                    "PID: %s Parent PID: %s\n"
                    "Last watchdog check: %s\n"
                    "# of items processed: %s\n") % (
                self.exchange(), os.getpid(), os.getppid(),
                self.stats.get('timestamp'),
                self.stats.get('total_processed', 0))
            log.info(info)

    def _keep_running(self):
        """Return the run flag checked at the top of every _run() pass."""
        return self._do_run

    def _utcnow(self):
        """Return ``datetime.datetime.utcnow()`` (a naive UTC datetime)."""
        return datetime.datetime.utcnow()

    def _run(self, callback=None):
        """Verification loop; runs until the run flag is cleared.

        Each pass stamps ``stats['timestamp']``, then, inside
        ``transaction.commit_on_success()``, calls verify_for_range() with
        ``ending_max = now - timedelta(<settle_units>=<settle_time>)``,
        reports via check_results(force=True) and, if a reconciler is set,
        calls reconcile_failed().  It then sleeps ``tick_time``.

        :param callback: forwarded to verify_for_range().
        """
        tick_time = self.config.tick_time()
        settle_units = self.config.settle_units()
        settle_time = self.config.settle_time()
        while self._keep_running():
            self.stats['timestamp'] = self._utcnow()
            with transaction.commit_on_success():
                now = self._utcnow()
                # settle_units must be a timedelta keyword (e.g. 'minutes').
                settle = datetime.timedelta(**{settle_units: settle_time})
                ending_max = now - settle
                new = self.verify_for_range(ending_max, callback=callback)
                self.check_results(new, force=True)
                if self.reconciler:
                    self.reconcile_failed()
            time.sleep(tick_time)

    def run(self):
        """Start the verifier and block in the verification loop.

        Calls run_startup() first.  Without notifications it simply runs
        _run().  With notifications it declares a topic exchange named
        after exchange(), opens a broker connection, and runs _run() with a
        callback that sends a verified notification for every verified
        result.  Sends are retried up to ``config.get_exponential_limit()``
        times: on RabbitMQ connection errors with jittered exponential
        back-off, and on ObjectDoesNotExist after closing the Django DB
        connection.

        :returns: True if, in notification mode, an exception escaped
            _run() (it is logged); False otherwise.
        """
        logger = _get_child_logger()
        self.run_startup()
        if self.enable_notifications:
            exchange_name = self.exchange()
            exchange = message_service.create_exchange(
                exchange_name, 'topic',
                durable=self.config.durable_queue())
            routing_keys = self.config.topics()[exchange_name]
            with message_service.create_connection(
                    self.config.host(), self.config.port(),
                    self.config.userid(), self.config.password(),
                    "librabbitmq", self.config.virtual_host()) as conn:

                def callback(result):
                    # Publish the notification for one (verified, exist)
                    # result, then count it as processed.
                    attempt = 0
                    retry_limit = self.config.get_exponential_limit()
                    while attempt < retry_limit:
                        # Heartbeat reported by the SIGUSR1 status dump.
                        self.stats['timestamp'] = self._utcnow()
                        try:
                            (verified, exist) = result
                            if verified:
                                self.send_verified_notification(
                                    exist, conn, exchange,
                                    routing_keys=routing_keys)
                            break
                        except exceptions.ObjectDoesNotExist:
                            if attempt < retry_limit - 1:
                                logger.warn("ObjectDoesNotExist in callback, "
                                            "attempting to reconnect and try "
                                            "again.")
                                # Drop the Django DB connection so the
                                # retry opens a fresh one.
                                close_connection()
                                reset_queries()
                            else:
                                logger.error("ObjectDoesNotExist in callback "
                                             "again, giving up.")
                                # Leave now rather than sleeping first.
                                break
                        except librabbitmq.ConnectionError as e:
                            logger.error(
                                "ConnectionError while sending %s "
                                "notification to RabbitMQ (attempt %d of "
                                "%d): %s" % (exchange_name, attempt + 1,
                                             retry_limit, e))
                        except Exception as e:
                            msg = "ERROR in Callback %s: %s" % (exchange_name,
                                                                e)
                            logger.exception(msg)
                            break
                        attempt += 1
                        if attempt < retry_limit:
                            # Exponential term (2**attempt ms) plus up to
                            # one second of random jitter; skipped after
                            # the final attempt since nothing follows.
                            time.sleep((2 ** attempt) / 1000.0 +
                                       random.randint(0, 1000) / 1000.0)
                    self.stats['timestamp'] = self._utcnow()
                    total = self.stats.get('total_processed', 0) + 1
                    self.stats['total_processed'] = total

                try:
                    self._run(callback=callback)
                except Exception as e:
                    msg = "ERROR during Verification %s: %s" % (exchange_name,
                                                                e)
                    logger.exception(msg)
                    return True
        else:
            self._run()
        return False

    def run_startup(self):
        """Hook called once at the start of run(); a no-op here."""
        pass

    def verify_for_range(self, ending_max, callback=None):
        """Hook invoked by _run() each pass; a no-op returning None here.

        Its return value is logged as the "N" (new) count and ``callback``
        is passed straight through from _run().
        """
        pass

    def reconcile_failed(self):
        """Hook called by _run() when a reconciler is set; a no-op here."""
        pass

    def exchange(self):
        """Return this verifier's exchange name; None in the base class.

        Used as the log prefix and, with notifications enabled, as the
        exchange name and the key into ``config.topics()``.
        """
        pass