Merged from trunk

gholt 2011-06-13 20:51:06 +00:00
commit 024df7c0f2
15 changed files with 262 additions and 70 deletions

View File

@@ -547,6 +547,10 @@ error_suppression_limit 10 Error count to consider a
node error limited
allow_account_management false Whether account PUTs and DELETEs
are even callable
account_autocreate false If set to 'true' authorized
accounts that do not yet exist
within the Swift cluster will
be automatically created.
============================ =============== =============================
[tempauth]
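
With account_autocreate on, the first request an authorized user makes
against a not-yet-existing account creates that account on the fly
instead of returning 404. A minimal proxy-server.conf fragment (values
illustrative) enabling it:

    [app:proxy-server]
    use = egg:swift#proxy
    account_autocreate = true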

View File

@@ -19,11 +19,11 @@ the proxy log output to an hourly log file. For example, a proxy request that
is made on August 4, 2010 at 12:37 gets logged in a file named 2010080412.
This allows easy log rotation and easy per-hour log processing.
******************
Account stats logs
******************
*********************************
Account / Container DB stats logs
*********************************
Account stats logs are generated by a stats system process.
DB stats logs are generated by a stats system process.
swift-account-stats-logger runs on each account server (via cron) and walks
the filesystem looking for account databases. When an account database is
found, the logger selects the account hash, bytes_used, container_count, and
@@ -34,7 +34,8 @@ runs the account stats logger every hour. Therefore, in a cluster of ten
account servers, ten csv files are produced every hour. Also, every account
will have one entry for every replica in the system. On average, there will be
three copies of each account in the aggregate of all account stat csv files
created in one system-wide run.
created in one system-wide run. The swift-container-stats-logger runs in a
similar fashion, scanning the container dbs.
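
For illustration, a compressed sketch of one such hourly pass, assuming
the usual <devices>/<device>/accounts layout; the real
swift-account-stats-logger also checks mounts, hashes the account name,
and writes through a temp file:

    import os
    import sqlite3
    import time

    def collect_account_stats(devices_dir, out_dir):
        # One csv line per account database found under each device.
        out_path = os.path.join(out_dir, time.strftime('%Y%m%d%H') + '.csv')
        with open(out_path, 'w') as csv_file:
            for device in os.listdir(devices_dir):
                accounts = os.path.join(devices_dir, device, 'accounts')
                if not os.path.isdir(accounts):
                    continue
                for root, dirs, files in os.walk(accounts):
                    for name in files:
                        if not name.endswith('.db'):
                            continue
                        conn = sqlite3.connect(os.path.join(root, name))
                        row = conn.execute(
                            'SELECT account, bytes_used, container_count,'
                            ' object_count FROM account_stat').fetchone()
                        conn.close()
                        csv_file.write('"%s",%d,%d,%d\n' % row)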
----------------------
Log Processing plugins

View File

@@ -54,3 +54,4 @@ processable = false
# devices = /srv/node
# mount_check = true
# user = swift
# metadata_keys = comma separated list of user metadata keys to be collected
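
The new metadata_keys option adds one csv column per listed key; the
column carries a presence flag, not the metadata value itself. A
hypothetical setting and the header it produces:

    # metadata_keys = test1,test2
    # resulting csv header:
    # Account Hash,Container Name,Object Count,Bytes Used,X-Container-Meta-Test1,X-Container-Meta-Test2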

View File

@@ -40,6 +40,9 @@ use = egg:swift#proxy
# If set to 'true' any authorized user may create and delete accounts; if
# 'false' no one, even authorized, can.
# allow_account_management = false
# If set to 'true' authorized accounts that do not yet exist within the Swift
# cluster will be automatically created.
# account_autocreate = false
[filter:tempauth]
use = egg:swift#tempauth

View File

@@ -43,7 +43,7 @@ class Bench(object):
self.user = conf.user
self.key = conf.key
self.auth_url = conf.auth
self.use_proxy = conf.use_proxy in TRUE_VALUES
self.use_proxy = conf.use_proxy.lower() in TRUE_VALUES
if self.use_proxy:
url, token = client.get_auth(self.auth_url, self.user, self.key)
self.token = token
@@ -125,7 +125,7 @@ class BenchController(object):
self.logger = logger
self.conf = conf
self.names = []
self.delete = conf.delete in TRUE_VALUES
self.delete = conf.delete.lower() in TRUE_VALUES
self.gets = int(conf.num_gets)
def run(self):

View File

@@ -75,7 +75,8 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
log_name=kwargs.get('log_name'))
# once on command line (i.e. daemonize=false) will over-ride config
once = once or conf.get('daemonize', 'true') not in utils.TRUE_VALUES
once = once or \
conf.get('daemonize', 'true').lower() not in utils.TRUE_VALUES
# pre-configure logger
if 'logger' in kwargs:

View File

@@ -881,15 +881,17 @@ class ContainerBroker(DatabaseBroker):
return (row['object_count'] in (None, '', 0, '0')) and \
(float(row['delete_timestamp']) > float(row['put_timestamp']))
def get_info(self):
def get_info(self, include_metadata=False):
"""
Get global data for the container.
:returns: sqlite.row of (account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
:returns: dict with keys: account, container, created_at,
put_timestamp, delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id,
x_container_sync_point1, x_container_sync_point2)
x_container_sync_point1, and x_container_sync_point2.
If include_metadata is set, metadata is included as a key
pointing to a dict of the metadata items, each stored as a
(value, timestamp) tuple.
"""
try:
self._commit_puts()
@@ -897,27 +899,36 @@ class ContainerBroker(DatabaseBroker):
if not self.stale_reads_ok:
raise
with self.get() as conn:
try:
return conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id,
x_container_sync_point1, x_container_sync_point2
FROM container_stat
''').fetchone()
except sqlite3.OperationalError, err:
if 'no such column: x_container_sync_point' not in str(err):
raise
return conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id,
-1 AS x_container_sync_point1,
-1 AS x_container_sync_point2
FROM container_stat
''').fetchone()
data = None
trailing1 = 'metadata'
trailing2 = 'x_container_sync_point1, x_container_sync_point2'
while not data:
try:
data = conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s, %s
FROM container_stat
''' % (trailing1, trailing2)).fetchone()
except sqlite3.OperationalError, err:
if 'no such column: metadata' in str(err):
trailing1 = "'' as metadata"
elif 'no such column: x_container_sync_point' in str(err):
trailing2 = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
if include_metadata:
try:
data['metadata'] = json.loads(data.get('metadata', ''))
except ValueError:
data['metadata'] = {}
elif 'metadata' in data:
del data['metadata']
return data
def set_x_container_sync_points(self, sync_point1, sync_point2):
with self.get() as conn:
@@ -1449,9 +1460,9 @@ class AccountBroker(DatabaseBroker):
"""
Get global data for the account.
:returns: sqlite.row of (account, created_at, put_timestamp,
:returns: dict with keys: account, created_at, put_timestamp,
delete_timestamp, container_count, object_count,
bytes_used, hash, id)
bytes_used, hash, id
"""
try:
self._commit_puts()
@@ -1459,11 +1470,11 @@ class AccountBroker(DatabaseBroker):
if not self.stale_reads_ok:
raise
with self.get() as conn:
return conn.execute('''
return dict(conn.execute('''
SELECT account, created_at, put_timestamp, delete_timestamp,
container_count, object_count, bytes_used, hash, id
FROM account_stat
''').fetchone()
''').fetchone())
def list_containers_iter(self, limit, marker, end_marker, prefix,
delimiter):
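
The rewritten get_info() retries its SELECT, swapping any column the
database does not know about for a literal, so newer code keeps working
against databases created under older schemas. The same pattern in a
self-contained sketch (table and column names illustrative):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    # An "old" schema: container_stat has no metadata column yet.
    conn.execute('CREATE TABLE container_stat (account TEXT, object_count INT)')
    conn.execute("INSERT INTO container_stat VALUES ('AUTH_test', 3)")

    trailing = 'metadata'
    data = None
    while not data:
        try:
            data = conn.execute('SELECT account, object_count, %s'
                                ' FROM container_stat' % trailing).fetchone()
        except sqlite3.OperationalError as err:
            if 'no such column: metadata' in str(err):
                trailing = "'' AS metadata"  # downgrade to a literal, retry
            else:
                raise
    print(dict(zip(('account', 'object_count', 'metadata'), data)))
    # -> {'account': 'AUTH_test', 'object_count': 3, 'metadata': ''}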

View File

@@ -270,7 +270,7 @@ class StaticWeb(object):
:param start_response: The original WSGI start_response hook.
:param prefix: Any prefix desired for the container listing.
"""
if self._listings not in TRUE_VALUES:
if self._listings.lower() not in TRUE_VALUES:
resp = HTTPNotFound()(env, self._start_response)
return self._error_response(resp, env, start_response)
tmp_env = self._get_escalated_env(env)

View File

@@ -72,7 +72,7 @@ if hash_conf.read('/etc/swift/swift.conf'):
pass
# Used when reading config values
TRUE_VALUES = set(('true', '1', 'yes', 'True', 'Yes', 'on', 'On', 't', 'y'))
TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))
def validate_configuration():
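
Dropping the mixed-case spellings shrinks the set but obliges every
caller to lower() before the membership test, which is exactly what the
other hunks in this commit do. A small helper (hypothetical; not part
of this commit) would centralize that convention:

    TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))

    def config_true_value(value):
        # Case-insensitive truth test for config values; accepts real
        # booleans too, so code paths with defaults need no special case.
        return value is True or \
            (isinstance(value, basestring) and value.lower() in TRUE_VALUES)

    # config_true_value(conf.get('account_autocreate', 'no')) -> False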

View File

@@ -41,8 +41,8 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPMethodNotAllowed, \
from webob import Request, Response
from swift.common.ring import Ring
from swift.common.utils import get_logger, normalize_timestamp, split_path, \
cache_from_env, ContextPool, get_remote_client
from swift.common.utils import cache_from_env, ContextPool, get_logger, \
get_remote_client, normalize_timestamp, split_path, TRUE_VALUES
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
@@ -338,7 +338,7 @@ class Controller(object):
node['errors'] = self.app.error_suppression_limit + 1
node['last_error'] = time.time()
def account_info(self, account):
def account_info(self, account, autocreate=False):
"""
Get account information, and also verify that the account exists.
@@ -353,7 +353,7 @@ class Controller(object):
result_code = self.app.memcache.get(cache_key)
if result_code == 200:
return partition, nodes
elif result_code == 404:
elif result_code == 404 and not autocreate:
return None, None
result_code = 0
attempts_left = self.app.account_ring.replica_count
@@ -386,6 +386,17 @@ class Controller(object):
except (Exception, TimeoutError):
self.exception_occurred(node, _('Account'),
_('Trying to get account info for %s') % path)
if result_code == 404 and autocreate:
if len(account) > MAX_ACCOUNT_NAME_LENGTH:
return None, None
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'X-Trans-Id': self.trans_id}
resp = self.make_requests(Request.blank('/v1' + path),
self.app.account_ring, partition, 'PUT',
path, [headers] * len(nodes))
if resp.status_int // 100 != 2:
raise Exception('Could not autocreate account %r' % path)
result_code = 200
if self.app.memcache and result_code in (200, 404):
if result_code == 200:
cache_timeout = self.app.recheck_account_existence
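
Condensed, the new branch turns a 404 into a 200 when autocreation is
allowed: the account name is length-checked, a PUT is fanned out to
every account replica, and a non-2xx aggregate response is fatal. A
stripped-down sketch of that decision (put_account stands in for the
make_requests() fan-out):

    def resolve_account(result_code, autocreate, account, put_account,
                        max_len=256):
        # Sketch of the autocreate branch added to account_info();
        # put_account must return an HTTP status code.
        if result_code == 404 and autocreate:
            if len(account) > max_len:
                return 404  # name too long: still treated as a miss
            if put_account(account) // 100 != 2:
                raise Exception('Could not autocreate account %r' % account)
            return 200  # proceed as though the account existed
        return result_code

    # a backend that always accepts the PUT:
    assert resolve_account(404, True, 'new_acct', lambda a: 201) == 200
    assert resolve_account(404, False, 'new_acct', lambda a: 201) == 404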
@@ -397,7 +408,7 @@ class Controller(object):
return partition, nodes
return None, None
def container_info(self, account, container):
def container_info(self, account, container, account_autocreate=False):
"""
Get container information and thereby verify container existence.
This will also make a call to account_info to verify that the
@@ -424,7 +435,7 @@ class Controller(object):
return partition, nodes, read_acl, write_acl, sync_key
elif status == 404:
return None, None, None, None, None
if not self.account_info(account)[1]:
if not self.account_info(account, autocreate=account_autocreate)[1]:
return None, None, None, None, None
result_code = 0
read_acl = None
@@ -864,7 +875,8 @@ class ObjectController(Controller):
if error_response:
return error_response
container_partition, containers, _junk, req.acl, _junk = \
self.container_info(self.account_name, self.container_name)
self.container_info(self.account_name, self.container_name,
account_autocreate=self.app.account_autocreate)
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
@@ -922,7 +934,8 @@ class ObjectController(Controller):
"""HTTP PUT request handler."""
(container_partition, containers, _junk, req.acl,
req.environ['swift_sync_key']) = \
self.container_info(self.account_name, self.container_name)
self.container_info(self.account_name, self.container_name,
account_autocreate=self.app.account_autocreate)
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
@@ -1268,7 +1281,8 @@ class ContainerController(Controller):
resp.body = 'Container name length of %d longer than %d' % \
(len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
return resp
account_partition, accounts = self.account_info(self.account_name)
account_partition, accounts = self.account_info(self.account_name,
autocreate=self.app.account_autocreate)
if not accounts:
return HTTPNotFound(request=req)
container_partition, containers = self.app.container_ring.get_nodes(
@@ -1298,7 +1312,8 @@ class ContainerController(Controller):
self.clean_acls(req) or check_metadata(req, 'container')
if error_response:
return error_response
account_partition, accounts = self.account_info(self.account_name)
account_partition, accounts = self.account_info(self.account_name,
autocreate=self.app.account_autocreate)
if not accounts:
return HTTPNotFound(request=req)
container_partition, containers = self.app.container_ring.get_nodes(
@@ -1440,7 +1455,7 @@ class BaseApplication(object):
self.put_queue_depth = int(conf.get('put_queue_depth', 10))
self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
self.log_headers = conf.get('log_headers') == 'True'
self.log_headers = conf.get('log_headers', 'no').lower() in TRUE_VALUES
self.error_suppression_interval = \
int(conf.get('error_suppression_interval', 60))
self.error_suppression_limit = \
@@ -1450,7 +1465,7 @@ class BaseApplication(object):
self.recheck_account_existence = \
int(conf.get('recheck_account_existence', 60))
self.allow_account_management = \
conf.get('allow_account_management', 'false').lower() == 'true'
conf.get('allow_account_management', 'no').lower() in TRUE_VALUES
self.resellers_conf = ConfigParser()
self.resellers_conf.read(os.path.join(swift_dir, 'resellers.conf'))
self.object_ring = object_ring or \
@@ -1462,6 +1477,8 @@ class BaseApplication(object):
self.memcache = memcache
mimetypes.init(mimetypes.knownfiles +
[os.path.join(swift_dir, 'mime.types')])
self.account_autocreate = \
conf.get('account_autocreate', 'no').lower() in TRUE_VALUES
def get_controller(self, path):
"""

View File

@@ -58,7 +58,10 @@ class DatabaseStatsCollector(Daemon):
(self.stats_type, (time.time() - start) / 60))
def get_data(self):
raise Exception('Not Implemented')
raise NotImplementedError('Subclasses must override')
def get_header(self):
raise NotImplementedError('Subclasses must override')
def find_and_process(self):
src_filename = time.strftime(self.filename_format)
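
Raising NotImplementedError rather than a bare Exception marks
get_data() and get_header() as abstract hooks, which the updated test
at the bottom of this commit asserts directly. A toy subclass (names
illustrative) shows the contract a new collector must meet:

    from swift.stats.db_stats_collector import DatabaseStatsCollector

    class WidgetStatsCollector(DatabaseStatsCollector):
        # Hypothetical collector: one csv line per widget database.

        def get_header(self):
            # Written once at the top of each output file; return ''
            # for a headerless format like the account csv.
            return 'Widget Name,Widget Count\n'

        def get_data(self, db_path):
            # Must return one csv line (newline included) per db.
            return '"example",1\n'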
@@ -70,6 +73,7 @@ class DatabaseStatsCollector(Daemon):
hasher = hashlib.md5()
try:
with open(tmp_filename, 'wb') as statfile:
statfile.write(self.get_header())
for device in os.listdir(self.devices):
if self.mount_check and not check_mount(self.devices,
device):
@@ -122,6 +126,9 @@ class AccountStatsCollector(DatabaseStatsCollector):
info['bytes_used'])
return line_data
def get_header(self):
return ''
class ContainerStatsCollector(DatabaseStatsCollector):
"""
@@ -133,20 +140,38 @@ class ContainerStatsCollector(DatabaseStatsCollector):
super(ContainerStatsCollector, self).__init__(stats_conf, 'container',
container_server_data_dir,
'container-stats-%Y%m%d%H_')
# webob calls title on all the header keys
self.metadata_keys = ['X-Container-Meta-%s' % mkey.strip().title()
for mkey in stats_conf.get('metadata_keys', '').split(',')
if mkey.strip()]
def get_header(self):
header = 'Account Hash,Container Name,Object Count,Bytes Used'
if self.metadata_keys:
xtra_headers = ','.join(self.metadata_keys)
header += ',%s' % xtra_headers
header += '\n'
return header
def get_data(self, db_path):
"""
Data for the generated csv has the following columns:
Account Hash, Container Name, Object Count, Bytes Used
For each configured metadata key, only whether the metadata
is set is collected, recorded as 1 or ''.
"""
line_data = None
broker = ContainerBroker(db_path)
if not broker.is_deleted():
info = broker.get_info()
info = broker.get_info(include_metadata=bool(self.metadata_keys))
encoded_container_name = urllib.quote(info['container'])
line_data = '"%s","%s",%d,%d\n' % (
info['account'],
encoded_container_name,
info['object_count'],
info['bytes_used'])
line_data = '"%s","%s",%d,%d' % (
info['account'], encoded_container_name,
info['object_count'], info['bytes_used'])
if self.metadata_keys:
metadata_results = ','.join(
[info['metadata'].get(mkey) and '1' or ''
for mkey in self.metadata_keys])
line_data += ',%s' % metadata_results
line_data += '\n'
return line_data
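
The extra columns carry presence flags only: per configured key, 1 when
the container has that metadata and the empty string when it does not,
matching what the tests below expect. In isolation, with illustrative
values (info['metadata'] is what get_info(include_metadata=True)
returns):

    metadata = {'X-Container-Meta-Test1': ('val', 1000)}
    metadata_keys = ['X-Container-Meta-Test1', 'X-Container-Meta-Test2']

    flags = ','.join([metadata.get(mkey) and '1' or ''
                      for mkey in metadata_keys])
    assert flags == '1,'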

View File

@@ -69,7 +69,7 @@ class LogUploader(Daemon):
self.internal_proxy = InternalProxy(proxy_server_conf)
self.new_log_cutoff = int(cutoff or
uploader_conf.get('new_log_cutoff', '7200'))
self.unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
self.unlink_log = uploader_conf.get('unlink_log', 'true').lower() in \
utils.TRUE_VALUES
self.filename_pattern = regex or \
uploader_conf.get('source_filename_pattern',

View File

@@ -788,6 +788,10 @@ log_name = yarr'''
['1.1.1.1', '2.2.2.2']),
None)
def test_TRUE_VALUES(self):
for v in utils.TRUE_VALUES:
self.assertEquals(v, v.lower())
if __name__ == '__main__':
unittest.main()

View File

@@ -393,6 +393,48 @@ class TestController(unittest.TestCase):
test(404, 507, 503)
test(503, 503, 503)
def test_account_info_account_autocreate(self):
with save_globals():
self.memcache.store = {}
proxy_server.http_connect = \
fake_http_connect(404, 404, 404, 201, 201, 201)
partition, nodes = \
self.controller.account_info(self.account, autocreate=False)
self.check_account_info_return(partition, nodes, is_none=True)
self.memcache.store = {}
proxy_server.http_connect = \
fake_http_connect(404, 404, 404, 201, 201, 201)
partition, nodes = \
self.controller.account_info(self.account)
self.check_account_info_return(partition, nodes, is_none=True)
self.memcache.store = {}
proxy_server.http_connect = \
fake_http_connect(404, 404, 404, 201, 201, 201)
partition, nodes = \
self.controller.account_info(self.account, autocreate=True)
self.check_account_info_return(partition, nodes)
self.memcache.store = {}
proxy_server.http_connect = \
fake_http_connect(404, 404, 404, 503, 201, 201)
partition, nodes = \
self.controller.account_info(self.account, autocreate=True)
self.check_account_info_return(partition, nodes)
self.memcache.store = {}
proxy_server.http_connect = \
fake_http_connect(404, 404, 404, 503, 201, 503)
exc = None
try:
partition, nodes = \
self.controller.account_info(self.account, autocreate=True)
except Exception, err:
exc = err
self.assertEquals(str(exc),
"Could not autocreate account '/some_account'")
def check_container_info_return(self, ret, is_none=False):
if is_none:
partition, nodes, read_acl, write_acl = None, None, None, None
@@ -406,7 +448,7 @@ class TestController(unittest.TestCase):
self.assertEqual(write_acl, ret[3])
def test_container_info_invalid_account(self):
def account_info(self, account):
def account_info(self, account, autocreate=False):
return None, None
with save_globals():
@@ -417,7 +459,7 @@ class TestController(unittest.TestCase):
# tests if 200 is cached and used
def test_container_info_200(self):
def account_info(self, account):
def account_info(self, account, autocreate=False):
return True, True
with save_globals():
@@ -443,7 +485,7 @@ class TestController(unittest.TestCase):
# tests if 404 is cached and used
def test_container_info_404(self):
def account_info(self, account):
def account_info(self, account, autocreate=False):
return True, True
with save_globals():

View File

@@ -66,6 +66,16 @@ class TestDbStats(unittest.TestCase):
info = stat.get_data("%s/con.db" % self.containers)
self.assertEquals('''"test_acc","test_con",1,10\n''', info)
def test_container_stat_get_metadata(self):
stat = db_stats_collector.ContainerStatsCollector(self.conf)
container_db = ContainerBroker("%s/con.db" % self.containers,
account='test_acc', container='test_con')
container_db.initialize()
container_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
info = stat.get_data("%s/con.db" % self.containers)
self.assertEquals('''"test_acc","test_con",1,10\n''', info)
container_db.update_metadata({'test1': ('val', 1000)})
def _gen_account_stat(self):
stat = db_stats_collector.AccountStatsCollector(self.conf)
output_data = set()
@@ -83,20 +93,61 @@ class TestDbStats(unittest.TestCase):
self.assertEqual(len(output_data), 10)
return stat, output_data
def _gen_container_stat(self):
def _drop_metadata_col(self, broker, acc_name):
broker.conn.execute('''drop table container_stat''')
broker.conn.executescript("""
CREATE TABLE container_stat (
account TEXT DEFAULT '%s',
container TEXT DEFAULT 'test_con',
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0'
);
INSERT INTO container_stat (object_count, bytes_used)
VALUES (1, 10);
""" % acc_name)
def _gen_container_stat(self, set_metadata=False, drop_metadata=False):
if set_metadata:
self.conf['metadata_keys'] = 'test1,test2'
# webob runs title on all headers
stat = db_stats_collector.ContainerStatsCollector(self.conf)
output_data = set()
for i in range(10):
account_db = ContainerBroker(
cont_db = ContainerBroker(
"%s/container-stats-201001010%s-%s.db" % (self.containers, i,
uuid.uuid4().hex),
account='test_acc_%s' % i, container='test_con')
account_db.initialize()
account_db.put_object('test_obj', time.time(), 10, 'text',
'faketag')
cont_db.initialize()
cont_db.put_object('test_obj', time.time(), 10, 'text', 'faketag')
metadata_output = ''
if set_metadata:
if i % 2:
cont_db.update_metadata({'X-Container-Meta-Test1': (5, 1)})
metadata_output = ',1,'
else:
cont_db.update_metadata({'X-Container-Meta-Test2': (7, 2)})
metadata_output = ',,1'
# this will "commit" the data
account_db.get_info()
output_data.add('''"test_acc_%s","test_con",1,10''' % i),
cont_db.get_info()
if drop_metadata:
output_data.add('''"test_acc_%s","test_con",1,10,,''' % i)
else:
output_data.add('''"test_acc_%s","test_con",1,10%s''' %
(i, metadata_output))
if drop_metadata:
self._drop_metadata_col(cont_db, 'test_acc_%s' % i)
self.assertEqual(len(output_data), 10)
return stat, output_data
@@ -112,6 +163,35 @@ class TestDbStats(unittest.TestCase):
self.assertEqual(len(output_data), 0)
def test_account_stat_run_once_container_metadata(self):
stat, output_data = self._gen_container_stat(set_metadata=True)
stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
headers = stat_handle.readline()
self.assert_(headers.startswith('Account Hash,Container Name,'))
for i in range(10):
data = stat_handle.readline()
output_data.discard(data.strip())
self.assertEqual(len(output_data), 0)
def test_account_stat_run_once_container_no_metadata(self):
stat, output_data = self._gen_container_stat(set_metadata=True,
drop_metadata=True)
stat.run_once()
stat_file = os.listdir(self.log_dir)[0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
headers = stat_handle.readline()
self.assert_(headers.startswith('Account Hash,Container Name,'))
for i in range(10):
data = stat_handle.readline()
output_data.discard(data.strip())
self.assertEqual(len(output_data), 0)
def test_account_stat_run_once_both(self):
acc_stat, acc_output_data = self._gen_account_stat()
con_stat, con_output_data = self._gen_container_stat()
@@ -128,6 +208,8 @@ class TestDbStats(unittest.TestCase):
con_stat.run_once()
stat_file = [f for f in os.listdir(self.log_dir) if f != stat_file][0]
with open(os.path.join(self.log_dir, stat_file)) as stat_handle:
headers = stat_handle.readline()
self.assert_(headers.startswith('Account Hash,Container Name,'))
for i in range(10):
data = stat_handle.readline()
con_output_data.discard(data.strip())
@@ -143,7 +225,8 @@ class TestDbStats(unittest.TestCase):
def test_not_implemented(self):
db_stat = db_stats_collector.DatabaseStatsCollector(self.conf,
'account', 'test_dir', 'stats-%Y%m%d%H_')
self.assertRaises(Exception, db_stat.get_data)
self.assertRaises(NotImplementedError, db_stat.get_data)
self.assertRaises(NotImplementedError, db_stat.get_header)
def test_not_not_mounted(self):
self.conf['mount_check'] = 'true'