diff --git a/apps/rest_processor_app/README.md b/apps/rest_processor_app/README.md new file mode 100755 index 0000000..c6701b6 --- /dev/null +++ b/apps/rest_processor_app/README.md @@ -0,0 +1,16 @@ +# Rest Processor App + +This application consists of a controller that accepts "work" requests and a variable number of workers that process these requests. + +# Controller + +The controller is an API implemented with FastAPI, featuring three endpoints: +* The "POST /" endpoint with a URL parameter "t" representing a positive integer value, which adds a new work request to the "pending requests" queue. The component periodically publishes to the EMS the age in seconds of the oldest request waiting in the queue (metric "max_age"). +* The "POST /accept" endpoint removes the oldest element from the "pending requests" queue, which ideally will be consumed by the worker, and added to the controller's "completed requests" queue. +* The "GET /" endpoint displays the two queues and their requests in HTML format. The "pending requests" queue is sorted from oldest to youngest, while the "completed requests" queue is sorted from youngest to oldest, with a maximum of one hundred requests stored. + +# Worker + +The worker consumes requests from the controller's "pending requests" queue via the "POST /accept" endpoint. Upon receiving a request, the worker sleeps for the duration received in the request (in seconds) until the queue is empty and receives a "null" response from the controller. + +Once all pending requests have been processed, the worker begins a countdown based on the "max idle time" setting, which is specified by the user through the environment variable "MAX_IDLE_TIME". If no new requests are received within this idle time, the worker gracefully exits. diff --git a/apps/rest_processor_app/controller.py b/apps/rest_processor_app/controller.py new file mode 100755 index 0000000..948fa3d --- /dev/null +++ b/apps/rest_processor_app/controller.py @@ -0,0 +1,265 @@ +import asyncio +from datetime import date +import datetime +import json +import os +import queue +import sys +import threading +from time import sleep +import time +import traceback +from uuid import uuid4 +from fastapi import FastAPI +import stomp +from fastapi.responses import HTMLResponse +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +file_handler = logging.FileHandler('rest_processor_app.log', encoding='utf-8') +file_handler.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +file_handler.setFormatter(formatter) + +class ExcludeWatchfilesFilter(logging.Filter): + def filter(self, record): + return not record.name.startswith('watchfiles') + +file_handler.addFilter(ExcludeWatchfilesFilter()) +logger.addHandler(file_handler) + + +report_metrics_to_ems = os.getenv("report_metrics_to_ems","False") +stomp_broker_address = os.getenv("nebulous_ems_ip") +stomp_port = int(os.getenv("nebulous_ems_port",0)) +stomp_destination = os.getenv("nebulous_ems_metrics_topic") +stomp_user = os.getenv("nebulous_ems_user") +stomp_pass = os.getenv("nebulous_ems_password") +stomp_client = None + +pending_requests = queue.Queue() +completed_requests = queue.Queue(100) +app = FastAPI() + +class CompletedRequest(): + def __init__(self, t, reception_time, completion_time, worker_id): + self.t = t + self.reception_time = reception_time + self.completion_time = completion_time + self.worker_id = worker_id + self.total_time = round((completion_time - reception_time),4) + + def __str__(self): + task_dict = { + 't': self.t, + 'reception_time': self.reception_time, + 'completion_time': self.completion_time, + 'worker_id': self.worker_id, + 'total_time' : self.total_time + } + return json.dumps(task_dict) + +class PendingRequest(): + def __init__(self, t, reception_time): + self.t = t + self.reception_time = reception_time + self.age = 0 + + def completedBy(self,worker_uid): + return CompletedRequest(self.t,self.reception_time,time.time(),worker_uid) + + def __str__(self): + task_dict = { + 't': self.t, + 'reception_time': self.reception_time, + 'age':self.age + } + return json.dumps(task_dict) + + + + +def publish_data_to_api(): + if "True" == report_metrics_to_ems: + max_age = -1 + try: + max_age = time.time() - list(pending_requests.queue)[0].reception_time + except: + None + json_msg="{max_age: "+str(max_age) +"}" + #print(json.dumps(json_msg)) + logger.debug(json.dumps(json_msg)) + stomp_client.send(body=json.dumps(json_msg), headers={'type':'textMessage', 'amq-msg-type':'text'}, destination=stomp_destination) + else: + logger.info("EMS reporting is disabled") + +def report_data(): + while True: + publish_data_to_api() + sleep(10) + +def update_age(): + logger.info("Pending requests age clock started") + while True: + for i in range(pending_requests.qsize()): + pending_requests.queue[i].age = round((time.time() - pending_requests.queue[i].reception_time),4) + sleep(3) + + + +@app.on_event("startup") +async def startup_event(): + logger.info("Starting controller's API") + thread1 = threading.Thread(target=update_age) + thread1.daemon = True + thread1.start() + if "True" == report_metrics_to_ems: + logger.info("Connecting to STOMP") + try: + stomp_client = stomp.Connection12(host_and_ports=[(stomp_broker_address, stomp_port)]) + stomp_client.set_listener('', stomp.PrintingListener()) + stomp_client.connect(stomp_user, stomp_pass, wait=True) + stomp_client.subscribe(stomp_destination,str(uuid4())) + thread = threading.Thread(target=report_data) + thread.daemon = True + thread.start() + logger.info("Connection sucessfully") + except Exception as e: + traceback.print_exc() + sys.exit(1) + logger.info("Done") + + +@app.post("/") +async def root(t : int): + + pending_request = PendingRequest(t,time.time()) + pending_requests.put(pending_request) + logger.debug("New request: "+str(pending_request)) + return {"message": "Request recieved, "+str(pending_request)} + + +@app.get("/", response_class=HTMLResponse) +async def root(): + pending_requests_str = list(pending_requests.queue) + completed_requests_str = list(completed_requests.queue) + pending_requests_html = "" + + for req in pending_requests_str: + for field in str(req).split(","): + field_stripped = field.split(":")[1].strip("} ") + if(field.split(":")[0].find("reception_time")!= -1): + field_stripped=datetime.datetime.fromtimestamp(float(field_stripped)).isoformat() + pending_requests_html += f"{field_stripped}\n" + pending_requests_html += "\n" + + + completed_requests_html = "" + for req in completed_requests_str: + completed_request_html = "" + for field in str(req).split(","): + field_stripped = field.split(":")[1].strip("} ") + if(field.split(":")[0].find("reception_time")!= -1 or field.split(":")[0].find("completion_time")!= -1): + field_stripped=datetime.datetime.fromtimestamp(float(field_stripped)).isoformat() + completed_request_html += f"{field_stripped}\n" + completed_request_html += "\n" + completed_requests_html = completed_request_html+completed_requests_html + + html_response = f""" + + + + Requests + + + +
+ +

