merge up to trunk

This commit is contained in:
David Goetz 2011-02-16 16:30:13 +00:00
commit d60b931181
49 changed files with 737 additions and 472 deletions

2
bin/st
View File

@ -1723,7 +1723,7 @@ Example:
error_thread.abort = True
while error_thread.isAlive():
error_thread.join(0.01)
except Exception:
except (SystemExit, Exception):
for thread in threading_enumerate():
thread.abort = True
raise

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -22,9 +22,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -22,9 +22,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -23,4 +23,4 @@ if __name__ == '__main__':
# currently AccountStat only supports run_once
options['once'] = True
run_daemon(AccountStat, conf_file, section_name='log-processor-stats',
**options)
log_name="account-stats", **options)

View File

@ -22,7 +22,7 @@ import uuid
from optparse import OptionParser
from swift.common.bench import BenchController
from swift.common.utils import readconf, LogAdapter, NamedFormatter
from swift.common.utils import readconf, LogAdapter
# The defaults should be sufficient to run swift-bench on a SAIO
CONF_DEFAULTS = {
@ -125,9 +125,9 @@ if __name__ == '__main__':
options.log_level.lower(), logging.INFO))
loghandler = logging.StreamHandler()
logger.addHandler(loghandler)
logger = LogAdapter(logger)
logformat = NamedFormatter('swift-bench', logger,
fmt='%(server)s %(asctime)s %(levelname)s %(message)s')
logger = LogAdapter(logger, 'swift-bench')
logformat = logging.Formatter('%(server)s %(asctime)s %(levelname)s '
'%(message)s')
loghandler.setFormatter(logformat)
controller = BenchController(logger, options)

View File

@ -99,7 +99,8 @@ if __name__ == '__main__':
device_dir = conf.get('device_dir', '/srv/node')
minutes = int(conf.get('minutes', 60))
error_limit = int(conf.get('error_limit', 1))
logger = get_logger(conf, 'drive-audit')
conf['log_name'] = conf.get('log_name', 'drive-audit')
logger = get_logger(conf, log_route='drive-audit')
devices = get_devices(device_dir, logger)
logger.debug("Devices found: %s" % str(devices))
if not devices:

View File

