Refactored the auditors to only do local checks. This should reduce the load the auditors place on a large cluster.
This commit is contained in:
commit
2f141037a8
@ -47,10 +47,6 @@ use = egg:swift#account
|
||||
# log_name = account-auditor
|
||||
# Will audit each account at most once per interval
|
||||
# interval = 1800
|
||||
# Maximum containers randomly picked for a given account audit
|
||||
# max_container_count = 100
|
||||
# node_timeout = 10
|
||||
# conn_timeout = 0.5
|
||||
# log_facility = LOG_LOCAL0
|
||||
# log_level = INFO
|
||||
|
||||
|
@ -43,7 +43,3 @@ use = egg:swift#container
|
||||
# log_name = container-auditor
|
||||
# Will audit each container at most once per interval
|
||||
# interval = 1800
|
||||
# Maximum objects randomly picked for a given container audit
|
||||
# max_object_count = 100
|
||||
# node_timeout = 10
|
||||
# conn_timeout = 0.5
|
||||
|
@ -50,5 +50,3 @@ use = egg:swift#object
|
||||
# log_name = object-auditor
|
||||
# Will audit each object at most once per interval
|
||||
# interval = 1800
|
||||
# node_timeout = 10
|
||||
# conn_timeout = 0.5
|
||||
|
@ -14,25 +14,15 @@
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
from random import choice, random
|
||||
|
||||
from eventlet import Timeout
|
||||
from random import random
|
||||
|
||||
from swift.account import server as account_server
|
||||
from swift.common.db import AccountBroker
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger
|
||||
from swift.common.utils import get_logger, audit_location_generator
|
||||
from swift.common.daemon import Daemon
|
||||
|
||||
|
||||
class AuditException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AccountAuditor(Daemon):
|
||||
"""Audit accounts."""
|
||||
|
||||
@ -43,52 +33,30 @@ class AccountAuditor(Daemon):
|
||||
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
||||
('true', 't', '1', 'on', 'yes', 'y')
|
||||
self.interval = int(conf.get('interval', 1800))
|
||||
swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
|
||||
self.container_ring = None
|
||||
self.node_timeout = int(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.max_container_count = \
|
||||
int(conf.get('max_container_count', 100))
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
self.container_errors = 0
|
||||
self.account_passes = 0
|
||||
self.account_failures = 0
|
||||
|
||||
def get_container_ring(self):
|
||||
"""
|
||||
Get the container ring. Load the ring if necessary.
|
||||
|
||||
:returns: container ring
|
||||
"""
|
||||
if not self.container_ring:
|
||||
self.logger.debug(
|
||||
'Loading container ring from %s' % self.container_ring_path)
|
||||
self.container_ring = Ring(self.container_ring_path)
|
||||
return self.container_ring
|
||||
|
||||
def run_forever(self): # pragma: no cover
|
||||
def run_forever(self): # pragma: no cover
|
||||
"""Run the account audit until stopped."""
|
||||
reported = time.time()
|
||||
time.sleep(random() * self.interval)
|
||||
while True:
|
||||
begin = time.time()
|
||||
for device in os.listdir(self.devices):
|
||||
if self.mount_check and not \
|
||||
os.path.ismount(os.path.join(self.devices, device)):
|
||||
self.logger.debug(
|
||||
'Skipping %s as it is not mounted' % device)
|
||||
continue
|
||||
self.account_audit(device)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Remote audits with containers: %s passed '
|
||||
'audit, %s failed audit, %s errors' %
|
||||
(time.ctime(reported), self.container_passes,
|
||||
self.container_failures, self.container_errors))
|
||||
reported = time.time()
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
self.container_errors = 0
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
account_server.DATADIR,
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.account_audit(path)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Account audits: %s passed audit, '
|
||||
'%s failed audit' % (time.ctime(reported),
|
||||
self.account_passes,
|
||||
self.account_failures))
|
||||
reported = time.time()
|
||||
self.account_passes = 0
|
||||
self.account_failures = 0
|
||||
elapsed = time.time() - begin
|
||||
if elapsed < self.interval:
|
||||
time.sleep(self.interval - elapsed)
|
||||
@ -96,98 +64,41 @@ class AccountAuditor(Daemon):
|
||||
def run_once(self):
|
||||
"""Run the account audit once."""
|
||||
self.logger.info('Begin account audit "once" mode')
|
||||
begin = time.time()
|
||||
for device in os.listdir(self.devices):
|
||||
if self.mount_check and \
|
||||
not os.path.ismount(os.path.join(self.devices, device)):
|
||||
self.logger.debug(
|
||||
'Skipping %s as it is not mounted' % device)
|
||||
continue
|
||||
self.account_audit(device)
|
||||
begin = reported = time.time()
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
account_server.DATADIR,
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.account_audit(path)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Account audits: %s passed audit, '
|
||||
'%s failed audit' % (time.ctime(reported),
|
||||
self.account_passes,
|
||||
self.account_failures))
|
||||
reported = time.time()
|
||||
self.account_passes = 0
|
||||
self.account_failures = 0
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
'Account audit "once" mode completed: %.02fs' % elapsed)
|
||||
|
||||
def account_audit(self, path):
    """
    Audits the given account path

    Only files ending in ``.db`` are examined; any other path is ignored
    without touching the counters. A database that is not marked deleted
    is asked for its info; if that succeeds ``self.account_passes`` is
    incremented, and any exception raised along the way increments
    ``self.account_failures`` and is logged.

    :param path: the path to an account db
    """
    try:
        if not path.endswith('.db'):
            return
        broker = AccountBroker(path)
        if not broker.is_deleted():
            # The result is discarded on purpose: the call is made only so
            # that an unreadable database raises and is counted as a
            # failure below.
            broker.get_info()
            self.account_passes += 1
            self.logger.debug('Audit passed for %s' % broker.db_file)
    except Exception:
        self.account_failures += 1
        # Log ``path`` rather than ``broker.db_file``: ``broker`` is not
        # bound when constructing the broker is what failed, and touching
        # it here would raise from inside the handler.
        self.logger.exception('ERROR Could not get account info %s' % path)
