Resource Manager: Copied changes from branch 'add-sal-connectivity'

Change-Id: I9b39e95b117331e0b5d764938b7701c183986fba
This commit is contained in:
ipatini 2024-04-02 20:41:39 +03:00
parent 2cf4d6a7f9
commit a7bad9a798
27 changed files with 1317 additions and 23 deletions

View File

@ -1,2 +1,2 @@
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.4/apache-maven-3.9.4-bin.zip
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar

View File

@ -1,9 +1,6 @@
ARG BUILDER_IMAGE=docker.io/library/maven:3.9.5-eclipse-temurin-17
ARG RUN_IMAGE=docker.io/library/eclipse-temurin:17.0.8.1_1-jre
# ----------------- Builder image -----------------
FROM docker.io/library/maven:3.9.5-eclipse-temurin-17 as rd-builder
FROM docker.io/library/maven:3.9.6-eclipse-temurin-21 as rd-builder
ENV BASEDIR /app
WORKDIR ${BASEDIR}
COPY src ${BASEDIR}/src
@ -13,7 +10,7 @@ RUN mvn -f ${BASEDIR}/pom.xml -DskipTests clean install && \
java -Djarmode=layertools -jar ${BASEDIR}/target/resource-discovery-*.jar extract
# ----------------- Runtime image -----------------
FROM docker.io/library/eclipse-temurin:17.0.8.1_1-jre
FROM docker.io/library/eclipse-temurin:21.0.1_12-jre
# Setup environment
ENV BASEDIR /opt/resource-discovery
@ -26,9 +23,9 @@ RUN wget --progress=dot:giga -O /usr/local/bin/dumb-init \
# Add RD user
ARG RD_USER=rd
RUN mkdir ${RD_HOME} ; \
addgroup ${RD_USER} ; \
adduser --home ${RD_HOME} --no-create-home --ingroup ${RD_USER} --disabled-password ${RD_USER} ; \
RUN mkdir ${RD_HOME} && \
addgroup ${RD_USER} && \
adduser --home ${RD_HOME} --no-create-home --ingroup ${RD_USER} --disabled-password ${RD_USER} && \
chown ${RD_USER}:${RD_USER} ${RD_HOME}
# Set User and Workdir

View File

@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.4</version>
<version>3.2.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
@ -17,11 +17,19 @@
<description>Nebulous resource discovery service</description>
<properties>
<java.version>17</java.version>
<java.version>21</java.version>
<imageName>${project.artifactId}:${project.version}</imageName>
</properties>
<dependencies>
<dependency>
<groupId>eu.nebulouscloud</groupId>
<artifactId>exn-connector-java</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
@ -45,12 +53,12 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<version>1.18.30</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.13.0</version>
<version>3.14.0</version>
</dependency>
<dependency>
@ -72,6 +80,57 @@
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.15.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.16</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.14</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
@ -79,7 +138,7 @@
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
<version>2.2</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -93,6 +152,18 @@
</plugins>
</build>
<repositories>
<repository>
<id>maven-central</id>
<url>https://repo1.maven.org/maven2/</url>
</repository>
<repository>
<id>nexus-nebulous</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<!-- Creating Docker image with BUILDPACKS -->
<!--<build>
<plugins>

View File

@ -43,7 +43,7 @@ ${JRE} \
$JAVA_ADD_OPENS \
-Djasypt.encryptor.password=$JASYPT_PASSWORD \
-Djava.security.egd=file:/dev/urandom \
org.springframework.boot.loader.JarLauncher \
org.springframework.boot.loader.launch.JarLauncher \
$* &
# Get PID and wait it to exit

View File

