[packetary] Infrastructure

Introduce infrastructure:
* Executor (The Asynchronous task executor)
* ConnectionsPool (The pool of network connections)
* ResumeableStream - allows resume streaming when error occures

Change-Id: Ia68d60c2b9d685820a4c1916c8f1aa724f3a3f91
Implements: blueprint refactor-local-mirror-scripts
Partial-Bug: #1487077
This commit is contained in:
Bulat Gaifullin 2015-10-20 18:08:10 +03:00
parent 95cd15286f
commit 56689ae7cf
11 changed files with 1036 additions and 21 deletions

View File

@ -15,17 +15,6 @@
# under the License.
"""
test_packetary
----------------------------------
import eventlet
Tests for `packetary` module.
"""
from packetary.tests import base
class TestPacketary(base.TestCase):
def test_something(self):
pass
eventlet.monkey_patch()

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import hashlib
class _HashComposite(object):
"""Combines several hash methods."""
def __init__(self, hash_objects):
self.hash_objects = hash_objects
def update(self, data):
"""Updates the hash objects with the string arg.
For more details see doc of hashlib.update.
"""
for o in self.hash_objects:
o.update(data)
def hexdigest(self):
"""Returns the list of appropriate hexdigests of hash_objects.
For more details see doc of hashlib.hexdigest.
"""
return [o.hexdigest() for o in self.hash_objects]
def _new_composite(methods):
"""Creates new composite method."""
def wrapper():
return _HashComposite([x() for x in methods])
return wrapper
def _checksum(method):
"""Makes function to calculate checksum for stream."""
@functools.wraps(method)
def calculate(stream, chunksize=16 * 1024):
"""Calculates checksum for binary stream.
:param stream: file-like object opened in binary mode.
:return: the checksum of content in terms of method.
"""
s = method()
while True:
chunk = stream.read(chunksize)
if not chunk:
break
s.update(chunk)
return s.hexdigest()
return calculate
md5 = _checksum(hashlib.md5)
sha1 = _checksum(hashlib.sha1)
sha256 = _checksum(hashlib.sha256)
def composite(*methods):
"""Calculate several checksum at one time."""
return _checksum(_new_composite(
[getattr(hashlib, x) for x in methods]
))

View File

