RD: Bug fix in SAL registration service

Change-Id: I2e821466b2b29dd38d94916c269082abb54f978b
This commit is contained in:
ipatini 2024-05-15 17:59:06 +03:00
parent a0b1e1d08b
commit 72cfafcaf8
4 changed files with 55 additions and 7 deletions

View File

@ -26,9 +26,13 @@ public class BrokerPublisher {
private int broker_port;
public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
this(topic,broker_ip,broker_port,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,false);
}
public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, boolean hard_initialize_connector) {
boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerPublisher){
log.error("Could not initialize BrokerPublisher");
return;
}
boolean publisher_configuration_changed;
@ -48,7 +52,7 @@ public class BrokerPublisher {
}
if (publisher_configuration_changed){
if (publisher_configuration_changed || hard_initialize_connector){
// for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
log.info("Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic);
if (active_connector!=null) {
@ -99,12 +103,15 @@ public class BrokerPublisher {
} catch (ParseException p) {
log.warn( "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content);
}
if (private_publisher_instance != null) {
if (!is_publisher_null()) {
private_publisher_instance.send(json_object);
log.info("Sent new message\n"+json_object.toJSONString());
} else {
log.error( "Could not send message to AMQP broker, as the broker ip to be used has not been specified");
log.error( "Could not send message to AMQP broker, as the publisher instance is null");
}
}
}
public boolean is_publisher_null(){
return (private_publisher_instance == null);
}
}

View File

@ -22,12 +22,16 @@ public class SynchronousBrokerPublisher {
private ExtendedConnector active_connector;
private String topic;
private String broker_ip;
public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
this(topic, broker_ip, broker_port, brokerUsername, brokerPassword, amqLibraryConfigurationLocation,false);
}
public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,boolean hard_initialize_connector) {
boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerPublisher){
log.error("Unable to initialize SynchronousBrokerPublisher");
return;
}
boolean publisher_configuration_changed;
@ -49,7 +53,7 @@ public class SynchronousBrokerPublisher {
}
//log.error("preliminary_outside");
if (publisher_configuration_changed){
if (publisher_configuration_changed || hard_initialize_connector){
//log.error("preliminary_inside1");
// for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
log.info("Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic);
@ -71,7 +75,7 @@ public class SynchronousBrokerPublisher {
}
//CustomConnectorHandler custom_handler = new CustomConnectorHandler();
active_connector = new ExtendedConnector("resource_manager"
active_connector = new ExtendedConnector("resource_manager_synchronous"
, new CustomConnectorHandler() {}
, publishers
, List.of(),
@ -146,4 +150,7 @@ public class SynchronousBrokerPublisher {
}
return reply;
}
public boolean is_publisher_null(){
return (private_publisher_instance == null);
}
}

View File

@ -138,7 +138,26 @@ public class DeviceProcessor implements InitializingBean {
lost_device_message.put("device_name",device.getName());
Clock clock = Clock.systemUTC();
lost_device_message.put("timestamp",(int)(clock.millis()/1000));
log.info("Creating new BrokerPublisher to publish device lost message");
BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
int sending_attempt = 1;
while (device_lost_publisher.is_publisher_null()){
try {
log.info("Attempting to recreate new BrokerPublisher to publish the device lost message");
log.info("The topic name is "+processorProperties.getLost_device_topic()+", the broker ip is "+ processorProperties.getNebulous_broker_ip_address()+", the broker port is "+ processorProperties.getNebulous_broker_port()+", the username is "+ processorProperties.getNebulous_broker_username()+", and the password is "+ processorProperties.getNebulous_broker_password());
if (sending_attempt<=2) {
device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
}else{
log.warn("Will now attempt to reset the BrokerPublisher connector");
device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "",true);
}
Thread.sleep(3000);
}catch (InterruptedException i){
i.printStackTrace();
}
sending_attempt++;
}
device_lost_publisher.publish(lost_device_message.toJSONString(), Collections.singleton(""));
log.warn("processFailedDevices: Marked as FAILED device with Id: {}", device.getId());
}

View File

@ -110,6 +110,21 @@ public class SALRegistrationService implements InitializingBean {
//ArrayList<String> applications = get_running_applications(sal_running_applications_reply);
//for (String application_name:applications) {
SynchronousBrokerPublisher register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(),processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
int sending_attempt = 1;
while (register_device_publisher.is_publisher_null()){
if (sending_attempt<=2) {
register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
}else{
log.warn("Will now attempt to reset the Synchronous publisher connector");
register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
sending_attempt++;
}
//TODO handle the response here
Map response = register_device_publisher.publish_for_response(register_device_message_string, Collections.singleton(application_name));
log.info("The response received while trying to register device " + device_name + " is "+response.toString());