diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index acf1e67..4717f0f 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -3,19 +3,15 @@ { "name": "Linux", "includePath": [ - "/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13", "/home/GHo/Documents/Code/CxxOpts/include", "/home/GHo/Documents/Code/Theron++", - "/home/GHo/Documents/Code/Theron++/Utility", - "/home/GHo/Documents/Code/Theron++/Communication", - "/home/GHo/Documents/Code/Theron++/Communication/AMQ", "/opt/AMPL/amplapi/include/ampl", "${workspaceFolder}/**", - "/home/GHo/Documents/Code/Theron++" + "/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13" ], "defines": [], "cStandard": "c23", - "intelliSenseMode": "linux-gcc-x64", + "intelliSenseMode": "${default}", "compilerPath": "/usr/bin/g++", "compilerArgs": [ "-std=c++23", @@ -28,12 +24,9 @@ "configurationProvider": "ms-vscode.makefile-tools", "browse": { "path": [ - "/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13", "/home/GHo/Documents/Code/Theron++", "/home/GHo/Documents/Code/CxxOpts/include", - "/home/GHo/Documents/Code/Theron++/Utility", - "/home/GHo/Documents/Code/Theron++/Communication", - "/home/GHo/Documents/Code/Theron++/Examples" + "/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13" ], "limitSymbolsToIncludedHeaders": false } diff --git a/AMPLTest/.gitignore b/AMPLTest/.gitignore new file mode 100644 index 0000000..66c9df7 --- /dev/null +++ b/AMPLTest/.gitignore @@ -0,0 +1 @@ +*.ampl diff --git a/ExecutionControl.cpp b/ExecutionControl.cpp index b02e39a..7b57811 100644 --- a/ExecutionControl.cpp +++ b/ExecutionControl.cpp @@ -68,7 +68,7 @@ void ExecutionControl::StopMessageHandler( const StopMessage & Command, } // ----------------------------------------------------------------------------- -// Constructor +// Constructor & destructor // ----------------------------------------------------------------------------- // // The constructor registers the stop message handler and sets up a publisher @@ -91,4 +91,16 @@ ExecutionControl::ExecutionControl( const std::string & TheActorName ) } +// The destructor simply closes the publisher if the network is still active +// when the actor closes. + +ExecutionControl::~ExecutionControl( void ) +{ + if( HasNetwork() ) + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::ClosePublisher, + std::string( StatusTopic ) + ), GetSessionLayerAddress() ); +} + } // namespace NebulOuS \ No newline at end of file diff --git a/ExecutionControl.hpp b/ExecutionControl.hpp index 6296031..61f6ea8 100644 --- a/ExecutionControl.hpp +++ b/ExecutionControl.hpp @@ -167,7 +167,7 @@ public: ExecutionControl( const std::string & TheActorName ); ExecutionControl() = delete; - virtual ~ExecutionControl() = default; + virtual ~ExecutionControl(); }; } // namespace NebulOuS diff --git a/MetricUpdater.cpp b/MetricUpdater.cpp index ce090c9..39492b9 100644 --- a/MetricUpdater.cpp +++ b/MetricUpdater.cpp @@ -18,6 +18,7 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) #include // Container ranges #include // Algorithms +#include "Utility/ConsolePrint.hpp" // For logging #include "Communication/AMQ/AMQEndpoint.hpp" // For Topic subscriptions #include "MetricUpdater.hpp" @@ -41,36 +42,34 @@ namespace NebulOuS void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, const Address OptimiserController ) { - if( TheMetrics.is_object() && - TheMetrics.at( NebulOuS::MetricList ).is_object() ) - { - JSON MetricList = TheMetrics.at( NebulOuS::MetricList ); - - for( const JSON MetricDefinition : MetricList.items() ) - { - auto [ MetricRecord, NewMetric ] = MetricValues.try_emplace( - MetricDefinition.at( NebulOuS::MetricName ), JSON() ); - - if( NewMetric ) - Send( Theron::AMQ::NetworkLayer::TopicSubscription( - Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - MetricRecord->first ), - Theron::AMQ::Network::GetAddress( Theron::Network::Layer::Session) ); - } - } - else + if( TheMetrics.is_object() && + TheMetrics.at( NebulOuS::MetricList ).is_array() ) + { + for (auto & MetricRecord : TheMetrics.at( NebulOuS::MetricList ) ) { - std::source_location Location = std::source_location::current(); - std::ostringstream ErrorMessage; + auto [ MetricRecordPointer, NewMetric ] = MetricValues.try_emplace( + MetricRecord.at( NebulOuS::MetricName ), JSON() ); - ErrorMessage << "[" << Location.file_name() << " at line " << Location.line() - << "in function " << Location.function_name() <<"] " - << "The message to define a new metric subscription is given as " - << std::endl << TheMetrics.dump(2) << std::endl - << "this is not as expected!"; - - throw std::invalid_argument( ErrorMessage.str() ); + if( NewMetric ) + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + std::string( MetricValueRootString ) + MetricRecordPointer->first ), + Theron::AMQ::Network::GetAddress( Theron::Network::Layer::Session) ); } + } + else + { + std::source_location Location = std::source_location::current(); + std::ostringstream ErrorMessage; + + ErrorMessage << "[" << Location.file_name() << " at line " << Location.line() + << " in function " << Location.function_name() <<"] " + << "The message to define a new metric subscription is given as " + << std::endl << TheMetrics.dump(2) << std::endl + << "this is not as expected!"; + + throw std::invalid_argument( ErrorMessage.str() ); + } } // The metric update value is received whenever any of subscribed forecasters @@ -102,9 +101,16 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, void MetricUpdater::UpdateMetricValue( const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic) { + Theron::ConsoleOutput Output; + + Output << "Metric topic: " << TheMetricTopic.AsString() << std::endl; + Theron::AMQ::TopicName TheTopic = TheMetricTopic.AsString().erase( 0, NebulOuS::MetricValueRootString.size() ); + + Output << "The metric: " << TheTopic << " has new value " + << TheMetricValue[ NebulOuS::ValueLabel ] << std::endl; if( MetricValues.contains( TheTopic ) ) { diff --git a/MetricUpdater.hpp b/MetricUpdater.hpp index db2b7ca..708f123 100644 --- a/MetricUpdater.hpp +++ b/MetricUpdater.hpp @@ -88,7 +88,7 @@ constexpr std::string_view TimePoint = "predictionTime"; // defined next. constexpr std::string_view MetricSubscriptions - = "eu.nebulouscloud.monitoring.metric_lists"; + = "eu.nebulouscloud.monitoring.metric_list"; // The JSON message attribute for the list of metrics is another JSON object // stored under the following key, see the Event type III defined in @@ -211,6 +211,10 @@ private: : JSONTopicMessage( Other ) {} + // MetricTopic( const JSONTopicMessage & Other ) + // : JSONTopicMessage( Other ) + // {} + virtual ~MetricTopic() = default; }; diff --git a/SolverComponent.cpp b/SolverComponent.cpp index ff2fa54..46f98c9 100644 --- a/SolverComponent.cpp +++ b/SolverComponent.cpp @@ -54,6 +54,17 @@ be extended with the AMPL execution file path, e.g., export PATH=$PATH:/opt/AMPL +The parameters to the application are used as described above, and typically the +endpoint is set to some unique identifier of the application for which this +solver is used, e.g., + + ./SolverComponent --AMPLDir /opt/AMPL \ + --Endpoint f81ee-b42a8-a13d56-e28ec9-2f5578 --ModelDir AMPLTest/ + +Debugging after a coredump + + coredumpctl debug SolverComponent + Author and Copyright: Geir Horn, University of Oslo Contact: Geir.Horn@mn.uio.no License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) @@ -66,6 +77,7 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) #include // To format error messages #include // standard exceptions #include // Access to the file system +#include // For extended AMQ properties // Theron++ headers @@ -78,7 +90,9 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) // AMQ protocol related headers +#include "proton/symbol.hpp" // AMQ symbols #include "proton/connection_options.hpp" // Options for the Broker +#include "proton/message.hpp" // AMQ messages definitions #include "Communication/AMQ/AMQMessage.hpp" // The AMQP messages #include "Communication/AMQ/AMQEndpoint.hpp" // The AMP endpoint #include "Communication/AMQ/AMQjson.hpp" // Transparent JSON-AMQP @@ -127,6 +141,8 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) cxxopts::value()->default_value("5672") ) ("S,Solver", "Solver to use, devault Couenne", cxxopts::value()->default_value("couenne") ) + ("T,Tenant", "Tenant identifier for messages", + cxxopts::value()->default_value("TheTenant")) ("U,User", "The user name used for the AMQ Broker connection", cxxopts::value()->default_value("admin") ) ("Pw,Password", "The password for the AMQ Broker connection", @@ -180,29 +196,51 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) // The AMQ communication is managed by the standard communication actors of // the Theron++ Actor framewokr. Thus, it is just a matter of starting the // endpoint actors with the given command line parameters. - // - // The network endpoint takes the endpoint name as the first argument, then - // the URL for the broker and the port number. The user name and the password - // are defined in the AMQ Qpid Proton connection options, and the values are - // therefore set for the connection options. + + // There are certain properties defined for the NebulOuS communication + // protocol where the endpoint name is taken as the application identifier + // and the tenant identifier and the protocol used. The same information + // has to be given as different maps for the connection options and for + // the message properties. + + std::map< proton::symbol, proton::value > NebulOuSProperties{ + {"x-tenant-id", CLIValues["Tenant"].as< std::string >() }, + {"x-application-id", CLIValues["Endpoint"].as< std::string >()}, + {"x-protocol", "AMQP"} + }; + + proton::message::property_map MessageProperties{ + {"x-tenant-id", CLIValues["Tenant"].as< std::string >() }, + {"x-application-id", CLIValues["Endpoint"].as< std::string >()}, + {"x-protocol", "AMQP"} + }; + + // The user name and the password are defined in the AMQ Qpid Proton + // connection options, and the values are therefore set for the connection + // options. proton::connection_options AMQOptions; AMQOptions.user( CLIValues["User"].as< std::string >() ); AMQOptions.password( CLIValues["Password"].as< std::string >() ); - - // Then the network endpoint cna be constructed using the default names for - // the various network endpoint servers in order to pass the defined - // connection options. + AMQOptions.properties( NebulOuSProperties ); + + // The network endpoint takes the endpoint name as the first argument, then + // the URL for the broker and the port number. Then the network endpoint can + // be constructed using the default names for the Session Layer and the + // Presentation layer servers, but calling the endpoint for "Solver" to make + // it more visible at the AMQ broker listing of subscribers. The endpoint + // will be a unique application identifier. The server names are followed + // by the defined connection options and the message options. Theron::AMQ::NetworkEndpoint AMQNetWork( CLIValues["Endpoint"].as< std::string >(), CLIValues["Broker"].as< std::string >(), CLIValues["Port"].as< unsigned int >(), - Theron::AMQ::Network::NetworkLayerLabel, + "Solver", Theron::AMQ::Network::SessionLayerLabel, Theron::AMQ::Network::PresentationLayerLabel, - AMQOptions + AMQOptions, MessageProperties ); // -------------------------------------------------------------------------- diff --git a/SolverManager.hpp b/SolverManager.hpp index 69f5165..362c779 100644 --- a/SolverManager.hpp +++ b/SolverManager.hpp @@ -97,11 +97,12 @@ class SolverManager { // There is a topic name used to publish solutions found by the solvers. This // topic is given to the constructor and kept as a constant during the class - // execution. + // execution. The same goes for the topic on which application execution + // contexts will arrive for processing. private: - const Theron::AMQ::TopicName SolutionReceiver; + const Theron::AMQ::TopicName SolutionReceiver, ContextTopic; // -------------------------------------------------------------------------- // Solver management @@ -259,6 +260,7 @@ public: NetworkingActor( Actor::GetAddress().AsString() ), ExecutionControl( Actor::GetAddress().AsString() ), SolutionReceiver( SolutionTopic ), + ContextTopic( ContextPublisherTopic ), SolverPool(), ActiveSolvers(), PassiveSolvers(), Contexts(), ContextExecutionQueue() { @@ -322,6 +324,25 @@ public: } } + // The destructor closes all the open topics if the network is still open + // when the destructor is invoked. + + virtual ~SolverManager( void ) + { + if( HasNetwork() ) + { + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::ClosePublisher, + SolutionReceiver + ), GetSessionLayerAddress() ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + ContextTopic + ), GetSessionLayerAddress() ); + } + } + }; } // namespace NebulOuS diff --git a/makefile b/makefile index 7172cf0..f49290a 100644 --- a/makefile +++ b/makefile @@ -132,7 +132,7 @@ SOLVER_OBJECTS = $(addprefix $(OBJECTS_DIR)/, $(SOLVER_SOURCE:.cpp=.o) ) # component's objective file, they can be built by a general rule $(OBJECTS_DIR)/%.o : %.cpp - $(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_DIRECTORIES) + $(CC) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_DIRECTORIES) #------------------------------------------------------------------------------ # Solver component @@ -143,7 +143,7 @@ $(OBJECTS_DIR)/%.o : %.cpp # the object files or the solver actors. SolverComponent: $(SOLVER_OBJECTS) $(THERON)/Theron++.a - $(CXX) -o SolverComponent $(CXXFLAGS) $(SOLVER_OBJECTS) $(LDFLAGS) + $(CC) -o SolverComponent $(CXXFLAGS) $(SOLVER_OBJECTS) $(LDFLAGS) # There is also a standard target to clean the automatically generated build # files