Merge pull request #144 from pigmej/solard_idea

Solard idea
This commit is contained in:
Łukasz Oleś 2015-10-01 09:59:03 +02:00
commit 7f90bcb805
23 changed files with 1354 additions and 13 deletions

View File

@ -208,6 +208,31 @@ def setup_haproxies():
add_event(event)
@click.command()
@click.argument('i', type=int, required=True)
def add_solard(i):
solard_transport = vr.create('solard_transport%s' % i, 'resources/transport_solard',
{'solard_user': 'vagrant',
'solard_password': 'password'})[0]
transports = resource.load('transports%s' % i)
ssh_transport = resource.load('ssh_transport%s' % i)
transports_for_solard = vr.create('transports_for_solard%s' % i, 'resources/transports')[0]
# install solard with ssh
signals.connect(transports_for_solard, solard_transport, {})
signals.connect(ssh_transport, transports_for_solard, {'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
# add solard to transports on this node
signals.connect(solard_transport, transports, {'solard_user': 'transports:user',
'solard_port': 'transports:port',
'solard_password': 'transports:password',
'name': 'transports:name'})
@click.group()
def main():
pass
@ -231,6 +256,7 @@ def undeploy():
main.add_command(deploy)
main.add_command(undeploy)
main.add_command(add_haproxies)
main.add_command(add_solard)
if __name__ == '__main__':

View File

@ -0,0 +1,68 @@
import click
import sys
import time
from solar.core import resource
from solar.core import signals
from solar.core.resource import virtual_resource as vr
from solar.interfaces.db import get_db
db = get_db()
def run():
db.clear()
node = vr.create('node', 'resources/ro_node', {'name': 'first' + str(time.time()),
'ip': '10.0.0.3',
'node_id': 'node1',
})[0]
transports = vr.create('transports_node1', 'resources/transports')[0]
transports_for_solard = vr.create('transports_for_solard', 'resources/transports')[0]
ssh_transport = vr.create('ssh_transport', 'resources/transport_ssh',
{'ssh_key': '/vagrant/.vagrant/machines/solar-dev1/virtualbox/private_key',
'ssh_user': 'vagrant'})[0]
solard_transport = vr.create('solard_transport', 'resources/transport_solard',
{'solard_user': 'vagrant',
'solard_password': 'password'})[0]
signals.connect(transports_for_solard, solard_transport, {})
signals.connect(ssh_transport, transports_for_solard, {'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
# set transports_id
signals.connect(transports, node, {})
# it uses reverse mappings
signals.connect(ssh_transport, transports, {'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
signals.connect(solard_transport, transports, {'solard_user': 'transports:user',
'solard_port': 'transports:port',
'solard_password': 'transports:password',
'name': 'transports:name'})
hosts = vr.create('hosts_file', 'resources/hosts_file', {})[0]
signals.connect(node, hosts, {
'ip': 'hosts:ip',
'name': 'hosts:name'
})
# for r in (node, hosts, ssh_transport, transports):
# print r.name, repr(r.args['location_id']), repr(r.args['transports_id'])
# print hosts.transports()
# print hosts.ip()
run()

View File

@ -0,0 +1,7 @@
- hosts: [{{ host }}]
sudo: yes
tasks:
- shell: pip install -e /vagrant/solard
- shell: start-stop-daemon --stop --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec python /vagrant/solard/solard/tcp_server.py > /tmp/solard.log 2>&1"
ignore_errors: True
- shell: start-stop-daemon -b --start --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec python /vagrant/solard/solard/tcp_server.py > /tmp/solard.log 2>&1"

View File

@ -0,0 +1,26 @@
id: transport_solard
handler: ansible
input:
solard_user:
schema: str!
value:
solard_password:
schema: str!
value:
# solard_transport_class:
# schema: str!
# value:
solard_port:
schema: int!
value: 5555
name:
schema: str!
value: solard
location_id:
schema: str
value:
reverse: True
is_own: False
transports_id:
schema: str
is_emit: False

View File

@ -13,6 +13,11 @@ input:
schema: str!
value: ssh
location_id:
schema: str!
schema: str
value:
reverse: True
is_own: False
transports_id:
schema: str
value:
is_emit: False

View File

@ -8,6 +8,6 @@ input:
value: $uuid
reverse: True
location_id:
schema: str!
schema: str
value:
reverse: True

View File

@ -15,15 +15,23 @@
import handlers
from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
# from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
# from solar.core.transports.rsync import RsyncSyncTransport
# from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport
from solar.core.transports.bat import BatRunTransport, BatSyncTransport
_default_transports = {
'sync': SSHSyncTransport,
# 'sync': RsyncSyncTransport,
'run': SSHRunTransport
# 'sync': SSHSyncTransport,
# 'run': SSHRunTransport,
# 'run': SolardRunTransport,
# 'sync': SolardSyncTransport
'run': BatRunTransport,
'sync': BatSyncTransport
}
def resource_action(resource, action):
handler = resource.db_obj.handler or 'none'
with handlers.get(handler)([resource], _default_transports) as h:

View File

@ -20,7 +20,6 @@ from solar.core.log import log
from solar.core.handlers.base import TempFileHandler
from solar import errors
# otherwise fabric will sys.exit(1) in case of errors
env.warn_only = True
@ -29,6 +28,7 @@ env.warn_only = True
# we would not need to render it there
# for now we redender it locally, sync to remote, run ansible on remote host as local
class AnsibleTemplate(TempFileHandler):
def action(self, resource, action_name):
inventory_file = self._create_inventory(resource)
playbook_file = self._create_playbook(resource, action_name)

View File

@ -109,7 +109,7 @@ class Resource(object):
inputs.setdefault('transports_id', {'value': "",
'schema': 'str'})
for inp in ('transports_id', 'location_id'):
if inputs[inp]['value'] == '$uuid':
if inputs[inp].get('value') == '$uuid':
inputs[inp]['value'] = md5(self.name + uuid4().hex).hexdigest()
def transports(self):

View File

@ -49,7 +49,9 @@ def guess_mapping(emitter, receiver):
def location_and_transports(emitter, receiver, orig_mapping):
# XXX: we didn't agree on that reverse thign there
# XXX: we definitely need to change this
# inputs shouldn't carry is_own, or is_emit flags
# but for now we don't have anything better
def _remove_from_mapping(single):
if single in orig_mapping:
@ -58,14 +60,47 @@ def location_and_transports(emitter, receiver, orig_mapping):
elif isinstance(orig_mapping, set):
orig_mapping.remove(single)
def _single(single, inps_emitter, inps_receiver):
def _single(single, emitter, receiver, inps_emitter, inps_receiver):
# this function is responsible for doing magic with transports_id and location_id
# it tries to be safe and smart as possible
# it connects only when 100% that it can and should
# user can always use direct mappings,
# we also use direct mappings in VR
# when we will remove location_id and transports_id from inputs then this function,
# will be deleted too
if inps_emitter and inps_receiver:
log.debug("location and transports different, skipping")
if not inps_emitter == inps_receiver:
log.warning("Different %r defined %r => %r", single, emitter.name, receiver.name)
return
else:
log.debug("The same %r defined for %r => %r, skipping", single, emitter.name, receiver.name)
return
emitter_single = emitter.db_obj.meta_inputs[single]
receiver_single = receiver.db_obj.meta_inputs[single]
emitter_single_reverse = emitter_single.get('reverse')
receiver_single_reverse = receiver_single.get('reverse')
if inps_receiver is None and inps_emitter is not None:
# we don't connect automaticaly when receiver is None and emitter is not None
# for cases when we connect existing transports to other data containers
if receiver_single_reverse:
log.info("Didn't connect automaticaly %s::%s -> %s::%s",
receiver.name,
single,
emitter.name,
single)
return
if emitter_single.get('is_emit') is False:
# this case is when we connect resource to transport itself
# like adding ssh_transport for solard_transport and we don't want then
# transports_id to be messed
# it forbids passing this value around
log.debug("Disabled %r mapping for %r", single, emitter.name)
return
if receiver_single.get('is_own') is False:
# this case is when we connect resource which has location_id but that is
# from another resource
log.debug("Not is_own %r for %r ", single, emitter.name)
return
# connect in other direction
if emitter_single_reverse:
if receiver_single_reverse:
@ -86,7 +121,7 @@ def location_and_transports(emitter, receiver, orig_mapping):
# XXX: should be somehow parametrized (input attribute?)
for single in ('transports_id', 'location_id'):
if single in inps_emitter and inps_receiver:
_single(single, inps_emitter[single], inps_receiver[single])
_single(single, emitter, receiver, inps_emitter[single], inps_receiver[single])
else:
log.warning('Unable to create connection for %s with'
' emitter %s, receiver %s',

View File

@ -42,6 +42,8 @@ class Executor(object):
class SolarTransport(object):
_mode = None
def __init__(self):
pass
@ -58,12 +60,33 @@ class SolarTransport(object):
class SolarRunResult(object):
def __init__(self, result, failed=True):
self._result = result
self._failed = failed
@property
def failed(self):
return self._failed
@property
def result(self):
return self._result
def __str__(self):
if self.failed:
return str(self.failed)
return str(self.result)
class SyncTransport(SolarTransport):
"""
Transport that is responsible for file / directory syncing.
"""
preffered_transport_name = None
_mode = 'sync'
def __init__(self):
super(SyncTransport, self).__init__()
@ -112,10 +135,14 @@ class RunTransport(SolarTransport):
"""
preffered_transport_name = None
_mode = 'run'
def __init__(self):
super(RunTransport, self).__init__()
def get_result(self, *args, **kwargs):
raise NotImplementedError()
def bind_with(self, other):
# we migth add there something later
# like compat checking etc

View File

@ -0,0 +1,95 @@
from solar.core.transports.base import SyncTransport, RunTransport, SolarTransport
from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
from solar.core.transports.rsync import RsyncSyncTransport
from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport
KNOWN_SYNC_TRANSPORTS = {
'solard': SolardSyncTransport,
'rsync': RsyncSyncTransport,
'ssh': SSHSyncTransport
}
KNOWN_RUN_TRANSPORTS = {
'solard': SolardRunTransport,
'ssh': SSHRunTransport
}
class OnAll(object):
def __init__(self, target):
self._target = target
def __get__(self, obj, objtype):
def _inner(*args, **kwargs):
for transport in obj._used_transports:
getattr(transport, self._target)(*args, **kwargs)
return _inner
class BatTransport(SolarTransport):
_order = ()
def __init__(self, *args, **kwargs):
super(BatTransport, self).__init__(*args, **kwargs)
self._cache = {}
self._used_transports = []
def select_valid_transport(self, resource, *args, **kwargs):
key_name = '_bat_transport_%s' % self._mode
try:
return getattr(resource, key_name)
except AttributeError:
transports = resource.transports()
for pref in self._order:
selected = next((x for x in transports if x['name'] == pref), None)
if selected:
break
if not selected:
raise Exception("No valid transport found")
instance = self._bat_transports[selected['name']]()
setattr(resource, '_used_transport', selected)
setattr(resource, key_name, instance)
self._used_transports.append(instance)
return instance
# return self._bat_transports[selected['name']]
def get_transport_data(self, resource, *args, **kwargs):
self.select_valid_transport(resource)
return super(BatTransport, self).get_transport_data(resource, *args, **kwargs)
class BatSyncTransport(SyncTransport, BatTransport):
preffered_transport_name = None
_order = ('solard', 'rsync', 'ssh')
_bat_transports = KNOWN_SYNC_TRANSPORTS
def __init__(self, *args, **kwargs):
BatTransport.__init__(self)
SyncTransport.__init__(self, *args, **kwargs)
def copy(self, resource, *args, **kwargs):
transport = self.select_valid_transport(resource)
return transport.copy(resource, *args, **kwargs)
run_all = OnAll('run_all')
preprocess_all = OnAll('preprocess_all')
class BatRunTransport(RunTransport, BatTransport):
preffered_transport_name = None
_order = ('solard', 'ssh')
_bat_transports = KNOWN_RUN_TRANSPORTS
def __init__(self, *args, **kwargs):
BatTransport.__init__(self)
RunTransport.__init__(self, *args, **kwargs)
def run(self, resource, *args, **kwargs):
transport = self.select_valid_transport(resource)
return transport.run(resource, *args, **kwargs)

View File

@ -0,0 +1,68 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from solard.client import SolardClient
from solar.core.transports.base import RunTransport, SyncTransport, Executor, SolarRunResult
from solar.core.log import log
class SolardTransport(object):
def get_client(self, resource):
transport = self.get_transport_data(resource)
host = resource.ip()
user = transport['user']
port = transport['port']
auth = transport['password']
transport_class = transport.get('transport_class')
client = SolardClient(auth={'user': user, 'auth': auth},
transport_args=(host, port),
transport_class=transport_class)
return client
class SolardSyncTransport(SyncTransport, SolardTransport):
preffered_transport_name = 'solard'
def copy(self, resource, _from, _to, use_sudo=False):
log.debug("Solard copy: %s -> %s", _from, _to)
client = self.get_client(resource)
executor = lambda transport: client.copy(_from, _to, use_sudo)
executor = Executor(resource=resource,
executor=executor,
params=(_from, _to, use_sudo))
self.executors.append(executor)
class SolardRunTransport(RunTransport, SolardTransport):
preffered_transport_name = 'solard'
def get_result(self, result, failed=False):
return SolarRunResult(result, failed)
def run(self, resource, *args, **kwargs):
log.debug("Solard run: %s", args)
client = self.get_client(resource)
try:
res = client.run(' '.join(args), **kwargs)
return self.get_result(res, failed=False)
except Exception as ex:
log.exception("Exception during solard run")
return self.get_result(ex, failed=True)

View File

@ -20,6 +20,7 @@ from fabric.contrib import project as fabric_project
from solar.core.log import log
from solar.core.transports.base import RunTransport, SyncTransport, Executor
from solar.core.transports.base import SolarRunResult
class _SSHTransport(object):
@ -86,6 +87,15 @@ class SSHRunTransport(RunTransport, _SSHTransport):
preffered_transport_name = 'ssh'
def get_result(self, output):
"""
Needed for compatibility with other handlers / transports
"""
if output.failed:
return SolarRunResult(output, failed=True)
return SolarRunResult(output, failed=False)
def run(self, resource, *args, **kwargs):
log.debug('SSH: %s', args)
@ -109,4 +119,5 @@ class SSHRunTransport(RunTransport, _SSHTransport):
managers.append(fabric_api.warn_only())
with nested(*managers):
return executor(' '.join(args))
res = executor(' '.join(args))
return self.get_result(res)

1
solard/requirements.txt Normal file
View File

@ -0,0 +1 @@
msgpack-python>=0.4.6

48
solard/setup.py Normal file
View File

@ -0,0 +1,48 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import os
from setuptools import find_packages
from setuptools import setup
def find_requires():
prj_root = os.path.dirname(os.path.realpath(__file__))
requirements = []
with open(u'{0}/requirements.txt'.format(prj_root), 'r') as reqs:
requirements = reqs.readlines()
return requirements
setup(
name='solard',
version='0.0.1',
description='Deployment tool daemon',
long_description="""Deployment tool daemon""",
classifiers=[
"Development Status :: 1 - Beta",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
"Topic :: System :: Software Distribution"],
author='Mirantis Inc.',
author_email='product@mirantis.com',
url='http://mirantis.com',
keywords='deployment',
packages=find_packages(),
zip_safe=False,
install_requires=find_requires(),
include_package_data=True
)

View File

108
solard/solard/client.py Normal file
View File

@ -0,0 +1,108 @@
import msgpack
import os
# TODO: handle errors
class SolardClient(object):
read_buffer = 4096
def __init__(self, auth, transport_args, transport_class=None):
if transport_class is None:
from solard.tcp_client import SolardTCPClient
transport_class = SolardTCPClient
self.auth = auth
self.sudo_transport = transport_class(*transport_args)
self.normal_transport = transport_class(*transport_args)
self.make_auth()
def make_auth(self):
self.normal_transport.auth = dict(self.auth)
self.sudo_transport.auth = dict(self.auth)
self.sudo_transport.auth['sudo'] = True
def run(self, *args, **kwargs):
if kwargs.get('use_sudo'):
transport = self.transport(use_sudo=True)
else:
transport = self.transport(use_sudo=False)
send = transport.send({'m': 'run', 'args': args, 'kwargs': kwargs})
resp = transport.resp()
return resp
def transport(self, use_sudo):
if use_sudo:
return self.sudo_transport
return self.normal_transport
def copy_directory(self, _from, _to, use_sudo=False):
# dir should open context on remote, and sync all files as one req/resp
to_cp_files = []
transport = self.transport(use_sudo)
for root, _, files in os.walk(_from):
for name in files:
single_from = os.path.join(root, name)
_to = os.path.join(root.replace(single_from, _to), name)
size = os.stat(single_from).st_size
to_cp_files.append((single_from, _to, size))
tos = [(x[1], x[2]) for x in to_cp_files]
total_size = sum((x[1] for x in tos))
data = {'m': 'copy_files',
'args': (tos, total_size),
's': True}
_ = transport.send(data)
transport.send_stream_start()
for _from, _to, _size in to_cp_files:
# sock = transport.send_stream_cont(add_size=_size)
sock = transport.send_stream_cont()
with open(_from, 'rb') as f:
while _size > 0:
data = f.read(self.read_buffer)
transport.send_stream_data(data)
_size -= len(data)
assert _size == 0 # maybe somehow below ?
transport.send_stream_end()
resp = transport.resp()
return resp
def copy_file(self, _from, _to, use_sudo=False):
transport = self.transport(use_sudo)
f_size = os.stat(_from).st_size
data = {'m': 'copy_file',
'args': (_to, f_size),
's': True}
_ = transport.send(data)
transport.send_stream_start(add_size=False)
to_read = f_size
with open(_from, 'rb') as f:
while to_read > 0:
data = f.read(self.read_buffer) # expose sendfile there
transport.send_stream_data(data)
to_read -= len(data)
assert to_read == 0
transport.send_stream_end()
return self.transport.resp()
def copy(self, _from, _to, use_sudo=False):
if os.path.isdir(_from):
resp = self.copy_directory(_from, _to, use_sudo)
else:
resp = self.copy_file(_from, _to, use_sudo)
return resp
if __name__ == '__main__':
import time
from solard.tcp_client import SolardTCPClient
c = SolardClient(auth={'user': 'vagrant', 'auth': 'password'}, transport_args=('10.0.0.3', 5555), transport_class=SolardTCPClient)
print c.run('hostname')
print c.run('whoami')
print c.run('whoami', use_sudo=True)
print c.copy('/vagrant/library', '/tmp')
# print c.copy('/tmp/a', '/tmp/bbb/b.%s' % (time.time()))
# print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time()))

267
solard/solard/core.py Normal file
View File

@ -0,0 +1,267 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import string
from contextlib import nested
from fabric import api as fabric_api
from subprocess import check_output
import shlex
from itertools import takewhile
from solard.logger import logger
# XXX: not used for now vvv
# def common_path(paths, sep=os.path.sep):
# paths = [x.split(sep) for x in paths]
# dirs = zip(*(p for p in paths))
# return [x[0] for x in takewhile(lambda x: all(n == x[0] for n in x[1:]), dirs)]
# class SolardContext(object):
# def __init__(self):
# self._dirs = {}
# self._files = {}
# def file(self, path):
# try:
# return self._files[path]
# except KeyError:
# if self.is_safe_file(path):
# cls = SolardSafeFile
# else:
# cls = SolardFile
# self._files[path] = f = cls(self, path)
# return f
# def dir(self, path):
# try:
# return self._dirs[path]
# except KeyError:
# self._dirs[path] = solard_dir = SolardDir(self, path)
# return solard_dir
# def is_safe_file(self, path):
# dirname = os.path.dirname(path)
# common = SolardContext.common_path(dirname, self._dirs.keys())
# if common not in ((), ('/', )):
# return False
# return True
# def is_safe_dir(self, path):
# common = SolardContext.common_path(path, self._dirs.keys())
# if common not in ((), ('/', )):
# return False
# return True
# @staticmethod
# def common_path(path, paths, sep=os.path.sep):
# all_paths = paths + [path]
# paths = [x.split(sep) for x in all_paths]
# dirs = zip(*(p for p in all_paths))
# return tuple(x[0] for x in takewhile(lambda x: all(n == x[0] for n in x[1:]), dirs))
# class SolardSafeFile(object):
# def __init__(self, context, target):
# self._f = None
# self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6)))
# self._path = target
# self._safe_path = self._path + '_' + self._rnd
# def open(self):
# self._f = open(self._safe_path, 'wb')
# def write(self, data):
# return self._f.write(data)
# def close(self):
# self._f.close()
# def finish(self):
# self.close()
# os.rename(self._safe_path, self._path)
# class SolardFile(object):
# def __init__(self, context, target):
# self._f = None
# self._path = target
# def open(self):
# self._f = open(self._path, 'wb')
# def write(self, data):
# self._f.write(data)
# def close(self):
# self._f.close()
# def finish(self):
# self.close()
# class SolardSafeDir(object):
# def __init__(self, context, target):
# self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6)))
# self._path = target
# self._safe_path = self._path + '_' + self._rnd
# def start(self):
# os.makedirs(self._safe_path)
# def finish(self):
# os.rename(self._safe_path, self._path)
# class SolardDir(object):
# def __init__(self, context, target):
# self._path = target
# def start(self):
# os.makedirs(self._path)
# def finish(self):
# pass
# XXX: not used for now ^^^
class SolardContext(object):
def __init__(self):
self.files = {}
def file(self, path):
try:
return self.files[path]
except KeyError:
self.files[path] = r = SolardFile(self, path)
return r
class SolardFile(object):
def __init__(self, context, target):
self.ctx = context
self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6)))
self._path = target
self._f = None
self._safe_path = self._path + '_' + self._rnd
def open(self):
dirname = os.path.dirname(self._safe_path)
if not os.path.exists(dirname):
os.makedirs(dirname)
if self._f is None:
self._f = open(self._safe_path, 'wb')
def write(self, data):
self._f.write(data)
def finish(self):
self._f.close()
self._f = None
os.rename(self._safe_path, self._path)
class SolardIface(object):
@staticmethod
def run(solard_context, cmd, **kwargs):
# return check_output(shlex.split(cmd))
executor = fabric_api.local
# if kwargs.get('use_sudo', False):
# cmd = 'sudo ' + cmd
managers = []
cwd = kwargs.get('cwd')
if cwd:
managers.append(fabric_api.cd(kwargs['cwd']))
env = kwargs.get('env')
if env:
managers.append(fabric_api.shell_env(**kwargs['env']))
if kwargs.get('warn_only', False):
managers.append(fabric_api.warn_only())
with nested(*managers):
out = executor(cmd, capture=True)
if out.failed:
raise Exception("Remote failed")
return out.stdout
@staticmethod
def copy_file(solard_context, stream_reader, path, size=None):
f = SolardIface.file_start(solard_context, path)
rdr = stream_reader(size)
for data in rdr:
f.write(data)
SolardIface.file_end(solard_context, path)
return True
@staticmethod
def copy_files(solard_context, stream_reader, paths, total_size):
# total_size not used for now
for _to, _size in paths:
logger.debug("Starting %s size=%d", _to, _size)
f = SolardIface.file_start(solard_context, _to)
if _size > 0:
rdr = stream_reader(_size)
for data in rdr:
f.write(data)
SolardIface.file_end(solard_context, _to)
logger.debug("Done %s size=%d", _to, _size)
return True
# # TODO: not used YET fully
# @staticmethod
# def dir_start(solard_context, path):
# solard_dir = solard_context.dir(path)
# solard_dir.start()
# return solard_dir
# @staticmethod
# def dir_finish(solard_context, path):
# solard_dir = solard_context.dir(path)
# solard_dir.finish()
# return True
@staticmethod
def file_start(solard_context, path):
solard_file = solard_context.file(path)
solard_file.open()
return solard_file
@staticmethod
def file_put_data(solard_context, path, data):
solard_file = solard_context.file(path)
return solard_file.write(data)
@staticmethod
def file_end(solard_context, path):
solard_file = solard_context.file(path)
solard_file.finish()
return True

37
solard/solard/logger.py Executable file
View File

@ -0,0 +1,37 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import logging
def __setup_logger():
logger = logging.getLogger("solard")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s')
stream = logging.StreamHandler()
stream.setLevel(logging.DEBUG)
stream.setFormatter(formatter)
logger.addHandler(stream)
return logger
__global_logger = None
def get_logger():
global __global_logger
if not __global_logger:
__global_logger = __setup_logger()
return __global_logger
logger = get_logger()

180
solard/solard/tcp_client.py Normal file
View File

@ -0,0 +1,180 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import msgpack
import socket
import errno
import struct
from solard.tcp_core import *
CLIENT_BUFF = 4096
class ClientException(Exception):
pass
class ReadError(ClientException):
pass
class RemoteException(ClientException):
pass
class RemoteFailure(ClientException):
pass
class SolardTCPClient(object):
def __init__(self, host, port, **kwargs):
self.host = host
self.port = port
# self._connect_timeout = kwargs.get("connect_timeout", None)
self._socket_timeout = kwargs.get("socket_timeout", None)
self.sock = None
self.auth = None
self._streaming = False
def connect(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(self._socket_timeout)
sock.connect((self.host, self.port))
except Exception:
sock.close()
raise
else:
self.sock = sock
if not self.initialize_with_auth():
self.sock = None
raise ClientException("Auth failed")
return sock
def initialize_with_auth(self):
self.send(self.auth)
resp = self.resp(close=False)
return resp
def disconnect(self):
sock = self.sock
try:
sock.shutdown(socket.SHUT_RDWR)
except Exception as e:
err_ = getattr(e, 'errno', None)
if err_ not in (errno.ENOTCONN, errno.EBADF):
raise
ret = sock.close()
self.sock = None
return ret
def send(self, data):
assert self._streaming is False
if self.sock is None:
self.connect()
_data = msgpack.packb(data)
size = len(_data)
hdr = struct.pack(HDR, size)
return self.sock.sendall(hdr + _data)
def send_stream_start(self, add_size=False):
assert self._streaming is False
self._streaming = True
if add_size is not False:
hdr = struct.pack(HDR, add_size)
self.sock.sendall(hdr)
return self.sock
def send_stream_cont(self, add_size=False):
assert self._streaming is True
if add_size is not False:
hdr = struct.pack(HDR, add_size)
self.sock.sendall(hdr)
return self.sock
def send_stream_end(self):
assert self._streaming is True
self._streaming = False
return self.sock
def send_stream_data(self, data):
assert self._streaming is True
self.sock.sendall(data) # TODO: expose sendfile easier
# self._streaming = False
def read(self):
sock = self.sock
d = sock.recv(HDR_SIZE)
if not len(d) == HDR_SIZE:
raise ReadError()
size = struct.unpack(HDR, d)[0]
d = []
while True:
b = min(size, CLIENT_BUFF)
curr = sock.recv(b)
d.append(curr)
size -= len(curr)
if not size:
break
return msgpack.unpackb(''.join(d))
def _resp_result_gen(self, data):
st = data['st']
if st == REPLY_GEN_OK: # OK
return data['res']
elif st == REPLY_GEN_END:
raise StopIteration()
else:
raise RemoteException(data)
def _resp_result_stream(self, data):
return data
def _resp_result(self, data):
st = data['st']
if st == REPLY_OK: # OK
return data['res']
elif st == REPLY_ERR:
raise RemoteException(data)
else:
raise RemoteFailure(data)
def _resp_gen(self, res, close):
try:
while True:
yield res
try:
res = self._resp_result_gen(self.read())
except StopIteration:
break
except Exception:
raise RemoteException(res)
finally:
if close:
self.disconnect()
def resp(self, close=True):
recv = self.read()
st = recv['st']
if REPLY_GEN_OK <= st <= REPLY_GEN_END:
return self._resp_gen(recv, close)
try:
res = self._resp_result(recv)
finally:
if close:
self.disconnect()
return res

27
solard/solard/tcp_core.py Normal file
View File

@ -0,0 +1,27 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import struct
REPLY_OK = 2
REPLY_GEN_OK = 20
REPLY_GEN_END = 21
REPLY_FAIL = 0
REPLY_ERR = 1
HDR = "<I"
HDR_SIZE = struct.calcsize(HDR)
INT_DEFAULT_REPLY_TYPE = 0
INT_GENERATOR_REPLY_TYPE = 1

297
solard/solard/tcp_server.py Normal file
View File

@ -0,0 +1,297 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
# from gevent import monkey
# monkey.patch_all()
# from gevent.server import StreamServer
from SocketServer import ThreadingTCPServer, BaseRequestHandler
import socket
import msgpack
import struct
import errno
import sys
import traceback
import pwd
import os
from types import GeneratorType
from solard.logger import logger
from solard.core import SolardContext, SolardIface
from solard.tcp_core import *
SERVER_BUFF = 4096
class SolardTCPException(Exception):
pass
class ReadFailure(SolardTCPException):
pass
class SolardTCPHandler(object):
def __init__(self, sock, address):
self.sock = sock
self.address = address
self.ctx = SolardContext()
self.auth = None
self._wrote = False
self.forked = False
def _read(self):
# TODO: client closed connection
try:
size = struct.unpack(HDR, self.sock.recv(HDR_SIZE))[0]
except:
raise ReadFailure("Can't read header data")
d = []
while True:
b = min(size, SERVER_BUFF)
curr = self.sock.recv(b)
if not curr:
raise ReadFailure("No data")
d.append(curr)
size -= len(curr)
assert size >= 0
if not size:
break
self._wrote = False
try:
return msgpack.unpackb(''.join(d))
except:
raise
def _read_stream(self, size=None):
if size is None:
try:
size = struct.unpack(HDR, self.sock.recv(HDR_SIZE))[0]
except:
raise ReadFailure("Can't read header data")
while True:
b = min(size, SERVER_BUFF)
curr = self.sock.recv(b)
if not curr:
if size > 0:
raise ReadFailure("Expected more data")
size -= len(curr)
assert size >= 0
yield curr
if not size:
break
def _write(self, **kwargs):
assert self._wrote is False
_data = msgpack.packb(kwargs)
size = len(_data)
hdr = struct.pack(HDR, size)
self.sock.sendall(hdr + _data)
self._wrote = True
def _write_ok(self, res):
# logger.debug("Ok sent")
data = {'st': REPLY_OK, 'res': res}
self._write(**data)
def _write_ok_gen(self, res):
data = {'st': REPLY_GEN_OK, 'res': res}
self._write(**data)
# def _write_ok_stream(self, res):
# data = {'st': 30, 'res': res}
# self._write(**data)
# def _write_stream_data(self, data):
# self.sock.sendall(data)
def _write_gen_end(self):
data = {'st': REPLY_GEN_END, 'res': None}
self._write(**data)
def _write_failure(self, exception, reason, tb=""):
data = {'st': REPLY_FAIL, 'exception': exception, 'reason': reason, 'tb': tb}
self._write(**data)
def _write_err(self, error):
logger.info("Client error: %s" % error)
data = {'st': REPLY_ERR, 'error': error}
self._write(**data)
def make_auth(self):
# it's responsible for:
# - checking auth
# - forking if needed
auth_data = self._read()
if not auth_data:
self._write_ok(False)
return False
req_user = auth_data.get('user')
if not req_user:
self._write_ok(False)
return False
proc_user = pwd.getpwuid(os.getuid())[0]
logger.debug("Requested user %r", req_user)
# TODO:
# we may add there anything we want, checking in file etc
# for now it's just `password`
valid = auth_data.get('auth') == 'password'
if not valid:
self._write_ok(False)
return False
if req_user == proc_user:
self._write_ok(True)
return True
# TODO: very naive
if auth_data.get('sudo'):
self._write_ok(True)
return True
# fork there
child_pid = os.fork()
if child_pid == 0:
pw_uid = pwd.getpwnam(req_user).pw_uid
pw_gid = pwd.getpwuid(pw_uid).pw_gid
os.setgid(pw_gid)
os.setuid(pw_uid)
logger.debug("Child forked %d", os.getpid())
self._fix_env(pw_uid)
self.forked = True
self._write_ok(True)
return True
return None
def _fix_env(self, pw_uid):
pw_dir = pwd.getpwuid(pw_uid).pw_dir
os.environ['HOME'] = pw_dir
def process(self):
try:
known_type = INT_DEFAULT_REPLY_TYPE
input_data = self._read()
if not input_data:
return False
method = input_data['m']
meth = getattr(SolardIface, method)
is_stream = input_data.get('s', False)
logger.debug("Going to run %r", method)
if is_stream:
res = meth(self.ctx, self._read_stream, *input_data.get('args', ()), **input_data.get('kwargs', {}))
else:
res = meth(self.ctx, *input_data.get('args', ()), **input_data.get('kwargs', {}))
if isinstance(res, GeneratorType):
known_type = INT_GENERATOR_REPLY_TYPE
try:
for curr in res:
self._wrote = False
self._write_ok_gen(curr)
except:
raise
finally:
try:
self._wrote = False
self._write_gen_end()
except Exception: # ignore if eng gen couldn't be send
pass
self._wrote = True
else:
# if not input_data.get('empty_ok_resp', False):
self._write_ok(res)
except ReadFailure:
return False
except Exception as ex:
if self._wrote:
if known_type == INT_GENERATOR_REPLY_TYPE:
errno_ = getattr(ex, 'errno', None)
if errno_ in (errno.EPIPE, errno.ECONNRESET):
logger.debug(
"Client disconnected during generator based reply")
else:
logger.debug("Error during generator based reply")
raise
else:
logger.error("Already wrote data, but got exception")
raise
else:
logger.exception("Got exception")
self.handle_exception()
return True
def handle_exception(self):
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
reason = str(exc_value)
if not reason:
reason = exc_type.__name__
try:
self._write_failure(exc_type.__name__, reason, tb)
except:
logger.warn("Failure when sending error response")
raise
finally:
logger.exception("Got exception")
class SolardReqHandler(BaseRequestHandler):
def handle(self):
close = True
sock = self.request
address = self.client_address
h = SolardTCPHandler(sock, address)
try:
logger.debug("New from %s:%d" % address)
auth_state = h.make_auth()
if auth_state is False:
logger.debug("Failed auth")
return
if auth_state is None:
# child forked
close = False
return
while True:
if not h.process():
logger.debug("End from %s:%d" % address)
break
else:
logger.debug("Waiting for more from %s:%d" % address)
try:
sock.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
sock.close()
if h.forked:
# if forked we can safely exit now
os._exit(0)
class SolardTCPServer(ThreadingTCPServer):
allow_reuse_address = True
def __init__(self, *args, **kwargs):
# StreamServer.__init__(self, *args, **kwargs)
ThreadingTCPServer.__init__(self, *args, **kwargs)
if __name__ == '__main__':
s = SolardTCPServer(('0.0.0.0', 5555), SolardReqHandler)
s.serve_forever()