Merged from trunk

Commit 04926dd806 by gholt, 2010-11-18 08:36:48 -08:00
10 changed files with 440 additions and 135 deletions

bin/st (36 lines changed)

@@ -821,7 +821,7 @@ from optparse import OptionParser
 from os import environ, listdir, makedirs, utime
 from os.path import basename, dirname, getmtime, getsize, isdir, join
 from Queue import Empty, Queue
-from sys import argv, exit, stderr
+from sys import argv, exit, stderr, stdout
 from threading import enumerate as threading_enumerate, Thread
 from time import sleep
@@ -969,14 +969,23 @@ def st_delete(options, args):
 st_download_help = '''
 download --all OR download container [object] [object] ...
     Downloads everything in the account (with --all), or everything in a
-    container, or a list of objects depending on the args given.'''.strip('\n')
+    container, or a list of objects depending on the args given. Use the
+    -o [--output] <filename> option to redirect the output to a file, or
+    to stdout if "-" is given.'''.strip('\n')


 def st_download(options, args):
     if (not args and not options.yes_all) or (args and options.yes_all):
         options.error_queue.put('Usage: %s [options] %s' %
             (basename(argv[0]), st_download_help))
         return
     object_queue = Queue(10000)

-    def _download_object((container, obj), conn):
+    def _download_object(queue_arg, conn):
+        if len(queue_arg) == 2:
+            container, obj = queue_arg
+            out_file = None
+        elif len(queue_arg) == 3:
+            container, obj, out_file = queue_arg
+        else:
+            raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
         try:
             headers, body = \
                 conn.get_object(container, obj, resp_chunk_size=65536)
@@ -998,7 +1007,12 @@ def st_download(options, args):
                 dirpath = dirname(path)
                 if dirpath and not isdir(dirpath):
                     mkdirs(dirpath)
-                fp = open(path, 'wb')
+                if out_file == "-":
+                    fp = stdout
+                elif out_file:
+                    fp = open(out_file, 'wb')
+                else:
+                    fp = open(path, 'wb')
                 read_length = 0
                 md5sum = md5()
                 for chunk in body:
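
Taken together, the hunks above let each queued download carry an optional output target. A minimal standalone sketch of the resulting dispatch (resolve_output is a hypothetical helper for illustration, not part of bin/st):

    import sys

    def resolve_output(queue_arg):
        # 2-tuple: old behaviour (write to the object's own path);
        # 3-tuple: an explicit output target, where "-" means stdout.
        if len(queue_arg) == 2:
            container, obj = queue_arg
            out_file = None
        elif len(queue_arg) == 3:
            container, obj, out_file = queue_arg
        else:
            raise ValueError('Invalid queue_arg length of %s' % len(queue_arg))
        if out_file == '-':
            return container, obj, sys.stdout
        if out_file:
            return container, obj, open(out_file, 'wb')
        return container, obj, open(obj, 'wb')

    # Illustrative calls (hypothetical container/object names):
    #   resolve_output(('photos', 'cat.jpg'))       -> file handle for ./cat.jpg
    #   resolve_output(('photos', 'cat.jpg', '-'))  -> sys.stdout
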
@@ -1013,7 +1027,7 @@ def st_download(options, args):
                 options.error_queue.put(
                     '%s: read_length != content_length, %d != %d' %
                     (path, read_length, content_length))
-            if 'x-object-meta-mtime' in headers:
+            if 'x-object-meta-mtime' in headers and not options.out_file:
                 mtime = float(headers['x-object-meta-mtime'])
                 utime(path, (mtime, mtime))
             if options.verbose:
@@ -1070,8 +1084,12 @@ def st_download(options, args):
     elif len(args) == 1:
         _download_container(args[0], create_connection())
     else:
-        for obj in args[1:]:
-            object_queue.put((args[0], obj))
+        if len(args) == 2:
+            obj = args[1]
+            object_queue.put((args[0], obj, options.out_file))
+        else:
+            for obj in args[1:]:
+                object_queue.put((args[0], obj))
     while not container_queue.empty():
         sleep(0.01)
     for thread in container_threads:
@@ -1438,10 +1456,14 @@ Example:
                       help='User name for obtaining an auth token')
     parser.add_option('-K', '--key', dest='key',
                       help='Key for obtaining an auth token')
+    parser.add_option('-o', '--output', dest='out_file',
+                      help='For a single file download, stream the output to another location')
     args = argv[1:]
     if not args:
         args.append('-h')
     (options, args) = parser.parse_args(args)
+    if options.out_file == '-':
+        options.verbose = 0

     required_help = '''
 Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
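
The last hunk also forces verbose output off when "-" is chosen, since any progress messages written to stdout would be interleaved with the object's bytes. A small optparse sketch of that guard (illustrative option wiring, not the full bin/st parser):

    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option('-o', '--output', dest='out_file')
    parser.add_option('-v', '--verbose', action='count', dest='verbose', default=1)
    options, args = parser.parse_args(['-o', '-', 'container', 'object'])
    if options.out_file == '-':
        options.verbose = 0    # keep stdout clean for the streamed object data
    assert options.verbose == 0 and args == ['container', 'object']
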

(next file in this commit)

@@ -0,0 +1,65 @@
ul.todo_list {
list-style-type: none;
margin: 0;
padding: 0;
}
ul.todo_list li {
display: block;
margin: 0;
padding: 7px 0;
border-top: 1px solid #eee;
}
ul.todo_list li p {
display: inline;
}
ul.todo_list li p.link {
font-weight: bold;
}
ul.todo_list li p.details {
font-style: italic;
}
ul.todo_list li {
}
div.admonition {
border: 1px solid #8F1000;
}
div.admonition p.admonition-title {
background-color: #8F1000;
border-bottom: 1px solid #8E8E8E;
}
a {
color: #CF2F19;
}
div.related ul li a {
color: #CF2F19;
}
div.sphinxsidebar h4 {
background-color:#8E8E8E;
border:1px solid #255E6E;
color:white;
font-size:1em;
margin:1em 0 0.5em;
padding:0.1em 0 0.1em 0.5em;
}
em {
font-style: normal;
}
table.docutils {
font-size: 11px;
}
a tt {
color:#CF2F19;
}

(next file in this commit)

@@ -0,0 +1,2 @@
{% extends "sphinxdoc/layout.html" %}
{% set css_files = css_files + ['_static/tweaks.css'] %}

(next file in this commit)

@@ -0,0 +1,5 @@
[theme]
inherit = sphinxdoc
stylesheet = sphinxdoc.css
pygments_style = friendly

(next file in this commit)

@@ -114,7 +114,9 @@ modindex_common_prefix = ['swift.']

 # The theme to use for HTML and HTML Help pages. Major themes that come with
 # Sphinx are currently 'default' and 'sphinxdoc'.
-html_theme = 'default'
+# html_theme = 'default'
+html_theme_path = ["."]
+html_theme = '_theme'

 # Theme options are theme-specific and customize the look and feel of a theme
 # further. For a list of options available for each theme, see the
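
For context, the three new files added above (the tweaks stylesheet, the two-line layout override, and theme.conf) plug into Sphinx through exactly this pair of settings. A minimal annotated sketch of the conf.py side, with the directory name taken from the diff and the rest assumed:

    # conf.py (excerpt): use a local theme directory instead of a built-in theme.
    html_theme_path = ["."]    # search for theme folders next to conf.py
    html_theme = '_theme'      # folder holding theme.conf, layout.html, static files

    # The theme.conf shown earlier inherits the stock 'sphinxdoc' theme, and the
    # two-line layout.html appends _static/tweaks.css on top of the inherited CSS.
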

(next file in this commit)

@@ -60,6 +60,7 @@ If you are going to use a separate partition for Swift data, be sure to add anot
       mkdir /var/run/swift
       chown <your-user-name>:<your-group-name> /var/run/swift

+#. Next, skip to :ref:`rsync-section`.

 .. _loopback-section:

@@ -87,6 +88,8 @@ If you want to use a loopback device instead of another partition, follow these
       mkdir /var/run/swift
       chown <your-user-name>:<your-group-name> /var/run/swift

+.. _rsync-section:
+
 ----------------
 Setting up rsync
 ----------------

(next file in this commit)

@@ -1,20 +1,38 @@
-.. Swift documentation master file, created by
-   sphinx-quickstart on Tue May 18 13:50:15 2010.
-   You can adapt this file completely to your liking, but it should at least
-   contain the root `toctree` directive.
+..
+      Copyright 2010 OpenStack LLC
+      All Rights Reserved.
+
+      Licensed under the Apache License, Version 2.0 (the "License"); you may
+      not use this file except in compliance with the License. You may obtain
+      a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+      WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+      License for the specific language governing permissions and limitations
+      under the License.

 Welcome to Swift's documentation!
 =================================

 Swift is a highly available, distributed, eventually consistent object/blob
-store.
+store. Organizations can use Swift to store lots of data efficiently, safely, and cheaply.
+
+This documentation is generated by the Sphinx toolkit and lives in the source
+tree. Additional documentation on Swift and other components of OpenStack can
+be found on the `OpenStack wiki`_.
+
+.. _`OpenStack wiki`: http://wiki.openstack.org

 .. toctree::
     :maxdepth: 1

     getting_started

-Overview:
+Overview and Concepts
+=====================

 .. toctree::
     :maxdepth: 1
@@ -27,7 +45,8 @@ Overview:
     overview_stats
     ratelimit

-Development:
+Developer Documentation
+=======================

 .. toctree::
     :maxdepth: 1
@@ -36,7 +55,8 @@ Development:
     development_saio
     development_auth

-Deployment:
+Administrator Documentation
+===========================

 .. toctree::
     :maxdepth: 1
@@ -46,14 +66,16 @@ Deployment:
     admin_guide
     debian_package_guide

-End User Guides:
+End User Guides
+===============

 .. toctree::
     :maxdepth: 1

     howto_cyberduck

-Source:
+Source Documentation
+====================

 .. toctree::
     :maxdepth: 2

(next file in this commit)

@@ -29,7 +29,6 @@ from urllib import quote
 from contextlib import contextmanager
 import ctypes
 import ctypes.util
-import fcntl
 import struct
 from ConfigParser import ConfigParser, NoSectionError, NoOptionError
 from tempfile import mkstemp

(next file in this commit)

@@ -61,7 +61,7 @@ def hash_suffix(path, reclaim_age):
     elif files:
         files.sort(reverse=True)
         meta = data = tomb = None
-        for filename in files:
+        for filename in list(files):
             if not meta and filename.endswith('.meta'):
                 meta = filename
             if not data and filename.endswith('.data'):
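
The only change in this hunk is iterating over list(files) instead of files, which matters because the surrounding loop also prunes reclaimed entries from files as it walks them; iterating the live list can silently skip elements. A standalone illustration of the pitfall (not Swift code):

    files = ['c.meta', 'b.data', 'a.ts']

    # Buggy: mutating the list being iterated skips elements.
    seen = []
    for name in files:
        seen.append(name)
        files.remove(name)
    assert seen == ['c.meta', 'a.ts']        # 'b.data' was never visited

    # Safe: iterate over a snapshot, as the hunk above does.
    files = ['c.meta', 'b.data', 'a.ts']
    seen = []
    for name in list(files):
        seen.append(name)
        files.remove(name)
    assert seen == ['c.meta', 'b.data', 'a.ts']
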
@@ -232,7 +232,7 @@
         """
         Execute the rsync binary to replicate a partition.

-        :returns: a tuple of (rsync exit code, rsync standard output)
+        :returns: return code of rsync process. 0 is successful
         """
         start_time = time.time()
         ret_val = None
@@ -470,6 +470,40 @@
                 self.kill_coros()
             self.last_replication_count = self.replication_count

+    def collect_jobs(self):
+        """
+        Returns a sorted list of jobs (dictionaries) that specify the
+        partitions, nodes, etc to be rsynced.
+        """
+        jobs = []
+        ips = whataremyips()
+        for local_dev in [dev for dev in self.object_ring.devs
+                if dev and dev['ip'] in ips and dev['port'] == self.port]:
+            dev_path = join(self.devices_dir, local_dev['device'])
+            obj_path = join(dev_path, 'objects')
+            tmp_path = join(dev_path, 'tmp')
+            if self.mount_check and not os.path.ismount(dev_path):
+                self.logger.warn('%s is not mounted' % local_dev['device'])
+                continue
+            unlink_older_than(tmp_path, time.time() - self.reclaim_age)
+            if not os.path.exists(obj_path):
+                continue
+            for partition in os.listdir(obj_path):
+                try:
+                    nodes = [node for node in
+                        self.object_ring.get_part_nodes(int(partition))
+                            if node['id'] != local_dev['id']]
+                    jobs.append(dict(path=join(obj_path, partition),
+                        nodes=nodes, delete=len(nodes) > 2,
+                        partition=partition))
+                except ValueError:
+                    continue
+        random.shuffle(jobs)
+        # Partitions that need to be deleted take priority
+        jobs.sort(key=lambda job: not job['delete'])
+        self.job_count = len(jobs)
+        return jobs
+
     def replicate(self):
         """Run a replication pass"""
         self.start = time.time()
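
The new collect_jobs() returns job dictionaries and sorts them so that deletion jobs (partitions this device should no longer hold, detected as len(nodes) > 2) come first after a shuffle. A small illustration with made-up values; only the dict shape and the sort trick come from the code above:

    import random

    jobs = [
        dict(path='/srv/node/sda/objects/0', partition='0',
             nodes=[{'id': 1}, {'id': 2}], delete=False),
        dict(path='/srv/node/sda/objects/1', partition='1',
             nodes=[{'id': 1}, {'id': 2}, {'id': 3}], delete=True),
        dict(path='/srv/node/sda/objects/2', partition='2',
             nodes=[{'id': 2}, {'id': 3}], delete=False),
    ]

    random.shuffle(jobs)                           # spread work across partitions
    jobs.sort(key=lambda job: not job['delete'])   # stable sort: delete jobs first
    assert jobs[0]['delete']                       # partition '1' is handled first
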
@@ -479,38 +513,11 @@
         self.replication_count = 0
         self.last_replication_count = -1
         self.partition_times = []
-        jobs = []
         stats = eventlet.spawn(self.heartbeat)
         lockup_detector = eventlet.spawn(self.detect_lockups)
         try:
-            ips = whataremyips()
             self.run_pool = GreenPool(size=self.concurrency)
-            for local_dev in [
-                dev for dev in self.object_ring.devs
-                if dev and dev['ip'] in ips and dev['port'] == self.port]:
-                dev_path = join(self.devices_dir, local_dev['device'])
-                obj_path = join(dev_path, 'objects')
-                tmp_path = join(dev_path, 'tmp')
-                if self.mount_check and not os.path.ismount(dev_path):
-                    self.logger.warn('%s is not mounted' % local_dev['device'])
-                    continue
-                unlink_older_than(tmp_path, time.time() - self.reclaim_age)
-                if not os.path.exists(obj_path):
-                    continue
-                for partition in os.listdir(obj_path):
-                    try:
-                        nodes = [node for node in
-                            self.object_ring.get_part_nodes(int(partition))
-                                if node['id'] != local_dev['id']]
-                        jobs.append(dict(path=join(obj_path, partition),
-                            nodes=nodes, delete=len(nodes) > 2,
-                            partition=partition))
-                    except ValueError:
-                        continue
-            random.shuffle(jobs)
-            # Partititons that need to be deleted take priority
-            jobs.sort(key=lambda job: not job['delete'])
-            self.job_count = len(jobs)
+            jobs = self.collect_jobs()
             for job in jobs:
                 if not self.check_ring():
                     self.logger.info(

(next file in this commit)

@@ -22,46 +22,88 @@ from shutil import rmtree
 import cPickle as pickle
 import logging
 import fcntl
+import time
+import tempfile
 from contextlib import contextmanager
+from eventlet import tpool
 from eventlet.green import subprocess
-from swift.obj import replicator as object_replicator
+from swift.common.utils import hash_path, mkdirs, normalize_timestamp
 from swift.common import ring
+from swift.obj import replicator as object_replicator
+from swift.obj.server import DiskFile


 def _ips():
-    return ['127.0.0.0',]
+    return ['127.0.0.0']
 object_replicator.whataremyips = _ips

-class NullHandler(logging.Handler):
-    def emit(self, record):
-        pass
-null_logger = logging.getLogger("testing")
-null_logger.addHandler(NullHandler())
+
+def mock_http_connect(status):
+
+    class FakeConn(object):
+
+        def __init__(self, status, *args, **kwargs):
+            self.status = status
+            self.reason = 'Fake'
+            self.host = args[0]
+            self.port = args[1]
+            self.method = args[4]
+            self.path = args[5]
+            self.with_exc = False
+            self.headers = kwargs.get('headers', {})
+
+        def getresponse(self):
+            if self.with_exc:
+                raise Exception('test')
+            return self
+
+        def getheader(self, header):
+            return self.headers[header]
+
+        def read(self, amt=None):
+            return pickle.dumps({})
+
+        def close(self):
+            return
+    return lambda *args, **kwargs: FakeConn(status, *args, **kwargs)
+
+process_errors = []
+

 class MockProcess(object):
     ret_code = None
     ret_log = None
+    check_args = None

     class Stream(object):

         def read(self):
             return MockProcess.ret_log.next()

     def __init__(self, *args, **kwargs):
+        targs = MockProcess.check_args.next()
+        for targ in targs:
+            if targ not in args[0]:
+                process_errors.append("Invalid: %s not in %s" % (targ,
+                                                                 args))
         self.stdout = self.Stream()

     def wait(self):
         return self.ret_code.next()


 @contextmanager
 def _mock_process(ret):
     orig_process = subprocess.Popen
     MockProcess.ret_code = (i[0] for i in ret)
     MockProcess.ret_log = (i[1] for i in ret)
+    MockProcess.check_args = (i[2] for i in ret)
     object_replicator.subprocess.Popen = MockProcess
     yield
     object_replicator.subprocess.Popen = orig_process


 def _create_test_ring(path):
     testgz = os.path.join(path, 'object.ring.gz')
     intended_replica2part2dev_id = [
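
The new check_args hook means every expectation handed to _mock_process() is now a 3-tuple: (return code, log output, argv fragments that must appear in the spawned command). A self-contained miniature of that pattern, with hypothetical names rather than the Swift test fixtures:

    process_errors = []

    class FakePopen(object):
        """Verify each spawned command against a queue of expectations."""
        expected_args = None   # generator yielding lists of required argv fragments
        return_codes = None    # generator yielding exit codes

        def __init__(self, argv):
            for fragment in FakePopen.expected_args.next():
                if fragment not in argv:
                    process_errors.append('missing %r in %r' % (fragment, argv))

        def wait(self):
            return FakePopen.return_codes.next()

    expectations = [(0, ['rsync', '/srv/node/sda/objects/0'])]
    FakePopen.return_codes = (code for code, _ in expectations)
    FakePopen.expected_args = (frags for _, frags in expectations)

    proc = FakePopen(['rsync', '-av', '/srv/node/sda/objects/0', 'host::module'])
    assert proc.wait() == 0 and not process_errors
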
@@ -90,7 +132,7 @@ class TestObjectReplicator(unittest.TestCase):

     def setUp(self):
         # Setup a test ring (stolen from common/test_ring.py)
-        self.testdir = os.path.join('/dev/shm', 'test_replicator')
+        self.testdir = tempfile.mkdtemp()
         self.devices = os.path.join(self.testdir, 'node')
         rmtree(self.testdir, ignore_errors=1)
         os.mkdir(self.testdir)
@@ -98,7 +140,9 @@ class TestObjectReplicator(unittest.TestCase):
         os.mkdir(os.path.join(self.devices, 'sda'))
         self.objects = os.path.join(self.devices, 'sda', 'objects')
         os.mkdir(self.objects)
-        for part in ['0','1','2', '3']:
+        self.parts = {}
+        for part in ['0', '1', '2', '3']:
+            self.parts[part] = os.path.join(self.objects, part)
             os.mkdir(os.path.join(self.objects, part))
         self.ring = _create_test_ring(self.testdir)
         self.conf = dict(
@@ -107,87 +151,221 @@ class TestObjectReplicator(unittest.TestCase):
         self.replicator = object_replicator.ObjectReplicator(
             self.conf)

-#    def test_check_ring(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        self.assertTrue(self.replicator.check_ring())
-#        orig_check = self.replicator.next_check
-#        self.replicator.next_check = orig_check - 30
-#        self.assertTrue(self.replicator.check_ring())
-#        self.replicator.next_check = orig_check
-#        orig_ring_time = self.replicator.object_ring._mtime
-#        self.replicator.object_ring._mtime = orig_ring_time - 30
-#        self.assertTrue(self.replicator.check_ring())
-#        self.replicator.next_check = orig_check - 30
-#        self.assertFalse(self.replicator.check_ring())
-#
-#    def test_collect_jobs(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        self.assertTrue('1' in self.replicator.parts_to_delete)
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['0']['nodes']],
-#            [1,2])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['1']['nodes']],
-#            [1,2,3])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['2']['nodes']],
-#            [2,3])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['3']['nodes']],
-#            [3,1])
-#        for part in ['0', '1', '2', '3']:
-#            self.assertEquals(self.replicator.partitions[part]['device'], 'sda')
-#            self.assertEquals(self.replicator.partitions[part]['path'],
-#                self.objects)
-#
-#    def test_delete_partition(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        part_path = os.path.join(self.objects, '1')
-#        self.assertTrue(os.access(part_path, os.F_OK))
-#        self.replicator.delete_partition('1')
-#        self.assertFalse(os.access(part_path, os.F_OK))
-#
-#    def test_rsync(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('0')
-#
-#    def test_rsync_delete_no(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"),
-#            (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#            [False, True, True])
-#
-#    def test_rsync_delete_yes(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#            [True, True, True])
-#
-#    def test_rsync_delete_yes_with_failure(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#            [True, True, True])
-#
-#    def test_rsync_failed_drive(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(12,'There was an error in file IO'),
-#            (0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#            [True, True, True])
+    def tearDown(self):
+        process_errors = []
+        rmtree(self.testdir, ignore_errors=1)
+
+    def test_run_once(self):
+        replicator = object_replicator.ObjectReplicator(
+            dict(swift_dir=self.testdir, devices=self.devices,
+                 mount_check='false', timeout='300', stats_interval='1'))
+        was_connector = object_replicator.http_connect
+        object_replicator.http_connect = mock_http_connect(200)
+        cur_part = '0'
+        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time.time()) + '.data'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, cur_part, data_dir)
+        process_arg_checker = []
+        nodes = [node for node in
+                 self.ring.get_part_nodes(int(cur_part)) \
+                     if node['ip'] not in _ips()]
+        for node in nodes:
+            rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
+            process_arg_checker.append(
+                (0, '', ['rsync', whole_path_from, rsync_mod]))
+        with _mock_process(process_arg_checker):
+            replicator.run_once()
+        self.assertFalse(process_errors)
+
+        object_replicator.http_connect = was_connector
+
+    def test_hash_suffix_one_file(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time.time() - 100) + '.ts'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        object_replicator.hash_suffix(whole_path_from, 101)
+        self.assertEquals(len(os.listdir(self.parts['0'])), 1)
+
+        object_replicator.hash_suffix(whole_path_from, 99)
+        self.assertEquals(len(os.listdir(self.parts['0'])), 0)
+
+    def test_hash_suffix_multi_file_one(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        for tdiff in [1, 50, 100, 500]:
+            for suff in ['.meta', '.data', '.ts']:
+                f = open(os.path.join(df.datadir,
+                        normalize_timestamp(int(time.time()) - tdiff) + suff),
+                        'wb')
+                f.write('1234567890')
+                f.close()
+
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        hsh_path = os.listdir(whole_path_from)[0]
+        whole_hsh_path = os.path.join(whole_path_from, hsh_path)
+
+        object_replicator.hash_suffix(whole_path_from, 99)
+        # only the tombstone should be left
+        self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
+
+    def test_hash_suffix_multi_file_two(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        for tdiff in [1, 50, 100, 500]:
+            suffs = ['.meta', '.data']
+            if tdiff > 50:
+                suffs.append('.ts')
+            for suff in suffs:
+                f = open(os.path.join(df.datadir,
+                        normalize_timestamp(int(time.time()) - tdiff) + suff),
+                        'wb')
+                f.write('1234567890')
+                f.close()
+
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        hsh_path = os.listdir(whole_path_from)[0]
+        whole_hsh_path = os.path.join(whole_path_from, hsh_path)
+
+        object_replicator.hash_suffix(whole_path_from, 99)
+        # only the meta and data should be left
+        self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
+
+    def test_invalidate_hash(self):
+
+        def assertFileData(file_path, data):
+            with open(file_path, 'r') as fp:
+                fdata = fp.read()
+                self.assertEquals(fdata, data)
+
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        hashes_file = os.path.join(self.objects, '0',
+                                   object_replicator.HASH_FILE)
+        # test that the exception for a non-existent file is caught
+        self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
+                          None)
+        # test that hashes get cleared
+        check_pickle_data = pickle.dumps({data_dir: None},
+                                         object_replicator.PICKLE_PROTOCOL)
+        for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
+            with open(hashes_file, 'wb') as fp:
+                pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
+            object_replicator.invalidate_hash(whole_path_from)
+            assertFileData(hashes_file, check_pickle_data)
+
+    def test_check_ring(self):
+        self.assertTrue(self.replicator.check_ring())
+        orig_check = self.replicator.next_check
+        self.replicator.next_check = orig_check - 30
+        self.assertTrue(self.replicator.check_ring())
+        self.replicator.next_check = orig_check
+        orig_ring_time = self.replicator.object_ring._mtime
+        self.replicator.object_ring._mtime = orig_ring_time - 30
+        self.assertTrue(self.replicator.check_ring())
+        self.replicator.next_check = orig_check - 30
+        self.assertFalse(self.replicator.check_ring())
+
+    def test_collect_jobs(self):
+        jobs = self.replicator.collect_jobs()
+        jobs_to_delete = [j for j in jobs if j['delete']]
+        jobs_to_keep = [j for j in jobs if not j['delete']]
+        jobs_by_part = {}
+        for job in jobs:
+            jobs_by_part[job['partition']] = job
+        self.assertEquals(len(jobs_to_delete), 1)
+        self.assertTrue('1', jobs_to_delete[0]['partition'])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['2']['nodes']], [2, 3])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['3']['nodes']], [3, 1])
+        for part in ['0', '1', '2', '3']:
+            for node in jobs_by_part[part]['nodes']:
+                self.assertEquals(node['device'], 'sda')
+            self.assertEquals(jobs_by_part[part]['path'],
+                              os.path.join(self.objects, part))
+
+    def test_delete_partition(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        part_path = os.path.join(self.objects, '1')
+        self.assertTrue(os.access(part_path, os.F_OK))
+        self.replicator.replicate()
+        self.assertFalse(os.access(part_path, os.F_OK))
+
+    def test_run_once_recover_from_failure(self):
+        replicator = object_replicator.ObjectReplicator(
+            dict(swift_dir=self.testdir, devices=self.devices,
+                 mount_check='false', timeout='300', stats_interval='1'))
+        was_connector = object_replicator.http_connect
+        object_replicator.http_connect = mock_http_connect(200)
+        # Write some files into '1' and run replicate; they should be moved
+        # to the other partitions and then the node should get deleted.
+        cur_part = '1'
+        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time.time()) + '.data'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, cur_part, data_dir)
+        process_arg_checker = []
+        nodes = [node for node in
+                 self.ring.get_part_nodes(int(cur_part)) \
+                     if node['ip'] not in _ips()]
+        for node in nodes:
+            rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
+            process_arg_checker.append(
+                (0, '', ['rsync', whole_path_from, rsync_mod]))
+        self.assertTrue(os.access(os.path.join(self.objects,
+                                               '1', data_dir, ohash),
+                                  os.F_OK))
+        with _mock_process(process_arg_checker):
+            replicator.run_once()
+        self.assertFalse(process_errors)
+        for i, result in [('0', True), ('1', False),
+                          ('2', True), ('3', True)]:
+            self.assertEquals(os.access(
+                os.path.join(self.objects,
+                             i, object_replicator.HASH_FILE),
+                os.F_OK), result)
+        object_replicator.http_connect = was_connector
+
     def test_run(self):
-        with _mock_process([(0,'')]*100):
+        with _mock_process([(0, '')] * 100):
             self.replicator.replicate()

     def test_run_withlog(self):
-        with _mock_process([(0,"stuff in log")]*100):
+        with _mock_process([(0, "stuff in log")] * 100):
             self.replicator.replicate()


 if __name__ == '__main__':
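
The two hash_suffix assertions in test_hash_suffix_one_file above hinge on simple timestamp arithmetic: a tombstone written 100 seconds ago survives a reclaim_age of 101 but not of 99. A standalone sketch of that rule (an assumption about the comparison, not the Swift implementation itself):

    import time

    def is_reclaimable(tombstone_timestamp, reclaim_age):
        # Assumed rule: anything older than reclaim_age seconds may be removed.
        return tombstone_timestamp < time.time() - reclaim_age

    ts = time.time() - 100                # the .ts file written in the test
    assert not is_reclaimable(ts, 101)    # kept: the suffix dir still has 1 entry
    assert is_reclaimable(ts, 99)         # reclaimed: the suffix dir is emptied
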