Copied new changes from RD repository

Change-Id: I38c891816cc3258b10c98b4824400e9eeef19200
This commit is contained in:
ipatini 2024-04-10 14:24:45 +03:00
parent a7bad9a798
commit a465f2e6ce
8 changed files with 121 additions and 47 deletions

View File

@ -6,13 +6,13 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
<version>3.2.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>eu.nebulous.resource-management</groupId>
<artifactId>resource-discovery</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.2-SNAPSHOT</version>
<name>Resource discovery service</name>
<description>Nebulous resource discovery service</description>
@ -37,6 +37,12 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -120,17 +126,6 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>

View File

@ -6,7 +6,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -59,6 +58,11 @@ public class ResourceDiscoveryProperties {
private String deviceLifeCycleRequestsTopic = "ems.client.installation.requests";
private String deviceLifeCycleResponsesTopic = "ems.client.installation.reports";
// SAL registration settings
private boolean salRegistrationEnabled = true;
private long salRegistrationTimeout = 60*1000;
private String registration_topic_name = "eu.nebulouscloud.exn.sal.node.create";
// Failed devices detection
private boolean automaticFailedDetection = true;
private long suspectDeviceThreshold = 5; // in minutes
@ -89,8 +93,8 @@ public class ResourceDiscoveryProperties {
private String nebulous_broker_ip_address;
private int nebulous_broker_port;
private String nebulous_broker_username;
private String lost_device_topic;
private String nebulous_broker_password;
private String lost_device_topic;
@Data
public static class UserData {

View File

@ -50,6 +50,8 @@ public class Device {
private Instant suspectTimestamp;
private int retries;
private boolean registeredToSAL;
public void incrementRetries() {
retries++;
}

View File

@ -6,7 +6,6 @@ import eu.nebulous.resource.discovery.ResourceDiscoveryProperties;
import eu.nebulous.resource.discovery.common.BrokerUtil;
import eu.nebulous.resource.discovery.monitor.model.Device;
import eu.nebulous.resource.discovery.monitor.model.DeviceMetrics;
import eu.nebulous.resource.discovery.monitor.model.DeviceStatus;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -32,6 +31,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService {
{
super("DeviceMetricsMonitorService", monitorProperties, taskScheduler, objectMapper, brokerUtil);
this.deviceManagementService = deviceManagementService;
log.trace("DeviceMetricsMonitorService.<INIT>: {}", monitorProperties);
}
@Override
@ -41,6 +41,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService {
protected void processPayload(@NonNull Map<?,?> dataMap) {
Object obj = dataMap.get("message");
log.trace("DeviceMetricsMonitorService: dataMap={}, message={}, message-class={}", dataMap, obj, obj!=null ? obj.getClass() : null);
if (obj==null) {
log.debug("DeviceMetricsMonitorService: Message does not contain device metrics (message field is null): {}", dataMap);
return;
@ -63,6 +64,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService {
String clientId = stringValue(metricsMap.get("clientId"));
String ipAddress = stringValue(metricsMap.get("ipAddress"));
String timestampStr = stringValue(metricsMap.get("receivedAtServer"));
log.debug("DeviceMetricsMonitorService: client={}, ip={}, ts={}", clientId, ipAddress, timestampStr);
if (clientId.isEmpty() || ipAddress.isEmpty() || timestampStr.isEmpty()) {
log.warn("DeviceMetricsMonitorService: Device metrics received do not contain clientId or ipAddress or receivedAtServer. Ignoring them: {}", metricsMap);
return;
@ -72,9 +74,18 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService {
// Get registered device using IP address
Optional<Device> result = deviceManagementService.getByIpAddress(ipAddress);
log.debug("DeviceMetricsMonitorService: device-by-ip: {}", result);
if (result.isEmpty()) {
log.debug("DeviceMetricsMonitorService: Device metrics IP address does not match any registered device: {}", infoMap);
return;
result = deviceManagementService.getAll().stream()
.filter(d->d.getStatusUpdate()!=null)
.filter(d->StringUtils.isNotBlank(d.getStatusUpdate().getClientId()))
.filter(d->StringUtils.equalsIgnoreCase(d.getStatusUpdate().getClientId(), clientId))
.findAny();
log.debug("DeviceMetricsMonitorService: device-by-clientId: {}", result);
if (result.isEmpty())
return;
}
Device device = result.get();
@ -115,6 +126,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService {
deviceManagementService.update(device);
log.debug("DeviceMetricsMonitorService: Device metrics updated for device: id={}, ip-address={}, update={}",
device.getId(), device.getIpAddress(), metrics);
log.debug("DeviceMetricsMonitorService: Device statistics updated: {}", device);
} catch (Exception e) {
log.warn("DeviceMetricsMonitorService: EXCEPTION while processing device metrics map: {}\n", infoMap, e);
}

View File

@ -370,6 +370,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
device.setRequestId(registrationRequest.getId());
device.setNodeReference(registrationRequest.getNodeReference());
deviceManagementService.save(device);
salRegistrationService.register(device);
if (processorProperties.isSalRegistrationEnabled())
salRegistrationService.queueForRegistration(device);
}
}

View File

@ -3,28 +3,36 @@ package eu.nebulous.resource.discovery.registration.service;
import eu.nebulous.resource.discovery.ResourceDiscoveryProperties;
import eu.nebulous.resource.discovery.broker_communication.SynchronousBrokerPublisher;
import eu.nebulous.resource.discovery.monitor.model.Device;
import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.json.simple.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import static eu.nebulous.resource.discovery.broker_communication.SALCommunicator.*;
import static eu.nebulous.resource.discovery.broker_communication.SALCommunicator.get_device_registration_json;
@Slf4j
@Service
@RequiredArgsConstructor
public class SALRegistrationService implements InitializingBean {
@Autowired
private final DeviceManagementService deviceManagementService;
private final ResourceDiscoveryProperties processorProperties;
private final TaskExecutor taskExecutor;
private final LinkedBlockingDeque<Device> queue = new LinkedBlockingDeque<>();
private Thread processQueueThread;
private long lastRegistrationStartTimestamp = -1L;
public SALRegistrationService(ResourceDiscoveryProperties processorProperties) {
this.processorProperties = processorProperties;
public void queueForRegistration(@NonNull Device device) {
if (processorProperties.isSalRegistrationEnabled())
queue.add(device);
}
public void register(Device device) {
@ -50,9 +58,9 @@ public class SALRegistrationService implements InitializingBean {
*/
String device_name = device.getName();
Integer cores = Integer.parseInt(device_info.get("CPU_PROCESSORS"));
Integer ram_gb = Integer.parseInt(device_info.get("RAM_TOTAL_KB"))/1000000;
Integer disk_gb = Integer.parseInt(device_info.get("DISK_TOTAL_KB"))/1000000;
int cores = Integer.parseInt(device_info.get("CPU_PROCESSORS"));
int ram_gb = Integer.parseInt(device_info.get("RAM_TOTAL_KB"))/1000000;
int disk_gb = Integer.parseInt(device_info.get("DISK_TOTAL_KB"))/1000000;
String external_ip_address = device.getIpAddress();
String device_username = device.getUsername();
String device_password = new String(device.getPassword());
@ -68,11 +76,11 @@ public class SALRegistrationService implements InitializingBean {
//register_device_message.put("timestamp",(int)(clock.millis()/1000));
String register_device_message_string = get_device_registration_json("10.100.100",external_ip_address,cores,ram_gb,disk_gb,device_name,"test_provider","Athens","Greece", device_username, device_password);
log.error("topic is "+get_registration_topic_name(application_name));
log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address());
log.error("broker port is "+processorProperties.getNebulous_broker_port());
log.error("username is "+processorProperties.getNebulous_broker_username());
log.error("password is "+processorProperties.getNebulous_broker_password());
log.info("topic is {}", get_registration_topic_name(application_name));
log.info("broker ip is {}", processorProperties.getNebulous_broker_ip_address());
log.info("broker port is {}", processorProperties.getNebulous_broker_port());
log.info("username is {}", processorProperties.getNebulous_broker_username());
log.info("password is {}", StringUtils.isNotBlank(processorProperties.getNebulous_broker_password()) ? "<provided>" : "<not provided>");
//String sal_running_applications_reply = request_running_applications_AMQP();
//ArrayList<String> applications = get_running_applications(sal_running_applications_reply);
//for (String application_name:applications) {
@ -96,22 +104,73 @@ public class SALRegistrationService implements InitializingBean {
}
private String get_registration_topic_name(String application_name) {
return "eu.nebulouscloud.exn.sal.node.create";
return processorProperties.getRegistration_topic_name();
//return ("eu.nebulouscloud.exn.sal.edge." + application_name);
}
@Override
public void afterPropertiesSet() throws Exception {
if ( processorProperties.getNebulous_broker_password()!=null &&
processorProperties.getNebulous_broker_username()!=null &&
processorProperties.getNebulous_broker_ip_address()!=null
){
public void afterPropertiesSet() {
if (! processorProperties.isSalRegistrationEnabled()) {
log.info("SAL registration is disabled due to configuration");
return;
}
if ( StringUtils.isNotBlank(processorProperties.getNebulous_broker_ip_address()) &&
StringUtils.isNotBlank(processorProperties.getNebulous_broker_username()) &&
StringUtils.isNotBlank(processorProperties.getNebulous_broker_password()) )
{
log.info("Successful setting of properties for communication with SAL");
}else{
log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address());
log.error("username is "+processorProperties.getNebulous_broker_username());
log.error("password is "+processorProperties.getNebulous_broker_password());
throw new Exception("Required data is null - broker ip is "+processorProperties.getNebulous_broker_ip_address()+" username is "+processorProperties.getNebulous_broker_username()+" password is "+processorProperties.getNebulous_broker_password());
taskExecutor.execute(this::processQueue);
taskExecutor.execute(this::checkProcessQueue);
} else {
String message = String.format("Nebulous broker configuration is missing: ip-address=%s, username=%s, password=%s",
processorProperties.getNebulous_broker_ip_address(),
processorProperties.getNebulous_broker_username(),
StringUtils.isNotBlank(processorProperties.getNebulous_broker_password()) ? "<provided>" : "<not provided>");
log.error(message);
throw new RuntimeException(message);
}
}
public void processQueue() {
processQueueThread = Thread.currentThread();
while (true) {
Device device = null;
try {
device = queue.take();
log.warn("SALRegistrationService: processQueue(): Will register device: {}", device);
lastRegistrationStartTimestamp = System.currentTimeMillis();
register(device);
lastRegistrationStartTimestamp = -1L;
device.setRegisteredToSAL(true);
deviceManagementService.update(device);
log.warn("SALRegistrationService: processQueue(): Device registered to SAL: {}", device);
} catch (InterruptedException e) {
log.warn("SALRegistrationService: processQueue(): Interrupted. Will not register device to SAL: {}", device);
lastRegistrationStartTimestamp = -1L;
// break;
} catch (Exception e) {
log.warn("SALRegistrationService: processQueue(): EXCEPTION caught. Will not register device to SAL: {}", device, e);
lastRegistrationStartTimestamp = -1L;
}
}
}
public void checkProcessQueue() {
while (true) {
try {
Thread.sleep(1000);
if (processQueueThread!=null && lastRegistrationStartTimestamp > 0) {
long runningTime = System.currentTimeMillis() - lastRegistrationStartTimestamp;
if (runningTime > processorProperties.getSalRegistrationTimeout()) {
log.warn("SALRegistrationService: checkProcessQueue(): Method 'processQueue' is running for too log. Will attempt to interrupt it");
processQueueThread.interrupt();
lastRegistrationStartTimestamp = -1L;
}
}
} catch (Exception e) {
log.warn("SALRegistrationService: checkProcessQueue(): EXCEPTION caught: ", e);
}
}
}
}

View File

@ -27,9 +27,9 @@ discovery:
sal_host: "localhost"
sal_port: 8080
lost_device_topic: "eu.nebulouscloud.monitoring.device_lost"
trustStoreFile: tests/config/broker-truststore.p12
trustStorePassword: melodic
trustStoreType: PKCS12
# trustStoreFile: tests/config/broker-truststore.p12
# trustStorePassword: melodic
# trustStoreType: PKCS12
allowedDeviceInfoKeys:
- '*'
# NOTE:

View File

@ -9,3 +9,4 @@ ${AnsiColor.012} ╚═╝ ╚═╝╚══════╝╚═════
${AnsiColor.046} :: App version :: ${AnsiColor.87} (${application.version})
${AnsiColor.046} :: Spring Boot :: ${AnsiColor.87} ${spring-boot.formatted-version}
${AnsiColor.046} :: Java (TM) :: ${AnsiColor.87} (${java.version})
${AnsiColor.DEFAULT}${AnsiStyle.NORMAL}