ChangBo Guo(gcb) c644308296 Drop python 2.6 support
* Use weakref.WeakSet
* Use memoryview
* Allow test_graceful_death for eventlet

Change-Id: I46651c2e84e2ef0057d338841bf4981e61cdc257
2015-11-26 21:00:29 +08:00

196 lines
6.1 KiB
Python

# Copyright (c) 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import errno
import json
from multiprocessing import connection
from multiprocessing import managers
import socket
import struct
import weakref
from oslo_rootwrap import wrapper
class RpcJSONEncoder(json.JSONEncoder):
    """JSON encoder that knows about the extra types sent over the RPC link.

    Besides the types ``json`` handles natively, it serializes:

    * ``bytes`` -> ``{"__bytes__": <base64 text>}``
    * ``wrapper.NoFilterMatched`` -> ``{"__exception__": "NoFilterMatched"}``
    * ``wrapper.FilterMatchNotExecutable`` ->
      ``{"__exception__": "FilterMatchNotExecutable", "match": o.match}``
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        Anything not listed in the class docstring is delegated to the base
        class, so the encoding error surfaces to whoever requested the
        serialization.
        """
        # Raw bytes travel as base64 text: they appear both in the arguments
        # to and in the results of Popen.communicate().
        if isinstance(o, bytes):
            encoded = base64.b64encode(o)
            return {"__bytes__": encoded.decode('ascii')}

        # Only two exception types related to command execution are
        # transported; each is tagged with its class name.
        if isinstance(o, wrapper.NoFilterMatched):
            return {"__exception__": "NoFilterMatched"}
        if isinstance(o, wrapper.FilterMatchNotExecutable):
            return {"__exception__": "FilterMatchNotExecutable",
                    "match": o.match}

        # Everything else is left to the stock encoder.
        return super(RpcJSONEncoder, self).default(o)
def rpc_object_hook(obj):
    """Decode the tagged dictionaries produced by RpcJSONEncoder.

    Intended as the ``object_hook`` of ``json.loads``.

    * A dict carrying ``"__exception__"`` has that key removed. If the name
      is one of the two known exception types, the matching class from
      ``wrapper`` is instantiated with the remaining items as keyword
      arguments; otherwise the dict (now without the marker key) is returned.
    * A dict carrying ``"__bytes__"`` is turned back into ``bytes``.
    * Any other dict is returned unchanged.

    The exception marker takes precedence when both markers are present.
    """
    if "__exception__" not in obj:
        if "__bytes__" in obj:
            b64_text = obj["__bytes__"]
            return base64.b64decode(b64_text.encode('ascii'))
        return obj

    # Note: pop() mutates the incoming dict even for unknown type names.
    type_name = obj.pop("__exception__")
    if type_name in ("NoFilterMatched", "FilterMatchNotExecutable"):
        return getattr(wrapper, type_name)(**obj)
    return obj
class JsonListener(object):
    """Listening AF_UNIX socket that hands out JsonConnection objects.

    Attributes:
        address: the address the socket was bound to.
        closed: True once close() has run.
    """

    def __init__(self, address, backlog=1):
        """Create a blocking AF_UNIX socket, bind it and start listening.

        :param address: address passed to ``socket.bind``.
        :param backlog: value passed to ``socket.listen``.
        :raises socket.error: if setup fails; the socket is closed first.
        """
        self.address = address
        self._socket = socket.socket(socket.AF_UNIX)
        try:
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
        except socket.error:
            # Do not leak the descriptor when bind/listen fails.
            self._socket.close()
            raise
        self.closed = False
        # Weak references only: a connection disappears from the set as soon
        # as nobody else holds it.
        self._accepted = weakref.WeakSet()

    def accept(self):
        """Block until a client connects and return a JsonConnection for it.

        :raises EOFError: if accept() fails with EINVAL or EBADF
            (presumably what a pending accept() reports once close() has
            shut the listening socket down).
        :raises socket.error: for any other error except EINTR, which is
            retried.
        """
        while True:
            try:
                s, _ = self._socket.accept()
            except socket.error as e:
                if e.errno in (errno.EINVAL, errno.EBADF):
                    raise EOFError
                elif e.errno != errno.EINTR:
                    raise
                # EINTR: interrupted by a signal, just try again.
            else:
                break
        s.setblocking(True)
        conn = JsonConnection(s)
        self._accepted.add(conn)
        return conn

    def close(self):
        """Shut down and close the listening socket; later calls are no-ops.

        An error raised by ``shutdown()`` still propagates, but the
        descriptor is released and the listener is marked closed regardless,
        so a failing shutdown can no longer leak the socket.
        """
        if self.closed:
            return
        try:
            self._socket.shutdown(socket.SHUT_RDWR)
        finally:
            self._socket.close()
            self.closed = True

    def get_accepted(self):
        """Return the weak set of connections accepted so far."""
        return self._accepted
if hasattr(managers.Server, 'accepter'):
    # Python 3 runs Server.accepter() as an endless loop in its own thread.
    # That loop is interrupted by raising EOFError, so the replacement below
    # swallows exactly that error and lets the thread finish quietly.
    old_accepter = managers.Server.accepter

    def silent_accepter(self):
        """Run the original accepter, returning silently on EOFError."""
        try:
            old_accepter(self)
        except EOFError:
            # Expected way out of the accept loop, not a failure.
            return

    managers.Server.accepter = silent_accepter
