
An RPC call body may look like: {u'_unique_id': u'012fad2a867c4582900b4dbf7af13de6', u'event_type': u'image.send', u'timestamp': u'2015-10-29 05:29:35.282756', u'message_id': u'70f443d2-56fe-451b-92cc-5456c5c97185', u'priority': u'INFO', u'publisher_id': u'image.localhost', u'payload': {u'receiver_tenant_id': u'7157341aa9f14e1ead34af636f19f306', u'destination_ip': u'127.0.0.1', u'bytes_sent': 3740163, u'image_id': u'3451f5dc-e0a4-44f0-b5e4-566b9a930b49', u'receiver_user_id': u'fe96cc0fd2e5421aa0be229a5c09431c', u'owner_id': u'7157341aa9f14e1ead34af636f19f306'}} (there is no such oslo.messaging yet). Change-Id: I7d612edd6d7782aeac62d1235fca66093cfd7258
44 lines
1.3 KiB
Python
Executable File
44 lines
1.3 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#-*- coding:utf-8 -*-
|
|
# Author: Kun Huang <academicgareth@gmail.com>
|
|
|
|
import json
|
|
import subprocess
|
|
|
|
from kombu import Connection
|
|
from kombu import Exchange
|
|
from kombu.mixins import ConsumerMixin
|
|
from kombu import Queue
|
|
|
|
# Topic exchange named 'amq.rabbitmq.trace'; the queue created further down
# in this script binds to it.
task_exchange = Exchange(name='amq.rabbitmq.trace', type='topic')

# Queues the Worker consumes from; filled in by the script body once a
# channel is available.
task_queues = list()
|
|
|
|
class Worker(ConsumerMixin):
    """Consumer that prints every message delivered to ``task_queues``.

    Each message body is decoded as JSON when possible.  If the decoded
    body is a dict carrying an ``"oslo.message"`` entry, that inner value
    is decoded and printed instead of the outer envelope.
    """

    def __init__(self, connection):
        """Store the broker connection used by ``ConsumerMixin.run()``.

        :param connection: an open kombu ``Connection``.
        """
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return the single consumer that feeds ``process_task``.

        :param Consumer: consumer factory supplied by ``ConsumerMixin``.
        :param channel: channel supplied by ``ConsumerMixin`` (unused here;
            the queues in ``task_queues`` are already bound to a channel).
        :returns: a one-element list of consumers.
        """
        # NOTE(security): accepting 'pickle' lets anything published to the
        # broker be unpickled in this process.  Kept for backward
        # compatibility; drop it if the broker is not fully trusted.
        return [Consumer(queues=task_queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.process_task])]

    @staticmethod
    def _decode(raw):
        """Return ``raw`` parsed as JSON, or ``raw`` itself if not parseable.

        ``TypeError`` covers bodies that are not text (for example already
        deserialized objects); ``ValueError`` covers text that is not JSON.
        """
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            return raw

    # TODO get req if hint in resp
    def process_task(self, body, message):
        """Print one received message and acknowledge it.

        :param body: message body; JSON text or an already-decoded object.
        :param message: the kombu message, acknowledged after printing.
        """
        rpc_body = self._decode(body)
        if isinstance(rpc_body, dict) and "oslo.message" in rpc_body:
            # The real RPC content is nested inside the envelope.
            print(self._decode(rpc_body["oslo.message"]))
        else:
            print(rpc_body)
        # Without an ack every message stays unacknowledged on the broker
        # for the lifetime of the channel.
        message.ack()
|
|
|
|
# Script body: bind a queue to the trace exchange, enable RabbitMQ tracing,
# print messages until interrupted, then undo both.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    chan = conn.channel()
    queue = Queue("trace_", task_exchange, routing_key="publish.*",
                  channel=chan)
    task_queues.append(queue)

    # Argument lists avoid spawning a shell for fixed commands.  If enabling
    # tracing fails, nothing has been started yet, so the error propagates
    # without any cleanup.
    subprocess.check_call(["sudo", "rabbitmqctl", "trace_on"])
    try:
        worker = Worker(conn)
        worker.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the tracer.
        pass
    finally:
        # Cleanup runs on every exit path, not only on Ctrl-C, so a crash in
        # the worker does not leave tracing enabled or the queue behind.
        try:
            subprocess.check_call(["sudo", "rabbitmqctl", "trace_off"])
        finally:
            queue.delete()
|