From 93a5d71a6bf2ddbf1f01a4352e58820337f767a3 Mon Sep 17 00:00:00 2001 From: "robert.sanfeliu" Date: Thu, 9 May 2024 17:15:49 +0200 Subject: [PATCH] Commit Change-Id: If19e60cf28d7a597d1589b74b498711551fc02e1 --- .gitignore | 13 + README.md | 4 + apps/mqtt_processor_app/README.md | 24 ++ apps/mqtt_processor_app/worker/Dockerfile | 7 + .../worker/requirements.txt | 4 + apps/mqtt_processor_app/worker/worker.py | 128 ++++++++ tests/.gitignore | 8 + tests/README.md | 0 tests/pom.xml | 105 +++++++ tests/src/main/resources/log4j2.properties | 55 ++++ .../tests/FileTemplatingUtils.java | 74 +++++ .../tests/MQTTProcessorAppDeploymentTest.java | 293 ++++++++++++++++++ ...QTTProcessorAppMessageBrokerInterface.java | 154 +++++++++ .../tests/NebulOuSCoreMessage.java | 29 ++ .../NebulousCoreMessageBrokerInterface.java | 286 +++++++++++++++++ .../tests/SendPayloadToMQTTProcessorApp.java | 92 ++++++ .../app_creation_message.json | 116 +++++++ .../mqtt_processor_app/metric_model.json | 1 + 18 files changed, 1393 insertions(+) create mode 100644 README.md create mode 100644 apps/mqtt_processor_app/README.md create mode 100644 apps/mqtt_processor_app/worker/Dockerfile create mode 100644 apps/mqtt_processor_app/worker/requirements.txt create mode 100644 apps/mqtt_processor_app/worker/worker.py create mode 100644 tests/.gitignore create mode 100644 tests/README.md create mode 100644 tests/pom.xml create mode 100644 tests/src/main/resources/log4j2.properties create mode 100644 tests/src/test/java/eut/nebulouscloud/tests/FileTemplatingUtils.java create mode 100644 tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppDeploymentTest.java create mode 100644 tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppMessageBrokerInterface.java create mode 100644 tests/src/test/java/eut/nebulouscloud/tests/NebulOuSCoreMessage.java create mode 100644 tests/src/test/java/eut/nebulouscloud/tests/NebulousCoreMessageBrokerInterface.java create mode 100644 tests/src/test/java/eut/nebulouscloud/tests/SendPayloadToMQTTProcessorApp.java create mode 100644 tests/src/test/resources/mqtt_processor_app/app_creation_message.json create mode 100644 tests/src/test/resources/mqtt_processor_app/metric_model.json diff --git a/.gitignore b/.gitignore index e0e9b8b..bd81b3a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,15 @@ +/nebulous-automated-tests/.classpath +/nebulous-automated-tests/.gitattributes +/nebulous-automated-tests/.project +/nebulous-automated-tests/.settings/org.eclipse.buildship.core.prefs +/nebulous-automated-tests/.settings/org.eclipse.jdt.core.prefs +/nebulous-automated-tests/gradlew.bat +/nebulous-automated-tests/gradle/wrapper/gradle-wrapper.jar +/nebulous-automated-tests/gradle/wrapper/gradle-wrapper.properties +/nebulous-automated-tests/gradlew __pycache__/ .nox/ +/nebulous-automated-tests/maven-repo +/tests/.settings +/tests/logs +/apps/mqtt_processor_app/worker/.vscode diff --git a/README.md b/README.md new file mode 100644 index 0000000..bba2624 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +NebulOuS integration tests + +- tests: Folder With a Java Maven project with JUnit tests that validate basic aspects of NebulOuS +- apps: Folder containing necessary testing apps used during the testing process. \ No newline at end of file diff --git a/apps/mqtt_processor_app/README.md b/apps/mqtt_processor_app/README.md new file mode 100644 index 0000000..bec88c2 --- /dev/null +++ b/apps/mqtt_processor_app/README.md @@ -0,0 +1,24 @@ +Source code of the mqtt processor app used for testing. + +On startup, the application connects to the configured MQTT broker and waits for messages on the topic APP_MQTT_INPUT_TOPIC. When a well structured message on said topic, the application simulates some work and sends a message to APP_MQTT_OUTPUT_TOPIC. + +The structure of the input message is: +- job_id: An unique UUID assigned to the job +- timestamp: Timestamp of the request with the format YYYY-MM-dd HH:mm:ssZ +- job_timestamp: Same as timestamp +- inference_duration: Time in seconds that the worker processing this job will sleep to simulate a time consuming inference process. + +The worker needs the following environment variables to work: +- mqtt_ip: The IP/host of the MQTT broker +- mqtt_port: The port of the MQTT broker +- mqtt_subscribe_topic: The topic to subscribe to and recieve requests +- mqtt_publish_topic: The topic to connect to and publish results + +- report_metrics_to_ems: Flag to indicate if metrics should be published to EMS +- nebulous_ems_ip: EMS IP +- nebulous_ems_port: EMS port +- nebulous_ems_user: EMS user +- nebulous_ems_password: EMS password +- nebulous_ems_metrics_topic: EMS topic to use to report metrics + + \ No newline at end of file diff --git a/apps/mqtt_processor_app/worker/Dockerfile b/apps/mqtt_processor_app/worker/Dockerfile new file mode 100644 index 0000000..1e1b5f4 --- /dev/null +++ b/apps/mqtt_processor_app/worker/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.11 +RUN mkdir /app +WORKDIR /app +COPY ./requirements.txt /app/requirements.txt +RUN pip install -r /app/requirements.txt +COPY ./worker.py ./worker.py +CMD [ "python3","-u", "./worker.py"] \ No newline at end of file diff --git a/apps/mqtt_processor_app/worker/requirements.txt b/apps/mqtt_processor_app/worker/requirements.txt new file mode 100644 index 0000000..7ac45b2 --- /dev/null +++ b/apps/mqtt_processor_app/worker/requirements.txt @@ -0,0 +1,4 @@ +python-dotenv==1.0.0 +PyYAML==6.0.1 +paho-mqtt==1.6.1 +stomp.py \ No newline at end of file diff --git a/apps/mqtt_processor_app/worker/worker.py b/apps/mqtt_processor_app/worker/worker.py new file mode 100644 index 0000000..9ee359a --- /dev/null +++ b/apps/mqtt_processor_app/worker/worker.py @@ -0,0 +1,128 @@ +import paho.mqtt.client as mqtt +import stomp +import os.path +import logging +import threading +import queue +import time +import datetime +import json +from uuid import uuid4 +import sys +import traceback +print("Starting dummy app worker") + +shared_stack = queue.Queue() +worker_id = str(uuid4()) +# MQTT Broker details +mqtt_broker_address = os.getenv("mqtt_ip") +mqtt_port = int(os.getenv("mqtt_port")) +mqtt_topic = os.getenv("mqtt_subscribe_topic") +mqtt_publish_topic = os.getenv("mqtt_publish_topic") + +# STOMP Broker details +report_metrics_to_ems = os.getenv("report_metrics_to_ems") +stomp_broker_address = os.getenv("nebulous_ems_ip") +stomp_port = int(os.getenv("nebulous_ems_port")) +stomp_destination = os.getenv("nebulous_ems_metrics_topic") +stomp_user = os.getenv("nebulous_ems_user") +stomp_pass = os.getenv("nebulous_ems_password") + +def map_value(old_value, old_min, old_max, new_min, new_max): + return ( (old_value - old_min) / (old_max - old_min) ) * (new_max - new_min) + new_min + + +# MQTT callback function +def on_message(client, userdata, message): + try: + payload = message.payload.decode("utf-8") + print("Recieved MQTT message",payload) + print("Message added to stack. Current length:",shared_stack.qsize()) + shared_stack.put(payload) + backpressure = map_value(min(shared_stack.qsize(),10),0,10,0,2) + print("Backpressure: ",backpressure) + if backpressure>0: + time.sleep(backpressure) + except Exception as e: + print("Error",e) + sys.exit(1) + +def process_messages(): + while True: + try: + # Get message from the shared stack + payload = shared_stack.get() + payload = json.loads(payload) + print("Processing ",payload) + print("Proceed to simulate an inference of ",payload["inference_duration"]) + time.sleep(payload["inference_duration"]) + date_timestamp = datetime.datetime.strptime(payload['job_timestamp'], "%Y-%m-%d %H:%M:%S%z") + total_job_duration = int((datetime.datetime.now(datetime.timezone.utc) - date_timestamp).total_seconds()) + print(f"total_job_duration: {total_job_duration}") + json_msg = { + "metricValue": total_job_duration, + "level": 1, + "timestamp": int(datetime.datetime.now().timestamp()) + } + + + payload["worker_id"] = worker_id + payload["total_job_duration"] = total_job_duration + payload["job_completion_timestamp"] = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S%z") + mqtt_client.publish(mqtt_publish_topic,json.dumps(payload),2) + + + if "True" == report_metrics_to_ems: + print("send_metric ",json_msg) + print(json.dumps(json_msg)) + stomp_client.send(body=json.dumps(json_msg), headers={'type':'textMessage', 'amq-msg-type':'text'}, destination=stomp_destination) + else: + print("EMS reporting is disabled.") + except Exception as e: + print("Error",e) + sys.exit(1) + +# STOMP connection callback +def on_connect_stomp(): + print("Connected to STOMP broker") + +# STOMP error callback +def on_error_stomp(): + print("Error in STOMP connection") + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + + + + + +print("Connecting to MQTT") +# Initialize MQTT client +mqtt_client = mqtt.Client() +mqtt_client.on_message = on_message +mqtt_client.connect(mqtt_broker_address, mqtt_port) +mqtt_client.subscribe(mqtt_topic) +mqtt_client.enable_logger(logger) +publish_thread = threading.Thread(target=process_messages) +publish_thread.daemon = True # Daemonize the thread so it will exit when the main thread exits +publish_thread.start() +print("Done") + +if "True" == report_metrics_to_ems: + print("Connecting to STOMP") + try: + stomp_client = stomp.Connection12(host_and_ports=[(stomp_broker_address, stomp_port)]) + stomp_client.set_listener('', stomp.PrintingListener()) + stomp_client.connect(stomp_user, stomp_pass, wait=True) + stomp_client.subscribe(stomp_destination,str(uuid4())) + except Exception as e: + traceback.print_exc() + mqtt_client.publish(mqtt_publish_topic,"Error in STOMP connection",2) + sys.exit(1) + print("Done") +print("Start MQTT Loop") +# Start the MQTT client loop +mqtt_client.loop_forever() +print("App ended") \ No newline at end of file diff --git a/tests/.gitignore b/tests/.gitignore new file mode 100644 index 0000000..ea4537c --- /dev/null +++ b/tests/.gitignore @@ -0,0 +1,8 @@ +# Ignore Gradle project-specific cache directory +.gradle + +# Ignore Gradle build output directory +build +data +bin +/target/ diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..e69de29 diff --git a/tests/pom.xml b/tests/pom.xml new file mode 100644 index 0000000..7a4c4c5 --- /dev/null +++ b/tests/pom.xml @@ -0,0 +1,105 @@ + + + 4.0.0 + + eu.nebulouscloud + tests + 0.1.0 + + + + ossrh + https://s01.oss.sonatype.org/content/repositories/snapshots + + + ossrh + + https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + + 11 + 11 + + + + + exn-java-connector + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.19.0 + + + com.jayway.jsonpath + json-path + 2.8.0 + + + com.fasterxml.jackson.core + jackson-databind + 2.16.0 + + + + eu.nebulouscloud + exn-connector-java + 1.0-SNAPSHOT + compile + + + org.slf4j + slf4j-simple + + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + org.slf4j + slf4j-simple + + + + + + org.junit.jupiter + junit-jupiter-api + 5.8.1 + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.3.0 + + + package + + shade + + + + + + + + + \ No newline at end of file diff --git a/tests/src/main/resources/log4j2.properties b/tests/src/main/resources/log4j2.properties new file mode 100644 index 0000000..4f2efee --- /dev/null +++ b/tests/src/main/resources/log4j2.properties @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Log4J 2 configuration + +# Monitor config file every X seconds for updates +monitorInterval = 5 + +loggers = activemq +logger.activemq.name = org.apache.activemq +logger.activemq.level = WARN + + +loggers = test +logger.test.name = eut.nebulouscloud.automated_tests +logger.test.level = DEBUG + +#logger.org.apache.activemq.artemis.core.server.cluster.level = TRACE + +rootLogger.level = INFO +#rootLogger = console +rootLogger.appenderRef.console.ref = console +appenders = console, file + +appender.file.type = File +appender.file.name = LOGFILE +appender.file.fileName=logs/log4j.log +appender.file.layout.type=PatternLayout +appender.file.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n +#appender.file.filter.threshold.type = ThresholdFilter +#appender.file.filter.threshold.level = info + + +# Console appender +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n + + +rootLogger.appenderRefs = stdout, logfile +rootLogger.appenderRef.stdout.ref = STDOUT +rootLogger.appenderRef.logfile.ref = LOGFILE diff --git a/tests/src/test/java/eut/nebulouscloud/tests/FileTemplatingUtils.java b/tests/src/test/java/eut/nebulouscloud/tests/FileTemplatingUtils.java new file mode 100644 index 0000000..48db5c3 --- /dev/null +++ b/tests/src/test/java/eut/nebulouscloud/tests/FileTemplatingUtils.java @@ -0,0 +1,74 @@ +package eut.nebulouscloud.tests; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class FileTemplatingUtils { + static Logger LOGGER = LoggerFactory.getLogger(FileTemplatingUtils.class); + static ObjectMapper om = new ObjectMapper(); + + /** + * Load a JSON file stored in the resources folder of the project and perform the substitutions provided. + * @param path + * @param substitutions + * @return + * @throws Exception + */ + public static Map loadJSONFileAndSubstitute(String path,Map substitutions) throws Exception + { + return om.readValue(loadFileAndSubstitute(path, substitutions),HashMap.class); + } + + /** + * Load a text file stored in the resources folder of the project and perform the substitutions provided. + * @param path + * @param substitutions + * @return + * @throws Exception + */ + public static String loadFileAndSubstitute(String path,Map substitutions) throws Exception + { + StringBuilder contentBuilder = new StringBuilder(); + try (BufferedReader br = new BufferedReader(new InputStreamReader(FileTemplatingUtils.class.getClassLoader() + .getResourceAsStream(path)))) { + String line; + while ((line = br.readLine()) != null) { + // Apply substitutions + line = applySubstitutions(line, substitutions); + contentBuilder.append(line).append("\n"); + } + } + catch(Exception ex) + { + throw new Exception(ex); + } + return contentBuilder.toString(); + + } + + /** + * Find any placeholders in the given line and substitute them with the appropriate value + * @param line + * @param substitutions + * @return + */ + private static String applySubstitutions(String line, Map substitutions) { + + for (Map.Entry entry : substitutions.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + line = line.replace(key, value); + } + return line; + } + + + + +} diff --git a/tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppDeploymentTest.java b/tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppDeploymentTest.java new file mode 100644 index 0000000..953e913 --- /dev/null +++ b/tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppDeploymentTest.java @@ -0,0 +1,293 @@ +package eut.nebulouscloud.tests; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; + +import com.fasterxml.jackson.databind.ObjectMapper; + +class MQTTProcessorAppDeploymentTest { + static Logger LOGGER = LoggerFactory.getLogger(MQTTProcessorAppDeploymentTest.class); + protected ObjectMapper om = new ObjectMapper(); + static int DELAY_SECONDS = 3; + + String applicationId = new SimpleDateFormat("HHmmssddMM").format(new Date()) + + "automated-testing-mqtt-app-" + + new Date().getTime(); + + NebulousCoreMessageBrokerInterface coreBroker; + MQTTProcessorAppMessageBrokerInterface appBroker; + String mqttBroker = "broker.emqx.io"; + String mqttPort = "1883"; + String mqttTopicPrefix = "atest"; + String mqttAppInputTopic = mqttTopicPrefix + "/input"; + String mqttAppOutputTopic = mqttTopicPrefix + "/output"; + + + + /** + * This test ensures that a MQTT processor app can be deployed using NebulOuS. The test + * simulates the user requesting a deployment of an app through UI (sending app + * creation message and metric model). Then, the test asserts that optimizer + * controller performs expected actions, namely: - requesting node candidates + * and geting a response from CFSB - defines the app cluster - deploys the app + * cluster - sends the AMPL file for the solver - reports app status to be + * running + * + * Once the optimizer controller reports the app being successfully deployed, + * the test asserts that the app works as expected. For this, it connects to a + * public MQTT broker where the app is waiting for job requests, publish one and waits for the response + * for them. + * + * @throws Exception + */ + @Test + void test() throws Exception { + + LOGGER.info(String.format("Begin MQTT Processor APP deployment. applicationId is %s", applicationId)); + coreBroker = new NebulousCoreMessageBrokerInterface(); + appBroker = new MQTTProcessorAppMessageBrokerInterface("tcp://" + mqttBroker + ":" + mqttPort, mqttAppOutputTopic); + + /** + * Prepare and send app creation message and assert is correctly received by any subscriber. + * + * The app creation message payload template is stored in the project resources folder. This file contains several + * parameters that need to be substituted. These are: + * APP_ID: The id of the app being deployed + * MQTT connection details (APP_MQTT_BROKER_SERVER, APP_MQTT_BROKER_PORT, APP_MQTT_INPUT_TOPIC, APP_MQTT_OUTPUT_TOPIC): On startup, the application connects to the configured MQTT broker + * and waits for messages on the topic APP_MQTT_INPUT_TOPIC. Uppon a well structured message on said topic, the application simulates some work and sends a message to APP_MQTT_OUTPUT_TOPIC. + * REPORT_METRICS_TO_EMS: If true, application tries to connect to local EMS broker to report metrics. If false, not. + * + */ + LOGGER.info("send app creation message"); + + Map appParameters = new HashMap(); + appParameters.put("{{APP_ID}}", applicationId); + appParameters.put("{{APP_MQTT_BROKER_SERVER}}", mqttBroker); + appParameters.put("{{APP_MQTT_BROKER_PORT}}", mqttPort); + appParameters.put("{{APP_MQTT_INPUT_TOPIC}}", "$share/workers/" + mqttAppInputTopic); + appParameters.put("{{APP_MQTT_OUTPUT_TOPIC}}", mqttAppOutputTopic); + appParameters.put("{{REPORT_METRICS_TO_EMS}}", "True"); + + Map appCreationPayload = FileTemplatingUtils + .loadJSONFileAndSubstitute("mqtt_processor_app/app_creation_message.json", appParameters); + coreBroker.sendAppCreationMessage(appCreationPayload, applicationId); + + // Assert that the message was sent + assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.ui.dsl.generic", null, 10).isPresent()); + Thread.sleep(DELAY_SECONDS * 1000); + + /** + * Send metric model and assert is correctly received by any subscriber + */ + LOGGER.info("send metric model"); + Map metricModelPayload = FileTemplatingUtils.loadJSONFileAndSubstitute("mqtt_processor_app/metric_model.json", + Map.of("{{APP_ID}}", applicationId)); + coreBroker.sendMetricModelMessage(metricModelPayload, applicationId); + assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.ui.dsl.metric_model", null, 10).isPresent()); + + /** + * Assert that Optimizer controller requests for node candidates for the + * application cluster + */ + LOGGER.info("Wait for optimizer to request node candidates"); + + Optional nodeRequestToCFSB = coreBroker.findFirst(applicationId, + "eu.nebulouscloud.cfsb.get_node_candidates", null, 10); + assertTrue(nodeRequestToCFSB.isPresent()); + assertNotNull(nodeRequestToCFSB.get().correlationId); + + Optional nodeRequestToSAL = coreBroker.findFirst(applicationId, + "eu.nebulouscloud.exn.sal.nodecandidate.get", null, 10); + assertTrue(nodeRequestToSAL.isPresent()); + assertNotNull(nodeRequestToSAL.get().correlationId); + /** + * Assert that SAL anwsers the request + */ + LOGGER.info("Wait for CFSB to recieve an answer on node candidates from SAL"); + assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.exn.sal.nodecandidate.get.reply", + m -> nodeRequestToSAL.get().correlationId.equals(m.correlationId), 30).isPresent()); + + /** + * Assert that CFSB anwsers the request + */ + LOGGER.info("Wait for optimizer to recieve an answer on node candidates from CFSB"); + assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.cfsb.get_node_candidates.reply", + m -> nodeRequestToCFSB.get().correlationId.equals(m.correlationId), 30).isPresent()); + + /** + * Assert that optimiser defines the cluster + */ + LOGGER.info("Wait for optimizer to define cluster"); + Optional defineClusterRequest = coreBroker.findFirst(applicationId, + "eu.nebulouscloud.exn.sal.cluster.define", null, 80); + assertTrue(defineClusterRequest.isPresent()); + LOGGER.info(om.writeValueAsString(defineClusterRequest.get().payload)); + // Retrieve the name of the new cluster + String clusterName = (String) om + .readValue((String) defineClusterRequest.get().payload.get("body"), HashMap.class).get("name"); + + LOGGER.info(String.format("Cluster name: %s", clusterName)); + + /** + * Assert that Optimiser deploys the cluster + */ + LOGGER.info("Wait for optimizer to deploy cluster"); + assertTrue( + coreBroker.findFirst(applicationId, "eu.nebulouscloud.exn.sal.cluster.deploy", null, 80).isPresent()); + + LOGGER.info("Wait for a message from optimizer controller to solver with the AMPL File"); + assertTrue( + coreBroker.findFirst(applicationId, "eu.nebulouscloud.exn.sal.cluster.deploy", null, 80).isPresent()); + + LOGGER.info("Wait for cluster to be ready"); + waitForCluster(clusterName, 60 * 10); + + LOGGER.info("Wait for APP state to be Running"); + assertTrue(waitForAppRunning(60 * 10)); + + LOGGER.info("Wait for APP to be operative"); + assertTrue(checkApplicationWorks(60 * 10)); + // myEXNClient.stop(); + + } + + /** + * Checks that the application is working by sending an input message through + * the app message broker and expecting the apropriate answer from the + * application throught the same app message broker. If the application reports + * a problem with STOMP communication for publishing metrics to EMS "Error in + * STOMP connection", retry 2 times and give up. + * + * @param timeoutSeconds The ammount of seconds to wait for an answer + * @return true if the application responded, false otherwise + * @throws Exception + */ + private boolean checkApplicationWorks(int timeoutSeconds) throws Exception { + long timeout = new Date().getTime() + (timeoutSeconds * 1000); + int retriesLeft = 2; + do { + /** + * Build a request to be sent to the application input topic. + */ + Map inferenceRequest = new HashMap(); + inferenceRequest.put("timestamp", new SimpleDateFormat("YYYY-MM-dd HH:mm:ssZ").format(new Date())); + inferenceRequest.put("job_timestamp", inferenceRequest.get("timestamp")); + inferenceRequest.put("inference_duration", 1); + String jobId = UUID.randomUUID().toString(); + inferenceRequest.put("job_id", jobId); + String payload = om.writeValueAsString(inferenceRequest); + // Send the request + appBroker.publish(mqttAppInputTopic, payload); + + /** + * Check if the application sends a message to the response channel with + * apropriate structure (check it is a JSON and has the job_id value). If found, + * we can consider the app is running + */ + if (appBroker.findFirst(m -> { + return m.jsonPayload() != null && jobId.equals(m.jsonPayload().get("job_id")); + }, 3).isPresent()) { + return true; + } + + /** + * If there is a message with the content "Error in STOMP connection" it means + * that the APP is not able to publish metrics to EMS using STOMP. In this + * situation, retry at most two times. + */ + if (appBroker.findFirst(m -> "Error in STOMP connection".equals(m.payload), 3).isPresent()) { + retriesLeft--; + LOGGER.error("APP is reporting initialization error. Retries left:" + retriesLeft); + appBroker.clearMessageCache(); + if (retriesLeft == 0) + return false; + } + + + } while (new Date().getTime() < timeout); + LOGGER.error("Timeout waiting for a message"); + return false; + + } + + private boolean waitForCluster(String clusterName, int timeoutSeconds) { + long timeout = new Date().getTime() + (timeoutSeconds * 1000); + do { + String status = coreBroker.getClusterStatus(clusterName); + if (status == null || "submited".equals(status)) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + continue; + } + if ("deployed".equals(status)) { + return true; + } + return false; + } while (new Date().getTime() < timeout); + LOGGER.error("Timeout waiting for a message"); + return false; + } + + /** + * Wait for the optimizer controller to report that the application is on status "RUNNING" (return true) or "FAILED" (return false). + * + *
    + *
  • NEW: The application has been created from the GUI and is waiting for the + * performance indicators from the utility evaluator. * + *
  • READY: The application is ready for deployment. + *
  • DEPLOYING: The application is being deployed or redeployed. + *
  • RUNNING: The application is running. + *
  • FAILED: The application is in an invalid state: one or more messages + * could not be parsed, or deployment or redeployment failed. + * + * @param timeoutSeconds + * @return True if the optimizer controller reported the app to be running, false if the optimizer controller the app to have failed or the timeout is reached. + */ + private boolean waitForAppRunning(int timeoutSeconds) { + + long timeout = new Date().getTime() + (timeoutSeconds * 1000); + do { + /** + * Check if app status is reported to be running + */ + if (coreBroker.findFirst(applicationId, "eu.nebulouscloud.optimiser.controller.app_state", + m -> "RUNNING".equals(m.payload.get("state")), 2).isPresent()) { + return true; + } + /** + * Check if APP status is failed. + */ + if (coreBroker.findFirst(applicationId, "eu.nebulouscloud.optimiser.controller.app_state", + m -> "FAILED".equals(m.payload.get("state")), 2).isPresent()) { + return false; + } + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } while (new Date().getTime() < timeout); + LOGGER.error("Timeout waiting for a message"); + return false; + + } + +} \ No newline at end of file diff --git a/tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppMessageBrokerInterface.java b/tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppMessageBrokerInterface.java new file mode 100644 index 0000000..906d87a --- /dev/null +++ b/tests/src/test/java/eut/nebulouscloud/tests/MQTTProcessorAppMessageBrokerInterface.java @@ -0,0 +1,154 @@ +package eut.nebulouscloud.tests; + +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +/*** + * Class for facilitating the interaction with the MQTT broker used by the MQTT processing APP. + * Registers any received message and offers methods to query them. + * Implements a method for facilitating sending messages to the processing app using MQTT. + */ +public class MQTTProcessorAppMessageBrokerInterface { + + static Logger LOGGER = LoggerFactory.getLogger(MQTTProcessorAppMessageBrokerInterface.class); + private final AtomicBoolean messageRecieved = new AtomicBoolean(false); + MqttClient client; + protected ObjectMapper om = new ObjectMapper(); + private List messages = Collections.synchronizedList(new LinkedList()); + + public class SimpleMQTTMessage { + final protected ObjectMapper om = new ObjectMapper(); + final String topic; + final String payload; + final Date date; + + public SimpleMQTTMessage(String topic, String payload) { + super(); + this.topic = topic; + this.payload = payload; + this.date = new Date(); + } + + public Map jsonPayload() { + try { + return om.readValue(payload, HashMap.class); + } catch (JsonProcessingException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + } + + } + + public void publish(String topic,String payload) + { + MqttMessage m = new MqttMessage(); + m.setQos(2); + m.setPayload(payload.getBytes()); + try { + client.publish(topic, m); + LOGGER.info("Message published: "+payload); + } catch (MqttException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public MQTTProcessorAppMessageBrokerInterface(String broker, String baseTopic) { + try { + LOGGER.info("Connecting to broker: " + broker); + client = new MqttClient(broker, MqttClient.generateClientId(), new MemoryPersistence()); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + + client.connect(connOpts); + LOGGER.info("Connected"); + + client.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable throwable) { + LOGGER.error("Connection lost!"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String payload = new String(message.getPayload()); + LOGGER.info("Message received: " + payload); + messages.add(new SimpleMQTTMessage(topic, payload)); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + } + }); + client.subscribe(baseTopic+"/#"); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + /** + * Waits for a message that matches the given predicate to appear and returns it + * (if found). If timeout is reached without the message being recieved, returns + * an empty optional. + * + * @param predicate The search predicate. If null, it is not used. + * @param timeoutSeconds The maximum timeout to wait for a message with the + * given predicate to be found in the list (in seconds). + * It must be a positive integer or 0. + * @return An optional with the first message that matchs the predicate if any + * found. + */ + public Optional findFirst(Predicate predicate, int timeoutSeconds) { + Optional result = Optional.empty(); + long timeout = new Date().getTime() + (timeoutSeconds * 1000); + do { + synchronized (messages) { + result = messages.stream().filter(predicate).findFirst(); + } + if (result.isEmpty() && new Date().getTime() < timeout) { + LOGGER.error(String.format("Waiting for message. %.2fs left for timeout.", + ((timeout - new Date().getTime()) / 1000.0))); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } while (result.isEmpty() && new Date().getTime() < timeout); + if (new Date().getTime() > timeout) { + LOGGER.error("Timeout waiting for a message"); + } + return result; + + }; + + /** + * Remove all messages from the cache + */ + public void clearMessageCache() + { + synchronized (messages) { + messages.clear(); + } + } + + +} \ No newline at end of file diff --git a/tests/src/test/java/eut/nebulouscloud/tests/NebulOuSCoreMessage.java b/tests/src/test/java/eut/nebulouscloud/tests/NebulOuSCoreMessage.java new file mode 100644 index 0000000..52da118 --- /dev/null +++ b/tests/src/test/java/eut/nebulouscloud/tests/NebulOuSCoreMessage.java @@ -0,0 +1,29 @@ +package eut.nebulouscloud.tests; + +import java.util.Date; +import java.util.Map; + +/** + * An object that represents a message sent through the NebulOuS message broker. + * The prefix "topic://" is removed from the topic value (if exists). + */ +public class NebulOuSCoreMessage { + public Date receptionDate; + public String topic; + public Map payload; + public String applicationId; + public String correlationId; + public NebulOuSCoreMessage(Date receptionDate, String topic, Map payload, String applicationId, + String correlationId) { + super(); + this.receptionDate = receptionDate; + this.topic = topic!=null?topic.replaceFirst("topic://", ""):null; + this.payload = payload; + this.applicationId = applicationId; + this.correlationId = correlationId; + } + + + + +} diff --git a/tests/src/test/java/eut/nebulouscloud/tests/NebulousCoreMessageBrokerInterface.java b/tests/src/test/java/eut/nebulouscloud/tests/NebulousCoreMessageBrokerInterface.java new file mode 100644 index 0000000..2e9a29a --- /dev/null +++ b/tests/src/test/java/eut/nebulouscloud/tests/NebulousCoreMessageBrokerInterface.java @@ -0,0 +1,286 @@ +package eut.nebulouscloud.tests; + +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; + +import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.exceptions.ClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.nebulouscloud.exn.Connector; +import eu.nebulouscloud.exn.core.Consumer; +import eu.nebulouscloud.exn.core.Context; +import eu.nebulouscloud.exn.core.Handler; +import eu.nebulouscloud.exn.core.Publisher; +import eu.nebulouscloud.exn.core.SyncedPublisher; +import eu.nebulouscloud.exn.handlers.ConnectorHandler; +import eu.nebulouscloud.exn.settings.StaticExnConfig; + +/*** + * Class for facilitating the interaction with the NebulOuS message broker with connection parameters host:"localhost", port: 5672, user: "admin", password:"admin". + * Implements several functions to send messages to mimic the behaviour of certain NebulOuS components. + * Registers any received message and offers methods to query them. + */ +public class NebulousCoreMessageBrokerInterface { + static Logger LOGGER = LoggerFactory.getLogger(NebulousCoreMessageBrokerInterface.class); + protected ObjectMapper om = new ObjectMapper(); + Publisher metricModelPublisher; + Publisher appDeployMessagePublisher; + SyncedPublisher getClusterPublisher; + + /** + * Broker connection properties + */ + final String brokerHost = "localhost"; + final int brokerPort = 5672; + final String brokerUser = "admin"; + final String brokerPassword ="admin"; + + private List messages = Collections.synchronizedList(new LinkedList()); + + + public NebulousCoreMessageBrokerInterface() { + /** + * Setup NebulOuS message broker client + */ + LOGGER.info("Start NebulOuS message broker client"); + metricModelPublisher = new Publisher("metricModelPublisher", "eu.nebulouscloud.ui.dsl.metric_model", true, + true); + appDeployMessagePublisher = new Publisher("appDeployMessagePublisher", "eu.nebulouscloud.ui.dsl.generic", true, + true); + getClusterPublisher = new SyncedPublisher("getCluster", "eu.nebulouscloud.exn.sal.cluster.get", true, true); + Consumer cons1 = new Consumer("monitoring", ">", new MyConsumerHandler(this), true, true); + Connector myEXNClient = new Connector("thisINotImportant", new MyConnectorHandler(), + List.of(metricModelPublisher, appDeployMessagePublisher, getClusterPublisher), List.of(cons1), true, + true, + new StaticExnConfig(brokerHost, brokerPort, brokerUser, brokerPassword)); + myEXNClient.start(); + } + + /** + * Waits for a message that matches the given predicate to appear and returns it + * (if found). If timeout is reached without the message being recieved, returns + * an empty optional. + * + * @param appId the app Id to filter by. If null, no filtering occurs + * by appId + * @param topic the topic to filter by. If null, no filtering occurs by + * topic + * @param predicate The search predicate. If null, it is not used. + * @param timeoutSeconds The maximum timeout to wait for a message with the + * given predicate to be found in the list (in seconds). + * It must be a positive integer or 0. + * @return An optional with the first message that matchs the predicate if any + * found. + */ + public Optional findFirst(String appId, String topic, Predicate predicate, + int timeoutSeconds) { + Optional result = Optional.empty(); + long timeout = new Date().getTime() + (timeoutSeconds * 1000); + Predicate finalPredicate = predicate != null + ? messagesFromAppAndTopic(appId, topic).and(predicate) + : messagesFromAppAndTopic(appId, topic); + do { + synchronized (messages) { + + result = messages.stream().filter(finalPredicate).findFirst(); + } + if (result.isEmpty() && new Date().getTime() < timeout) { + LOGGER.error(String.format("Waiting for message. %.2fs left for timeout.", + ((timeout - new Date().getTime()) / 1000.0))); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } while (result.isEmpty() && new Date().getTime() < timeout); + if (new Date().getTime() > timeout) { + LOGGER.error("Timeout waiting for a message"); + } + return result; + + }; + + class MyConsumerHandler extends Handler { + NebulousCoreMessageBrokerInterface messageStore; + + public MyConsumerHandler(NebulousCoreMessageBrokerInterface messageStore) { + this.messageStore = messageStore; + } + + @Override + public void onMessage(String key, String address, Map body, Message message, Context context) { + String to = "??"; + try { + to = message.to() != null ? message.to() : address; + } catch (Exception ex) { + ex.printStackTrace(); + } + Map props = new HashMap(); + + try { + message.forEachProperty((k, v) -> props.put(k, v)); + } catch (ClientException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + String subject = "?"; + try { + subject = message.subject(); + } catch (ClientException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + Object correlationId = 0; + try { + correlationId = message.correlationId(); + } catch (ClientException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + LOGGER.trace("\r\n{}\r\nsubject:{}\r\npayload:{}\r\nproperties:{}\r\ncorrelationId:{}", to, subject, body, + props, correlationId); + + NebulOuSCoreMessage internal = new NebulOuSCoreMessage(new Date(), to, body, + (String) props.getOrDefault("application", null), + correlationId != null ? correlationId.toString() : ""); + messageStore.add(internal); + try { + LOGGER.trace(om.writeValueAsString(body)); + } catch (JsonProcessingException e) { + } + } + } + + /** + * Queries SAL for the status of a cluster + * @param clusterName The cluster name. + * @return The cluster status, or null in case of error. + */ + public String getClusterStatus(String clusterName) { + Map msg = Map.of("metaData", Map.of("user", "admin", "clusterName", clusterName)); + Map response = getClusterPublisher.sendSync(msg, clusterName, null, false); + JsonNode payload = extractPayloadFromExnResponse(response, "getCluster"); + if (payload.isMissingNode()) + return null; + LOGGER.info("isClusterReady: " + payload.toString()); + JsonNode jsonState = payload.at("/status"); + if (jsonState.isMissingNode()) + return null; + return jsonState.asText(); + } + + /** + * Extract and check the SAL response from an exn-middleware response. The SAL + * response will be valid JSON encoded as a string in the "body" field of the + * response. If the response is of the following form, log an error and return a + * missing node instead: + * + *
    {@code
    +	 * {
    +	 *   "key": ,
    +	 *   "message": "some error message"
    +	 * }
    +	 * }
    + * + * @param responseMessage The response from exn-middleware. + * @param caller Caller information, used for logging only. + * @return The SAL response as a parsed JsonNode, or a node where {@code + * isMissingNode()} will return true if SAL reported an error. + */ + private JsonNode extractPayloadFromExnResponse(Map responseMessage, String caller) { + JsonNode response = om.valueToTree(responseMessage); + String salRawResponse = response.at("/body").asText(); // it's already a string, asText() is for the type system + JsonNode metadata = response.at("/metaData"); + JsonNode salResponse = om.missingNode(); // the data coming from SAL + try { + salResponse = om.readTree(salRawResponse); + } catch (JsonProcessingException e) { + LOGGER.error("Could not read message body as JSON: body = '{}', caller = '{}'", salRawResponse, caller, e); + return om.missingNode(); + } + if (!metadata.at("/status").asText().startsWith("2")) { + // we only accept 200, 202, numbers of that nature + LOGGER.error("exn-middleware-sal request failed with error code '{}' and message '{}', caller '{}'", + metadata.at("/status"), salResponse.at("/message").asText(), caller); + return om.missingNode(); + } + return salResponse; + } + + private class MyConnectorHandler extends ConnectorHandler { + + public void onReady(AtomicReference context) { + LOGGER.info("Optimiser-controller connected to ActiveMQ"); + } + } + + + /** + * Build a predicate that filters by messages for a certain appId and topic + * @param appId: The appId to filter for. If null, it is ignored + * @param topic: The topic to filter for. If null, it is ignored. + * @return + */ + public static Predicate messagesFromAppAndTopic(String appId, String topic) { + return messagesFromApp(appId).and((Predicate) m -> topic == null || topic.equals(m.topic)); + } + + /** + * Builds a predicate that filters messages for the given app ID. If appID is null, the predicate has no effect. + * @param id + * @return + */ + public static Predicate messagesFromApp(String id) { + return ((Predicate) m -> id == null || id.equals(m.applicationId)); + } + + /** + * Adds a new message to the cache + * + * @param message + */ + public void add(NebulOuSCoreMessage message) { + try { + LOGGER.trace("Adding message:" + om.writeValueAsString(message)); + } catch (JsonProcessingException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + this.messages.add(message); + } + + /** + * Sends an app creation message as done by the UI during deployment. + * @param appCreationPayload + * @param applicationId + */ + public void sendAppCreationMessage(Map appCreationPayload, String applicationId) { + appDeployMessagePublisher.send(appCreationPayload, applicationId); + + } + /** + * Sends the metric model message as done by the UI during deployment. + * @param metricModelPayload + * @param applicationId + */ + public void sendMetricModelMessage(Map metricModelPayload,String applicationId) + { + metricModelPublisher.send(metricModelPayload,applicationId); + } + +} diff --git a/tests/src/test/java/eut/nebulouscloud/tests/SendPayloadToMQTTProcessorApp.java b/tests/src/test/java/eut/nebulouscloud/tests/SendPayloadToMQTTProcessorApp.java new file mode 100644 index 0000000..4d33934 --- /dev/null +++ b/tests/src/test/java/eut/nebulouscloud/tests/SendPayloadToMQTTProcessorApp.java @@ -0,0 +1,92 @@ +package eut.nebulouscloud.tests; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class SendPayloadToMQTTProcessorApp { + static Logger LOGGER = LoggerFactory.getLogger(SendPayloadToMQTTProcessorApp.class); + static protected ObjectMapper om = new ObjectMapper(); + static int DELAY_SECONDS = 3; + static MQTTProcessorAppMessageBrokerInterface appBroker; + static String mqttBroker = "broker.emqx.io"; + static String mqttPort = "1883"; + static String mqttTopicPrefix = "atest"; + static String mqttAppInputTopic = mqttTopicPrefix + "/input"; + static String mqttAppOutputTopic = mqttTopicPrefix + "/output"; + + public static void main(String[] args) throws Exception + { + //String applicationId = "application=1431290905automated-testing-mqtt-app-1715257889393"; + String applicationId = "1549030905automated-testing-mqtt-app-1715262543304"; + LOGGER.info(String.format("Begin MQTT Processor APP deployment. applicationId is %s", applicationId)); + appBroker = new MQTTProcessorAppMessageBrokerInterface("tcp://" + mqttBroker + ":" + mqttPort, mqttAppOutputTopic); + while(true) + { + if(sendPayloadAndWaitAnswer(20) == false) + { + break; + }else + { + LOGGER.info("App responded"); + } + Thread.sleep(1000); + } + } + + private static boolean sendPayloadAndWaitAnswer(int timeoutSeconds) throws Exception { + long timeout = new Date().getTime() + (timeoutSeconds * 1000); + int retriesLeft = 2; + do { + /** + * Build a request to be sent to the application input topic. + */ + Map inferenceRequest = new HashMap(); + inferenceRequest.put("timestamp", new SimpleDateFormat("YYYY-MM-dd HH:mm:ssZ").format(new Date())); + inferenceRequest.put("job_timestamp", inferenceRequest.get("timestamp")); + inferenceRequest.put("inference_duration", 5); + String jobId = UUID.randomUUID().toString(); + inferenceRequest.put("job_id", jobId); + String payload = om.writeValueAsString(inferenceRequest); + // Send the request + appBroker.publish(mqttAppInputTopic, payload); + + /** + * Check if the application sends a message to the response channel with + * apropriate structure (check it is a JSON and has the job_id value). If found, + * we can consider the app is running + */ + if (appBroker.findFirst(m -> { + return m.jsonPayload() != null && jobId.equals(m.jsonPayload().get("job_id")); + }, 3).isPresent()) { + return true; + } + + /** + * If there is a message with the content "Error in STOMP connection" it means + * that the APP is not able to publish metrics to EMS using STOMP. In this + * situation, retry at most two times. + */ + if (appBroker.findFirst(m -> "Error in STOMP connection".equals(m.payload), 3).isPresent()) { + retriesLeft--; + LOGGER.error("APP is reporting initialization error. Retries left:" + retriesLeft); + appBroker.clearMessageCache(); + if (retriesLeft == 0) + return false; + } + + + } while (new Date().getTime() < timeout); + LOGGER.error("Timeout waiting for a message"); + return false; + + } + +} diff --git a/tests/src/test/resources/mqtt_processor_app/app_creation_message.json b/tests/src/test/resources/mqtt_processor_app/app_creation_message.json new file mode 100644 index 0000000..ba8cf21 --- /dev/null +++ b/tests/src/test/resources/mqtt_processor_app/app_creation_message.json @@ -0,0 +1,116 @@ +{ + "title": "{{APP_ID}}", + "uuid": "{{APP_ID}}", + "status": "deploying", + "content": "apiVersion: \"core.oam.dev/v1beta1\"\r\nkind: \"Application\"\r\nmetadata:\r\n name: \"{{APP_ID}}\"\r\nspec:\r\n components:\r\n - name: \"dummy-app-worker\"\r\n type: \"webservice\"\r\n properties:\r\n image: \"docker.io/rsprat/mytestrepo:v1\"\r\n cpu: \"2.0\"\r\n memory: \"2048Mi\"\r\n imagePullPolicy: \"Always\"\r\n cmd:\r\n - \"python\"\r\n - \"-u\"\r\n - \"worker.py\"\r\n env:\r\n - name: \"mqtt_ip\"\r\n value: \"{{APP_MQTT_BROKER_SERVER}}\"\r\n - name: \"mqtt_port\"\r\n value: \"{{APP_MQTT_BROKER_PORT}}\"\r\n - name: \"mqtt_subscribe_topic\"\r\n value: \"{{APP_MQTT_INPUT_TOPIC}}\"\r\n - name: \"mqtt_publish_topic\"\r\n value: \"{{APP_MQTT_OUTPUT_TOPIC}}\"\r\n - name: \"report_metrics_to_ems\"\r\n value: \"{{REPORT_METRICS_TO_EMS}}\"\r\n - name: \"nebulous_ems_ip\"\r\n valueFrom:\r\n fieldRef:\r\n fieldPath: status.hostIP\r\n - name: \"nebulous_ems_port\"\r\n value: \"61610\"\r\n - name: \"nebulous_ems_user\"\r\n value: \"aaa\"\r\n - name: \"nebulous_ems_password\"\r\n value: \"111\"\r\n - name: \"nebulous_ems_metrics_topic\"\r\n value: \"/topic/RawProcessingLatency_SENSOR\"\r\n traits:\r\n - type: \"scaler\"\r\n properties:\r\n replicas: 1\r\n\r\n\r\n policies:\r\n - name: \"target-default\"\r\n type: \"topology\"\r\n properties:\r\n namespace: \"default\"\r\n workflow:\r\n steps:\r\n - name: \"deploy2default\"\r\n type: \"deploy\"\r\n properties:\r\n policies:\r\n - \"target-default\"", + "variables": [ + { + "key": "spec_components_0_traits_0_properties_replicas", + "path": "/spec/components/0/traits/0/properties/replicas", + "type": "float", + "meaning": "replicas", + "value": { + "lower_bound": 1, + "higher_bound": 5 + } + } + ], + "environmentVariables": [], + "resources": [ + + ], + "templates": [], + "parameters": [], + "metrics": [ + { + "type": "raw", + "name": "RawProcessingLatency", + "level": "global", + "components": [], + "sensor": "job_process_time_instance", + "config": [], + "isWindowOutputRaw": true, + "outputRaw": { + "type": "all", + "interval": 30, + "unit": "sec" + } + }, + { + "type": "composite", + "level": "global", + "components": [], + "name": "MeanJobProcessingLatency", + "template": "", + "formula": "mean(RawProcessingLatency)", + "isWindowInput": true, + "input": { + "type": "sliding", + "interval": 30, + "unit": "sec" + }, + "isWindowOutput": true, + "output": { + "type": "all", + "interval": 30, + "unit": "sec" + }, + "arguments": [ + "RawProcessingLatency" + ] + } + ], + "sloViolations": { + "nodeKey": "5ce4273e-5ac3-478b-b460-075b053fb994", + "isComposite": true, + "condition": "AND", + "not": false, + "children": [ + { + "nodeKey": "982c13a8-bbae-4574-b2be-eca15b865563", + "isComposite": false, + "metricName": "MeanJobProcessingLatency", + "operator": ">", + "value": 50 + } + ] + }, + "utilityFunctions": [ + { + "name": "f", + "type": "minimize", + "expression": { + "formula": "(dummy_app_worker_MeanJobProcessingLatency*currentReplicas)/spec_components_0_traits_0_properties_replicas", + "variables": [ + { + "name": "dummy_app_worker_MeanJobProcessingLatency", + "value": "MeanJobProcessingLatency" + }, + { + "name": "currentReplicas", + "value": "currentReplicas" + }, + { + "name": "spec_components_0_traits_0_properties_replicas", + "value": "spec_components_0_traits_0_properties_replicas" + } + ] + } + }, + { + "name": "currentReplicas", + "type": "constant", + "expression": { + "formula": "a", + "variables": [ + { + "name": "a", + "value": "spec_components_0_traits_0_properties_replicas" + } + ] + } + } + ], + "_create": true, + "_delete": true +} \ No newline at end of file diff --git a/tests/src/test/resources/mqtt_processor_app/metric_model.json b/tests/src/test/resources/mqtt_processor_app/metric_model.json new file mode 100644 index 0000000..cb0f7fb --- /dev/null +++ b/tests/src/test/resources/mqtt_processor_app/metric_model.json @@ -0,0 +1 @@ +{"application":"{{APP_ID}}","yaml":{"apiVersion":"nebulous/v1","kind":"MetricModel","metadata":{"name":"{{APP_ID}}","labels":{"app":"{{APP_ID}}"}},"templates":[],"spec":{"components":[{"name":"spec-comp","metrics":[]}],"scopes":[{"name":"app-wide-scope","components":[],"metrics":[{"name":"RawProcessingLatency","type":"raw","sensor":{"type":"job_process_time_instance","config":{}},"output":"all 30 sec"},{"name":"MeanJobProcessingLatency","type":"composite","template":"","formula":"mean(RawProcessingLatency)","window":{"type":"sliding","size":"30 sec"},"output":"all 30 sec"}],"requirements":[{"name":"Combined_SLO","type":"slo","constraint":"(MeanJobProcessingLatency > 50)"}]}]}}} \ No newline at end of file