From 5db3a5e8651457205e0aa132972f9a24b87fac83 Mon Sep 17 00:00:00 2001 From: Geir Horn Date: Tue, 2 Jan 2024 18:41:49 +0100 Subject: [PATCH] Implemented AMPL Solver class and interaction with the other Actors Change-Id: I37bb164b60bf1888b0ce99665486f13552681737 --- .vscode/c_cpp_properties.json | 29 ++++ AMPLSolver.cpp | 243 ++++++++++++++++++++++++++++++++++ AMPLSolver.hpp | 224 +++++++++++++++++++++++++++++++ MetricUpdater.cpp | 45 ++++++- MetricUpdater.hpp | 83 +++--------- SolutionManager.hpp | 2 +- Solver.hpp | 83 +++++++++--- SolverComponent.cpp | 10 ++ 8 files changed, 630 insertions(+), 89 deletions(-) create mode 100644 .vscode/c_cpp_properties.json create mode 100644 AMPLSolver.cpp create mode 100644 SolverComponent.cpp diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..6b6d476 --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,29 @@ +{ + "configurations": [ + { + "name": "Linux", + "includePath": [ + "${default}", + "/home/GHo/Documents/Code/CxxOpts/include", + "/usr/include", + "/home/GHo/Documents/Code/Theron++", + "/home/GHo/Documents/Code/Theron++/Utility", + "/home/GHo/Documents/Code/Theron++/Communication", + "/home/GHo/Documents/Code/Theron++/Communication/AMQ", + "/opt/AMPL/amplapi/include", + "${workspaceFolder}/**" + ], + "defines": [], + "compilerArgs": [ + "--std=c++23", + "-I/opt/AMPL/amplapi/include", + "-I/usr/include" + ], + "cStandard": "c23", + "cppStandard": "c++23", + "intelliSenseMode": "linux-gcc-x64", + "compilerPath": "/usr/bin/g++" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/AMPLSolver.cpp b/AMPLSolver.cpp new file mode 100644 index 0000000..198afe4 --- /dev/null +++ b/AMPLSolver.cpp @@ -0,0 +1,243 @@ +/*============================================================================== +AMPL Solver + +This file provides the implementation of the methods of the AMLP Solver actor +that is instantiated by the Solution Manager and used to obtain solutions for +optimisation problems in the queue managed by the Solution Manager. + +Author and Copyright: Geir Horn, University of Oslo +Contact: Geir.Horn@mn.uio.no +License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) +==============================================================================*/ + +#include // For file I/O +#include // For formatted errors +#include // Standard exceptions +#include // Error codes + +#include "AMPLSolver.hpp" + +namespace NebulOuS +{ + +// ----------------------------------------------------------------------------- +// Utility function +// ----------------------------------------------------------------------------- +// + +std::string AMPLSolver::SaveFile( const JSON & TheMessage, + const std::source_location & Location ) +{ + if( TheMessage.is_object() ) + { + // Writing the problem file based on the message content that should be + // only a single key-value pair. If the file could not be opened, a run + // time exception is thrown. + + std::string TheFileName + = ProblemFileDirectory / TheMessage.begin().key(); + + std::fstream ProblemFile( TheFileName, std::ios::out ); + + if( ProblemFile.is_open() ) + { + ProblemFile << TheMessage.begin().value(); + ProblemFile.close(); + return TheFileName; + } + else + { + std::ostringstream ErrorMessage; + + ErrorMessage << "[" << Location.file_name() << " at line " + << Location.line() + << "in function " << Location.function_name() <<"] " + << "The AMPL file at " + << TheFileName + << " could not be opened for output!"; + + throw std::system_error( static_cast< int >( std::errc::io_error ), + std::system_category(), ErrorMessage.str() ); + } + } + else + { + std::ostringstream ErrorMessage; + + ErrorMessage << "[" << Location.file_name() << " at line " + << Location.line() + << "in function " << Location.function_name() <<"] " + << "The JSON message is not an object. The received " + << "message is " << std::endl + << TheMessage.dump(2) + << std::endl; + + throw std::system_error( static_cast< int >( std::errc::io_error ), + std::system_category(), ErrorMessage.str() ); + } +} + +// ----------------------------------------------------------------------------- +// Optimisation +// ----------------------------------------------------------------------------- +// +// The first step in solving an optimisation problem is to define the problme +// involving the decision variables, the parameters, and the constraints over +// these entities. The problem is received as an AMQ JSON message where where +// the only key is the file name and the value is the AMPL model file. This file +// is first saved, and if there is no exception thrown form the save file +// function, the filename will be returned and read back into the problem +// definition. + +void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem, + const Address TheOracle) +{ + ProblemDefinition.read( SaveFile( TheProblem ) ); +} + +// The data file(s) corresponding to the current optimisation problem will be +// sent in the same way and separately file by file. The logic is the same as +// the Define Problem message handler: The save file is used to store the +// received file, which is then loaded as the data problem. + +void AMPLSolver::DataFileUpdate( const DataFileMessage & TheDataFile, + const Address TheOracle ) +{ + ProblemDefinition.readData( SaveFile( TheDataFile ) ); +} + +// The solver function is more involved as must set the metric values received +// in the application execution context message as parameter values for the +// optimisation problem, then solve for the optimal objective value, and finally +// report the solution back to the entity requesting the solution, typically an +// instance of the Solution Manager actor. + +void AMPLSolver::SolveProblem( + const ApplicationExecutionContext & TheContext, const Address TheRequester ) +{ + // Setting the metric values one by one. In the setting of NebulOuS a metric + // is either a numerical value or a string. Vectors are currently not + // supported as values. + + for( const auto & [ TheName, MetricValue ] : + Solver::MetricValueType( TheContext.at( Solver::ExecutionContext ) ) ) + { + ampl::Parameter TheParameter = ProblemDefinition.getParameter( TheName ); + + switch ( MetricValue.type() ) + { + case JSON::value_t::number_integer : + case JSON::value_t::number_unsigned : + case JSON::value_t::boolean : + TheParameter.set( MetricValue.get< long >() ); + break; + case JSON::value_t::number_float : + TheParameter.set( MetricValue.get< double >() ); + break; + case JSON::value_t::string : + TheParameter.set( MetricValue.get< std::string >() ); + break; + default: + { + std::source_location Location = std::source_location::current(); + std::ostringstream ErrorMessage; + + ErrorMessage << "[" << Location.file_name() << " at line " + << Location.line() + << "in function " << Location.function_name() <<"] " + << "The JSON value " << MetricValue + << " has JSON type " + << static_cast< int >( MetricValue.type() ) + << " which is not supported" + << std::endl; + + throw std::invalid_argument( ErrorMessage.str() ); + } + break; + } + } + + // Setting the given objective as the active objective and all other + // objective functions as 'dropped'. Note that this is experimental code + // as the multi-objective possibilities in AMPL are not well documented. + + for( auto TheObjective : ProblemDefinition.getObjectives() ) + if( TheObjective.name() == TheContext.at( Solver::ObjectiveFunctionLabel ) ) + TheObjective.restore(); + else + TheObjective.drop(); + + // The problem can then be solved. + + Optimize(); + + // Once the problem has been optimised, the objective values can be + // be obtained from the objectives + + Solver::Solution::ObjectiveValuesType ObjectiveValues; + + for( auto TheObjective : ProblemDefinition.getObjectives() ) + ObjectiveValues.emplace( TheObjective.name(), TheObjective.value() ); + + // The variable values are obtained in the same way + + Solver::Solution::VariableValuesType VariableValues; + + for( auto Variable : ProblemDefinition.getVariables() ) + VariableValues.emplace( Variable.name(), Variable.value() ); + + // The found solution can then be returned to the requesting actor or topic + + Send( Solver::Solution( + TheContext.at( Solver::ContextIdentifier ), + TheContext.at( Solver::TimeStamp ).get< Solver::TimePointType >(), + TheContext.at( Solver::ObjectiveFunctionLabel ), + ObjectiveValues, VariableValues + ), TheRequester ); +} + +// ----------------------------------------------------------------------------- +// Constructor and destructor +// ----------------------------------------------------------------------------- +// +// The constructor initialises the base classes and sets the AMPL installation +// directory and the path for the problem related files. The message handlers +// for the data file updates must be registered since the inherited handlers +// for the application execution context and the problem definition were already +// defined by the generic solver. Note that no publisher is defined for the +// solution since the solution message is just returned to the requester actor, +// which is assumed to be a Solution Manager on the local endpoint because +// multiple solvers may run in parallel. The external publication of solutions +// will be made by the Solution Manager for all solvers on this endpoint. + +AMPLSolver::AMPLSolver( const std::string & TheActorName, + const ampl::Environment & InstallationDirectory, + const std::filesystem::path & ProblemPath ) +: Actor( TheActorName ), + StandardFallbackHandler( Actor::GetAddress().AsString() ), + NetworkingActor( Actor::GetAddress().AsString() ), + Solver( Actor::GetAddress().AsString() ), + ProblemFileDirectory( ProblemPath ), + ProblemDefinition( InstallationDirectory ) +{ + RegisterHandler( this, &LSolver::DataFileUpdate ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + Theron::AMQ::TopicName( DataFileTopic ) + ), GetSessionLayerAddress() ); +} + +// In case the network is still running when the actor is closing, the data file +// subscription should be closed. + +AMPLSolver::~AMPLSolver() +{ + if( HasNetwork() ) + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + Theron::AMQ::TopicName( DataFileTopic ) + ), GetSessionLayerAddress() ); +} + +} // namespace NebulOuS diff --git a/AMPLSolver.hpp b/AMPLSolver.hpp index 5ae9653..cc14f2b 100644 --- a/AMPLSolver.hpp +++ b/AMPLSolver.hpp @@ -26,8 +26,232 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) #ifndef NEBULOUS_AMPL_SOLVER #define NEBULOUS_AMPL_SOLVER +// Standard headers + +#include // Constant strings +#include // Standard strings +#include // To store names +#include // For problem files +#include // For better errors + +// Other packages + +#include // JSON object definition +using JSON = nlohmann::json; // Short form name space + +// Theron++ files + +#include "Actor.hpp" // Actor base class +#include "Utility/StandardFallbackHandler.hpp" // Exception unhanded messages +#include "Communication/NetworkingActor.hpp" // Actor to receive messages +#include "Communication/PolymorphicMessage.hpp" // The network message type + +// AMQ communication files + +#include "Communication/AMQ/AMQjson.hpp" // For JSON metric messages +#include "Communication/AMQ/AMQEndpoint.hpp" // AMQ endpoint +#include "Communication/AMQ/AMQSessionLayer.hpp" // For topic subscriptions + +// NebulOuS files + +#include "Solver.hpp" // The generic solver base + +// AMPL Application Programmer Interface (API) + +#include "ampl/ampl.h" + namespace NebulOuS { +/*============================================================================== + + AMPL Solver actor + +==============================================================================*/ +// +// The AMPL solver is an Actor and a Solver. It provides handlers for messages +// defining the problem file and data file(s), and responds to an application +// execution context message by optimising the saved problem for the given +// context parameters. + +class AMPLSolver +: virtual public Theron::Actor, + virtual public Theron::StandardFallbackHandler, + virtual public Theron::NetworkingActor< + typename Theron::AMQ::Message::PayloadType >, + virtual public Solver +{ + // -------------------------------------------------------------------------- + // Utility methods + // -------------------------------------------------------------------------- + // + // Since both the optimisation problem file and the data file(s) will be sent + // as JSON messages with a single key-value pair where the key is the filename + // and the value is the file content, there is a common dfinition of the + // problem file directory and a function to read the file. The function will + // throw errors if the JSON message given is not an object, or of there are + // issues opening the file name given. If the file could be successfully + // saved, the functino will close the file and return the file name for + // further processing. + +private: + + const std::filesystem::path ProblemFileDirectory; + + std::string SaveFile( const JSON & TheMessage, + const std::source_location & Location + = std::source_location::current() ); + + // -------------------------------------------------------------------------- + // The optimisation problem + // -------------------------------------------------------------------------- + // + // The problem is received as an AMPL file in a message. However, the AMPL + // interface allows the loading of problem and data files on an existing + // AMPL object, and the AMPL API object is therefore reused when a new + // problem file is received. + + ampl::AMPL ProblemDefinition; + + // The problem is loaded by the handler defining the problem. This receives + // the standard optimisation problem definition. Essentially, this message + // contains one tag, the name of the AMPL file and the body is a big string + // containing the file content. + + virtual void DefineProblem( const Solver::OptimisationProblem & TheProblem, + const Address TheOracle ) override; + + // The topic on which the problem file is posted is currently defined as a + // constant string + + static constexpr std::string_view AMPLProblemTopic + = "AMPL::OptimisationProblem"; + + // -------------------------------------------------------------------------- + // Data file updates + // -------------------------------------------------------------------------- + // + // The data files are assumed to be published on a dedicated topic for the + // optimiser + +public: + + static constexpr std::string_view DataFileTopic = "AMPL::DataFileUpdates"; + + // The message defining the data file is a JSON topic message with the same + // structure as the optimisation problem message: It contains only one + // attribute, which is the name of the data file, and the data file + // content as the value. This content is just saved to the problem file + // directory before it is read back to the AMPL problem definition. + + class DataFileMessage + : public Theron::AMQ::JSONTopicMessage + { + public: + + DataFileMessage( const std::string & TheDataFileName, + const JSON & DataFileContent ) + : JSONTopicMessage( std::string( DataFileTopic ), + { TheDataFileName, DataFileContent } ) + {} + + DataFileMessage( const DataFileMessage & Other ) + : JSONTopicMessage( Other ) + {} + + DataFileMessage() + : JSONTopicMessage( std::string( DataFileTopic ) ) + {} + + virtual ~DataFileMessage() = default; + }; + + // The handler for this message saves the received file and uploads the file + // to the AMPL problem definition. + +private: + + void DataFileUpdate( const DataFileMessage & TheDataFile, + const Address TheOracle ); + + // -------------------------------------------------------------------------- + // Solving the problem + // -------------------------------------------------------------------------- + // + // The real action happens when an Application Execution Context message is + // received. This defines the values of the independent metrics used in the + // objective functions and in the problem constraints, and one objective + // function name indicating which objective to optimise. The actual solution + // is provided by a small helper function. The reason is that this may + // use the AMPL problem but not the solver, and as such other solvers can + // be build on this class. The standard definition just asks AMPL to call + // the solver. + +protected: + + virtual void Optimize( void ) + { ProblemDefinition.solve(); } + + // The handler for the application execution context will first set all the + // parameter values for the contex metrics to the received values, and then + // optimise the problem. When a solution is found it will be sent back to + // the Agent providing the application execution context as a solution value + // message. The message format is defined in the Solver base class. + + virtual void SolveProblem( const ApplicationExecutionContext & TheContext, + const Address TheRequester ) override; + + // -------------------------------------------------------------------------- + // Constructor and destructor + // -------------------------------------------------------------------------- + // + // The AMPL solver requires the name of the actor, an AMPL environment class + // pointing to the AMPL installation directory. If this is given as empty, + // then the path is taken from the corresponding environment variables. There + // is also a path to the directory where the optimisation problem file will + // be stored together with any required data files. + // + // Note that the constructors are declared as explicit because in theory + // a string could be converted to an Environment class or a Path and so to + // be able to distinquish what a string means, the actual classes must be + // given constructed on the content string. + +public: + + explicit AMPLSolver( const std::string & TheActorName, + const ampl::Environment & InstallationDirectory, + const std::filesystem::path & ProblemPath ); + + // If the path to the problem directory is omitted, it will be initialised to + // a temporary directory. + + explicit AMPLSolver( const std::string & TheActorName, + const ampl::Environment & InstallationDirectory ) + : AMPLSolver( TheActorName, InstallationDirectory, + std::filesystem::temp_directory_path() ) + {} + + // If the AMPL installation environment is omitted, the installation directory + // will be taken form the environment variables. + + explicit AMPLSolver( const std::string & TheActorName, + const std::filesystem::path & ProblemPath ) + : AMPLSolver( TheActorName, ampl::Environment(), ProblemPath ) + {} + + // Finally, it is just the standard constructor taking only the name of the + // actor + + AMPLSolver( const std::string & TheActorName ) + : AMPLSolver( TheActorName, ampl::Environment(), + std::filesystem::temp_directory_path() ) + {} + + // The solver will just close the open connections for listening to data file + // updates since the subscriptions for the problem definition will be closed + // by the generic solver + + virtual ~AMPLSolver(); +}; } // namespace NebulOuS #endif // NEBULOUS_AMPL_SOLVER \ No newline at end of file diff --git a/MetricUpdater.cpp b/MetricUpdater.cpp index 29fbb8d..560acc3 100644 --- a/MetricUpdater.cpp +++ b/MetricUpdater.cpp @@ -101,6 +101,10 @@ void MetricUpdater::UpdateMetricValue( } } +// -------------------------------------------------------------------------- +// SLO Violation Events +// -------------------------------------------------------------------------- +// // When an SLO Violation is predicted a message is received from the SLO // violation detector and this will trigger the definition of a new // application execution context and a request to the Solution Manager to @@ -150,6 +154,10 @@ void MetricUpdater::SLOViolationHandler( ), TheSolutionManger ); } +// -------------------------------------------------------------------------- +// Constructor and destructor +// -------------------------------------------------------------------------- +// // The constructor initialises the base classes and sets the validity time // to zero so that it will be initialised by the first metric values received. // The message handlers are registered, and the the updater will then subscribe @@ -170,13 +178,42 @@ MetricUpdater::MetricUpdater( const std::string UpdaterName, Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - std::string( MetricSubscriptions ) ), - Theron::Network::GetAddress( Theron::Network::Layer::Session ) ); + std::string( NebulOuS::MetricSubscriptions ) ), + GetSessionLayerAddress() ); Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - std::string( SLOViolationTopic ) ), - Theron::Network::GetAddress( Theron::Network::Layer::Session ) ); + std::string( NebulOuS::SLOViolationTopic ) ), + GetSessionLayerAddress() ); +} + +// The destructor is closing the established subscription if the network is +// still running. If this is called when the application is closing the network +// connection should be stopped, and in that case all subscriptions will be +// automatically cancelled. + +MetricUpdater::~MetricUpdater() +{ + if( HasNetwork() ) + { + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + std::string( NebulOuS::MetricSubscriptions ) ), + GetSessionLayerAddress() ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + std::string( NebulOuS::SLOViolationTopic ) ), + GetSessionLayerAddress() ); + + std::ranges::for_each( std::views::keys( MetricValues ), + [this]( const Theron::AMQ::TopicName & TheMetricTopic ){ + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + std::string( MetricValueRootString ) + TheMetricTopic ), + GetSessionLayerAddress() ); + }); + } } } // End name space NebulOuS \ No newline at end of file diff --git a/MetricUpdater.hpp b/MetricUpdater.hpp index 18cd2a2..e6f0b49 100644 --- a/MetricUpdater.hpp +++ b/MetricUpdater.hpp @@ -95,7 +95,7 @@ constexpr std::string_view MetricSubscriptions = "ApplicationContext"; // the Wiki-page [1] constexpr std::string_view MetricValueRootString - = "eu.nebulouscloud.monitoring.predicted"; + = "eu.nebulouscloud.monitoring.predicted."; // The SLO violation detector will publish a message when a reconfiguration is // deamed necessary for a future time point called "Event type V" on the wiki @@ -201,57 +201,6 @@ private: const Address TheSolutionManger; - // -------------------------------------------------------------------------- - // JSON messages: Type by topic - // -------------------------------------------------------------------------- - // - // The JSON message initialiser assumes that the content_type field of - // the message contains an unique label for the JSON message type to - // cover the situation where an actor may subscribe to multiple different - // messages all encoded as JSON messages. However, for this actor the type - // of the message will be decided by the topic on which the message is - // received. It is therefore necessary to set the message content type equal - // to the AMQ sender prior to decoding the AMQ message to the correct JSON - // object. - // - // The issue with the metric subscriptions is that the same type of message - // can come from any of the topics publishing metric values, and as such any - // topic name not being from the metric subscription command topic or the - // SLO Violation Event topic will be understood as a metric value update - // event The initialiser will check if the sender (topic) starts with the - // message identifier. This will allow the wildcard matching for metric - // values as well as an exact match for topic whose reply to address - // equals the message identifer. - - class TypeByTopic - : public Theron::AMQ::JSONMessage - { - protected: - - virtual bool - Initialize( const ProtocolPayload & ThePayload ) noexcept override - { - if( ThePayload->reply_to().starts_with( GetMessageIdentifier() ) ) - { - ThePayload->content_type( GetMessageIdentifier() ); - return JSONMessage::Initialize( ThePayload ); - } - else return false; - } - - public: - - TypeByTopic( const std::string & TopicIdentifier ) - : JSONMessage( TopicIdentifier ) - {} - - TypeByTopic( const TypeByTopic & Other ) - : JSONMessage( Other.GetMessageIdentifier(), Other ) - {} - - virtual ~TypeByTopic() = default; - }; - // -------------------------------------------------------------------------- // Subscribing to metric prediction values // -------------------------------------------------------------------------- @@ -263,16 +212,16 @@ private: // the value publisher. class MetricTopic - : public TypeByTopic + : public Theron::AMQ::JSONTopicMessage { public: MetricTopic( void ) - : TypeByTopic( std::string( MetricSubscriptions ) ) + : JSONTopicMessage( std::string( MetricSubscriptions ) ) {} MetricTopic( const MetricTopic & Other ) - : TypeByTopic( Other ) + : JSONTopicMessage( Other ) {} virtual ~MetricTopic() = default; @@ -296,16 +245,16 @@ private: // with this string. class MetricValueUpdate - : public TypeByTopic + : public Theron::AMQ::JSONWildcardMessage { public: MetricValueUpdate( void ) - : TypeByTopic( std::string( MetricValueRootString ) ) + : JSONWildcardMessage( std::string( MetricValueRootString ) ) {} MetricValueUpdate( const MetricValueUpdate & Other ) - : TypeByTopic( Other ) + : JSONWildcardMessage( Other ) {} virtual ~MetricValueUpdate() = default; @@ -325,18 +274,24 @@ private: // The SLO Violation detector publishes an event to indicate that at least // one of the constraints for the application deployment will be violated in // the predicted future, and that the search for a new solution should start. + // This message is caught by the Optimisation Controller and republished + // adding a unique event identifier enabling the Optimisation Controller to + // match the produced solution with the event and deploy the right + // configuration.The message must also contain the name of the objective + // function to maximise. This name must match the name in the optimisation + // model sent to the solver. class SLOViolation - : public TypeByTopic + : public Theron::AMQ::JSONTopicMessage { public: SLOViolation( void ) - : TypeByTopic( std::string( SLOViolationTopic ) ) + : JSONTopicMessage( std::string( SLOViolationTopic ) ) {} SLOViolation( const SLOViolation & Other ) - : TypeByTopic( Other ) + : JSONTopicMessage( Other ) {} virtual ~SLOViolation() = default; @@ -362,9 +317,11 @@ public: MetricUpdater( const std::string UpdaterName, const Address ManagerForSolutions ); - // The destructor is just the default destructor + // The destructor will unsubscribe from the control channels for the + // message defining metrics, and the channel for receiving SLO violation + // events. - virtual ~MetricUpdater() = default; + virtual ~MetricUpdater(); }; // Class Metric Updater } // Name space NebulOuS diff --git a/SolutionManager.hpp b/SolutionManager.hpp index fb07ee3..be3a49c 100644 --- a/SolutionManager.hpp +++ b/SolutionManager.hpp @@ -95,7 +95,7 @@ class SolverManager private: - const Address SolutionReceiver; + const Theron::AMQ::TopicName SolutionReceiver; // -------------------------------------------------------------------------- // Solver management diff --git a/Solver.hpp b/Solver.hpp index f52f8c0..1d02eb7 100644 --- a/Solver.hpp +++ b/Solver.hpp @@ -49,10 +49,14 @@ using JSON = nlohmann::json; // Short form name space #include "Actor.hpp" // Actor base class #include "Utility/StandardFallbackHandler.hpp" // Exception unhanded messages +#include "Communication/PolymorphicMessage.hpp" // The network message type +#include "Communication/NetworkingActor.hpp" // External communications // AMQ communication headers #include "Communication/AMQ/AMQjson.hpp" // For JSON metric messages +#include "Communication/AMQ/AMQEndpoint.hpp" // Enabling AMQ communication +#include "Communication/AMQ/AMQSessionLayer.hpp" // For topic subscriptions namespace NebulOuS { @@ -64,7 +68,9 @@ namespace NebulOuS class Solver : virtual public Theron::Actor, - virtual public Theron::StandardFallbackHandler + virtual public Theron::StandardFallbackHandler, + virtual public Theron::NetworkingActor< + typename Theron::AMQ::Message::PayloadType > { public: @@ -127,10 +133,11 @@ public: // The message is a simple JSON object where the various fields of the // message struct are set by the constructor to ensure that all fields are - // given when the message is constructed. + // given when the message is constructed. The message is a JSON Topic Message + // received on the topic with the same name as the message identifier. class ApplicationExecutionContext - : public Theron::AMQ::JSONMessage + : public Theron::AMQ::JSONTopicMessage { public: @@ -141,7 +148,7 @@ public: const TimePointType MicroSecondTimePoint, const std::string ObjectiveFunctionID, const MetricValueType & TheContext ) - : JSONMessage( std::string( MessageIdentifier ), + : JSONTopicMessage( std::string( MessageIdentifier ), { { std::string( ContextIdentifier ), TheIdentifier }, { std::string( TimeStamp ), MicroSecondTimePoint }, { std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID }, @@ -149,10 +156,13 @@ public: ) {} ApplicationExecutionContext( const ApplicationExecutionContext & Other ) - : JSONMessage( Other ) + : JSONTopicMessage( Other ) + {} + + ApplicationExecutionContext() + : JSONTopicMessage( std::string( MessageIdentifier ) ) {} - ApplicationExecutionContext() = delete; virtual ~ApplicationExecutionContext() = default; }; @@ -186,30 +196,36 @@ protected: public: - using ObjectiveValuesType = MetricValueType; - static constexpr std::string_view ObjectiveValues = "ObjectiveValues"; - class Solution - : public Theron::AMQ::JSONMessage + : public Theron::AMQ::JSONTopicMessage { public: + using ObjectiveValuesType = MetricValueType; + using VariableValuesType = MetricValueType; + + static constexpr std::string_view ObjectiveValues = "ObjectiveValues"; + static constexpr std::string_view VariableValues = "VariableValues"; + static constexpr std::string_view MessageIdentifier = "Solver::Solution"; Solution( const ContextIdentifierType & TheIdentifier, const TimePointType MicroSecondTimePoint, const std::string ObjectiveFunctionID, const ObjectiveValuesType & TheObjectiveValues, - const MetricValueType & TheContext ) - : JSONMessage( std::string( MessageIdentifier ) , + const VariableValuesType & TheVariables ) + : JSONTopicMessage( std::string( MessageIdentifier ) , { { std::string( ContextIdentifier ), TheIdentifier }, { std::string( TimeStamp ), MicroSecondTimePoint }, { std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID }, { std::string( ObjectiveValues ) , TheObjectiveValues }, - { std::string( ExecutionContext ), TheContext } } ) + { std::string( VariableValues ), TheVariables } } ) {} - Solution() = delete; + Solution() + : JSONTopicMessage( std::string( MessageIdentifier ) ) + {} + virtual ~Solution() = default; }; @@ -224,7 +240,7 @@ public: // to implement this in a way appropriate for the algorithm. class OptimisationProblem - : public Theron::AMQ::JSONMessage + : public Theron::AMQ::JSONTopicMessage { public: @@ -232,10 +248,13 @@ public: std::string_view MessageIdentifier = "Solver::OptimisationProblem"; OptimisationProblem( const JSON & TheProblem ) - : JSONMessage( std::string( MessageIdentifier ), TheProblem ) + : JSONTopicMessage( std::string( MessageIdentifier ), TheProblem ) + {} + + OptimisationProblem() + : JSONTopicMessage( std::string( MessageIdentifier ) ) {} - OptimisationProblem() = delete; virtual ~OptimisationProblem() = default; }; @@ -249,21 +268,43 @@ public: // Constructor and destructor // -------------------------------------------------------------------------- // - // The constructor defines the message handlers so that the derived soler + // The constructor defines the message handlers so that the derived solver // classes will not need to deal with the Actor specific details, and to // ensure that the handlers are called when the Actor receives the various - // messages. The constructor requires an actor name as the only parameter. + // messages. It should be noted that the problem definition can arrive from + // a remote actor on a topic corresponding to the message indentifier name. + // However, no subscription will be made for application execution contexts + // since these should be sorted and sent in order by the Solution Manager + // actor, and external communication should go throug the Solution Manager. + // + // The constructor requires an actor name as the only parameter, and the + // destructor unsubscribes from the topics previously subscribed to by + // the constuctor. Solver( const std::string & TheSolverName ) : Actor( TheSolverName ), - StandardFallbackHandler( Actor::GetAddress().AsString() ) + StandardFallbackHandler( Actor::GetAddress().AsString() ), + NetworkingActor( Actor::GetAddress().AsString() ) { RegisterHandler( this, &Solver::SolveProblem ); RegisterHandler( this, &Solver::DefineProblem ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + Theron::AMQ::TopicName( OptimisationProblem::MessageIdentifier ) + ), GetSessionLayerAddress() ); } Solver() = delete; - virtual ~Solver() = default; + + virtual ~Solver() + { + if( HasNetwork() ) + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + Theron::AMQ::TopicName( OptimisationProblem::MessageIdentifier ) + ), GetSessionLayerAddress() ); + } }; /*============================================================================== diff --git a/SolverComponent.cpp b/SolverComponent.cpp new file mode 100644 index 0000000..564770d --- /dev/null +++ b/SolverComponent.cpp @@ -0,0 +1,10 @@ +/*============================================================================== +Solver Component + +This is the main file for the Solver Component executable including the parsing +of command line arguments and the AMQ network interface. + +Author and Copyright: Geir Horn, University of Oslo +Contact: Geir.Horn@mn.uio.no +License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) +==============================================================================*/