robert.sanfeliu 2024-05-09 17:15:49 +02:00
__pycache__/ __pycache__/
.nox/ .nox/

NebulOuS integration tests
- tests: Folder With a Java Maven project with JUnit tests that validate basic aspects of NebulOuS
- apps: Folder containing necessary testing apps used during the testing process.

Source code of the mqtt processor app used for testing.
On startup, the application connects to the configured MQTT broker and waits for messages on the topic APP_MQTT_INPUT_TOPIC. When a well structured message on said topic, the application simulates some work and sends a message to APP_MQTT_OUTPUT_TOPIC.
The structure of the input message is:
- job_id: An unique UUID assigned to the job
- timestamp: Timestamp of the request with the format YYYY-MM-dd HH:mm:ssZ
- job_timestamp: Same as timestamp
- inference_duration: Time in seconds that the worker processing this job will sleep to simulate a time consuming inference process.
The worker needs the following environment variables to work:
- mqtt_ip: The IP/host of the MQTT broker
- mqtt_port: The port of the MQTT broker
- mqtt_subscribe_topic: The topic to subscribe to and recieve requests
- mqtt_publish_topic: The topic to connect to and publish results
- report_metrics_to_ems: Flag to indicate if metrics should be published to EMS
- nebulous_ems_ip: EMS IP
- nebulous_ems_port: EMS port
- nebulous_ems_user: EMS user
- nebulous_ems_password: EMS password
- nebulous_ems_metrics_topic: EMS topic to use to report metrics

FROM python:3.11
RUN mkdir /app
COPY ./requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
COPY ./ ./
CMD [ "python3","-u", "./"]

