217 lines
7.4 KiB
Python
217 lines
7.4 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
import datetime
|
|
import decimal
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import multiprocessing
|
|
|
|
from django.db import transaction
|
|
from stacktach import message_service
|
|
|
|
|
|
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
|
os.pardir, os.pardir))
|
|
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
|
|
sys.path.insert(0, POSSIBLE_TOPDIR)
|
|
|
|
from django.db import close_connection
|
|
from django.db import reset_queries
|
|
from django.core import exceptions
|
|
|
|
from verifier import WrongTypeException
|
|
from stacktach import stacklog
|
|
|
|
stacklog.set_default_logger_name('verifier')
|
|
|
|
|
|
def _get_child_logger():
|
|
return stacklog.get_logger('verifier', is_parent=False)
|
|
|
|
|
|
def _has_field(d1, d2, field1, field2=None):
|
|
if not field2:
|
|
field2 = field1
|
|
|
|
return d1.get(field1) is not None and d2.get(field2) is not None
|
|
|
|
|
|
def _verify_simple_field(d1, d2, field1, field2=None):
|
|
if not field2:
|
|
field2 = field1
|
|
|
|
if not _has_field(d1, d2, field1, field2):
|
|
return False
|
|
else:
|
|
if d1[field1] != d2[field2]:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def _verify_date_field(d1, d2, same_second=False):
|
|
if d1 and d2:
|
|
if d1 == d2:
|
|
return True
|
|
elif same_second and int(d1) == int(d2):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _is_like_uuid(attr_name, attr_value, exist_id):
|
|
if not re.match("[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}$",
|
|
attr_value):
|
|
raise WrongTypeException(attr_name, attr_value, exist_id, None)
|
|
|
|
|
|
def _is_like_date(attr_name, attr_value, exist_id, instance_uuid):
|
|
if not isinstance(attr_value, decimal.Decimal):
|
|
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
|
|
|
|
|
|
def _is_long(attr_name, attr_value, exist_id, instance_uuid):
|
|
if not isinstance(attr_value, long):
|
|
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
|
|
|
|
|
|
def _is_int_in_char(attr_name, attr_value, exist_id, instance_uuid):
|
|
try:
|
|
int(attr_value)
|
|
except ValueError:
|
|
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
|
|
|
|
|
|
def _is_hex_owner_id(attr_name, attr_value, exist_id, instance_uuid):
|
|
if not re.match("^[0-9a-fA-F]+$", attr_value):
|
|
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
|
|
|
|
|
|
def _is_alphanumeric(attr_name, attr_value, exist_id, instance_uuid):
|
|
if not re.match("[a-zA-Z0-9.]+$", attr_value):
|
|
raise WrongTypeException(attr_name, attr_value, exist_id, instance_uuid)
|
|
|
|
|
|
class Verifier(object):
|
|
def __init__(self, config, pool=None, reconciler=None):
|
|
self.config = config
|
|
self.pool = pool or multiprocessing.Pool(config.pool_size())
|
|
self.enable_notifications = config.enable_notifications()
|
|
self.reconciler = reconciler
|
|
self.results = []
|
|
self.failed = []
|
|
|
|
def clean_results(self):
|
|
pending = []
|
|
finished = 0
|
|
successful = 0
|
|
|
|
for result in self.results:
|
|
if result.ready():
|
|
finished += 1
|
|
if result.successful():
|
|
(verified, exists) = result.get()
|
|
if self.reconciler and not verified:
|
|
self.failed.append(exists)
|
|
successful += 1
|
|
else:
|
|
pending.append(result)
|
|
|
|
self.results = pending
|
|
errored = finished - successful
|
|
return len(self.results), successful, errored
|
|
|
|
def _keep_running(self):
|
|
return True
|
|
|
|
def _utcnow(self):
|
|
return datetime.datetime.utcnow()
|
|
|
|
def _run(self, callback=None):
|
|
tick_time = self.config.tick_time()
|
|
settle_units = self.config.settle_units()
|
|
settle_time = self.config.settle_time()
|
|
while self._keep_running():
|
|
with transaction.commit_on_success():
|
|
now = self._utcnow()
|
|
kwargs = {settle_units: settle_time}
|
|
ending_max = now - datetime.timedelta(**kwargs)
|
|
new = self.verify_for_range(ending_max, callback=callback)
|
|
values = ((self.exchange(), new,) + self.clean_results())
|
|
if self.reconciler:
|
|
self.reconcile_failed()
|
|
msg = "%s: N: %s, P: %s, S: %s, E: %s" % values
|
|
_get_child_logger().info(msg)
|
|
time.sleep(tick_time)
|
|
|
|
def run(self):
|
|
logger = _get_child_logger()
|
|
if self.enable_notifications:
|
|
exchange_name = self.exchange()
|
|
exchange = message_service.create_exchange(
|
|
exchange_name, 'topic',
|
|
durable=self.config.durable_queue())
|
|
routing_keys = self.config.topics()[exchange_name]
|
|
|
|
with message_service.create_connection(
|
|
self.config.host(), self.config.port(),
|
|
self.config.userid(), self.config.password(),
|
|
"librabbitmq", self.config.virtual_host()) as conn:
|
|
def callback(result):
|
|
attempt = 0
|
|
while attempt < 2:
|
|
try:
|
|
(verified, exist) = result
|
|
if verified:
|
|
self.send_verified_notification(
|
|
exist, conn, exchange,
|
|
routing_keys=routing_keys)
|
|
break
|
|
except exceptions.ObjectDoesNotExist:
|
|
if attempt < 1:
|
|
logger.warn("ObjectDoesNotExist in callback, "
|
|
"attempting to reconnect and try "
|
|
"again.")
|
|
close_connection()
|
|
reset_queries()
|
|
else:
|
|
logger.error("ObjectDoesNotExist in callback "
|
|
"again, giving up.")
|
|
except Exception, e:
|
|
msg = "ERROR in Callback %s: %s" % (exchange_name,
|
|
e)
|
|
logger.exception(msg)
|
|
break
|
|
attempt += 1
|
|
try:
|
|
self._run(callback=callback)
|
|
except Exception, e:
|
|
print e
|
|
raise e
|
|
else:
|
|
self._run()
|
|
|
|
def verify_for_range(self, ending_max, callback=None):
|
|
pass
|
|
|
|
def reconcile_failed(self):
|
|
pass
|
|
|
|
def exchange(self):
|
|
pass
|