@ -34,7 +34,7 @@ if __name__ == '__main__':
uploader_conf.update(plugin_conf)
# pre-configure logger
logger = utils.get_logger(uploader_conf, plugin,
logger = utils.get_logger(uploader_conf, log_route='log-uploader',
log_to_console=options.get('verbose', False))
# currently LogUploader only supports run_once
options['once'] = True

View File

@ -48,6 +48,8 @@ The <search-value> can be of the form:
/sdb1 Matches devices with the device name sdb1
_shiny Matches devices with shiny in the meta data
_"snet: 5.6.7.8" Matches devices with snet: 5.6.7.8 in the meta data
[::1] Matches devices in any zone with the ip ::1
z1-[::1]:5678 Matches devices in zone 1 with the ip ::1 and port 5678
Most specific example:
d74z1-1.2.3.4:5678/sdb1_"snet: 5.6.7.8"
Nerd explanation:
@ -76,6 +78,13 @@ The <search-value> can be of the form:
i += 1
match.append(('ip', search_value[:i]))
search_value = search_value[i:]
elif len(search_value) and search_value[0] == '[':
i = 1
while i < len(search_value) and search_value[i] != ']':
i += 1
i += 1
match.append(('ip', search_value[:i].lstrip('[').rstrip(']')))
search_value = search_value[i:]
if search_value.startswith(':'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
@ -110,6 +119,16 @@ The <search-value> can be of the form:
return devs
def format_device(dev):
"""
Format a device for display.
"""
if ':' in dev['ip']:
return 'd%(id)sz%(zone)s-[%(ip)s]:%(port)s/%(device)s_"%(meta)s"' % dev
else:
return 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
class Commands:
def unknown():
@ -235,10 +254,18 @@ swift-ring-builder <builder_file> add z<zone>-<ip>:<port>/<device_name>_<meta>
print 'Invalid add value: %s' % argv[3]
exit(EXIT_ERROR)
i = 1
while i < len(rest) and rest[i] in '0123456789.':
if rest[i] == '[':
i += 1
ip = rest[1:i]
rest = rest[i:]
while i < len(rest) and rest[i] != ']':
i += 1
i += 1
ip = rest[1:i].lstrip('[').rstrip(']')
rest = rest[i:]
else:
while i < len(rest) and rest[i] in '0123456789.':
i += 1
ip = rest[1:i]
rest = rest[i:]
if not rest.startswith(':'):
print 'Invalid add value: %s' % argv[3]
@ -279,8 +306,12 @@ swift-ring-builder <builder_file> add z<zone>-<ip>:<port>/<device_name>_<meta>
builder.add_dev({'id': next_dev_id, 'zone': zone, 'ip': ip,
'port': port, 'device': device_name, 'weight': weight,
'meta': meta})
print 'Device z%s-%s:%s/%s_"%s" with %s weight got id %s' % \
(zone, ip, port, device_name, meta, weight, next_dev_id)
if ':' in ip:
print 'Device z%s-[%s]:%s/%s_"%s" with %s weight got id %s' % \
(zone, ip, port, device_name, meta, weight, next_dev_id)
else:
print 'Device z%s-%s:%s/%s_"%s" with %s weight got id %s' % \
(zone, ip, port, device_name, meta, weight, next_dev_id)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
@ -342,6 +373,13 @@ swift-ring-builder <builder_file> set_info <search-value>
i += 1
change.append(('ip', change_value[:i]))
change_value = change_value[i:]
elif len(change_value) and change_value[0] == '[':
i = 1
while i < len(change_value) and change_value[i] != ']':
i += 1
i += 1
change.append(('ip', change_value[:i].lstrip('[').rstrip(']')))
change_value = change_value[i:]
if change_value.startswith(':'):
i = 1
while i < len(change_value) and change_value[i].isdigit():
@ -366,15 +404,13 @@ swift-ring-builder <builder_file> set_info <search-value>
if len(devs) > 1:
print 'Matched more than one device:'
for dev in devs:
print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
'"%(meta)s"' % dev
print ' %s' % format_device(dev)
if raw_input('Are you sure you want to update the info for '
'these %s devices? (y/N) ' % len(devs)) != 'y':
print 'Aborting device modifications'
exit(EXIT_ERROR)
for dev in devs:
orig_dev_string = \
'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
orig_dev_string = format_device(dev)
test_dev = dict(dev)
for key, value in change:
test_dev[key] = value
@ -390,9 +426,7 @@ swift-ring-builder <builder_file> set_info <search-value>
exit(EXIT_ERROR)
for key, value in change:
dev[key] = value
new_dev_string = \
'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
print 'Device %s is now %s' % (orig_dev_string, new_dev_string)
print 'Device %s is now %s' % (orig_dev_string, format_device(dev))
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)

View File

@ -107,6 +107,7 @@ Instructions for Deploying Debian Packages for Swift
apt-get install rsync python-openssl python-setuptools python-webob
python-simplejson python-xattr python-greenlet python-eventlet
python-netifaces
#. Install base packages::

View File

@ -24,6 +24,9 @@ use = egg:swift#proxy
# set log_name = proxy-server
# set log_facility = LOG_LOCAL0
# set log_level = INFO
# set access_log_name = proxy-server
# set access_log_facility = LOG_LOCAL0
# set access_log_level = INFO
# set log_headers = False
# recheck_account_existence = 60
# recheck_container_existence = 60

View File

@ -7,13 +7,16 @@ pid file = /var/run/rsyncd.pid
max connections = 2
path = /srv/node
read only = false
lock file = /var/lock/account.lock
[container]
max connections = 4
path = /srv/node
read only = false
lock file = /var/lock/container.lock
[object]
max connections = 8
path = /srv/node
read only = false
lock file = /var/lock/object.lock

View File

@ -28,7 +28,7 @@ class AccountAuditor(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'account-auditor')
self.logger = get_logger(conf, log_route='account-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -53,7 +53,7 @@ class AccountReaper(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='account-reaper')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -42,7 +42,7 @@ class AccountController(object):
"""WSGI controller for the account server."""
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='account-server')
self.root = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@ -86,8 +86,6 @@ class AccountController(object):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_account_broker(drive, part, account)
if container: # put account container
if 'x-cf-trans-id' in req.headers:
broker.pending_timeout = 3
if req.headers.get('x-account-override-deleted', 'no').lower() != \
'yes' and broker.is_deleted():
return HTTPNotFound(request=req)
@ -140,9 +138,6 @@ class AccountController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_account_broker(drive, part, account)
if not container:
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()
@ -171,8 +166,6 @@ class AccountController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_account_broker(drive, part, account)
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()

View File

@ -20,7 +20,6 @@ from contextlib import contextmanager
from time import gmtime, strftime, time
from urllib import unquote, quote
from uuid import uuid4
from urlparse import urlparse
from hashlib import md5, sha1
import hmac
import base64
@ -32,7 +31,7 @@ from webob.exc import HTTPBadRequest, HTTPConflict, HTTPForbidden, \
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.db import get_db_connection
from swift.common.utils import get_logger, split_path
from swift.common.utils import get_logger, split_path, urlparse
class AuthController(object):
@ -90,7 +89,7 @@ class AuthController(object):
"""
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='auth-server')
self.super_admin_key = conf.get('super_admin_key')
if not self.super_admin_key:
msg = _('No super_admin_key set in conf file! Exiting.')

View File

@ -16,13 +16,12 @@
import uuid
import time
import random
from urlparse import urlparse
from contextlib import contextmanager
import eventlet.pools
from eventlet.green.httplib import CannotSendRequest
from swift.common.utils import TRUE_VALUES
from swift.common.utils import TRUE_VALUES, urlparse
from swift.common import client
from swift.common import direct_client

View File

@ -26,7 +26,7 @@ class Daemon(object):
def __init__(self, conf):
self.conf = conf
self.logger = utils.get_logger(conf, 'swift-daemon')
self.logger = utils.get_logger(conf, log_route='daemon')
def run_once(self):
"""Override this to run the script once"""
@ -84,7 +84,7 @@ def run_daemon(klass, conf_file, section_name='',
logger = kwargs.pop('logger')
else:
logger = utils.get_logger(conf, conf.get('log_name', section_name),
log_to_console=kwargs.pop('verbose', False))
log_to_console=kwargs.pop('verbose', False), log_route=section_name)
try:
klass(conf).run(once=once, **kwargs)
except KeyboardInterrupt:

View File

@ -27,13 +27,14 @@ import cPickle as pickle
import errno
from random import randint
from tempfile import mkstemp
import traceback
from eventlet import sleep
import simplejson as json
import sqlite3
from swift.common.utils import normalize_timestamp, renamer, \
mkdirs, lock_parent_directory, fallocate
mkdirs, lock_parent_directory
from swift.common.exceptions import LockTimeout
@ -41,8 +42,9 @@ from swift.common.exceptions import LockTimeout
BROKER_TIMEOUT = 25
#: Pickle protocol to use
PICKLE_PROTOCOL = 2
#: Max number of pending entries
PENDING_CAP = 131072
CONNECT_ATTEMPTS = 4
PENDING_COMMIT_TIMEOUT = 900
AUTOCHECKPOINT = 8192
class DatabaseConnectionError(sqlite3.DatabaseError):
@ -123,48 +125,48 @@ def get_db_connection(path, timeout=30, okay_to_create=False):
:param okay_to_create: if True, create the DB if it doesn't exist
:returns: DB connection object
"""
try:
connect_time = time.time()
conn = sqlite3.connect(path, check_same_thread=False,
factory=GreenDBConnection, timeout=timeout)
if path != ':memory:' and not okay_to_create:
# retry logic to address:
# http://www.mail-archive.com/sqlite-users@sqlite.org/msg57092.html
for attempt in xrange(CONNECT_ATTEMPTS):
try:
connect_time = time.time()
conn = sqlite3.connect(path, check_same_thread=False,
factory=GreenDBConnection, timeout=timeout)
# attempt to detect and fail when connect creates the db file
stat = os.stat(path)
if stat.st_size == 0 and stat.st_ctime >= connect_time:
os.unlink(path)
raise DatabaseConnectionError(path,
'DB file created by connect?')
conn.row_factory = sqlite3.Row
conn.text_factory = str
conn.execute('PRAGMA synchronous = NORMAL')
conn.execute('PRAGMA count_changes = OFF')
conn.execute('PRAGMA temp_store = MEMORY')
conn.execute('PRAGMA journal_mode = DELETE')
conn.create_function('chexor', 3, chexor)
except sqlite3.DatabaseError:
import traceback
raise DatabaseConnectionError(path, traceback.format_exc(),
timeout=timeout)
return conn
if path != ':memory:' and not okay_to_create:
stat = os.stat(path)
if stat.st_size == 0 and stat.st_ctime >= connect_time:
os.unlink(path)
raise DatabaseConnectionError(path,
'DB file created by connect?')
conn.execute('PRAGMA journal_mode = WAL')
conn.execute('PRAGMA synchronous = NORMAL')
conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT)
conn.execute('PRAGMA count_changes = OFF')
conn.execute('PRAGMA temp_store = MEMORY')
conn.create_function('chexor', 3, chexor)
conn.row_factory = sqlite3.Row
conn.text_factory = str
return conn
except sqlite3.DatabaseError, e:
errstr = traceback.format_exc()
raise DatabaseConnectionError(path, errstr, timeout=timeout)
class DatabaseBroker(object):
"""Encapsulates working with a database."""
def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
account=None, container=None, pending_timeout=10,
stale_reads_ok=False):
account=None, container=None):
""" Encapsulates working with a database. """
self.conn = None
self.db_file = db_file
self.pending_file = self.db_file + '.pending'
self.pending_timeout = pending_timeout
self.stale_reads_ok = stale_reads_ok
self.db_dir = os.path.dirname(db_file)
self.timeout = timeout
self.logger = logger or logging.getLogger()
self.account = account
self.container = container
self._db_version = -1
def initialize(self, put_timestamp=None):
"""
@ -233,7 +235,7 @@ class DatabaseBroker(object):
conn.close()
with open(tmp_db_file, 'r+b') as fp:
os.fsync(fp.fileno())
with lock_parent_directory(self.db_file, self.pending_timeout):
with lock_parent_directory(self.db_file, self.timeout):
if os.path.exists(self.db_file):
# It's as if there was a "condition" where different parts
# of the system were "racing" each other.
@ -285,6 +287,7 @@ class DatabaseBroker(object):
self.conn = None
orig_isolation_level = conn.isolation_level
conn.isolation_level = None
conn.execute('PRAGMA journal_mode = DELETE') # remove journal files
conn.execute('BEGIN IMMEDIATE')
try:
yield True
@ -292,6 +295,7 @@ class DatabaseBroker(object):
pass
try:
conn.execute('ROLLBACK')
conn.execute('PRAGMA journal_mode = WAL') # back to WAL mode
conn.isolation_level = orig_isolation_level
self.conn = conn
except Exception:
@ -348,11 +352,6 @@ class DatabaseBroker(object):
:param count: number to get
:returns: list of objects between start and end
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
curs = conn.execute('''
SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
@ -401,11 +400,7 @@ class DatabaseBroker(object):
:returns: dict containing keys: hash, id, created_at, put_timestamp,
delete_timestamp, count, max_row, and metadata
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
self._commit_puts()
query_part1 = '''
SELECT hash, id, created_at, put_timestamp, delete_timestamp,
%s_count AS count,
@ -455,34 +450,6 @@ class DatabaseBroker(object):
(rec['sync_point'], rec['remote_id']))
conn.commit()
def _preallocate(self):
"""
The idea is to allocate space in front of an expanding db. If it gets
within 512k of a boundary, it allocates to the next boundary.
Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
"""
if self.db_file == ':memory:':
return
MB = (1024 * 1024)
def prealloc_points():
for pm in (1, 2, 5, 10, 25, 50):
yield pm * MB
while True:
pm += 50
yield pm * MB
stat = os.stat(self.db_file)
file_size = stat.st_size
allocated_size = stat.st_blocks * 512
for point in prealloc_points():
if file_size <= point - MB / 2:
prealloc_size = point
break
if allocated_size < prealloc_size:
with open(self.db_file, 'rb+') as fp:
fallocate(fp.fileno(), int(prealloc_size))
@property
def metadata(self):
"""
@ -607,7 +574,7 @@ class ContainerBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
@ -615,7 +582,7 @@ class ContainerBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted ON object (deleted);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN
@ -678,6 +645,15 @@ class ContainerBroker(DatabaseBroker):
''', (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp))
def _get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
@ -717,11 +693,6 @@ class ContainerBroker(DatabaseBroker):
:returns: True if the database has no active objects, False otherwise
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
@ -729,17 +700,16 @@ class ContainerBroker(DatabaseBroker):
def _commit_puts(self, item_list=None):
"""Handles commiting rows in .pending files."""
if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
pending_file = self.db_file + '.pending'
if self.db_file == ':memory:' or not os.path.exists(pending_file):
return
if not os.path.getsize(pending_file):
os.unlink(pending_file)
return
if item_list is None:
item_list = []
with lock_parent_directory(self.pending_file, self.pending_timeout):
self._preallocate()
if not os.path.getsize(self.pending_file):
if item_list:
self.merge_items(item_list)
return
with open(self.pending_file, 'r+b') as fp:
with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
with open(pending_file, 'r+b') as fp:
for entry in fp.read().split(':'):
if entry:
try:
@ -752,11 +722,11 @@ class ContainerBroker(DatabaseBroker):
except Exception:
self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'),
{'file': self.pending_file, 'entry': entry})
{'file': pending_file, 'entry': entry})
if item_list:
self.merge_items(item_list)
try:
os.ftruncate(fp.fileno(), 0)
os.unlink(pending_file)
except OSError, err:
if err.errno != errno.ENOENT:
raise
@ -774,7 +744,6 @@ class ContainerBroker(DatabaseBroker):
delete
:param sync_timestamp: max update_at timestamp of sync rows to delete
"""
self._commit_puts()
with self.get() as conn:
conn.execute("""
DELETE FROM object
@ -818,30 +787,9 @@ class ContainerBroker(DatabaseBroker):
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError, err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(
self.pending_file, self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
self.merge_items([record])
def is_deleted(self, timestamp=None):
"""
@ -851,11 +799,6 @@ class ContainerBroker(DatabaseBroker):
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
@ -878,11 +821,6 @@ class ContainerBroker(DatabaseBroker):
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
return conn.execute('''
SELECT account, container, created_at, put_timestamp,
@ -919,11 +857,6 @@ class ContainerBroker(DatabaseBroker):
:returns: list of object names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
@ -960,11 +893,6 @@ class ContainerBroker(DatabaseBroker):
:returns: list of tuples of (name, created_at, size, content_type,
etag)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
if path is not None:
prefix = path
if path:
@ -988,7 +916,10 @@ class ContainerBroker(DatabaseBroker):
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
query += ' +deleted = 0 ORDER BY name LIMIT ?'
if self._get_db_version(conn) < 1:
query += ' +deleted = 0 ORDER BY name LIMIT ?'
else:
query += ' deleted = 0 ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
@ -1036,18 +967,19 @@ class ContainerBroker(DatabaseBroker):
max_rowid = -1
for rec in item_list:
conn.execute('''
DELETE FROM object WHERE name = ? AND
(created_at < ?)
DELETE FROM object WHERE name = ? AND created_at < ? AND
deleted IN (0, 1)
''', (rec['name'], rec['created_at']))
try:
if not conn.execute('''
SELECT name FROM object WHERE name = ? AND
deleted IN (0, 1)
''', (rec['name'],)).fetchall():
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted']]))
except sqlite3.IntegrityError:
pass
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
@ -1091,7 +1023,7 @@ class AccountBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE container (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
name TEXT,
put_timestamp TEXT,
delete_timestamp TEXT,
object_count INTEGER,
@ -1099,8 +1031,9 @@ class AccountBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted ON container (deleted);
CREATE INDEX ix_container_name ON container (name);
CREATE INDEX ix_container_deleted_name ON
container (deleted, name);
CREATE TRIGGER container_insert AFTER INSERT ON container
BEGIN
UPDATE account_stat
@ -1164,6 +1097,15 @@ class AccountBroker(DatabaseBroker):
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
def _get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_container_deleted_name' '''):
self._db_version = 1
return self._db_version
def update_put_timestamp(self, timestamp):
"""
Update the put_timestamp. Only modifies it if it is greater than
@ -1193,17 +1135,16 @@ class AccountBroker(DatabaseBroker):
def _commit_puts(self, item_list=None):
"""Handles commiting rows in .pending files."""
if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
pending_file = self.db_file + '.pending'
if self.db_file == ':memory:' or not os.path.exists(pending_file):
return
if not os.path.getsize(pending_file):
os.unlink(pending_file)
return
if item_list is None:
item_list = []
with lock_parent_directory(self.pending_file, self.pending_timeout):
self._preallocate()
if not os.path.getsize(self.pending_file):
if item_list:
self.merge_items(item_list)
return
with open(self.pending_file, 'r+b') as fp:
with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
with open(pending_file, 'r+b') as fp:
for entry in fp.read().split(':'):
if entry:
try:
@ -1219,11 +1160,11 @@ class AccountBroker(DatabaseBroker):
except Exception:
self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'),
{'file': self.pending_file, 'entry': entry})
{'file': pending_file, 'entry': entry})
if item_list:
self.merge_items(item_list)
try:
os.ftruncate(fp.fileno(), 0)
os.unlink(pending_file)
except OSError, err:
if err.errno != errno.ENOENT:
raise
@ -1234,11 +1175,6 @@ class AccountBroker(DatabaseBroker):
:returns: True if the database has no active containers.
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute(
'SELECT container_count from account_stat').fetchone()
@ -1258,7 +1194,6 @@ class AccountBroker(DatabaseBroker):
:param sync_timestamp: max update_at timestamp of sync rows to delete
"""
self._commit_puts()
with self.get() as conn:
conn.execute('''
DELETE FROM container WHERE
@ -1286,11 +1221,6 @@ class AccountBroker(DatabaseBroker):
:returns: put_timestamp of the container
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
ret = conn.execute('''
SELECT put_timestamp FROM container
@ -1311,6 +1241,8 @@ class AccountBroker(DatabaseBroker):
:param object_count: number of objects in the container
:param bytes_used: number of bytes used by the container
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
if delete_timestamp > put_timestamp and \
object_count in (None, '', 0, '0'):
deleted = 1
@ -1321,24 +1253,7 @@ class AccountBroker(DatabaseBroker):
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted}
if self.db_file == ':memory:':
self.merge_items([record])
return
commit = False
with lock_parent_directory(self.pending_file, self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
bytes_used, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
if fp.tell() > PENDING_CAP:
commit = True
if commit:
self._commit_puts()
self.merge_items([record])
def can_delete_db(self, cutoff):
"""
@ -1346,7 +1261,6 @@ class AccountBroker(DatabaseBroker):
:returns: True if the account can be deleted, False otherwise
"""
self._commit_puts()
with self.get() as conn:
row = conn.execute('''
SELECT status, put_timestamp, delete_timestamp, container_count
@ -1372,11 +1286,6 @@ class AccountBroker(DatabaseBroker):
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, container_count, status
@ -1401,11 +1310,6 @@ class AccountBroker(DatabaseBroker):
delete_timestamp, container_count, object_count,
bytes_used, hash, id)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
return conn.execute('''
SELECT account, created_at, put_timestamp, delete_timestamp,
@ -1422,11 +1326,6 @@ class AccountBroker(DatabaseBroker):
:returns: list of container names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
@ -1460,11 +1359,6 @@ class AccountBroker(DatabaseBroker):
:returns: list of tuples of (name, object_count, bytes_used, 0)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
if delimiter and not prefix:
prefix = ''
orig_marker = marker
@ -1485,7 +1379,10 @@ class AccountBroker(DatabaseBroker):
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
query += ' +deleted = 0 ORDER BY name LIMIT ?'
if self._get_db_version(conn) < 1:
query += ' +deleted = 0 ORDER BY name LIMIT ?'
else:
query += ' deleted = 0 ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
@ -1529,51 +1426,39 @@ class AccountBroker(DatabaseBroker):
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted']]
try:
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
except sqlite3.IntegrityError:
curs = conn.execute('''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ? AND
(put_timestamp < ? OR delete_timestamp < ? OR
object_count != ? OR bytes_used != ?)''',
(rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used']))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
conn.execute('DELETE FROM container WHERE name = ?',
(record[0],))
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
try:
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
except sqlite3.IntegrityError:
continue
curs = conn.execute('''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (rec['name'],))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
conn.execute('''
DELETE FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (record[0],))
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:

View File

@ -92,7 +92,7 @@ class Replicator(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='replicator')
self.root = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@ -180,7 +180,9 @@ class Replicator(Daemon):
return False
# perform block-level sync if the db was modified during the first sync
if os.path.exists(broker.db_file + '-journal') or \
os.path.getmtime(broker.db_file) > mtime:
os.path.exists(broker.db_file + '-wal') or \
os.path.exists(broker.db_file + '-shm') or \
os.path.getmtime(broker.db_file) > mtime:
# grab a lock so nobody else can modify it
with broker.lock():
if not self._rsync_file(broker.db_file, remote_file, False):
@ -316,7 +318,7 @@ class Replicator(Daemon):
self.logger.debug(_('Replicating db %s'), object_file)
self.stats['attempted'] += 1
try:
broker = self.brokerclass(object_file, pending_timeout=30)
broker = self.brokerclass(object_file)
broker.reclaim(time.time() - self.reclaim_age,
time.time() - (self.reclaim_age * 2))
info = broker.get_replication_info()

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from urlparse import urlparse
from swift.common.utils import urlparse
def clean_acl(name, value):

View File

@ -26,11 +26,7 @@ class CatchErrorMiddleware(object):
def __init__(self, app, conf):
self.app = app
# if the application already has a logger we should use that one
self.logger = getattr(app, 'logger', None)
if not self.logger:
# and only call get_logger if we have to
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='catch-errors')
def __call__(self, env, start_response):
try:

View File

@ -53,7 +53,7 @@ class CNAMELookupMiddleware(object):
self.storage_domain = '.' + self.storage_domain
self.lookup_depth = int(conf.get('lookup_depth', '1'))
self.memcache = None
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='cname-lookup')
def __call__(self, env, start_response):
if not self.storage_domain:

View File

@ -39,7 +39,7 @@ class RateLimitMiddleware(object):
if logger:
self.logger = logger
else:
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='ratelimit')
self.account_ratelimit = float(conf.get('account_ratelimit', 0))
self.max_sleep_time_seconds = \
float(conf.get('max_sleep_time_seconds', 60))

View File

@ -21,7 +21,6 @@ from httplib import HTTPConnection, HTTPSConnection
from time import gmtime, strftime, time
from traceback import format_exc
from urllib import quote, unquote
from urlparse import urlparse
from uuid import uuid4
from hashlib import md5, sha1
import hmac
@ -36,7 +35,7 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.middleware.acl import clean_acl, parse_acl, referrer_allowed
from swift.common.utils import cache_from_env, get_logger, split_path
from swift.common.utils import cache_from_env, get_logger, split_path, urlparse
class Swauth(object):
@ -51,7 +50,7 @@ class Swauth(object):
def __init__(self, app, conf):
self.app = app
self.conf = conf
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='swauth')
self.log_headers = conf.get('log_headers') == 'True'
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip()
if self.reseller_prefix and self.reseller_prefix[-1] != '_':