import paho.mqtt.client as mqtt
import stomp
import os.path
import logging
import threading
import queue
import time
import datetime
import json
from uuid import uuid4
import sys
import traceback
print("Starting dummy app worker")
shared_stack = queue.Queue()
worker_id = str(uuid4())
# MQTT Broker details
mqtt_broker_address = os.getenv("mqtt_ip")
mqtt_port = int(os.getenv("mqtt_port"))
mqtt_topic = os.getenv("mqtt_subscribe_topic")
mqtt_publish_topic = os.getenv("mqtt_publish_topic")
# STOMP Broker details
report_metrics_to_ems = os.getenv("report_metrics_to_ems")
stomp_broker_address = os.getenv("nebulous_ems_ip")
stomp_port = int(os.getenv("nebulous_ems_port"))
stomp_destination = os.getenv("nebulous_ems_metrics_topic")
stomp_user = os.getenv("nebulous_ems_user")
stomp_pass = os.getenv("nebulous_ems_password")
def map_value(old_value, old_min, old_max, new_min, new_max):
return ( (old_value - old_min) / (old_max - old_min) ) * (new_max - new_min) + new_min
# MQTT callback function
def on_message(client, userdata, message):
payload = message.payload.decode("utf-8")
print("Recieved MQTT message",payload)
print("Message added to stack. Current length:",shared_stack.qsize())
backpressure = map_value(min(shared_stack.qsize(),10),0,10,0,2)
print("Backpressure: ",backpressure)
if backpressure>0:
except Exception as e:
def process_messages():
while True:
# Get message from the shared stack
payload = shared_stack.get()
payload = json.loads(payload)
print("Processing ",payload)
print("Proceed to simulate an inference of ",payload["inference_duration"])
date_timestamp = datetime.datetime.strptime(payload['job_timestamp'], "%Y-%m-%d %H:%M:%S%z")
total_job_duration = int(( - date_timestamp).total_seconds())
print(f"total_job_duration: {total_job_duration}")
json_msg = {
"metricValue": total_job_duration,
"level": 1,
"timestamp": int(
payload["worker_id"] = worker_id
payload["total_job_duration"] = total_job_duration
payload["job_completion_timestamp"] ="%Y-%m-%d %H:%M:%S%z")
if "True" == report_metrics_to_ems:
print("send_metric ",json_msg)
stomp_client.send(body=json.dumps(json_msg), headers={'type':'textMessage', 'amq-msg-type':'text'}, destination=stomp_destination)
print("EMS reporting is disabled.")
except Exception as e:
# STOMP connection callback
def on_connect_stomp():
print("Connected to STOMP broker")
# STOMP error callback
def on_error_stomp():
print("Error in STOMP connection")
logger = logging.getLogger(__name__)
print("Connecting to MQTT")
# Initialize MQTT client
mqtt_client = mqtt.Client()
mqtt_client.on_message = on_message
mqtt_client.connect(mqtt_broker_address, mqtt_port)
publish_thread = threading.Thread(target=process_messages)
publish_thread.daemon = True # Daemonize the thread so it will exit when the main thread exits
if "True" == report_metrics_to_ems:
print("Connecting to STOMP")
stomp_client = stomp.Connection12(host_and_ports=[(stomp_broker_address, stomp_port)])
stomp_client.set_listener('', stomp.PrintingListener())
stomp_client.connect(stomp_user, stomp_pass, wait=True)
except Exception as e:
mqtt_client.publish(mqtt_publish_topic,"Error in STOMP connection",2)
print("Start MQTT Loop")
# Start the MQTT client loop
print("App ended")

# Ignore Gradle project-specific cache directory
<?xml version="1.0" encoding="UTF-8"?>
# Log4J 2 configuration
# Monitor config file every X seconds for updates
monitorInterval = 5
loggers = activemq = org.apache.activemq
logger.activemq.level = WARN
loggers = test = eut.nebulouscloud.automated_tests
logger.test.level = DEBUG = TRACE
rootLogger.level = INFO
#rootLogger = console
rootLogger.appenderRef.console.ref = console
appenders = console, file
appender.file.type = File = LOGFILE
appender.file.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
#appender.file.filter.threshold.type = ThresholdFilter
#appender.file.filter.threshold.level = info
# Console appender
appender.console.type = Console = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
rootLogger.appenderRefs = stdout, logfile
rootLogger.appenderRef.stdout.ref = STDOUT
rootLogger.appenderRef.logfile.ref = LOGFILE

package eut.nebulouscloud.tests;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
public class FileTemplatingUtils {
static Logger LOGGER = LoggerFactory.getLogger(FileTemplatingUtils.class);
static ObjectMapper om = new ObjectMapper();
* Load a JSON file stored in the resources folder of the project and perform the substitutions provided.
* @param path
* @param substitutions
* @return
* @throws Exception
public static Map<String,Object> loadJSONFileAndSubstitute(String path,Map<String,String> substitutions) throws Exception
return om.readValue(loadFileAndSubstitute(path, substitutions),HashMap.class);
* Load a text file stored in the resources folder of the project and perform the substitutions provided.
* @param path
* @param substitutions
* @return
* @throws Exception
public static String loadFileAndSubstitute(String path,Map<String,String> substitutions) throws Exception
StringBuilder contentBuilder = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(FileTemplatingUtils.class.getClassLoader()
.getResourceAsStream(path)))) {
String line;
while ((line = br.readLine()) != null) {
// Apply substitutions
line = applySubstitutions(line, substitutions);
catch(Exception ex)
throw new Exception(ex);
return contentBuilder.toString();
* Find any placeholders in the given line and substitute them with the appropriate value
* @param line
* @param substitutions
* @return
private static String applySubstitutions(String line, Map<String, String> substitutions) {
for (Map.Entry<String, String> entry : substitutions.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
line = line.replace(key, value);
return line;

package eut.nebulouscloud.tests;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import com.fasterxml.jackson.databind.ObjectMapper;
class MQTTProcessorAppDeploymentTest {
static Logger LOGGER = LoggerFactory.getLogger(MQTTProcessorAppDeploymentTest.class);
protected ObjectMapper om = new ObjectMapper();
static int DELAY_SECONDS = 3;
String applicationId = new SimpleDateFormat("HHmmssddMM").format(new Date())
+ "automated-testing-mqtt-app-"
+ new Date().getTime();
NebulousCoreMessageBrokerInterface coreBroker;
MQTTProcessorAppMessageBrokerInterface appBroker;
String mqttBroker = "";
String mqttPort = "1883";
String mqttTopicPrefix = "atest";
String mqttAppInputTopic = mqttTopicPrefix + "/input";
String mqttAppOutputTopic = mqttTopicPrefix + "/output";
* This test ensures that a MQTT processor app can be deployed using NebulOuS. The test
* simulates the user requesting a deployment of an app through UI (sending app
* creation message and metric model). Then, the test asserts that optimizer
* controller performs expected actions, namely: - requesting node candidates
* and geting a response from CFSB - defines the app cluster - deploys the app
* cluster - sends the AMPL file for the solver - reports app status to be
* running
* Once the optimizer controller reports the app being successfully deployed,
* the test asserts that the app works as expected. For this, it connects to a
* public MQTT broker where the app is waiting for job requests, publish one and waits for the response
* for them.
* @throws Exception
void test() throws Exception {"Begin MQTT Processor APP deployment. applicationId is %s", applicationId));
coreBroker = new NebulousCoreMessageBrokerInterface();
appBroker = new MQTTProcessorAppMessageBrokerInterface("tcp://" + mqttBroker + ":" + mqttPort, mqttAppOutputTopic);
* Prepare and send app creation message and assert is correctly received by any subscriber.
* The app creation message payload template is stored in the project resources folder. This file contains several
* parameters that need to be substituted. These are:
* APP_ID: The id of the app being deployed
* MQTT connection details (APP_MQTT_BROKER_SERVER, APP_MQTT_BROKER_PORT, APP_MQTT_INPUT_TOPIC, APP_MQTT_OUTPUT_TOPIC): On startup, the application connects to the configured MQTT broker
* and waits for messages on the topic APP_MQTT_INPUT_TOPIC. Uppon a well structured message on said topic, the application simulates some work and sends a message to APP_MQTT_OUTPUT_TOPIC.
* REPORT_METRICS_TO_EMS: If true, application tries to connect to local EMS broker to report metrics. If false, not.
*/"send app creation message");
Map<String, String> appParameters = new HashMap<String, String>();
appParameters.put("{{APP_ID}}", applicationId);
appParameters.put("{{APP_MQTT_BROKER_SERVER}}", mqttBroker);
appParameters.put("{{APP_MQTT_BROKER_PORT}}", mqttPort);
appParameters.put("{{APP_MQTT_INPUT_TOPIC}}", "$share/workers/" + mqttAppInputTopic);
appParameters.put("{{APP_MQTT_OUTPUT_TOPIC}}", mqttAppOutputTopic);
appParameters.put("{{REPORT_METRICS_TO_EMS}}", "True");
Map<String, Object> appCreationPayload = FileTemplatingUtils
.loadJSONFileAndSubstitute("mqtt_processor_app/app_creation_message.json", appParameters);
coreBroker.sendAppCreationMessage(appCreationPayload, applicationId);
// Assert that the message was sent
assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.ui.dsl.generic", null, 10).isPresent());
Thread.sleep(DELAY_SECONDS * 1000);
* Send metric model and assert is correctly received by any subscriber
*/"send metric model");
Map<String, Object> metricModelPayload = FileTemplatingUtils.loadJSONFileAndSubstitute("mqtt_processor_app/metric_model.json",
Map.of("{{APP_ID}}", applicationId));
coreBroker.sendMetricModelMessage(metricModelPayload, applicationId);
assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.ui.dsl.metric_model", null, 10).isPresent());
* Assert that Optimizer controller requests for node candidates for the
* application cluster
*/"Wait for optimizer to request node candidates");
Optional<NebulOuSCoreMessage> nodeRequestToCFSB = coreBroker.findFirst(applicationId,
"eu.nebulouscloud.cfsb.get_node_candidates", null, 10);
Optional<NebulOuSCoreMessage> nodeRequestToSAL = coreBroker.findFirst(applicationId,
"eu.nebulouscloud.exn.sal.nodecandidate.get", null, 10);
* Assert that SAL anwsers the request
*/"Wait for CFSB to recieve an answer on node candidates from SAL");
assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.exn.sal.nodecandidate.get.reply",
m -> nodeRequestToSAL.get().correlationId.equals(m.correlationId), 30).isPresent());
* Assert that CFSB anwsers the request
*/"Wait for optimizer to recieve an answer on node candidates from CFSB");
assertTrue(coreBroker.findFirst(applicationId, "eu.nebulouscloud.cfsb.get_node_candidates.reply",
m -> nodeRequestToCFSB.get().correlationId.equals(m.correlationId), 30).isPresent());
* Assert that optimiser defines the cluster
*/"Wait for optimizer to define cluster");
Optional<NebulOuSCoreMessage> defineClusterRequest = coreBroker.findFirst(applicationId,
"eu.nebulouscloud.exn.sal.cluster.define", null, 80);
// Retrieve the name of the new cluster
String clusterName = (String) om
.readValue((String) defineClusterRequest.get().payload.get("body"), HashMap.class).get("name");"Cluster name: %s", clusterName));
* Assert that Optimiser deploys the cluster
*/"Wait for optimizer to deploy cluster");
coreBroker.findFirst(applicationId, "eu.nebulouscloud.exn.sal.cluster.deploy", null, 80).isPresent());"Wait for a message from optimizer controller to solver with the AMPL File");
coreBroker.findFirst(applicationId, "eu.nebulouscloud.exn.sal.cluster.deploy", null, 80).isPresent());"Wait for cluster to be ready");
waitForCluster(clusterName, 60 * 10);"Wait for APP state to be Running");
assertTrue(waitForAppRunning(60 * 10));"Wait for APP to be operative");
assertTrue(checkApplicationWorks(60 * 10));
// myEXNClient.stop();
* Checks that the application is working by sending an input message through
* the app message broker and expecting the apropriate answer from the
* application throught the same app message broker. If the application reports
* a problem with STOMP communication for publishing metrics to EMS "Error in
* STOMP connection", retry 2 times and give up.
* @param timeoutSeconds The ammount of seconds to wait for an answer
* @return true if the application responded, false otherwise
* @throws Exception
private boolean checkApplicationWorks(int timeoutSeconds) throws Exception {
long timeout = new Date().getTime() + (timeoutSeconds * 1000);
int retriesLeft = 2;
do {
* Build a request to be sent to the application input topic.
Map<String, Object> inferenceRequest = new HashMap<String, Object>();
inferenceRequest.put("timestamp", new SimpleDateFormat("YYYY-MM-dd HH:mm:ssZ").format(new Date()));
inferenceRequest.put("job_timestamp", inferenceRequest.get("timestamp"));
inferenceRequest.put("inference_duration", 1);
String jobId = UUID.randomUUID().toString();
inferenceRequest.put("job_id", jobId);
String payload = om.writeValueAsString(inferenceRequest);
// Send the request
appBroker.publish(mqttAppInputTopic, payload);
* Check if the application sends a message to the response channel with
* apropriate structure (check it is a JSON and has the job_id value). If found,
* we can consider the app is running
if (appBroker.findFirst(m -> {
return m.jsonPayload() != null && jobId.equals(m.jsonPayload().get("job_id"));
}, 3).isPresent()) {
return true;
* If there is a message with the content "Error in STOMP connection" it means
* that the APP is not able to publish metrics to EMS using STOMP. In this
* situation, retry at most two times.
if (appBroker.findFirst(m -> "Error in STOMP connection".equals(m.payload), 3).isPresent()) {
LOGGER.error("APP is reporting initialization error. Retries left:" + retriesLeft);
if (retriesLeft == 0)
return false;
} while (new Date().getTime() < timeout);
LOGGER.error("Timeout waiting for a message");
return false;
private boolean waitForCluster(String clusterName, int timeoutSeconds) {
long timeout = new Date().getTime() + (timeoutSeconds * 1000);
do {
String status = coreBroker.getClusterStatus(clusterName);
if (status == null || "submited".equals(status)) {
try {
} catch (InterruptedException e) {
// TODO Auto-generated catch block
if ("deployed".equals(status)) {
return true;
return false;
} while (new Date().getTime() < timeout);
LOGGER.error("Timeout waiting for a message");
return false;
* Wait for the optimizer controller to report that the application is on status "RUNNING" (return true) or "FAILED" (return false).
* <ul>
* <li>NEW: The application has been created from the GUI and is waiting for the
* performance indicators from the utility evaluator. *
* <li>READY: The application is ready for deployment.
* <li>DEPLOYING: The application is being deployed or redeployed.
* <li>RUNNING: The application is running.
* <li>FAILED: The application is in an invalid state: one or more messages
* could not be parsed, or deployment or redeployment failed.
* @param timeoutSeconds
* @return True if the optimizer controller reported the app to be running, false if the optimizer controller the app to have failed or the timeout is reached.
private boolean waitForAppRunning(int timeoutSeconds) {
long timeout = new Date().getTime() + (timeoutSeconds * 1000);
do {
* Check if app status is reported to be running
if (coreBroker.findFirst(applicationId, "eu.nebulouscloud.optimiser.controller.app_state",
m -> "RUNNING".equals(m.payload.get("state")), 2).isPresent()) {
return true;
* Check if APP status is failed.
if (coreBroker.findFirst(applicationId, "eu.nebulouscloud.optimiser.controller.app_state",
m -> "FAILED".equals(m.payload.get("state")), 2).isPresent()) {
return false;
try {
} catch (InterruptedException e) {
} while (new Date().getTime() < timeout);
LOGGER.error("Timeout waiting for a message");
return false;

package eut.nebulouscloud.tests;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
* Class for facilitating the interaction with the MQTT broker used by the MQTT processing APP.
* Registers any received message and offers methods to query them.
* Implements a method for facilitating sending messages to the processing app using MQTT.
public class MQTTProcessorAppMessageBrokerInterface {
static Logger LOGGER = LoggerFactory.getLogger(MQTTProcessorAppMessageBrokerInterface.class);
private final AtomicBoolean messageRecieved = new AtomicBoolean(false);
MqttClient client;
protected ObjectMapper om = new ObjectMapper();
private List<SimpleMQTTMessage> messages = Collections.synchronizedList(new LinkedList<SimpleMQTTMessage>());
public class SimpleMQTTMessage {
final protected ObjectMapper om = new ObjectMapper();
final String topic;
final String payload;
final Date date;
public SimpleMQTTMessage(String topic, String payload) {
this.topic = topic;
this.payload = payload; = new Date();
public Map<String, Object> jsonPayload() {
try {
return om.readValue(payload, HashMap.class);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
return null;
public void publish(String topic,String payload)
MqttMessage m = new MqttMessage();
try {
client.publish(topic, m);"Message published: "+payload);
} catch (MqttException e) {
// TODO Auto-generated catch block
public MQTTProcessorAppMessageBrokerInterface(String broker, String baseTopic) {
try {"Connecting to broker: " + broker);
client = new MqttClient(broker, MqttClient.generateClientId(), new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
LOGGER.error("Connection lost!");
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());"Message received: " + payload);
messages.add(new SimpleMQTTMessage(topic, payload));
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
} catch (MqttException e) {
* Waits for a message that matches the given predicate to appear and returns it
* (if found). If timeout is reached without the message being recieved, returns
* an empty optional.
* @param predicate The search predicate. If null, it is not used.
* @param timeoutSeconds The maximum timeout to wait for a message with the
* given predicate to be found in the list (in seconds).
* It must be a positive integer or 0.
* @return An optional with the first message that matchs the predicate if any
* found.
public Optional<SimpleMQTTMessage> findFirst(Predicate<SimpleMQTTMessage> predicate, int timeoutSeconds) {
Optional<SimpleMQTTMessage> result = Optional.empty();
long timeout = new Date().getTime() + (timeoutSeconds * 1000);
do {
synchronized (messages) {
result =;
if (result.isEmpty() && new Date().getTime() < timeout) {
LOGGER.error(String.format("Waiting for message. %.2fs left for timeout.",
((timeout - new Date().getTime()) / 1000.0)));
try {
} catch (InterruptedException e) {
} while (result.isEmpty() && new Date().getTime() < timeout);
if (new Date().getTime() > timeout) {
LOGGER.error("Timeout waiting for a message");
return result;
* Remove all messages from the cache
public void clearMessageCache()
synchronized (messages) {

package eut.nebulouscloud.tests;
import java.util.Date;
import java.util.Map;
* An object that represents a message sent through the NebulOuS message broker.
* The prefix "topic://" is removed from the topic value (if exists).
public class NebulOuSCoreMessage {
public Date receptionDate;
public String topic;
public Map<String,Object> payload;
public String applicationId;
public String correlationId;
public NebulOuSCoreMessage(Date receptionDate, String topic, Map<String, Object> payload, String applicationId,
String correlationId) {
this.receptionDate = receptionDate;
this.topic = topic!=null?topic.replaceFirst("topic://", ""):null;
this.payload = payload;
this.applicationId = applicationId;
this.correlationId = correlationId;

package eut.nebulouscloud.tests;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.nebulouscloud.exn.Connector;
import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.exn.core.SyncedPublisher;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
* Class for facilitating the interaction with the NebulOuS message broker with connection parameters host:"localhost", port: 5672, user: "admin", password:"admin".
* Implements several functions to send messages to mimic the behaviour of certain NebulOuS components.
* Registers any received message and offers methods to query them.
public class NebulousCoreMessageBrokerInterface {
static Logger LOGGER = LoggerFactory.getLogger(NebulousCoreMessageBrokerInterface.class);
protected ObjectMapper om = new ObjectMapper();
Publisher metricModelPublisher;
Publisher appDeployMessagePublisher;
SyncedPublisher getClusterPublisher;
* Broker connection properties
final String brokerHost = "localhost";
final int brokerPort = 5672;
final String brokerUser = "admin";
final String brokerPassword ="admin";
private List<NebulOuSCoreMessage> messages = Collections.synchronizedList(new LinkedList<NebulOuSCoreMessage>());
public NebulousCoreMessageBrokerInterface() {
* Setup NebulOuS message broker client
*/"Start NebulOuS message broker client");
metricModelPublisher = new Publisher("metricModelPublisher", "eu.nebulouscloud.ui.dsl.metric_model", true,
appDeployMessagePublisher = new Publisher("appDeployMessagePublisher", "eu.nebulouscloud.ui.dsl.generic", true,
getClusterPublisher = new SyncedPublisher("getCluster", "eu.nebulouscloud.exn.sal.cluster.get", true, true);
Consumer cons1 = new Consumer("monitoring", ">", new MyConsumerHandler(this), true, true);
Connector myEXNClient = new Connector("thisINotImportant", new MyConnectorHandler(),
List.of(metricModelPublisher, appDeployMessagePublisher, getClusterPublisher), List.of(cons1), true,
new StaticExnConfig(brokerHost, brokerPort, brokerUser, brokerPassword));
* Waits for a message that matches the given predicate to appear and returns it
* (if found). If timeout is reached without the message being recieved, returns
* an empty optional.
* @param appId the app Id to filter by. If null, no filtering occurs
* by appId
* @param topic the topic to filter by. If null, no filtering occurs by
* topic
* @param predicate The search predicate. If null, it is not used.
* @param timeoutSeconds The maximum timeout to wait for a message with the
* given predicate to be found in the list (in seconds).
* It must be a positive integer or 0.
* @return An optional with the first message that matchs the predicate if any
* found.
public Optional<NebulOuSCoreMessage> findFirst(String appId, String topic, Predicate<NebulOuSCoreMessage> predicate,
int timeoutSeconds) {
Optional<NebulOuSCoreMessage> result = Optional.empty();
long timeout = new Date().getTime() + (timeoutSeconds * 1000);
Predicate<NebulOuSCoreMessage> finalPredicate = predicate != null
? messagesFromAppAndTopic(appId, topic).and(predicate)
: messagesFromAppAndTopic(appId, topic);
do {
synchronized (messages) {
result =;
if (result.isEmpty() && new Date().getTime() < timeout) {
LOGGER.error(String.format("Waiting for message. %.2fs left for timeout.",
((timeout - new Date().getTime()) / 1000.0)));
try {
} catch (InterruptedException e) {
} while (result.isEmpty() && new Date().getTime() < timeout);
if (new Date().getTime() > timeout) {
LOGGER.error("Timeout waiting for a message");
return result;
class MyConsumerHandler extends Handler {
NebulousCoreMessageBrokerInterface messageStore;
public MyConsumerHandler(NebulousCoreMessageBrokerInterface messageStore) {
this.messageStore = messageStore;
public void onMessage(String key, String address, Map body, Message message, Context context) {
String to = "??";
try {
to = != null ? : address;
} catch (Exception ex) {
Map<Object, Object> props = new HashMap<Object, Object>();
try {
message.forEachProperty((k, v) -> props.put(k, v));
} catch (ClientException e) {
// TODO Auto-generated catch block
String subject = "?";
try {
subject = message.subject();
} catch (ClientException e) {
// TODO Auto-generated catch block
Object correlationId = 0;
try {
correlationId = message.correlationId();
} catch (ClientException e) {
// TODO Auto-generated catch block
LOGGER.trace("\r\n{}\r\nsubject:{}\r\npayload:{}\r\nproperties:{}\r\ncorrelationId:{}", to, subject, body,
props, correlationId);
NebulOuSCoreMessage internal = new NebulOuSCoreMessage(new Date(), to, body,
(String) props.getOrDefault("application", null),
correlationId != null ? correlationId.toString() : "");
try {
} catch (JsonProcessingException e) {
* Queries SAL for the status of a cluster
* @param clusterName The cluster name.
* @return The cluster status, or null in case of error.
public String getClusterStatus(String clusterName) {
Map<String, Object> msg = Map.of("metaData", Map.of("user", "admin", "clusterName", clusterName));
Map<String, Object> response = getClusterPublisher.sendSync(msg, clusterName, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, "getCluster");
if (payload.isMissingNode())
return null;"isClusterReady: " + payload.toString());
JsonNode jsonState ="/status");
if (jsonState.isMissingNode())
return null;
return jsonState.asText();
* Extract and check the SAL response from an exn-middleware response. The SAL
* response will be valid JSON encoded as a string in the "body" field of the
* response. If the response is of the following form, log an error and return a
* missing node instead:
* <pre>{@code
* {
* "key": <known exception key>,
* "message": "some error message"
* }
* }</pre>
* @param responseMessage The response from exn-middleware.
* @param caller Caller information, used for logging only.
* @return The SAL response as a parsed JsonNode, or a node where {@code
* isMissingNode()} will return true if SAL reported an error.
private JsonNode extractPayloadFromExnResponse(Map<String, Object> responseMessage, String caller) {
JsonNode response = om.valueToTree(responseMessage);
String salRawResponse ="/body").asText(); // it's already a string, asText() is for the type system
JsonNode metadata ="/metaData");
JsonNode salResponse = om.missingNode(); // the data coming from SAL
try {
salResponse = om.readTree(salRawResponse);
} catch (JsonProcessingException e) {
LOGGER.error("Could not read message body as JSON: body = '{}', caller = '{}'", salRawResponse, caller, e);
return om.missingNode();
if (!"/status").asText().startsWith("2")) {
// we only accept 200, 202, numbers of that nature
LOGGER.error("exn-middleware-sal request failed with error code '{}' and message '{}', caller '{}'","/status"),"/message").asText(), caller);
return om.missingNode();
return salResponse;
private class MyConnectorHandler extends ConnectorHandler {
public void onReady(AtomicReference<Context> context) {"Optimiser-controller connected to ActiveMQ");
* Build a predicate that filters by messages for a certain appId and topic
* @param appId: The appId to filter for. If null, it is ignored
* @param topic: The topic to filter for. If null, it is ignored.
* @return
public static Predicate<NebulOuSCoreMessage> messagesFromAppAndTopic(String appId, String topic) {
return messagesFromApp(appId).and((Predicate<NebulOuSCoreMessage>) m -> topic == null || topic.equals(m.topic));
* Builds a predicate that filters messages for the given app ID. If appID is null, the predicate has no effect.
* @param id
* @return
public static Predicate<NebulOuSCoreMessage> messagesFromApp(String id) {
return ((Predicate<NebulOuSCoreMessage>) m -> id == null || id.equals(m.applicationId));
* Adds a new message to the cache
* @param message
public void add(NebulOuSCoreMessage message) {
try {
LOGGER.trace("Adding message:" + om.writeValueAsString(message));
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
* Sends an app creation message as done by the UI during deployment.
* @param appCreationPayload
* @param applicationId
public void sendAppCreationMessage(Map<String, Object> appCreationPayload, String applicationId) {
appDeployMessagePublisher.send(appCreationPayload, applicationId);
* Sends the metric model message as done by the UI during deployment.
* @param metricModelPayload
* @param applicationId
public void sendMetricModelMessage(Map<String,Object> metricModelPayload,String applicationId)

package eut.nebulouscloud.tests;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SendPayloadToMQTTProcessorApp {
static Logger LOGGER = LoggerFactory.getLogger(SendPayloadToMQTTProcessorApp.class);
static protected ObjectMapper om = new ObjectMapper();
static int DELAY_SECONDS = 3;
static MQTTProcessorAppMessageBrokerInterface appBroker;
static String mqttBroker = "";
static String mqttPort = "1883";
static String mqttTopicPrefix = "atest";
static String mqttAppInputTopic = mqttTopicPrefix + "/input";
static String mqttAppOutputTopic = mqttTopicPrefix + "/output";
public static void main(String[] args) throws Exception
//String applicationId = "application=1431290905automated-testing-mqtt-app-1715257889393";
String applicationId = "1549030905automated-testing-mqtt-app-1715262543304";"Begin MQTT Processor APP deployment. applicationId is %s", applicationId));
appBroker = new MQTTProcessorAppMessageBrokerInterface("tcp://" + mqttBroker + ":" + mqttPort, mqttAppOutputTopic);
if(sendPayloadAndWaitAnswer(20) == false)
{"App responded");
private static boolean sendPayloadAndWaitAnswer(int timeoutSeconds) throws Exception {
long timeout = new Date().getTime() + (timeoutSeconds * 1000);
int retriesLeft = 2;
do {
* Build a request to be sent to the application input topic.
Map<String, Object> inferenceRequest = new HashMap<String, Object>();
inferenceRequest.put("timestamp", new SimpleDateFormat("YYYY-MM-dd HH:mm:ssZ").format(new Date()));
inferenceRequest.put("job_timestamp", inferenceRequest.get("timestamp"));
inferenceRequest.put("inference_duration", 5);
String jobId = UUID.randomUUID().toString();
inferenceRequest.put("job_id", jobId);
String payload = om.writeValueAsString(inferenceRequest);
// Send the request
appBroker.publish(mqttAppInputTopic, payload);
* Check if the application sends a message to the response channel with
* apropriate structure (check it is a JSON and has the job_id value). If found,
* we can consider the app is running
if (appBroker.findFirst(m -> {
return m.jsonPayload() != null && jobId.equals(m.jsonPayload().get("job_id"));
}, 3).isPresent()) {
return true;
* If there is a message with the content "Error in STOMP connection" it means
* that the APP is not able to publish metrics to EMS using STOMP. In this
* situation, retry at most two times.
if (appBroker.findFirst(m -> "Error in STOMP connection".equals(m.payload), 3).isPresent()) {
LOGGER.error("APP is reporting initialization error. Retries left:" + retriesLeft);
if (retriesLeft == 0)
return false;
} while (new Date().getTime() < timeout);
LOGGER.error("Timeout waiting for a message");
return false;

"title": "{{APP_ID}}",
"uuid": "{{APP_ID}}",
"status": "deploying",
"content": "apiVersion: \"\"\r\nkind: \"Application\"\r\nmetadata:\r\n name: \"{{APP_ID}}\"\r\nspec:\r\n components:\r\n - name: \"dummy-app-worker\"\r\n type: \"webservice\"\r\n properties:\r\n image: \"\"\r\n cpu: \"2.0\"\r\n memory: \"2048Mi\"\r\n imagePullPolicy: \"Always\"\r\n cmd:\r\n - \"python\"\r\n - \"-u\"\r\n - \"\"\r\n env:\r\n - name: \"mqtt_ip\"\r\n value: \"{{APP_MQTT_BROKER_SERVER}}\"\r\n - name: \"mqtt_port\"\r\n value: \"{{APP_MQTT_BROKER_PORT}}\"\r\n - name: \"mqtt_subscribe_topic\"\r\n value: \"{{APP_MQTT_INPUT_TOPIC}}\"\r\n - name: \"mqtt_publish_topic\"\r\n value: \"{{APP_MQTT_OUTPUT_TOPIC}}\"\r\n - name: \"report_metrics_to_ems\"\r\n value: \"{{REPORT_METRICS_TO_EMS}}\"\r\n - name: \"nebulous_ems_ip\"\r\n valueFrom:\r\n fieldRef:\r\n fieldPath: status.hostIP\r\n - name: \"nebulous_ems_port\"\r\n value: \"61610\"\r\n - name: \"nebulous_ems_user\"\r\n value: \"aaa\"\r\n - name: \"nebulous_ems_password\"\r\n value: \"111\"\r\n - name: \"nebulous_ems_metrics_topic\"\r\n value: \"/topic/RawProcessingLatency_SENSOR\"\r\n traits:\r\n - type: \"scaler\"\r\n properties:\r\n replicas: 1\r\n\r\n\r\n policies:\r\n - name: \"target-default\"\r\n type: \"topology\"\r\n properties:\r\n namespace: \"default\"\r\n workflow:\r\n steps:\r\n - name: \"deploy2default\"\r\n type: \"deploy\"\r\n properties:\r\n policies:\r\n - \"target-default\"",
"variables": [
"key": "spec_components_0_traits_0_properties_replicas",
"path": "/spec/components/0/traits/0/properties/replicas",
"type": "float",
"meaning": "replicas",
"value": {
"lower_bound": 1,
"higher_bound": 5
"environmentVariables": [],
"resources": [
"templates": [],
"parameters": [],
"metrics": [
"type": "raw",
"name": "RawProcessingLatency",
"level": "global",
"components": [],
"sensor": "job_process_time_instance",
"config": [],
"isWindowOutputRaw": true,
"outputRaw": {
"type": "all",
"interval": 30,
"unit": "sec"
"type": "composite",
"level": "global",
"components": [],
"name": "MeanJobProcessingLatency",
"template": "",
"formula": "mean(RawProcessingLatency)",
"isWindowInput": true,
"input": {
"type": "sliding",
"interval": 30,
"unit": "sec"
"isWindowOutput": true,
"output": {
"type": "all",
"interval": 30,
"unit": "sec"
"arguments": [
"sloViolations": {
"nodeKey": "5ce4273e-5ac3-478b-b460-075b053fb994",
"isComposite": true,
"condition": "AND",
"not": false,
"children": [
"nodeKey": "982c13a8-bbae-4574-b2be-eca15b865563",
"isComposite": false,
"metricName": "MeanJobProcessingLatency",
"operator": ">",
"value": 50
"utilityFunctions": [
"name": "f",
"type": "minimize",
"expression": {
"formula": "(dummy_app_worker_MeanJobProcessingLatency*currentReplicas)/spec_components_0_traits_0_properties_replicas",
"variables": [
"name": "dummy_app_worker_MeanJobProcessingLatency",
"value": "MeanJobProcessingLatency"
"name": "currentReplicas",
"value": "currentReplicas"
"name": "spec_components_0_traits_0_properties_replicas",
"value": "spec_components_0_traits_0_properties_replicas"
"name": "currentReplicas",
"type": "constant",
"expression": {
"formula": "a",
"variables": [
"name": "a",
"value": "spec_components_0_traits_0_properties_replicas"
"_create": true,
"_delete": true

{"application":"{{APP_ID}}","yaml":{"apiVersion":"nebulous/v1","kind":"MetricModel","metadata":{"name":"{{APP_ID}}","labels":{"app":"{{APP_ID}}"}},"templates":[],"spec":{"components":[{"name":"spec-comp","metrics":[]}],"scopes":[{"name":"app-wide-scope","components":[],"metrics":[{"name":"RawProcessingLatency","type":"raw","sensor":{"type":"job_process_time_instance","config":{}},"output":"all 30 sec"},{"name":"MeanJobProcessingLatency","type":"composite","template":"","formula":"mean(RawProcessingLatency)","window":{"type":"sliding","size":"30 sec"},"output":"all 30 sec"}],"requirements":[{"name":"Combined_SLO","type":"slo","constraint":"(MeanJobProcessingLatency > 50)"}]}]}}}