Added metadata to account and container servers

This commit is contained in:
gholt 2010-08-10 12:18:15 -07:00
parent 40779d8d21
commit 15009bb76e
8 changed files with 718 additions and 62 deletions

View File

@ -20,7 +20,6 @@ import time
import traceback
from urllib import unquote
from swift.common.utils import get_logger
from webob import Request, Response
from webob.exc import HTTPAccepted, HTTPBadRequest, \
@ -30,8 +29,8 @@ import simplejson
from xml.sax import saxutils
from swift.common.db import AccountBroker
from swift.common.utils import get_param, split_path, storage_directory, \
hash_path
from swift.common.utils import get_logger, get_param, hash_path, \
normalize_timestamp, split_path, storage_directory
from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \
check_mount, check_float, check_xml_encodable
from swift.common.healthcheck import healthcheck
@ -100,13 +99,26 @@ class AccountController(object):
else:
return HTTPCreated(request=req)
else: # put account
timestamp = normalize_timestamp(req.headers['x-timestamp'])
if not os.path.exists(broker.db_file):
broker.initialize(req.headers['x-timestamp'])
return HTTPCreated(request=req)
broker.initialize(timestamp)
created = True
elif broker.is_status_deleted():
return HTTPForbidden(request=req, body='Recently deleted')
else:
broker.update_put_timestamp(req.headers['x-timestamp'])
created = broker.is_deleted()
broker.update_put_timestamp(timestamp)
if broker.is_deleted():
return HTTPConflict(request=req)
metadata = {}
metadata.update((key, (value, timestamp))
for key, value in req.headers.iteritems()
if key.lower().startswith('x-account-meta-'))
if metadata:
broker.metadata = metadata
if created:
return HTTPCreated(request=req)
else:
return HTTPAccepted(request=req)
def HEAD(self, req):
@ -142,6 +154,9 @@ class AccountController(object):
container_ts = broker.get_container_timestamp(container)
if container_ts is not None:
headers['X-Container-Timestamp'] = container_ts
headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
return HTTPNoContent(request=req, headers=headers)
def GET(self, req):
@ -165,6 +180,9 @@ class AccountController(object):
'X-Account-Bytes-Used': info['bytes_used'],
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp']}
resp_headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
try:
prefix = get_param(req, 'prefix')
delimiter = get_param(req, 'delimiter')
@ -229,9 +247,9 @@ class AccountController(object):
ret.charset = 'utf8'
return ret
def POST(self, req):
def REPLICATE(self, req):
"""
Handle HTTP POST request.
Handle HTTP REPLICATE request.
Handler for RPC calls for account replication.
"""
try:
@ -250,6 +268,31 @@ class AccountController(object):
ret.request = req
return ret
def POST(self, req):
"""Handle HTTP POST request."""
try:
drive, part, account = split_path(unquote(req.path), 3)
except ValueError, err:
return HTTPBadRequest(body=str(err), content_type='text/plain',
request=req)
if 'x-timestamp' not in req.headers or \
not check_float(req.headers['x-timestamp']):
return HTTPBadRequest(body='Missing or bad timestamp',
request=req, content_type='text/plain')
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_account_broker(drive, part, account)
if broker.is_deleted():
return HTTPNotFound(request=req)
timestamp = normalize_timestamp(req.headers['x-timestamp'])
metadata = {}
metadata.update((key, (value, timestamp))
for key, value in req.headers.iteritems()
if key.lower().startswith('x-account-meta-'))
if metadata:
broker.metadata = metadata
return HTTPNoContent(request=req)
def __call__(self, env, start_response):
start_time = time.time()
req = Request(env)
@ -283,7 +326,7 @@ class AccountController(object):
req.referer or '-', req.user_agent or '-',
trans_time,
additional_info)
if req.method.upper() == 'POST':
if req.method.upper() == 'REPLICATE':
self.logger.debug(log_message)
else:
self.logger.info(log_message)

View File