@ -0,0 +1,280 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import six
import six.moves.http_client as http_client
import six.moves.urllib.request as urllib_request
import six.moves.urllib_error as urllib_error
import time
from packetary.library.streams import StreamWrapper
logger = logging.getLogger(__package__)
RETRYABLE_ERRORS = (http_client.HTTPException, IOError)
class RangeError(urllib_error.URLError):
pass
class RetryableRequest(urllib_request.Request):
offset = 0
retries_left = 1
start_time = 0
class ResumableResponse(StreamWrapper):
"""The http-response wrapper to add resume ability.
Allows to resume read from same position if connection is lost.
"""
def __init__(self, request, response, opener):
"""Initialises.
:param request: the original http request
:param response: the original http response
:param opener: the instance of urllib.OpenerDirector
"""
super(ResumableResponse, self).__init__(response)
self.request = request
self.opener = opener
def read_chunk(self, chunksize):
"""Overrides super class method."""
while 1:
try:
chunk = self.stream.read(chunksize)
self.request.offset += len(chunk)
return chunk
except RETRYABLE_ERRORS as e:
response = self.opener.error(
self.request.get_type(), self.request,
self.stream, 502, six.text_type(e), self.stream.info()
)
self.stream = response.stream
class RetryHandler(urllib_request.BaseHandler):
"""urllib Handler to add ability for retrying on server errors."""
@staticmethod
def http_request(request):
"""Initialises http request."""
logger.debug("start request: %s", request.get_full_url())
if request.offset > 0:
request.add_header('Range', 'bytes=%d-' % request.offset)
request.start_time = time.time()
return request
def http_response(self, request, response):
"""Wraps response in a ResumableResponse.
Checks that partial request completed successfully.
"""
# the server should response partial content if range is specified
logger.debug(
"finish request: %s - %d (%s), duration - %d ms.",
request.get_full_url(), response.getcode(), response.msg,
int((time.time() - request.start_time) * 1000)
)
if request.offset > 0 and response.getcode() != 206:
raise RangeError("Server does not support ranges.")
return ResumableResponse(request, response, self.parent)
def http_error(self, req, fp, code, msg, hdrs):
"""Checks error code and retries request if it is allowed."""
if code >= 500 and req.retries_left > 0:
req.retries_left -= 1
logger.warning(
"fail request: %s - %d(%s), retries left - %d.",
req.get_full_url(), code, msg, req.retries_left
)
return self.parent.open(req)
https_request = http_request
https_response = http_response
class Connection(object):
"""Helper class to deal with streams."""
def __init__(self, opener, retries_num):
"""Initializes.
:param opener: the instance of urllib.OpenerDirector
:param retries_num: the number of allowed retries
"""
self.opener = opener
self.retries_num = retries_num
def make_request(self, url, offset=0):
"""Makes new http request.
:param url: the remote file`s url
:param offset: the number of bytes from begin, that will be skipped
:return: The new http request
"""
if url.startswith("/"):
url = "file://" + url
request = RetryableRequest(url)
request.retries_left = self.retries_num
request.offset = offset
return request
def open_stream(self, url, offset=0):
"""Opens remote file for streaming.
:param url: the remote file`s url
:param offset: the number of bytes from begin, that will be skipped
"""
request = self.make_request(url, offset)
while 1:
try:
return self.opener.open(request)
except (RangeError, urllib_error.HTTPError):
raise
except RETRYABLE_ERRORS as e:
if request.retries_left <= 0:
raise
request.retries_left -= 1
logger.exception(
"Failed to open url - %s: %s. retries left - %d.",
url, six.text_type(e), request.retries_left
)
def retrieve(self, url, filename, offset=0):
"""Downloads remote file.
:param url: the remote file`s url
:param filename: the file`s name, that includes path on local fs
:param offset: the number of bytes from begin, that will be skipped
"""
self._ensure_dir_exists(filename)
fd = os.open(filename, os.O_CREAT | os.O_WRONLY)
try:
self._copy_stream(fd, url, offset)
except RangeError:
if offset == 0:
raise
logger.warning(
"Failed to resume download, starts from begin: %s", url
)
self._copy_stream(fd, url, 0)
finally:
os.fsync(fd)
os.close(fd)
@staticmethod
def _ensure_dir_exists(dst):
"""Checks that directory exists and creates otherwise."""
target_dir = os.path.dirname(dst)
try:
os.makedirs(target_dir)
except OSError as e:
if e.errno != 17:
raise
def _copy_stream(self, fd, url, offset):
"""Copies remote file to local.
:param fd: the file`s descriptor
:param url: the remote file`s url
:param offset: the number of bytes from begin, that will be skipped
"""
source = self.open_stream(url, offset)
os.ftruncate(fd, offset)
os.lseek(fd, offset, os.SEEK_SET)
chunk_size = 16 * 1024
while 1:
chunk = source.read(chunk_size)
if not chunk:
break
os.write(fd, chunk)
class ConnectionContext(object):
"""Helper class acquire and release connection within context."""
def __init__(self, connection, on_exit):
self.connection = connection
self.on_exit = on_exit
def __enter__(self):
return self.connection
def __exit__(self, *_):
self.on_exit(self.connection)
class ConnectionsPool(object):
"""Controls the number of simultaneously opened connections."""
MIN_CONNECTIONS_COUNT = 1
def __init__(self, count=0, proxy=None, secure_proxy=None, retries_num=0):
"""Initialises.
:param count: the number of allowed simultaneously connections
:param proxy: the url of proxy for http-connections
:param secure_proxy: the url of proxy for https-connections
:param retries_num: the number of allowed retries
"""
if proxy:
proxies = {
"http": proxy,
"https": secure_proxy or proxy,
}
else:
proxies = None
opener = urllib_request.build_opener(
RetryHandler(),
urllib_request.ProxyHandler(proxies)
)
limit = max(count, self.MIN_CONNECTIONS_COUNT)
connections = six.moves.queue.Queue()
while limit > 0:
connections.put(Connection(opener, retries_num))
limit -= 1
self.free = connections
def get(self, timeout=None):
"""Gets the free connection.
Blocks in case if there is no free connections.
:param timeout: the timeout in seconds to wait.
by default infinity waiting.
"""
return ConnectionContext(
self.free.get(timeout=timeout), self._release
)
def _release(self, connection):
"""Puts back connection to free connections."""
self.free.put(connection)

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import with_statement
import logging
import six
from eventlet.greenpool import GreenPool
logger = logging.getLogger(__package__)
class AsynchronousSection(object):
"""Allows calling function asynchronously with waiting on exit."""
MIN_POOL_SIZE = 1
def __init__(self, size=0, ignore_errors_num=0):
"""Initialises.
:param size: the max number of parallel tasks
:param ignore_errors_num:
number of errors which does not stop the execution
"""
self.executor = GreenPool(max(size, self.MIN_POOL_SIZE))
self.ignore_errors_num = ignore_errors_num
self.errors = 0
self.tasks = set()
def __enter__(self):
self.errors = 0
return self
def __exit__(self, etype, *_):
self.wait(etype is not None)
def execute(self, func, *args, **kwargs):
"""Calls function asynchronously."""
if 0 <= self.ignore_errors_num < self.errors:
raise RuntimeError("Too many errors.")
gt = self.executor.spawn(func, *args, **kwargs)
self.tasks.add(gt)
gt.link(self.on_complete)
def on_complete(self, gt):
"""Callback to handle task completion."""
try:
gt.wait()
except Exception as e:
self.errors += 1
logger.exception("Task failed: %s", six.text_type(e))
finally:
self.tasks.discard(gt)
def wait(self, ignore_errors=False):
"""Waits until all tasks will be completed.
Do not use directly, will be called from context manager.
"""
self.executor.waitall()
if not ignore_errors and self.errors > 0:
raise RuntimeError(
"Operations completed with errors. See log for more details."
)