Pending Requests:

+ + + + + + {pending_requests_html} +
Workload durationReception TimeAge
+
+
+ +

Completed Requests:

+ + + + + + + + {completed_requests_html} +
Workload durationReception TimeProcessing start timeWorker idTotal Time
+
+ + + """ + return html_response + +@app.post("/accept") +async def accept(worker_id): + lock = threading.Lock() + lock.acquire() + try: + if(pending_requests.qsize()==0): + return {"message": None} + pending_request = pending_requests.get(0) + logger.debug("Selected request: "+ str(pending_request)) + completed_request = pending_request.completedBy(worker_id) + try: + completed_requests.put_nowait(completed_request) + except: + completed_requests.get() + completed_requests.put_nowait(completed_request) + logger.debug("Completing request: "+ str(completed_request)) + return {"message": pending_request.t} + finally: + # Release the lock after accessing the queue + + lock.release() + + + + + diff --git a/apps/rest_processor_app/worker.py b/apps/rest_processor_app/worker.py new file mode 100755 index 0000000..b6d96dd --- /dev/null +++ b/apps/rest_processor_app/worker.py @@ -0,0 +1,42 @@ +from datetime import date +from time import sleep +import time +from uuid import uuid4 +import requests +import os +os.environ["MAX_IDLE_TIME"] = "10" +max_idle_time = int(os.getenv("MAX_IDLE_TIME")) + +worker_id = str(uuid4()) +def consume_api(): + url = 'http://localhost:8000/accept' + try: + params = {'worker_id':worker_id } + response = requests.post(url,params=params) #ENVIAR UID + if response.status_code == 200: + # Assuming the response contains an integer + result = response.json()['message'] + if (isinstance(result, int) or result==None): + return result + else: + print("Error: Unexpected response format. Expected an integer or null.") + else: + print("Error: Failed to fetch data. Status code:", response.status_code) + except requests.RequestException as e: + print("Error: Failed to make request:", e) + +last_job_time = time.time() +while True: + # Example usage + result = consume_api() + if result is not None: + print(result) + sleep(result) + last_job_time -= time.time() + else: + if max_idle_time > -1 and time.time() > last_job_time+max_idle_time: + exit + sleep(1) + + +