class JsonConnection(object):
    """Length-prefixed JSON message channel on top of a stream socket.

    Every message on the wire is an 8-byte big-endian unsigned length
    (``struct`` format ``'!Q'``) followed by that many payload bytes.
    ``send``/``recv`` carry JSON documents encoded as UTF-8, using
    RpcJSONEncoder and rpc_object_hook for the non-native types.
    """

    def __init__(self, sock):
        """Wrap ``sock`` and switch it to blocking mode."""
        sock.setblocking(True)
        self._socket = sock

    def send_bytes(self, s):
        """Send the bytes ``s`` as one length-prefixed frame."""
        self._socket.sendall(struct.pack('!Q', len(s)))
        self._socket.sendall(s)

    def recv_bytes(self, maxsize=None):
        """Receive one frame and return its payload as ``bytes``.

        :param maxsize: optional upper bound for the announced payload size.
        :raises RuntimeError: if the announced size exceeds ``maxsize``.
        :raises EOFError: if the peer closes before the frame is complete.
        """
        length = struct.unpack('!Q', self.recvall(8))[0]
        if maxsize is not None and length > maxsize:
            raise RuntimeError("Too big message received")
        return self.recvall(length)

    def send(self, obj):
        """Serialize ``obj`` with dumps() and send it as one frame."""
        self.send_bytes(self.dumps(obj))

    def recv(self):
        """Receive one frame and return the object decoded by loads()."""
        return self.loads(self.recv_bytes())

    def close(self):
        """Close the underlying socket."""
        self._socket.close()

    def half_close(self):
        """Shut down the receiving side of the socket only."""
        self._socket.shutdown(socket.SHUT_RD)

    # We have to use slow version of recvall with eventlet because of a bug in
    # GreenSocket.recv_into:
    # https://bitbucket.org/eventlet/eventlet/pull-request/41
    def _recvall_slow(self, size):
        """Read exactly ``size`` bytes using plain ``recv()`` calls.

        :raises EOFError: if the peer closes before ``size`` bytes arrive.
        """
        remaining = size
        res = []
        while remaining:
            piece = self._socket.recv(remaining)
            if not piece:
                raise EOFError
            res.append(piece)
            remaining -= len(piece)
        return b''.join(res)

    def recvall(self, size):
        """Read exactly ``size`` bytes into a preallocated buffer.

        :raises EOFError: if the peer closes before ``size`` bytes arrive.
        """
        buf = bytearray(size)
        mem = memoryview(buf)
        got = 0
        while got < size:
            # Slicing the memoryview does not copy; recv_into fills the tail.
            piece_size = self._socket.recv_into(mem[got:])
            if not piece_size:
                raise EOFError
            got += piece_size
        # bytearray is mostly compatible with bytes and we could avoid copying
        # data here, but hmac doesn't like it in Python 3.3 (not in 2.7 or 3.4)
        return bytes(buf)

    @staticmethod
    def dumps(obj):
        """Return ``obj`` as UTF-8 encoded JSON produced by RpcJSONEncoder."""
        return json.dumps(obj, cls=RpcJSONEncoder).encode('utf-8')

    @staticmethod
    def loads(s):
        """Decode UTF-8 JSON bytes ``s`` using rpc_object_hook.

        If the decoded value is a sequence whose first item is
        ``"#TRACEBACK"`` or ``"#UNSERIALIZABLE"`` and whose second item is
        not a ``str``, the second item is re-encoded to UTF-8 in place.
        Any other value is returned exactly as decoded.
        """
        res = json.loads(s.decode('utf-8'), object_hook=rpc_object_hook)
        try:
            kind = res[0]
        except (LookupError, TypeError):
            # Not an indexable, non-empty sequence. LookupError covers both
            # IndexError (empty list/str) and KeyError: a top-level JSON
            # object decodes to a dict, and ``res[0]`` on it used to escape
            # as an uncaught KeyError.
            pass
        else:
            # In Python 2 json returns unicode while multiprocessing needs str
            if (kind in ("#TRACEBACK", "#UNSERIALIZABLE") and
                    not isinstance(res[1], str)):
                res[1] = res[1].encode('utf-8', 'replace')
        return res
class JsonClient(JsonConnection):
    """Client end of a JsonConnection, connected to an AF_UNIX address."""

    def __init__(self, address, authkey=None):
        """Connect to ``address`` and optionally authenticate.

        :param address: address passed to ``socket.connect``.
        :param authkey: when not None, the multiprocessing challenge
            handshake is performed with it: the peer's challenge is answered
            first, then a challenge is delivered to the peer.

        Whatever ``connect()`` or the handshake raises propagates to the
        caller. The socket is closed first, so a failed construction does
        not leak a descriptor.
        """
        sock = socket.socket(socket.AF_UNIX)
        try:
            sock.setblocking(True)
            sock.connect(address)
            super(JsonClient, self).__init__(sock)
            if authkey is not None:
                connection.answer_challenge(self, authkey)
                connection.deliver_challenge(self, authkey)
        except BaseException:
            # Cleanup only: the caller never gets a reference to this object
            # on failure, so nobody else could close the socket.
            sock.close()
            raise