View File

@ -34,6 +34,7 @@ from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from optparse import OptionParser
from tempfile import mkstemp
import cPickle as pickle
from urlparse import urlparse as stdlib_urlparse, ParseResult
import eventlet
from eventlet import greenio, GreenPool, sleep, Timeout, listen
@ -48,6 +49,10 @@ import logging
logging.thread = eventlet.green.thread
logging.threading = eventlet.green.threading
logging._lock = logging.threading.RLock()
# setup notice level logging
NOTICE = 25
logging._levelNames[NOTICE] = 'NOTICE'
SysLogHandler.priority_map['NOTICE'] = 'notice'
# These are lazily pulled from libc elsewhere
_sys_fallocate = None
@ -284,7 +289,8 @@ class LoggerFileObject(object):
return self
class LogAdapter(object):
# double inheritance to support property with setter
class LogAdapter(logging.LoggerAdapter, object):
"""
A Logger like object which performs some reformatting on calls to
:meth:`exception`. Can be used to store a threadlocal transaction id.
@ -292,11 +298,10 @@ class LogAdapter(object):
_txn_id = threading.local()
def __init__(self, logger):
self.logger = logger
for proxied_method in ('debug', 'log', 'warn', 'warning', 'error',
'critical', 'info'):
setattr(self, proxied_method, getattr(logger, proxied_method))
def __init__(self, logger, server):
logging.LoggerAdapter.__init__(self, logger, {})
self.server = server
setattr(self, 'warn', self.warning)
@property
def txn_id(self):
@ -310,15 +315,34 @@ class LogAdapter(object):
def getEffectiveLevel(self):
return self.logger.getEffectiveLevel()
def exception(self, msg, *args):
def process(self, msg, kwargs):
"""
Add extra info to message
"""
kwargs['extra'] = {'server': self.server, 'txn_id': self.txn_id}
return msg, kwargs
def notice(self, msg, *args, **kwargs):
"""
Convenience function for syslog priority LOG_NOTICE. The python
logging lvl is set to 25, just above info. SysLogHandler is
monkey patched to map this log lvl to the LOG_NOTICE syslog
priority.
"""
self.log(NOTICE, msg, *args, **kwargs)
def _exception(self, msg, *args, **kwargs):
logging.LoggerAdapter.exception(self, msg, *args, **kwargs)
def exception(self, msg, *args, **kwargs):
_junk, exc, _junk = sys.exc_info()
call = self.logger.error
call = self.error
emsg = ''
if isinstance(exc, OSError):
if exc.errno in (errno.EIO, errno.ENOSPC):
emsg = str(exc)
else:
call = self.logger.exception
call = self._exception
elif isinstance(exc, socket.error):
if exc.errno == errno.ECONNREFUSED:
emsg = _('Connection refused')
@ -327,7 +351,7 @@ class LogAdapter(object):
elif exc.errno == errno.ETIMEDOUT:
emsg = _('Connection timeout')
else:
call = self.logger.exception
call = self._exception
elif isinstance(exc, eventlet.Timeout):
emsg = exc.__class__.__name__
if hasattr(exc, 'seconds'):
@ -336,53 +360,25 @@ class LogAdapter(object):
if exc.msg:
emsg += ' %s' % exc.msg
else:
call = self.logger.exception
call('%s: %s' % (msg, emsg), *args)
call = self._exception
call('%s: %s' % (msg, emsg), *args, **kwargs)
class NamedFormatter(logging.Formatter):
class TxnFormatter(logging.Formatter):
"""
NamedFormatter is used to add additional information to log messages.
Normally it will simply add the server name as an attribute on the
LogRecord and the default format string will include it at the
begining of the log message. Additionally, if the transaction id is
available and not already included in the message, NamedFormatter will
add it.
NamedFormatter may be initialized with a format string which makes use
of the standard LogRecord attributes. In addition the format string
may include the following mapping key:
+----------------+---------------------------------------------+
| Format | Description |
+================+=============================================+
| %(server)s | Name of the swift server doing logging |
+----------------+---------------------------------------------+
:param server: the swift server name, a string.
:param logger: a Logger or :class:`LogAdapter` instance, additional
context may be pulled from attributes on this logger if
available.
:param fmt: the format string used to construct the message, if none is
supplied it defaults to ``"%(server)s %(message)s"``
Custom logging.Formatter will append txn_id to a log message if the record
has one and the message does not.
"""
def __init__(self, server, logger,
fmt="%(server)s %(message)s"):
logging.Formatter.__init__(self, fmt)
self.server = server
self.logger = logger
def format(self, record):
record.server = self.server
msg = logging.Formatter.format(self, record)
if self.logger.txn_id and (record.levelno != logging.INFO or
self.logger.txn_id not in msg):
msg = "%s (txn: %s)" % (msg, self.logger.txn_id)
if (record.txn_id and record.levelno != logging.INFO and
record.txn_id not in msg):
msg = "%s (txn: %s)" % (msg, record.txn_id)
return msg
def get_logger(conf, name=None, log_to_console=False, log_route=None):
def get_logger(conf, name=None, log_to_console=False, log_route=None,
fmt="%(server)s %(message)s"):
"""
Get the current system logger using config settings.
@ -395,51 +391,52 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None):
:param conf: Configuration dict to read settings from
:param name: Name of the logger
:param log_to_console: Add handler which writes to console on stderr
:param log_route: Route for the logging, not emitted to the log, just used
to separate logging configurations
:param fmt: Override log format
"""
if not conf:
conf = {}
if not hasattr(get_logger, 'root_logger_configured'):
get_logger.root_logger_configured = True
get_logger(conf, name, log_to_console, log_route='root')
if name is None:
name = conf.get('log_name', 'swift')
if not log_route:
log_route = name
if log_route == 'root':
logger = logging.getLogger()
else:
logger = logging.getLogger(log_route)
logger.propagate = False
if not hasattr(get_logger, 'handler4facility'):
get_logger.handler4facility = {}
facility = getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'),
SysLogHandler.LOG_LOCAL0)
if facility in get_logger.handler4facility:
logger.removeHandler(get_logger.handler4facility[facility])
get_logger.handler4facility[facility].close()
del get_logger.handler4facility[facility]
if log_to_console:
# check if a previous call to get_logger already added a console logger
if hasattr(get_logger, 'console') and get_logger.console:
logger.removeHandler(get_logger.console)
get_logger.console = logging.StreamHandler(sys.__stderr__)
logger.addHandler(get_logger.console)
get_logger.handler4facility[facility] = \
SysLogHandler(address='/dev/log', facility=facility)
logger = logging.getLogger(log_route)
logger.propagate = False
# all new handlers will get the same formatter
formatter = TxnFormatter(fmt)
# get_logger will only ever add one SysLog Handler to a logger
if not hasattr(get_logger, 'handler4logger'):
get_logger.handler4logger = {}
if logger in get_logger.handler4logger:
logger.removeHandler(get_logger.handler4logger[logger])
get_logger.handler4logger[logger] = \
get_logger.handler4facility[facility]
logger.addHandler(get_logger.handler4facility[facility])
# facility for this logger will be set by last call wins
facility = getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'),
SysLogHandler.LOG_LOCAL0)
handler = SysLogHandler(address='/dev/log', facility=facility)
handler.setFormatter(formatter)
logger.addHandler(handler)
get_logger.handler4logger[logger] = handler
# setup console logging
if log_to_console or hasattr(get_logger, 'console_handler4logger'):
# remove pre-existing console handler for this logger
if not hasattr(get_logger, 'console_handler4logger'):
get_logger.console_handler4logger = {}
if logger in get_logger.console_handler4logger:
logger.removeHandler(get_logger.console_handler4logger[logger])
console_handler = logging.StreamHandler(sys.__stderr__)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
get_logger.console_handler4logger[logger] = console_handler
# set the level for the logger
logger.setLevel(
getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO))
adapted_logger = LogAdapter(logger)
formatter = NamedFormatter(name, adapted_logger)
get_logger.handler4facility[facility].setFormatter(formatter)
if hasattr(get_logger, 'console'):
get_logger.console.setFormatter(formatter)
adapted_logger = LogAdapter(logger, name)
return adapted_logger
@ -472,8 +469,9 @@ def capture_stdio(logger, **kwargs):
# collect stdio file desc not in use for logging
stdio_fds = [0, 1, 2]
if hasattr(get_logger, 'console'):
stdio_fds.remove(get_logger.console.stream.fileno())
for _junk, handler in getattr(get_logger,
'console_handler4logger', {}).items():
stdio_fds.remove(handler.stream.fileno())
with open(os.devnull, 'r+b') as nullfile:
# close stdio (excludes fds open for logging)
@ -864,3 +862,35 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
elif running_time - now > time_per_request:
eventlet.sleep((running_time - now) / clock_accuracy)
return running_time + time_per_request
class ModifiedParseResult(ParseResult):
"Parse results class for urlparse."
@property
def hostname(self):
netloc = self.netloc.split('@', 1)[-1]
if netloc.startswith('['):
return netloc[1:].split(']')[0]
elif ':' in netloc:
return netloc.rsplit(':')[0]
return netloc
@property
def port(self):
netloc = self.netloc.split('@', 1)[-1]
if netloc.startswith('['):
netloc = netloc.rsplit(']')[1]
if ':' in netloc:
return int(netloc.rsplit(':')[1])
return None
def urlparse(url):
"""
urlparse augmentation.
This is necessary because urlparse can't handle RFC 2732 URLs.
:param url: URL to parse.
"""
return ModifiedParseResult(*stdlib_urlparse(url))

View File

@ -68,11 +68,15 @@ def get_socket(conf, default_port=8080):
"""
bind_addr = (conf.get('bind_ip', '0.0.0.0'),
int(conf.get('bind_port', default_port)))
address_family = [addr[0] for addr in socket.getaddrinfo(bind_addr[0],
bind_addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM)
if addr[0] in (socket.AF_INET, socket.AF_INET6)][0]
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = listen(bind_addr, backlog=int(conf.get('backlog', 4096)))
sock = listen(bind_addr, backlog=int(conf.get('backlog', 4096)),
family=address_family)
if 'cert_file' in conf:
sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
keyfile=conf['key_file'])
@ -113,7 +117,7 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
logger = kwargs.pop('logger')
else:
logger = get_logger(conf, log_name,
log_to_console=kwargs.pop('verbose', False))
log_to_console=kwargs.pop('verbose', False), log_route='wsgi')
# redirect errors to logger and close stdio
capture_stdio(logger)
@ -168,10 +172,10 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
run_server()
logger.info('Child %d exiting normally' % os.getpid())
logger.notice('Child %d exiting normally' % os.getpid())
return
else:
logger.info('Started child %s' % pid)
logger.notice('Started child %s' % pid)
children.append(pid)
try:
pid, status = os.wait()
@ -182,8 +186,8 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
logger.info('User quit')
logger.notice('User quit')
break
greenio.shutdown_safe(sock)
sock.close()
logger.info('Exited')
logger.notice('Exited')

