Merged from trunk

gholt 2011-06-10 18:50:07 +00:00
commit 3fc5166ff1
5 changed files with 168 additions and 35 deletions

View File

@@ -19,11 +19,11 @@ the proxy log output to an hourly log file. For example, a proxy request that
 is made on August 4, 2010 at 12:37 gets logged in a file named 2010080412.
 This allows easy log rotation and easy per-hour log processing.
 
-******************
-Account stats logs
-******************
+*********************************
+Account / Container DB stats logs
+*********************************
 
-Account stats logs are generated by a stats system process.
+DB stats logs are generated by a stats system process.
 swift-account-stats-logger runs on each account server (via cron) and walks
 the filesystem looking for account databases. When an account database is
 found, the logger selects the account hash, bytes_used, container_count, and
@@ -34,7 +34,8 @@ runs the account stats logger every hour. Therefore, in a cluster of ten
 account servers, ten csv files are produced every hour. Also, every account
 will have one entry for every replica in the system. On average, there will be
 three copies of each account in the aggregate of all account stat csv files
-created in one system-wide run.
+created in one system-wide run. The swift-container-stats-logger runs in a
+similar fashion, scanning the container dbs.
 
 ----------------------
 Log Processing plugins
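
For a sense of the per-hour processing this enables, here is a minimal sketch
that tallies one hourly account stats file. It assumes the column order
account hash, container count, object count, bytes used, and a hypothetical
log directory; neither is specified by this commit:

    # Sketch: tally per-account byte usage from one hourly account stats csv.
    # Account stats files carry no header row (get_header returns '' for them).
    import csv
    import time

    log_dir = '/var/log/swift/stats'    # assumed location, not from the commit
    src = '%s/account-stats-%s.csv' % (log_dir, time.strftime('%Y%m%d%H'))
    totals = {}
    with open(src, 'rb') as fp:
        for acc_hash, containers, objects, bytes_used in csv.reader(fp):
            # every account shows up about three times, once per replica
            totals.setdefault(acc_hash, []).append(int(bytes_used))
    for acc_hash, sizes in totals.items():
        print acc_hash, max(sizes)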

View File

@@ -54,3 +54,4 @@ processable = false
 # devices = /srv/node
 # mount_check = true
 # user = swift
+# metadata_keys = comma separated list of user metadata keys to be collected
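
The keys listed here are matched against container metadata only after
normalization. A minimal sketch of that normalization, mirroring the list
comprehension added to ContainerStatsCollector.__init__ below, with the same
hypothetical keys the tests use:

    # metadata_keys = test1, test2  (hypothetical value)
    conf_value = 'test1, test2'
    # webob title-cases header keys, so the collector does the same:
    keys = ['X-Container-Meta-%s' % mkey.strip().title()
            for mkey in conf_value.split(',') if mkey.strip()]
    # keys == ['X-Container-Meta-Test1', 'X-Container-Meta-Test2']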

View File

@@ -879,14 +879,16 @@ class ContainerBroker(DatabaseBroker):
         return (row['object_count'] in (None, '', 0, '0')) and \
             (float(row['delete_timestamp']) > float(row['put_timestamp']))
 
-    def get_info(self):
+    def get_info(self, include_metadata=False):
         """
         Get global data for the container.
 
-        :returns: sqlite.row of (account, container, created_at, put_timestamp,
-                  delete_timestamp, object_count, bytes_used,
+        :returns: dict with keys: account, container, created_at,
+                  put_timestamp, delete_timestamp, object_count, bytes_used,
                   reported_put_timestamp, reported_delete_timestamp,
-                  reported_object_count, reported_bytes_used, hash, id)
+                  reported_object_count, reported_bytes_used, hash, id
+                  If include_metadata is set, metadata is included as a key
+                  pointing to a dict of tuples of the metadata
         """
         try:
             self._commit_puts()
@@ -894,13 +896,34 @@ class ContainerBroker(DatabaseBroker):
             if not self.stale_reads_ok:
                 raise
         with self.get() as conn:
-            return conn.execute('''
-                SELECT account, container, created_at, put_timestamp,
-                    delete_timestamp, object_count, bytes_used,
-                    reported_put_timestamp, reported_delete_timestamp,
-                    reported_object_count, reported_bytes_used, hash, id
-                FROM container_stat
-            ''').fetchone()
+            metadata = ''
+            if include_metadata:
+                metadata = ', metadata'
+            try:
+                data = conn.execute('''
+                    SELECT account, container, created_at, put_timestamp,
+                        delete_timestamp, object_count, bytes_used,
+                        reported_put_timestamp, reported_delete_timestamp,
+                        reported_object_count, reported_bytes_used, hash, id
+                        %s
+                    FROM container_stat
+                ''' % metadata).fetchone()
+            except sqlite3.OperationalError, err:
+                if 'no such column: metadata' not in str(err):
+                    raise
+                data = conn.execute('''
+                    SELECT account, container, created_at, put_timestamp,
+                        delete_timestamp, object_count, bytes_used,
+                        reported_put_timestamp, reported_delete_timestamp,
+                        reported_object_count, reported_bytes_used, hash, id
+                    FROM container_stat''').fetchone()
+            data = dict(data)
+            if include_metadata:
+                try:
+                    data['metadata'] = json.loads(data.get('metadata', ''))
+                except ValueError:
+                    data['metadata'] = {}
+            return data
 
     def reported(self, put_timestamp, delete_timestamp, object_count,
                  bytes_used):
@@ -1394,9 +1417,9 @@ class AccountBroker(DatabaseBroker):
         """
         Get global data for the account.
 
-        :returns: sqlite.row of (account, created_at, put_timestamp,
+        :returns: dict with keys: account, created_at, put_timestamp,
                   delete_timestamp, container_count, object_count,
-                  bytes_used, hash, id)
+                  bytes_used, hash, id
         """
         try:
             self._commit_puts()
@@ -1404,11 +1427,11 @@
             if not self.stale_reads_ok:
                 raise
         with self.get() as conn:
-            return conn.execute('''
+            return dict(conn.execute('''
                 SELECT account, created_at, put_timestamp, delete_timestamp,
                     container_count, object_count, bytes_used, hash, id
                 FROM account_stat
-            ''').fetchone()
+            ''').fetchone())
 
     def list_containers_iter(self, limit, marker, end_marker, prefix,
                              delimiter):
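
A minimal usage sketch of the reworked brokers (the database path is made up
for illustration): get_info() now hands back a plain dict, and passing
include_metadata=True to the container broker adds a 'metadata' key that
falls back to {} on databases created before the metadata column existed:

    from swift.common.db import ContainerBroker

    broker = ContainerBroker('/path/to/some/container.db')  # hypothetical path
    if not broker.is_deleted():
        info = broker.get_info(include_metadata=True)
        print info['object_count'], info['bytes_used']
        print info['metadata']  # {} when the schema predates the column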

View File

@@ -58,7 +58,10 @@ class DatabaseStatsCollector(Daemon):
                          (self.stats_type, (time.time() - start) / 60))
 
     def get_data(self):
-        raise Exception('Not Implemented')
+        raise NotImplementedError('Subclasses must override')
+
+    def get_header(self):
+        raise NotImplementedError('Subclasses must override')
 
     def find_and_process(self):
         src_filename = time.strftime(self.filename_format)
@@ -70,6 +73,7 @@ class DatabaseStatsCollector(Daemon):
         hasher = hashlib.md5()
         try:
             with open(tmp_filename, 'wb') as statfile:
+                statfile.write(self.get_header())
                 for device in os.listdir(self.devices):
                     if self.mount_check and not check_mount(self.devices,
                                                             device):
@@ -122,6 +126,9 @@ class AccountStatsCollector(DatabaseStatsCollector):
                                  info['bytes_used'])
         return line_data
 
+    def get_header(self):
+        return ''
+
 
 class ContainerStatsCollector(DatabaseStatsCollector):
     """
@@ -133,20 +140,38 @@ class ContainerStatsCollector(DatabaseStatsCollector):
         super(ContainerStatsCollector, self).__init__(stats_conf, 'container',
                                                       container_server_data_dir,
                                                       'container-stats-%Y%m%d%H_')
+        # webob calls title on all the header keys
+        self.metadata_keys = ['X-Container-Meta-%s' % mkey.strip().title()
+            for mkey in stats_conf.get('metadata_keys', '').split(',')
+            if mkey.strip()]
+
+    def get_header(self):
+        header = 'Account Hash,Container Name,Object Count,Bytes Used'
+        if self.metadata_keys:
+            xtra_headers = ','.join(self.metadata_keys)
+            header += ',%s' % xtra_headers
+        header += '\n'
+        return header
 
     def get_data(self, db_path):
         """
         Data for generated csv has the following columns:
         Account Hash, Container Name, Object Count, Bytes Used
+        This will just collect whether or not the metadata is set
+        using a 1 or ''.
         """
         line_data = None
         broker = ContainerBroker(db_path)
         if not broker.is_deleted():
-            info = broker.get_info()
+            info = broker.get_info(include_metadata=bool(self.metadata_keys))
             encoded_container_name = urllib.quote(info['container'])
-            line_data = '"%s","%s",%d,%d\n' % (
-                info['account'],
-                encoded_container_name,
-                info['object_count'],
-                info['bytes_used'])
+            line_data = '"%s","%s",%d,%d' % (
+                info['account'], encoded_container_name,
+                info['object_count'], info['bytes_used'])
+            if self.metadata_keys:
+                metadata_results = ','.join(
+                    [info['metadata'].get(mkey) and '1' or ''
+                     for mkey in self.metadata_keys])
+                line_data += ',%s' % metadata_results
+            line_data += '\n'
         return line_data
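
With metadata_keys = test1,test2 configured (the value the tests below use),
an hourly container stats file would begin like this; each configured key
contributes a trailing column holding 1 if the metadata is set and the empty
string otherwise:

    Account Hash,Container Name,Object Count,Bytes Used,X-Container-Meta-Test1,X-Container-Meta-Test2
    "test_acc_0","test_con",1,10,,1
    "test_acc_1","test_con",1,10,1,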

View File

@@ -66,6 +66,16 @@ class TestDbStats(unittest.TestCase):
         info = stat.get_data("%s/con.db" % self.containers)
         self.assertEquals('''"test_acc","test_con",1,10\n''', info)
 
+    def test_container_stat_get_metadata(self):
+        stat = db_stats_collector.ContainerStatsCollector(self.conf)
+        container_db = ContainerBroker("%s/con.db" % self.containers,
+                                       account='test_acc', container='test_con')
+        container_db.initialize()
+        container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
+        info = stat.get_data("%s/con.db" % self.containers)
+        self.assertEquals('''"test_acc","test_con",1,10\n''', info)
+        container_db.update_metadata({'test1': ('val', 1000)})
+
     def _gen_account_stat(self):
         stat = db_stats_collector.AccountStatsCollector(self.conf)
         output_data = set()
@@ -83,20 +93,61 @@ class TestDbStats(unittest.TestCase):
         self.assertEqual(len(output_data), 10)
         return stat, output_data
 
-    def _gen_container_stat(self):
+    def _drop_metadata_col(self, broker, acc_name):
+        broker.conn.execute('''drop table container_stat''')
+        broker.conn.executescript("""
+            CREATE TABLE container_stat (
+                account TEXT DEFAULT '%s',
+                container TEXT DEFAULT 'test_con',
+                created_at TEXT,
+                put_timestamp TEXT DEFAULT '0',
+                delete_timestamp TEXT DEFAULT '0',
+                object_count INTEGER,
+                bytes_used INTEGER,
+                reported_put_timestamp TEXT DEFAULT '0',
+                reported_delete_timestamp TEXT DEFAULT '0',
+                reported_object_count INTEGER DEFAULT 0,
+                reported_bytes_used INTEGER DEFAULT 0,
+                hash TEXT default '00000000000000000000000000000000',
+                id TEXT,
+                status TEXT DEFAULT '',
+                status_changed_at TEXT DEFAULT '0'
+            );
+            INSERT INTO container_stat (object_count, bytes_used)
+            VALUES (1, 10);
+        """ % acc_name)
+
+    def _gen_container_stat(self, set_metadata=False, drop_metadata=False):
+        if set_metadata:
+            self.conf['metadata_keys'] = 'test1,test2'
+        # webob runs title on all headers
         stat = db_stats_collector.ContainerStatsCollector(self.conf)
         output_data = set()
         for i in range(10):
-            account_db = ContainerBroker(
+            cont_db = ContainerBroker(
                 "%s/container-stats-201001010%s-%s.db" % (self.containers, i,
                                                           uuid.uuid4().hex),
                 account='test_acc_%s' % i, container='test_con')
-            account_db.initialize()
-            account_db.put_object('test_obj', time.time(), 10, 'text',
-                                  'faketag')
+            cont_db.initialize()
+            cont_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
+            metadata_output = ''
+            if set_metadata:
+                if i % 2:
+                    cont_db.update_metadata({'X-Container-Meta-Test1': (5, 1)})
+                    metadata_output = ',1,'
+                else:
+                    cont_db.update_metadata({'X-Container-Meta-Test2': (7, 2)})
+                    metadata_output = ',,1'
             # this will "commit" the data
-            account_db.get_info()
-            output_data.add('''"test_acc_%s","test_con",1,10''' % i),
+            cont_db.get_info()
+            if drop_metadata:
+                output_data.add('''"test_acc_%s","test_con",1,10,,''' % i)
+            else:
+                output_data.add('''"test_acc_%s","test_con",1,10%s''' %
+                                (i, metadata_output))
+            if drop_metadata:
+                self._drop_metadata_col(cont_db, 'test_acc_%s' % i)
 
         self.assertEqual(len(output_data), 10)
         return stat, output_data
@@ -112,6 +163,35 @@ class TestDbStats(unittest.TestCase):
         self.assertEqual(len(output_data), 0)
 
+    def test_account_stat_run_once_container_metadata(self):
+        stat, output_data = self._gen_container_stat(set_metadata=True)
+        stat.run_once()
+        stat_file = os.listdir(self.log_dir)[0]
+        with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
+            headers = stat_handle.readline()
+            self.assert_(headers.startswith('Account Hash,Container Name,'))
+            for i in range(10):
+                data = stat_handle.readline()
+                output_data.discard(data.strip())
+        self.assertEqual(len(output_data), 0)
+
+    def test_account_stat_run_once_container_no_metadata(self):
+        stat, output_data = self._gen_container_stat(set_metadata=True,
+                                                     drop_metadata=True)
+        stat.run_once()
+        stat_file = os.listdir(self.log_dir)[0]
+        with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
+            headers = stat_handle.readline()
+            self.assert_(headers.startswith('Account Hash,Container Name,'))
+            for i in range(10):
+                data = stat_handle.readline()
+                output_data.discard(data.strip())
+        self.assertEqual(len(output_data), 0)
+
     def test_account_stat_run_once_both(self):
         acc_stat, acc_output_data = self._gen_account_stat()
         con_stat, con_output_data = self._gen_container_stat()
@@ -128,6 +208,8 @@ class TestDbStats(unittest.TestCase):
         con_stat.run_once()
         stat_file = [f for f in os.listdir(self.log_dir) if f != stat_file][0]
         with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
+            headers = stat_handle.readline()
+            self.assert_(headers.startswith('Account Hash,Container Name,'))
             for i in range(10):
                 data = stat_handle.readline()
                 con_output_data.discard(data.strip())
@@ -143,7 +225,8 @@ class TestDbStats(unittest.TestCase):
     def test_not_implemented(self):
         db_stat = db_stats_collector.DatabaseStatsCollector(self.conf,
             'account', 'test_dir', 'stats-%Y%m%d%H_')
-        self.assertRaises(Exception, db_stat.get_data)
+        self.assertRaises(NotImplementedError, db_stat.get_data)
+        self.assertRaises(NotImplementedError, db_stat.get_header)
 
     def test_not_not_mounted(self):
         self.conf['mount_check'] = 'true'