|
||||
|
@ -621,3 +621,46 @@ def write_pickle(obj, dest, tmp):
|
||||
fo.flush()
|
||||
os.fsync(fd)
|
||||
renamer(tmppath, dest)
|
||||
|
||||
def audit_location_generator(devices, datadir, mount_check=True, logger=None):
    """
    Given a devices path and a data directory, yield (path, device,
    partition) for all files in that directory

    The layout walked is
    ``<devices>/<device>/<datadir>/<partition>/<suffix>/<hash>/<file>``.
    Entries that are not directories at the partition, suffix or hash
    level are skipped. Files inside one hash directory are yielded in
    reverse-sorted name order.

    :param devices: parent directory of the devices to be audited
    :param datadir: a directory located under self.devices. This should be
                    one of the DATADIR constants defined in the account,
                    container, and object servers.
    :param mount_check: Flag to check if a mount check should be performed
                        on devices
    :param logger: a logger object
    :returns: a generator of (path, device, partition) tuples
    """
    for device in os.listdir(devices):
        if mount_check and not \
                os.path.ismount(os.path.join(devices, device)):
            if logger:
                logger.debug(
                    'Skipping %s as it is not mounted' % device)
            continue
        # Keep the ``datadir`` argument untouched. Rebinding it here would
        # feed the already-joined path of the first device back into
        # os.path.join() for every later device, so those devices would
        # resolve to the wrong directory.
        datadir_path = os.path.join(devices, device, datadir)
        if not os.path.isdir(datadir_path):
            continue
        for partition in os.listdir(datadir_path):
            part_path = os.path.join(datadir_path, partition)
            if not os.path.isdir(part_path):
                continue
            for suffix in os.listdir(part_path):
                suff_path = os.path.join(part_path, suffix)
                if not os.path.isdir(suff_path):
                    continue
                for hsh in os.listdir(suff_path):
                    hash_path = os.path.join(suff_path, hsh)
                    if not os.path.isdir(hash_path):
                        continue
                    for fname in sorted(os.listdir(hash_path),
                                        reverse=True):
                        yield os.path.join(hash_path, fname), device, \
                            partition
