response = getClusterPublisher.sendSync(msg, clusterName, null, false);
+ JsonNode payload = extractPayloadFromExnResponse(response, "getCluster");
+ if (payload.isMissingNode())
+ return null;
+ LOGGER.info("isClusterReady: " + payload.toString());
+ JsonNode jsonState = payload.at("/status");
+ if (jsonState.isMissingNode())
+ return null;
+ return jsonState.asText();
+ }
+
+ /**
+ * Extract and check the SAL response from an exn-middleware response. The SAL
+ * response will be valid JSON encoded as a string in the "body" field of the
+ * response. If the response is of the following form, log an error and return a
+ * missing node instead:
+ *
+ * {@code
+ * {
+ * "key": ,
+ * "message": "some error message"
+ * }
+ * }
+ *
+ * @param responseMessage The response from exn-middleware.
+ * @param caller Caller information, used for logging only.
+ * @return The SAL response as a parsed JsonNode, or a node where {@code
+ * isMissingNode()} will return true if SAL reported an error.
+ */
+ private JsonNode extractPayloadFromExnResponse(Map responseMessage, String caller) {
+ JsonNode response = om.valueToTree(responseMessage);
+ String salRawResponse = response.at("/body").asText(); // it's already a string, asText() is for the type system
+ JsonNode metadata = response.at("/metaData");
+ JsonNode salResponse = om.missingNode(); // the data coming from SAL
+ try {
+ salResponse = om.readTree(salRawResponse);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Could not read message body as JSON: body = '{}', caller = '{}'", salRawResponse, caller, e);
+ return om.missingNode();
+ }
+ if (!metadata.at("/status").asText().startsWith("2")) {
+ // we only accept 200, 202, numbers of that nature
+ LOGGER.error("exn-middleware-sal request failed with error code '{}' and message '{}', caller '{}'",
+ metadata.at("/status"), salResponse.at("/message").asText(), caller);
+ return om.missingNode();
+ }
+ return salResponse;
+ }
+
+ private class MyConnectorHandler extends ConnectorHandler {
+
+ public void onReady(AtomicReference context) {
+ LOGGER.info("Optimiser-controller connected to ActiveMQ");
+ }
+ }
+
+
+ /**
+ * Build a predicate that filters by messages for a certain appId and topic
+ * @param appId: The appId to filter for. If null, it is ignored
+ * @param topic: The topic to filter for. If null, it is ignored.
+ * @return
+ */
+ public static Predicate messagesFromAppAndTopic(String appId, String topic) {
+ return messagesFromApp(appId).and((Predicate) m -> topic == null || topic.equals(m.topic));
+ }
+
+ /**
+ * Builds a predicate that filters messages for the given app ID. If appID is null, the predicate has no effect.
+ * @param id
+ * @return
+ */
+ public static Predicate messagesFromApp(String id) {
+ return ((Predicate) m -> id == null || id.equals(m.applicationId));
+ }
+
+ /**
+ * Adds a new message to the cache
+ *
+ * @param message
+ */
+ public void add(NebulOuSCoreMessage message) {
+ try {
+ LOGGER.trace("Adding message:" + om.writeValueAsString(message));
+ } catch (JsonProcessingException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ this.messages.add(message);
+ }
+
+ /**
+ * Sends an app creation message as done by the UI during deployment.
+ * @param appCreationPayload
+ * @param applicationId
+ */
+ public void sendAppCreationMessage(Map appCreationPayload, String applicationId) {
+ appDeployMessagePublisher.send(appCreationPayload, applicationId);
+
+ }
+ /**
+ * Sends the metric model message as done by the UI during deployment.
+ * @param metricModelPayload
+ * @param applicationId
+ */
+ public void sendMetricModelMessage(Map metricModelPayload,String applicationId)
+ {
+ metricModelPublisher.send(metricModelPayload,applicationId);
+ }
+
+}
diff --git a/tests/src/test/java/eut/nebulouscloud/tests/SendPayloadToMQTTProcessorApp.java b/tests/src/test/java/eut/nebulouscloud/tests/SendPayloadToMQTTProcessorApp.java
new file mode 100644
index 0000000..4d33934
--- /dev/null
+++ b/tests/src/test/java/eut/nebulouscloud/tests/SendPayloadToMQTTProcessorApp.java
@@ -0,0 +1,92 @@
+package eut.nebulouscloud.tests;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class SendPayloadToMQTTProcessorApp {
+ static Logger LOGGER = LoggerFactory.getLogger(SendPayloadToMQTTProcessorApp.class);
+ static protected ObjectMapper om = new ObjectMapper();
+ static int DELAY_SECONDS = 3;
+ static MQTTProcessorAppMessageBrokerInterface appBroker;
+ static String mqttBroker = "broker.emqx.io";
+ static String mqttPort = "1883";
+ static String mqttTopicPrefix = "atest";
+ static String mqttAppInputTopic = mqttTopicPrefix + "/input";
+ static String mqttAppOutputTopic = mqttTopicPrefix + "/output";
+
+ public static void main(String[] args) throws Exception
+ {
+ //String applicationId = "application=1431290905automated-testing-mqtt-app-1715257889393";
+ String applicationId = "1549030905automated-testing-mqtt-app-1715262543304";
+ LOGGER.info(String.format("Begin MQTT Processor APP deployment. applicationId is %s", applicationId));
+ appBroker = new MQTTProcessorAppMessageBrokerInterface("tcp://" + mqttBroker + ":" + mqttPort, mqttAppOutputTopic);
+ while(true)
+ {
+ if(sendPayloadAndWaitAnswer(20) == false)
+ {
+ break;
+ }else
+ {
+ LOGGER.info("App responded");
+ }
+ Thread.sleep(1000);
+ }
+ }
+
+ private static boolean sendPayloadAndWaitAnswer(int timeoutSeconds) throws Exception {
+ long timeout = new Date().getTime() + (timeoutSeconds * 1000);
+ int retriesLeft = 2;
+ do {
+ /**
+ * Build a request to be sent to the application input topic.
+ */
+ Map inferenceRequest = new HashMap();
+ inferenceRequest.put("timestamp", new SimpleDateFormat("YYYY-MM-dd HH:mm:ssZ").format(new Date()));
+ inferenceRequest.put("job_timestamp", inferenceRequest.get("timestamp"));
+ inferenceRequest.put("inference_duration", 5);
+ String jobId = UUID.randomUUID().toString();
+ inferenceRequest.put("job_id", jobId);
+ String payload = om.writeValueAsString(inferenceRequest);
+ // Send the request
+ appBroker.publish(mqttAppInputTopic, payload);
+
+ /**
+ * Check if the application sends a message to the response channel with
+ * apropriate structure (check it is a JSON and has the job_id value). If found,
+ * we can consider the app is running
+ */
+ if (appBroker.findFirst(m -> {
+ return m.jsonPayload() != null && jobId.equals(m.jsonPayload().get("job_id"));
+ }, 3).isPresent()) {
+ return true;
+ }
+
+ /**
+ * If there is a message with the content "Error in STOMP connection" it means
+ * that the APP is not able to publish metrics to EMS using STOMP. In this
+ * situation, retry at most two times.
+ */
+ if (appBroker.findFirst(m -> "Error in STOMP connection".equals(m.payload), 3).isPresent()) {
+ retriesLeft--;
+ LOGGER.error("APP is reporting initialization error. Retries left:" + retriesLeft);
+ appBroker.clearMessageCache();
+ if (retriesLeft == 0)
+ return false;
+ }
+
+
+ } while (new Date().getTime() < timeout);
+ LOGGER.error("Timeout waiting for a message");
+ return false;
+
+ }
+
+}
diff --git a/tests/src/test/resources/mqtt_processor_app/app_creation_message.json b/tests/src/test/resources/mqtt_processor_app/app_creation_message.json
new file mode 100644
index 0000000..ba8cf21
--- /dev/null
+++ b/tests/src/test/resources/mqtt_processor_app/app_creation_message.json
@@ -0,0 +1,116 @@
+{
+ "title": "{{APP_ID}}",
+ "uuid": "{{APP_ID}}",
+ "status": "deploying",
+ "content": "apiVersion: \"core.oam.dev/v1beta1\"\r\nkind: \"Application\"\r\nmetadata:\r\n name: \"{{APP_ID}}\"\r\nspec:\r\n components:\r\n - name: \"dummy-app-worker\"\r\n type: \"webservice\"\r\n properties:\r\n image: \"docker.io/rsprat/mytestrepo:v1\"\r\n cpu: \"2.0\"\r\n memory: \"2048Mi\"\r\n imagePullPolicy: \"Always\"\r\n cmd:\r\n - \"python\"\r\n - \"-u\"\r\n - \"worker.py\"\r\n env:\r\n - name: \"mqtt_ip\"\r\n value: \"{{APP_MQTT_BROKER_SERVER}}\"\r\n - name: \"mqtt_port\"\r\n value: \"{{APP_MQTT_BROKER_PORT}}\"\r\n - name: \"mqtt_subscribe_topic\"\r\n value: \"{{APP_MQTT_INPUT_TOPIC}}\"\r\n - name: \"mqtt_publish_topic\"\r\n value: \"{{APP_MQTT_OUTPUT_TOPIC}}\"\r\n - name: \"report_metrics_to_ems\"\r\n value: \"{{REPORT_METRICS_TO_EMS}}\"\r\n - name: \"nebulous_ems_ip\"\r\n valueFrom:\r\n fieldRef:\r\n fieldPath: status.hostIP\r\n - name: \"nebulous_ems_port\"\r\n value: \"61610\"\r\n - name: \"nebulous_ems_user\"\r\n value: \"aaa\"\r\n - name: \"nebulous_ems_password\"\r\n value: \"111\"\r\n - name: \"nebulous_ems_metrics_topic\"\r\n value: \"/topic/RawProcessingLatency_SENSOR\"\r\n traits:\r\n - type: \"scaler\"\r\n properties:\r\n replicas: 1\r\n\r\n\r\n policies:\r\n - name: \"target-default\"\r\n type: \"topology\"\r\n properties:\r\n namespace: \"default\"\r\n workflow:\r\n steps:\r\n - name: \"deploy2default\"\r\n type: \"deploy\"\r\n properties:\r\n policies:\r\n - \"target-default\"",
+ "variables": [
+ {
+ "key": "spec_components_0_traits_0_properties_replicas",
+ "path": "/spec/components/0/traits/0/properties/replicas",
+ "type": "float",
+ "meaning": "replicas",
+ "value": {
+ "lower_bound": 1,
+ "higher_bound": 5
+ }
+ }
+ ],
+ "environmentVariables": [],
+ "resources": [
+
+ ],
+ "templates": [],
+ "parameters": [],
+ "metrics": [
+ {
+ "type": "raw",
+ "name": "RawProcessingLatency",
+ "level": "global",
+ "components": [],
+ "sensor": "job_process_time_instance",
+ "config": [],
+ "isWindowOutputRaw": true,
+ "outputRaw": {
+ "type": "all",
+ "interval": 30,
+ "unit": "sec"
+ }
+ },
+ {
+ "type": "composite",
+ "level": "global",
+ "components": [],
+ "name": "MeanJobProcessingLatency",
+ "template": "",
+ "formula": "mean(RawProcessingLatency)",
+ "isWindowInput": true,
+ "input": {
+ "type": "sliding",
+ "interval": 30,
+ "unit": "sec"
+ },
+ "isWindowOutput": true,
+ "output": {
+ "type": "all",
+ "interval": 30,
+ "unit": "sec"
+ },
+ "arguments": [
+ "RawProcessingLatency"
+ ]
+ }
+ ],
+ "sloViolations": {
+ "nodeKey": "5ce4273e-5ac3-478b-b460-075b053fb994",
+ "isComposite": true,
+ "condition": "AND",
+ "not": false,
+ "children": [
+ {
+ "nodeKey": "982c13a8-bbae-4574-b2be-eca15b865563",
+ "isComposite": false,
+ "metricName": "MeanJobProcessingLatency",
+ "operator": ">",
+ "value": 50
+ }
+ ]
+ },
+ "utilityFunctions": [
+ {
+ "name": "f",
+ "type": "minimize",
+ "expression": {
+ "formula": "(dummy_app_worker_MeanJobProcessingLatency*currentReplicas)/spec_components_0_traits_0_properties_replicas",
+ "variables": [
+ {
+ "name": "dummy_app_worker_MeanJobProcessingLatency",
+ "value": "MeanJobProcessingLatency"
+ },
+ {
+ "name": "currentReplicas",
+ "value": "currentReplicas"
+ },
+ {
+ "name": "spec_components_0_traits_0_properties_replicas",
+ "value": "spec_components_0_traits_0_properties_replicas"
+ }
+ ]
+ }
+ },
+ {
+ "name": "currentReplicas",
+ "type": "constant",
+ "expression": {
+ "formula": "a",
+ "variables": [
+ {
+ "name": "a",
+ "value": "spec_components_0_traits_0_properties_replicas"
+ }
+ ]
+ }
+ }
+ ],
+ "_create": true,
+ "_delete": true
+}
\ No newline at end of file
diff --git a/tests/src/test/resources/mqtt_processor_app/metric_model.json b/tests/src/test/resources/mqtt_processor_app/metric_model.json
new file mode 100644
index 0000000..cb0f7fb
--- /dev/null
+++ b/tests/src/test/resources/mqtt_processor_app/metric_model.json
@@ -0,0 +1 @@
+{"application":"{{APP_ID}}","yaml":{"apiVersion":"nebulous/v1","kind":"MetricModel","metadata":{"name":"{{APP_ID}}","labels":{"app":"{{APP_ID}}"}},"templates":[],"spec":{"components":[{"name":"spec-comp","metrics":[]}],"scopes":[{"name":"app-wide-scope","components":[],"metrics":[{"name":"RawProcessingLatency","type":"raw","sensor":{"type":"job_process_time_instance","config":{}},"output":"all 30 sec"},{"name":"MeanJobProcessingLatency","type":"composite","template":"","formula":"mean(RawProcessingLatency)","window":{"type":"sliding","size":"30 sec"},"output":"all 30 sec"}],"requirements":[{"name":"Combined_SLO","type":"slo","constraint":"(MeanJobProcessingLatency > 50)"}]}]}}}
\ No newline at end of file