Add shard range support to container server

Support PUTs to container server with json serialized ShardRanges in
body.  Shard range PUTs may autocreate containers.

Support GET of shard ranges from container server. Shard range GETs
support X-Backend-Include-Deleted to include deleted shard ranges in
list and X-Backend-Override-Deleted to get shard ranges when container
has been marked as deleted.

The X-Backend-Record-Type = ['object'|'shard'|'auto'] header is introduced
to differentiate container server requests for object versus shard
ranges. When 'auto' is used with a GET request the container server
will return whichever record type is appropriate for fetching object
listings, depending on whether the container is sharded or not.

Support container PUTs with body in direct_client.py

Co-Authored-By: Matthew Oliver <matt@oliver.net.au>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>

Change-Id: I029782ae348f38c5fb76d2759609f67a06c883ef
This commit is contained in:
Alistair Coles 2018-05-01 16:21:03 +01:00
parent 14af38a899
commit 723eac907c
6 changed files with 1334 additions and 80 deletions

View File

@ -54,22 +54,72 @@ class DirectClientException(ClientException):
http_reason=resp.reason, http_headers=headers)
def _make_req(node, part, method, path, _headers, stype,
conn_timeout=5, response_timeout=15):
def _make_req(node, part, method, path, headers, stype,
conn_timeout=5, response_timeout=15, send_timeout=15,
contents=None, content_length=None, chunk_size=65535):
"""
Make request to backend storage node.
(i.e. 'Account', 'Container', 'Object')
:param node: a node dict from a ring
:param part: an integer, the partion number
:param part: an integer, the partition number
:param method: a string, the HTTP method (e.g. 'PUT', 'DELETE', etc)
:param path: a string, the request path
:param headers: a dict, header name => value
:param stype: a string, describing the type of service
:param conn_timeout: timeout while waiting for connection; default is 5
seconds
:param response_timeout: timeout while waiting for response; default is 15
seconds
:param send_timeout: timeout for sending request body; default is 15
seconds
:param contents: an iterable or string to read object data from
:param content_length: value to send as content-length header
:param chunk_size: if defined, chunk size of data to send
:returns: an HTTPResponse object
:raises DirectClientException: if the response status is not 2xx
:raises eventlet.Timeout: if either conn_timeout or response_timeout is
exceeded
"""
if contents is not None:
if content_length is not None:
headers['Content-Length'] = str(content_length)
else:
for n, v in headers.items():
if n.lower() == 'content-length':
content_length = int(v)
if not contents:
headers['Content-Length'] = '0'
if isinstance(contents, six.string_types):
contents = [contents]
if content_length is None:
headers['Transfer-Encoding'] = 'chunked'
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
method, path, headers=_headers)
method, path, headers=headers)
if contents is not None:
contents_f = FileLikeIter(contents)
with Timeout(send_timeout):
if content_length is None:
chunk = contents_f.read(chunk_size)
while chunk:
conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
chunk = contents_f.read(chunk_size)
conn.send('0\r\n\r\n')
else:
left = content_length
while left > 0:
size = chunk_size
if size > left:
size = left
chunk = contents_f.read(size)
if not chunk:
break
conn.send(chunk)
left -= len(chunk)
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
@ -82,7 +132,7 @@ def _get_direct_account_container(path, stype, node, part,
marker=None, limit=None,
prefix=None, delimiter=None,
conn_timeout=5, response_timeout=15,
end_marker=None, reverse=None):
end_marker=None, reverse=None, headers=None):
"""Base class for get direct account and container.
Do not use directly use the get_direct_account or
@ -105,7 +155,7 @@ def _get_direct_account_container(path, stype, node, part,
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'GET', path, query_string=qs,
headers=gen_headers())
headers=gen_headers(hdrs_in=headers))
with Timeout(response_timeout):
resp = conn.getresponse()
if not is_success(resp.status):
@ -121,11 +171,12 @@ def _get_direct_account_container(path, stype, node, part,
return resp_headers, json.loads(resp.read())
def gen_headers(hdrs_in=None, add_ts=False):
def gen_headers(hdrs_in=None, add_ts=False, add_user_agent=True):
hdrs_out = HeaderKeyDict(hdrs_in) if hdrs_in else HeaderKeyDict()
if add_ts:
hdrs_out['X-Timestamp'] = Timestamp.now().internal
hdrs_out['User-Agent'] = 'direct-client %s' % os.getpid()
if add_user_agent:
hdrs_out['User-Agent'] = 'direct-client %s' % os.getpid()
return hdrs_out
@ -197,7 +248,7 @@ def direct_head_container(node, part, account, container, conn_timeout=5,
def direct_get_container(node, part, account, container, marker=None,
limit=None, prefix=None, delimiter=None,
conn_timeout=5, response_timeout=15, end_marker=None,
reverse=None):
reverse=None, headers=None):
"""
Get container listings directly from the container server.
@ -213,6 +264,7 @@ def direct_get_container(node, part, account, container, marker=None,
:param response_timeout: timeout in seconds for getting the response
:param end_marker: end_marker query
:param reverse: reverse the returned listing
:param headers: headers to be included in the request
:returns: a tuple of (response headers, a list of objects) The response
headers will be a HeaderKeyDict.
"""
@ -224,7 +276,8 @@ def direct_get_container(node, part, account, container, marker=None,
end_marker=end_marker,
reverse=reverse,
conn_timeout=conn_timeout,
response_timeout=response_timeout)
response_timeout=response_timeout,
headers=headers)
def direct_delete_container(node, part, account, container, conn_timeout=5,
@ -250,6 +303,37 @@ def direct_delete_container(node, part, account, container, conn_timeout=5,
'Container', conn_timeout, response_timeout)
def direct_put_container(node, part, account, container, conn_timeout=5,
                         response_timeout=15, headers=None, contents=None,
                         content_length=None, chunk_size=65535):
    """
    Send a PUT request, optionally with a body, to a container server.

    An ``X-Timestamp`` and a ``User-Agent`` header are generated for the
    request only when the caller has not supplied one; the check on the
    supplied header names is case-insensitive.

    :param node: node dictionary from the ring
    :param part: partition the container is on
    :param account: account name
    :param container: container name
    :param conn_timeout: timeout in seconds for establishing the connection
    :param response_timeout: timeout in seconds for getting the response
    :param headers: additional headers to include in the request
    :param contents: an iterable or string to send in request body (optional)
    :param content_length: value to send as content-length header (optional)
    :param chunk_size: chunk size of data to send (optional)
    :raises ClientException: HTTP PUT request failed
    """
    supplied = {} if headers is None else headers
    supplied_names = {name.lower() for name in supplied}
    # Never overwrite a timestamp or user agent chosen by the caller.
    needs_timestamp = 'x-timestamp' not in supplied_names
    needs_user_agent = 'user-agent' not in supplied_names
    request_headers = gen_headers(supplied,
                                  add_ts=needs_timestamp,
                                  add_user_agent=needs_user_agent)
    _make_req(node, part, 'PUT', '/%s/%s' % (account, container),
              request_headers, 'Container', conn_timeout, response_timeout,
              contents=contents, content_length=content_length,
              chunk_size=chunk_size)
def direct_put_container_object(node, part, account, container, obj,
conn_timeout=5, response_timeout=15,
headers=None):
@ -385,56 +469,18 @@ def direct_put_object(node, part, account, container, name, contents,
headers = {}
if etag:
headers['ETag'] = etag.strip('"')
if content_length is not None:
headers['Content-Length'] = str(content_length)
else:
for n, v in headers.items():
if n.lower() == 'content-length':
content_length = int(v)
if content_type is not None:
headers['Content-Type'] = content_type
else:
headers['Content-Type'] = 'application/octet-stream'
if not contents:
headers['Content-Length'] = '0'
if isinstance(contents, six.string_types):
contents = [contents]
# Incase the caller want to insert an object with specific age
add_ts = 'X-Timestamp' not in headers
if content_length is None:
headers['Transfer-Encoding'] = 'chunked'
resp = _make_req(
node, part, 'PUT', path, gen_headers(headers, add_ts=add_ts),
'Object', conn_timeout, response_timeout, contents=contents,
content_length=content_length, chunk_size=chunk_size)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'PUT', path, headers=gen_headers(headers, add_ts))
contents_f = FileLikeIter(contents)
if content_length is None:
chunk = contents_f.read(chunk_size)
while chunk:
conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
chunk = contents_f.read(chunk_size)
conn.send('0\r\n\r\n')
else:
left = content_length
while left > 0:
size = chunk_size
if size > left:
size = left
chunk = contents_f.read(size)
if not chunk:
break
conn.send(chunk)
left -= len(chunk)
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
if not is_success(resp.status):
raise DirectClientException('Object', 'PUT',
node, part, path, resp)
return resp.getheader('etag').strip('"')

View File

@ -24,7 +24,8 @@ from eventlet import Timeout
import swift.common.db
from swift.container.sync_store import ContainerSyncStore
from swift.container.backend import ContainerBroker, DATADIR
from swift.container.backend import ContainerBroker, DATADIR, \
RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED
from swift.container.replicator import ContainerReplicatorRpc
from swift.common.db import DatabaseAlreadyExists
from swift.common.container_sync_realms import ContainerSyncRealms
@ -33,7 +34,9 @@ from swift.common.request_helpers import get_param, \
from swift.common.utils import get_logger, hash_path, public, \
Timestamp, storage_directory, validate_sync_to, \
config_true_value, timing_stats, replication, \
override_bytes_from_content_type, get_log_line
override_bytes_from_content_type, get_log_line, ShardRange, \
list_from_csv
from swift.common.constraints import valid_timestamp, check_utf8, check_drive
from swift.common import constraints
from swift.common.bufferedhttp import http_connect
@ -72,6 +75,7 @@ def gen_resp_headers(info, is_deleted=False):
'X-Timestamp': Timestamp(info.get('created_at', 0)).normal,
'X-PUT-Timestamp': Timestamp(
info.get('put_timestamp', 0)).normal,
'X-Backend-Sharding-State': info.get('db_state', UNSHARDED),
})
return headers
@ -408,6 +412,22 @@ class ContainerController(BaseStorageServer):
req.headers.get('x-content-type-timestamp'),
req.headers.get('x-meta-timestamp'))
return HTTPCreated(request=req)
record_type = req.headers.get('x-backend-record-type', '').lower()
if record_type == RECORD_TYPE_SHARD:
try:
# validate incoming data...
shard_ranges = [ShardRange.from_dict(sr)
for sr in json.loads(req.body)]
except (ValueError, KeyError, TypeError) as err:
return HTTPBadRequest('Invalid body: %r' % err)
created = self._maybe_autocreate(broker, req_timestamp, account,
requested_policy_index)
self._update_metadata(req, broker, req_timestamp, 'PUT')
if shard_ranges:
# TODO: consider writing the shard ranges into the pending
# file, but if so ensure an all-or-none semantic for the write
broker.merge_shard_ranges(shard_ranges)
else: # put container
if requested_policy_index is None:
# use the default index sent by the proxy if available
@ -423,14 +443,14 @@ class ContainerController(BaseStorageServer):
resp = self.account_update(req, account, container, broker)
if resp:
return resp
if created:
return HTTPCreated(request=req,
headers={'x-backend-storage-policy-index':
broker.storage_policy_index})
else:
return HTTPAccepted(request=req,
headers={'x-backend-storage-policy-index':
broker.storage_policy_index})
if created:
return HTTPCreated(request=req,
headers={'x-backend-storage-policy-index':
broker.storage_policy_index})
else:
return HTTPAccepted(request=req,
headers={'x-backend-storage-policy-index':
broker.storage_policy_index})
@public
@timing_stats(sample_rate=0.1)
@ -469,13 +489,18 @@ class ContainerController(BaseStorageServer):
:params record: object entry record
:returns: modified record
"""
(name, created, size, content_type, etag) = record[:5]
if content_type is None:
return {'subdir': name.decode('utf8')}
response = {'bytes': size, 'hash': etag, 'name': name.decode('utf8'),
'content_type': content_type}
if isinstance(record, ShardRange):
created = record.timestamp
response = dict(record)
else:
(name, created, size, content_type, etag) = record[:5]
if content_type is None:
return {'subdir': name.decode('utf8')}
response = {
'bytes': size, 'hash': etag, 'name': name.decode('utf8'),
'content_type': content_type}
override_bytes_from_content_type(response, logger=self.logger)
response['last_modified'] = Timestamp(created).isoformat
override_bytes_from_content_type(response, logger=self.logger)
return response
@public
@ -509,12 +534,45 @@ class ContainerController(BaseStorageServer):
pending_timeout=0.1,
stale_reads_ok=True)
info, is_deleted = broker.get_info_is_deleted()
resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
if is_deleted:
return HTTPNotFound(request=req, headers=resp_headers)
container_list = broker.list_objects_iter(
limit, marker, end_marker, prefix, delimiter, path,
storage_policy_index=info['storage_policy_index'], reverse=reverse)
record_type = req.headers.get('x-backend-record-type', '').lower()
if record_type == 'auto' and info.get('db_state') in (SHARDING,
SHARDED):
record_type = 'shard'
if record_type == 'shard':
override_deleted = info and config_true_value(
req.headers.get('x-backend-override-deleted', False))
resp_headers = gen_resp_headers(
info, is_deleted=is_deleted and not override_deleted)
if is_deleted and not override_deleted:
return HTTPNotFound(request=req, headers=resp_headers)
resp_headers['X-Backend-Record-Type'] = 'shard'
includes = get_param(req, 'includes')
states = get_param(req, 'states')
fill_gaps = False
if states:
states = list_from_csv(states)
fill_gaps = any(('listing' in states, 'updating' in states))
try:
states = broker.resolve_shard_range_states(states)
except ValueError:
return HTTPBadRequest(request=req, body='Bad state')
include_deleted = config_true_value(
req.headers.get('x-backend-include-deleted', False))
container_list = broker.get_shard_ranges(
marker, end_marker, includes, reverse, states=states,
include_deleted=include_deleted, fill_gaps=fill_gaps)
else:
resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
if is_deleted:
return HTTPNotFound(request=req, headers=resp_headers)
resp_headers['X-Backend-Record-Type'] = 'object'
# Use the retired db while container is in process of sharding,
# otherwise use current db
src_broker = broker.get_brokers()[0]
container_list = src_broker.list_objects_iter(
limit, marker, end_marker, prefix, delimiter, path,
storage_policy_index=info['storage_policy_index'],
reverse=reverse)
return self.create_listing(req, out_content_type, info, resp_headers,
broker.metadata, container_list, container)

