Merge "Fix failure in multi-chunk state sync for nicira plugin"
This commit is contained in:
commit
b997c492b3
@ -458,24 +458,22 @@ class NvpSynchronizer():
|
|||||||
# API. In this case the request should be split in multiple
|
# API. In this case the request should be split in multiple
|
||||||
# requests. This is not ideal, and therefore a log warning will
|
# requests. This is not ideal, and therefore a log warning will
|
||||||
# be emitted.
|
# be emitted.
|
||||||
requests = range(0, page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1)
|
num_requests = page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1
|
||||||
if len(requests) > 1:
|
if num_requests > 1:
|
||||||
LOG.warn(_("Requested page size is %(cur_chunk_size)d."
|
LOG.warn(_("Requested page size is %(cur_chunk_size)d."
|
||||||
"It might be necessary to do %(num_requests)d "
|
"It might be necessary to do %(num_requests)d "
|
||||||
"round-trips to NVP for fetching data. Please "
|
"round-trips to NVP for fetching data. Please "
|
||||||
"tune sync parameters to ensure chunk size "
|
"tune sync parameters to ensure chunk size "
|
||||||
"is less than %(max_page_size)d"),
|
"is less than %(max_page_size)d"),
|
||||||
{'cur_chunk_size': page_size,
|
{'cur_chunk_size': page_size,
|
||||||
'num_requests': len(requests),
|
'num_requests': num_requests,
|
||||||
'max_page_size': nvplib.MAX_PAGE_SIZE})
|
'max_page_size': nvplib.MAX_PAGE_SIZE})
|
||||||
results = []
|
# Only the first request might return the total size,
|
||||||
actual_size = 0
|
# subsequent requests will definetely not
|
||||||
for _req in requests:
|
results, cursor, total_size = nvplib.get_single_query_page(
|
||||||
req_results, cursor, req_size = nvplib.get_single_query_page(
|
|
||||||
uri, self._cluster, cursor,
|
uri, self._cluster, cursor,
|
||||||
min(page_size, nvplib.MAX_PAGE_SIZE))
|
min(page_size, nvplib.MAX_PAGE_SIZE))
|
||||||
results.extend(req_results)
|
for _req in range(0, num_requests - 1):
|
||||||
actual_size = actual_size + req_size
|
|
||||||
# If no cursor is returned break the cycle as there is no
|
# If no cursor is returned break the cycle as there is no
|
||||||
# actual need to perform multiple requests (all fetched)
|
# actual need to perform multiple requests (all fetched)
|
||||||
# This happens when the overall size of resources exceeds
|
# This happens when the overall size of resources exceeds
|
||||||
@ -483,9 +481,13 @@ class NvpSynchronizer():
|
|||||||
# resource type is below this threshold
|
# resource type is below this threshold
|
||||||
if not cursor:
|
if not cursor:
|
||||||
break
|
break
|
||||||
|
req_results, cursor = nvplib.get_single_query_page(
|
||||||
|
uri, self._cluster, cursor,
|
||||||
|
min(page_size, nvplib.MAX_PAGE_SIZE))[:2]
|
||||||
|
results.extend(req_results)
|
||||||
# reset cursor before returning if we queried just to
|
# reset cursor before returning if we queried just to
|
||||||
# know the number of entities
|
# know the number of entities
|
||||||
return results, cursor if page_size else 'start', actual_size
|
return results, cursor if page_size else 'start', total_size
|
||||||
return [], cursor, None
|
return [], cursor, None
|
||||||
|
|
||||||
def _fetch_nvp_data_chunk(self, sp):
|
def _fetch_nvp_data_chunk(self, sp):
|
||||||
|
@ -135,7 +135,7 @@ class FakeClient:
|
|||||||
|
|
||||||
def _get_filters(self, querystring):
|
def _get_filters(self, querystring):
|
||||||
if not querystring:
|
if not querystring:
|
||||||
return (None, None, None)
|
return (None, None, None, None)
|
||||||
params = urlparse.parse_qs(querystring)
|
params = urlparse.parse_qs(querystring)
|
||||||
tag_filter = None
|
tag_filter = None
|
||||||
attr_filter = None
|
attr_filter = None
|
||||||
@ -144,15 +144,15 @@ class FakeClient:
|
|||||||
'tag': params['tag'][0]}
|
'tag': params['tag'][0]}
|
||||||
elif 'uuid' in params:
|
elif 'uuid' in params:
|
||||||
attr_filter = {'uuid': params['uuid'][0]}
|
attr_filter = {'uuid': params['uuid'][0]}
|
||||||
# Handle page_length
|
# Handle page length and page cursor parameter
|
||||||
# TODO(salv-orlando): Handle page cursor too
|
|
||||||
page_len = params.get('_page_length')
|
page_len = params.get('_page_length')
|
||||||
|
page_cursor = params.get('_page_cursor')
|
||||||
if page_len:
|
if page_len:
|
||||||
page_len = int(page_len[0])
|
page_len = int(page_len[0])
|
||||||
else:
|
else:
|
||||||
# Explicitly set it to None (avoid 0 or empty list)
|
# Explicitly set it to None (avoid 0 or empty list)
|
||||||
page_len = None
|
page_len = None
|
||||||
return (tag_filter, attr_filter, page_len)
|
return (tag_filter, attr_filter, page_len, page_cursor)
|
||||||
|
|
||||||
def _add_lswitch(self, body):
|
def _add_lswitch(self, body):
|
||||||
fake_lswitch = json.loads(body)
|
fake_lswitch = json.loads(body)
|
||||||
@ -369,7 +369,11 @@ class FakeClient:
|
|||||||
|
|
||||||
def _list(self, resource_type, response_file,
|
def _list(self, resource_type, response_file,
|
||||||
parent_uuid=None, query=None, relations=None):
|
parent_uuid=None, query=None, relations=None):
|
||||||
(tag_filter, attr_filter, page_len) = self._get_filters(query)
|
(tag_filter, attr_filter,
|
||||||
|
page_len, page_cursor) = self._get_filters(query)
|
||||||
|
# result_count attribute in response should appear only when
|
||||||
|
# page_cursor is not specified
|
||||||
|
do_result_count = not page_cursor
|
||||||
with open("%s/%s" % (self.fake_files_path, response_file)) as f:
|
with open("%s/%s" % (self.fake_files_path, response_file)) as f:
|
||||||
response_template = f.read()
|
response_template = f.read()
|
||||||
res_dict = getattr(self, '_fake_%s_dict' % resource_type)
|
res_dict = getattr(self, '_fake_%s_dict' % resource_type)
|
||||||
@ -410,6 +414,15 @@ class FakeClient:
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def _cursor_match(res_uuid, page_cursor):
|
||||||
|
if not page_cursor:
|
||||||
|
return True
|
||||||
|
if page_cursor == res_uuid:
|
||||||
|
# always return True once page_cursor has been found
|
||||||
|
page_cursor = None
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def _build_item(resource):
|
def _build_item(resource):
|
||||||
item = json.loads(response_template % resource)
|
item = json.loads(response_template % resource)
|
||||||
if relations:
|
if relations:
|
||||||
@ -437,7 +450,8 @@ class FakeClient:
|
|||||||
for res_uuid in res_dict
|
for res_uuid in res_dict
|
||||||
if (parent_func(res_uuid) and
|
if (parent_func(res_uuid) and
|
||||||
_tag_match(res_uuid) and
|
_tag_match(res_uuid) and
|
||||||
_attr_match(res_uuid))]
|
_attr_match(res_uuid) and
|
||||||
|
_cursor_match(res_uuid, page_cursor))]
|
||||||
# Rather inefficient, but hey this is just a mock!
|
# Rather inefficient, but hey this is just a mock!
|
||||||
next_cursor = None
|
next_cursor = None
|
||||||
total_items = len(items)
|
total_items = len(items)
|
||||||
@ -447,10 +461,11 @@ class FakeClient:
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
next_cursor = None
|
next_cursor = None
|
||||||
items = items[:page_len]
|
items = items[:page_len]
|
||||||
response_dict = {'results': items,
|
response_dict = {'results': items}
|
||||||
'result_count': total_items}
|
|
||||||
if next_cursor:
|
if next_cursor:
|
||||||
response_dict['page_cursor'] = next_cursor
|
response_dict['page_cursor'] = next_cursor
|
||||||
|
if do_result_count:
|
||||||
|
response_dict['result_count'] = total_items
|
||||||
return json.dumps(response_dict)
|
return json.dumps(response_dict)
|
||||||
|
|
||||||
def _show(self, resource_type, response_file,
|
def _show(self, resource_type, response_file,
|
||||||
|
Loading…
Reference in New Issue
Block a user