diff --git a/bootstrap/playbooks/tasks/base.yaml b/bootstrap/playbooks/tasks/base.yaml index cf2ffe9b..37cca252 100644 --- a/bootstrap/playbooks/tasks/base.yaml +++ b/bootstrap/playbooks/tasks/base.yaml @@ -30,6 +30,8 @@ - python-virtualenv # Required by packer - build-essential + # for torrent transport + - python-libtorrent # PIP #- apt: name=python-pip state=absent diff --git a/examples/torrent/README.md b/examples/torrent/README.md new file mode 100644 index 00000000..13b6a338 --- /dev/null +++ b/examples/torrent/README.md @@ -0,0 +1,25 @@ +Example of using torrent transport with solar. Torrent is used to distribute task data. After fetching is finished torrent client forks and continues seeding. + + +The example contains single node with single host mapping + transports. + +Execute: +``` +python examples/torrent/example.py +solar changes stage +solar changes process +solar orch run-once last +``` + +Wait for finish: + +``` +solar orch report last -w 100 +``` + +After this you should see new entry in `/etc/hosts` file. + + +* All created torrents are in `/vagrant/torrents`, it doesn't need to be shared +* Initial seeding is done using torrent file +* Downloading and then seeding is always done with magnetlinks diff --git a/examples/torrent/example.py b/examples/torrent/example.py new file mode 100644 index 00000000..3719105e --- /dev/null +++ b/examples/torrent/example.py @@ -0,0 +1,74 @@ +import time + +from solar.core.resource import virtual_resource as vr +from solar import errors + +from solar.interfaces.db import get_db + + +db = get_db() + + +def run(): + db.clear() + + node = vr.create('node', 'resources/ro_node', {'name': 'first' + str(time.time()), + 'ip': '10.0.0.3', + 'node_id': 'node1', + })[0] + + transports = vr.create('transports_node1', 'resources/transports')[0] + + ssh_transport = vr.create('ssh_transport', 'resources/transport_ssh', + {'ssh_key': '/vagrant/.vagrant/machines/solar-dev1/virtualbox/private_key', + 'ssh_user': 'vagrant'})[0] + + transports.connect(node, {}) + + # it uses reverse mappings + ssh_transport.connect(transports, {'ssh_key': 'transports:key', + 'ssh_user': 'transports:user', + 'ssh_port': 'transports:port', + 'name': 'transports:name'}) + + hosts = vr.create('hosts_file', 'resources/hosts_file', {})[0] + + # let's add torrent transport for hosts file deployment (useless in real life) + + torrent_transport = vr.create('torrent_transport', + 'resources/transport_torrent', + {'trackers': ['udp://open.demonii.com:1337', + 'udp://tracker.openbittorrent.com:80']})[0] + # you could use any trackers as you want + + transports_for_torrent = vr.create( + 'transports_for_torrent', 'resources/transports')[0] + + transports_for_torrent.connect(torrent_transport, {}) + + ssh_transport.connect_with_events(transports_for_torrent, {'ssh_key': 'transports:key', + 'ssh_user': 'transports:user', + 'ssh_port': 'transports:port', + 'name': 'transports:name'}, + events={}) + + transports_for_hosts = vr.create( + 'transports_for_hosts', 'resources/transports')[0] + + torrent_transport.connect(transports_for_hosts, {'trackers': 'transports:trackers', + 'name': 'transports:name'}) + + ssh_transport.connect(transports_for_hosts, {'ssh_key': 'transports:key', + 'ssh_user': 'transports:user', + 'ssh_port': 'transports:port', + 'name': 'transports:name'}) + + transports_for_hosts.connect(hosts) + transports_for_hosts.connect_with_events(node, events={}) + + node.connect(hosts, { + 'ip': 'hosts:ip', + 'name': 'hosts:name' + }) + +run() diff --git a/resources/transport_torrent/actions/run.yaml b/resources/transport_torrent/actions/run.yaml new file mode 100644 index 00000000..76b00830 --- /dev/null +++ b/resources/transport_torrent/actions/run.yaml @@ -0,0 +1,9 @@ +- hosts: [{{ host }}] + sudo: yes + tasks: + - apt: + name: python-libtorrent + state: present + - copy: + src: {{scripts_dir}}/solar_torrent.py + dest: /var/tmp/solar_torrent.py diff --git a/resources/transport_torrent/meta.yaml b/resources/transport_torrent/meta.yaml new file mode 100644 index 00000000..217a5442 --- /dev/null +++ b/resources/transport_torrent/meta.yaml @@ -0,0 +1,18 @@ +id: transport_torrent +handler: ansible +input: + trackers: + schema: [str!] + value: [] + name: + schema: str! + value: torrent + location_id: + schema: str + value: + reverse: True + is_own: False + transports_id: + schema: str + value: + is_emit: False diff --git a/resources/transport_torrent/scripts/solar_torrent.py b/resources/transport_torrent/scripts/solar_torrent.py new file mode 120000 index 00000000..0f59be0e --- /dev/null +++ b/resources/transport_torrent/scripts/solar_torrent.py @@ -0,0 +1 @@ +../../../solar/solar/core/transports/helpers/solar_torrent.py \ No newline at end of file diff --git a/resources/transports/meta.yaml b/resources/transports/meta.yaml index d73bb018..a691faed 100644 --- a/resources/transports/meta.yaml +++ b/resources/transports/meta.yaml @@ -1,7 +1,7 @@ id: transports input: transports: - schema: [{user: str, password: str, port: int!, key: str, name: str!}] + schema: [{user: str, password: str, port: int!, key: str, name: str!, trackers: [str]}] value: [] transports_id: schema: str! diff --git a/solar/solar/core/transports/base.py b/solar/solar/core/transports/base.py index 52bbec9a..90016ff6 100644 --- a/solar/solar/core/transports/base.py +++ b/solar/solar/core/transports/base.py @@ -83,16 +83,23 @@ class SolarTransport(object): pass def get_transport_data(self, resource, name=None): + key = '_used_transport_%s' % self._mode # TODO: naive object local cache try: - transport = resource._used_transport + transport = getattr(resource, key) except AttributeError: if name is None: name = self.preffered_transport_name transport = next(x for x in resource.transports() if x['name'] == name) - setattr(resource, '_used_transport', transport) + setattr(resource, key, transport) return transport + def other(self, resource): + return self._other + + def bind_with(self, other): + self._other = other + class SyncTransport(SolarTransport): """ @@ -106,11 +113,6 @@ class SyncTransport(SolarTransport): super(SyncTransport, self).__init__() self.executors = [] - def bind_with(self, other): - # we migth add there something later - # like compat checking etc - self.other = other - def copy(self, resource, *args, **kwargs): pass @@ -157,11 +159,6 @@ class RunTransport(SolarTransport): def get_result(self, *args, **kwargs): raise NotImplementedError() - def bind_with(self, other): - # we migth add there something later - # like compat checking etc - self.other = other - def run(self, resource, *args, **kwargs): pass diff --git a/solar/solar/core/transports/bat.py b/solar/solar/core/transports/bat.py index b4c32b12..56624a15 100644 --- a/solar/solar/core/transports/bat.py +++ b/solar/solar/core/transports/bat.py @@ -2,8 +2,10 @@ from solar.core.transports.base import SyncTransport, RunTransport, SolarTranspo from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport from solar.core.transports.rsync import RsyncSyncTransport from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport +from solar.core.transports.torrent import TorrentSyncTransport KNOWN_SYNC_TRANSPORTS = { + 'torrent': TorrentSyncTransport, 'solard': SolardSyncTransport, 'rsync': RsyncSyncTransport, 'ssh': SSHSyncTransport @@ -50,9 +52,10 @@ class BatTransport(SolarTransport): if not selected: raise Exception("No valid transport found") instance = self._bat_transports[selected['name']]() - setattr(resource, '_used_transport', selected) + setattr(resource, '_used_transport_%s' % instance._mode, selected) setattr(resource, key_name, instance) self._used_transports.append(instance) + instance.bind_with(self._other_remember) return instance # return self._bat_transports[selected['name']] @@ -60,11 +63,14 @@ class BatTransport(SolarTransport): self.select_valid_transport(resource) return super(BatTransport, self).get_transport_data(resource, *args, **kwargs) + def bind_with(self, other): + self._other_remember = other + class BatSyncTransport(SyncTransport, BatTransport): preffered_transport_name = None - _order = ('solard', 'rsync', 'ssh') + _order = ('torrent', 'solard', 'rsync', 'ssh') _bat_transports = KNOWN_SYNC_TRANSPORTS def __init__(self, *args, **kwargs): diff --git a/solar/solar/core/transports/helpers/__init__.py b/solar/solar/core/transports/helpers/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/solar/solar/core/transports/helpers/__init__.py @@ -0,0 +1 @@ + diff --git a/solar/solar/core/transports/helpers/solar_torrent.py b/solar/solar/core/transports/helpers/solar_torrent.py new file mode 100644 index 00000000..2f00f2ad --- /dev/null +++ b/solar/solar/core/transports/helpers/solar_torrent.py @@ -0,0 +1,173 @@ +# TODO: change to something less naive +# + +import libtorrent as lt +from operator import attrgetter +import time +import sys +import os + +state_str = ['queued', 'checking', 'downloading metadata', \ + 'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume'] + + +class MultiTorrent(object): + + def __init__(self, torrents, ses): + self.torrents = torrents + self.ses = ses + + def force_reannounce(self): + for torrent in self.torrents: + torrent.force_reannounce() + + @property + def is_seeding(self): + for torrent in self.torrents: + status = torrent.status() + if state_str[status.state] != 'seeding': + return False + return True + + @property + def progress(self): + total_progress = map(attrgetter('progress'), map(lambda x: x.status(), self.torrents)) + return sum(total_progress) / len(total_progress) + + def numbers(self): + seeding = 0 + downloading = 0 + for torrent in self.torrents: + if torrent.status().is_seeding: + seeding += 1 + else: + downloading += 1 + return seeding, downloading + + +def init_session(args, seed=False): + ses = lt.session() + all_torrents = [] + for save_path, magnet_or_path in args: + if os.path.exists(magnet_or_path): + e = lt.bdecode(open(magnet_or_path, 'rb').read()) + info = lt.torrent_info(e) + params = { 'save_path': save_path, + 'storage_mode': lt.storage_mode_t.storage_mode_sparse, + 'ti': info, + 'seed_mode': seed} + h = ses.add_torrent(params) + else: + h = ses.add_torrent({ + 'save_path': save_path, + 'storage_mode': lt.storage_mode_t.storage_mode_sparse, + 'url': magnet_or_path, + 'seed_mode': seed}) + all_torrents.append(h) + return ses, all_torrents + + +def _daemonize(): + # should be true daemonize + new_pid = os.fork() + if new_pid > 0: + # first + sys.exit(0) + os.setsid() + new_pid2 = os.fork() + if new_pid2 > 0: + sys.exit(0) + stdin = file(os.devnull, 'r') + stdout = file(os.devnull, 'a+') + stderr = file(os.devnull, 'a+', 0) + os.dup2(stdin.fileno(), sys.stdin.fileno()) + os.dup2(stdout.fileno(), sys.stdout.fileno()) + os.dup2(stderr.fileno(), sys.stderr.fileno()) + + +def _seeder(torrents, save_path='.', max_seed_ratio=5): + _daemonize() + no_peers = 120 + max_alive = 5 * 60 + ses, all_torrents = init_session(torrents, seed=True) + mt = MultiTorrent(all_torrents, ses) + end = time.time() + max_alive + peers_0 = time.time() + i = 0 + while not time.time() > end: + now = time.time() + i += 1 + # if i % 10 == 0 and i != 0: + # mt.force_reannounce() + s = ses.status() + # if not mt.is_seeding: + # sys.exit("Was seeder mode but not seeding") + if peers_0 < now - no_peers: + sys.exit("No peers for %d seconds exiting" % no_peers) + if i % 5 == 0: + print "%.2f%% up=%.1f kB/s peers=%s total_upload_B=%.1f" \ + % (mt.progress * 100, + s.upload_rate / 1000, + s.num_peers, + (s.total_upload)) + if s.num_peers != 0: + peers_0 = now + sys.stdout.flush() + time.sleep(1) + else: + print 'Seed timeout exiting' + sys.exit(0) + + + +def _getter(torrents, max_seed_ratio=3): + ses = lt.session() + ses.listen_on(6881, 6981) + max_no_changes = 1 * 60 + ses, all_torrents = init_session(torrents) + + mt = MultiTorrent(all_torrents, ses) + + i = 0 + last_state = (time.time(), None) + while (not mt.is_seeding): + i += 1 + # if i % 10 == 0 and i != 0: + # mt.force_reannounce() + s = ses.status() + if i % 5 == 0: + print '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' % \ + (mt.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, \ + s.num_peers, mt.numbers()) + now = time.time() + current_state = (now, mt.progress) + if current_state[-1] != last_state[-1]: + last_state = current_state + if last_state[0] < now - max_no_changes: + sys.exit("Failed to fetch torrents in %ds" % max_no_changes) + time.sleep(0.5) + if mt.progress == 1: + # ok + # torrent lib dislikes forks there + from subprocess import check_output + import sys + args = sys.argv[:] + args[-2] = 's' + args.insert(0, sys.executable) + print "Entering seeder mode" + check_output(args, shell=False) + else: + # err + sys.exit(1) + +if __name__ == '__main__': + mode = sys.argv[1] + torrents = sys.argv[2] + torrents = [x.split('|') for x in torrents.split(';')] + print repr(torrents) + if mode == 'g': + _getter(torrents, *sys.argv[3:]) + elif mode == 's': + _seeder(torrents, *sys.argv[3:]) + else: + sys.exit("`s` or `g` needed") diff --git a/solar/solar/core/transports/torrent.py b/solar/solar/core/transports/torrent.py new file mode 100644 index 00000000..b27ab425 --- /dev/null +++ b/solar/solar/core/transports/torrent.py @@ -0,0 +1,118 @@ +from solar.core.log import log +from solar.core.transports.ssh import (SSHSyncTransport, + SSHRunTransport) +from solar.core.transports.base import SyncTransport, Executor + +import errno +from collections import defaultdict +from operator import attrgetter, itemgetter + +import libtorrent as lt +import os +from uuid import uuid4 + + +class TorrentSyncTransport(SyncTransport): + + def __init__(self): + super(TorrentSyncTransport, self).__init__() + # we need some non torrent based sync transfer to upload client + self._sync_helper = SSHSyncTransport() + self._torrents = [] + self._sudo_torrents = [] + self._torrent_path = '/vagrant/torrents' + + def bind_with(self, other): + self._sync_helper.bind_with(other) + super(TorrentSyncTransport, self).bind_with(other) + + def copy(self, resource, _from, _to, use_sudo=False): + log.debug("TORRENT: %s -> %s", _from, _to) + + executor = Executor(resource=resource, + executor=None, + params=(_from, _to, use_sudo)) + self.executors.append(executor) + + def _create_single_torrent(self, resource, _from, _to, use_sudo): + fs = lt.file_storage() + lt.add_files(fs, _from) + self._create_torrent(resource, fs, _from) + + def _create_torrent_name(self): + return os.path.join(self._torrent_path, uuid4().hex + '.torrent') + + def _create_torrent(self, resource, fs, root='.', use_sudo=False): + t = lt.create_torrent(fs) + transports = resource.transports() + torrent_transport = next((x for x in transports if x['name'] == 'torrent')) + trackers = torrent_transport['trackers'] + for tracker in trackers: + t.add_tracker(tracker) + lt.set_piece_hashes(t, os.path.join(root, '..')) + torrent = t.generate() + torrent['priv'] = True # private torrent, no DHT, only trackers + name = self._create_torrent_name() + try: + # not checking for path existence + with open(name, 'wb') as f: + f.write(lt.bencode(torrent)) + except IOError as e: + if e.errno != errno.ENOENT: + raise + os.makedirs(self._torrent_path) + with open(name, 'wb') as f: + f.write(lt.bencode(torrent)) + log.debug("Created torrent file %s", name) + magnet_uri = lt.make_magnet_uri(lt.torrent_info(name)) + # self._torrents[root] = (name, magnet_uri) + if not use_sudo: + self._torrents.append((name, magnet_uri, root)) + else: + self._sudo_torrents.append((name, magnet_uri, root)) + return name + + def _start_seeding(self): + # XXX: naive naive naive + # we don't need use sudo there for now + from fabric import api as fabric_api + torrents = self._torrents + self._sudo_torrents + to_seed = ["%s|%s" % (os.path.abspath(os.path.join(x[2], '..')), x[0]) for x in torrents] + seed_args = ';'.join(to_seed) + # TODO: 'g' is just for debug, it should be 's', remove when sure + cmd = ['/usr/bin/python', + '/vagrant/solar/solar/core/transports/helpers/solar_torrent.py', + 'g', + '"%s"' % seed_args] + log.debug("Will start seeding: %r" % ' '.join(cmd)) + fabric_api.local(' '.join(cmd)) + log.debug("Torrent seeding started") + + def _start_remote_fetch(self, resource, use_sudo): + # later we will send solar_torrent with other sync tranport, + # or remote will have solar_torrent installed somehow + if use_sudo is False: + torrents = self._torrents + else: + torrents = self._sudo_torrents + to_get = ["%s|%s" % (os.path.abspath(os.path.join(x[2], '..')), x[1]) for x in torrents] + get_args = ';'.join(to_get) + cmd = ['/usr/bin/python', + '/var/tmp/solar_torrent.py', + 'g', + '"%s"' % get_args] + self.other(resource).run(resource, *cmd, use_sudo=use_sudo) + + def preprocess(self, executor): + _from, _to, use_sudo = executor.params + self._create_single_torrent(executor.resource, _from, _to, use_sudo) + + def run_all(self): + self._start_seeding() + resource = self.executors[0].resource + # TODO: we should paralelize it + if self._torrents: + self._start_remote_fetch(resource, use_sudo=False) + if self._sudo_torrents: + self._start_remote_fetch(resource, use_sudo=True) +