Merge "Add support for websocket-proxy"
This commit is contained in:
commit
a85f2f19a8
39
zun/cmd/wsproxy.py
Normal file
39
zun/cmd/wsproxy.py
Normal file
@ -0,0 +1,39 @@
|
||||
# Copyright 2017 Linaro Limited
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import sys
|
||||
|
||||
from zun.common import config
|
||||
from zun.common import service as zun_service
|
||||
import zun.conf
|
||||
from zun.websocket import websocketproxy
|
||||
|
||||
CONF = zun.conf.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def main():
|
||||
zun_service.prepare_service(sys.argv)
|
||||
config.parse_args(sys.argv)
|
||||
LOG.info("start websocket proxy")
|
||||
|
||||
host = CONF.websocket_proxy.wsproxy_host
|
||||
port = CONF.websocket_proxy.wsproxy_port
|
||||
websocketproxy.ZunWebSocketProxy(
|
||||
listen_host=host,
|
||||
listen_port=port,
|
||||
file_only=True,
|
||||
RequestHandlerClass=websocketproxy.ZunProxyRequestHandler
|
||||
).start_server()
|
@ -475,3 +475,23 @@ class ClassNotFound(NotFound):
|
||||
class ApiVersionsIntersect(ZunException):
|
||||
message = _("Version of %(name)s %(min_ver)s %(max_ver)s intersects "
|
||||
"with another versions.")
|
||||
|
||||
|
||||
class ConnectionFailed(ZunException):
|
||||
msg_fmt = _("Failed to connect to remote host")
|
||||
|
||||
|
||||
class SocketException(ZunException):
|
||||
msg_fmt = _("Socket exceptions")
|
||||
|
||||
|
||||
class InvalidWebsocketUrl(ZunException):
|
||||
msg_fmt = _("Websocket Url invalid")
|
||||
|
||||
|
||||
class InvalidWebsocketToken(ZunException):
|
||||
msg_fmt = _("Websocket token is invalid")
|
||||
|
||||
|
||||
class ValidationError(ZunException):
|
||||
msg_fmt = _("Validation error")
|
||||
|
@ -16,6 +16,7 @@ import six
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from zun.common import consts
|
||||
from zun.common import exception
|
||||
@ -401,7 +402,12 @@ class Manager(object):
|
||||
LOG.debug('Get websocket url from the container: %s', container.uuid)
|
||||
try:
|
||||
url = self.driver.get_websocket_url(container)
|
||||
return url
|
||||
token = uuidutils.generate_uuid()
|
||||
access_url = '%s?token=%s' % (CONF.websocket_proxy.base_url, token)
|
||||
container.websocket_url = url
|
||||
container.websocket_token = token
|
||||
container.save(context)
|
||||
return access_url
|
||||
except Exception as e:
|
||||
LOG.error(("Error occurred while calling "
|
||||
"get websocket url function: %s"),
|
||||
|
@ -29,6 +29,7 @@ from zun.conf import profiler
|
||||
from zun.conf import scheduler
|
||||
from zun.conf import services
|
||||
from zun.conf import ssl
|
||||
from zun.conf import websocket_proxy
|
||||
from zun.conf import zun_client
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -49,3 +50,4 @@ ssl.register_opts(CONF)
|
||||
profiler.register_opts(CONF)
|
||||
neutron_client.register_opts(CONF)
|
||||
network.register_opts(CONF)
|
||||
websocket_proxy.register_opts(CONF)
|
||||
|
92
zun/conf/websocket_proxy.py
Normal file
92
zun/conf/websocket_proxy.py
Normal file
@ -0,0 +1,92 @@
|
||||
# Copyright 2017 Linaro Limited
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
wsproxy_group = cfg.OptGroup("websocket_proxy",
|
||||
title="Websocket Proxy Group",
|
||||
help="""
|
||||
Users use the websocket proxy to connect to containers, instead of
|
||||
connecting to containers directly, hence protects the socket daemon.
|
||||
""")
|
||||
|
||||
wsproxy_opts = [
|
||||
cfg.URIOpt('base_url',
|
||||
default='ws://$wsproxy_host:$wsproxy_port/',
|
||||
help="""
|
||||
The URL an end user would use to connect to the ``zun-wsproxy`` service.
|
||||
|
||||
The ``zun-wsproxy`` service is called with this token enriched URL
|
||||
and establishes the connection to the proper instance.
|
||||
|
||||
Related options:
|
||||
|
||||
* The IP address must be the same as the address to which the
|
||||
``zun-wsproxy`` service is listening (see option ``wsproxy_host``
|
||||
in this section).
|
||||
* The port must be the same as ``wsproxy_port``in this section.
|
||||
"""),
|
||||
cfg.StrOpt('wsproxy_host',
|
||||
default='127.0.0.1',
|
||||
help="""
|
||||
The IP address which is used by the ``zun-wsproxy`` service to listen
|
||||
for incoming requests.
|
||||
|
||||
The ``zun-wsproxy`` service listens on this IP address for incoming
|
||||
connection requests.
|
||||
|
||||
Related options:
|
||||
|
||||
* Ensure that this is the same IP address which is defined in the option
|
||||
``base_url`` of this section or use ``0.0.0.0`` to listen on all addresses.
|
||||
"""),
|
||||
cfg.PortOpt('wsproxy_port',
|
||||
default=6784,
|
||||
help="""
|
||||
The port number which is used by the ``zun-wsproxy`` service to listen
|
||||
for incoming requests.
|
||||
|
||||
The ``zun-wsproxy`` service listens on this port number for incoming
|
||||
connection requests.
|
||||
|
||||
Related options:
|
||||
|
||||
* Ensure that this is the same port number as that defined in the option
|
||||
``base_url`` of this section.
|
||||
"""),
|
||||
cfg.ListOpt('allowed_origins',
|
||||
default=[],
|
||||
help="""
|
||||
Adds list of allowed origins to the console websocket proxy to allow
|
||||
connections from other origin hostnames.
|
||||
Websocket proxy matches the host header with the origin header to
|
||||
prevent cross-site requests. This list specifies if any there are
|
||||
values other than host are allowed in the origin header.
|
||||
|
||||
Possible values:
|
||||
|
||||
* A list where each element is an allowed origin hostnames, else an empty list
|
||||
"""),
|
||||
]
|
||||
|
||||
ALL_OPTS = (wsproxy_opts)
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
conf.register_group(wsproxy_group)
|
||||
conf.register_opts(wsproxy_opts, group=wsproxy_group)
|
||||
|
||||
|
||||
def list_opts():
|
||||
return {wsproxy_group: ALL_OPTS}
|
@ -453,12 +453,12 @@ class TestManager(base.TestCase):
|
||||
self.context, container, {})
|
||||
|
||||
@mock.patch.object(fake_driver, 'attach')
|
||||
@mock.patch('zun.container.driver.ContainerDriver.get_websocket_url')
|
||||
def test_container_attach(self, mock_attach, mock_getwebsocket_url):
|
||||
@mock.patch.object(Container, 'save')
|
||||
def test_container_attach(self, mock_save, mock_attach):
|
||||
container = Container(self.context, **utils.get_test_container())
|
||||
mock_getwebsocket_url.return_value = "ws://test"
|
||||
mock_attach.return_value = "ws://test"
|
||||
self.compute_manager.container_attach(self.context, container)
|
||||
mock_attach.assert_called_once_with(container)
|
||||
mock_save.assert_called_with(self.context)
|
||||
|
||||
@mock.patch.object(fake_driver, 'attach')
|
||||
def test_container_attach_failed(self, mock_attach):
|
||||
|
@ -79,6 +79,10 @@ class FakeDriver(driver.ContainerDriver):
|
||||
def attach(self, container):
|
||||
pass
|
||||
|
||||
@check_container_id
|
||||
def get_websocket_url(self, container):
|
||||
pass
|
||||
|
||||
@check_container_id
|
||||
def resize(self, container, height, weight):
|
||||
pass
|
||||
|
0
zun/websocket/__init__.py
Normal file
0
zun/websocket/__init__.py
Normal file
44
zun/websocket/websocketclient.py
Normal file
44
zun/websocket/websocketclient.py
Normal file
@ -0,0 +1,44 @@
|
||||
# Copyright 2017 Linaro Limited
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import websocket
|
||||
|
||||
from zun.common import exception
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebSocketClient(object):
|
||||
|
||||
def __init__(self, host_url, escape='~',
|
||||
close_wait=0.5):
|
||||
self.escape = escape
|
||||
self.close_wait = close_wait
|
||||
self.host_url = host_url
|
||||
self.cs = None
|
||||
|
||||
def connect(self):
|
||||
url = self.host_url
|
||||
try:
|
||||
self.ws = websocket.create_connection(url,
|
||||
skip_utf8_validation=True)
|
||||
except socket.error as e:
|
||||
raise exception.ConnectionFailed(e)
|
||||
except websocket.WebSocketConnectionClosedException as e:
|
||||
raise exception.ConnectionFailed(e)
|
||||
except websocket.WebSocketBadStatusException as e:
|
||||
raise exception.ConnectionFailed(e)
|
266
zun/websocket/websocketproxy.py
Normal file
266
zun/websocket/websocketproxy.py
Normal file
@ -0,0 +1,266 @@
|
||||
# Copyright (c) 2017 Linaro Limited
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
'''
|
||||
Websocket proxy that is compatible with OpenStack Zun.
|
||||
Leverages websockify.py by Joel Martin
|
||||
'''
|
||||
|
||||
import errno
|
||||
import select
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import websockify
|
||||
from zun.common import context
|
||||
from zun.common import exception
|
||||
from zun.common.i18n import _
|
||||
import zun.conf
|
||||
from zun.db import api as db_api
|
||||
from zun.websocket.websocketclient import WebSocketClient
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = zun.conf.CONF
|
||||
|
||||
|
||||
class ZunProxyRequestHandlerBase(object):
|
||||
def verify_origin_proto(self, access_url, origin_proto):
|
||||
if not access_url:
|
||||
detail = _("No access_url available."
|
||||
"Cannot validate protocol")
|
||||
raise exception.ValidationError(detail=detail)
|
||||
expected_protos = [urlparse.urlparse(access_url).scheme]
|
||||
# NOTE: For serial consoles the expected protocol could be ws or
|
||||
# wss which correspond to http and https respectively in terms of
|
||||
# security.
|
||||
if 'ws' in expected_protos:
|
||||
expected_protos.append('http')
|
||||
if 'wss' in expected_protos:
|
||||
expected_protos.append('https')
|
||||
|
||||
return origin_proto in expected_protos
|
||||
|
||||
def _send_buffer(self, buff, target, send_all=False):
|
||||
size = len(buff)
|
||||
tosend = size
|
||||
already_sent = 0
|
||||
|
||||
while tosend > 0:
|
||||
try:
|
||||
# i should be able to send a bytearray
|
||||
sent = target.send(buff[already_sent:])
|
||||
if sent == 0:
|
||||
raise RuntimeError('socket connection broken')
|
||||
|
||||
already_sent += sent
|
||||
tosend -= sent
|
||||
|
||||
except socket.error as e:
|
||||
# if full buffers then wait for them to drain and try again
|
||||
if e.errno in [errno.EAGAIN, errno.EWOULDBLOCK]:
|
||||
if send_all:
|
||||
continue
|
||||
return buff[already_sent:]
|
||||
else:
|
||||
raise exception.SocketException(str(e))
|
||||
return None
|
||||
|
||||
def _handle_ins_outs(self, target, ins, outs):
|
||||
'''Handle the select file ins and outs
|
||||
|
||||
handle the operation ins and outs from select
|
||||
'''
|
||||
if self.request in outs:
|
||||
# Send queued target data to the client
|
||||
self.c_pend = self.send_frames(self.cqueue)
|
||||
self.cqueue = []
|
||||
|
||||
if self.request in ins:
|
||||
# Receive client data, decode it, and queue for target
|
||||
bufs, closed = self.recv_frames()
|
||||
self.tqueue.extend(bufs)
|
||||
if closed:
|
||||
self.msg(_("Client closed connection:"
|
||||
"%(host)s:%(port)s") % {
|
||||
'host': self.server.target_host,
|
||||
'port': self.server.target_port})
|
||||
raise self.CClose(closed['code'], closed['reason'])
|
||||
|
||||
if target in outs:
|
||||
while self.tqueue:
|
||||
payload = self.tqueue.pop(0)
|
||||
remaining = self._send_buffer(payload, target)
|
||||
if remaining is not None:
|
||||
self.tqueue.appendleft(remaining)
|
||||
break
|
||||
|
||||
if target in ins:
|
||||
# Receive target data, encode it and queue for client
|
||||
buf = target.recv()
|
||||
if len(buf) == 0:
|
||||
self.msg(_("Client closed connection:"
|
||||
"%(host)s:%(port)s") % {
|
||||
'host': self.server.target_host,
|
||||
'port': self.server.target_port})
|
||||
raise self.CClose(1000, "Target closed")
|
||||
self.cqueue.append(buf)
|
||||
|
||||
def do_proxy(self, target):
|
||||
'''Proxy websocket link
|
||||
|
||||
Proxy client WebSocket to normal target socket.
|
||||
'''
|
||||
self.cqueue = []
|
||||
self.tqueue = []
|
||||
self.c_pend = 0
|
||||
rlist = [self.request, target]
|
||||
|
||||
if self.server.heartbeat:
|
||||
now = time.time()
|
||||
self.heartbeat = now + self.server.heartbeat
|
||||
else:
|
||||
self.heartbeat = None
|
||||
|
||||
while True:
|
||||
wlist = []
|
||||
|
||||
if self.heartbeat is not None:
|
||||
now = time.time()
|
||||
if now > self.heartbeat:
|
||||
self.heartbeat = now + self.server.heartbeat
|
||||
self.send_ping()
|
||||
|
||||
if self.tqueue:
|
||||
wlist.append(target)
|
||||
if self.cqueue or self.c_pend:
|
||||
wlist.append(self.request)
|
||||
try:
|
||||
ins, outs, excepts = select.select(rlist, wlist, [], 1)
|
||||
except (select.error, OSError):
|
||||
exc = sys.exc_info()[1]
|
||||
if hasattr(exc, 'errno'):
|
||||
err = exc.errno
|
||||
else:
|
||||
err = exc[0]
|
||||
if err != errno.EINTR:
|
||||
raise
|
||||
else:
|
||||
continue
|
||||
|
||||
if excepts:
|
||||
raise exception.SocketException()
|
||||
|
||||
self._handle_ins_outs(target, ins, outs)
|
||||
|
||||
def new_websocket_client(self):
|
||||
"""Called after a new WebSocket connection has been established."""
|
||||
# Reopen the eventlet hub to make sure we don't share an epoll
|
||||
# fd with parent and/or siblings, which would be bad
|
||||
from eventlet import hubs
|
||||
hubs.use_hub()
|
||||
|
||||
# The zun expected behavior is to have token
|
||||
# passed to the method GET of the request
|
||||
parse = urlparse.urlparse(self.path)
|
||||
if parse.scheme not in ('http', 'https'):
|
||||
# From a bug in urlparse in Python < 2.7.4 we cannot support
|
||||
# special schemes (cf: http://bugs.python.org/issue9374)
|
||||
if sys.version_info < (2, 7, 4):
|
||||
raise exception.ZunException(
|
||||
_("We do not support scheme '%s' under Python < 2.7.4, "
|
||||
"please use http or https") % parse.scheme)
|
||||
|
||||
query = parse.query
|
||||
token = urlparse.parse_qs(query).get("token", [""]).pop()
|
||||
|
||||
dbapi = db_api._get_dbdriver_instance()
|
||||
ctx = context.get_admin_context(all_tenants=True)
|
||||
self.headerid = self.headers.get("User-Agent")
|
||||
|
||||
if uuidutils.is_uuid_like(self.headerid):
|
||||
container = dbapi.get_container_by_uuid(ctx, self.headerid)
|
||||
else:
|
||||
container = dbapi.get_container_by_name(ctx, self.headerid)
|
||||
|
||||
if token != container.websocket_token:
|
||||
raise exception.InvalidWebsocketToken(token)
|
||||
|
||||
access_url = '%s?token=%s' % (CONF.websocket_proxy.base_url, token)
|
||||
|
||||
# Verify Origin
|
||||
expected_origin_hostname = self.headers.get('Host')
|
||||
if ':' in expected_origin_hostname:
|
||||
e = expected_origin_hostname
|
||||
if '[' in e and ']' in e:
|
||||
expected_origin_hostname = e.split(']')[0][1:]
|
||||
else:
|
||||
expected_origin_hostname = e.split(':')[0]
|
||||
expected_origin_hostnames = CONF.websocket_proxy.allowed_origins
|
||||
expected_origin_hostnames.append(expected_origin_hostname)
|
||||
origin_url = self.headers.get('Origin')
|
||||
|
||||
# missing origin header indicates non-browser client which is OK
|
||||
if origin_url is not None:
|
||||
origin = urlparse.urlparse(origin_url)
|
||||
origin_hostname = origin.hostname
|
||||
origin_scheme = origin.scheme
|
||||
if origin_hostname == '' or origin_scheme == '':
|
||||
detail = _("Origin header not valid.")
|
||||
raise exception.ValidationError(detail)
|
||||
if origin_hostname not in expected_origin_hostnames:
|
||||
detail = _("Origin header does not match this host.")
|
||||
raise exception.ValidationError(detail)
|
||||
if not self.verify_origin_proto(access_url, origin_scheme):
|
||||
detail = _("Origin header protocol does not match this host.")
|
||||
raise exception.ValidationError(detail)
|
||||
|
||||
if container.websocket_url:
|
||||
target_url = container.websocket_url
|
||||
escape = "~"
|
||||
close_wait = 0.5
|
||||
wscls = WebSocketClient(host_url=target_url, escape=escape,
|
||||
close_wait=close_wait)
|
||||
wscls.connect()
|
||||
self.target = wscls
|
||||
else:
|
||||
raise exception.InvalidWebsocketUrl()
|
||||
|
||||
# Start proxying
|
||||
try:
|
||||
self.do_proxy(self.target.ws)
|
||||
except Exception as e:
|
||||
if self.target.ws:
|
||||
self.target.ws.close()
|
||||
self.vmsg(_("%Websocket client or target closed"))
|
||||
raise
|
||||
|
||||
|
||||
class ZunProxyRequestHandler(ZunProxyRequestHandlerBase,
|
||||
websockify.ProxyRequestHandler):
|
||||
def __init__(self, *args, **kwargs):
|
||||
websockify.ProxyRequestHandler.__init__(self, *args, **kwargs)
|
||||
|
||||
def socket(self, *args, **kwargs):
|
||||
return websockify.WebSocketServer.socket(*args, **kwargs)
|
||||
|
||||
|
||||
class ZunWebSocketProxy(websockify.WebSocketProxy):
|
||||
@staticmethod
|
||||
def get_logger():
|
||||
return LOG
|
Loading…
x
Reference in New Issue
Block a user