View File

@ -84,7 +84,9 @@ class ContainerController(Controller):
def GETorHEAD(self, req):
"""Handler for HTTP GET/HEAD requests."""
ai = self.account_info(self.account_name, req)
if not ai[1]:
auto_account = self.account_name.startswith(
self.app.auto_create_account_prefix)
if not (auto_account or ai[1]):
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:

View File

@ -95,6 +95,11 @@ def mocked_http_conn(*args, **kwargs):
yield fake_conn
@contextmanager
def noop_timeout(duration):
    """
    Context manager that does nothing on entry or exit.

    Used in these tests as a replacement for ``direct_client.Timeout`` so
    that no real timer is armed.

    :param duration: accepted for signature compatibility and ignored
    """
    yield
@patch_policies
class TestDirectClient(unittest.TestCase):
@ -117,6 +122,10 @@ class TestDirectClient(unittest.TestCase):
self.account, self.container, self.obj))
self.user_agent = 'direct-client %s' % os.getpid()
patcher = mock.patch.object(direct_client, 'Timeout', noop_timeout)
patcher.start()
self.addCleanup(patcher.stop)
def test_gen_headers(self):
stub_user_agent = 'direct-client %s' % os.getpid()
@ -450,6 +459,67 @@ class TestDirectClient(unittest.TestCase):
self.assertEqual(err.http_status, 500)
self.assertTrue('DELETE' in str(err))
def test_direct_put_container(self):
    """
    PUT a container with a body and an explicit Content-Length.

    Checks that caller-supplied headers (including User-Agent) reach the
    connection unchanged, that an x-timestamp is added, and that the bytes
    sent hash to the md5 of the raw body.
    """
    body = 'Let us begin with a quick introduction'
    headers = {'x-foo': 'bar', 'Content-Length': str(len(body)),
               'Content-Type': 'application/json',
               'User-Agent': 'my UA'}

    with mocked_http_conn(204) as conn:
        rv = direct_client.direct_put_container(
            self.node, self.part, self.account, self.container,
            contents=body, headers=headers)
        self.assertEqual(conn.host, self.node['ip'])
        self.assertEqual(conn.port, self.node['port'])
        self.assertEqual(conn.method, 'PUT')
        self.assertEqual(conn.path, self.container_path)
        self.assertEqual(conn.req_headers['Content-Length'],
                         str(len(body)))
        self.assertEqual(conn.req_headers['Content-Type'],
                         'application/json')
        self.assertEqual(conn.req_headers['User-Agent'], 'my UA')
        # assertIn reports the container contents on failure, unlike
        # assertTrue(x in y)
        self.assertIn('x-timestamp', conn.req_headers)
        self.assertEqual('bar', conn.req_headers.get('x-foo'))
        self.assertEqual(md5(body).hexdigest(), conn.etag.hexdigest())
    self.assertIsNone(rv)
