From 79deedf278d6d18f8c477c1a993154192dd3ad05 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Thu, 14 Aug 2014 17:54:41 +0400 Subject: [PATCH] Fix record dump utility From memcached records are read in bulk. But memcache may return fewer records than asked if the total size of chunk is larger than internal limit. The dump utility should verify number of records and fall back to one-by-one retrieval if needed. Change-Id: I795790e2b24828bef258aa881f4154a194c0cc1c --- stackalytics/processor/dump.py | 36 +++++++----------- tests/unit/test_dump.py | 68 ++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 22 deletions(-) create mode 100644 tests/unit/test_dump.py diff --git a/stackalytics/processor/dump.py b/stackalytics/processor/dump.py index 957161ad4..891b033ed 100644 --- a/stackalytics/processor/dump.py +++ b/stackalytics/processor/dump.py @@ -111,9 +111,20 @@ def export_data(memcached_inst, fd): key_prefix = key + ':' for record_id_set in utils.make_range(0, count, BULK_READ_SIZE): - for k, v in six.iteritems(memcached_inst.get_multi( - record_id_set, key_prefix)): - pickle.dump((key_prefix + str(k), v), fd) + # memcache limits the size of returned data to a specific yet unknown + # chunk size, the code should verify that all requested records are + # returned and be able to fall back to one-by-one retrieval + + chunk = memcached_inst.get_multi(record_id_set, key_prefix) + if len(chunk) < len(record_id_set): + # retrieve one-by-one + for record_id in record_id_set: + key = key_prefix + str(record_id) + pickle.dump((key, memcached_inst.get(key)), fd) + else: + # dump the whole chunk + for k, v in six.iteritems(chunk): + pickle.dump((key_prefix + str(k), v), fd) for user_seq in range(memcached_inst.get('user:count') or 0): user = memcached_inst.get('user:%s' % user_seq) @@ -126,25 +137,6 @@ def export_data(memcached_inst, fd): pickle.dump(('user:%s' % email, user), fd) -def export_data_universal(memcached_inst, fd): - LOG.info('Exporting data from 
memcached') - slabs = memcached_inst.get_slabs() - for slab_number, slab in six.iteritems(slabs[0][1]): - count = int(slab['number']) - keys = memcached_inst.get_stats( - 'cachedump %s %s' % (slab_number, count))[0][1].keys() - - n = 0 - while n < count: - LOG.debug('Dumping slab %s, start record %s', slab_number, n) - - for k, v in six.iteritems(memcached_inst.get_multi( - keys[n: min(count, n + BULK_READ_SIZE)])): - pickle.dump((k, v), fd) - - n += BULK_READ_SIZE - - def _connect_to_memcached(uri): stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri) if stripped: diff --git a/tests/unit/test_dump.py b/tests/unit/test_dump.py new file mode 100644 index 000000000..0741a2c7f --- /dev/null +++ b/tests/unit/test_dump.py @@ -0,0 +1,68 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import memcache +import mock +import testtools + +from stackalytics.processor import dump + + +class TestDump(testtools.TestCase): + + def _make_data(self, record_count): + data = {'record:count': record_count} + for i in range(record_count): + data['record:%d' % i] = i + return data + + def test_export_data_records(self): + record_count = 153 + data = self._make_data(record_count) + memcache_inst = mock.Mock(memcache.Client) + memcache_inst.get = lambda x: data.get(x) + memcache_inst.get_multi = lambda keys, key_prefix: dict( + ('%s' % n, data.get(key_prefix + '%s' % n)) for n in keys) + + with mock.patch('pickle.dump') as pickle_dump: + fd = mock.Mock() + dump.export_data(memcache_inst, fd) + # self.assertEquals(total, pickle_dump.call_count) + + expected_calls = [mock.call(('record:count', record_count), fd)] + for i in range(record_count): + expected_calls.append(mock.call(('record:%d' % i, + data['record:%d' % i]), fd)) + pickle_dump.assert_has_calls(expected_calls, any_order=True) + + def test_export_data_records_get_multi_truncates_chunk(self): + record_count = 153 + data = self._make_data(record_count) + memcache_inst = mock.Mock(memcache.Client) + memcache_inst.get = lambda x: data.get(x) + memcache_inst.get_multi = lambda keys, key_prefix: dict( + ('%s' % n, data.get(key_prefix + '%s' % n)) + for n in [k for k, v in zip(keys, range(len(keys) - 1))]) + + with mock.patch('pickle.dump') as pickle_dump: + fd = mock.Mock() + dump.export_data(memcache_inst, fd) + # self.assertEquals(total, pickle_dump.call_count) + + expected_calls = [mock.call(('record:count', record_count), fd)] + for i in range(record_count): + expected_calls.append(mock.call(('record:%d' % i, + data['record:%d' % i]), fd)) + pickle_dump.assert_has_calls(expected_calls, any_order=True)