@ -29,6 +29,7 @@ from random import randint
from tempfile import mkstemp
from eventlet import sleep
import simplejson as json
import sqlite3
from swift.common.utils import normalize_timestamp, renamer, \
@ -396,23 +397,32 @@ class DatabaseBroker(object):
"""
Get information about the DB required for replication.
:returns: tuple of (hash, id, created_at, put_timestamp,
delete_timestamp) from the DB
:returns: dict containing keys: hash, id, created_at, put_timestamp,
delete_timestamp, count, max_row, and metadata
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
query_part1 = '''
SELECT hash, id, created_at, put_timestamp, delete_timestamp,
%s_count AS count,
CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL
THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row, ''' % \
self.db_contains_type
query_part2 = '''
FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE
ON SQLITE_SEQUENCE.name == '%s') LIMIT 1
''' % (self.db_type, self.db_contains_type)
with self.get() as conn:
curs = conn.execute('''
SELECT hash, id, created_at, put_timestamp, delete_timestamp,
%s_count AS count,
CASE WHEN SQLITE_SEQUENCE.seq IS NOT NULL
THEN SQLITE_SEQUENCE.seq ELSE -1 END AS max_row
FROM (%s_stat LEFT JOIN SQLITE_SEQUENCE
ON SQLITE_SEQUENCE.name == '%s') LIMIT 1
''' % (self.db_contains_type, self.db_type, self.db_contains_type))
try:
curs = conn.execute(query_part1 + 'metadata' + query_part2)
except sqlite3.OperationalError, err:
if 'no such column: metadata' not in str(err):
raise
curs = conn.execute(query_part1 + "'' as metadata" +
query_part2)
curs.row_factory = dict_factory
return curs.fetchone()
@ -472,6 +482,88 @@ class DatabaseBroker(object):
with open(self.db_file, 'rb+') as fp:
fallocate(fp.fileno(), int(prealloc_size))
@property
def metadata(self):
"""
Returns the metadata dict for the database. The metadata dict values
are tuples of (value, timestamp) where the timestamp indicates when
that key was set to that value.
"""
with self.get() as conn:
try:
metadata = conn.execute('SELECT metadata FROM %s_stat' %
self.db_type).fetchone()[0]
except sqlite3.OperationalError, err:
if 'no such column: metadata' not in str(err):
raise
metadata = ''
if metadata:
metadata = json.loads(metadata)
else:
metadata = {}
return metadata
@metadata.setter
def metadata(self, new_metadata):
"""
Updates the metadata dict for the database. The metadata dict values
are tuples of (value, timestamp) where the timestamp indicates when
that key was set to that value. Key/values will only be overwritten if
the timestamp is newer. To delete a key, set its value to ('',
timestamp). These empty keys will eventually be removed by
:func:reclaim
"""
old_metadata = self.metadata
if set(new_metadata).issubset(set(old_metadata)):
for key, (value, timestamp) in new_metadata.iteritems():
if timestamp > old_metadata[key][1]:
break
else:
return
with self.get() as conn:
try:
md = conn.execute('SELECT metadata FROM %s_stat' %
self.db_type).fetchone()[0]
md = md and json.loads(md) or {}
except sqlite3.OperationalError, err:
if 'no such column: metadata' not in str(err):
raise
conn.execute("""
ALTER TABLE %s_stat
ADD COLUMN metadata TEXT DEFAULT '' """ % self.db_type)
md = {}
for key, value_timestamp in new_metadata.iteritems():
value, timestamp = value_timestamp
if key not in md or timestamp > md[key][1]:
md[key] = value_timestamp
conn.execute('UPDATE %s_stat SET metadata = ?' % self.db_type,
(json.dumps(md),))
conn.commit()
def reclaim(self, timestamp):
"""Removes any empty metadata values older than the timestamp"""
if not self.metadata:
return
with self.get() as conn:
try:
md = conn.execute('SELECT metadata FROM %s_stat' %
self.db_type).fetchone()[0]
if md:
md = json.loads(md)
keys_to_delete = []
for key, (value, value_timestamp) in md.iteritems():
if value == '' and value_timestamp < timestamp:
keys_to_delete.append(key)
if keys_to_delete:
for key in keys_to_delete:
del md[key]
conn.execute('UPDATE %s_stat SET metadata = ?' %
self.db_type, (json.dumps(md),))
conn.commit()
except sqlite3.OperationalError, err:
if 'no such column: metadata' not in str(err):
raise
class ContainerBroker(DatabaseBroker):
"""Encapsulates working with a container database."""
@ -532,7 +624,7 @@ class ContainerBroker(DatabaseBroker):
def create_container_stat_table(self, conn, put_timestamp=None):
"""
Create the container_stat table which is specifc to the container DB.
Create the container_stat table which is specific to the container DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
@ -555,7 +647,8 @@ class ContainerBroker(DatabaseBroker):
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0'
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
);
INSERT INTO container_stat (object_count, bytes_used)
@ -658,6 +751,8 @@ class ContainerBroker(DatabaseBroker):
from incoming_sync and outgoing_sync where the updated_at timestamp is
< sync_timestamp.
In addition, this calls the DatabaseBroker's :func:reclaim method.
:param object_timestamp: max created_at timestamp of object rows to
delete
:param sync_timestamp: max update_at timestamp of sync rows to delete
@ -680,6 +775,7 @@ class ContainerBroker(DatabaseBroker):
if 'no such column: updated_at' not in str(err):
raise
conn.commit()
DatabaseBroker.reclaim(self, object_timestamp)
def delete_object(self, name, timestamp):
"""
@ -1034,7 +1130,8 @@ class AccountBroker(DatabaseBroker):
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0'
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
);
INSERT INTO account_stat (container_count) VALUES (0);
@ -1133,6 +1230,8 @@ class AccountBroker(DatabaseBroker):
from incoming_sync and outgoing_sync where the updated_at timestamp is
< sync_timestamp.
In addition, this calls the DatabaseBroker's :func:reclaim method.
:param object_timestamp: max created_at timestamp of container rows to
delete
:param sync_timestamp: max update_at timestamp of sync rows to delete
@ -1156,6 +1255,7 @@ class AccountBroker(DatabaseBroker):
if 'no such column: updated_at' not in str(err):
raise
conn.commit()
DatabaseBroker.reclaim(self, container_timestamp)
def get_container_timestamp(self, container_name):
"""

