parse rpc data before write into db

sca start -a rpc
sleep 20
sca stop
sca report
[you will see two results here: pkts and bytes]

Change-Id: Ibab207c43ef1a5509c8f85c0003cee48c4b66625
This commit is contained in:
Kun Huang 2015-10-29 16:05:06 +08:00
parent 1bd32a7524
commit 698c248ca0
7 changed files with 47 additions and 12 deletions

View File

@ -13,3 +13,32 @@ def run_agent(task_uuid, ag):
cmd = "python %s/agent.py %s %s" % (data_dir, task_uuid, ag)
ag = subprocess.Popen(cmd.split())
return ag.pid
def parse_rpc(out):
"""
in:
ts, 123.00 pkts 2312 bytes
...
...
out:
name: Port Traffic
unit: pkts
data: [(ts, 123.00), ...]
name: Port Traffic
unit: bytes
data: [(ts, 2312.00), ...]
"""
ag_name = "Port Traffic"
pkts_ret = {"name": ag_name,
"unit": "pkts",
"data":[]}
bytes_ret = {"name": ag_name,
"unit": "bytes",
"data":[]}
for ts, _t in out:
pkts, pkts_unit, bytes, bytes_unit = _t.split(" ", 3)
pkts_ret["data"].append((ts, pkts))
bytes_ret["data"].append((ts, bytes))
return (pkts_ret, bytes_ret)

View File

@ -33,4 +33,4 @@ def run(config):
for ret_uuid in task.results:
ret = db_api.result_get(ret_uuid)
results.append(ret.data)
print "result <%s>, data: %s" % (ret.uuid, ret.data)
print "result <%s>, data: %s, unit: %s, name: %s" % (ret.uuid, ret.data, ret.unit, ret.name)

View File

@ -19,12 +19,12 @@ def db_create(sc):
def db_drop():
IMPL.db_drop()
def result_create(data):
def result_create(name="", unit="", data=None):
"""
:param data: a list :)
:returns: result model obj
"""
return IMPL.result_create(data)
return IMPL.result_create(name, unit, data)
def task_create(results, pids):
"""

View File

@ -48,9 +48,9 @@ def model_query(model, session=None):
query = oslodbsqa_utils.model_query(model, session)
return query
def result_create(data):
def result_create(name="", unit="", data=None):
result = models.Result()
result.update({"data":data})
result.update({"name":name, "unit": unit, "data":data})
result.save()
return result

View File

@ -43,6 +43,7 @@ class Result(BASE, ScalpelsBase):
uuid = Column(String(36), default=lambda : str(uuid.uuid4()), nullable=False)
data = Column(JSONEncodedData, nullable=False)
unit = Column(String(20), nullable=True)
name = Column(String(20), nullable=True)
class Setup(BASE, ScalpelsBase):
__tablename__ = "setup"

View File

@ -10,6 +10,7 @@ from copy import deepcopy as copy
import signal
from tooz import coordination
import time
from scalpels.agents import base
"""
python <path-to-dir>/agent.py <uuid> mysql
@ -32,14 +33,18 @@ if __name__ == "__main__":
t = worker.stdout.readline()
if not len(t):
break
out.append(t.strip())
_t = (time.time(), t.strip())
out.append(_t)
except KeyboardInterrupt:
pass
# psutil is much more professional... I have to use it instead
# this kill is to script process
worker_p = psutil.Process(worker.pid)
worker_p.send_signal(signal.SIGINT)
parse_func = getattr(base, "parse_%s" % ag)
# TODO file lock is okay in localhost, here need redis for distributed
# lock istead
co = coordination.get_coordinator("file:///tmp", b"localhost")
@ -48,8 +53,9 @@ if __name__ == "__main__":
with lock:
task = db_api.task_get(task_uuid)
results = copy(task.results)
ret = db_api.result_create(out)
results.append(ret.uuid)
for ret in parse_func(out):
ret = db_api.result_create(**ret)
results.append(ret.uuid)
db_api.task_update(task_uuid, results=results)
time.sleep(2)
co.stop()

View File

@ -22,7 +22,7 @@ fi
rule="$chain -p $protocol --dport $port"
#XXX iptables -A INPUT -p tcp --dport 5672
echo applying rule: $rule
#echo applying rule: $rule
sudo iptables -t mangle -A $rule
interval=3
@ -33,11 +33,10 @@ while [ 1 -eq 1 ] ; do
sleep $interval
n_packages=`sudo iptables -t mangle -L $chain -n -v -x | grep $port | grep $protocol | tail -n 1 | awk '{print $1}'`
n_bytes=`sudo iptables -t mangle -L $chain -n -v -x | grep $port | grep $protocol | tail -n 1 | awk '{print $2}'`
python -c "print '%0.2f pkt/s' % (float($n_packages-$packages)/int($interval))"
python -c "print '%0.2f byte/s' % (float($n_bytes-$bytes)/int($interval))"
python -c "print '%0.2f pkt/s %0.2f byte/s' % (float($n_packages-$packages)/int($interval), float($n_bytes-$bytes)/int($interval))"
packages=$n_packages
bytes=$n_bytes
done
echo deleting rule: $rule
#echo deleting rule: $rule
sudo iptables -t mangle -D $rule