@ -85,6 +85,13 @@ public class ResourceDiscoveryProperties {
// Users
private List<UserData> users;
// Nebulous broker subscription details
private String nebulous_broker_ip_address;
private int nebulous_broker_port;
private String nebulous_broker_username;
private String lost_device_topic;
private String nebulous_broker_password;
@Data
public static class UserData {
private final String username;

View File

@ -0,0 +1,110 @@
package eu.nebulous.resource.discovery.broker_communication;
import eu.nebulouscloud.exn.Connector;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
@Slf4j
public class BrokerPublisher {
public static String EMPTY="";
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
private Publisher private_publisher_instance;
private ArrayList<Publisher> publishers = new ArrayList<>();
private ExtendedConnector active_connector;
private String topic;
private String broker_ip;
private int broker_port;
public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerPublisher){
return;
}
boolean publisher_configuration_changed;
if (!broker_and_topics_to_publish_to.containsKey(broker_ip)){
HashSet<String> topics_to_publish_to = new HashSet<>();
topics_to_publish_to.add(topic);
broker_and_topics_to_publish_to.put(broker_ip,topics_to_publish_to);
publisher_configuration_changed = true;
}else{
if (!broker_and_topics_to_publish_to.get(broker_ip).contains(topic)){
broker_and_topics_to_publish_to.get(broker_ip).add(topic);
publisher_configuration_changed = true;
}
else{
publisher_configuration_changed = false;
}
}
if (publisher_configuration_changed){
// for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
log.info("Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic);
if (active_connector!=null) {
active_connector.stop(new ArrayList<>(), publishers);
}
publishers.clear();
for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
//ArrayList<Publisher> publishers = new ArrayList<>();
Publisher publisher = new Publisher("resource_manager_"+broker_topic, broker_topic, true, true);
publishers.add(publisher);
if (broker_topic.equals(topic)){
this.private_publisher_instance = publishers.get(publishers.size()-1);
this.topic = broker_topic;
this.broker_ip = broker_ip;
this.broker_port = broker_port;
}
}
//CustomConnectorHandler custom_handler = new CustomConnectorHandler();
active_connector = new ExtendedConnector("resource_manager"
, new CustomConnectorHandler() {}
, publishers
, List.of(),
false,
false,
new StaticExnConfig(
broker_ip,
broker_port,
brokerUsername,
brokerPassword,
60,
EMPTY
)
);
active_connector.start();
}
}
//TODO The methods below assume that the only content to be sent is json-like
public void publish (String json_string_content, Collection<String> application_names){
for (String application_name : application_names) {
JSONParser parser = new JSONParser();
JSONObject json_object = new JSONObject();
try {
json_object = (JSONObject) parser.parse(json_string_content);
} catch (ParseException p) {
log.warn( "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content);
}
if (private_publisher_instance != null) {
private_publisher_instance.send(json_object);
log.info("Sent new message\n"+json_object.toJSONString());
} else {
log.error( "Could not send message to AMQP broker, as the broker ip to be used has not been specified");
}
}
}
}

View File

@ -0,0 +1,194 @@
package eu.nebulous.resource.discovery.broker_communication;
import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.protonj2.client.Message;
import org.json.simple.JSONValue;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import static eu.nebulous.resource.discovery.broker_communication.BrokerPublisher.EMPTY;
@Slf4j
public class BrokerSubscriber {
private class MessageProcessingHandler extends Handler {
private BrokerSubscriptionDetails broker_details;
private static final BiFunction temporary_function = (Object o, Object o2) -> {
//System.out.println("");
log.info("REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
return "IN_PROCESSING";
};
private BiFunction<BrokerSubscriptionDetails, String, String> processing_function;
@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
log.info("Handling message for address " + address);
processing_function.apply(broker_details, JSONValue.toJSONString(body));
}
public MessageProcessingHandler(BrokerSubscriptionDetails broker_details) {
this.broker_details = broker_details;
this.processing_function = temporary_function;
}
public MessageProcessingHandler(BiFunction<BrokerSubscriptionDetails, String, String> biFunction, BrokerSubscriptionDetails broker_details) {
this.broker_details = broker_details;
this.processing_function = biFunction;
}
public BiFunction getProcessing_function() {
return processing_function;
}
public void setProcessing_function(BiFunction processing_function) {
this.processing_function = processing_function;
}
}
private static HashMap<String, HashSet<String>> broker_and_topics_to_subscribe_to = new HashMap<>();
private static HashMap<String, HashMap<String, Consumer>> active_consumers_per_topic_per_broker_ip = new HashMap<>();
private static HashMap<String, ExtendedConnector> current_connectors = new HashMap<>();
private String topic;
private String broker_ip;
private int broker_port;
private String brokerUsername;
private String brokerPassword;
BrokerSubscriptionDetails broker_details;
public BrokerSubscriber(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, String application_name) {
boolean able_to_initialize_BrokerSubscriber = topic != null && broker_ip != null && brokerUsername != null && brokerPassword != null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerSubscriber) {
try {
throw new Exception("Unable to initialize Subscriber");
} catch (Exception e) {
String message = "Topic is " + topic + " broker ip is " + broker_ip + " broker username/pass are " + brokerUsername + "," + brokerPassword;
log.info(message);
throw new RuntimeException(e);
}
}
broker_details = new BrokerSubscriptionDetails(broker_ip, broker_port, brokerUsername, brokerPassword, application_name, topic);
boolean subscriber_configuration_changed;
if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)) {
HashSet<String> topics_to_subscribe_to = new HashSet<>();
//topics_to_subscribe_to.add(realtime_metric_topic_name);
//topics_to_subscribe_to.add(forecasted_metric_topic_name);
//topics_to_subscribe_to.add();
topics_to_subscribe_to.add(topic);
broker_and_topics_to_subscribe_to.put(broker_ip, new HashSet<>());
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
subscriber_configuration_changed = true;
} else {
if (!broker_and_topics_to_subscribe_to.get(broker_ip).contains(topic)) {
broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
subscriber_configuration_changed = true;
} else {
subscriber_configuration_changed = false;
}
}
if (subscriber_configuration_changed) {
Consumer current_consumer;
if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application
log.info("APP level subscriber " + topic);
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true);
} else { //Allow the consumer to get information from any publisher
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true);
log.info("HIGH level subscriber " + topic);
}
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, current_consumer);
this.topic = topic;
this.broker_ip = broker_ip;
this.broker_port = broker_port;
this.brokerUsername = brokerUsername;
this.brokerPassword = brokerPassword;
add_topic_consumer_to_broker_connector(current_consumer);
}
}
/**
* This method updates the global connector of Resource manager to the AMQP server, by adding support for one more component
*/
private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
if (current_connectors.get(broker_ip) != null) {
current_connectors.get(broker_ip).add_consumer(new_consumer);
} else {
ArrayList<Consumer> consumers = new ArrayList<>();
consumers.add(new_consumer);
ExtendedConnector extended_connector = new ExtendedConnector("resource_manager",
new CustomConnectorHandler() {
},
List.of(),
consumers,
false,
false,
new StaticExnConfig(
broker_ip,
broker_port,
brokerUsername,
brokerPassword,
60,
EMPTY
)
);
extended_connector.start();
current_connectors.put(broker_ip, extended_connector);
}
}
private void remove_topic_from_broker_connector(String topic_key) {
if (current_connectors.get(broker_ip) != null) {
current_connectors.get(broker_ip).remove_consumer_with_key(topic_key);
}
}
public int subscribe(BiFunction function, String application_name, AtomicBoolean stop_signal) {
int exit_status = -1;
log.info("ESTABLISHING SUBSCRIPTION for " + topic);
//First remove any leftover consumer
if (active_consumers_per_topic_per_broker_ip.containsKey(broker_ip)) {
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
remove_topic_from_broker_connector(topic);
} else {
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
}
//Then add the new consumer
Consumer new_consumer;
if (application_name != null && !application_name.equals(EMPTY)) {
new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), application_name,
true, true);
} else {
new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), true, true);
}
new_consumer.setProperty("topic", topic);
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);
add_topic_consumer_to_broker_connector(new_consumer);
log.info("ESTABLISHED SUBSCRIPTION to topic " + topic);
synchronized (stop_signal) {
while (!stop_signal.get()) {
try {
stop_signal.wait();
} catch (Exception e) {
log.warn( e.toString() + " in thread " + Thread.currentThread().getName());
break;
}
}
log.info("Stopping subscription for broker " + broker_ip + " and topic " + topic + "at thread " + Thread.currentThread().getName());
stop_signal.set(false);
}
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
remove_topic_from_broker_connector(topic);
exit_status = 0;
return exit_status;
}
}

View File

