diff --git a/bin/st b/bin/st index 1a4bf34312..e65a0ac0b9 100755 --- a/bin/st +++ b/bin/st @@ -821,7 +821,7 @@ from optparse import OptionParser from os import environ, listdir, makedirs, utime from os.path import basename, dirname, getmtime, getsize, isdir, join from Queue import Empty, Queue -from sys import argv, exit, stderr +from sys import argv, exit, stderr, stdout from threading import enumerate as threading_enumerate, Thread from time import sleep @@ -969,14 +969,23 @@ def st_delete(options, args): st_download_help = ''' download --all OR download container [object] [object] ... Downloads everything in the account (with --all), or everything in a - container, or a list of objects depending on the args given.'''.strip('\n') + container, or a list of objects depending on the args given. Use + the -o [--output] option to redirect the output to a file + or if "-" then the just redirect to stdout. '''.strip('\n') def st_download(options, args): if (not args and not options.yes_all) or (args and options.yes_all): options.error_queue.put('Usage: %s [options] %s' % (basename(argv[0]), st_download_help)) return object_queue = Queue(10000) - def _download_object((container, obj), conn): + def _download_object(queue_arg, conn): + if len(queue_arg) == 2: + container, obj = queue_arg + out_file = None + elif len(queue_arg) == 3: + container, obj, out_file = queue_arg + else: + raise Exception("Invalid queue_arg length of %s" % len(queue_arg)) try: headers, body = \ conn.get_object(container, obj, resp_chunk_size=65536) @@ -998,7 +1007,12 @@ def st_download(options, args): dirpath = dirname(path) if dirpath and not isdir(dirpath): mkdirs(dirpath) - fp = open(path, 'wb') + if out_file == "-": + fp = stdout + elif out_file: + fp = open(out_file, 'wb') + else: + fp = open(path, 'wb') read_length = 0 md5sum = md5() for chunk in body : @@ -1013,7 +1027,7 @@ def st_download(options, args): options.error_queue.put( '%s: read_length != content_length, %d != %d' % (path, read_length, content_length)) - if 'x-object-meta-mtime' in headers: + if 'x-object-meta-mtime' in headers and not options.out_file: mtime = float(headers['x-object-meta-mtime']) utime(path, (mtime, mtime)) if options.verbose: @@ -1070,8 +1084,12 @@ def st_download(options, args): elif len(args) == 1: _download_container(args[0], create_connection()) else: - for obj in args[1:]: - object_queue.put((args[0], obj)) + if len(args) == 2: + obj = args[1] + object_queue.put((args[0], obj, options.out_file)) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) while not container_queue.empty(): sleep(0.01) for thread in container_threads: @@ -1438,10 +1456,14 @@ Example: help='User name for obtaining an auth token') parser.add_option('-K', '--key', dest='key', help='Key for obtaining an auth token') + parser.add_option('-o', '--output', dest='out_file', + help='For a single file download stream the output other location ') args = argv[1:] if not args: args.append('-h') (options, args) = parser.parse_args(args) + if options.out_file == '-': + options.verbose = 0 required_help = ''' Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or diff --git a/doc/source/_static/tweaks.css b/doc/source/_static/tweaks.css new file mode 100644 index 0000000000..16cd6e76e2 --- /dev/null +++ b/doc/source/_static/tweaks.css @@ -0,0 +1,65 @@ +ul.todo_list { + list-style-type: none; + margin: 0; + padding: 0; +} + +ul.todo_list li { + display: block; + margin: 0; + padding: 7px 0; + border-top: 1px solid #eee; +} + +ul.todo_list li p { + display: inline; +} + +ul.todo_list li p.link { + font-weight: bold; +} + +ul.todo_list li p.details { + font-style: italic; +} + +ul.todo_list li { +} + +div.admonition { + border: 1px solid #8F1000; +} + +div.admonition p.admonition-title { + background-color: #8F1000; + border-bottom: 1px solid #8E8E8E; +} + +a { + color: #CF2F19; +} + +div.related ul li a { + color: #CF2F19; +} + +div.sphinxsidebar h4 { + background-color:#8E8E8E; + border:1px solid #255E6E; + color:white; + font-size:1em; + margin:1em 0 0.5em; + padding:0.1em 0 0.1em 0.5em; +} + +em { + font-style: normal; +} + +table.docutils { + font-size: 11px; +} + +a tt { + color:#CF2F19; +} \ No newline at end of file diff --git a/doc/source/_theme/layout.html b/doc/source/_theme/layout.html new file mode 100644 index 0000000000..ed1cab0a6e --- /dev/null +++ b/doc/source/_theme/layout.html @@ -0,0 +1,2 @@ +{% extends "sphinxdoc/layout.html" %} +{% set css_files = css_files + ['_static/tweaks.css'] %} diff --git a/doc/source/_theme/theme.conf b/doc/source/_theme/theme.conf new file mode 100644 index 0000000000..e039fe01f9 --- /dev/null +++ b/doc/source/_theme/theme.conf @@ -0,0 +1,5 @@ +[theme] +inherit = sphinxdoc +stylesheet = sphinxdoc.css +pygments_style = friendly + diff --git a/doc/source/conf.py b/doc/source/conf.py index 78b662c8a3..74dfce9947 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -114,7 +114,9 @@ modindex_common_prefix = ['swift.'] # The theme to use for HTML and HTML Help pages. Major themes that come with # Sphinx are currently 'default' and 'sphinxdoc'. -html_theme = 'default' +# html_theme = 'default' +html_theme_path = ["."] +html_theme = '_theme' # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst index 3a615074ce..a270753338 100644 --- a/doc/source/development_saio.rst +++ b/doc/source/development_saio.rst @@ -60,6 +60,7 @@ If you are going to use a separate partition for Swift data, be sure to add anot mkdir /var/run/swift chown : /var/run/swift + #. Next, skip to :ref:`rsync-section`. .. _loopback-section: @@ -87,6 +88,8 @@ If you want to use a loopback device instead of another partition, follow these mkdir /var/run/swift chown : /var/run/swift +.. _rsync-section: + ---------------- Setting up rsync ---------------- diff --git a/doc/source/index.rst b/doc/source/index.rst index 854aa4c64d..9b20293921 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -1,20 +1,38 @@ -.. Swift documentation master file, created by - sphinx-quickstart on Tue May 18 13:50:15 2010. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +.. + Copyright 2010 OpenStack LLC + All Rights Reserved. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + Welcome to Swift's documentation! ================================= Swift is a highly available, distributed, eventually consistent object/blob -store. +store. Organizations can use Swift to store lots of data efficiently, safely, and cheaply. + +This documentation is generated by the Sphinx toolkit and lives in the source +tree. Additional documentation on Swift and other components of OpenStack can +be found on the `OpenStack wiki`_. + +.. _`OpenStack wiki`: http://wiki.openstack.org .. toctree:: :maxdepth: 1 getting_started -Overview: +Overview and Concepts +===================== .. toctree:: :maxdepth: 1 @@ -27,7 +45,8 @@ Overview: overview_stats ratelimit -Development: +Developer Documentation +======================= .. toctree:: :maxdepth: 1 @@ -36,7 +55,8 @@ Development: development_saio development_auth -Deployment: +Administrator Documentation +=========================== .. toctree:: :maxdepth: 1 @@ -46,14 +66,16 @@ Deployment: admin_guide debian_package_guide -End User Guides: +End User Guides +=============== .. toctree:: :maxdepth: 1 howto_cyberduck -Source: +Source Documentation +==================== .. toctree:: :maxdepth: 2 diff --git a/swift/common/utils.py b/swift/common/utils.py index bb635725c8..1e30b2f9ee 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -29,7 +29,6 @@ from urllib import quote from contextlib import contextmanager import ctypes import ctypes.util -import fcntl import struct from ConfigParser import ConfigParser, NoSectionError, NoOptionError from tempfile import mkstemp diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 963f02e375..9b0294627e 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -61,7 +61,7 @@ def hash_suffix(path, reclaim_age): elif files: files.sort(reverse=True) meta = data = tomb = None - for filename in files: + for filename in list(files): if not meta and filename.endswith('.meta'): meta = filename if not data and filename.endswith('.data'): @@ -232,7 +232,7 @@ class ObjectReplicator(Daemon): """ Execute the rsync binary to replicate a partition. - :returns: a tuple of (rsync exit code, rsync standard output) + :returns: return code of rsync process. 0 is successful """ start_time = time.time() ret_val = None @@ -470,6 +470,40 @@ class ObjectReplicator(Daemon): self.kill_coros() self.last_replication_count = self.replication_count + def collect_jobs(self): + """ + Returns a sorted list of jobs (dictionaries) that specify the + partitions, nodes, etc to be rsynced. + """ + jobs = [] + ips = whataremyips() + for local_dev in [dev for dev in self.object_ring.devs + if dev and dev['ip'] in ips and dev['port'] == self.port]: + dev_path = join(self.devices_dir, local_dev['device']) + obj_path = join(dev_path, 'objects') + tmp_path = join(dev_path, 'tmp') + if self.mount_check and not os.path.ismount(dev_path): + self.logger.warn('%s is not mounted' % local_dev['device']) + continue + unlink_older_than(tmp_path, time.time() - self.reclaim_age) + if not os.path.exists(obj_path): + continue + for partition in os.listdir(obj_path): + try: + nodes = [node for node in + self.object_ring.get_part_nodes(int(partition)) + if node['id'] != local_dev['id']] + jobs.append(dict(path=join(obj_path, partition), + nodes=nodes, delete=len(nodes) > 2, + partition=partition)) + except ValueError: + continue + random.shuffle(jobs) + # Partititons that need to be deleted take priority + jobs.sort(key=lambda job: not job['delete']) + self.job_count = len(jobs) + return jobs + def replicate(self): """Run a replication pass""" self.start = time.time() @@ -479,38 +513,11 @@ class ObjectReplicator(Daemon): self.replication_count = 0 self.last_replication_count = -1 self.partition_times = [] - jobs = [] stats = eventlet.spawn(self.heartbeat) lockup_detector = eventlet.spawn(self.detect_lockups) try: - ips = whataremyips() self.run_pool = GreenPool(size=self.concurrency) - for local_dev in [ - dev for dev in self.object_ring.devs - if dev and dev['ip'] in ips and dev['port'] == self.port]: - dev_path = join(self.devices_dir, local_dev['device']) - obj_path = join(dev_path, 'objects') - tmp_path = join(dev_path, 'tmp') - if self.mount_check and not os.path.ismount(dev_path): - self.logger.warn('%s is not mounted' % local_dev['device']) - continue - unlink_older_than(tmp_path, time.time() - self.reclaim_age) - if not os.path.exists(obj_path): - continue - for partition in os.listdir(obj_path): - try: - nodes = [node for node in - self.object_ring.get_part_nodes(int(partition)) - if node['id'] != local_dev['id']] - jobs.append(dict(path=join(obj_path, partition), - nodes=nodes, delete=len(nodes) > 2, - partition=partition)) - except ValueError: - continue - random.shuffle(jobs) - # Partititons that need to be deleted take priority - jobs.sort(key=lambda job: not job['delete']) - self.job_count = len(jobs) + jobs = self.collect_jobs() for job in jobs: if not self.check_ring(): self.logger.info( diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 06f7d74582..657570409d 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -22,46 +22,88 @@ from shutil import rmtree import cPickle as pickle import logging import fcntl +import time +import tempfile from contextlib import contextmanager +from eventlet import tpool from eventlet.green import subprocess - -from swift.obj import replicator as object_replicator +from swift.common.utils import hash_path, mkdirs, normalize_timestamp from swift.common import ring +from swift.obj import replicator as object_replicator +from swift.obj.server import DiskFile + def _ips(): - return ['127.0.0.0',] + return ['127.0.0.0'] object_replicator.whataremyips = _ips -class NullHandler(logging.Handler): - def emit(self, record): - pass -null_logger = logging.getLogger("testing") -null_logger.addHandler(NullHandler()) + +def mock_http_connect(status): + + class FakeConn(object): + + def __init__(self, status, *args, **kwargs): + self.status = status + self.reason = 'Fake' + self.host = args[0] + self.port = args[1] + self.method = args[4] + self.path = args[5] + self.with_exc = False + self.headers = kwargs.get('headers', {}) + + def getresponse(self): + if self.with_exc: + raise Exception('test') + return self + + def getheader(self, header): + return self.headers[header] + + def read(self, amt=None): + return pickle.dumps({}) + + def close(self): + return + return lambda *args, **kwargs: FakeConn(status, *args, **kwargs) + +process_errors = [] + class MockProcess(object): ret_code = None ret_log = None + check_args = None class Stream(object): + def read(self): return MockProcess.ret_log.next() def __init__(self, *args, **kwargs): + targs = MockProcess.check_args.next() + for targ in targs: + if targ not in args[0]: + process_errors.append("Invalid: %s not in %s" % (targ, + args)) self.stdout = self.Stream() def wait(self): return self.ret_code.next() + @contextmanager def _mock_process(ret): orig_process = subprocess.Popen MockProcess.ret_code = (i[0] for i in ret) MockProcess.ret_log = (i[1] for i in ret) + MockProcess.check_args = (i[2] for i in ret) object_replicator.subprocess.Popen = MockProcess yield object_replicator.subprocess.Popen = orig_process + def _create_test_ring(path): testgz = os.path.join(path, 'object.ring.gz') intended_replica2part2dev_id = [ @@ -90,7 +132,7 @@ class TestObjectReplicator(unittest.TestCase): def setUp(self): # Setup a test ring (stolen from common/test_ring.py) - self.testdir = os.path.join('/dev/shm', 'test_replicator') + self.testdir = tempfile.mkdtemp() self.devices = os.path.join(self.testdir, 'node') rmtree(self.testdir, ignore_errors=1) os.mkdir(self.testdir) @@ -98,7 +140,9 @@ class TestObjectReplicator(unittest.TestCase): os.mkdir(os.path.join(self.devices, 'sda')) self.objects = os.path.join(self.devices, 'sda', 'objects') os.mkdir(self.objects) - for part in ['0','1','2', '3']: + self.parts = {} + for part in ['0', '1', '2', '3']: + self.parts[part] = os.path.join(self.objects, part) os.mkdir(os.path.join(self.objects, part)) self.ring = _create_test_ring(self.testdir) self.conf = dict( @@ -107,87 +151,221 @@ class TestObjectReplicator(unittest.TestCase): self.replicator = object_replicator.ObjectReplicator( self.conf) -# def test_check_ring(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# self.assertTrue(self.replicator.check_ring()) -# orig_check = self.replicator.next_check -# self.replicator.next_check = orig_check - 30 -# self.assertTrue(self.replicator.check_ring()) -# self.replicator.next_check = orig_check -# orig_ring_time = self.replicator.object_ring._mtime -# self.replicator.object_ring._mtime = orig_ring_time - 30 -# self.assertTrue(self.replicator.check_ring()) -# self.replicator.next_check = orig_check - 30 -# self.assertFalse(self.replicator.check_ring()) -# -# def test_collect_jobs(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# self.assertTrue('1' in self.replicator.parts_to_delete) -# self.assertEquals( -# [node['id'] for node in self.replicator.partitions['0']['nodes']], -# [1,2]) -# self.assertEquals( -# [node['id'] for node in self.replicator.partitions['1']['nodes']], -# [1,2,3]) -# self.assertEquals( -# [node['id'] for node in self.replicator.partitions['2']['nodes']], -# [2,3]) -# self.assertEquals( -# [node['id'] for node in self.replicator.partitions['3']['nodes']], -# [3,1]) -# for part in ['0', '1', '2', '3']: -# self.assertEquals(self.replicator.partitions[part]['device'], 'sda') -# self.assertEquals(self.replicator.partitions[part]['path'], -# self.objects) -# -# def test_delete_partition(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# part_path = os.path.join(self.objects, '1') -# self.assertTrue(os.access(part_path, os.F_OK)) -# self.replicator.delete_partition('1') -# self.assertFalse(os.access(part_path, os.F_OK)) -# -# def test_rsync(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# with _mock_process([(0,''), (0,''), (0,'')]): -# self.replicator.rsync('0') -# -# def test_rsync_delete_no(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"), -# (0,''), (0,'')]): -# self.replicator.rsync('1') -# self.assertEquals(self.replicator.parts_to_delete['1'], -# [False, True, True]) -# -# def test_rsync_delete_yes(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# with _mock_process([(0,''), (0,''), (0,'')]): -# self.replicator.rsync('1') -# self.assertEquals(self.replicator.parts_to_delete['1'], -# [True, True, True]) -# -# def test_rsync_delete_yes_with_failure(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]): -# self.replicator.rsync('1') -# self.assertEquals(self.replicator.parts_to_delete['1'], -# [True, True, True]) -# -# def test_rsync_failed_drive(self): -# self.replicator.collect_jobs('sda', 0, self.ring) -# with _mock_process([(12,'There was an error in file IO'), -# (0,''), (0,''), (0,'')]): -# self.replicator.rsync('1') -# self.assertEquals(self.replicator.parts_to_delete['1'], -# [True, True, True]) + def tearDown(self): + process_errors = [] + rmtree(self.testdir, ignore_errors=1) + + def test_run_once(self): + replicator = object_replicator.ObjectReplicator( + dict(swift_dir=self.testdir, devices=self.devices, + mount_check='false', timeout='300', stats_interval='1')) + was_connector = object_replicator.http_connect + object_replicator.http_connect = mock_http_connect(200) + cur_part = '0' + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, cur_part, data_dir) + process_arg_checker = [] + nodes = [node for node in + self.ring.get_part_nodes(int(cur_part)) \ + if node['ip'] not in _ips()] + for node in nodes: + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part) + process_arg_checker.append( + (0, '', ['rsync', whole_path_from, rsync_mod])) + with _mock_process(process_arg_checker): + replicator.run_once() + self.assertFalse(process_errors) + + object_replicator.http_connect = was_connector + + def test_hash_suffix_one_file(self): + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time() - 100) + '.ts'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '0', data_dir) + object_replicator.hash_suffix(whole_path_from, 101) + self.assertEquals(len(os.listdir(self.parts['0'])), 1) + + object_replicator.hash_suffix(whole_path_from, 99) + self.assertEquals(len(os.listdir(self.parts['0'])), 0) + + def test_hash_suffix_multi_file_one(self): + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + mkdirs(df.datadir) + for tdiff in [1, 50, 100, 500]: + for suff in ['.meta', '.data', '.ts']: + f = open(os.path.join(df.datadir, + normalize_timestamp(int(time.time()) - tdiff) + suff), + 'wb') + f.write('1234567890') + f.close() + + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '0', data_dir) + hsh_path = os.listdir(whole_path_from)[0] + whole_hsh_path = os.path.join(whole_path_from, hsh_path) + + object_replicator.hash_suffix(whole_path_from, 99) + # only the tombstone should be left + self.assertEquals(len(os.listdir(whole_hsh_path)), 1) + + def test_hash_suffix_multi_file_two(self): + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + mkdirs(df.datadir) + for tdiff in [1, 50, 100, 500]: + suffs = ['.meta', '.data'] + if tdiff > 50: + suffs.append('.ts') + for suff in suffs: + f = open(os.path.join(df.datadir, + normalize_timestamp(int(time.time()) - tdiff) + suff), + 'wb') + f.write('1234567890') + f.close() + + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '0', data_dir) + hsh_path = os.listdir(whole_path_from)[0] + whole_hsh_path = os.path.join(whole_path_from, hsh_path) + + object_replicator.hash_suffix(whole_path_from, 99) + # only the meta and data should be left + self.assertEquals(len(os.listdir(whole_hsh_path)), 2) + + def test_invalidate_hash(self): + + def assertFileData(file_path, data): + with open(file_path, 'r') as fp: + fdata = fp.read() + self.assertEquals(fdata, data) + + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + mkdirs(df.datadir) + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, '0', data_dir) + hashes_file = os.path.join(self.objects, '0', + object_replicator.HASH_FILE) + # test that non existant file except caught + self.assertEquals(object_replicator.invalidate_hash(whole_path_from), + None) + # test that hashes get cleared + check_pickle_data = pickle.dumps({data_dir: None}, + object_replicator.PICKLE_PROTOCOL) + for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]: + with open(hashes_file, 'wb') as fp: + pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL) + object_replicator.invalidate_hash(whole_path_from) + assertFileData(hashes_file, check_pickle_data) + + def test_check_ring(self): + self.assertTrue(self.replicator.check_ring()) + orig_check = self.replicator.next_check + self.replicator.next_check = orig_check - 30 + self.assertTrue(self.replicator.check_ring()) + self.replicator.next_check = orig_check + orig_ring_time = self.replicator.object_ring._mtime + self.replicator.object_ring._mtime = orig_ring_time - 30 + self.assertTrue(self.replicator.check_ring()) + self.replicator.next_check = orig_check - 30 + self.assertFalse(self.replicator.check_ring()) + + def test_collect_jobs(self): + jobs = self.replicator.collect_jobs() + jobs_to_delete = [j for j in jobs if j['delete']] + jobs_to_keep = [j for j in jobs if not j['delete']] + jobs_by_part = {} + for job in jobs: + jobs_by_part[job['partition']] = job + self.assertEquals(len(jobs_to_delete), 1) + self.assertTrue('1', jobs_to_delete[0]['partition']) + self.assertEquals( + [node['id'] for node in jobs_by_part['0']['nodes']], [1, 2]) + self.assertEquals( + [node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3]) + self.assertEquals( + [node['id'] for node in jobs_by_part['2']['nodes']], [2, 3]) + self.assertEquals( + [node['id'] for node in jobs_by_part['3']['nodes']], [3, 1]) + for part in ['0', '1', '2', '3']: + for node in jobs_by_part[part]['nodes']: + self.assertEquals(node['device'], 'sda') + self.assertEquals(jobs_by_part[part]['path'], + os.path.join(self.objects, part)) + + def test_delete_partition(self): + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') + mkdirs(df.datadir) + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + part_path = os.path.join(self.objects, '1') + self.assertTrue(os.access(part_path, os.F_OK)) + self.replicator.replicate() + self.assertFalse(os.access(part_path, os.F_OK)) + + def test_run_once_recover_from_failure(self): + replicator = object_replicator.ObjectReplicator( + dict(swift_dir=self.testdir, devices=self.devices, + mount_check='false', timeout='300', stats_interval='1')) + was_connector = object_replicator.http_connect + object_replicator.http_connect = mock_http_connect(200) + # Write some files into '1' and run replicate- they should be moved + # to the other partitoins and then node should get deleted. + cur_part = '1' + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, cur_part, data_dir) + process_arg_checker = [] + nodes = [node for node in + self.ring.get_part_nodes(int(cur_part)) \ + if node['ip'] not in _ips()] + for node in nodes: + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part) + process_arg_checker.append( + (0, '', ['rsync', whole_path_from, rsync_mod])) + self.assertTrue(os.access(os.path.join(self.objects, + '1', data_dir, ohash), + os.F_OK)) + with _mock_process(process_arg_checker): + replicator.run_once() + self.assertFalse(process_errors) + for i, result in [('0', True), ('1', False), + ('2', True), ('3', True)]: + self.assertEquals(os.access( + os.path.join(self.objects, + i, object_replicator.HASH_FILE), + os.F_OK), result) + object_replicator.http_connect = was_connector def test_run(self): - with _mock_process([(0,'')]*100): + with _mock_process([(0, '')] * 100): self.replicator.replicate() def test_run_withlog(self): - with _mock_process([(0,"stuff in log")]*100): + with _mock_process([(0, "stuff in log")] * 100): self.replicator.replicate() if __name__ == '__main__':