View File

@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import zlib
class StreamWrapper(object):
"""Helper class to implement stream wrappers.
It is base-class for Streamers,
that provides functionality to transform stream on the fly.
The wrapped stream may return data more that required,
the extra read data will be kept in the internal buffer till
next read.
"""
CHUNK_SIZE = 1024
def __init__(self, stream):
"""Initializes.
:param stream: file-like object opened in binary mode.
"""
self.stream = stream
self.unread_tail = b""
def __getattr__(self, item):
return getattr(self.stream, item)
def _read_tail(self):
tmp = self.unread_tail
self.unread_tail = b""
return tmp
def _align_chunk(self, chunk, size):
self.unread_tail = chunk[size:]
return chunk[:size]
def read_chunk(self, chunksize):
"""Overrides this method to change default behaviour."""
return self.stream.read(chunksize)
def read(self, size=-1):
result = self._read_tail()
if size < 0:
while True:
chunk = self.read_chunk(self.CHUNK_SIZE)
if not chunk:
break
result += chunk
else:
if len(result) > size:
result = self._align_chunk(result, size)
size -= len(result)
while size > 0:
chunk = self.read_chunk(max(self.CHUNK_SIZE, size))
if not chunk:
break
if len(chunk) > size:
chunk = self._align_chunk(chunk, size)
size -= len(chunk)
result += chunk
return result
def readline(self):
pos = self.unread_tail.find(b"\n")
if pos >= 0:
line = self._align_chunk(self.unread_tail, pos + 1)
else:
line = self._read_tail()
while True:
chunk = self.read_chunk(self.CHUNK_SIZE)
if not chunk:
break
pos = chunk.find(b"\n")
if pos >= 0:
line += self._align_chunk(chunk, pos + 1)
break
line += chunk
return line
def readlines(self):
while True:
line = self.readline()
if not line:
break
yield line
def __iter__(self):
return self.readlines()
class GzipDecompress(StreamWrapper):
"""The decompress stream."""
def __init__(self, stream):
super(GzipDecompress, self).__init__(stream)
# Magic parameter makes zlib module understand gzip header
# http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
# This works on cpython and pypy, but not jython.
self.decompress = zlib.decompressobj(16 + zlib.MAX_WBITS)
def read_chunk(self, chunksize):
if self.decompress.unconsumed_tail:
return self.decompress.decompress(
self.decompress.unconsumed_tail, chunksize
)
chunk = self.stream.read(chunksize)
if not chunk:
return self.decompress.flush()
return self.decompress.decompress(chunk, chunksize)