@ -0,0 +1,79 @@
package eu.nebulous.resource.discovery.broker_communication;
import static eu.nebulous.resource.discovery.broker_communication.BrokerPublisher.EMPTY;
public class BrokerSubscriptionDetails {
String broker_username = "admin";
String broker_password = "admin";
String broker_ip = "localhost";
int broker_port = 5672;
String application_name = "default_application";
String topic = EMPTY;
public BrokerSubscriptionDetails(String broker_ip, int broker_port, String broker_username, String broker_password,String application_name, String topic) {
this.broker_ip = broker_ip;
this.broker_port = broker_port;
this.broker_username = broker_username;
this.broker_password = broker_password;
this.topic = topic;
this.application_name = application_name;
}
public BrokerSubscriptionDetails(boolean fake_broker_subscription) {
if (fake_broker_subscription) {
this.broker_username = EMPTY;
this.broker_password = EMPTY;
this.broker_ip = EMPTY;
this.topic = EMPTY;
this.application_name = EMPTY;
}
}
public String getBroker_username() {
return broker_username;
}
public void setBroker_username(String broker_username) {
this.broker_username = broker_username;
}
public String getBroker_password() {
return broker_password;
}
public void setBroker_password(String broker_password) {
this.broker_password = broker_password;
}
public String getBroker_ip() {
return broker_ip;
}
public void setBroker_ip(String broker_ip) {
this.broker_ip = broker_ip;
}
public String getApplication_name() {
return application_name;
}
public void setApplication_name(String application_name) {
this.application_name = application_name;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getBroker_port() {
return broker_port;
}
public void setBroker_port(int broker_port) {
this.broker_port = broker_port;
}
}

View File

@ -0,0 +1,24 @@
package eu.nebulous.resource.discovery.broker_communication;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
public class CustomConnectorHandler extends ConnectorHandler {
private Context context;
@Override
public void onReady(Context context) {
this.context = context;
}
public void remove_consumer_with_key(String key){
context.unregisterConsumer(key);
}
public Context getContext() {
return context;
}
public void setContext(Context context) {
this.context = context;
}
}

View File

@ -0,0 +1,99 @@
package eu.nebulous.resource.discovery.broker_communication;
import eu.nebulouscloud.exn.Connector;
import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.ExnConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.util.CustomObjectInputStream;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class ExtendedConnector extends Connector {
private CustomConnectorHandler handler;
private Connector connector;
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, boolean enableHealth, ExnConfig configuration) {
super(component, handler, publishers, consumers, enableState, enableHealth, configuration);
this.handler =(CustomConnectorHandler) handler;
}
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, ExnConfig configuration) {
super(component, handler, publishers, consumers, enableState, configuration);
this.handler = (CustomConnectorHandler) handler;
}
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, ExnConfig configuration) {
super(component, handler, publishers, consumers, configuration);
this.handler = (CustomConnectorHandler) handler;
}
public CustomConnectorHandler getHandler() {
return (CustomConnectorHandler) handler;
}
public void setHandler(CustomConnectorHandler handler) {
this.handler = handler;
}
public void remove_consumer_with_key(String key) {
try {
Context context = ((CustomConnectorHandler)handler).getContext();
context.unregisterConsumer(key);
}catch (ClassCastException c){
log.warn("Could not unregister consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
}
}
private void remove_publisher_with_key(String key) {
try {
Context context = ((CustomConnectorHandler)handler).getContext();
context.unregisterPublisher(key);
}catch (ClassCastException c){
log.warn("Could not unregister consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
}
}
public void add_consumer(Consumer newConsumer) {
try {
((CustomConnectorHandler)handler).getContext().registerConsumer(newConsumer);
}catch (ClassCastException c){
log.warn("Could not register consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
}
}
public void stop(ArrayList<Consumer> consumers, ArrayList <Publisher> publishers){
if (consumers.size()>0) {
stop_consumers(consumers);
}
if (publishers.size()>0) {
stop_publishers(publishers);
}
}
public void stop_consumers(ArrayList<Consumer> consumers){
for (Consumer consumer : consumers){
remove_consumer_with_key(consumer.key());
}
}
public void stop_publishers(ArrayList<Publisher> publishers){
for (Publisher publisher : publishers){
remove_publisher_with_key(publisher.key());
}
}
public Connector getConnector() {
return connector;
}
public void setConnector(Connector connector) {
this.connector = connector;
}
}

View File

@ -0,0 +1,27 @@
package eu.nebulous.resource.discovery.broker_communication;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import java.io.FileReader;
public class JsonFileParser {
public static JSONObject parse(String file_name){
JSONParser parser = new JSONParser();
try {
Object obj = parser.parse(new FileReader(file_name));
JSONObject jsonObject = (JSONObject) obj;
// Access properties of the parsed JSON object here
return jsonObject;
} catch (Exception e) {
e.printStackTrace();
}
return new JSONObject();
}
public static void main(String[] args) {
String currentDir = System.getProperty("user.dir");
System.out.println("Current Directory: " + currentDir);
String parsed_file_string = parse("./src/main/java/eu/nebulous/resource/discovery/broker_communication/file.json").toString();
System.out.println(parsed_file_string);
}
}

View File

@ -0,0 +1,355 @@
package eu.nebulous.resource.discovery.broker_communication;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.StringBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* Assuming that only one SALCommunicator will exist - otherwise some variables should have their characterization as 'static' be removed
*/
@Slf4j
public class SALCommunicator {
private static String sal_host = "localhost";
private static String sal_port = "9000";
private static String mylogin = "admin";
private static String mypassword = "admin";
public static void register_device(int cpu_cores, int ram_gb, int disk, String application_name, String internal_ip_address, String external_ip_address, String city, String country, String latitude, String longitude){
}
public static String get_connection_id(String sal_host, int sal_port, String sal_username, String sal_password){
// Request 1 - Get sessionID
HashMap<String,String> authentication_map = new HashMap<>();
authentication_map.put("username",sal_username);
authentication_map.put("password",sal_password);
String sessionID = sendPOSTRequest("http://"+sal_host+":"+sal_port+"/sal/pagateway/connect", new HashMap<>(), authentication_map);
log.info("Retrieved session id "+sessionID);
return sessionID;
}
public static void main(String[] args) {
//String contentType = "application/json";
String sessionID = get_connection_id("localhost",9000,"admin","admin");
log.info("The session id is "+sessionID);
ArrayList<String> applications = get_running_applications(request_running_applications_REST(sessionID));
log.info("The running apps are "+applications.toString());
register_devices("./src/main/resources/sal_device_registration_base_payload.json", sessionID, applications,"10.100.100","100.100.100.",10,10,10,"test12","test_provider","Athens","Greece",100);
// Request 4
//String payload4 = "{\"key3\": \"value3\"}";
//sendRequest("https://api.example.com/endpoint3", sessionID, contentType, payload4);
}
public static String request_running_applications_AMQP() {
// Request 2 - Get available jobs
//String get_jobs_string = sendGETRequest("http://localhost:9000/sal/job/" );
//return get_jobs_string;
return null;
}
private static String request_running_applications_REST(String sessionID) {
// Request 2 - Get available jobs
String get_jobs_payload = "{\"sessionid\": \""+sessionID+"\"}";
HashMap<String,String> session_id_headers = new HashMap<>();
session_id_headers.put("sessionid",sessionID);
log.info("Using temporary \"job\" endpoint to get the jobs from SAL...");
String get_jobs_string = sendGETRequest("http://localhost:9000/sal/job/",session_id_headers );
return get_jobs_string;
}
private static void register_devices(String request_body_file, String sessionID, ArrayList<String> applications,String internal_ip_address, String external_ip_address, int cpu_cores, int ram_gb, int disk_gb, String device_name,String provider_id, String city_name, String country_name, int number_of_devices_to_register) {
for (int counter = 0; counter < number_of_devices_to_register; counter++) {
JSONObject json = JsonFileParser.parse(request_body_file);
if (number_of_devices_to_register>1) { //Test mode, TODO delete
json.put("name", "test" + counter);
((JSONObject) ((JSONArray) json.get("ipAddresses")).get(0)).put("value", internal_ip_address + counter);
((JSONObject) json.get("nodeProperties")).put("disk", new Random().nextInt(1, 101));
((JSONObject) json.get("nodeProperties")).put("memory", new Random().nextInt(1, 17));
((JSONObject) json.get("nodeProperties")).put("providerId", String.valueOf(new Random().nextInt(1, 21)));
((JSONObject) json.get("nodeProperties")).put("numberOfCores", new Random().nextInt(1, 17));
String[] country_choices = {"Greece", "Poland", "France"};
String[] city_choices = {"Athens", "Warsaw", "Nice"};
int random_int = new Random().nextInt(0, 3);
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("country", country_choices[random_int]);
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("city", city_choices[random_int]);
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90));
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("longitude", new Random().nextFloat(-90, 90));
// Request 3 - Register device for a particular job
}else{
json.put("name", device_name);
((JSONObject) ((JSONArray) json.get("ipAddresses")).get(0)).put("value", internal_ip_address);
((JSONObject) ((JSONArray) json.get("ipAddresses")).get(1)).put("value", external_ip_address);
((JSONObject) json.get("nodeProperties")).put("disk", disk_gb);
((JSONObject) json.get("nodeProperties")).put("memory", ram_gb);
((JSONObject) json.get("nodeProperties")).put("providerId", provider_id);
((JSONObject) json.get("nodeProperties")).put("numberOfCores", cpu_cores);
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("country", country_name);
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("city", city_name);
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90));
((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("longitude", new Random().nextFloat(-90, 90));
}
System.out.println(json.toJSONString());
for (String application : applications) {
json.put("jobId", application);
String payload3 = json.toJSONString();
HashMap<String, String> headers = new HashMap<>();
headers.put("sessionid", sessionID);
headers.put("Content-Type", "application/json");
sendPOSTRequest("http://" + sal_host + ":" + sal_port + "/sal/edge/" + application, headers, payload3);
}
}
}
public static String get_device_registration_json(String internal_ip_address, String external_ip_address, int cpu_cores, int ram_gb, int disk_gb, String device_name,String provider_id, String city_name, String country_name, String device_username, String device_password) {
JSONObject root_json_object = new JSONObject();
JSONObject loginCredential = new JSONObject();
JSONObject ipAddress1 = new JSONObject();
JSONObject ipAddress2 = new JSONObject();
JSONObject operatingSystem = new JSONObject();
JSONObject geoLocation = new JSONObject();
JSONObject nodeProperties = new JSONObject();
loginCredential.put("username", device_username);
loginCredential.put("password", device_password);
loginCredential.put("privateKey", "");
ipAddress1.put("IpAddressType", "PUBLIC_IP");
ipAddress1.put("IpVersion", "V4");
ipAddress1.put("value", external_ip_address);
ipAddress2.put("IpAddressType", "PRIVATE_IP");
ipAddress2.put("IpVersion", "V4");
ipAddress2.put("value", internal_ip_address);
operatingSystem.put("operatingSystemFamily", "UBUNTU");
operatingSystem.put("operatingSystemArchitecture", "ARMv8");
operatingSystem.put("operatingSystemVersion", 1804);
geoLocation.put("city", city_name);
geoLocation.put("country", country_name);
geoLocation.put("latitude", new Random().nextFloat(-90, 90));
geoLocation.put("longitude", new Random().nextFloat(-90, 90));
nodeProperties.put("providerId", provider_id);
nodeProperties.put("numberOfCores", cpu_cores);
nodeProperties.put("memory", ram_gb);
nodeProperties.put("disk", disk_gb);
nodeProperties.put("operatingSystem", operatingSystem);
nodeProperties.put("geoLocation", geoLocation);
root_json_object.put("name", device_name);
root_json_object.put("loginCredential", loginCredential);
JSONArray ipAddresses = new JSONArray();
ipAddresses.add(ipAddress1);
ipAddresses.add(ipAddress2);
root_json_object.put("ipAddresses", ipAddresses);
root_json_object.put("nodeProperties", nodeProperties);
root_json_object.put("systemArch", "ARMv8");
root_json_object.put("scriptURL", "https://www.google.com");
root_json_object.put("jarURL", "https://www.activeeon.com/public_content/7cde3381417ff3784639dc41fa7e7cd0544a5234-morphemic-7bulls/node_13.1.0-SNAPSHOT_armv8.jar");
//JSONObject root_json_object = JsonFileParser.parse(request_body_file);
//root_json_object.put("name", device_name);
//((JSONObject) ((JSONArray) root_json_object.get("ipAddresses")).get(0)).put("value", internal_ip_address);
//((JSONObject) ((JSONArray) root_json_object.get("ipAddresses")).get(1)).put("value", external_ip_address);
//((JSONObject) root_json_object.get("nodeProperties")).put("disk", disk_gb);
//((JSONObject) root_json_object.get("nodeProperties")).put("memory", ram_gb);
//((JSONObject) root_json_object.get("nodeProperties")).put("providerId", provider_id);
//((JSONObject) root_json_object.get("nodeProperties")).put("numberOfCores", cpu_cores);
//((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("country", country_name);
//((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("city", city_name);
//((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90));
//((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("longitude", new Random().nextFloat(-90, 90));
return(root_json_object.toJSONString());
}
public static ArrayList<String> get_running_applications(String running_jobs_string) {
ArrayList<String>applications = new ArrayList<>();
JSONParser parser = new JSONParser();
try{
Object received_json = parser.parse(running_jobs_string);
if (received_json instanceof JSONArray) {
JSONArray jobs_array = (JSONArray) parser.parse(running_jobs_string);
for (int i = 0; i < jobs_array.size(); i++) {
JSONObject json_job_object = (JSONObject) jobs_array.get(i);
applications.add((String) json_job_object.get("jobId"));
}
}else if (received_json instanceof JSONObject){
JSONObject json_job_object = (JSONObject) received_json;
applications.add((String) json_job_object.get("jobId"));
}
}catch (Exception e){
e.printStackTrace();
System.out.println("This is the input json jobs string\n\n");
System.out.println(running_jobs_string);
}
return applications;
}
private static String sendGETRequest(String url, HashMap<String,String>headers) {
String response_string = "";
CloseableHttpClient client = HttpClients.createDefault();
HttpGet httpGet = new HttpGet(url);
// Set headers
for (Map.Entry<String,String> entry : headers.entrySet()) {
httpGet.setHeader(entry.getKey(), entry.getValue());
}
CloseableHttpResponse response = null;
try {
response = client.execute(httpGet);
HttpEntity responseEntity = response.getEntity();
if (responseEntity != null) {
InputStream inputStream = responseEntity.getContent();
response_string = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
System.out.write(buffer, 0, bytesRead);
}
}
response.close();
client.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
System.out.println("Status code: " + response.getStatusLine().getStatusCode());
return response_string;
}
public static String sendPOSTRequest(String urlString, HashMap<String,String> headers, HashMap<String,String> multipart_form) {
String response_string = "Invalid response";
HttpPost post = new HttpPost(urlString);
try {
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
// Create a multipart entity
if (!multipart_form.isEmpty()) {
for (Map.Entry<String,String> entry : multipart_form.entrySet()) {
builder.addPart(entry.getKey(), new StringBody(entry.getValue(), ContentType.TEXT_PLAIN));
}
//post.setHeader("Content-Type", "multipart/form-data");
HttpEntity entity = builder.build();
post.setEntity(entity);
}
if (!headers.isEmpty()) {
for (Map.Entry<String,String> entry : headers.entrySet()) {
post.setHeader(entry.getKey(), entry.getValue());
}
}
CloseableHttpClient client = HttpClients.createDefault();
HttpResponse response = client.execute(post);
HttpEntity responseEntity = response.getEntity();
if (responseEntity != null) {
InputStream inputStream = responseEntity.getContent();
response_string = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
System.out.println("Printing before "+response_string);
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
System.out.write(buffer, 0, bytesRead);
}
}
}catch (Exception e){
e.printStackTrace();
}
System.out.println("Returning the response string "+response_string);
return response_string;
}
public static String sendPOSTRequest(String urlString, HashMap<String,String> headers, String payload) {
String response_string = "";
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpPost httpPost = new HttpPost(urlString);
// Set headers
if (!headers.isEmpty()) {
for (Map.Entry<String,String> entry : headers.entrySet()) {
httpPost.setHeader(entry.getKey(), entry.getValue());
}
}
// Define JSON payload
String jsonPayload = payload;
StringEntity entity = null;
try {
entity = new StringEntity(jsonPayload);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
httpPost.setEntity(entity);
// Execute the request
CloseableHttpResponse response = null;
try {
response = httpclient.execute(httpPost);
// Handle the response
HttpEntity responseEntity = response.getEntity();
response_string = EntityUtils.toString(responseEntity);
// Close resources
response.close();
httpclient.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return response_string;
}
}

View File

@ -0,0 +1,149 @@
package eu.nebulous.resource.discovery.broker_communication;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.exn.core.SyncedPublisher;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import lombok.extern.slf4j.Slf4j;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Marker;
import java.util.*;
@Slf4j
public class SynchronousBrokerPublisher {
public static String EMPTY="";
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
private SyncedPublisher private_publisher_instance;
private ArrayList<Publisher> publishers = new ArrayList<>();
private ExtendedConnector active_connector;
private String topic;
private String broker_ip;
public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerPublisher){
return;
}
boolean publisher_configuration_changed;
if (!broker_and_topics_to_publish_to.containsKey(broker_ip)){
HashSet<String> topics_to_publish_to = new HashSet<>();
topics_to_publish_to.add(topic);
broker_and_topics_to_publish_to.put(broker_ip,topics_to_publish_to);
log.error("changed1");
publisher_configuration_changed = true;
}else{
if (!broker_and_topics_to_publish_to.get(broker_ip).contains(topic)){
broker_and_topics_to_publish_to.get(broker_ip).add(topic);
log.error("changed2");
publisher_configuration_changed = true;
}
else{
publisher_configuration_changed = false;
}
}
log.error("preliminary_outside");
if (publisher_configuration_changed){
log.error("preliminary_inside1");
// for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
log.info("Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic);
if (active_connector!=null) {
active_connector.stop(new ArrayList<>(), publishers);
}
publishers.clear();
for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
log.error("preliminary_inside2");
//ArrayList<Publisher> publishers = new ArrayList<>();
SyncedPublisher publisher = new SyncedPublisher("resource_manager_"+broker_topic, broker_topic, true, true);
publishers.add(publisher);
if (broker_topic.equals(topic)){
log.error("inside_assignment_to_private_publisher_instance");
this.private_publisher_instance = (SyncedPublisher) publishers.get(publishers.size()-1);
this.topic = broker_topic;
this.broker_ip = broker_ip;
}
}
//CustomConnectorHandler custom_handler = new CustomConnectorHandler();
active_connector = new ExtendedConnector("resource_manager"
, new CustomConnectorHandler() {}
, publishers
, List.of(),
false,
false,
new StaticExnConfig(
broker_ip,
broker_port,
brokerUsername,
brokerPassword,
60,
EMPTY
)
);
active_connector.start();
}
}
public Map publish_for_response (String json_string_content, Collection<String> application_names){
Map reply = null;
HashMap<String,Object> payload = new HashMap<>();
HashMap<String,String> metadata = new HashMap<>();
metadata.put("user","admin");
metadata.put("type","edge");
if (application_names!=null && !application_names.isEmpty()) {
for (String application_name : application_names) {
boolean successful_json_parsing = false;
JSONParser parser = new JSONParser();
JSONObject json_object = new JSONObject();
try {
json_object = (JSONObject) parser.parse(json_string_content);
successful_json_parsing = true;
} catch (ParseException p) {
log.warn("Could not parse the string content to be published to the broker as json, which is the following: " + json_string_content);
}
metadata.put("jobId",application_name);
payload.put("metaData",metadata);
if (private_publisher_instance != null) {
//reply = private_publisher_instance.sendSync(json_object, application_name, null, false);
if (successful_json_parsing) {
json_object.put("jobId",application_name);
payload.put("body",json_object.toJSONString());
reply = private_publisher_instance.sendSync(payload, application_name, null, false);
}else{
payload.put("body",json_string_content);
log.warn(Marker.ANY_MARKER,"Sending the original json string without any modification as its parsing was not successful");
reply = private_publisher_instance.sendSync(payload, application_name, null, false);
}
} else {
log.error("Could not send message to AMQP broker, as the private publisher instance is null (is broker ip specified?)");
}
}
}else{ //Send an empty string for application
JSONParser parser = new JSONParser();
JSONObject json_object = new JSONObject();
try {
json_object = (JSONObject) parser.parse(json_string_content);
} catch (ParseException p) {
log.warn("Could not parse the string content to be published to the broker as json, which is the following: " + json_string_content);
}
if (private_publisher_instance != null) {
log.info("Sending new synchronous message\n"+json_object.toJSONString());
reply = private_publisher_instance.sendSync(json_object,EMPTY, null, false);
log.info("Sent new synchronous message\n"+json_object.toJSONString());
} else {
log.error("Could not send message to AMQP broker, as the private publisher instance is null (is broker ip specified?)");
}
}
return reply;
}
}

