Merge pull request #250 from pigmej/torrent_transport

torrent transport
This commit is contained in:
Dmitry Shulyak 2015-10-13 15:54:21 +03:00
commit a85a66764d
12 changed files with 439 additions and 15 deletions

View File

@ -30,6 +30,8 @@
- python-virtualenv
# Required by packer
- build-essential
# for torrent transport
- python-libtorrent
# PIP
#- apt: name=python-pip state=absent

View File

@ -0,0 +1,25 @@
Example of using torrent transport with solar. Torrent is used to distribute task data. After fetching is finished torrent client forks and continues seeding.
The example contains single node with single host mapping + transports.
Execute:
```
python examples/torrent/example.py
solar changes stage
solar changes process
solar orch run-once last
```
Wait for finish:
```
solar orch report last -w 100
```
After this you should see new entry in `/etc/hosts` file.
* All created torrents are in `/vagrant/torrents`, it doesn't need to be shared
* Initial seeding is done using torrent file
* Downloading and then seeding is always done with magnetlinks

View File

@ -0,0 +1,74 @@
import time
from solar.core.resource import virtual_resource as vr
from solar import errors
from solar.interfaces.db import get_db
db = get_db()
def run():
db.clear()
node = vr.create('node', 'resources/ro_node', {'name': 'first' + str(time.time()),
'ip': '10.0.0.3',
'node_id': 'node1',
})[0]
transports = vr.create('transports_node1', 'resources/transports')[0]
ssh_transport = vr.create('ssh_transport', 'resources/transport_ssh',
{'ssh_key': '/vagrant/.vagrant/machines/solar-dev1/virtualbox/private_key',
'ssh_user': 'vagrant'})[0]
transports.connect(node, {})
# it uses reverse mappings
ssh_transport.connect(transports, {'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
hosts = vr.create('hosts_file', 'resources/hosts_file', {})[0]
# let's add torrent transport for hosts file deployment (useless in real life)
torrent_transport = vr.create('torrent_transport',
'resources/transport_torrent',
{'trackers': ['udp://open.demonii.com:1337',
'udp://tracker.openbittorrent.com:80']})[0]
# you could use any trackers as you want
transports_for_torrent = vr.create(
'transports_for_torrent', 'resources/transports')[0]
transports_for_torrent.connect(torrent_transport, {})
ssh_transport.connect_with_events(transports_for_torrent, {'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'},
events={})
transports_for_hosts = vr.create(
'transports_for_hosts', 'resources/transports')[0]
torrent_transport.connect(transports_for_hosts, {'trackers': 'transports:trackers',
'name': 'transports:name'})
ssh_transport.connect(transports_for_hosts, {'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
transports_for_hosts.connect(hosts)
transports_for_hosts.connect_with_events(node, events={})
node.connect(hosts, {
'ip': 'hosts:ip',
'name': 'hosts:name'
})
run()

View File

@ -0,0 +1,9 @@
- hosts: [{{ host }}]
sudo: yes
tasks:
- apt:
name: python-libtorrent
state: present
- copy:
src: {{scripts_dir}}/solar_torrent.py
dest: /var/tmp/solar_torrent.py

View File

@ -0,0 +1,18 @@
id: transport_torrent
handler: ansible
input:
trackers:
schema: [str!]
value: []
name:
schema: str!
value: torrent
location_id:
schema: str
value:
reverse: True
is_own: False
transports_id:
schema: str
value:
is_emit: False

View File

@ -0,0 +1 @@
../../../solar/solar/core/transports/helpers/solar_torrent.py

View File

@ -1,7 +1,7 @@
id: transports
input:
transports:
schema: [{user: str, password: str, port: int!, key: str, name: str!}]
schema: [{user: str, password: str, port: int!, key: str, name: str!, trackers: [str]}]
value: []
transports_id:
schema: str!

View File

@ -83,16 +83,23 @@ class SolarTransport(object):
pass
def get_transport_data(self, resource, name=None):
key = '_used_transport_%s' % self._mode
# TODO: naive object local cache
try:
transport = resource._used_transport
transport = getattr(resource, key)
except AttributeError:
if name is None:
name = self.preffered_transport_name
transport = next(x for x in resource.transports() if x['name'] == name)
setattr(resource, '_used_transport', transport)
setattr(resource, key, transport)
return transport
def other(self, resource):
return self._other
def bind_with(self, other):
self._other = other
class SyncTransport(SolarTransport):
"""
@ -106,11 +113,6 @@ class SyncTransport(SolarTransport):
super(SyncTransport, self).__init__()
self.executors = []
def bind_with(self, other):
# we migth add there something later
# like compat checking etc
self.other = other
def copy(self, resource, *args, **kwargs):
pass
@ -157,11 +159,6 @@ class RunTransport(SolarTransport):
def get_result(self, *args, **kwargs):
raise NotImplementedError()
def bind_with(self, other):
# we migth add there something later
# like compat checking etc
self.other = other
def run(self, resource, *args, **kwargs):
pass

View File

@ -2,8 +2,10 @@ from solar.core.transports.base import SyncTransport, RunTransport, SolarTranspo
from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
from solar.core.transports.rsync import RsyncSyncTransport
from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport
from solar.core.transports.torrent import TorrentSyncTransport
KNOWN_SYNC_TRANSPORTS = {
'torrent': TorrentSyncTransport,
'solard': SolardSyncTransport,
'rsync': RsyncSyncTransport,
'ssh': SSHSyncTransport
@ -50,9 +52,10 @@ class BatTransport(SolarTransport):
if not selected:
raise Exception("No valid transport found")
instance = self._bat_transports[selected['name']]()
setattr(resource, '_used_transport', selected)
setattr(resource, '_used_transport_%s' % instance._mode, selected)
setattr(resource, key_name, instance)
self._used_transports.append(instance)
instance.bind_with(self._other_remember)
return instance
# return self._bat_transports[selected['name']]
@ -60,11 +63,14 @@ class BatTransport(SolarTransport):
self.select_valid_transport(resource)
return super(BatTransport, self).get_transport_data(resource, *args, **kwargs)
def bind_with(self, other):
self._other_remember = other
class BatSyncTransport(SyncTransport, BatTransport):
preffered_transport_name = None
_order = ('solard', 'rsync', 'ssh')
_order = ('torrent', 'solard', 'rsync', 'ssh')
_bat_transports = KNOWN_SYNC_TRANSPORTS
def __init__(self, *args, **kwargs):

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,173 @@
# TODO: change to something less naive
#
import libtorrent as lt
from operator import attrgetter
import time
import sys
import os
state_str = ['queued', 'checking', 'downloading metadata', \
'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
class MultiTorrent(object):
def __init__(self, torrents, ses):
self.torrents = torrents
self.ses = ses
def force_reannounce(self):
for torrent in self.torrents:
torrent.force_reannounce()
@property
def is_seeding(self):
for torrent in self.torrents:
status = torrent.status()
if state_str[status.state] != 'seeding':
return False
return True
@property
def progress(self):
total_progress = map(attrgetter('progress'), map(lambda x: x.status(), self.torrents))
return sum(total_progress) / len(total_progress)
def numbers(self):
seeding = 0
downloading = 0
for torrent in self.torrents:
if torrent.status().is_seeding:
seeding += 1
else:
downloading += 1
return seeding, downloading
def init_session(args, seed=False):
ses = lt.session()
all_torrents = []
for save_path, magnet_or_path in args:
if os.path.exists(magnet_or_path):
e = lt.bdecode(open(magnet_or_path, 'rb').read())
info = lt.torrent_info(e)
params = { 'save_path': save_path,
'storage_mode': lt.storage_mode_t.storage_mode_sparse,
'ti': info,
'seed_mode': seed}
h = ses.add_torrent(params)
else:
h = ses.add_torrent({
'save_path': save_path,
'storage_mode': lt.storage_mode_t.storage_mode_sparse,
'url': magnet_or_path,
'seed_mode': seed})
all_torrents.append(h)
return ses, all_torrents
def _daemonize():
# should be true daemonize
new_pid = os.fork()
if new_pid > 0:
# first
sys.exit(0)
os.setsid()
new_pid2 = os.fork()
if new_pid2 > 0:
sys.exit(0)
stdin = file(os.devnull, 'r')
stdout = file(os.devnull, 'a+')
stderr = file(os.devnull, 'a+', 0)
os.dup2(stdin.fileno(), sys.stdin.fileno())
os.dup2(stdout.fileno(), sys.stdout.fileno())
os.dup2(stderr.fileno(), sys.stderr.fileno())
def _seeder(torrents, save_path='.', max_seed_ratio=5):
_daemonize()
no_peers = 120
max_alive = 5 * 60
ses, all_torrents = init_session(torrents, seed=True)
mt = MultiTorrent(all_torrents, ses)
end = time.time() + max_alive
peers_0 = time.time()
i = 0
while not time.time() > end:
now = time.time()
i += 1
# if i % 10 == 0 and i != 0:
# mt.force_reannounce()
s = ses.status()
# if not mt.is_seeding:
# sys.exit("Was seeder mode but not seeding")
if peers_0 < now - no_peers:
sys.exit("No peers for %d seconds exiting" % no_peers)
if i % 5 == 0:
print "%.2f%% up=%.1f kB/s peers=%s total_upload_B=%.1f" \
% (mt.progress * 100,
s.upload_rate / 1000,
s.num_peers,
(s.total_upload))
if s.num_peers != 0:
peers_0 = now
sys.stdout.flush()
time.sleep(1)
else:
print 'Seed timeout exiting'
sys.exit(0)
def _getter(torrents, max_seed_ratio=3):
ses = lt.session()
ses.listen_on(6881, 6981)
max_no_changes = 1 * 60
ses, all_torrents = init_session(torrents)
mt = MultiTorrent(all_torrents, ses)
i = 0
last_state = (time.time(), None)
while (not mt.is_seeding):
i += 1
# if i % 10 == 0 and i != 0:
# mt.force_reannounce()
s = ses.status()
if i % 5 == 0:
print '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' % \
(mt.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, \
s.num_peers, mt.numbers())
now = time.time()
current_state = (now, mt.progress)
if current_state[-1] != last_state[-1]:
last_state = current_state
if last_state[0] < now - max_no_changes:
sys.exit("Failed to fetch torrents in %ds" % max_no_changes)
time.sleep(0.5)
if mt.progress == 1:
# ok
# torrent lib dislikes forks there
from subprocess import check_output
import sys
args = sys.argv[:]
args[-2] = 's'
args.insert(0, sys.executable)
print "Entering seeder mode"
check_output(args, shell=False)
else:
# err
sys.exit(1)
if __name__ == '__main__':
mode = sys.argv[1]
torrents = sys.argv[2]
torrents = [x.split('|') for x in torrents.split(';')]
print repr(torrents)
if mode == 'g':
_getter(torrents, *sys.argv[3:])
elif mode == 's':
_seeder(torrents, *sys.argv[3:])
else:
sys.exit("`s` or `g` needed")

View File

@ -0,0 +1,118 @@
from solar.core.log import log
from solar.core.transports.ssh import (SSHSyncTransport,
SSHRunTransport)
from solar.core.transports.base import SyncTransport, Executor
import errno
from collections import defaultdict
from operator import attrgetter, itemgetter
import libtorrent as lt
import os
from uuid import uuid4
class TorrentSyncTransport(SyncTransport):
def __init__(self):
super(TorrentSyncTransport, self).__init__()
# we need some non torrent based sync transfer to upload client
self._sync_helper = SSHSyncTransport()
self._torrents = []
self._sudo_torrents = []
self._torrent_path = '/vagrant/torrents'
def bind_with(self, other):
self._sync_helper.bind_with(other)
super(TorrentSyncTransport, self).bind_with(other)
def copy(self, resource, _from, _to, use_sudo=False):
log.debug("TORRENT: %s -> %s", _from, _to)
executor = Executor(resource=resource,
executor=None,
params=(_from, _to, use_sudo))
self.executors.append(executor)
def _create_single_torrent(self, resource, _from, _to, use_sudo):
fs = lt.file_storage()
lt.add_files(fs, _from)
self._create_torrent(resource, fs, _from)
def _create_torrent_name(self):
return os.path.join(self._torrent_path, uuid4().hex + '.torrent')
def _create_torrent(self, resource, fs, root='.', use_sudo=False):
t = lt.create_torrent(fs)
transports = resource.transports()
torrent_transport = next((x for x in transports if x['name'] == 'torrent'))
trackers = torrent_transport['trackers']
for tracker in trackers:
t.add_tracker(tracker)
lt.set_piece_hashes(t, os.path.join(root, '..'))
torrent = t.generate()
torrent['priv'] = True # private torrent, no DHT, only trackers
name = self._create_torrent_name()
try:
# not checking for path existence
with open(name, 'wb') as f:
f.write(lt.bencode(torrent))
except IOError as e:
if e.errno != errno.ENOENT:
raise
os.makedirs(self._torrent_path)
with open(name, 'wb') as f:
f.write(lt.bencode(torrent))
log.debug("Created torrent file %s", name)
magnet_uri = lt.make_magnet_uri(lt.torrent_info(name))
# self._torrents[root] = (name, magnet_uri)
if not use_sudo:
self._torrents.append((name, magnet_uri, root))
else:
self._sudo_torrents.append((name, magnet_uri, root))
return name
def _start_seeding(self):
# XXX: naive naive naive
# we don't need use sudo there for now
from fabric import api as fabric_api
torrents = self._torrents + self._sudo_torrents
to_seed = ["%s|%s" % (os.path.abspath(os.path.join(x[2], '..')), x[0]) for x in torrents]
seed_args = ';'.join(to_seed)
# TODO: 'g' is just for debug, it should be 's', remove when sure
cmd = ['/usr/bin/python',
'/vagrant/solar/solar/core/transports/helpers/solar_torrent.py',
'g',
'"%s"' % seed_args]
log.debug("Will start seeding: %r" % ' '.join(cmd))
fabric_api.local(' '.join(cmd))
log.debug("Torrent seeding started")
def _start_remote_fetch(self, resource, use_sudo):
# later we will send solar_torrent with other sync tranport,
# or remote will have solar_torrent installed somehow
if use_sudo is False:
torrents = self._torrents
else:
torrents = self._sudo_torrents
to_get = ["%s|%s" % (os.path.abspath(os.path.join(x[2], '..')), x[1]) for x in torrents]
get_args = ';'.join(to_get)
cmd = ['/usr/bin/python',
'/var/tmp/solar_torrent.py',
'g',
'"%s"' % get_args]
self.other(resource).run(resource, *cmd, use_sudo=use_sudo)
def preprocess(self, executor):
_from, _to, use_sudo = executor.params
self._create_single_torrent(executor.resource, _from, _to, use_sudo)
def run_all(self):
self._start_seeding()
resource = self.executors[0].resource
# TODO: we should paralelize it
if self._torrents:
self._start_remote_fetch(resource, use_sudo=False)
if self._sudo_torrents:
self._start_remote_fetch(resource, use_sudo=True)