View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from packetary.library import checksum
from packetary.tests import base
class TestChecksum(base.TestCase):
def test_checksum(self):
stream = six.BytesIO(b"line1\nline2\nline3\n")
checksums = {
checksum.md5: "cc3d5ed5fda53dfa81ea6aa951d7e1fe",
checksum.sha1: "8c84f6f36dd2230d3e9c954fa436e5fda90b1957",
checksum.sha256: "66663af9c7aa341431a8ee2ff27b72"
"abd06c9218f517bb6fef948e4803c19e03"
}
for chunksize in (8, 256):
for algo, expected in six.iteritems(checksums):
stream.seek(0)
self.assertEqual(
expected, algo(stream, chunksize)
)
def test_composite(self):
stream = six.BytesIO(b"line1\nline2\nline3\n")
result = checksum.composite('md5', 'sha1', 'sha256')(stream)
self.assertEqual(
[
"cc3d5ed5fda53dfa81ea6aa951d7e1fe",
"8c84f6f36dd2230d3e9c954fa436e5fda90b1957",
"66663af9c7aa341431a8ee2ff27b72"
"abd06c9218f517bb6fef948e4803c19e03"
],
result
)

View File

@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import six
import time
from packetary.library import connections
from packetary.tests import base
class TestConnectionsPool(base.TestCase):
def test_get_connection(self):
pool = connections.ConnectionsPool(count=2)
self.assertEqual(2, pool.free.qsize())
with pool.get():
self.assertEqual(1, pool.free.qsize())
self.assertEqual(2, pool.free.qsize())
def _check_proxies(self, pool, http_proxy, https_proxy):
with pool.get() as c:
for h in c.opener.handlers:
if isinstance(h, connections.urllib_request.ProxyHandler):
self.assertEqual(
(http_proxy, https_proxy),
(h.proxies["http"], h.proxies["https"])
)
break
else:
self.fail("ProxyHandler should be in list of handlers.")
def test_set_proxy(self):
pool = connections.ConnectionsPool(count=1, proxy="http://localhost")
self._check_proxies(pool, "http://localhost", "http://localhost")
pool = connections.ConnectionsPool(
proxy="http://localhost", secure_proxy="https://localhost")
self._check_proxies(pool, "http://localhost", "https://localhost")
def test_reliability(self):
pool = connections.ConnectionsPool(count=0, retries_num=2)
self.assertEqual(1, pool.free.qsize())
with pool.get() as c:
self.assertEqual(2, c.retries_num)
for h in c.opener.handlers:
if isinstance(h, connections.RetryHandler):
break
else:
self.fail("RetryHandler should be in list of handlers.")
class TestConnection(base.TestCase):
def setUp(self):
super(TestConnection, self).setUp()
self.connection = connections.Connection(mock.MagicMock(), 2)
def test_make_request(self):
request = self.connection.make_request("/test/file", 0)
self.assertIsInstance(request, connections.RetryableRequest)
self.assertEqual("file:///test/file", request.get_full_url())
self.assertEqual(0, request.offset)
self.assertEqual(2, request.retries_left)
request2 = self.connection.make_request("http://server/path", 100)
self.assertEqual("http://server/path", request2.get_full_url())
self.assertEqual(100, request2.offset)
def test_open_stream(self):
self.connection.open_stream("/test/file")
self.assertEqual(1, self.connection.opener.open.call_count)
args = self.connection.opener.open.call_args[0]
self.assertIsInstance(args[0], connections.RetryableRequest)
self.assertEqual(2, args[0].retries_left)
@mock.patch("packetary.library.connections.logger")
def test_retries_on_io_error(self, logger):
self.connection.opener.open.side_effect = [
IOError("I/O error"),
mock.MagicMock()
]
self.connection.open_stream("/test/file")
self.assertEqual(2, self.connection.opener.open.call_count)
logger.exception.assert_called_with(
"Failed to open url - %s: %s. retries left - %d.",
"/test/file", "I/O error", 1
)
self.connection.opener.open.side_effect = IOError("I/O error")
with self.assertRaises(IOError):
self.connection.open_stream("/test/file")
logger.exception.assert_called_with(
"Failed to open url - %s: %s. retries left - %d.",
"/test/file", "I/O error", 0
)
def test_raise_other_errors(self):
self.connection.opener.open.side_effect = \
connections.urllib_error.HTTPError("", 500, "", {}, None)
with self.assertRaises(connections.urllib_error.URLError):
self.connection.open_stream("/test/file")
self.assertEqual(1, self.connection.opener.open.call_count)
@mock.patch("packetary.library.connections.os")
def test_retrieve_from_offset(self, os):
os.path.mkdirs.side_effect = OSError(17, "")
os.open.return_value = 1
response = mock.MagicMock()
self.connection.opener.open.return_value = response
response.read.side_effect = [b"test", b""]
self.connection.retrieve("/file/src", "/file/dst", 10)
os.lseek.assert_called_once_with(1, 10, os.SEEK_SET)
os.ftruncate.assert_called_once_with(1, 10)
self.assertEqual(1, os.write.call_count)
os.fsync.assert_called_once_with(1)
os.close.assert_called_once_with(1)
@mock.patch.multiple(
"packetary.library.connections",
logger=mock.DEFAULT,
os=mock.DEFAULT
)
def test_retrieve_from_offset_fail(self, os, logger):
os.path.mkdirs.side_effect = OSError(17, "")
os.open.return_value = 1
response = mock.MagicMock()
self.connection.opener.open.side_effect = [
connections.RangeError("error"), response
]
response.read.side_effect = [b"test", b""]
self.connection.retrieve("/file/src", "/file/dst", 10)
logger.warning.assert_called_once_with(
"Failed to resume download, starts from begin: %s",
"/file/src"
)
os.lseek.assert_called_once_with(1, 0, os.SEEK_SET)
os.ftruncate.assert_called_once_with(1, 0)
self.assertEqual(1, os.write.call_count)
os.fsync.assert_called_once_with(1)
os.close.assert_called_once_with(1)
@mock.patch("packetary.library.connections.logger")
class TestRetryHandler(base.TestCase):
def setUp(self):
super(TestRetryHandler, self).setUp()
self.handler = connections.RetryHandler()
self.handler.add_parent(mock.MagicMock())
def test_start_request(self, logger):
request = mock.MagicMock()
request.offset = 0
request.get_full_url.return_value = "/file/test"
request = self.handler.http_request(request)
request.start_time <= time.time()
logger.debug.assert_called_with("start request: %s", "/file/test")
request.offset = 1
request = self.handler.http_request(request)
request.add_header.assert_called_once_with('Range', 'bytes=1-')
def test_handle_response(self, logger):
request = mock.MagicMock()
request.offset = 0
request.start_time.__rsub__.return_value = 0.01
request.get_full_url.return_value = "/file/test"
response = mock.MagicMock()
response.getcode.return_value = 200
response.msg = "test"
r = self.handler.http_response(request, response)
self.assertIsInstance(r, connections.ResumableResponse)
logger.debug.assert_called_with(
"finish request: %s - %d (%s), duration - %d ms.",
"/file/test", 200, "test", 10
)
def test_handle_partial_response(self, _):
request = mock.MagicMock()
request.offset = 1
request.get_full_url.return_value = "/file/test"
response = mock.MagicMock()
response.getcode.return_value = 200
response.msg = "test"
with self.assertRaises(connections.RangeError):
self.handler.http_response(request, response)
response.getcode.return_value = 206
self.handler.http_response(request, response)
def test_error(self, logger):
request = mock.MagicMock()
request.get_full_url.return_value = "/test"
request.retries_left = 1
self.handler.http_error(
request, mock.MagicMock(), 500, "error", mock.MagicMock()
)
logger.warning.assert_called_with(
"fail request: %s - %d(%s), retries left - %d.",
"/test", 500, "error", 0
)
self.handler.http_error(
request, mock.MagicMock(), 500, "error", mock.MagicMock()
)
self.handler.parent.open.assert_called_once_with(request)
class TestResumeableResponse(base.TestCase):
def setUp(self):
super(TestResumeableResponse, self).setUp()
self.request = mock.MagicMock()
self.opener = mock.MagicMock()
self.stream = mock.MagicMock()
def test_resume_read(self):
self.request.offset = 0
response = connections.ResumableResponse(
self.request,
self.stream,
self.opener
)
self.stream.read.side_effect = [
b"chunk1", IOError(), b"chunk2", b""
]
self.opener.error.return_value = response
data = response.read()
self.assertEqual(b"chunk1chunk2", data)
self.assertEqual(12, self.request.offset)
self.assertEqual(1, self.opener.error.call_count)
def test_read(self):
self.request.offset = 0
response = connections.ResumableResponse(
self.request,
six.BytesIO(b"line1\nline2\nline3\n"),
self.opener
)
self.assertEqual(
b"line1\nline2\nline3\n", response.read()
)