|
||||
|
@ -14,74 +14,28 @@
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
from random import choice, random
|
||||
from urllib import quote
|
||||
|
||||
from eventlet import Timeout
|
||||
from random import random
|
||||
|
||||
from swift.container import server as container_server
|
||||
from swift.common.db import ContainerBroker
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger
|
||||
from swift.common.utils import get_logger, audit_location_generator
|
||||
from swift.common.daemon import Daemon
|
||||
|
||||
|
||||
class AuditException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ContainerAuditor(Daemon):
|
||||
"""Audit containers."""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf)
|
||||
self.logger = get_logger(conf, 'container-auditor')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
||||
('true', 't', '1', 'on', 'yes', 'y')
|
||||
self.interval = int(conf.get('interval', 1800))
|
||||
swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
|
||||
self.account_ring = None
|
||||
self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
|
||||
self.object_ring = None
|
||||
self.node_timeout = int(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.max_object_count = int(conf.get('max_object_count', 100))
|
||||
self.account_passes = 0
|
||||
self.account_failures = 0
|
||||
self.account_errors = 0
|
||||
self.object_passes = 0
|
||||
self.object_failures = 0
|
||||
self.object_errors = 0
|
||||
|
||||
def get_account_ring(self):
|
||||
"""
|
||||
Get the account ring. Loads the ring if necessary.
|
||||
|
||||
:returns: account ring
|
||||
"""
|
||||
if not self.account_ring:
|
||||
self.logger.debug(
|
||||
'Loading account ring from %s' % self.account_ring_path)
|
||||
self.account_ring = Ring(self.account_ring_path)
|
||||
return self.account_ring
|
||||
|
||||
def get_object_ring(self):
|
||||
"""
|
||||
Get the object ring. Loads the ring if necessary.
|
||||
|
||||
:returns: object ring
|
||||
"""
|
||||
if not self.object_ring:
|
||||
self.logger.debug(
|
||||
'Loading object ring from %s' % self.object_ring_path)
|
||||
self.object_ring = Ring(self.object_ring_path)
|
||||
return self.object_ring
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
|
||||
def run_forever(self): # pragma: no cover
|
||||
"""Run the container audit until stopped."""
|
||||
@ -89,29 +43,21 @@ class ContainerAuditor(Daemon):
|
||||
time.sleep(random() * self.interval)
|
||||
while True:
|
||||
begin = time.time()
|
||||
for device in os.listdir(self.devices):
|
||||
if self.mount_check and not\
|
||||
os.path.ismount(os.path.join(self.devices, device)):
|
||||
self.logger.debug(
|
||||
'Skipping %s as it is not mounted' % device)
|
||||
continue
|
||||
self.container_audit(device)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Remote audits with accounts: %s passed audit, '
|
||||
'%s failed audit, %s errors Remote audits with objects: '
|
||||
'%s passed audit, %s failed audit, %s errors' %
|
||||
(time.ctime(reported), self.account_passes,
|
||||
self.account_failures, self.account_errors,
|
||||
self.object_passes, self.object_failures,
|
||||
self.object_errors))
|
||||
reported = time.time()
|
||||
self.account_passes = 0
|
||||
self.account_failures = 0
|
||||
self.account_errors = 0
|
||||
self.object_passes = 0
|
||||
self.object_failures = 0
|
||||
self.object_errors = 0
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
container_server.DATADIR,
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.container_audit(path)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Container audits: %s passed audit, '
|
||||
'%s failed audit' % (time.ctime(reported),
|
||||
self.container_passes,
|
||||
self.container_failures))
|
||||
reported = time.time()
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
elapsed = time.time() - begin
|
||||
if elapsed < self.interval:
|
||||
time.sleep(self.interval - elapsed)
|
||||
@ -119,153 +65,41 @@ class ContainerAuditor(Daemon):
|
||||
def run_once(self):
|
||||
"""Run the container audit once."""
|
||||
self.logger.info('Begin container audit "once" mode')
|
||||
begin = time.time()
|
||||
for device in os.listdir(self.devices):
|
||||
if self.mount_check and \
|
||||
not os.path.ismount(os.path.join(self.devices, device)):
|
||||
self.logger.debug(
|
||||
'Skipping %s as it is not mounted' % device)
|
||||
continue
|
||||
self.container_audit(device)
|
||||
begin = reported = time.time()
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
container_server.DATADIR,
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.container_audit(path)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Container audits: %s passed audit, '
|
||||
'%s failed audit' % (time.ctime(reported),
|
||||
self.container_passes,
|
||||
self.container_failures))
|
||||
reported = time.time()
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
'Container audit "once" mode completed: %.02fs' % elapsed)
|
||||
|
||||
def container_audit(self, path):
    """
    Audits the given container path

    Only files ending in ``.db`` are examined; any other path is ignored
    without touching the counters. A database that is not marked deleted
    is asked for its info; if that succeeds ``self.container_passes`` is
    incremented, and any exception raised along the way increments
    ``self.container_failures`` and is logged.

    :param path: the path to a container db
    """
    try:
        if not path.endswith('.db'):
            return
        broker = ContainerBroker(path)
        if not broker.is_deleted():
            # The result is discarded on purpose: the call is made only so
            # that an unreadable database raises and is counted as a
            # failure below.
            broker.get_info()
            self.container_passes += 1
            self.logger.debug('Audit passed for %s' % broker.db_file)
    except Exception:
        self.container_failures += 1
        # Log ``path`` rather than ``broker.db_file``: ``broker`` is not
        # bound when constructing the broker is what failed, and touching
        # it here would raise from inside the handler.
        self.logger.exception('ERROR Could not get container info %s' %
                              path)
