Merged from trunk
This commit is contained in:
commit
69dd0a3f46
@ -46,8 +46,8 @@ class AccountController(object):
|
|||||||
self.root = conf.get('devices', '/srv/node')
|
self.root = conf.get('devices', '/srv/node')
|
||||||
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
||||||
('true', 't', '1', 'on', 'yes', 'y')
|
('true', 't', '1', 'on', 'yes', 'y')
|
||||||
self.replicator_rpc = \
|
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, AccountBroker,
|
||||||
ReplicatorRpc(self.root, DATADIR, AccountBroker, self.mount_check)
|
self.mount_check, logger=self.logger)
|
||||||
|
|
||||||
def _get_account_broker(self, drive, part, account):
|
def _get_account_broker(self, drive, part, account):
|
||||||
hsh = hash_path(account)
|
hsh = hash_path(account)
|
||||||
|
@ -38,6 +38,9 @@ from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
|
|||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
|
|
||||||
|
|
||||||
|
DEBUG_TIMINGS_THRESHOLD = 10
|
||||||
|
|
||||||
|
|
||||||
def quarantine_db(object_file, server_type):
|
def quarantine_db(object_file, server_type):
|
||||||
"""
|
"""
|
||||||
In the case that a corrupt file is found, move it to a quarantined area to
|
In the case that a corrupt file is found, move it to a quarantined area to
|
||||||
@ -448,11 +451,13 @@ class Replicator(Daemon):
|
|||||||
class ReplicatorRpc(object):
|
class ReplicatorRpc(object):
|
||||||
"""Handle Replication RPC calls. TODO(redbo): document please :)"""
|
"""Handle Replication RPC calls. TODO(redbo): document please :)"""
|
||||||
|
|
||||||
def __init__(self, root, datadir, broker_class, mount_check=True):
|
def __init__(self, root, datadir, broker_class, mount_check=True,
|
||||||
|
logger=None):
|
||||||
self.root = root
|
self.root = root
|
||||||
self.datadir = datadir
|
self.datadir = datadir
|
||||||
self.broker_class = broker_class
|
self.broker_class = broker_class
|
||||||
self.mount_check = mount_check
|
self.mount_check = mount_check
|
||||||
|
self.logger = logger or get_logger({}, log_route='replicator-rpc')
|
||||||
|
|
||||||
def dispatch(self, replicate_args, args):
|
def dispatch(self, replicate_args, args):
|
||||||
if not hasattr(args, 'pop'):
|
if not hasattr(args, 'pop'):
|
||||||
@ -479,27 +484,51 @@ class ReplicatorRpc(object):
|
|||||||
def sync(self, broker, args):
|
def sync(self, broker, args):
|
||||||
(remote_sync, hash_, id_, created_at, put_timestamp,
|
(remote_sync, hash_, id_, created_at, put_timestamp,
|
||||||
delete_timestamp, metadata) = args
|
delete_timestamp, metadata) = args
|
||||||
|
timemark = time.time()
|
||||||
try:
|
try:
|
||||||
info = broker.get_replication_info()
|
info = broker.get_replication_info()
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
if 'no such table' in str(e):
|
if 'no such table' in str(e):
|
||||||
# TODO(unknown): find a real logger
|
self.logger.error(_("Quarantining DB %s") % broker.db_file)
|
||||||
print _("Quarantining DB %s") % broker.db_file
|
|
||||||
quarantine_db(broker.db_file, broker.db_type)
|
quarantine_db(broker.db_file, broker.db_type)
|
||||||
return HTTPNotFound()
|
return HTTPNotFound()
|
||||||
raise
|
raise
|
||||||
|
timespan = time.time() - timemark
|
||||||
|
if timespan > DEBUG_TIMINGS_THRESHOLD:
|
||||||
|
self.logger.debug(_('replicator-rpc-sync time for info: %.02fs') %
|
||||||
|
timespan)
|
||||||
if metadata:
|
if metadata:
|
||||||
|
timemark = time.time()
|
||||||
broker.update_metadata(simplejson.loads(metadata))
|
broker.update_metadata(simplejson.loads(metadata))
|
||||||
|
timespan = time.time() - timemark
|
||||||
|
if timespan > DEBUG_TIMINGS_THRESHOLD:
|
||||||
|
self.logger.debug(_('replicator-rpc-sync time for '
|
||||||
|
'update_metadata: %.02fs') % timespan)
|
||||||
if info['put_timestamp'] != put_timestamp or \
|
if info['put_timestamp'] != put_timestamp or \
|
||||||
info['created_at'] != created_at or \
|
info['created_at'] != created_at or \
|
||||||
info['delete_timestamp'] != delete_timestamp:
|
info['delete_timestamp'] != delete_timestamp:
|
||||||
|
timemark = time.time()
|
||||||
broker.merge_timestamps(
|
broker.merge_timestamps(
|
||||||
created_at, put_timestamp, delete_timestamp)
|
created_at, put_timestamp, delete_timestamp)
|
||||||
|
timespan = time.time() - timemark
|
||||||
|
if timespan > DEBUG_TIMINGS_THRESHOLD:
|
||||||
|
self.logger.debug(_('replicator-rpc-sync time for '
|
||||||
|
'merge_timestamps: %.02fs') % timespan)
|
||||||
|
timemark = time.time()
|
||||||
info['point'] = broker.get_sync(id_)
|
info['point'] = broker.get_sync(id_)
|
||||||
|
timespan = time.time() - timemark
|
||||||
|
if timespan > DEBUG_TIMINGS_THRESHOLD:
|
||||||
|
self.logger.debug(_('replicator-rpc-sync time for get_sync: '
|
||||||
|
'%.02fs') % timespan)
|
||||||
if hash_ == info['hash'] and info['point'] < remote_sync:
|
if hash_ == info['hash'] and info['point'] < remote_sync:
|
||||||
|
timemark = time.time()
|
||||||
broker.merge_syncs([{'remote_id': id_,
|
broker.merge_syncs([{'remote_id': id_,
|
||||||
'sync_point': remote_sync}])
|
'sync_point': remote_sync}])
|
||||||
info['point'] = remote_sync
|
info['point'] = remote_sync
|
||||||
|
timespan = time.time() - timemark
|
||||||
|
if timespan > DEBUG_TIMINGS_THRESHOLD:
|
||||||
|
self.logger.debug(_('replicator-rpc-sync time for '
|
||||||
|
'merge_syncs: %.02fs') % timespan)
|
||||||
return Response(simplejson.dumps(info))
|
return Response(simplejson.dumps(info))
|
||||||
|
|
||||||
def merge_syncs(self, broker, args):
|
def merge_syncs(self, broker, args):
|
||||||
|
@ -56,7 +56,7 @@ class ContainerController(object):
|
|||||||
self.node_timeout = int(conf.get('node_timeout', 3))
|
self.node_timeout = int(conf.get('node_timeout', 3))
|
||||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||||
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR,
|
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR,
|
||||||
ContainerBroker, self.mount_check)
|
ContainerBroker, self.mount_check, logger=self.logger)
|
||||||
|
|
||||||
def _get_container_broker(self, drive, part, account, container):
|
def _get_container_broker(self, drive, part, account, container):
|
||||||
"""
|
"""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user