View File

@ -28,7 +28,7 @@ class ContainerAuditor(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'container-auditor')
self.logger = get_logger(conf, log_route='container-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -49,7 +49,7 @@ class ContainerController(object):
save_headers = ['x-container-read', 'x-container-write']
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='container-server')
self.root = conf.get('devices', '/srv/node/')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@ -89,7 +89,7 @@ class ContainerController(object):
account_partition = req.headers.get('X-Account-Partition')
account_device = req.headers.get('X-Account-Device')
if all([account_host, account_partition, account_device]):
account_ip, account_port = account_host.split(':')
account_ip, account_port = account_host.rsplit(':', 1)
new_path = '/' + '/'.join([account, container])
info = broker.get_info()
account_headers = {'x-put-timestamp': info['put_timestamp'],
@ -219,8 +219,6 @@ class ContainerController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_container_broker(drive, part, account, container)
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()
@ -246,8 +244,6 @@ class ContainerController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_container_broker(drive, part, account, container)
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()

View File

@ -37,7 +37,7 @@ class ContainerUpdater(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'container-updater')
self.logger = get_logger(conf, log_route='container-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -31,7 +31,7 @@ class ObjectAuditor(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'object-auditor')
self.logger = get_logger(conf, log_route='object-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -207,7 +207,7 @@ class ObjectReplicator(Daemon):
:param logger: logging object
"""
self.conf = conf
self.logger = get_logger(conf, 'object-replicator')
self.logger = get_logger(conf, log_route='object-replicator')
self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -266,7 +266,7 @@ class ObjectController(object):
<source-dir>/etc/object-server.conf-sample or
/etc/swift/object-server.conf-sample.
"""
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='object-server')
self.devices = conf.get('devices', '/srv/node/')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@ -301,7 +301,7 @@ class ObjectController(object):
full_path = '/%s/%s/%s' % (account, container, obj)
try:
with ConnectionTimeout(self.conn_timeout):
ip, port = host.split(':')
ip, port = host.rsplit(':', 1)
conn = http_connect(ip, port, contdevice, partition, op,
full_path, headers_out)
with Timeout(self.node_timeout):

View File

@ -35,7 +35,7 @@ class ObjectUpdater(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'object-updater')
self.logger = get_logger(conf, log_route='object-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -624,7 +624,7 @@ class Controller(object):
res.bytes_transferred += len(chunk)
except GeneratorExit:
res.client_disconnect = True
self.app.logger.info(_('Client disconnected on read'))
self.app.logger.warn(_('Client disconnected on read'))
except (Exception, TimeoutError):
self.exception_occurred(node, _('Object'),
_('Trying to read during GET of %s') % req.path)
@ -1054,7 +1054,7 @@ class ObjectController(Controller):
if req.headers.get('transfer-encoding') and chunk == '':
break
except ChunkReadTimeout, err:
self.app.logger.info(
self.app.logger.warn(
_('ERROR Client read timeout (%ss)'), err.seconds)
return HTTPRequestTimeout(request=req)
except Exception:
@ -1064,7 +1064,7 @@ class ObjectController(Controller):
return Response(status='499 Client Disconnect')
if req.content_length and req.bytes_transferred < req.content_length:
req.client_disconnect = True
self.app.logger.info(
self.app.logger.warn(
_('Client disconnected without sending enough data'))
return Response(status='499 Client Disconnect')
statuses = []
@ -1606,12 +1606,20 @@ class BaseApplication(object):
def __init__(self, conf, memcache=None, logger=None, account_ring=None,
container_ring=None, object_ring=None):
if logger is None:
self.logger = get_logger(conf)
else:
self.logger = logger
if conf is None:
conf = {}
if logger is None:
self.logger = get_logger(conf, log_route='proxy-server')
access_log_conf = {}
for key in ('log_facility', 'log_name', 'log_level'):
value = conf.get('access_' + key, conf.get(key, None))
if value:
access_log_conf[key] = value
self.access_logger = get_logger(access_log_conf,
log_route='proxy-access')
else:
self.logger = self.access_logger = logger
swift_dir = conf.get('swift_dir', '/etc/swift')
self.node_timeout = int(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
@ -1790,7 +1798,7 @@ class Application(BaseApplication):
if getattr(req, 'client_disconnect', False) or \
getattr(response, 'client_disconnect', False):
status_int = 499
self.logger.info(' '.join(quote(str(x)) for x in (
self.access_logger.info(' '.join(quote(str(x)) for x in (
client or '-',
req.remote_addr or '-',
time.strftime('%d/%b/%Y/%H/%M/%S', time.gmtime()),

View File

@ -34,7 +34,7 @@ class AccessLogProcessor(object):
conf.get('service_ips', '').split(',')\
if x.strip()]
self.warn_percent = float(conf.get('warn_percent', '0.8'))
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='access-processor')
def log_line_parser(self, raw_log):
'''given a raw access log line, return a dict of the good parts'''

View File

@ -48,7 +48,8 @@ class AccountStat(Daemon):
self.devices = server_conf.get('devices', '/srv/node')
self.mount_check = server_conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.logger = get_logger(stats_conf, 'swift-account-stats-logger')
self.logger = \
get_logger(stats_conf, log_route='account-stats')
def run_once(self):
self.logger.info(_("Gathering account stats"))

View File

@ -40,7 +40,7 @@ class LogProcessor(object):
def __init__(self, conf, logger):
if isinstance(logger, tuple):
self.logger = get_logger(*logger)
self.logger = get_logger(*logger, log_route='log-processor')
else:
self.logger = logger
@ -226,7 +226,7 @@ class LogProcessorDaemon(Daemon):
c = conf.get('log-processor')
super(LogProcessorDaemon, self).__init__(c)
self.total_conf = conf
self.logger = get_logger(c)
self.logger = get_logger(c, log_route='log-processor')
self.log_processor = LogProcessor(conf, self.logger)
self.lookback_hours = int(c.get('lookback_hours', '120'))
self.lookback_window = int(c.get('lookback_window',

View File

@ -64,8 +64,9 @@ class LogUploader(Daemon):
self.container_name = container_name
self.filename_format = source_filename_format
self.internal_proxy = InternalProxy(proxy_server_conf)
log_name = 'swift-log-uploader-%s' % plugin_name
self.logger = utils.get_logger(uploader_conf, plugin_name)
log_name = '%s-log-uploader' % plugin_name
self.logger = utils.get_logger(uploader_conf, log_name,
log_route=plugin_name)
def run_once(self):
self.logger.info(_("Uploading logs"))

View File

@ -20,7 +20,7 @@ class StatsLogProcessor(object):
"""Transform account storage stat logs"""
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='stats-processor')
def process(self, obj_stream, data_object_account, data_object_container,
data_object_name):

View File

@ -456,7 +456,7 @@ class TestAuthServer(unittest.TestCase):
def test_basic_logging(self):
log = StringIO()
log_handler = StreamHandler(log)
logger = get_logger(self.conf, 'auth')
logger = get_logger(self.conf, 'auth-server', log_route='auth-server')
logger.logger.addHandler(log_handler)
try:
auth_server.http_connect = fake_http_connect(201)
@ -534,7 +534,7 @@ class TestAuthServer(unittest.TestCase):
orig_Request = auth_server.Request
log = StringIO()
log_handler = StreamHandler(log)
logger = get_logger(self.conf, 'auth')
logger = get_logger(self.conf, 'auth-server', log_route='auth-server')
logger.logger.addHandler(log_handler)
try:
auth_server.Request = request_causing_exception

View File

@ -28,7 +28,7 @@ class MyDaemon(daemon.Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = utils.get_logger(None, 'server')
self.logger = utils.get_logger(None, 'server', log_route='server')
MyDaemon.forever_called = False
MyDaemon.once_called = False
@ -99,7 +99,7 @@ user = %s
sio = StringIO()
logger = logging.getLogger('server')
logger.addHandler(logging.StreamHandler(sio))
logger = utils.get_logger(None, 'server')
logger = utils.get_logger(None, 'server', log_route='server')
daemon.run_daemon(MyDaemon, conf_file, logger=logger)
self.assert_('user quit' in sio.getvalue().lower())

View File

@ -19,6 +19,7 @@ from __future__ import with_statement
import logging
import mimetools
import os
import errno
import socket
import sys
import time
@ -31,6 +32,8 @@ from tempfile import NamedTemporaryFile
from eventlet import sleep
from swift.common.exceptions import TimeoutError, MessageTimeout, \
ConnectionTimeout
from swift.common import utils
@ -76,6 +79,17 @@ class MockSys():
__stderr__ = sys.__stderr__
def reset_loggers():
if hasattr(utils.get_logger, 'handler4logger'):
for logger, handler in utils.get_logger.handler4logger.items():
logger.removeHandler(handler)
delattr(utils.get_logger, 'handler4logger')
if hasattr(utils.get_logger, 'console_handler4logger'):
for logger, h in utils.get_logger.console_handler4logger.items():
logger.removeHandler(h)
delattr(utils.get_logger, 'console_handler4logger')
class TestUtils(unittest.TestCase):
""" Tests for swift.common.utils """
@ -289,23 +303,152 @@ Error: unable to locate %s
sio = StringIO()
logger = logging.getLogger('server')
logger.addHandler(logging.StreamHandler(sio))
logger = utils.get_logger(None, 'server')
logger = utils.get_logger(None, 'server', log_route='server')
logger.warn('test1')
self.assertEquals(sio.getvalue(), 'test1\n')
logger.debug('test2')
self.assertEquals(sio.getvalue(), 'test1\n')
logger = utils.get_logger({'log_level': 'DEBUG'}, 'server')
logger = utils.get_logger({'log_level': 'DEBUG'}, 'server',
log_route='server')
logger.debug('test3')
self.assertEquals(sio.getvalue(), 'test1\ntest3\n')
# Doesn't really test that the log facility is truly being used all the
# way to syslog; but exercises the code.
logger = utils.get_logger({'log_facility': 'LOG_LOCAL3'}, 'server')
logger = utils.get_logger({'log_facility': 'LOG_LOCAL3'}, 'server',
log_route='server')
logger.warn('test4')
self.assertEquals(sio.getvalue(),
'test1\ntest3\ntest4\n')
# make sure debug doesn't log by default
logger.debug('test5')
self.assertEquals(sio.getvalue(),
'test1\ntest3\ntest4\n')
# make sure notice lvl logs by default
logger.notice('test6')
def test_clean_logger_exception(self):
# setup stream logging
sio = StringIO()
logger = utils.get_logger(None)
handler = logging.StreamHandler(sio)
logger.logger.addHandler(handler)
def strip_value(sio):
v = sio.getvalue()
sio.truncate(0)
return v
def log_exception(exc):
try:
raise exc
except (Exception, TimeoutError):
logger.exception('blah')
try:
# establish base case
self.assertEquals(strip_value(sio), '')
logger.info('test')
self.assertEquals(strip_value(sio), 'test\n')
self.assertEquals(strip_value(sio), '')
logger.info('test')
logger.info('test')
self.assertEquals(strip_value(sio), 'test\ntest\n')
self.assertEquals(strip_value(sio), '')
# test OSError
for en in (errno.EIO, errno.ENOSPC):
log_exception(OSError(en, 'my %s error message' % en))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('my %s error message' % en in log_msg)
# unfiltered
log_exception(OSError())
self.assert_('Traceback' in strip_value(sio))
# test socket.error
log_exception(socket.error(errno.ECONNREFUSED,
'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('errno.ECONNREFUSED message test' not in log_msg)
self.assert_('Connection refused' in log_msg)
log_exception(socket.error(errno.EHOSTUNREACH,
'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('my error message' not in log_msg)
self.assert_('Host unreachable' in log_msg)
log_exception(socket.error(errno.ETIMEDOUT, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('my error message' not in log_msg)
self.assert_('Connection timeout' in log_msg)
# unfiltered
log_exception(socket.error(0, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' in log_msg)
self.assert_('my error message' in log_msg)
# test eventlet.Timeout
log_exception(ConnectionTimeout(42, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('ConnectionTimeout' in log_msg)
self.assert_('(42s)' in log_msg)
self.assert_('my error message' not in log_msg)
log_exception(MessageTimeout(42, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('MessageTimeout' in log_msg)
self.assert_('(42s)' in log_msg)
self.assert_('my error message' in log_msg)
# test unhandled
log_exception(Exception('my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' in log_msg)
self.assert_('my error message' in log_msg)
finally:
logger.logger.removeHandler(handler)
reset_loggers()
def test_txn_formatter(self):
# setup stream logging
sio = StringIO()
logger = utils.get_logger(None)
handler = logging.StreamHandler(sio)
handler.setFormatter(utils.TxnFormatter())
logger.logger.addHandler(handler)
def strip_value(sio):
v = sio.getvalue()
sio.truncate(0)
return v
try:
self.assertFalse(logger.txn_id)
logger.error('my error message')
log_msg = strip_value(sio)
self.assert_('my error message' in log_msg)
self.assert_('txn' not in log_msg)
logger.txn_id = '12345'
logger.error('test')
log_msg = strip_value(sio)
self.assert_('txn' in log_msg)
self.assert_('12345' in log_msg)
# test no txn on info message
self.assertEquals(logger.txn_id, '12345')
logger.info('test')
log_msg = strip_value(sio)
self.assert_('txn' not in log_msg)
self.assert_('12345' not in log_msg)
# test txn already in message
self.assertEquals(logger.txn_id, '12345')
logger.warn('test 12345 test')
self.assertEquals(strip_value(sio), 'test 12345 test\n')
finally:
logger.logger.removeHandler(handler)
reset_loggers()
def test_storage_directory(self):
self.assertEquals(utils.storage_directory('objects', '1', 'ABCDEF'),
@ -391,56 +534,71 @@ log_name = yarr'''
logger = utils.get_logger(None, 'dummy')
# mock utils system modules
utils.sys = MockSys()
utils.os = MockOs()
_orig_sys = utils.sys
_orig_os = utils.os
try:
utils.sys = MockSys()
utils.os = MockOs()
# basic test
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [0, 1, 2])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# basic test
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [0, 1, 2])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# reset; test same args, but exc when trying to close stdio
utils.os = MockOs(raise_funcs=('dup2',))
utils.sys = MockSys()
# reset; test same args, but exc when trying to close stdio
utils.os = MockOs(raise_funcs=('dup2',))
utils.sys = MockSys()
# test unable to close stdio
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# test unable to close stdio
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# reset; test some other args
logger = utils.get_logger(None, log_to_console=True)
utils.os = MockOs()
utils.sys = MockSys()
# reset; test some other args
logger = utils.get_logger(None, log_to_console=True)
utils.os = MockOs()
utils.sys = MockSys()
# test console log
utils.capture_stdio(logger, capture_stdout=False,
capture_stderr=False)
self.assert_(utils.sys.excepthook is not None)
# when logging to console, stderr remains open
self.assertEquals(utils.os.closed_fds, [0, 1])
logger.logger.removeHandler(utils.get_logger.console)
# stdio not captured
self.assertFalse(hasattr(utils.sys, 'stdout'))
self.assertFalse(hasattr(utils.sys, 'stderr'))
# test console log
utils.capture_stdio(logger, capture_stdout=False,
capture_stderr=False)
self.assert_(utils.sys.excepthook is not None)
# when logging to console, stderr remains open
self.assertEquals(utils.os.closed_fds, [0, 1])
reset_loggers()
# stdio not captured
self.assertFalse(hasattr(utils.sys, 'stdout'))
self.assertFalse(hasattr(utils.sys, 'stderr'))
reset_loggers()
finally:
utils.sys = _orig_sys
utils.os = _orig_os
def test_get_logger_console(self):
reload(utils) # reset get_logger attrs
reset_loggers()
logger = utils.get_logger(None)
self.assertFalse(hasattr(utils.get_logger, 'console'))
console_handlers = [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler)]
self.assertFalse(console_handlers)
logger = utils.get_logger(None, log_to_console=True)
self.assert_(hasattr(utils.get_logger, 'console'))
self.assert_(isinstance(utils.get_logger.console,
logging.StreamHandler))
console_handlers = [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler)]
self.assert_(console_handlers)
# make sure you can't have two console handlers
old_handler = utils.get_logger.console
self.assertEquals(len(console_handlers), 1)
old_handler = console_handlers[0]
logger = utils.get_logger(None, log_to_console=True)
self.assertNotEquals(utils.get_logger.console, old_handler)
logger.logger.removeHandler(utils.get_logger.console)
console_handlers = [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler)]
self.assertEquals(len(console_handlers), 1)
new_handler = console_handlers[0]
self.assertNotEquals(new_handler, old_handler)
reset_loggers()
def test_ratelimit_sleep(self):
running_time = 0
@ -468,6 +626,28 @@ log_name = yarr'''
total += i
self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
def test_urlparse(self):
parsed = utils.urlparse('http://127.0.0.1/')
self.assertEquals(parsed.scheme, 'http')
self.assertEquals(parsed.hostname, '127.0.0.1')
self.assertEquals(parsed.path, '/')
parsed = utils.urlparse('http://127.0.0.1:8080/')
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse('https://127.0.0.1/')
self.assertEquals(parsed.scheme, 'https')
parsed = utils.urlparse('http://[::1]/')
self.assertEquals(parsed.hostname, '::1')
parsed = utils.urlparse('http://[::1]:8080/')
self.assertEquals(parsed.hostname, '::1')
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse('www.example.com')
self.assertEquals(parsed.hostname, '')
def test_ratelimit_sleep_with_sleep(self):
running_time = 0
start = time.time()

View File

@ -14,7 +14,7 @@
# limitations under the License.
# TODO: Tests
from test import unit as _setup_mocks
from test import unit
import unittest
import tempfile
import os
@ -57,6 +57,7 @@ class TestAuditor(unittest.TestCase):
def tearDown(self):
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
unit.xattr_data = {}
def test_object_audit_extra_data(self):
self.auditor = auditor.ObjectAuditor(self.conf)

View File

@ -16,6 +16,7 @@
from __future__ import with_statement
import cPickle as pickle
import logging
from logging.handlers import SysLogHandler
import os
import sys
import unittest
@ -465,8 +466,138 @@ class TestController(unittest.TestCase):
test(404, 507, 503)
test(503, 503, 503)
class TestProxyServer(unittest.TestCase):
def test_access_log(self):
class MyApp(proxy_server.Application):
def handle_request(self, req):
resp = Response(request=req)
req.response = resp
req.start_time = time()
return resp
def start_response(*args):
pass
class MockLogger():
def __init__(self):
self.buffer = StringIO()
def info(self, msg, args=None):
if args:
msg = msg % args
self.buffer.write(msg)
def strip_value(self):
rv = self.buffer.getvalue()
self.buffer.truncate(0)
return rv
class SnarfStream(object):
# i can't seem to subclass cStringIO
def __init__(self, *args, **kwargs):
self.sio = StringIO()
def strip_value(self):
rv = self.getvalue().strip()
self.truncate(0)
return rv
def __getattr__(self, name):
try:
return object.__getattr__(self, name)
except AttributeError:
try:
return getattr(self.sio, name)
except AttributeError:
return self.__getattribute__(name)
snarf = SnarfStream()
_orig_get_logger = proxy_server.get_logger
def mock_get_logger(*args, **kwargs):
if kwargs.get('log_route') != 'proxy-access':
return _orig_get_logger(*args, **kwargs)
kwargs['log_route'] = 'snarf'
logger = _orig_get_logger(*args, **kwargs)
if [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler) and h.stream is snarf]:
# snarf handler already setup!
return logger
formatter = logger.logger.handlers[0].formatter
formatter._fmt += ' %(levelname)s'
snarf_handler = logging.StreamHandler(snarf)
snarf_handler.setFormatter(formatter)
logger.logger.addHandler(snarf_handler)
return logger
def test_conf(conf):
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
req = Request.blank('')
app(req.environ, start_response)
try:
proxy_server.get_logger = mock_get_logger
test_conf({})
line = snarf.strip_value()
print line
self.assert_(line.startswith('swift'))
self.assert_(line.endswith('INFO'))
test_conf({'log_name': 'snarf-test'})
line = snarf.strip_value()
print line
self.assert_(line.startswith('snarf-test'))
self.assert_(line.endswith('INFO'))
test_conf({'log_name': 'snarf-test', 'log_level': 'ERROR'})
line = snarf.strip_value()
print line
self.assertFalse(line)
test_conf({'log_name': 'snarf-test', 'log_level': 'ERROR',
'access_log_name': 'access-test',
'access_log_level': 'INFO'})
line = snarf.strip_value()
print line
self.assert_(line.startswith('access-test'))
self.assert_(line.endswith('INFO'))
# test facility
def get_facility(logger):
h = [h for h in logger.logger.handlers if
isinstance(h, SysLogHandler)][0]
return h.facility
conf = {'log_facility': 'LOG_LOCAL0'}
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
self.assertEquals(get_facility(app.logger),
SysLogHandler.LOG_LOCAL0)
self.assertEquals(get_facility(app.access_logger),
SysLogHandler.LOG_LOCAL0)
conf = {'log_facility': 'LOG_LOCAL0',
'access_log_facility': 'LOG_LOCAL1'}
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
self.assertEquals(get_facility(app.logger),
SysLogHandler.LOG_LOCAL0)
self.assertEquals(get_facility(app.access_logger),
SysLogHandler.LOG_LOCAL1)
conf = {'access_log_facility': 'LOG_LOCAL1'}
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
self.assertEquals(get_facility(app.logger),
SysLogHandler.LOG_LOCAL0)
self.assertEquals(get_facility(app.access_logger),
SysLogHandler.LOG_LOCAL1)
finally:
proxy_server.get_logger = _orig_get_logger
def test_unhandled_exception(self):
class MyApp(proxy_server.Application):
@ -1805,8 +1936,8 @@ class TestObjectController(unittest.TestCase):
def info(self, msg):
self.msg = msg
orig_logger = prosrv.logger
prosrv.logger = Logger()
orig_logger, orig_access_logger = prosrv.logger, prosrv.access_logger
prosrv.logger = prosrv.access_logger = Logger()
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write(
@ -1822,11 +1953,9 @@ class TestObjectController(unittest.TestCase):
prosrv.logger.msg)
exp = 'host1'
self.assertEquals(prosrv.logger.msg[:len(exp)], exp)
prosrv.logger = orig_logger
# Turn on header logging.
orig_logger = prosrv.logger
prosrv.logger = Logger()
prosrv.logger = prosrv.access_logger = Logger()
prosrv.log_headers = True
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
@ -1840,7 +1969,7 @@ class TestObjectController(unittest.TestCase):
self.assert_('Goofy-Header%3A%20True' in prosrv.logger.msg,
prosrv.logger.msg)
prosrv.log_headers = False
prosrv.logger = orig_logger
prosrv.logger, prosrv.access_logger = orig_logger, orig_access_logger
def test_chunked_put_utf8_all_the_way_down(self):
# Test UTF-8 Unicode all the way through the system