View File

@ -53,7 +53,7 @@ def quarantine_db(object_file, server_type):
class ReplConnection(BufferedHTTPConnection):
"""
Helper to simplify POSTing to a remote server.
Helper to simplify REPLICATEing to a remote server.
"""
def __init__(self, node, partition, hash_, logger):
@ -63,9 +63,9 @@ class ReplConnection(BufferedHTTPConnection):
BufferedHTTPConnection.__init__(self, '%(ip)s:%(port)s' % node)
self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
def post(self, *args):
def replicate(self, *args):
"""
Make an HTTP POST request
Make an HTTP REPLICATE request
:param args: list of json-encodable objects
@ -73,7 +73,7 @@ class ReplConnection(BufferedHTTPConnection):
"""
try:
body = simplejson.dumps(args)
self.request('POST', self.path, body,
self.request('REPLICATE', self.path, body,
{'Content-Type': 'application/json'})
response = self.getresponse()
response.data = response.read()
@ -158,7 +158,7 @@ class Replicator(object):
return proc.returncode == 0
def _rsync_db(self, broker, device, http, local_id,
post_method='complete_rsync', post_timeout=None):
replicate_method='complete_rsync', replicate_timeout=None):
"""
Sync a whole db using rsync.
@ -166,8 +166,8 @@ class Replicator(object):
:param device: device to sync to
:param http: ReplConnection object
:param local_id: unique ID of the local database replica
:param post_method: remote operation to perform after rsync
:param post_timeout: timeout to wait in seconds
:param replicate_method: remote operation to perform after rsync
:param replicate_timeout: timeout to wait in seconds
"""
if self.vm_test_mode:
remote_file = '%s::%s%s/%s/tmp/%s' % (device['ip'],
@ -186,8 +186,8 @@ class Replicator(object):
with broker.lock():
if not self._rsync_file(broker.db_file, remote_file, False):
return False
with Timeout(post_timeout or self.node_timeout):
response = http.post(post_method, local_id)
with Timeout(replicate_timeout or self.node_timeout):
response = http.replicate(replicate_method, local_id)
return response and response.status >= 200 and response.status < 300
def _usync_db(self, point, broker, http, remote_id, local_id):
@ -208,7 +208,7 @@ class Replicator(object):
objects = broker.get_items_since(point, self.per_diff)
while len(objects):
with Timeout(self.node_timeout):
response = http.post('merge_items', objects, local_id)
response = http.replicate('merge_items', objects, local_id)
if not response or response.status >= 300 or response.status < 200:
if response:
self.logger.error('ERROR Bad response %s from %s' %
@ -217,7 +217,7 @@ class Replicator(object):
point = objects[-1]['ROWID']
objects = broker.get_items_since(point, self.per_diff)
with Timeout(self.node_timeout):
response = http.post('merge_syncs', sync_table)
response = http.replicate('merge_syncs', sync_table)
if response and response.status >= 200 and response.status < 300:
broker.merge_syncs([{'remote_id': remote_id,
'sync_point': point}], incoming=False)
@ -266,7 +266,8 @@ class Replicator(object):
:param broker: DB broker for the DB to be replication
:param partition: partition on the node to replicate to
:param info: DB info as a dictionary of {'max_row', 'hash', 'id',
'created_at', 'put_timestamp', 'delete_timestamp'}
'created_at', 'put_timestamp', 'delete_timestamp',
'metadata'}
:returns: True if successful, False otherwise
"""
@ -277,9 +278,9 @@ class Replicator(object):
'ERROR Unable to connect to remote server: %s' % node)
return False
with Timeout(self.node_timeout):
response = http.post('sync', info['max_row'], info['hash'],
response = http.replicate('sync', info['max_row'], info['hash'],
info['id'], info['created_at'], info['put_timestamp'],
info['delete_timestamp'])
info['delete_timestamp'], info['metadata'])
if not response:
return False
elif response.status == HTTPNotFound.code: # completely missing, rsync
@ -297,8 +298,8 @@ class Replicator(object):
if rinfo['max_row'] / float(info['max_row']) < 0.5:
self.stats['remote_merge'] += 1
return self._rsync_db(broker, node, http, info['id'],
post_method='rsync_then_merge',
post_timeout=(info['count'] / 2000))
replicate_method='rsync_then_merge',
replicate_timeout=(info['count'] / 2000))
# else send diffs over to the remote server
return self._usync_db(max(rinfo['point'], local_sync),
broker, http, rinfo['id'], info['id'])
@ -445,11 +446,11 @@ class ReplicatorRpc(object):
self.broker_class = broker_class
self.mount_check = mount_check
def dispatch(self, post_args, args):
def dispatch(self, replicate_args, args):
if not hasattr(args, 'pop'):
return HTTPBadRequest(body='Invalid object type')
op = args.pop(0)
drive, partition, hsh = post_args
drive, partition, hsh = replicate_args
if self.mount_check and \
not os.path.ismount(os.path.join(self.root, drive)):
return Response(status='507 %s is not mounted' % drive)
@ -469,7 +470,7 @@ class ReplicatorRpc(object):
def sync(self, broker, args):
(remote_sync, hash_, id_, created_at, put_timestamp,
delete_timestamp) = args
delete_timestamp, metadata) = args
try:
info = broker.get_replication_info()
except Exception, e:
@ -479,6 +480,8 @@ class ReplicatorRpc(object):
quarantine_db(broker.db_file, broker.db_type)
return HTTPNotFound()
raise
if metadata:
broker.metadata = simplejson.loads(metadata)
if info['put_timestamp'] != put_timestamp or \
info['created_at'] != created_at or \
info['delete_timestamp'] != delete_timestamp:

View File

@ -31,7 +31,7 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
from swift.common.db import ContainerBroker
from swift.common.utils import get_logger, get_param, hash_path, \
storage_directory, split_path
normalize_timestamp, storage_directory, split_path
from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
check_mount, check_float, check_xml_encodable
from swift.common.bufferedhttp import http_connect
@ -175,23 +175,29 @@ class ContainerController(object):
content_type='text/plain')
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
timestamp = normalize_timestamp(req.headers['x-timestamp'])
broker = self._get_container_broker(drive, part, account, container)
if obj: # put container object
if not os.path.exists(broker.db_file):
return HTTPNotFound()
broker.put_object(obj, req.headers['x-timestamp'],
int(req.headers['x-size']), req.headers['x-content-type'],
req.headers['x-etag'])
broker.put_object(obj, timestamp, int(req.headers['x-size']),
req.headers['x-content-type'], req.headers['x-etag'])
return HTTPCreated(request=req)
else: # put container
if not os.path.exists(broker.db_file):
broker.initialize(req.headers['x-timestamp'])
broker.initialize(timestamp)
created = True
else:
created = broker.is_deleted()
broker.update_put_timestamp(req.headers['x-timestamp'])
broker.update_put_timestamp(timestamp)
if broker.is_deleted():
return HTTPConflict(request=req)
metadata = {}
metadata.update((key, (value, timestamp))
for key, value in req.headers.iteritems()
if key.lower().startswith('x-container-meta-'))
if metadata:
broker.metadata = metadata
resp = self.account_update(req, account, container, broker)
if resp:
return resp
@ -222,6 +228,9 @@ class ContainerController(object):
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp'],
}
headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
return HTTPNoContent(request=req, headers=headers)
def GET(self, req):
@ -246,6 +255,9 @@ class ContainerController(object):
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp'],
}
resp_headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
try:
path = get_param(req, 'path')
prefix = get_param(req, 'prefix')
@ -324,9 +336,9 @@ class ContainerController(object):
ret.charset = 'utf8'
return ret
def POST(self, req):
def REPLICATE(self, req):
"""
Handle HTTP POST request (json-encoded RPC calls for replication.)
Handle HTTP REPLICATE request (json-encoded RPC calls for replication.)
"""
try:
post_args = split_path(unquote(req.path), 3)
@ -344,6 +356,31 @@ class ContainerController(object):
ret.request = req
return ret
def POST(self, req):
"""Handle HTTP POST request."""
try:
drive, part, account, container = split_path(unquote(req.path), 4)
except ValueError, err:
return HTTPBadRequest(body=str(err), content_type='text/plain',
request=req)
if 'x-timestamp' not in req.headers or \
not check_float(req.headers['x-timestamp']):
return HTTPBadRequest(body='Missing or bad timestamp',
request=req, content_type='text/plain')
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_container_broker(drive, part, account, container)
if broker.is_deleted():
return HTTPNotFound(request=req)
timestamp = normalize_timestamp(req.headers['x-timestamp'])
metadata = {}
metadata.update((key, (value, timestamp))
for key, value in req.headers.iteritems()
if key.lower().startswith('x-container-meta-'))
if metadata:
broker.metadata = metadata
return HTTPNoContent(request=req)
def __call__(self, env, start_response):
start_time = time.time()
req = Request(env)
@ -373,7 +410,7 @@ class ContainerController(object):
req.headers.get('x-cf-trans-id', '-'),
req.referer or '-', req.user_agent or '-',
trans_time)
if req.method.upper() == 'POST':
if req.method.upper() == 'REPLICATE':
self.logger.debug(log_message)
else:
self.logger.info(log_message)

View File

@ -196,6 +196,94 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(resp.status_int, 403)
self.assertEquals(resp.body, 'Recently deleted')
def test_PUT_GET_metadata(self):
# Set metadata header
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1),
'X-Account-Meta-Test': 'Value'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-account-meta-test'), 'Value')
# Update metadata header
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(3),
'X-Account-Meta-Test': 'New Value'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value')
# Send old update to metadata header
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(2),
'X-Account-Meta-Test': 'Old Value'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value')
# Remove metadata header (by setting it to empty)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(4),
'X-Account-Meta-Test': ''})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assert_('x-account-meta-test' not in resp.headers)
def test_POST_HEAD_metadata(self):
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1)})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
# Set metadata header
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(1),
'X-Account-Meta-Test': 'Value'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-account-meta-test'), 'Value')
# Update metadata header
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(3),
'X-Account-Meta-Test': 'New Value'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value')
# Send old update to metadata header
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(2),
'X-Account-Meta-Test': 'Old Value'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-account-meta-test'), 'New Value')
# Remove metadata header (by setting it to empty)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(4),
'X-Account-Meta-Test': ''})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assert_('x-account-meta-test' not in resp.headers)
def test_GET_not_found_plain(self):
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
resp = self.controller.GET(req)

View File

@ -367,6 +367,161 @@ class TestDatabaseBroker(unittest.TestCase):
broker.merge_syncs([{'sync_point': 5, 'remote_id': uuid2}])
self.assertEquals(broker.get_sync(uuid2), 5)
def test_get_replication_info(self):
self.get_replication_info_tester(metadata=False)
def test_get_replication_info_with_metadata(self):
self.get_replication_info_tester(metadata=True)
def get_replication_info_tester(self, metadata=False):
broker = DatabaseBroker(':memory:', account='a')
broker.db_type = 'test'
broker.db_contains_type = 'test'
broker_creation = normalize_timestamp(1)
broker_uuid = str(uuid4())
broker_metadata = metadata and simplejson.dumps(
{'Test': ('Value', normalize_timestamp(1))}) or ''
def _initialize(conn, put_timestamp):
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript('''
CREATE TABLE test (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
created_at TEXT
);
CREATE TRIGGER test_insert AFTER INSERT ON test
BEGIN
UPDATE test_stat
SET test_count = test_count + 1,
hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER test_update BEFORE UPDATE ON test
BEGIN
SELECT RAISE(FAIL,
'UPDATE not allowed; DELETE and INSERT');
END;
CREATE TRIGGER test_delete AFTER DELETE ON test
BEGIN
UPDATE test_stat
SET test_count = test_count - 1,
hash = chexor(hash, old.name, old.created_at);
END;
CREATE TABLE test_stat (
account TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
test_count INTEGER,
hash TEXT default '00000000000000000000000000000000',
id TEXT
%s
);
INSERT INTO test_stat (test_count) VALUES (0);
''' % (metadata and ", metadata TEXT DEFAULT ''" or ""))
conn.execute('''
UPDATE test_stat
SET account = ?, created_at = ?, id = ?, put_timestamp = ?
''', (broker.account, broker_creation, broker_uuid, put_timestamp))
if metadata:
conn.execute('UPDATE test_stat SET metadata = ?',
(broker_metadata,))
conn.commit()
broker._initialize = _initialize
put_timestamp = normalize_timestamp(2)
broker.initialize(put_timestamp)
info = broker.get_replication_info()
self.assertEquals(info, {'count': 0,
'hash': '00000000000000000000000000000000',
'created_at': broker_creation, 'put_timestamp': put_timestamp,
'delete_timestamp': '0', 'max_row': -1, 'id': broker_uuid,
'metadata': broker_metadata})
insert_timestamp = normalize_timestamp(3)
with broker.get() as conn:
conn.execute('''
INSERT INTO test (name, created_at) VALUES ('test', ?)
''', (insert_timestamp,))
conn.commit()
info = broker.get_replication_info()
self.assertEquals(info, {'count': 1,
'hash': 'bdc4c93f574b0d8c2911a27ce9dd38ba',
'created_at': broker_creation, 'put_timestamp': put_timestamp,
'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid,
'metadata': broker_metadata})
with broker.get() as conn:
conn.execute('DELETE FROM test')
conn.commit()
info = broker.get_replication_info()
self.assertEquals(info, {'count': 0,
'hash': '00000000000000000000000000000000',
'created_at': broker_creation, 'put_timestamp': put_timestamp,
'delete_timestamp': '0', 'max_row': 1, 'id': broker_uuid,
'metadata': broker_metadata})
return broker
def test_metadata(self):
# Initializes a good broker for us
broker = self.get_replication_info_tester(metadata=True)
# Add our first item
first_timestamp = normalize_timestamp(1)
first_value = '1'
broker.metadata = {'First': [first_value, first_timestamp]}
self.assert_('First' in broker.metadata)
self.assertEquals(broker.metadata['First'],
[first_value, first_timestamp])
# Add our second item
second_timestamp = normalize_timestamp(2)
second_value = '2'
broker.metadata = {'Second': [second_value, second_timestamp]}
self.assert_('First' in broker.metadata)
self.assertEquals(broker.metadata['First'],
[first_value, first_timestamp])
self.assert_('Second' in broker.metadata)
self.assertEquals(broker.metadata['Second'],
[second_value, second_timestamp])
# Update our first item
first_timestamp = normalize_timestamp(3)
first_value = '1b'
broker.metadata = {'First': [first_value, first_timestamp]}
self.assert_('First' in broker.metadata)
self.assertEquals(broker.metadata['First'],
[first_value, first_timestamp])
self.assert_('Second' in broker.metadata)
self.assertEquals(broker.metadata['Second'],
[second_value, second_timestamp])
# Delete our second item (by setting to empty string)
second_timestamp = normalize_timestamp(4)
second_value = ''
broker.metadata = {'Second': [second_value, second_timestamp]}
self.assert_('First' in broker.metadata)
self.assertEquals(broker.metadata['First'],
[first_value, first_timestamp])
self.assert_('Second' in broker.metadata)
self.assertEquals(broker.metadata['Second'],
[second_value, second_timestamp])
# Reclaim at point before second item was deleted
broker.reclaim(normalize_timestamp(3))
self.assert_('First' in broker.metadata)
self.assertEquals(broker.metadata['First'],
[first_value, first_timestamp])
self.assert_('Second' in broker.metadata)
self.assertEquals(broker.metadata['Second'],
[second_value, second_timestamp])
# Reclaim at point second item was deleted
broker.reclaim(normalize_timestamp(4))
self.assert_('First' in broker.metadata)
self.assertEquals(broker.metadata['First'],
[first_value, first_timestamp])
self.assert_('Second' in broker.metadata)
self.assertEquals(broker.metadata['Second'],
[second_value, second_timestamp])
# Reclaim after point second item was deleted
broker.reclaim(normalize_timestamp(5))
self.assert_('First' in broker.metadata)
self.assertEquals(broker.metadata['First'],
[first_value, first_timestamp])
self.assert_('Second' not in broker.metadata)
class TestContainerBroker(unittest.TestCase):
""" Tests for swift.common.db.ContainerBroker """
@ -1119,6 +1274,78 @@ class TestContainerBroker(unittest.TestCase):
self.assertEquals(rec['content_type'], 'text/plain')
def premetadata_create_container_stat_table(self, conn, put_timestamp=None):
"""
Copied from swift.common.db.ContainerBroker before the metadata column was
added; used for testing with TestContainerBrokerBeforeMetadata.
Create the container_stat table which is specifc to the container DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript("""
CREATE TABLE container_stat (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0'
);
INSERT INTO container_stat (object_count, bytes_used)
VALUES (0, 0);
""")
conn.execute('''
UPDATE container_stat
SET account = ?, container = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, self.container, normalize_timestamp(time()),
str(uuid4()), put_timestamp))
class TestContainerBrokerBeforeMetadata(TestContainerBroker):
"""
Tests for swift.common.db.ContainerBroker against databases created before
the metadata column was added.
"""
def setUp(self):
self._imported_create_container_stat_table = \
ContainerBroker.create_container_stat_table
ContainerBroker.create_container_stat_table = \
premetadata_create_container_stat_table
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
exc = None
with broker.get() as conn:
try:
conn.execute('SELECT metadata FROM container_stat')
except BaseException, err:
exc = err
self.assert_('no such column: metadata' in str(exc))
def tearDown(self):
ContainerBroker.create_container_stat_table = \
self._imported_create_container_stat_table
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
conn.execute('SELECT metadata FROM container_stat')
class TestAccountBroker(unittest.TestCase):
""" Tests for swift.common.db.AccountBroker """
@ -1575,5 +1802,70 @@ class TestAccountBroker(unittest.TestCase):
sorted([rec['name'] for rec in items]))
def premetadata_create_account_stat_table(self, conn, put_timestamp):
"""
Copied from swift.common.db.AccountBroker before the metadata column was
added; used for testing with TestAccountBrokerBeforeMetadata.
Create account_stat table which is specific to the account DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
conn.executescript("""
CREATE TABLE account_stat (
account TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
container_count INTEGER,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0'
);
INSERT INTO account_stat (container_count) VALUES (0);
""")
conn.execute('''
UPDATE account_stat SET account = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, normalize_timestamp(time()), str(uuid4()),
put_timestamp))
class TestAccountBrokerBeforeMetadata(TestAccountBroker):
"""
Tests for swift.common.db.AccountBroker against databases created before
the metadata column was added.
"""
def setUp(self):
self._imported_create_account_stat_table = \
AccountBroker.create_account_stat_table
AccountBroker.create_account_stat_table = \
premetadata_create_account_stat_table
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
exc = None
with broker.get() as conn:
try:
conn.execute('SELECT metadata FROM account_stat')
except BaseException, err:
exc = err
self.assert_('no such column: metadata' in str(exc))
def tearDown(self):
AccountBroker.create_account_stat_table = \
self._imported_create_account_stat_table
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
conn.execute('SELECT metadata FROM account_stat')
if __name__ == '__main__':
unittest.main()