View File

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import threading
import time
from packetary.library import executor
from packetary.tests import base
def _raise_value_error(*_):
raise ValueError("error")
@mock.patch("packetary.library.executor.logger")
class TestAsynchronousSection(base.TestCase):
def setUp(self):
super(TestAsynchronousSection, self).setUp()
self.results = []
def test_isolation(self, _):
section1 = executor.AsynchronousSection()
section2 = executor.AsynchronousSection()
event = threading.Event()
section1.execute(event.wait)
section2.execute(time.sleep, 0)
section2.wait()
event.set()
section1.wait()
def test_ignore_errors(self, logger):
section = executor.AsynchronousSection(ignore_errors_num=1)
section.execute(_raise_value_error)
section.execute(time.sleep, 0)
section.wait(ignore_errors=True)
self.assertEqual(1, section.errors)
logger.exception.assert_called_with(
"Task failed: %s", "error"
)
def test_fail_if_too_many_errors(self, _):
section = executor.AsynchronousSection(ignore_errors_num=0)
section.execute(_raise_value_error)
section.wait(ignore_errors=True)
with self.assertRaisesRegexp(RuntimeError, "Too many errors"):
section.execute(time.sleep, 0)
with self.assertRaisesRegexp(
RuntimeError, "Operations completed with errors"):
section.wait(ignore_errors=False)

