From d2452b830b5fb559d848cf4e73fa666b5b168707 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 11 Sep 2015 17:50:37 +0200 Subject: [PATCH 01/33] Really PoC --- solard/setup.py | 51 ++++++++ solard/solard/__init__.py | 0 solard/solard/client.py | 75 ++++++++++++ solard/solard/core.py | 237 ++++++++++++++++++++++++++++++++++++ solard/solard/logger.py | 33 +++++ solard/solard/tcp_client.py | 142 +++++++++++++++++++++ solard/solard/tcp_server.py | 214 ++++++++++++++++++++++++++++++++ 7 files changed, 752 insertions(+) create mode 100644 solard/setup.py create mode 100644 solard/solard/__init__.py create mode 100644 solard/solard/client.py create mode 100644 solard/solard/core.py create mode 100755 solard/solard/logger.py create mode 100644 solard/solard/tcp_client.py create mode 100644 solard/solard/tcp_server.py diff --git a/solard/setup.py b/solard/setup.py new file mode 100644 index 00000000..923de51c --- /dev/null +++ b/solard/setup.py @@ -0,0 +1,51 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License attached# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from setuptools import find_packages +from setuptools import setup + + +# def find_requires(): +# prj_root = os.path.dirname(os.path.realpath(__file__)) +# requirements = [] +# with open(u'{0}/requirements.txt'.format(prj_root), 'r') as reqs: +# requirements = reqs.readlines() +# return requirements + + +setup( + name='solard', + version='0.0.1', + description='Deployment tool daemon', + long_description="""Deployment tool daemon""", + classifiers=[ + "Development Status :: 1 - Beta", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 2.6", + "Programming Language :: Python :: 2.7", + "Topic :: System :: Software Distribution"], + author='Mirantis Inc.', + author_email='product@mirantis.com', + url='http://mirantis.com', + keywords='deployment', + packages=find_packages(), + zip_safe=False, + # install_requires=find_requires(), + include_package_data=True, + # entry_points={ + # 'console_scripts': [ + # 'solar = solar.cli.main:run']} +) diff --git a/solard/solard/__init__.py b/solard/solard/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/solard/solard/client.py b/solard/solard/client.py new file mode 100644 index 00000000..5c1146cb --- /dev/null +++ b/solard/solard/client.py @@ -0,0 +1,75 @@ +import msgpack +import os + +# TODO: handle errors + +class SolardClient(object): + + read_buffer = 4096 + + def __init__(self, transport): + self.transport = transport + + def run(self, *args, **kwargs): + send = self.transport.send({'m': 'run', 'args': args, 'kwargs': kwargs}) + resp = self.transport.resp() + return resp + + def copy_directory(self, _from, _to, use_sudo=False): + i = 0 # TODO: very very naive + for root, dirs, files in os.walk(_from): + for name in files: + _from = os.path.join(root, name) + _to = os.path.join(root.replace(_from, _to), name) + self._copy_file(_from, _to, use_sudo) + # resp = self.transport.resp(close=False) + resp = True + i += 1 # TODO: this is very very naive + if not resp: + break + for _ in xrange(i): + resp = self.transport.resp(close=False) + if not resp: + return resp + self.transport.disconnect() + return True + + def _copy_file(self, _from, _to, use_sudo=False): + transport = self.transport + f_size = os.stat(_from).st_size + send = transport.send({'m': 'copy_file', + 'args': (_to, f_size), + 's': True}) + transport.send_stream_start(add_size=False) + to_read = f_size + with open(_from, 'rb') as f: + while to_read > 0: + data = f.read(self.read_buffer) # expose sendfile there + transport.send_stream_data(data) + to_read -= len(data) + assert to_read == 0 + return True + + def copy_file(self, _from, _to, use_sudo=False): + self._copy_file(_from, _to, use_sudo) + return self.transport.resp() + + + def copy(self, _from, _to, use_sudo=False): + if os.path.isdir(_from): + resp = self.copy_directory(_from, _to, use_sudo) + else: + resp = self.copy_file(_from, _to, use_sudo) + return resp + + + + + +if __name__ == '__main__': + import time + from solard.tcp_client import SolardTCPClient + c = SolardClient(transport=SolardTCPClient('localhost', 5555)) + print c.run('hostname') + print c.copy('/tmp/a', '/tmp/bbb/a.%s' % (time.time())) + print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time())) diff --git a/solard/solard/core.py b/solard/solard/core.py new file mode 100644 index 00000000..d6f332a4 --- /dev/null +++ b/solard/solard/core.py @@ -0,0 +1,237 @@ +import os +import random +import string +from contextlib import nested +from fabric import api as fabric_api +from subprocess import check_output +import shlex +from itertools import takewhile + + + +# XXX: not used for now vvv + +# def common_path(paths, sep=os.path.sep): +# paths = [x.split(sep) for x in paths] +# dirs = zip(*(p for p in paths)) +# return [x[0] for x in takewhile(lambda x: all(n == x[0] for n in x[1:]), dirs)] + + +# class SolardContext(object): + +# def __init__(self): +# self._dirs = {} +# self._files = {} + +# def file(self, path): +# try: +# return self._files[path] +# except KeyError: +# if self.is_safe_file(path): +# cls = SolardSafeFile +# else: +# cls = SolardFile +# self._files[path] = f = cls(self, path) +# return f + +# def dir(self, path): +# try: +# return self._dirs[path] +# except KeyError: +# self._dirs[path] = solard_dir = SolardDir(self, path) +# return solard_dir + +# def is_safe_file(self, path): +# dirname = os.path.dirname(path) +# common = SolardContext.common_path(dirname, self._dirs.keys()) +# if common not in ((), ('/', )): +# return False +# return True + +# def is_safe_dir(self, path): +# common = SolardContext.common_path(path, self._dirs.keys()) +# if common not in ((), ('/', )): +# return False +# return True + +# @staticmethod +# def common_path(path, paths, sep=os.path.sep): +# all_paths = paths + [path] +# paths = [x.split(sep) for x in all_paths] +# dirs = zip(*(p for p in all_paths)) +# return tuple(x[0] for x in takewhile(lambda x: all(n == x[0] for n in x[1:]), dirs)) + + +# class SolardSafeFile(object): + +# def __init__(self, context, target): +# self._f = None +# self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6))) +# self._path = target +# self._safe_path = self._path + '_' + self._rnd + +# def open(self): +# self._f = open(self._safe_path, 'wb') + +# def write(self, data): +# return self._f.write(data) + +# def close(self): +# self._f.close() + +# def finish(self): +# self.close() +# os.rename(self._safe_path, self._path) + + +# class SolardFile(object): + +# def __init__(self, context, target): +# self._f = None +# self._path = target + +# def open(self): +# self._f = open(self._path, 'wb') + +# def write(self, data): +# self._f.write(data) + +# def close(self): +# self._f.close() + +# def finish(self): +# self.close() + + +# class SolardSafeDir(object): + +# def __init__(self, context, target): +# self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6))) +# self._path = target +# self._safe_path = self._path + '_' + self._rnd + +# def start(self): +# os.makedirs(self._safe_path) + +# def finish(self): +# os.rename(self._safe_path, self._path) + + +# class SolardDir(object): + +# def __init__(self, context, target): +# self._path = target + +# def start(self): +# os.makedirs(self._path) + +# def finish(self): +# pass + +# XXX: not used for now ^^^ + +class SolardContext(object): + + def __init__(self): + self.files = {} + + def file(self, path): + try: + return self.files[path] + except KeyError: + self.files[path] = r = SolardFile(self, path) + return r + + +class SolardFile(object): + + def __init__(self, context, target): + self.ctx = context + self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6))) + self._path = target + self._f = None + self._safe_path = self._path + '_' + self._rnd + + def open(self): + dirname = os.path.dirname(self._safe_path) + if not os.path.exists(dirname): + os.makedirs(dirname) + if self._f is None: + self._f = open(self._safe_path, 'wb') + + def write(self, data): + self._f.write(data) + + def finish(self): + self._f.close() + self._f = None + os.rename(self._safe_path, self._path) + + +class SolardIface(object): + + @staticmethod + def run(solard_context, cmd, **kwargs): + # return check_output(shlex.split(cmd)) + executor = fabric_api.local + if kwargs.get('use_sudo', False): + cmd = 'sudo ' + cmd + + managers = [] + + cwd = kwargs.get('cwd') + if cwd: + managers.append(fabric_api.cd(kwargs['cwd'])) + + env = kwargs.get('env') + if env: + managers.append(fabric_api.shell_env(**kwargs['env'])) + + if kwargs.get('warn_only', False): + managers.append(fabric_api.warn_only()) + + with nested(*managers): + out = executor(cmd, capture=True) + if out.failed: + raise Exception("Remote failed") + return out.stdout + + @staticmethod + def copy_file(solard_context, stream_reader, path, size=None): + f = SolardIface.file_start(solard_context, path) + rdr = stream_reader(size) + for data in rdr: + f.write(data) + SolardIface.file_end(solard_context, path) + return True + + # # TODO: not used YET fully + # @staticmethod + # def dir_start(solard_context, path): + # solard_dir = solard_context.dir(path) + # solard_dir.start() + # return solard_dir + + # @staticmethod + # def dir_finish(solard_context, path): + # solard_dir = solard_context.dir(path) + # solard_dir.finish() + # return True + + @staticmethod + def file_start(solard_context, path): + solard_file = solard_context.file(path) + solard_file.open() + return solard_file + + @staticmethod + def file_put_data(solard_context, path, data): + solard_file = solard_context.file(path) + return solard_file.write(data) + + @staticmethod + def file_end(solard_context, path): + solard_file = solard_context.file(path) + solard_file.finish() + return True + diff --git a/solard/solard/logger.py b/solard/solard/logger.py new file mode 100755 index 00000000..da01693a --- /dev/null +++ b/solard/solard/logger.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging + + +def __init_logger(): + logger = logging.getLogger("Wassi") + logger.setLevel(logging.DEBUG) + + # formatter = logging.Formatter( + # '%(levelname)s:%(asctime)s - %(name)s - %(message)s') + + formatter = logging.Formatter('%(levelname)s:%(asctime)s - %(message)s') + + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + ch.setFormatter(formatter) + + logger.addHandler(ch) + return logger + + +__global_logger = None + + +def get_logger(): + global __global_logger + if not __global_logger: + __global_logger = __init_logger() + return __global_logger + +logger = get_logger() diff --git a/solard/solard/tcp_client.py b/solard/solard/tcp_client.py new file mode 100644 index 00000000..72ac47ee --- /dev/null +++ b/solard/solard/tcp_client.py @@ -0,0 +1,142 @@ +import msgpack + +import socket +import errno +import struct + + +HDR = '= 0 + if not size: + break + self._wrote = False + return msgpack.unpackb(''.join(d)) + + def _read_stream(self, size=None): + if size is None: + try: + size = struct.unpack(HDR, self.sock.recv(HDR_SIZE))[0] + except: + raise ReadFailure("Can't read header data") + while True: + b = min(size, SERVER_BUFF) + curr = self.sock.recv(b) + size -= len(curr) + assert size >= 0 + yield curr + if not size: + break + + def _write(self, **kwargs): + assert self._wrote is False + _data = msgpack.packb(kwargs) + size = len(_data) + hdr = struct.pack(HDR, size) + self.sock.sendall(hdr + _data) + self._wrote = True + + def _write_ok(self, res): + # logger.debug("Ok sent") + data = {'st': 2, 'res': res} + self._write(**data) + + def _write_ok_gen(self, res): + data = {'st': 20, 'res': res} + self._write(**data) + + # def _write_ok_stream(self, res): + # data = {'st': 30, 'res': res} + # self._write(**data) + + # def _write_stream_data(self, data): + # self.sock.sendall(data) + + def _write_gen_end(self): + data = {'st': 21, 'res': None} + self._write(**data) + + def _write_failure(self, exception, reason, tb=""): + data = {'st': 0, 'exception': exception, 'reason': reason, 'tb': tb} + self._write(**data) + + def _write_err(self, error): + logger.info("Client error: %s" % error) + data = {'st': 1, 'error': error} + self._write(**data) + + def process(self): + try: + known_type = INT_DEFAULT_REPLY_TYPE + input_data = self._read() + if not input_data: + return False + method = input_data['m'] + meth = getattr(SolardIface, method) + is_stream = input_data.get('s', False) + logger.debug("Going to run %r", method) + if is_stream: + res = meth(self.ctx, self._read_stream, *input_data.get('args', ()), **input_data.get('kwargs', {})) + else: + res = meth(self.ctx, *input_data.get('args', ()), **input_data.get('kwargs', {})) + if isinstance(res, GeneratorType): + known_type = INT_GENERATOR_REPLY_TYPE + try: + for curr in res: + self._wrote = False + self._write_ok_gen(curr) + except: + raise + finally: + try: + self._wrote = False + self._write_gen_end() + except Exception: # ignore if eng gen couldn't be send + pass + self._wrote = True + else: + # if input_data.get('silent', False): + self._write_ok(res) + + except ReadFailure: + return False + except Exception as ex: + if self._wrote: + if known_type == INT_GENERATOR_REPLY_TYPE: + errno_ = getattr(ex, 'errno', None) + if errno_ in (errno.EPIPE, errno.ECONNRESET): + logger.debug( + "Client disconnected during generator based reply") + else: + logger.debug("Error during generator based reply") + raise + else: + logger.error("Already wrote data, but got exception") + raise + else: + logger.exception("Got exception") + self.handle_exception() + return True + + def handle_exception(self): + exc_type, exc_value, exc_traceback = sys.exc_info() + tb = traceback.format_exception(exc_type, exc_value, exc_traceback) + + reason = str(exc_value) + if not reason: + reason = exc_type.__name__ + try: + self._write_failure(exc_type.__name__, reason, tb) + except: + logger.warn("Failure when sending error response") + raise + finally: + logger.exception("Got exception") + + + +class SolardTCPServer(StreamServer): + + allow_reuse_address = True + + def __init__(self, *args, **kwargs): + StreamServer.__init__(self, *args, **kwargs) + + def handle(self, sock, address): + try: + logger.debug("New from %s:%d" % address) + h = SolardTCPHandler(sock, address) + while True: + if not h.process(): + logger.debug("End from %s:%d" % address) + break + else: + logger.debug("Waiting for more from %s:%d" % address) + try: + sock.shutdown(socket.SHUT_RDWR) + except Exception: + pass + finally: + sock.close() + + + + + +if __name__ == '__main__': + s = SolardTCPServer(('0.0.0.0', 5555)) + s.serve_forever() From 0cb07cfe2da24ab3607a0365ba1ba7a679e6c2bc Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 11 Sep 2015 17:58:58 +0200 Subject: [PATCH 02/33] Small adjustments --- solard/solard/client.py | 10 ++-------- solard/solard/tcp_server.py | 1 - 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index 5c1146cb..a97714b9 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -16,21 +16,15 @@ class SolardClient(object): return resp def copy_directory(self, _from, _to, use_sudo=False): - i = 0 # TODO: very very naive + # dir should open context on remote, and sync all files as one req/resp for root, dirs, files in os.walk(_from): for name in files: _from = os.path.join(root, name) _to = os.path.join(root.replace(_from, _to), name) self._copy_file(_from, _to, use_sudo) - # resp = self.transport.resp(close=False) - resp = True - i += 1 # TODO: this is very very naive + resp = self.transport.resp(close=False) if not resp: break - for _ in xrange(i): - resp = self.transport.resp(close=False) - if not resp: - return resp self.transport.disconnect() return True diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index 9743ceaa..b16fbae8 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -141,7 +141,6 @@ class SolardTCPHandler(object): pass self._wrote = True else: - # if input_data.get('silent', False): self._write_ok(res) except ReadFailure: From a758faa05c1fc5b5641a8b2767d8f8a8d194c656 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 11 Sep 2015 18:39:57 +0200 Subject: [PATCH 03/33] Speed improvements --- solard/solard/client.py | 24 ++++++++++++++---------- solard/solard/core.py | 2 +- solard/solard/tcp_client.py | 1 - solard/solard/tcp_server.py | 3 ++- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index a97714b9..082979e9 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -17,23 +17,27 @@ class SolardClient(object): def copy_directory(self, _from, _to, use_sudo=False): # dir should open context on remote, and sync all files as one req/resp + to_cp_files = [] for root, dirs, files in os.walk(_from): for name in files: _from = os.path.join(root, name) _to = os.path.join(root.replace(_from, _to), name) - self._copy_file(_from, _to, use_sudo) - resp = self.transport.resp(close=False) - if not resp: - break - self.transport.disconnect() - return True + to_cp_files.append((_from, _to)) + for _from, _to in to_cp_files: + self._copy_file(_from, _to, use_sudo, empty_ok_resp=True) + resp = self.transport.resp() + return resp - def _copy_file(self, _from, _to, use_sudo=False): + def _copy_file(self, _from, _to, use_sudo=False, empty_ok_resp=False): + # TODO: separate empty_ok_resp from copy args transport = self.transport f_size = os.stat(_from).st_size - send = transport.send({'m': 'copy_file', - 'args': (_to, f_size), - 's': True}) + data = {'m': 'copy_file', + 'args': (_to, use_sudo, f_size), + 's': True} + if empty_ok_resp: + data['empty_ok_resp'] = True + send = transport.send(data) transport.send_stream_start(add_size=False) to_read = f_size with open(_from, 'rb') as f: diff --git a/solard/solard/core.py b/solard/solard/core.py index d6f332a4..4ca02b25 100644 --- a/solard/solard/core.py +++ b/solard/solard/core.py @@ -197,7 +197,7 @@ class SolardIface(object): return out.stdout @staticmethod - def copy_file(solard_context, stream_reader, path, size=None): + def copy_file(solard_context, stream_reader, path, use_sudo=False, size=None): f = SolardIface.file_start(solard_context, path) rdr = stream_reader(size) for data in rdr: diff --git a/solard/solard/tcp_client.py b/solard/solard/tcp_client.py index 72ac47ee..9ebd1be6 100644 --- a/solard/solard/tcp_client.py +++ b/solard/solard/tcp_client.py @@ -33,7 +33,6 @@ class SolardTCPClient(object): self.sock = None self._streaming = False - def connect(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index b16fbae8..32ea9f56 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -141,7 +141,8 @@ class SolardTCPHandler(object): pass self._wrote = True else: - self._write_ok(res) + if not input_data.get('empty_ok_resp', False): + self._write_ok(res) except ReadFailure: return False From 0bde23e9f940f04ac7460acda8691936db4d6f59 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 14 Sep 2015 12:48:16 +0200 Subject: [PATCH 04/33] More changes --- solard/solard/client.py | 39 ++++++++++++++++++++++++------------- solard/solard/core.py | 15 ++++++++++++++ solard/solard/tcp_client.py | 14 ++++++++++++- solard/solard/tcp_server.py | 18 +++++++++++++---- 4 files changed, 67 insertions(+), 19 deletions(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index 082979e9..39056a2b 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -18,26 +18,40 @@ class SolardClient(object): def copy_directory(self, _from, _to, use_sudo=False): # dir should open context on remote, and sync all files as one req/resp to_cp_files = [] + transport = self.transport for root, dirs, files in os.walk(_from): for name in files: _from = os.path.join(root, name) _to = os.path.join(root.replace(_from, _to), name) - to_cp_files.append((_from, _to)) - for _from, _to in to_cp_files: - self._copy_file(_from, _to, use_sudo, empty_ok_resp=True) - resp = self.transport.resp() + size = os.stat(_from).st_size + to_cp_files.append((_from, _to, size)) + tos = [(x[1], size) for x in to_cp_files] + total_size = sum((x[1] for x in tos)) + data = {'m': 'copy_files', + 'args': (tos, use_sudo, total_size), + 's': True} + _ = transport.send(data) + transport.send_stream_start() + for _from, _to, _size in to_cp_files: + # sock = transport.send_stream_cont(add_size=_size) + sock = transport.send_stream_cont() + with open(_from, 'rb') as f: + while _size > 0: + data = f.read(self.read_buffer) + transport.send_stream_data(data) + _size -= len(data) + assert _size == 0 # maybe somehow below ? + transport.send_stream_end() + resp = transport.resp() return resp - def _copy_file(self, _from, _to, use_sudo=False, empty_ok_resp=False): - # TODO: separate empty_ok_resp from copy args + def copy_file(self, _from, _to, use_sudo=False): transport = self.transport f_size = os.stat(_from).st_size data = {'m': 'copy_file', 'args': (_to, use_sudo, f_size), 's': True} - if empty_ok_resp: - data['empty_ok_resp'] = True - send = transport.send(data) + _ = transport.send(data) transport.send_stream_start(add_size=False) to_read = f_size with open(_from, 'rb') as f: @@ -46,13 +60,9 @@ class SolardClient(object): transport.send_stream_data(data) to_read -= len(data) assert to_read == 0 - return True - - def copy_file(self, _from, _to, use_sudo=False): - self._copy_file(_from, _to, use_sudo) + transport.send_stream_end() return self.transport.resp() - def copy(self, _from, _to, use_sudo=False): if os.path.isdir(_from): resp = self.copy_directory(_from, _to, use_sudo) @@ -70,4 +80,5 @@ if __name__ == '__main__': c = SolardClient(transport=SolardTCPClient('localhost', 5555)) print c.run('hostname') print c.copy('/tmp/a', '/tmp/bbb/a.%s' % (time.time())) + print c.copy('/tmp/a', '/tmp/bbb/b.%s' % (time.time())) print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time())) diff --git a/solard/solard/core.py b/solard/solard/core.py index 4ca02b25..57ea8e8d 100644 --- a/solard/solard/core.py +++ b/solard/solard/core.py @@ -8,6 +8,8 @@ import shlex from itertools import takewhile +from solard.logger import logger + # XXX: not used for now vvv @@ -205,6 +207,19 @@ class SolardIface(object): SolardIface.file_end(solard_context, path) return True + @staticmethod + def copy_files(solard_context, stream_reader, paths, use_sudo=False, total_size=None): + for _to, _size in paths: + logger.debug("Starting %s size=%d", _to, _size) + f = SolardIface.file_start(solard_context, _to) + rdr = stream_reader(_size) + for data in rdr: + f.write(data) + SolardIface.file_end(solard_context, _to) + logger.debug("Done %s size=%d", _to, _size) + return True + + # # TODO: not used YET fully # @staticmethod # def dir_start(solard_context, path): diff --git a/solard/solard/tcp_client.py b/solard/solard/tcp_client.py index 9ebd1be6..f0b283ef 100644 --- a/solard/solard/tcp_client.py +++ b/solard/solard/tcp_client.py @@ -74,10 +74,22 @@ class SolardTCPClient(object): self.sock.sendall(hdr) return self.sock + def send_stream_cont(self, add_size=False): + assert self._streaming is True + if add_size is not False: + hdr = struct.pack(HDR, add_size) + self.sock.sendall(hdr) + return self.sock + + def send_stream_end(self): + assert self._streaming is True + self._streaming = False + return self.sock + def send_stream_data(self, data): assert self._streaming is True self.sock.sendall(data) # TODO: expose sendfile easier - self._streaming = False + # self._streaming = False def read(self): sock = self.sock diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index 32ea9f56..5d585b38 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -51,13 +51,21 @@ class SolardTCPHandler(object): while True: b = min(size, SERVER_BUFF) curr = self.sock.recv(b) + if not curr: + raise ReadFailure("No data") d.append(curr) size -= len(curr) assert size >= 0 if not size: break self._wrote = False - return msgpack.unpackb(''.join(d)) + try: + return msgpack.unpackb(''.join(d)) + except: + print '--------------------' + print repr(d) + print '--------------------' + raise def _read_stream(self, size=None): if size is None: @@ -68,6 +76,9 @@ class SolardTCPHandler(object): while True: b = min(size, SERVER_BUFF) curr = self.sock.recv(b) + if not curr: + if size > 0: + raise ReadFailure("Expected more data") size -= len(curr) assert size >= 0 yield curr @@ -141,9 +152,8 @@ class SolardTCPHandler(object): pass self._wrote = True else: - if not input_data.get('empty_ok_resp', False): - self._write_ok(res) - + # if not input_data.get('empty_ok_resp', False): + self._write_ok(res) except ReadFailure: return False except Exception as ex: From 810f1aed9e8aa525bfe1af360865e8ef7f8c50c5 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 14 Sep 2015 14:27:48 +0200 Subject: [PATCH 05/33] Added naive forks --- solard/solard/client.py | 10 +++++-- solard/solard/tcp_client.py | 9 ++++++ solard/solard/tcp_server.py | 56 +++++++++++++++++++++++++++++++++++-- 3 files changed, 70 insertions(+), 5 deletions(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index 39056a2b..426e2356 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -7,8 +7,13 @@ class SolardClient(object): read_buffer = 4096 - def __init__(self, transport): + def __init__(self, auth, transport): + self.auth = auth self.transport = transport + self.make_auth() + + def make_auth(self): + self.transport.auth = self.auth def run(self, *args, **kwargs): send = self.transport.send({'m': 'run', 'args': args, 'kwargs': kwargs}) @@ -77,8 +82,9 @@ class SolardClient(object): if __name__ == '__main__': import time from solard.tcp_client import SolardTCPClient - c = SolardClient(transport=SolardTCPClient('localhost', 5555)) + c = SolardClient(auth={'user': 'pigmej', 'auth': 'password'}, transport=SolardTCPClient('localhost', 5555)) print c.run('hostname') + print c.run('whoami') print c.copy('/tmp/a', '/tmp/bbb/a.%s' % (time.time())) print c.copy('/tmp/a', '/tmp/bbb/b.%s' % (time.time())) print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time())) diff --git a/solard/solard/tcp_client.py b/solard/solard/tcp_client.py index f0b283ef..2238b2e7 100644 --- a/solard/solard/tcp_client.py +++ b/solard/solard/tcp_client.py @@ -31,6 +31,7 @@ class SolardTCPClient(object): # self._connect_timeout = kwargs.get("connect_timeout", None) self._socket_timeout = kwargs.get("socket_timeout", None) self.sock = None + self.auth = None self._streaming = False def connect(self): @@ -43,8 +44,16 @@ class SolardTCPClient(object): raise else: self.sock = sock + if not self.initialize_with_auth(): + self.sock = None + raise ClientException("Auth failed") return sock + def initialize_with_auth(self): + self.send(self.auth) + resp = self.resp(close=False) + return resp + def disconnect(self): sock = self.sock try: diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index 5d585b38..0738eb52 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -10,7 +10,8 @@ import struct import errno import sys import traceback - +import pwd +import os from types import GeneratorType from solard.logger import logger @@ -39,7 +40,9 @@ class SolardTCPHandler(object): self.sock = sock self.address = address self.ctx = SolardContext() + self.auth = None self._wrote = False + self.forked = False def _read(self): # TODO: client closed connection @@ -122,6 +125,42 @@ class SolardTCPHandler(object): data = {'st': 1, 'error': error} self._write(**data) + def make_auth(self): + # it's responsible for: + # - checking auth + # - forking if needed + auth_data = self._read() + if not auth_data: + self._write_ok(False) + return False + req_user = auth_data.get('user') + if not req_user: + self._write_ok(False) + return False + proc_user = pwd.getpwuid(os.getuid())[0] + logger.debug("Requested user %r", req_user) + # we may add there anything we want, checking in file etc + valid = auth_data.get('auth') == 'password' + if not valid: + self._write_ok(False) + return False + if req_user == proc_user: + self._write_ok(True) + return True + # fork there + child_pid = os.fork() + if child_pid == 0: + pw_uid = pwd.getpwnam(req_user).pw_uid + pw_gid = pwd.getpwuid(pw_uid).pw_gid + os.setgid(pw_gid) + os.setuid(pw_uid) + logger.debug("Child forked %d", os.getpid()) + self.forked = True + self._write_ok(True) + return True + return None + + def process(self): try: known_type = INT_DEFAULT_REPLY_TYPE @@ -199,9 +238,18 @@ class SolardTCPServer(StreamServer): StreamServer.__init__(self, *args, **kwargs) def handle(self, sock, address): + close = True + h = SolardTCPHandler(sock, address) try: logger.debug("New from %s:%d" % address) - h = SolardTCPHandler(sock, address) + auth_state = h.make_auth() + if auth_state is False: + logger.debug("Failed auth") + return + if auth_state is None: + # child forked + close = False + return while True: if not h.process(): logger.debug("End from %s:%d" % address) @@ -214,7 +262,9 @@ class SolardTCPServer(StreamServer): pass finally: sock.close() - + if h.forked: + # if forked we can safely exit now + sys.exit() From 8e6a373abcd8ee84160b6ef5f7cea19ac607ec23 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 14 Sep 2015 14:33:57 +0200 Subject: [PATCH 06/33] Added licenses --- solard/solard/core.py | 15 ++++++++++++++- solard/solard/logger.py | 14 ++++++++++++-- solard/solard/tcp_client.py | 13 +++++++++++++ solard/solard/tcp_server.py | 18 ++++++++++++++++-- 4 files changed, 55 insertions(+), 5 deletions(-) diff --git a/solard/solard/core.py b/solard/solard/core.py index 57ea8e8d..38d23ce5 100644 --- a/solard/solard/core.py +++ b/solard/solard/core.py @@ -1,3 +1,16 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License attached# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then +# License for the specific language governing permissions and limitations +# under the License. + import os import random import string @@ -208,7 +221,7 @@ class SolardIface(object): return True @staticmethod - def copy_files(solard_context, stream_reader, paths, use_sudo=False, total_size=None): + def copy_files(solard_context, stream_reader, paths, use_sudo=False): for _to, _size in paths: logger.debug("Starting %s size=%d", _to, _size) f = SolardIface.file_start(solard_context, _to) diff --git a/solard/solard/logger.py b/solard/solard/logger.py index da01693a..67a1993e 100755 --- a/solard/solard/logger.py +++ b/solard/solard/logger.py @@ -1,5 +1,15 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License attached# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then +# License for the specific language governing permissions and limitations +# under the License. import logging diff --git a/solard/solard/tcp_client.py b/solard/solard/tcp_client.py index 2238b2e7..b083c9b7 100644 --- a/solard/solard/tcp_client.py +++ b/solard/solard/tcp_client.py @@ -1,3 +1,16 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License attached# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then +# License for the specific language governing permissions and limitations +# under the License. + import msgpack import socket diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index 0738eb52..2a91c3b7 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -1,3 +1,16 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License attached# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then +# License for the specific language governing permissions and limitations +# under the License. + from gevent import monkey monkey.patch_all() @@ -147,6 +160,9 @@ class SolardTCPHandler(object): if req_user == proc_user: self._write_ok(True) return True + # we may not want to fork there + if auth_data.get('no_fork'): + return True # fork there child_pid = os.fork() if child_pid == 0: @@ -267,8 +283,6 @@ class SolardTCPServer(StreamServer): sys.exit() - - if __name__ == '__main__': s = SolardTCPServer(('0.0.0.0', 5555)) s.serve_forever() From 19bb679bc0f127d2f688030d9bfb4b6494ba8ccf Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 14 Sep 2015 14:39:51 +0200 Subject: [PATCH 07/33] Changed logger --- solard/solard/logger.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/solard/solard/logger.py b/solard/solard/logger.py index 67a1993e..02438dc2 100755 --- a/solard/solard/logger.py +++ b/solard/solard/logger.py @@ -14,20 +14,14 @@ import logging -def __init_logger(): - logger = logging.getLogger("Wassi") +def __setup_logger(): + logger = logging.getLogger("solard") logger.setLevel(logging.DEBUG) - - # formatter = logging.Formatter( - # '%(levelname)s:%(asctime)s - %(name)s - %(message)s') - - formatter = logging.Formatter('%(levelname)s:%(asctime)s - %(message)s') - - ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) - ch.setFormatter(formatter) - - logger.addHandler(ch) + formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s') + stream = logging.StreamHandler() + stream.setLevel(logging.DEBUG) + stream.setFormatter(formatter) + logger.addHandler(stream) return logger @@ -37,7 +31,7 @@ __global_logger = None def get_logger(): global __global_logger if not __global_logger: - __global_logger = __init_logger() + __global_logger = __setup_logger() return __global_logger logger = get_logger() From af4deb23dd7600a7ad59e0458ab31f375fc16e35 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 14 Sep 2015 17:03:28 +0200 Subject: [PATCH 08/33] sudo should work now --- solard/solard/client.py | 37 +++++++++++++++++++++++++------------ solard/solard/core.py | 4 ++-- solard/solard/tcp_server.py | 7 +++++-- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index 426e2356..b171b52f 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -7,24 +7,36 @@ class SolardClient(object): read_buffer = 4096 - def __init__(self, auth, transport): + def __init__(self, auth, transport_args, transport_class): self.auth = auth - self.transport = transport + self.sudo_transport = transport_class(*transport_args) + self.normal_transport = transport_class(*transport_args) self.make_auth() def make_auth(self): - self.transport.auth = self.auth + self.normal_transport.auth = dict(self.auth) + self.sudo_transport.auth = dict(self.auth) + self.sudo_transport.auth['sudo'] = True def run(self, *args, **kwargs): - send = self.transport.send({'m': 'run', 'args': args, 'kwargs': kwargs}) - resp = self.transport.resp() + if kwargs.get('use_sudo'): + transport = self.transport(use_sudo=True) + else: + transport = self.transport(use_sudo=False) + send = transport.send({'m': 'run', 'args': args, 'kwargs': kwargs}) + resp = transport.resp() return resp + def transport(self, use_sudo): + if use_sudo: + return self.sudo_transport + return self.normal_transport + def copy_directory(self, _from, _to, use_sudo=False): # dir should open context on remote, and sync all files as one req/resp to_cp_files = [] - transport = self.transport - for root, dirs, files in os.walk(_from): + transport = self.transport(use_sudo) + for root, _, files in os.walk(_from): for name in files: _from = os.path.join(root, name) _to = os.path.join(root.replace(_from, _to), name) @@ -51,7 +63,7 @@ class SolardClient(object): return resp def copy_file(self, _from, _to, use_sudo=False): - transport = self.transport + transport = self.transport(use_sudo) f_size = os.stat(_from).st_size data = {'m': 'copy_file', 'args': (_to, use_sudo, f_size), @@ -82,9 +94,10 @@ class SolardClient(object): if __name__ == '__main__': import time from solard.tcp_client import SolardTCPClient - c = SolardClient(auth={'user': 'pigmej', 'auth': 'password'}, transport=SolardTCPClient('localhost', 5555)) + c = SolardClient(auth={'user': 'pigmej', 'auth': 'password'}, transport_args=('localhost', 5555), transport_class=SolardTCPClient) print c.run('hostname') print c.run('whoami') - print c.copy('/tmp/a', '/tmp/bbb/a.%s' % (time.time())) - print c.copy('/tmp/a', '/tmp/bbb/b.%s' % (time.time())) - print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time())) + print c.run('whoami', use_sudo=True) + # print c.copy('/tmp/a', '/tmp/bbb/a.%s' % (time.time())) + # print c.copy('/tmp/a', '/tmp/bbb/b.%s' % (time.time())) + # print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time())) diff --git a/solard/solard/core.py b/solard/solard/core.py index 38d23ce5..3f971892 100644 --- a/solard/solard/core.py +++ b/solard/solard/core.py @@ -189,8 +189,8 @@ class SolardIface(object): def run(solard_context, cmd, **kwargs): # return check_output(shlex.split(cmd)) executor = fabric_api.local - if kwargs.get('use_sudo', False): - cmd = 'sudo ' + cmd + # if kwargs.get('use_sudo', False): + # cmd = 'sudo ' + cmd managers = [] diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index 2a91c3b7..ea4166e2 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -152,7 +152,9 @@ class SolardTCPHandler(object): return False proc_user = pwd.getpwuid(os.getuid())[0] logger.debug("Requested user %r", req_user) + # TODO: # we may add there anything we want, checking in file etc + # for now it's just `password` valid = auth_data.get('auth') == 'password' if not valid: self._write_ok(False) @@ -160,8 +162,9 @@ class SolardTCPHandler(object): if req_user == proc_user: self._write_ok(True) return True - # we may not want to fork there - if auth_data.get('no_fork'): + # TODO: very naive + if auth_data.get('sudo'): + self._write_ok(True) return True # fork there child_pid = os.fork() From 1c8d20b95ab6743fa023d3a471a040c01f0ef594 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 14 Sep 2015 17:05:25 +0200 Subject: [PATCH 09/33] Removed 'use_sudo' from solard core --- solard/solard/client.py | 4 ++-- solard/solard/core.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index b171b52f..f322fce1 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -45,7 +45,7 @@ class SolardClient(object): tos = [(x[1], size) for x in to_cp_files] total_size = sum((x[1] for x in tos)) data = {'m': 'copy_files', - 'args': (tos, use_sudo, total_size), + 'args': (tos, total_size), 's': True} _ = transport.send(data) transport.send_stream_start() @@ -66,7 +66,7 @@ class SolardClient(object): transport = self.transport(use_sudo) f_size = os.stat(_from).st_size data = {'m': 'copy_file', - 'args': (_to, use_sudo, f_size), + 'args': (_to, f_size), 's': True} _ = transport.send(data) transport.send_stream_start(add_size=False) diff --git a/solard/solard/core.py b/solard/solard/core.py index 3f971892..ff73384b 100644 --- a/solard/solard/core.py +++ b/solard/solard/core.py @@ -212,7 +212,7 @@ class SolardIface(object): return out.stdout @staticmethod - def copy_file(solard_context, stream_reader, path, use_sudo=False, size=None): + def copy_file(solard_context, stream_reader, path, size=None): f = SolardIface.file_start(solard_context, path) rdr = stream_reader(size) for data in rdr: @@ -221,7 +221,7 @@ class SolardIface(object): return True @staticmethod - def copy_files(solard_context, stream_reader, paths, use_sudo=False): + def copy_files(solard_context, stream_reader, paths): for _to, _size in paths: logger.debug("Starting %s size=%d", _to, _size) f = SolardIface.file_start(solard_context, _to) From 01306997c11e7d46d81f18ac98618eaadb6db1fa Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 15 Sep 2015 09:59:11 +0200 Subject: [PATCH 10/33] Added requirements --- solard/requirements.txt | 2 ++ solard/setup.py | 19 ++++++++----------- 2 files changed, 10 insertions(+), 11 deletions(-) create mode 100644 solard/requirements.txt diff --git a/solard/requirements.txt b/solard/requirements.txt new file mode 100644 index 00000000..d4dfb93f --- /dev/null +++ b/solard/requirements.txt @@ -0,0 +1,2 @@ +gevent>=1.0.2 +msgpack-python>=0.4.6 diff --git a/solard/setup.py b/solard/setup.py index 923de51c..9cfe92e0 100644 --- a/solard/setup.py +++ b/solard/setup.py @@ -17,12 +17,12 @@ from setuptools import find_packages from setuptools import setup -# def find_requires(): -# prj_root = os.path.dirname(os.path.realpath(__file__)) -# requirements = [] -# with open(u'{0}/requirements.txt'.format(prj_root), 'r') as reqs: -# requirements = reqs.readlines() -# return requirements +def find_requires(): + prj_root = os.path.dirname(os.path.realpath(__file__)) + requirements = [] + with open(u'{0}/requirements.txt'.format(prj_root), 'r') as reqs: + requirements = reqs.readlines() + return requirements setup( @@ -43,9 +43,6 @@ setup( keywords='deployment', packages=find_packages(), zip_safe=False, - # install_requires=find_requires(), - include_package_data=True, - # entry_points={ - # 'console_scripts': [ - # 'solar = solar.cli.main:run']} + install_requires=find_requires(), + include_package_data=True ) From f5dde495390880619f5ad8eded193bdbe4e7c510 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 15 Sep 2015 17:03:09 +0200 Subject: [PATCH 11/33] tcp_core with consts --- solard/solard/tcp_client.py | 19 ++++++++++++------- solard/solard/tcp_core.py | 27 +++++++++++++++++++++++++++ solard/solard/tcp_server.py | 16 ++++++---------- 3 files changed, 45 insertions(+), 17 deletions(-) create mode 100644 solard/solard/tcp_core.py diff --git a/solard/solard/tcp_client.py b/solard/solard/tcp_client.py index b083c9b7..da1b1cda 100644 --- a/solard/solard/tcp_client.py +++ b/solard/solard/tcp_client.py @@ -17,9 +17,8 @@ import socket import errno import struct +from solard.tcp_core import * -HDR = ' Date: Fri, 25 Sep 2015 13:30:34 +0200 Subject: [PATCH 12/33] transport_class defaults to tcp --- solard/solard/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index f322fce1..00cdd29f 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -7,7 +7,9 @@ class SolardClient(object): read_buffer = 4096 - def __init__(self, auth, transport_args, transport_class): + def __init__(self, auth, transport_args, transport_class=None): + if transport_class is None: + transport_class = SolardTCPClient self.auth = auth self.sudo_transport = transport_class(*transport_args) self.normal_transport = transport_class(*transport_args) From fdd44e49b1cfa0b9d04b75af68a3df1bce8f5da8 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 25 Sep 2015 13:52:59 +0200 Subject: [PATCH 13/33] missing import --- solard/solard/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/solard/solard/client.py b/solard/solard/client.py index 00cdd29f..f3f38ede 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -9,6 +9,7 @@ class SolardClient(object): def __init__(self, auth, transport_args, transport_class=None): if transport_class is None: + from solard.tcp_client import SolardTCPClient transport_class = SolardTCPClient self.auth = auth self.sudo_transport = transport_class(*transport_args) From 05119d68d17ceddef98c19b5469fb60e9184ce92 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 25 Sep 2015 18:08:14 +0200 Subject: [PATCH 14/33] solard_transport added --- solar/solar/core/actions.py | 7 ++- .../solar/core/transports/solard_transport.py | 62 +++++++++++++++++++ solard/solard/client.py | 4 +- solard/solard/core.py | 3 +- 4 files changed, 71 insertions(+), 5 deletions(-) create mode 100644 solar/solar/core/transports/solard_transport.py diff --git a/solar/solar/core/actions.py b/solar/solar/core/actions.py index 84385346..deff8f69 100644 --- a/solar/solar/core/actions.py +++ b/solar/solar/core/actions.py @@ -17,11 +17,14 @@ import handlers from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport # from solar.core.transports.rsync import RsyncSyncTransport +from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport _default_transports = { - 'sync': SSHSyncTransport, + # 'sync': SSHSyncTransport, # 'sync': RsyncSyncTransport, - 'run': SSHRunTransport + # 'run': SSHRunTransport + 'run': SolardRunTransport, + 'sync': SolardSyncTransport } def resource_action(resource, action): diff --git a/solar/solar/core/transports/solard_transport.py b/solar/solar/core/transports/solard_transport.py new file mode 100644 index 00000000..0ca72346 --- /dev/null +++ b/solar/solar/core/transports/solard_transport.py @@ -0,0 +1,62 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from solard.client import SolardClient + +from solar.core.transports.base import RunTransport, SyncTransport, Executor +from solar.core.log import log + + +class SolardTransport(object): + + + def get_client(self, resource): + transport = self.get_transport_data(resource) + host = resource.ip() + user = transport['user'] + port = transport['port'] + auth = transport['password'] + transport_class = transport.get('transport_class') + client = SolardClient(auth={'user': user, 'auth': auth}, + transport_args=(host, port), + transport_class=transport_class) + return client + + +class SolardSyncTransport(SyncTransport, SolardTransport): + + preffered_transport_name = 'solard' + + def copy(self, resource, _from, _to, use_sudo=False): + log.debug("Solard copy: %s -> %s", _from, _to) + + client = self.get_client(resource) + executor = lambda transport: client.copy(_from, _to, use_sudo) + executor = Executor(resource=resource, + executor=executor, + params=(_from, _to, use_sudo)) + self.executors.append(executor) + + + +class SolardRunTransport(RunTransport, SolardTransport): + + preffered_transport_name = 'solard' + + def run(self, resource, *args, **kwargs): + log.debug("Solard run: %s", args) + client = self.get_client(resource) + return client.run(' '.join(args), **kwargs) + diff --git a/solard/solard/client.py b/solard/solard/client.py index f3f38ede..f71f25cf 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -97,10 +97,10 @@ class SolardClient(object): if __name__ == '__main__': import time from solard.tcp_client import SolardTCPClient - c = SolardClient(auth={'user': 'pigmej', 'auth': 'password'}, transport_args=('localhost', 5555), transport_class=SolardTCPClient) + c = SolardClient(auth={'user': 'vagrant', 'auth': 'password'}, transport_args=('10.0.0.3', 5555), transport_class=SolardTCPClient) print c.run('hostname') print c.run('whoami') print c.run('whoami', use_sudo=True) - # print c.copy('/tmp/a', '/tmp/bbb/a.%s' % (time.time())) + print c.copy('/vagrant/library', '/tmp') # print c.copy('/tmp/a', '/tmp/bbb/b.%s' % (time.time())) # print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time())) diff --git a/solard/solard/core.py b/solard/solard/core.py index ff73384b..5eaa116f 100644 --- a/solard/solard/core.py +++ b/solard/solard/core.py @@ -221,7 +221,8 @@ class SolardIface(object): return True @staticmethod - def copy_files(solard_context, stream_reader, paths): + def copy_files(solard_context, stream_reader, paths, total_size): + # total_size not used for now for _to, _size in paths: logger.debug("Starting %s size=%d", _to, _size) f = SolardIface.file_start(solard_context, _to) From 77906936ae4a460070d6dedecc954fa5c226b70b Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Fri, 25 Sep 2015 21:40:45 +0200 Subject: [PATCH 15/33] properly handle files with 0 size in solard --- solard/solard/core.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/solard/solard/core.py b/solard/solard/core.py index 5eaa116f..e000e3f6 100644 --- a/solard/solard/core.py +++ b/solard/solard/core.py @@ -226,9 +226,10 @@ class SolardIface(object): for _to, _size in paths: logger.debug("Starting %s size=%d", _to, _size) f = SolardIface.file_start(solard_context, _to) - rdr = stream_reader(_size) - for data in rdr: - f.write(data) + if _size > 0: + rdr = stream_reader(_size) + for data in rdr: + f.write(data) SolardIface.file_end(solard_context, _to) logger.debug("Done %s size=%d", _to, _size) return True From 247777891c1de7a8818cc5b9357c50c9f7c753e0 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 28 Sep 2015 12:54:22 +0200 Subject: [PATCH 16/33] Fixed small bug when sending files --- solard/solard/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/solard/solard/client.py b/solard/solard/client.py index f71f25cf..41e0834d 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -41,11 +41,11 @@ class SolardClient(object): transport = self.transport(use_sudo) for root, _, files in os.walk(_from): for name in files: - _from = os.path.join(root, name) - _to = os.path.join(root.replace(_from, _to), name) - size = os.stat(_from).st_size - to_cp_files.append((_from, _to, size)) - tos = [(x[1], size) for x in to_cp_files] + single_from = os.path.join(root, name) + _to = os.path.join(root.replace(single_from, _to), name) + size = os.stat(single_from).st_size + to_cp_files.append((single_from, _to, size)) + tos = [(x[1], x[2]) for x in to_cp_files] total_size = sum((x[1] for x in tos)) data = {'m': 'copy_files', 'args': (tos, total_size), From 2103f8f60606cfcd9212d22f8523a4108d51cfed Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 28 Sep 2015 13:47:22 +0200 Subject: [PATCH 17/33] Added SolarRunResult to wrap all Run transports results --- solar/solar/core/handlers/ansible_template.py | 2 +- solar/solar/core/transports/base.py | 18 ++++++++++++++++++ .../solar/core/transports/solard_transport.py | 12 +++++++++--- solar/solar/core/transports/ssh.py | 13 ++++++++++++- solard/solard/client.py | 2 ++ 5 files changed, 42 insertions(+), 5 deletions(-) diff --git a/solar/solar/core/handlers/ansible_template.py b/solar/solar/core/handlers/ansible_template.py index b59438c4..68ebcb4b 100644 --- a/solar/solar/core/handlers/ansible_template.py +++ b/solar/solar/core/handlers/ansible_template.py @@ -20,7 +20,6 @@ from solar.core.log import log from solar.core.handlers.base import TempFileHandler from solar import errors - # otherwise fabric will sys.exit(1) in case of errors env.warn_only = True @@ -29,6 +28,7 @@ env.warn_only = True # we would not need to render it there # for now we redender it locally, sync to remote, run ansible on remote host as local class AnsibleTemplate(TempFileHandler): + def action(self, resource, action_name): inventory_file = self._create_inventory(resource) playbook_file = self._create_playbook(resource, action_name) diff --git a/solar/solar/core/transports/base.py b/solar/solar/core/transports/base.py index 4d1d0983..a844b66d 100644 --- a/solar/solar/core/transports/base.py +++ b/solar/solar/core/transports/base.py @@ -58,6 +58,21 @@ class SolarTransport(object): +class SolarRunResult(object): + + def __init__(self, result, failed=False): + self._result = result + self._failed = failed + + @property + def failed(self): + return self._failed + + @property + def result(self): + return self._result + + class SyncTransport(SolarTransport): """ Transport that is responsible for file / directory syncing. @@ -116,6 +131,9 @@ class RunTransport(SolarTransport): def __init__(self): super(RunTransport, self).__init__() + def get_result(self, *args, **kwargs): + raise NotImplementedError() + def bind_with(self, other): # we migth add there something later # like compat checking etc diff --git a/solar/solar/core/transports/solard_transport.py b/solar/solar/core/transports/solard_transport.py index 0ca72346..252d4824 100644 --- a/solar/solar/core/transports/solard_transport.py +++ b/solar/solar/core/transports/solard_transport.py @@ -15,7 +15,7 @@ from solard.client import SolardClient -from solar.core.transports.base import RunTransport, SyncTransport, Executor +from solar.core.transports.base import RunTransport, SyncTransport, Executor, SolarRunResult from solar.core.log import log @@ -50,13 +50,19 @@ class SolardSyncTransport(SyncTransport, SolardTransport): self.executors.append(executor) - class SolardRunTransport(RunTransport, SolardTransport): preffered_transport_name = 'solard' + def get_result(self, result, failed=False): + return SolarRunResult(result, failed) + def run(self, resource, *args, **kwargs): log.debug("Solard run: %s", args) client = self.get_client(resource) - return client.run(' '.join(args), **kwargs) + try: + res = client.run(' '.join(args), **kwargs) + return self.get_result(res, failed=False) + except Exception as ex: + return self.get_result(ex, failed=True) diff --git a/solar/solar/core/transports/ssh.py b/solar/solar/core/transports/ssh.py index 0495a23d..6323037f 100644 --- a/solar/solar/core/transports/ssh.py +++ b/solar/solar/core/transports/ssh.py @@ -20,6 +20,7 @@ from fabric.contrib import project as fabric_project from solar.core.log import log from solar.core.transports.base import RunTransport, SyncTransport, Executor +from solar.core.transports.base import SolarRunResult class _SSHTransport(object): @@ -86,6 +87,15 @@ class SSHRunTransport(RunTransport, _SSHTransport): preffered_transport_name = 'ssh' + def get_result(self, output): + """ + Needed for compatibility with other handlers / transports + """ + if output.failed: + return SolarRunResult(output, failed=True) + return SolarRunResult(output, failed=False) + + def run(self, resource, *args, **kwargs): log.debug('SSH: %s', args) @@ -109,4 +119,5 @@ class SSHRunTransport(RunTransport, _SSHTransport): managers.append(fabric_api.warn_only()) with nested(*managers): - return executor(' '.join(args)) + res = executor(' '.join(args)) + return self.get_result(res) diff --git a/solard/solard/client.py b/solard/solard/client.py index 41e0834d..2d77c03c 100644 --- a/solard/solard/client.py +++ b/solard/solard/client.py @@ -3,6 +3,8 @@ import os # TODO: handle errors + + class SolardClient(object): read_buffer = 4096 From ae3cb4cad11cef3b1d7f9bcf5bb581a18734ab9d Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 28 Sep 2015 18:04:11 +0200 Subject: [PATCH 18/33] Solard resource with run action (start-stop-daemon) --- resources/transport_solard/actions/run.yaml | 5 +++++ resources/transport_solard/meta.yaml | 22 +++++++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 resources/transport_solard/actions/run.yaml create mode 100644 resources/transport_solard/meta.yaml diff --git a/resources/transport_solard/actions/run.yaml b/resources/transport_solard/actions/run.yaml new file mode 100644 index 00000000..e12eec90 --- /dev/null +++ b/resources/transport_solard/actions/run.yaml @@ -0,0 +1,5 @@ +- hosts: [{{ host }}] + sudo: yes + tasks: + - shell: pip install -e /vagrant/solard + - shell: start-stop-daemon -b --name solard --start --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --exec /usr/bin/python -- solard/tcp_server.py diff --git a/resources/transport_solard/meta.yaml b/resources/transport_solard/meta.yaml new file mode 100644 index 00000000..27279735 --- /dev/null +++ b/resources/transport_solard/meta.yaml @@ -0,0 +1,22 @@ +id: transport_solard +handler: ansible +input: + solard_user: + schema: str! + value: + solard_password: + schema: str! + value: + # solard_transport_class: + # schema: str! + # value: + solard_port: + schema: int! + value: 5555 + name: + schema: str! + value: solard + location_id: + schema: str! + value: + reverse: True From dc2c79bac46ef3e9832f752d0ba467df634cbc88 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 00:49:26 +0200 Subject: [PATCH 19/33] Fixed problem with transports connected to another transports - When transports_id value is False no magic mappings - Added is_own to location_id, when set to False location_id is not mapped --- resources/transport_solard/meta.yaml | 4 ++++ resources/transport_ssh/meta.yaml | 4 ++++ solar/solar/core/actions.py | 8 ++++---- solar/solar/core/signals.py | 17 +++++++++++++---- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/resources/transport_solard/meta.yaml b/resources/transport_solard/meta.yaml index 27279735..ac8b7aab 100644 --- a/resources/transport_solard/meta.yaml +++ b/resources/transport_solard/meta.yaml @@ -20,3 +20,7 @@ input: schema: str! value: reverse: True + is_own: False + transports_id: + schema: str! + value: False diff --git a/resources/transport_ssh/meta.yaml b/resources/transport_ssh/meta.yaml index 4a614ffa..97d27fbd 100644 --- a/resources/transport_ssh/meta.yaml +++ b/resources/transport_ssh/meta.yaml @@ -16,3 +16,7 @@ input: schema: str! value: reverse: True + is_own: False + transports_id: + schema: str! + value: False diff --git a/solar/solar/core/actions.py b/solar/solar/core/actions.py index deff8f69..4b54dbe8 100644 --- a/solar/solar/core/actions.py +++ b/solar/solar/core/actions.py @@ -20,11 +20,11 @@ from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport _default_transports = { - # 'sync': SSHSyncTransport, # 'sync': RsyncSyncTransport, - # 'run': SSHRunTransport - 'run': SolardRunTransport, - 'sync': SolardSyncTransport + 'sync': SSHSyncTransport, + 'run': SSHRunTransport +# 'run': SolardRunTransport, +# 'sync': SolardSyncTransport } def resource_action(resource, action): diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index 6413e0d4..d741a82e 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -58,12 +58,21 @@ def location_and_transports(emitter, receiver, orig_mapping): elif isinstance(orig_mapping, set): orig_mapping.remove(single) - def _single(single, inps_emitter, inps_receiver): + def _single(single, emitter, receiver, inps_emitter, inps_receiver): if inps_emitter and inps_receiver: - log.debug("location and transports different, skipping") - return + if not inps_emitter == inps_receiver: + log.warning("Different %r defined %r => %r", single, emitter.name, receiver.name) + return + else: + log.debug("The same %r defined for %r => %r, skipping", single, emitter.name, receiver.name) + return emitter_single = emitter.db_obj.meta_inputs[single] receiver_single = receiver.db_obj.meta_inputs[single] + if emitter_single.get('value') is False: + log.debug("Disabled %r mapping for %r", single, emitter.name) + return + if receiver_single.get('is_own') is False: + log.debug("Not is_own %r for %r ", single, emitter.name) emitter_single_reverse = emitter_single.get('reverse') receiver_single_reverse = receiver_single.get('reverse') # connect in other direction @@ -86,7 +95,7 @@ def location_and_transports(emitter, receiver, orig_mapping): # XXX: should be somehow parametrized (input attribute?) for single in ('transports_id', 'location_id'): if single in inps_emitter and inps_receiver: - _single(single, inps_emitter[single], inps_receiver[single]) + _single(single, emitter, receiver, inps_emitter[single], inps_receiver[single]) else: log.warning('Unable to create connection for %s with' ' emitter %s, receiver %s', From 94159cb81120649991e9f324de39a24d7c406149 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 01:13:44 +0200 Subject: [PATCH 20/33] Added solard example --- examples/solard/example.py | 68 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 examples/solard/example.py diff --git a/examples/solard/example.py b/examples/solard/example.py new file mode 100644 index 00000000..01737e75 --- /dev/null +++ b/examples/solard/example.py @@ -0,0 +1,68 @@ +import click +import sys +import time + +from solar.core import resource +from solar.core import signals +from solar.core.resource import virtual_resource as vr + +from solar.interfaces.db import get_db + + +db = get_db() + + + +def run(): + db.clear() + + node = vr.create('node', 'resources/ro_node', {'name': 'first' + str(time.time()), + 'ip': '10.0.0.3', + 'node_id': 'node1', + })[0] + + transports = vr.create('transports_node1', 'resources/transports')[0] + transports_for_solard = vr.create('transports_for_solard', 'resources/transports')[0] + + ssh_transport = vr.create('ssh_transport', 'resources/transport_ssh', + {'ssh_key': '/vagrant/.vagrant/machines/solar-dev1/virtualbox/private_key', + 'ssh_user': 'vagrant'})[0] + + solard_transport = vr.create('solard_transport', 'resources/transport_solard', + {'solard_user': 'vagrant', + 'solard_password': 'password'})[0] + + signals.connect(transports_for_solard, solard_transport, {}) + + signals.connect(ssh_transport, transports_for_solard, {'ssh_key': 'transports:key', + 'ssh_user': 'transports:user', + 'ssh_port': 'transports:port', + 'name': 'transports:name'}) + # set transports_id + signals.connect(transports, node, {}) + + # it uses reverse mappings + signals.connect(ssh_transport, transports, {'ssh_key': 'transports:key', + 'ssh_user': 'transports:user', + 'ssh_port': 'transports:port', + 'name': 'transports:name'}) + + signals.connect(solard_transport, transports, {'solard_user': 'transports:user', + 'solard_port': 'transports:port', + 'solard_password': 'transports:password', + 'name': 'transports:name'}) + + + hosts = vr.create('hosts_file', 'resources/hosts_file', {})[0] + signals.connect(node, hosts, { + 'ip': 'hosts:ip', + 'name': 'hosts:name' + }) + + # for r in (node, hosts, ssh_transport, transports): + # print r.name, repr(r.args['location_id']), repr(r.args['transports_id']) + + # print hosts.transports() + # print hosts.ip() + +run() From 786fb7499cd59977c55ff5b20b49561326b7e44a Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 11:04:04 +0200 Subject: [PATCH 21/33] Removed some debug statement --- solard/solard/tcp_server.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index b1e055c3..4977f66f 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -74,9 +74,6 @@ class SolardTCPHandler(object): try: return msgpack.unpackb(''.join(d)) except: - print '--------------------' - print repr(d) - print '--------------------' raise def _read_stream(self, size=None): From 8e82470a5f41553ee4f130c9575c666e78b667be Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 15:56:53 +0200 Subject: [PATCH 22/33] Removed gevent requirement for now --- solard/solard/tcp_server.py | 39 ++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/solard/solard/tcp_server.py b/solard/solard/tcp_server.py index 4977f66f..7c2e07f2 100644 --- a/solard/solard/tcp_server.py +++ b/solard/solard/tcp_server.py @@ -11,11 +11,13 @@ # License for the specific language governing permissions and limitations # under the License. -from gevent import monkey -monkey.patch_all() +# from gevent import monkey +# monkey.patch_all() -from gevent.server import StreamServer +# from gevent.server import StreamServer + +from SocketServer import ThreadingTCPServer, BaseRequestHandler import socket import msgpack @@ -167,11 +169,16 @@ class SolardTCPHandler(object): os.setgid(pw_gid) os.setuid(pw_uid) logger.debug("Child forked %d", os.getpid()) + self._fix_env(pw_uid) self.forked = True self._write_ok(True) return True return None + def _fix_env(self, pw_uid): + pw_dir = pwd.getpwuid(pw_uid).pw_dir + os.environ['HOME'] = pw_dir + def process(self): try: @@ -241,16 +248,12 @@ class SolardTCPHandler(object): logger.exception("Got exception") +class SolardReqHandler(BaseRequestHandler): -class SolardTCPServer(StreamServer): - - allow_reuse_address = True - - def __init__(self, *args, **kwargs): - StreamServer.__init__(self, *args, **kwargs) - - def handle(self, sock, address): + def handle(self): close = True + sock = self.request + address = self.client_address h = SolardTCPHandler(sock, address) try: logger.debug("New from %s:%d" % address) @@ -276,9 +279,19 @@ class SolardTCPServer(StreamServer): sock.close() if h.forked: # if forked we can safely exit now - sys.exit() + os._exit(0) + + +class SolardTCPServer(ThreadingTCPServer): + + allow_reuse_address = True + + def __init__(self, *args, **kwargs): + # StreamServer.__init__(self, *args, **kwargs) + ThreadingTCPServer.__init__(self, *args, **kwargs) + if __name__ == '__main__': - s = SolardTCPServer(('0.0.0.0', 5555)) + s = SolardTCPServer(('0.0.0.0', 5555), SolardReqHandler) s.serve_forever() From 59fdb314120f4e01de5d92efefe464799dea94f6 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 15:57:08 +0200 Subject: [PATCH 23/33] Added BAT transport --- solar/solar/core/transports/bat.py | 95 ++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 solar/solar/core/transports/bat.py diff --git a/solar/solar/core/transports/bat.py b/solar/solar/core/transports/bat.py new file mode 100644 index 00000000..b4c32b12 --- /dev/null +++ b/solar/solar/core/transports/bat.py @@ -0,0 +1,95 @@ +from solar.core.transports.base import SyncTransport, RunTransport, SolarTransport +from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport +from solar.core.transports.rsync import RsyncSyncTransport +from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport + +KNOWN_SYNC_TRANSPORTS = { + 'solard': SolardSyncTransport, + 'rsync': RsyncSyncTransport, + 'ssh': SSHSyncTransport +} + + +KNOWN_RUN_TRANSPORTS = { + 'solard': SolardRunTransport, + 'ssh': SSHRunTransport +} + + +class OnAll(object): + + def __init__(self, target): + self._target = target + + def __get__(self, obj, objtype): + def _inner(*args, **kwargs): + for transport in obj._used_transports: + getattr(transport, self._target)(*args, **kwargs) + return _inner + + +class BatTransport(SolarTransport): + + _order = () + + def __init__(self, *args, **kwargs): + super(BatTransport, self).__init__(*args, **kwargs) + self._cache = {} + self._used_transports = [] + + def select_valid_transport(self, resource, *args, **kwargs): + key_name = '_bat_transport_%s' % self._mode + try: + return getattr(resource, key_name) + except AttributeError: + transports = resource.transports() + for pref in self._order: + selected = next((x for x in transports if x['name'] == pref), None) + if selected: + break + if not selected: + raise Exception("No valid transport found") + instance = self._bat_transports[selected['name']]() + setattr(resource, '_used_transport', selected) + setattr(resource, key_name, instance) + self._used_transports.append(instance) + return instance + # return self._bat_transports[selected['name']] + + def get_transport_data(self, resource, *args, **kwargs): + self.select_valid_transport(resource) + return super(BatTransport, self).get_transport_data(resource, *args, **kwargs) + + +class BatSyncTransport(SyncTransport, BatTransport): + + preffered_transport_name = None + _order = ('solard', 'rsync', 'ssh') + _bat_transports = KNOWN_SYNC_TRANSPORTS + + def __init__(self, *args, **kwargs): + BatTransport.__init__(self) + SyncTransport.__init__(self, *args, **kwargs) + + def copy(self, resource, *args, **kwargs): + transport = self.select_valid_transport(resource) + return transport.copy(resource, *args, **kwargs) + + run_all = OnAll('run_all') + preprocess_all = OnAll('preprocess_all') + + +class BatRunTransport(RunTransport, BatTransport): + + preffered_transport_name = None + _order = ('solard', 'ssh') + _bat_transports = KNOWN_RUN_TRANSPORTS + + def __init__(self, *args, **kwargs): + BatTransport.__init__(self) + RunTransport.__init__(self, *args, **kwargs) + + def run(self, resource, *args, **kwargs): + transport = self.select_valid_transport(resource) + return transport.run(resource, *args, **kwargs) + From c6b50b4c13fcc967812264acf7c260e27fad9afe Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 15:57:17 +0200 Subject: [PATCH 24/33] Better debug in solard transport --- solar/solar/core/transports/solard_transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solar/solar/core/transports/solard_transport.py b/solar/solar/core/transports/solard_transport.py index 252d4824..3c0f7156 100644 --- a/solar/solar/core/transports/solard_transport.py +++ b/solar/solar/core/transports/solard_transport.py @@ -21,7 +21,6 @@ from solar.core.log import log class SolardTransport(object): - def get_client(self, resource): transport = self.get_transport_data(resource) host = resource.ip() @@ -64,5 +63,6 @@ class SolardRunTransport(RunTransport, SolardTransport): res = client.run(' '.join(args), **kwargs) return self.get_result(res, failed=False) except Exception as ex: + log.exception("Exception during solard run") return self.get_result(ex, failed=True) From 221071dd3a151e2aa0c7f7b3d72884c21f2179c7 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 15:57:31 +0200 Subject: [PATCH 25/33] Adjustments in transports for latest changes --- solar/solar/core/actions.py | 17 +++++++++++------ solar/solar/core/transports/base.py | 11 ++++++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/solar/solar/core/actions.py b/solar/solar/core/actions.py index 4b54dbe8..5faac2ce 100644 --- a/solar/solar/core/actions.py +++ b/solar/solar/core/actions.py @@ -15,18 +15,23 @@ import handlers -from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport +# from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport # from solar.core.transports.rsync import RsyncSyncTransport -from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport +# from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport + +from solar.core.transports.bat import BatRunTransport, BatSyncTransport _default_transports = { # 'sync': RsyncSyncTransport, - 'sync': SSHSyncTransport, - 'run': SSHRunTransport -# 'run': SolardRunTransport, -# 'sync': SolardSyncTransport + # 'sync': SSHSyncTransport, + # 'run': SSHRunTransport, + # 'run': SolardRunTransport, + # 'sync': SolardSyncTransport + 'run': BatRunTransport, + 'sync': BatSyncTransport } + def resource_action(resource, action): handler = resource.db_obj.handler or 'none' with handlers.get(handler)([resource], _default_transports) as h: diff --git a/solar/solar/core/transports/base.py b/solar/solar/core/transports/base.py index a844b66d..241bae43 100644 --- a/solar/solar/core/transports/base.py +++ b/solar/solar/core/transports/base.py @@ -42,6 +42,8 @@ class Executor(object): class SolarTransport(object): + _mode = None + def __init__(self): pass @@ -60,7 +62,7 @@ class SolarTransport(object): class SolarRunResult(object): - def __init__(self, result, failed=False): + def __init__(self, result, failed=True): self._result = result self._failed = failed @@ -72,6 +74,11 @@ class SolarRunResult(object): def result(self): return self._result + def __str__(self): + if self.failed: + return str(self.failed) + return str(self.result) + class SyncTransport(SolarTransport): """ @@ -79,6 +86,7 @@ class SyncTransport(SolarTransport): """ preffered_transport_name = None + _mode = 'sync' def __init__(self): super(SyncTransport, self).__init__() @@ -127,6 +135,7 @@ class RunTransport(SolarTransport): """ preffered_transport_name = None + _mode = 'run' def __init__(self): super(RunTransport, self).__init__() From 88ba25db2e519734004a35d17daabf6950873ed1 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 18:58:20 +0200 Subject: [PATCH 26/33] Changed start-stop-daemon arguments --- resources/transport_solard/actions/run.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resources/transport_solard/actions/run.yaml b/resources/transport_solard/actions/run.yaml index e12eec90..52af16da 100644 --- a/resources/transport_solard/actions/run.yaml +++ b/resources/transport_solard/actions/run.yaml @@ -2,4 +2,4 @@ sudo: yes tasks: - shell: pip install -e /vagrant/solard - - shell: start-stop-daemon -b --name solard --start --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --exec /usr/bin/python -- solard/tcp_server.py + - shell: start-stop-daemon -b --start --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec python /vagrant/solard/solard/tcp_server.py > /tmp/solard.log 2>&1" From f1d8f937532804fbc6f85dec5ceb07ac855623ec Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 20:54:46 +0200 Subject: [PATCH 27/33] Docummented signals.py magic, adjusted behaviour in some cases --- solar/solar/core/signals.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index d741a82e..b3efc593 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -59,6 +59,13 @@ def location_and_transports(emitter, receiver, orig_mapping): orig_mapping.remove(single) def _single(single, emitter, receiver, inps_emitter, inps_receiver): + # this function is responsible for doing magic with transports_id and location_id + # it tries to be safe and smart as possible + # it connects only when 100% that it can and should + # user can always use direct mappings, + # we also use direct mappings in VR + # when we will remove location_id and transports_id from inputs then this function, + # will be deleted too if inps_emitter and inps_receiver: if not inps_emitter == inps_receiver: log.warning("Different %r defined %r => %r", single, emitter.name, receiver.name) @@ -68,13 +75,24 @@ def location_and_transports(emitter, receiver, orig_mapping): return emitter_single = emitter.db_obj.meta_inputs[single] receiver_single = receiver.db_obj.meta_inputs[single] + emitter_single_reverse = emitter_single.get('reverse') + receiver_single_reverse = receiver_single.get('reverse') + if inps_receiver is None and inps_emitter is not None: + # we don't connect automaticaly when receiver is None and emitter is not None + # for cases when we connect existing transports to other data containers + if receiver_single_reverse: + log.info("Didn't connect automaticaly %s::%s -> %s::%s", + receiver.name, + single, + emitter.name, + single) + return if emitter_single.get('value') is False: log.debug("Disabled %r mapping for %r", single, emitter.name) return if receiver_single.get('is_own') is False: log.debug("Not is_own %r for %r ", single, emitter.name) - emitter_single_reverse = emitter_single.get('reverse') - receiver_single_reverse = receiver_single.get('reverse') + return # connect in other direction if emitter_single_reverse: if receiver_single_reverse: From 674b11a72a8e08477b61c08914096cd21f15d55f Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Tue, 29 Sep 2015 20:54:59 +0200 Subject: [PATCH 28/33] Possibility to add solard in riak example --- examples/riak/riaks.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/examples/riak/riaks.py b/examples/riak/riaks.py index 8c8b4741..0cf93744 100755 --- a/examples/riak/riaks.py +++ b/examples/riak/riaks.py @@ -208,6 +208,31 @@ def setup_haproxies(): add_event(event) +@click.command() +@click.argument('i', type=int, required=True) +def add_solard(i): + solard_transport = vr.create('solard_transport%s' % i, 'resources/transport_solard', + {'solard_user': 'vagrant', + 'solard_password': 'password'})[0] + transports = resource.load('transports%s' % i) + ssh_transport = resource.load('ssh_transport%s' % i) + transports_for_solard = vr.create('transports_for_solard%s' % i, 'resources/transports')[0] + + # install solard with ssh + signals.connect(transports_for_solard, solard_transport, {}) + + signals.connect(ssh_transport, transports_for_solard, {'ssh_key': 'transports:key', + 'ssh_user': 'transports:user', + 'ssh_port': 'transports:port', + 'name': 'transports:name'}) + + # add solard to transports on this node + signals.connect(solard_transport, transports, {'solard_user': 'transports:user', + 'solard_port': 'transports:port', + 'solard_password': 'transports:password', + 'name': 'transports:name'}) + + @click.group() def main(): pass @@ -231,6 +256,7 @@ def undeploy(): main.add_command(deploy) main.add_command(undeploy) main.add_command(add_haproxies) +main.add_command(add_solard) if __name__ == '__main__': From dcd1ea23d8320e9b8e93d70ab65de1be1c540465 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Wed, 30 Sep 2015 10:08:26 +0200 Subject: [PATCH 29/33] Everything validates now --- resources/transport_solard/meta.yaml | 6 +++--- resources/transport_ssh/meta.yaml | 7 ++++--- resources/transports/meta.yaml | 2 +- solar/solar/core/signals.py | 8 +++++++- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/resources/transport_solard/meta.yaml b/resources/transport_solard/meta.yaml index ac8b7aab..41532c5d 100644 --- a/resources/transport_solard/meta.yaml +++ b/resources/transport_solard/meta.yaml @@ -17,10 +17,10 @@ input: schema: str! value: solard location_id: - schema: str! + schema: str value: reverse: True is_own: False transports_id: - schema: str! - value: False + schema: str + is_emit: False diff --git a/resources/transport_ssh/meta.yaml b/resources/transport_ssh/meta.yaml index 97d27fbd..4b607ec6 100644 --- a/resources/transport_ssh/meta.yaml +++ b/resources/transport_ssh/meta.yaml @@ -13,10 +13,11 @@ input: schema: str! value: ssh location_id: - schema: str! + schema: str value: reverse: True is_own: False transports_id: - schema: str! - value: False + schema: str + value: + is_emit: False diff --git a/resources/transports/meta.yaml b/resources/transports/meta.yaml index b5c70947..d73bb018 100644 --- a/resources/transports/meta.yaml +++ b/resources/transports/meta.yaml @@ -8,6 +8,6 @@ input: value: $uuid reverse: True location_id: - schema: str! + schema: str value: reverse: True diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index b3efc593..760522b7 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -87,10 +87,16 @@ def location_and_transports(emitter, receiver, orig_mapping): emitter.name, single) return - if emitter_single.get('value') is False: + if emitter_single.get('is_emit') is False: + # this case is when we connect resource to transport itself + # like adding ssh_transport for solard_transport and we don't want then + # transports_id to be messed + # it forbids passing this value around log.debug("Disabled %r mapping for %r", single, emitter.name) return if receiver_single.get('is_own') is False: + # this case is when we connect resource which has location_id but that is + # from another resource log.debug("Not is_own %r for %r ", single, emitter.name) return # connect in other direction From 0bfd60d28185a2ed3435e4acb1aaefdf62e63512 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Wed, 30 Sep 2015 10:10:52 +0200 Subject: [PATCH 30/33] .get for checking if $uuid --- solar/solar/core/resource/resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solar/solar/core/resource/resource.py b/solar/solar/core/resource/resource.py index 214a03dd..4d60bc02 100644 --- a/solar/solar/core/resource/resource.py +++ b/solar/solar/core/resource/resource.py @@ -103,7 +103,7 @@ class Resource(object): inputs.setdefault('transports_id', {'value': "", 'schema': 'str'}) for inp in ('transports_id', 'location_id'): - if inputs[inp]['value'] == '$uuid': + if inputs[inp].get('value') == '$uuid': inputs[inp]['value'] = md5(self.name + uuid4().hex).hexdigest() def transports(self): From 12e333f29b6dce503736be2d1827106a99b967c1 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Wed, 30 Sep 2015 10:15:11 +0200 Subject: [PATCH 31/33] Stop solard before start --- resources/transport_solard/actions/run.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/resources/transport_solard/actions/run.yaml b/resources/transport_solard/actions/run.yaml index 52af16da..541bf215 100644 --- a/resources/transport_solard/actions/run.yaml +++ b/resources/transport_solard/actions/run.yaml @@ -2,4 +2,6 @@ sudo: yes tasks: - shell: pip install -e /vagrant/solard + - shell: start-stop-daemon --stop --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec python /vagrant/solard/solard/tcp_server.py > /tmp/solard.log 2>&1" + ignore_errors: True - shell: start-stop-daemon -b --start --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec python /vagrant/solard/solard/tcp_server.py > /tmp/solard.log 2>&1" From 4eb7dc44e4a30ecfa9a1d36b249bb4e6c231a5ab Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Wed, 30 Sep 2015 13:13:37 +0200 Subject: [PATCH 32/33] Changed comment in signals.py --- solar/solar/core/signals.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/solar/solar/core/signals.py b/solar/solar/core/signals.py index 760522b7..5a7681af 100644 --- a/solar/solar/core/signals.py +++ b/solar/solar/core/signals.py @@ -49,7 +49,9 @@ def guess_mapping(emitter, receiver): def location_and_transports(emitter, receiver, orig_mapping): - # XXX: we didn't agree on that reverse thign there + # XXX: we definitely need to change this + # inputs shouldn't carry is_own, or is_emit flags + # but for now we don't have anything better def _remove_from_mapping(single): if single in orig_mapping: From e687d10f68645956ee69c89821b931bfa79db9c6 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Wed, 30 Sep 2015 14:41:58 +0200 Subject: [PATCH 33/33] Removed gevent from requirements.txt --- solard/requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/solard/requirements.txt b/solard/requirements.txt index d4dfb93f..5a66c8fb 100644 --- a/solard/requirements.txt +++ b/solard/requirements.txt @@ -1,2 +1 @@ -gevent>=1.0.2 msgpack-python>=0.4.6