View File

@ -19,7 +19,7 @@ import os
import logging
from swift.common import db_replicator
from swift.common import db, utils
from swift.common.utils import normalize_timestamp
from swift.container import server as container_server
@ -61,13 +61,13 @@ def _mock_process(*args):
yield
db_replicator.subprocess.Popen = orig_process
class PostReplHttp:
class ReplHttp:
def __init__(self, response=None):
self.response = response
posted = False
replicated = False
host = 'localhost'
def post(self, *args):
self.posted = True
def replicate(self, *args):
self.replicated = True
class Response:
status = 200
data = self.response
@ -125,18 +125,18 @@ class TestDBReplicator(unittest.TestCase):
conn = db_replicator.ReplConnection(node, '1234567890', 'abcdefg',
logging.getLogger())
def req(method, path, body, headers):
self.assertEquals(method, 'POST')
self.assertEquals(method, 'REPLICATE')
self.assertEquals(headers['Content-Type'], 'application/json')
class Resp:
def read(self): return 'data'
resp = Resp()
conn.request = req
conn.getresponse = lambda *args: resp
self.assertEquals(conn.post(1, 2, 3), resp)
self.assertEquals(conn.replicate(1, 2, 3), resp)
def other_req(method, path, body, headers):
raise Exception('blah')
conn.request = other_req
self.assertEquals(conn.post(1, 2, 3), None)
self.assertEquals(conn.replicate(1, 2, 3), None)
def test_rsync_file(self):
replicator = TestReplicator({}, {})
@ -153,7 +153,7 @@ class TestDBReplicator(unittest.TestCase):
replicator = TestReplicator({}, {})
replicator._rsync_file = lambda *args: True
fake_device = {'ip': '127.0.0.1', 'device': 'sda1'}
replicator._rsync_db(FakeBroker(), fake_device, PostReplHttp(), 'abcd')
replicator._rsync_db(FakeBroker(), fake_device, ReplHttp(), 'abcd')
def test_in_sync(self):
replicator = TestReplicator({}, {})
@ -175,7 +175,7 @@ class TestDBReplicator(unittest.TestCase):
replicator.replicate_once()
def test_usync(self):
fake_http = PostReplHttp()
fake_http = ReplHttp()
replicator = TestReplicator({}, {})
replicator._usync_db(0, FakeBroker(), fake_http, '12345', '67890')
@ -184,8 +184,9 @@ class TestDBReplicator(unittest.TestCase):
fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000}
fake_info = {'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'b',
'created_at': 100, 'put_timestamp': 0,
'delete_timestamp': 0}
replicator._http_connect = lambda *args: PostReplHttp('{"id": 3, "point": -1}')
'delete_timestamp': 0,
'metadata': {'Test': ('Value', normalize_timestamp(1))}}
replicator._http_connect = lambda *args: ReplHttp('{"id": 3, "point": -1}')
self.assertEquals(replicator._repl_to_node(
fake_node, FakeBroker(), '0', fake_info), True)

