diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java index 03e8665..852167c 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java @@ -26,9 +26,13 @@ public class BrokerPublisher { private int broker_port; public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) { + this(topic,broker_ip,broker_port,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,false); + } + public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, boolean hard_initialize_connector) { boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY); if (!able_to_initialize_BrokerPublisher){ + log.error("Could not initialize BrokerPublisher"); return; } boolean publisher_configuration_changed; @@ -48,7 +52,7 @@ public class BrokerPublisher { } - if (publisher_configuration_changed){ + if (publisher_configuration_changed || hard_initialize_connector){ // for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){ log.info("Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic); if (active_connector!=null) { @@ -99,12 +103,15 @@ public class BrokerPublisher { } catch (ParseException p) { log.warn( "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content); } - if (private_publisher_instance != null) { + if (!is_publisher_null()) { private_publisher_instance.send(json_object); log.info("Sent new message\n"+json_object.toJSONString()); } else { - log.error( "Could not send message to AMQP broker, as the broker ip to be used has not been specified"); + log.error( "Could not send message to AMQP broker, as the publisher instance is null"); } } } + public boolean is_publisher_null(){ + return (private_publisher_instance == null); + } } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java index 5cdf4b9..3db8fa6 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java @@ -22,12 +22,16 @@ public class SynchronousBrokerPublisher { private ExtendedConnector active_connector; private String topic; private String broker_ip; - public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) { + this(topic, broker_ip, broker_port, brokerUsername, brokerPassword, amqLibraryConfigurationLocation,false); + } + + public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,boolean hard_initialize_connector) { boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY); if (!able_to_initialize_BrokerPublisher){ + log.error("Unable to initialize SynchronousBrokerPublisher"); return; } boolean publisher_configuration_changed; @@ -49,7 +53,7 @@ public class SynchronousBrokerPublisher { } //log.error("preliminary_outside"); - if (publisher_configuration_changed){ + if (publisher_configuration_changed || hard_initialize_connector){ //log.error("preliminary_inside1"); // for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){ log.info("Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic); @@ -71,7 +75,7 @@ public class SynchronousBrokerPublisher { } //CustomConnectorHandler custom_handler = new CustomConnectorHandler(); - active_connector = new ExtendedConnector("resource_manager" + active_connector = new ExtendedConnector("resource_manager_synchronous" , new CustomConnectorHandler() {} , publishers , List.of(), @@ -146,4 +150,7 @@ public class SynchronousBrokerPublisher { } return reply; } + public boolean is_publisher_null(){ + return (private_publisher_instance == null); + } } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java index 8fe87bb..4b8d893 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java @@ -138,7 +138,26 @@ public class DeviceProcessor implements InitializingBean { lost_device_message.put("device_name",device.getName()); Clock clock = Clock.systemUTC(); lost_device_message.put("timestamp",(int)(clock.millis()/1000)); - BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(),processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + log.info("Creating new BrokerPublisher to publish device lost message"); + BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + int sending_attempt = 1; + while (device_lost_publisher.is_publisher_null()){ + + try { + log.info("Attempting to recreate new BrokerPublisher to publish the device lost message"); + log.info("The topic name is "+processorProperties.getLost_device_topic()+", the broker ip is "+ processorProperties.getNebulous_broker_ip_address()+", the broker port is "+ processorProperties.getNebulous_broker_port()+", the username is "+ processorProperties.getNebulous_broker_username()+", and the password is "+ processorProperties.getNebulous_broker_password()); + if (sending_attempt<=2) { + device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + }else{ + log.warn("Will now attempt to reset the BrokerPublisher connector"); + device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "",true); + } + Thread.sleep(3000); + }catch (InterruptedException i){ + i.printStackTrace(); + } + sending_attempt++; + } device_lost_publisher.publish(lost_device_message.toJSONString(), Collections.singleton("")); log.warn("processFailedDevices: Marked as FAILED device with Id: {}", device.getId()); } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java index 607cb1f..153ac91 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java @@ -110,6 +110,21 @@ public class SALRegistrationService implements InitializingBean { //ArrayList applications = get_running_applications(sal_running_applications_reply); //for (String application_name:applications) { SynchronousBrokerPublisher register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(),processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + int sending_attempt = 1; + while (register_device_publisher.is_publisher_null()){ + if (sending_attempt<=2) { + register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + }else{ + log.warn("Will now attempt to reset the Synchronous publisher connector"); + register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), ""); + } + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + sending_attempt++; + } //TODO handle the response here Map response = register_device_publisher.publish_for_response(register_device_message_string, Collections.singleton(application_name)); log.info("The response received while trying to register device " + device_name + " is "+response.toString());