Fix record dump utility

Records are read from memcached in bulk, but memcached may return fewer
records than requested when the total size of the chunk exceeds an internal
limit. The dump utility should verify the number of records returned and
fall back to one-by-one retrieval if needed.

Change-Id: I795790e2b24828bef258aa881f4154a194c0cc1c
This commit is contained in:
parent: d827333ebf
commit: 79deedf278
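For context, a minimal standalone sketch of the verify-and-fall-back idiom this change introduces, assuming a python-memcached style client whose get_multi() returns a dict containing only the keys it could fetch; the client setup and key names below are illustrative, not part of the diff:

    import memcache

    client = memcache.Client(['127.0.0.1:11211'])
    requested = ['record:%d' % i for i in range(100)]

    # get_multi() returns only the keys it managed to fetch; a short result
    # may mean the response hit the server's size limit, so re-fetch the
    # missing keys individually (they may also simply not exist)
    chunk = client.get_multi(requested)
    if len(chunk) < len(requested):
        for key in requested:
            if key not in chunk:
                chunk[key] = client.get(key)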
@@ -111,9 +111,20 @@ def export_data(memcached_inst, fd):
         key_prefix = key + ':'

         for record_id_set in utils.make_range(0, count, BULK_READ_SIZE):
-            for k, v in six.iteritems(memcached_inst.get_multi(
-                    record_id_set, key_prefix)):
-                pickle.dump((key_prefix + str(k), v), fd)
+            # memcache limits the size of returned data to a specific yet
+            # unknown chunk size; the code should verify that all requested
+            # records are returned and be able to fall back to one-by-one retrieval
+
+            chunk = memcached_inst.get_multi(record_id_set, key_prefix)
+            if len(chunk) < len(record_id_set):
+                # retrieve one-by-one
+                for record_id in record_id_set:
+                    key = key_prefix + str(record_id)
+                    pickle.dump((key, memcached_inst.get(key)), fd)
+            else:
+                # dump the whole chunk
+                for k, v in six.iteritems(chunk):
+                    pickle.dump((key_prefix + str(k), v), fd)

     for user_seq in range(memcached_inst.get('user:count') or 0):
         user = memcached_inst.get('user:%s' % user_seq)
@@ -126,25 +137,6 @@ def export_data(memcached_inst, fd):
             pickle.dump(('user:%s' % email, user), fd)


-def export_data_universal(memcached_inst, fd):
-    LOG.info('Exporting data from memcached')
-    slabs = memcached_inst.get_slabs()
-    for slab_number, slab in six.iteritems(slabs[0][1]):
-        count = int(slab['number'])
-        keys = memcached_inst.get_stats(
-            'cachedump %s %s' % (slab_number, count))[0][1].keys()
-
-        n = 0
-        while n < count:
-            LOG.debug('Dumping slab %s, start record %s', slab_number, n)
-
-            for k, v in six.iteritems(memcached_inst.get_multi(
-                    keys[n: min(count, n + BULK_READ_SIZE)])):
-                pickle.dump((k, v), fd)
-
-            n += BULK_READ_SIZE
-
-
 def _connect_to_memcached(uri):
     stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri)
     if stripped:
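Since export_data() pickles consecutive (key, value) tuples into a single stream, a dump produced this way can be read back by calling pickle.load() in a loop until the stream is exhausted. A minimal sketch; the read_dump() helper is hypothetical, not part of this commit:

    import pickle

    def read_dump(fd):
        # pickle.dump() was called repeatedly on the same stream, so keep
        # loading objects until the stream raises EOFError
        while True:
            try:
                yield pickle.load(fd)
            except EOFError:
                return

    # usage: for key, value in read_dump(open('dump.bin', 'rb')): ...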
tests/unit/test_dump.py (new file, 68 lines)
@@ -0,0 +1,68 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import memcache
+import mock
+import testtools
+
+from stackalytics.processor import dump
+
+
+class TestDump(testtools.TestCase):
+
+    def _make_data(self, record_count):
+        data = {'record:count': record_count}
+        for i in range(record_count):
+            data['record:%d' % i] = i
+        return data
+
+    def test_export_data_records(self):
+        record_count = 153
+        data = self._make_data(record_count)
+        memcache_inst = mock.Mock(memcache.Client)
+        memcache_inst.get = lambda x: data.get(x)
+        memcache_inst.get_multi = lambda keys, key_prefix: dict(
+            ('%s' % n, data.get(key_prefix + '%s' % n)) for n in keys)
+
+        with mock.patch('pickle.dump') as pickle_dump:
+            fd = mock.Mock()
+            dump.export_data(memcache_inst, fd)
+
+            expected_calls = [mock.call(('record:count', record_count), fd)]
+            for i in range(record_count):
+                expected_calls.append(mock.call(('record:%d' % i,
+                                                 data['record:%d' % i]), fd))
+            pickle_dump.assert_has_calls(expected_calls, any_order=True)
+
+    def test_export_data_records_get_multi_truncates_chunk(self):
+        record_count = 153
+        data = self._make_data(record_count)
+        memcache_inst = mock.Mock(memcache.Client)
+        memcache_inst.get = lambda x: data.get(x)
+        memcache_inst.get_multi = lambda keys, key_prefix: dict(
+            ('%s' % n, data.get(key_prefix + '%s' % n))
+            for n in [k for k, v in zip(keys, range(len(keys) - 1))])
+
+        with mock.patch('pickle.dump') as pickle_dump:
+            fd = mock.Mock()
+            dump.export_data(memcache_inst, fd)
+
+            expected_calls = [mock.call(('record:count', record_count), fd)]
+            for i in range(record_count):
+                expected_calls.append(mock.call(('record:%d' % i,
+                                                 data['record:%d' % i]), fd))
+            pickle_dump.assert_has_calls(expected_calls, any_order=True)
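The second test's get_multi mock simulates memcached truncating a chunk: zip() stops at the shorter of its iterables, so pairing keys with a range one element shorter silently drops the last requested key, which forces export_data() into its one-by-one fallback path. A minimal demonstration of just that trick, with illustrative variable names:

    keys = ['0', '1', '2', '3']
    # range(len(keys) - 1) is one element shorter than keys, and zip()
    # stops at the shorter iterable, so the last key is dropped
    truncated = [k for k, v in zip(keys, range(len(keys) - 1))]
    assert truncated == ['0', '1', '2']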