View File

@ -93,6 +93,98 @@ class TestContainerController(unittest.TestCase):
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 404)
def test_PUT_GET_metadata(self):
# Set metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1),
'X-Container-Meta-Test': 'Value'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-container-meta-test'), 'Value')
# Update metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(3),
'X-Container-Meta-Test': 'New Value'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-container-meta-test'),
'New Value')
# Send old update to metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(2),
'X-Container-Meta-Test': 'Old Value'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-container-meta-test'),
'New Value')
# Remove metadata header (by setting it to empty)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(4),
'X-Container-Meta-Test': ''})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c')
resp = self.controller.GET(req)
self.assertEquals(resp.status_int, 204)
self.assert_('x-container-meta-test' not in resp.headers)
def test_POST_HEAD_metadata(self):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(1)})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
# Set metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(1),
'X-Container-Meta-Test': 'Value'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-container-meta-test'), 'Value')
# Update metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(3),
'X-Container-Meta-Test': 'New Value'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-container-meta-test'),
'New Value')
# Send old update to metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(2),
'X-Container-Meta-Test': 'Old Value'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get('x-container-meta-test'),
'New Value')
# Remove metadata header (by setting it to empty)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(4),
'X-Container-Meta-Test': ''})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'})
resp = self.controller.HEAD(req)
self.assertEquals(resp.status_int, 204)
self.assert_('x-container-meta-test' not in resp.headers)
def test_DELETE_obj_not_found(self):
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},