View File

@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gzip
import six
from packetary.library import streams
from packetary.tests import base
class TestBufferedStream(base.TestCase):
def setUp(self):
super(TestBufferedStream, self).setUp()
self.stream = streams.StreamWrapper(
six.BytesIO(b"line1\nline2\nline3\n")
)
def test_read(self):
self.stream.CHUNK_SIZE = 10
chunk = self.stream.read(5)
self.assertEqual(b"line1", chunk)
self.assertEqual(b"\nline", self.stream.unread_tail)
chunk = self.stream.read(1024)
self.assertEqual(b"\nline2\nline3\n", chunk)
self.assertEqual(b"", self.stream.unread_tail)
def test_readline(self):
self.stream.CHUNK_SIZE = 12
chunk = self.stream.readline()
self.assertEqual(b"line1\n", chunk)
self.assertEqual(b"line2\n", self.stream.unread_tail)
lines = list(self.stream.readlines())
self.assertEqual([b"line2\n", b"line3\n"], lines)
self.assertEqual(b"", self.stream.unread_tail)
def test_readlines(self):
self.stream.CHUNK_SIZE = 12
lines = list(self.stream.readlines())
self.assertEqual(
[b"line1\n", b"line2\n", b"line3\n"],
lines)
class TestGzipDecompress(base.TestCase):
@classmethod
def setUpClass(cls):
cls.gzipped = six.BytesIO()
gz = gzip.GzipFile(fileobj=cls.gzipped, mode="w")
gz.write(b"line1\nline2\nline3\n")
gz.flush()
gz.close()
def setUp(self):
super(TestGzipDecompress, self).setUp()
self.gzipped.seek(0)
self.stream = streams.GzipDecompress(self.gzipped)
def test_read(self):
chunk = self.stream.read(5)
self.assertEqual(b"line1", chunk)
self.assertEqual(b"\nline2\nline3\n", self.stream.unread_tail)
chunk = self.stream.read(1024)
self.assertEqual(b"\nline2\nline3\n", chunk)
self.assertEqual(b"", self.stream.unread_tail)
def test_readline(self):
self.stream.CHUNK_SIZE = 12
chunk = self.stream.readline()
self.assertEqual(b"line1\n", chunk)
self.assertEqual(b"line2\nl", self.stream.unread_tail)
lines = list(self.stream.readlines())
self.assertEqual([b"line2\n", b"line3\n"], lines)
self.assertEqual(b"", self.stream.unread_tail)
def test_readlines(self):
self.stream.CHUNK_SIZE = 12
lines = list(self.stream.readlines())
self.assertEqual(
[b"line1\n", b"line2\n", b"line3\n"],
lines)

