diff --git a/scalpels/agents/base.py b/scalpels/agents/base.py
index 0cb4d75..dab8683 100644
--- a/scalpels/agents/base.py
+++ b/scalpels/agents/base.py
@@ -10,7 +10,7 @@ def run_agent(task_uuid, ag):
     """
     python /agent.py mysql
     """
-    cmd = "python %s/agent.py %s %s" % (data_dir, task_uuid, ag)
+    cmd = "sca-tracer %s %s" % (task_uuid, ag)
     ag = subprocess.Popen(cmd.split())
     return ag.pid
 
diff --git a/scalpels/cmd/agent.py b/scalpels/cmd/agent.py
index 0723925..5d0195c 100644
--- a/scalpels/cmd/agent.py
+++ b/scalpels/cmd/agent.py
@@ -5,11 +5,9 @@
 from scalpels.agents.server import server
 
 def main():
-    try:
-        server.start()
-        server.wait()
-    except KeyboardInterrupt:
-        server.stop()
+    # TODO handle stop later
+    server.start()
+    server.wait()
 
 if __name__ == "__main__":
     main()
diff --git a/scalpels/cmd/tracer.py b/scalpels/cmd/tracer.py
index eb7a8b8..3dbbea4 100644
--- a/scalpels/cmd/tracer.py
+++ b/scalpels/cmd/tracer.py
@@ -2,8 +2,67 @@
 #-*- coding:utf-8 -*-
 # Author: Kun Huang
 
+import subprocess
+import psutil
+import sys
+from scalpels.db import api as db_api
+from copy import deepcopy as copy
+import signal
+from tooz import coordination
+import time
+from scalpels.agents import base
+
+"""
+example:
+    sca-tracer mysql
+TODO:
+    add help (-h) message
+    config key-word arguments for each tracer
+"""
+
+def read_from_ag(ag):
+    # wrong impl. here, need read from config or db instead
+    from scalpels.client.utils import tracers_map as agents_map
+    data_dir = db_api.setup_config_get()["data_dir"].rstrip("/")
+    return agents_map.get(ag) % data_dir
+
 def main():
-    raise NotImplementedError()
+    task_uuid, ag = sys.argv[1], sys.argv[2]
+    cmd = read_from_ag(ag)
+
+    worker = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+    out = []
+    try:
+        while True:
+            t = worker.stdout.readline()
+            if not len(t):
+                break
+            _t = (time.time(), t.strip())
+            out.append(_t)
+    except KeyboardInterrupt:
+        pass
+
+    # psutil is much more professional... I have to use it instead
+    # this kill is to script process
+    worker_p = psutil.Process(worker.pid)
+    worker_p.send_signal(signal.SIGINT)
+
+    parse_func = getattr(base, "parse_%s" % ag)
+
+    # TODO file lock is okay in localhost, here need redis for distributed
+    # lock istead
+    co = coordination.get_coordinator("file:///tmp", b"localhost")
+    co.start()
+    lock = co.get_lock("task_update_lock")
+    with lock:
+        task = db_api.task_get(task_uuid)
+        results = copy(task.results)
+        for ret in parse_func(out):
+            ret = db_api.result_create(**ret)
+            results.append(ret.uuid)
+        db_api.task_update(task_uuid, results=results)
+        time.sleep(2)
+    co.stop()
 
 if __name__ == "__main__":
     main()
diff --git a/scripts/agent.py b/scripts/agent.py
deleted file mode 100755
index 80f4f39..0000000
--- a/scripts/agent.py
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/usr/bin/env python
-#-*- coding:utf-8 -*-
-# Author: Kun Huang
-
-import subprocess
-import psutil
-import sys
-from scalpels.db import api as db_api
-from copy import deepcopy as copy
-import signal
-from tooz import coordination
-import time
-from scalpels.agents import base
-
-"""
-python /agent.py mysql
-"""
-
-def read_from_ag(ag):
-    # wrong impl. here, need read from config or db instead
-    from scalpels.client.utils import tracers_map as agents_map
-    data_dir = db_api.setup_config_get()["data_dir"].rstrip("/")
-    return agents_map.get(ag) % data_dir
-
-if __name__ == "__main__":
-    task_uuid, ag = sys.argv[1], sys.argv[2]
-    cmd = read_from_ag(ag)
-
-    worker = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
-    out = []
-    try:
-        while True:
-            t = worker.stdout.readline()
-            if not len(t):
-                break
-            _t = (time.time(), t.strip())
-            out.append(_t)
-    except KeyboardInterrupt:
-        pass
-
-    # psutil is much more professional... I have to use it instead
-    # this kill is to script process
-    worker_p = psutil.Process(worker.pid)
-    worker_p.send_signal(signal.SIGINT)
-
-    parse_func = getattr(base, "parse_%s" % ag)
-
-    # TODO file lock is okay in localhost, here need redis for distributed
-    # lock istead
-    co = coordination.get_coordinator("file:///tmp", b"localhost")
-    co.start()
-    lock = co.get_lock("task_update_lock")
-    with lock:
-        task = db_api.task_get(task_uuid)
-        results = copy(task.results)
-        for ret in parse_func(out):
-            ret = db_api.result_create(**ret)
-            results.append(ret.uuid)
-        db_api.task_update(task_uuid, results=results)
-        time.sleep(2)
-    co.stop()