|
||||
|
@ -14,20 +14,13 @@
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
from hashlib import md5
|
||||
from random import choice, random
|
||||
from urllib import quote
|
||||
|
||||
from eventlet import Timeout
|
||||
from random import random
|
||||
|
||||
from swift.obj import server as object_server
|
||||
from swift.obj.replicator import invalidate_hash
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger, renamer
|
||||
from swift.common.utils import get_logger, renamer, audit_location_generator
|
||||
from swift.common.exceptions import AuditException
|
||||
from swift.common.daemon import Daemon
|
||||
|
||||
@ -42,29 +35,9 @@ class ObjectAuditor(Daemon):
|
||||
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
||||
('true', 't', '1', 'on', 'yes', 'y')
|
||||
self.interval = int(conf.get('interval', 1800))
|
||||
swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
|
||||
self.container_ring = None
|
||||
self.node_timeout = int(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.passes = 0
|
||||
self.quarantines = 0
|
||||
self.errors = 0
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
self.container_errors = 0
|
||||
|
||||
def get_container_ring(self):
|
||||
"""
|
||||
Get the container ring, loading it if necessary.
|
||||
|
||||
:returns: container ring
|
||||
"""
|
||||
if not self.container_ring:
|
||||
self.logger.debug(
|
||||
'Loading container ring from %s' % self.container_ring_path)
|
||||
self.container_ring = Ring(self.container_ring_path)
|
||||
return self.container_ring
|
||||
|
||||
def run_forever(self): # pragma: no cover
|
||||
"""Run the object audit until stopped."""
|
||||
@ -72,30 +45,21 @@ class ObjectAuditor(Daemon):
|
||||
time.sleep(random() * self.interval)
|
||||
while True:
|
||||
begin = time.time()
|
||||
# read from container ring to ensure it's fresh
|
||||
self.get_container_ring().get_nodes('')
|
||||
for device in os.listdir(self.devices):
|
||||
if self.mount_check and not \
|
||||
os.path.ismount(os.path.join(self.devices, device)):
|
||||
self.logger.debug(
|
||||
'Skipping %s as it is not mounted' % device)
|
||||
continue
|
||||
self.object_audit(device)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Locally: %d passed audit, %d quarantined, %d '
|
||||
'errors Remote audits with containers: %s passed audit, '
|
||||
'%s failed audit, %s errors' %
|
||||
(time.ctime(reported), self.passes, self.quarantines,
|
||||
self.errors, self.container_passes,
|
||||
self.container_failures, self.container_errors))
|
||||
reported = time.time()
|
||||
self.passes = 0
|
||||
self.quarantines = 0
|
||||
self.errors = 0
|
||||
self.container_passes = 0
|
||||
self.container_failures = 0
|
||||
self.container_errors = 0
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
object_server.DATADIR,
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.object_audit(path, device, partition)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Locally: %d passed audit, %d quarantined, '
|
||||
'%d errors' % (time.ctime(reported), self.passes,
|
||||
self.quarantines, self.errors))
|
||||
reported = time.time()
|
||||
self.passes = 0
|
||||
self.quarantines = 0
|
||||
self.errors = 0
|
||||
elapsed = time.time() - begin
|
||||
if elapsed < self.interval:
|
||||
time.sleep(self.interval - elapsed)
|
||||
@ -103,56 +67,46 @@ class ObjectAuditor(Daemon):
|
||||
def run_once(self):
|
||||
"""Run the object audit once."""
|
||||
self.logger.info('Begin object audit "once" mode')
|
||||
begin = time.time()
|
||||
for device in os.listdir(self.devices):
|
||||
if self.mount_check and \
|
||||
not os.path.ismount(os.path.join(self.devices, device)):
|
||||
self.logger.debug(
|
||||
'Skipping %s as it is not mounted' % device)
|
||||
continue
|
||||
self.object_audit(device)
|
||||
begin = reported = time.time()
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
object_server.DATADIR,
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
self.object_audit(path, device, partition)
|
||||
if time.time() - reported >= 3600: # once an hour
|
||||
self.logger.info(
|
||||
'Since %s: Locally: %d passed audit, %d quarantined, '
|
||||
'%d errors' % (time.ctime(reported), self.passes,
|
||||
self.quarantines, self.errors))
|
||||
reported = time.time()
|
||||
self.passes = 0
|
||||
self.quarantines = 0
|
||||
self.errors = 0
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
'Object audit "once" mode completed: %.02fs' % elapsed)
|
||||
|
||||
def object_audit(self, device):
|
||||
"""Walk the device, and audit any objects found."""
|
||||
datadir = os.path.join(self.devices, device, object_server.DATADIR)
|
||||
if not os.path.exists(datadir):
|
||||
return
|
||||
name = None
|
||||
partition = None
|
||||
attempts = 100
|
||||
while not name and attempts:
|
||||
attempts -= 1
|
||||
try:
|
||||
partition = choice(os.listdir(datadir))
|
||||
fpath = os.path.join(datadir, partition)
|
||||
if not os.path.isdir(fpath):
|
||||
continue
|
||||
suffix = choice(os.listdir(fpath))
|
||||
fpath = os.path.join(fpath, suffix)
|
||||
if not os.path.isdir(fpath):
|
||||
continue
|
||||
hsh = choice(os.listdir(fpath))
|
||||
fpath = os.path.join(fpath, hsh)
|
||||
if not os.path.isdir(fpath):
|
||||
continue
|
||||
except IndexError:
|
||||
continue
|
||||
for fname in sorted(os.listdir(fpath), reverse=True):
|
||||
if fname.endswith('.ts'):
|
||||
break
|
||||
if fname.endswith('.data'):
|
||||
name = object_server.read_metadata(
|
||||
os.path.join(fpath, fname))['name']
|
||||
break
|
||||
if not name:
|
||||
return
|
||||
_, account, container, obj = name.split('/', 3)
|
||||
df = object_server.DiskFile(self.devices, device, partition, account,
|
||||
container, obj, keep_data_fp=True)
|
||||
def object_audit(self, path, device, partition):
|
||||
"""
|
||||
Audits the given object path
|
||||
|
||||
:param path: a path to an object
|
||||
:param device: the device the path is on
|
||||
:param partition: the partition the path is on
|
||||
"""
|
||||
try:
|
||||
if not path.endswith('.data'):
|
||||
return
|
||||
try:
|
||||
name = object_server.read_metadata(path)['name']
|
||||
except Exception, exc:
|
||||
raise AuditException('Error when reading metadata: %s' % exc)
|
||||
_, account, container, obj = name.split('/', 3)
|
||||
df = object_server.DiskFile(self.devices, device,
|
||||
partition, account,
|
||||
container, obj,
|
||||
keep_data_fp=True)
|
||||
if os.path.getsize(df.data_file) != \
|
||||
int(df.metadata['Content-Length']):
|
||||
raise AuditException('Content-Length of %s does not match '
|
||||
@ -168,66 +122,14 @@ class ObjectAuditor(Daemon):
|
||||
except AuditException, err:
|
||||
self.quarantines += 1
|
||||
self.logger.error('ERROR Object %s failed audit and will be '
|
||||
'quarantined: %s' % (df.datadir, err))
|
||||
invalidate_hash(os.path.dirname(df.datadir))
|
||||
renamer(df.datadir, os.path.join(self.devices, device,
|
||||
'quarantined', 'objects', os.path.basename(df.datadir)))
|
||||
'quarantined: %s' % (path, err))
|
||||
invalidate_hash(os.path.dirname(path))
|
||||
renamer_path = os.path.dirname(path)
|
||||
renamer(renamer_path, os.path.join(self.devices, device,
|
||||
'quarantined', 'objects', os.path.basename(renamer_path)))
|
||||
return
|
||||
except:
|
||||
except Exception:
|
||||
self.errors += 1
|
||||
self.logger.exception('ERROR Trying to audit %s' % df.datadir)
|
||||
self.logger.exception('ERROR Trying to audit %s' % path)
|
||||
return
|
||||
self.passes += 1
|
||||
found = False
|
||||
good_response = False
|
||||
results = []
|
||||
part, nodes = self.get_container_ring().get_nodes(account, container)
|
||||
for node in nodes:
|
||||
try:
|
||||
with ConnectionTimeout(self.conn_timeout):
|
||||
conn = http_connect(node['ip'], node['port'],
|
||||
node['device'], part, 'GET',
|
||||
'/%s/%s' % (account, container),
|
||||
query_string='prefix=%s' % quote(obj))
|
||||
with Timeout(self.node_timeout):
|
||||
resp = conn.getresponse()
|
||||
body = resp.read()
|
||||
if 200 <= resp.status <= 299:
|
||||
for oname in body.split('\n'):
|
||||
if oname == obj:
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
break
|
||||
else:
|
||||
results.append('%s:%s/%s %s %s = %s' % (node['ip'],
|
||||
node['port'], node['device'], resp.status,
|
||||
resp.reason, repr(body)))
|
||||
else:
|
||||
results.append('%s:%s/%s %s %s' %
|
||||
(node['ip'], node['port'], node['device'],
|
||||
resp.status, resp.reason))
|
||||
except socket.error, err:
|
||||
results.append('%s:%s/%s Socket Error: %s' % (node['ip'],
|
||||
node['port'], node['device'], err))
|
||||
except ConnectionTimeout:
|
||||
results.append('%(ip)s:%(port)s/%(device)s ConnectionTimeout' %
|
||||
node)
|
||||
except Timeout:
|
||||
results.append('%(ip)s:%(port)s/%(device)s Timeout' % node)
|
||||
except Exception, err:
|
||||
self.logger.exception('ERROR With remote server '
|
||||
'%(ip)s:%(port)s/%(device)s' % node)
|
||||
results.append('%s:%s/%s Exception: %s' % (node['ip'],
|
||||
node['port'], node['device'], err))
|
||||
if found:
|
||||
self.container_passes += 1
|
||||
self.logger.debug('Audit passed for %s %s' % (name, df.datadir))
|
||||
else:
|
||||
if good_response:
|
||||
self.container_failures += 1
|
||||
else:
|
||||
self.container_errors += 1
|
||||
self.logger.error('ERROR Could not find object %s %s on any of '
|
||||
'the primary container servers it should be on: %s' % (name,
|
||||
df.datadir, results))
|
||||
|
Loading…
x
Reference in New Issue
Block a user