Container Sync: Simple HTTP Proxy load balancing
Change-Id: I021b043b927153bacff48cae648d4d8c5bbad765
This commit is contained in:
parent
f538006bbf
commit
69d331d0d6
@ -153,7 +153,9 @@ use = egg:swift#recon
|
|||||||
# log_address = /dev/log
|
# log_address = /dev/log
|
||||||
#
|
#
|
||||||
# If you need to use an HTTP Proxy, set it here; defaults to no proxy.
|
# If you need to use an HTTP Proxy, set it here; defaults to no proxy.
|
||||||
# sync_proxy = http://127.0.0.1:8888
|
# You can also set this to a comma separated list of HTTP Proxies and they will
|
||||||
|
# be randomly used (simple load balancing).
|
||||||
|
# sync_proxy = http://10.1.1.1:8888,http://10.1.1.2:8888
|
||||||
#
|
#
|
||||||
# Will sync each container at most once per interval
|
# Will sync each container at most once per interval
|
||||||
# interval = 300
|
# interval = 300
|
||||||
|
@ -17,7 +17,7 @@ import os
|
|||||||
import uuid
|
import uuid
|
||||||
from swift import gettext_ as _
|
from swift import gettext_ as _
|
||||||
from time import ctime, time
|
from time import ctime, time
|
||||||
from random import random, shuffle
|
from random import choice, random, shuffle
|
||||||
from struct import unpack_from
|
from struct import unpack_from
|
||||||
|
|
||||||
from eventlet import sleep, Timeout
|
from eventlet import sleep, Timeout
|
||||||
@ -133,7 +133,10 @@ class ContainerSync(Daemon):
|
|||||||
h.strip()
|
h.strip()
|
||||||
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
|
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
|
||||||
if h.strip()]
|
if h.strip()]
|
||||||
self.proxy = conf.get('sync_proxy')
|
self.http_proxies = [
|
||||||
|
a.strip()
|
||||||
|
for a in conf.get('sync_proxy', '').split(',')
|
||||||
|
if a.strip()]
|
||||||
#: Number of containers with sync turned on that were successfully
|
#: Number of containers with sync turned on that were successfully
|
||||||
#: synced.
|
#: synced.
|
||||||
self.container_syncs = 0
|
self.container_syncs = 0
|
||||||
@ -352,7 +355,7 @@ class ContainerSync(Daemon):
|
|||||||
else:
|
else:
|
||||||
headers['x-container-sync-key'] = user_key
|
headers['x-container-sync-key'] = user_key
|
||||||
delete_object(sync_to, name=row['name'], headers=headers,
|
delete_object(sync_to, name=row['name'], headers=headers,
|
||||||
proxy=self.proxy)
|
proxy=self.select_http_proxy())
|
||||||
except ClientException as err:
|
except ClientException as err:
|
||||||
if err.http_status != HTTP_NOT_FOUND:
|
if err.http_status != HTTP_NOT_FOUND:
|
||||||
raise
|
raise
|
||||||
@ -415,7 +418,7 @@ class ContainerSync(Daemon):
|
|||||||
headers['x-container-sync-key'] = user_key
|
headers['x-container-sync-key'] = user_key
|
||||||
put_object(sync_to, name=row['name'], headers=headers,
|
put_object(sync_to, name=row['name'], headers=headers,
|
||||||
contents=FileLikeIter(body),
|
contents=FileLikeIter(body),
|
||||||
proxy=self.proxy)
|
proxy=self.select_http_proxy())
|
||||||
self.container_puts += 1
|
self.container_puts += 1
|
||||||
self.logger.increment('puts')
|
self.logger.increment('puts')
|
||||||
self.logger.timing_since('puts.timing', start_time)
|
self.logger.timing_since('puts.timing', start_time)
|
||||||
@ -448,3 +451,6 @@ class ContainerSync(Daemon):
|
|||||||
self.logger.increment('failures')
|
self.logger.increment('failures')
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def select_http_proxy(self):
|
||||||
|
return choice(self.http_proxies) if self.http_proxies else None
|
||||||
|
@ -658,7 +658,7 @@ class TestContainerSync(unittest.TestCase):
|
|||||||
sync.delete_object = fake_delete_object
|
sync.delete_object = fake_delete_object
|
||||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||||
object_ring=FakeRing())
|
object_ring=FakeRing())
|
||||||
cs.proxy = 'http://proxy'
|
cs.http_proxies = ['http://proxy']
|
||||||
# Success
|
# Success
|
||||||
self.assertTrue(cs.container_sync_row(
|
self.assertTrue(cs.container_sync_row(
|
||||||
{'deleted': True,
|
{'deleted': True,
|
||||||
@ -765,7 +765,7 @@ class TestContainerSync(unittest.TestCase):
|
|||||||
|
|
||||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||||
object_ring=FakeRing())
|
object_ring=FakeRing())
|
||||||
cs.proxy = 'http://proxy'
|
cs.http_proxies = ['http://proxy']
|
||||||
|
|
||||||
def fake_direct_get_object(node, part, account, container, obj,
|
def fake_direct_get_object(node, part, account, container, obj,
|
||||||
resp_chunk_size=1):
|
resp_chunk_size=1):
|
||||||
@ -912,6 +912,27 @@ class TestContainerSync(unittest.TestCase):
|
|||||||
sync.put_object = orig_put_object
|
sync.put_object = orig_put_object
|
||||||
sync.direct_get_object = orig_direct_get_object
|
sync.direct_get_object = orig_direct_get_object
|
||||||
|
|
||||||
|
def test_select_http_proxy_None(self):
|
||||||
|
cs = sync.ContainerSync(
|
||||||
|
{'sync_proxy': ''}, container_ring=FakeRing(),
|
||||||
|
object_ring=FakeRing())
|
||||||
|
self.assertEqual(cs.select_http_proxy(), None)
|
||||||
|
|
||||||
|
def test_select_http_proxy_one(self):
|
||||||
|
cs = sync.ContainerSync(
|
||||||
|
{'sync_proxy': 'http://one'}, container_ring=FakeRing(),
|
||||||
|
object_ring=FakeRing())
|
||||||
|
self.assertEqual(cs.select_http_proxy(), 'http://one')
|
||||||
|
|
||||||
|
def test_select_http_proxy_multiple(self):
|
||||||
|
cs = sync.ContainerSync(
|
||||||
|
{'sync_proxy': 'http://one,http://two,http://three'},
|
||||||
|
container_ring=FakeRing(),
|
||||||
|
object_ring=FakeRing())
|
||||||
|
self.assertEqual(
|
||||||
|
set(cs.http_proxies),
|
||||||
|
set(['http://one', 'http://two', 'http://three']))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user