View File

@ -2,5 +2,6 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr>=1.6
Babel>=1.3
eventlet>=0.17
pbr>=1.6

14
tox.ini
View File

@ -1,6 +1,6 @@
[tox]
minversion = 1.6
envlist = py34,py26,py27,pep8
envlist = py34,py27,py26,pep8
skipsdist = True
[testenv]
@ -9,22 +9,22 @@ install_command = pip install -U {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/test-requirements.txt
commands = python setup.py test --slowest --testr-args='{posargs}'
commands = python setup.py test --slowest --testr-args='{posargs:packetary}'
[testenv:pep8]
commands = flake8
commands = flake8 {posargs:packetary}
[testenv:venv]
commands = {posargs}
commands = {posargs:packetary}
[testenv:cover]
commands = python setup.py test --coverage --testr-args='{posargs}'
commands = python setup.py test --coverage --testr-args='{posargs:packetary}'
[testenv:docs]
commands = python setup.py build_sphinx
[testenv:debug]
commands = oslo_debug_helper {posargs}
commands = oslo_debug_helper {posargs:packetary}
[flake8]
# E123, E125 skipped as they are invalid PEP-8.
@ -32,4 +32,4 @@ commands = oslo_debug_helper {posargs}
show-source = True
ignore = E123,E125
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,util,.idea
exclude=*egg,*lib/python*,*openstack/common*,.git,.idea,.tox,.venv,build,dist,doc