def test_direct_put_container_chunked(self):
    """
    PUT a container with a body but no Content-Length.

    Checks that the request is sent with Transfer-Encoding: chunked, that
    no Content-Length header is present, and that the bytes sent hash to
    the md5 of the body framed as a single chunk plus terminator.
    """
    body = 'Let us begin with a quick introduction'
    headers = {'x-foo': 'bar', 'Content-Type': 'application/json'}

    with mocked_http_conn(204) as conn:
        rv = direct_client.direct_put_container(
            self.node, self.part, self.account, self.container,
            contents=body, headers=headers)
        self.assertEqual(conn.host, self.node['ip'])
        self.assertEqual(conn.port, self.node['port'])
        self.assertEqual(conn.method, 'PUT')
        self.assertEqual(conn.path, self.container_path)
        self.assertEqual(conn.req_headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(conn.req_headers['Content-Type'],
                         'application/json')
        # assertIn reports the container contents on failure, unlike
        # assertTrue(x in y)
        self.assertIn('x-timestamp', conn.req_headers)
        self.assertEqual('bar', conn.req_headers.get('x-foo'))
        self.assertNotIn('Content-Length', conn.req_headers)
        expected_sent = '%0x\r\n%s\r\n0\r\n\r\n' % (len(body), body)
        self.assertEqual(md5(expected_sent).hexdigest(),
                         conn.etag.hexdigest())
    self.assertIsNone(rv)
def test_direct_put_container_fail(self):
    """
    Exceptions raised while connecting, or while getting the response,
    propagate out of direct_put_container unchanged.
    """
    put_args = (self.node, self.part, self.account, self.container)

    # failure while establishing the connection
    with mock.patch('swift.common.bufferedhttp.http_connect_raw',
                    side_effect=Exception('conn failed')):
        with self.assertRaises(Exception) as caught:
            direct_client.direct_put_container(*put_args)
    self.assertEqual('conn failed', str(caught.exception))

    # failure while reading the response
    with mocked_http_conn(Exception('resp failed')):
        with self.assertRaises(Exception) as caught:
            direct_client.direct_put_container(*put_args)
    self.assertEqual('resp failed', str(caught.exception))
def test_direct_put_container_object(self):
headers = {'x-foo': 'bar'}

File diff suppressed because it is too large Load Diff

View File

@ -8356,6 +8356,29 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(res.content_length, 0)
self.assertNotIn('transfer-encoding', res.headers)
def test_GET_account_non_existent(self):
    """
    A container GET returns 404, and caches no container info, when every
    mocked backend response is a 404.
    """
    with save_globals():
        set_http_connect(404, 404, 404)
        container_controller = proxy_server.ContainerController(
            self.app, 'a', 'c')
        request = Request.blank('/v1/a/c')
        self.app.update_request(request)
        response = container_controller.GET(request)
        self.assertEqual(response.status_int, 404)
        infocache = response.environ['swift.infocache']
        self.assertNotIn('container/a/c', infocache)
def test_GET_auto_create_prefix_account_non_existent(self):
    """
    A container GET still succeeds for account '.a' when the first three
    mocked backend responses are 404 and the next three are 204.

    NOTE(review): presumably '.a' matches the app's
    auto_create_account_prefix - confirm against the test app config.
    """
    with save_globals():
        set_http_connect(404, 404, 404, 204, 204, 204)
        container_controller = proxy_server.ContainerController(
            self.app, '.a', 'c')
        request = Request.blank('/v1/a/c')
        self.app.update_request(request)
        response = container_controller.GET(request)
        self.assertEqual(response.status_int, 204)
        infocache = response.environ['swift.infocache']
        self.assertEqual(infocache['container/.a/c']['status'], 204)
        self.assertEqual(response.content_length, 0)
        self.assertNotIn('transfer-encoding', response.headers)
def test_GET_calls_authorize(self):
called = [False]