View File

@ -2,11 +2,14 @@
package eu.nebulous.resource.discovery.monitor;
import eu.nebulous.resource.discovery.ResourceDiscoveryProperties;
import eu.nebulous.resource.discovery.broker_communication.BrokerPublisher;
import java.time.Clock;
import eu.nebulous.resource.discovery.monitor.model.Device;
import eu.nebulous.resource.discovery.monitor.model.DeviceStatus;
import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.json.simple.JSONObject;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableAsync;
@ -17,6 +20,7 @@ import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@ -130,6 +134,12 @@ public class DeviceProcessor implements InitializingBean {
&& device.getCreationDate().isBefore(failedDeviceThreshold) )
{
device.setStatus(DeviceStatus.FAILED);
JSONObject lost_device_message = new JSONObject();
lost_device_message.put("device_name",device.getName());
Clock clock = Clock.systemUTC();
lost_device_message.put("timestamp",(int)(clock.millis()/1000));
BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(),processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
device_lost_publisher.publish(lost_device_message.toJSONString(), Collections.singleton(""));
log.warn("processFailedDevices: Marked as FAILED device with Id: {}", device.getId());
}

View File

@ -7,6 +7,7 @@ import eu.nebulous.resource.discovery.monitor.model.DeviceException;
import eu.nebulous.resource.discovery.monitor.service.DeviceLifeCycleRequestService;
import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService;
import eu.nebulous.resource.discovery.registration.model.RegistrationRequestException;
import eu.nebulous.resource.discovery.registration.service.SALRegistrationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -31,6 +32,7 @@ public class DeviceManagementController {
private final DeviceProcessor deviceProcessor;
private final DeviceManagementService deviceService;
private final DeviceLifeCycleRequestService deviceLifeCycleRequestService;
private final SALRegistrationService salRegistrationService;
private boolean isAuthenticated(Authentication authentication) {
return authentication!=null && StringUtils.isNotBlank(authentication.getName());
@ -83,6 +85,8 @@ public class DeviceManagementController {
@PutMapping(value = "/device", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Device createDevice(@RequestBody Device device) {
salRegistrationService.register(device);
return deviceService.save(device);
}

View File

@ -21,6 +21,7 @@ public class Device {
private String name;
private String owner;
private String ipAddress;
private int port;
private DeviceLocation location;
private String username;
@ToString.Exclude
@ -40,6 +41,7 @@ public class Device {
private String nodeReference;
@Setter(AccessLevel.NONE)
@Builder.Default
private List<String> messages = new ArrayList<>();
private DeviceStatusUpdate statusUpdate;

View File

@ -192,6 +192,7 @@ public class DeviceManagementService {
result.get().setArchiveDate(Instant.now());
archivedDeviceRepository.save(deviceConversionService.toArchivedDevice(result.get()));
deviceRepository.delete(result.get());
//XXX:TODO: Send notification to SAL to deregister Device
}
public void unarchiveDevice(String id, Map<String,String> credentials) {
@ -208,6 +209,7 @@ public class DeviceManagementService {
restoredDevice.setPublicKey(credentials.get("publicKey").toCharArray());
deviceRepository.save(restoredDevice);
archivedDeviceRepository.deleteById(result.get().getId());
//XXX:TODO: Send notification to SAL to re-register Device
}
private void checkCredentials(String id, Map<String, String> credentials) {

View File

@ -9,18 +9,19 @@ import eu.nebulous.resource.discovery.monitor.model.Device;
import eu.nebulous.resource.discovery.monitor.model.DeviceStatus;
import eu.nebulous.resource.discovery.registration.model.RegistrationRequest;
import eu.nebulous.resource.discovery.registration.service.RegistrationRequestService;
import eu.nebulous.resource.discovery.registration.service.SALRegistrationService;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
//import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
@Slf4j
@Service
//@Service
public class UnknownDeviceRegistrationService extends AbstractMonitorService {
private final static List<String> MONITORED_REQUEST_TYPES = List.of(
REQUEST_TYPE.INFO.name(),
@ -29,16 +30,18 @@ public class UnknownDeviceRegistrationService extends AbstractMonitorService {
);
private final RegistrationRequestService registrationRequestService;
private final DeviceManagementService deviceManagementService;
private final SALRegistrationService salRegistrationService;
private final Map<String, String> detectedDevices = Collections.synchronizedMap(new LinkedHashMap<>());
private final List<Map> deviceDetailsQueue = Collections.synchronizedList(new LinkedList<>());
public UnknownDeviceRegistrationService(ResourceDiscoveryProperties monitorProperties, TaskScheduler taskScheduler,
ObjectMapper objectMapper, DeviceManagementService deviceManagementService,
RegistrationRequestService registrationRequestService, BrokerUtil brokerUtil)
RegistrationRequestService registrationRequestService, BrokerUtil brokerUtil, SALRegistrationService salRegistrationService)
{
super("UnknownDeviceRegistrationService", monitorProperties, taskScheduler, objectMapper, brokerUtil);
this.registrationRequestService = registrationRequestService;
this.deviceManagementService = deviceManagementService;
this.salRegistrationService = salRegistrationService;
}
@Override
@ -243,6 +246,11 @@ public class UnknownDeviceRegistrationService extends AbstractMonitorService {
.build();
newDevice = deviceManagementService.save(newDevice);
log.info("UnknownDeviceRegistrationService: Registered device: {}", newDevice);
log.info("Registering the device {} to SAL...",newDevice);
salRegistrationService.register(newDevice);
} catch (Exception e) {
log.warn("UnknownDeviceRegistrationService: EXCEPTION while processing device details response: {}\nException: ", map, e);
}

View File

@ -9,6 +9,7 @@ import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService;
import eu.nebulous.resource.discovery.registration.model.RegistrationRequest;
import eu.nebulous.resource.discovery.registration.model.RegistrationRequestStatus;
import eu.nebulous.resource.discovery.registration.service.RegistrationRequestService;
import eu.nebulous.resource.discovery.registration.service.SALRegistrationService;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -50,6 +51,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
private final ResourceDiscoveryProperties processorProperties;
private final RegistrationRequestService registrationRequestService;
private final DeviceManagementService deviceManagementService;
private final SALRegistrationService salRegistrationService;
private final TaskScheduler taskScheduler;
private final ObjectMapper objectMapper;
private final BrokerUtil brokerUtil;
@ -177,6 +179,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
"deviceOs", registrationRequest.getDevice().getOs(),
"deviceName", registrationRequest.getDevice().getName(),
"deviceIpAddress", registrationRequest.getDevice().getIpAddress(),
"devicePort", Integer.toString( registrationRequest.getDevice().getPort() ),
"deviceUsername", registrationRequest.getDevice().getUsername(),
"devicePassword", new String(registrationRequest.getDevice().getPassword()),
"devicePublicKey", new String(registrationRequest.getDevice().getPublicKey())
@ -257,7 +260,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
String ipAddress = registrationRequest.getDevice().getIpAddress();
boolean isError = false;
if (StringUtils.isNotBlank(deviceIpAddress) && ! StringUtils.equals(ipAddress, deviceIpAddress)) {
String mesg = String.format("Device IP address in RESPONSE does not match with that in the request: id=%s, ip-address-response=%s != ip-address-in-request%s", requestId, deviceIpAddress, ipAddress);
String mesg = String.format("Device IP address in RESPONSE does not match with that in the request: id=%s, ip-address-response=%s != ip-address-in-request=%s", requestId, deviceIpAddress, ipAddress);
log.warn("processResponse: {}", mesg);
registrationRequest.getMessages().add(mesg);
isError = true;
@ -367,5 +370,6 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
device.setRequestId(registrationRequest.getId());
device.setNodeReference(registrationRequest.getNodeReference());
deviceManagementService.save(device);
salRegistrationService.register(device);
}
}

View File

@ -18,6 +18,7 @@ public class Device {
private String name;
private String owner;
private String ipAddress;
private int port;
private DeviceLocation location;
private String username;
@ToString.Exclude

View File

@ -1,9 +1,6 @@
package eu.nebulous.resource.discovery.registration.model;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@ -25,9 +22,11 @@ public class RegistrationRequest {
private Instant lastUpdateDate;
private Instant archiveDate;
private RegistrationRequestStatus status;
@Builder.Default
private List<RegistrationRequestHistoryEntry> history = new ArrayList<>();
private String nodeReference;
@Setter(AccessLevel.NONE)
@Builder.Default
private List<String> messages = new ArrayList<>();
// Required in order BeanUtils.copyProperties() to also copy this

View File

@ -0,0 +1,117 @@
package eu.nebulous.resource.discovery.registration.service;
import eu.nebulous.resource.discovery.ResourceDiscoveryProperties;
import eu.nebulous.resource.discovery.broker_communication.SynchronousBrokerPublisher;
import eu.nebulous.resource.discovery.monitor.model.Device;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.json.simple.JSONObject;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import static eu.nebulous.resource.discovery.broker_communication.SALCommunicator.*;
@Slf4j
@Service
public class SALRegistrationService implements InitializingBean {
@Autowired
private final ResourceDiscoveryProperties processorProperties;
public SALRegistrationService(ResourceDiscoveryProperties processorProperties) {
this.processorProperties = processorProperties;
}
public void register(Device device) {
String application_name = "default-application"; //TODO decide on this
Map<String,String> device_info = device.getDeviceInfo();
/* Information available from the EMS, based on https://gitlab.com/nebulous-project/ems-main/-/blob/master/ems-core/bin/detect.sh?ref_type=heads
echo CPU_SOCKETS=$TMP_NUM_CPUS
echo CPU_CORES=$TMP_NUM_CORES
echo CPU_PROCESSORS=$TMP_NUM_PROCESSORS
echo RAM_TOTAL_KB=$TMP_RAM_TOTAL_KB
echo RAM_AVAILABLE_KB=$TMP_RAM_AVAILABLE_KB
echo RAM_FREE_KB=$TMP_RAM_FREE_KB
echo RAM_USED_KB=$TMP_RAM_USED_KB
echo RAM_UTILIZATION=$TMP_RAM_UTILIZATION
echo DISK_TOTAL_KB=$TMP_DISK_TOTAL_KB
echo DISK_FREE_KB=$TMP_DISK_FREE_KB
echo DISK_USED_KB=$TMP_DISK_USED_KB
echo DISK_UTILIZATION=$TMP_DISK_UTILIZATION
echo OS_ARCHITECTURE=$TMP_ARCHITECTURE
echo OS_KERNEL=$TMP_KERNEL
echo OS_KERNEL_RELEASE=$TMP_KERNEL_RELEASE
*/
String device_name = device.getName();
Integer cores = Integer.parseInt(device_info.get("CPU_PROCESSORS"));
Integer ram_gb = Integer.parseInt(device_info.get("RAM_TOTAL_KB"))/1000000;
Integer disk_gb = Integer.parseInt(device_info.get("DISK_TOTAL_KB"))/1000000;
String external_ip_address = device.getIpAddress();
String device_username = device.getUsername();
String device_password = new String(device.getPassword());
String device_pub_key = new String(device.getPublicKey()); //TODO get here private key instead and pass this to device registration
//TODO implement provider here: String provider = device.getProvider();
//String network_rx =device_info.get("RX");
//String network_tx = device_info.get("TX");
Clock clock = Clock.systemUTC();
//JSONObject register_device_message = new JSONObject();
//register_device_message.put("device_name",device_name);
//register_device_message.put("timestamp",(int)(clock.millis()/1000));
String register_device_message_string = get_device_registration_json("10.100.100",external_ip_address,cores,ram_gb,disk_gb,device_name,"test_provider","Athens","Greece", device_username, device_password);
log.error("topic is "+get_registration_topic_name(application_name));
log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address());
log.error("broker port is "+processorProperties.getNebulous_broker_port());
log.error("username is "+processorProperties.getNebulous_broker_username());
log.error("password is "+processorProperties.getNebulous_broker_password());
//String sal_running_applications_reply = request_running_applications_AMQP();
//ArrayList<String> applications = get_running_applications(sal_running_applications_reply);
//for (String application_name:applications) {
SynchronousBrokerPublisher register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(),processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
//TODO handle the response here
Map response = register_device_publisher.publish_for_response(register_device_message_string, Collections.singleton(application_name));
log.info("The response received while trying to register device " + device_name);
//}
/* This is some realtime information, could be retrieved with a different call to the EMS.
CurrDateTime: 1709207141
UpDateTime: 1709186638
Uptime: 20503
CPU: 0
RAM: 31.4725
DISK: 10.3586
RX: 0
TX: 0
*/
}
private String get_registration_topic_name(String application_name) {
return "eu.nebulouscloud.exn.sal.node.create";
//return ("eu.nebulouscloud.exn.sal.edge." + application_name);
}
@Override
public void afterPropertiesSet() throws Exception {
if ( processorProperties.getNebulous_broker_password()!=null &&
processorProperties.getNebulous_broker_username()!=null &&
processorProperties.getNebulous_broker_ip_address()!=null
){
log.info("Successful setting of properties for communication with SAL");
}else{
log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address());
log.error("username is "+processorProperties.getNebulous_broker_username());
log.error("password is "+processorProperties.getNebulous_broker_password());
throw new Exception("Required data is null - broker ip is "+processorProperties.getNebulous_broker_ip_address()+" username is "+processorProperties.getNebulous_broker_username()+" password is "+processorProperties.getNebulous_broker_password());
}
}
}

View File

@ -20,6 +20,13 @@ discovery:
brokerURL: "ssl://localhost:61617?daemon=true&trace=false&useInactivityMonitor=false&connectionTimeout=0&keepAlive=true"
brokerUsername: "aaa"
brokerPassword: "111"
nebulous_broker_ip_address: "nebulous-activemq"
nebulous_broker_port: 5672
nebulous_broker_username: "admin"
nebulous_broker_password: "admin"
sal_host: "localhost"
sal_port: 8080
lost_device_topic: "eu.nebulouscloud.monitoring.device_lost"
trustStoreFile: tests/config/broker-truststore.p12
trustStorePassword: melodic
trustStoreType: PKCS12

View File

@ -287,6 +287,13 @@ function flattenObject(ob) {
<input type="text" readonly class="form-control-plaintext" id="device#ipAddress" value="" placeholder="Device IP address">
</div>
</div>
<!-- Device Port -->
<div class="form-group row">
<label for="device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
<div class="col-sm-10">
<input type="text" readonly class="form-control-plaintext" id="device#port" value="" placeholder="Device SSH port">
</div>
</div>
<!-- Device Location -->
<div class="form-group row">
<label for="device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>

View File

@ -300,6 +300,13 @@ function flattenObject(ob) {
<input type="text" readonly class="form-control" id="request#device#ipAddress" value="" placeholder="Device IP address">
</div>
</div>
<!-- Device Port -->
<div class="form-group row">
<label for="request#device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
<div class="col-sm-10">
<input type="text" readonly class="form-control" id="request#device#port" value="" placeholder="Device SSH port">
</div>
</div>
<!-- Device Location -->
<div class="form-group row">
<label for="request#device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>

View File

@ -404,6 +404,13 @@ function sendDeviceData(deviceData) {
<input type="text" readonly class="form-control-plaintext" id="device#ipAddress" value="" placeholder="Device IP address">
</div>
</div>
<!-- Device Port -->
<div class="form-group row">
<label for="device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
<div class="col-sm-10">
<input type="text" readonly class="form-control-plaintext" id="device#port" value="" placeholder="Device SSH port">
</div>
</div>
<!-- Device Location -->
<div class="form-group row">
<label for="device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>

View File

@ -458,6 +458,13 @@ function sendRequestData(requestData) {
<input type="text" class="form-control" id="request#device#ipAddress" value="" placeholder="Device IP address">
</div>
</div>
<!-- Device Port -->
<div class="form-group row">
<label for="request#device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
<div class="col-sm-10">
<input type="text" class="form-control" id="request#device#port" value="" placeholder="Device SSH port">
</div>
</div>
<!-- Device Location -->
<div class="form-group row">
<label for="request#device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>