Support for solver component termination message completing the solver component implementation

Change-Id: I0c0fd1e1f6962e4d840789d3477169373b75c971
This commit is contained in:
Geir Horn 2024-01-09 22:19:50 +01:00
parent 8cd1958fbc
commit 3105b79e14
6 changed files with 544 additions and 132 deletions

80
ExecutionControl.cpp Normal file
View File

@ -0,0 +1,80 @@
/*==============================================================================
Execution control
The source file implements the static variables and functions of the Execution
control actor.
Author and Copyright: Geir Horn, University of Oslo
Contact: Geir.Horn@mn.uio.no
License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
==============================================================================*/
#include "Actor.hpp"
#include "Communication/NetworkEndpoint.hpp"
#include "ExecutionControl.hpp"
namespace NebulOuS
{
// -----------------------------------------------------------------------------
// Static variables
// -----------------------------------------------------------------------------
bool ExecutionControl::Running = true;
std::mutex ExecutionControl::TerminationLock;
std::condition_variable ExecutionControl::ReadyToTerminate;
// -----------------------------------------------------------------------------
// Waiting function
// -----------------------------------------------------------------------------
//
// The function used to wait for the termination message simply waits on the
// condition variable until it is signalled by the message handler. As there
// could be spurious wake-ups it is necessary to check if the actor is still
// running when the condition variable is signalled, and if so the calling
// thread will just block again in another wait.
void ExecutionControl::WaitForTermination( void )
{
while( Running )
{
std::unique_lock< std::mutex > Lock( TerminationLock );
ReadyToTerminate.wait( Lock );
}
}
// -----------------------------------------------------------------------------
// Stop message handler
// -----------------------------------------------------------------------------
//
// The stop message handler will first send the network stop message to the
// session layer requesting it to coordinate the network shutdown and close all
// externally communicating actors.
void ExecutionControl::StopMessageHandler( const StopMessage & Command,
const Address Sender )
{
std::lock_guard< std::mutex > Lock( TerminationLock );
Send( Theron::Network::ShutDown(),
Theron::Network::GetAddress( Theron::Network::Layer::Session ) );
Running = false;
ReadyToTerminate.notify_all();
}
// -----------------------------------------------------------------------------
// Constructor
// -----------------------------------------------------------------------------
//
// The only action taken by the constructor is to register the handler for the
// stop message.
ExecutionControl::ExecutionControl( const std::string & TheActorName )
: Actor( TheActorName ),
StandardFallbackHandler( Actor::GetAddress().AsString() )
{
RegisterHandler( this, &ExecutionControl::StopMessageHandler );
}
} // namespace NebulOuS

108
ExecutionControl.hpp Normal file
View File

@ -0,0 +1,108 @@
/*==============================================================================
Execution control
The Solver Component should run as long as the application being optimised is
running. This requires an external message to the Solver Component about when
the Solver Component should shut down, and a way to stop other threads from
progressing until the shut down message has been processed.
The following Actor may run on its own, but it may also be included with
another Actor to avoid running a separate thread just waiting for a single shut
down message. This Actor will therefore be base class for the Solver Manager
actor, but the implementation cannot be done there since the Solver Manager is
a templated actor, and knowlege about the template parameter would be necessary
to call the function to wait for termination.
The threads calling the function to wait for termination will block until the
required message is received.
Author and Copyright: Geir Horn, University of Oslo
Contact: Geir.Horn@mn.uio.no
License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
==============================================================================*/
#ifndef NEBULOUS_EXECUTION_CONTROL
#define NEBULOUS_EXECUTION_CONTROL
// Standard headers
#include <condition_variable> // Execution stop management
#include <mutex> // Lock the condtion variable
// Theron++ headers
#include "Actor.hpp" // Actor base class
#include "Utility/StandardFallbackHandler.hpp" // Exception unhanded messages
namespace NebulOuS
{
/*==============================================================================
Execution control
==============================================================================*/
class ExecutionControl
: virtual public Theron::Actor,
virtual public Theron::StandardFallbackHandler
{
// The mechanism used for blocking other threads will be to make them wait
// for a condition variable until the message handler for the exit message
// will trigger and notifiy this variable.
private:
static bool Running;
static std::mutex TerminationLock;
static std::condition_variable ReadyToTerminate;
public:
// The function used to wait for the termination message simply waits on the
// condition variable until it is signalled by the message handler. As there
// could be spurious wake-ups it is necessary to check if the actor is still
// running when the condition variable is signalled, and if so the calling
// thread will just block again in another wait.
//
// Note that returning from this function does not imply that all actors have
// closed and finished processing. One should wait for the local actor system
// to close before deleting the local actors, see the normal function
// Actor::WaitForGlobalTermination()
static void WaitForTermination( void );
// The stop message has not yet been defined and it is defined as an empty
// class here as a named placeholder for a better future definition.
class StopMessage
{
public:
StopMessage() = default;
StopMessage( const StopMessage & Other ) = default;
~StopMessage() = default;
};
protected:
// The message handler will change the value of the flag indicating that the
// Actor is running, and signalling the condition variable to indicate that
// the termination has started.
virtual void StopMessageHandler( const StopMessage & Command,
const Address Sender );
// The constructor is simply taking the name of the actor as parameter and
// initialises the base classes.
public:
ExecutionControl( const std::string & TheActorName );
ExecutionControl() = delete;
virtual ~ExecutionControl() = default;
};
} // namespace NebulOuS
#endif // NEBULOUS_EXECUTION_CONTROL

View File

@ -151,7 +151,7 @@ void MetricUpdater::SLOViolationHandler(
SeverityMessage[ NebulOuS::TimePoint ].get< Solver::TimePointType >(), SeverityMessage[ NebulOuS::TimePoint ].get< Solver::TimePointType >(),
SeverityMessage[ NebulOuS::ObjectiveFunctionName ], SeverityMessage[ NebulOuS::ObjectiveFunctionName ],
TheApplicationExecutionContext TheApplicationExecutionContext
), TheSolutionManger ); ), TheSolverManager );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
@ -166,11 +166,11 @@ void MetricUpdater::SLOViolationHandler(
// to for their values, and the second for receiving the SLO violation message. // to for their values, and the second for receiving the SLO violation message.
MetricUpdater::MetricUpdater( const std::string UpdaterName, MetricUpdater::MetricUpdater( const std::string UpdaterName,
const Address ManagerForSolutions ) const Address ManagerOfSolvers )
: Actor( UpdaterName ), : Actor( UpdaterName ),
StandardFallbackHandler( Actor::GetAddress().AsString() ), StandardFallbackHandler( Actor::GetAddress().AsString() ),
NetworkingActor( Actor::GetAddress().AsString() ), NetworkingActor( Actor::GetAddress().AsString() ),
MetricValues(), ValidityTime(0), TheSolutionManger( ManagerForSolutions ) MetricValues(), ValidityTime(0), TheSolverManager( ManagerOfSolvers )
{ {
RegisterHandler( this, &MetricUpdater::AddMetricSubscription ); RegisterHandler( this, &MetricUpdater::AddMetricSubscription );
RegisterHandler( this, &MetricUpdater::UpdateMetricValue ); RegisterHandler( this, &MetricUpdater::UpdateMetricValue );

View File

@ -196,10 +196,10 @@ private:
// values should be sent as an application execution context (message) to the // values should be sent as an application execution context (message) to the
// Solution Manager actor that will invoke a solver to find the optimal // Solution Manager actor that will invoke a solver to find the optimal
// configuration for this configuration. The Metric Updater must therefore // configuration for this configuration. The Metric Updater must therefore
// know the address of the Solution Manager, and this must be passed to // know the address of the Soler Manager, and this must be passed to
// the constructor. // the constructor.
const Address TheSolutionManger; const Address TheSolverManager;
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// Subscribing to metric prediction values // Subscribing to metric prediction values
@ -315,7 +315,7 @@ private:
public: public:
MetricUpdater( const std::string UpdaterName, MetricUpdater( const std::string UpdaterName,
const Address ManagerForSolutions ); const Address ManagerOfSolvers );
// The destructor will unsubscribe from the control channels for the // The destructor will unsubscribe from the control channels for the
// message defining metrics, and the channel for receiving SLO violation // message defining metrics, and the channel for receiving SLO violation

View File

@ -2,9 +2,228 @@
Solver Component Solver Component
This is the main file for the Solver Component executable including the parsing This is the main file for the Solver Component executable including the parsing
of command line arguments and the AMQ network interface. of command line arguments and the AMQ network interface. It first starts the
AMQ interface actors of the Network Endpoint, then creates the actors of the
solver component: The Metric Updater and the Solution Manager, which in turn
will start the solver actor(s). All actors are executing on proper operating
system threads, and they are scheduled for execution whenever they have a
pending message.
The command line arguments that can be givne to the Solver Component are
-A or --AMPLDir <installation directory> for the AMPL model interpreter
-B or --broker <URL> for the location of the AMQ broker
-E or --endpoint <name> The endpoint name
-M ir --ModelDir <directory> for model and data files
-N or --name The AMQ identity of the solver (see below)
-P or --port <n> the port to use on the AMQ broker URL
-U or --user <user> the user to authenticate for the AMQ broker
-Pw or --password <password> the AMQ broker password for the user
-? or --Help prints a help message for the options
Default values:
-A taken from the standard AMPL environment variables if omitted
-B localhost
-E <no default - must be given>
-M <temporary directory created by the OS>
-N "NebulOuS::Solver"
-P 5672
-U admin
-Pw admin
A note on the mandatory endpoint name defining the extension used for the
solver component when connecting to the AMQ server. Typically the connection
will be established as "name@endpoint" and so if there are several
solver components running, the endpoint is the only way for the AMQ solvers to
distinguish the different solver component subscriptions.
Author and Copyright: Geir Horn, University of Oslo Author and Copyright: Geir Horn, University of Oslo
Contact: Geir.Horn@mn.uio.no Contact: Geir.Horn@mn.uio.no
License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
==============================================================================*/ ==============================================================================*/
// Standard headers
#include <string> // For standard strings
// #include <memory> // For smart pointers
#include <source_location> // Making informative error messages
#include <sstream> // To format error messages
#include <stdexcept> // standard exceptions
#include <filesystem> // Access to the file system
// #include <initializer_list> // To unpack variable arguments
// #include <concepts> // To constrain types
// #include <vector> // To store subscribed topics
// #include <thread> // To sleep while waiting for termination
// #include <chrono> // To have a concept of fime
// Theron++ headers
#include "Actor.hpp"
#include "Utility/StandardFallbackHandler.hpp"
#include "Utility/ConsolePrint.hpp"
#include "Communication/PolymorphicMessage.hpp"
#include "Communication/NetworkingActor.hpp"
// AMQ protocol related headers
#include "proton/connection_options.hpp" // Options for the Broker
#include "Communication/AMQ/AMQMessage.hpp" // The AMQP messages
#include "Communication/AMQ/AMQEndpoint.hpp" // The AMP endpoint
#include "Communication/AMQ/AMQjson.hpp" // Transparent JSON-AMQP
// The cxxopts command line options parser
#include "cxxopts.hpp"
// AMPL Application Programmer Interface (API)
#include "ampl/ampl.h"
// NegulOuS related headers
#include "MetricUpdater.hpp"
#include "SolverManager.hpp"
#include "AMPLSolver.hpp"
/*==============================================================================
Main file
==============================================================================*/
//
int main( int NumberOfCLIOptions, char ** CLIOptionStrings )
{
// --------------------------------------------------------------------------
// Defining and parsing the Command Line Interface (CLI) options
// --------------------------------------------------------------------------
cxxopts::Options CLIOptions("./SolverComponent",
"The NebulOuS Solver component");
CLIOptions.add_options()
("A,AMPLDir", "The AMPL installation path",
cxxopts::value<std::string>()->default_value("") )
("B,broker", "The URL of the AMQ broker",
cxxopts::value<std::string>()->default_value("localhost") )
("E,endpoint", "The endpoint name", cxxopts::value<std::string>() )
("M,ModelDir", "Directory to store the model and its data",
cxxopts::value<std::string>()->default_value("") )
("N,name", "The name of the Solver Component",
cxxopts::value<std::string>()->default_value("NebulOuS::Solver") )
("P,port", "TCP port on AMQ Broker",
cxxopts::value<unsigned int>()->default_value("5672") )
("U,user", "The user name used for the AMQ Broker connection",
cxxopts::value<std::string>()->default_value("admin") )
("Pw,password", "The password for the AMQ Broker connection",
cxxopts::value<std::string>()->default_value("admin") )
("?,help", "Print help information");
CLIOptions.allow_unrecognised_options();
auto CLIValues = CLIOptions.parse( NumberOfCLIOptions, CLIOptionStrings );
if( CLIValues.count("help") )
{
std::cout << CLIOptions.help() << std::endl;
exit( EXIT_SUCCESS );
}
// --------------------------------------------------------------------------
// Validating directories
// --------------------------------------------------------------------------
//
// The directories are given as strings and they must be validated to see if
// the provided values correspond to an existing directory in the case of the
// AMPL directory. The model directory will be created if it is not an empty
// string, for which a temparary directory will be created.
std::filesystem::path TheAMPLDirectory( CLIValues["AMPLDir"].as<std::string>() );
if( !std::filesystem::exists( TheAMPLDirectory ) )
{
std::source_location Location = std::source_location::current();
std::ostringstream ErrorMessage;
ErrorMessage << "[" << Location.file_name() << " at line " << Location.line()
<< "in function " << Location.function_name() <<"] "
<< "The AMPL installation driectory is given as ["
<< CLIValues["AMPLDir"].as<std::string>()
<< "] but this directory does not ezist!";
throw std::invalid_argument( ErrorMessage.str() );
}
std::filesystem::path ModelDirectory( CLIValues["ModelDir"].as<std::string>() );
if( ModelDirectory.empty() || !std::filesystem::exists( ModelDirectory ) )
ModelDirectory = std::filesystem::temp_directory_path();
// --------------------------------------------------------------------------
// AMQ communication
// --------------------------------------------------------------------------
//
// The AMQ communication is managed by the standard communication actors of
// the Theron++ Actor framewokr. Thus, it is just a matter of starting the
// endpoint actors with the given command line parameters.
//
// The network endpoint takes the endpoint name as the first argument, then
// the URL for the broker and the port number. The user name and the password
// are defined in the AMQ Qpid Proton connection options, and the values are
// therefore set for the connection options.
proton::connection_options AMQOptions;
AMQOptions.user( CLIValues["user"].as< std::string >() );
AMQOptions.password( CLIValues["password"].as< std::string >() );
// Then the network endpoint cna be constructed using the default names for
// the various network endpoint servers in order to pass the defined
// connection options.
Theron::AMQ::NetworkEndpoint AMQNetWork(
CLIValues["endpoint"].as< std::string >(),
CLIValues["broker"].as< std::string >(),
CLIValues["port"].as< unsigned int >(),
Theron::AMQ::Network::NetworkLayerLabel,
Theron::AMQ::Network::SessionLayerLabel,
Theron::AMQ::Network::PresentationLayerLabel,
AMQOptions
);
// --------------------------------------------------------------------------
// Solver component actors
// --------------------------------------------------------------------------
//
// The solver managager must be started first since its address should be
// a parameter to the constructor of the Metric Updater so the latter actor
// knows where to send application execution contexts whenever a new solution
// is requested by the SLO Violation Detector through the Optimzer Controller.
NebulOuS::SolverManager< NebulOuS::AMPLSolver >
WorkloadMabager( "WorkloadManager",
std::string( NebulOuS::Solver::Solution::MessageIdentifier ),
std::string( NebulOuS::Solver::ApplicationExecutionContext::MessageIdentifier ),
"AMPLSolver", ampl::Environment( TheAMPLDirectory.native() ), ModelDirectory );
NebulOuS::MetricUpdater
ContextMabager( "MetricUpdater", WorkloadMabager.GetAddress() );
// --------------------------------------------------------------------------
// Termination management
// --------------------------------------------------------------------------
//
// The critical part is to wait for the global shut down message from the
// Optimiser controller. That message will trigger the network to shut down
// and the Solver Component may terminate when the actor system has finished.
// Thus, the actors can still be running for some time after the global shut
// down message has been received, and it is therefore necessary to also wait
// for the actors to terminate.
NebulOuS::ExecutionControl::WaitForTermination();
Theron::Actor::WaitForGlobalTermination();
return EXIT_SUCCESS;
}

View File

@ -1,5 +1,5 @@
/*============================================================================== /*==============================================================================
Solution Manager Solver Manager
This class handles the Execution Context mssage containing a time stamp and a This class handles the Execution Context mssage containing a time stamp and a
set of variable value assignments.It manages a time sorted queue and dispatches set of variable value assignments.It manages a time sorted queue and dispatches
@ -7,7 +7,7 @@ the first application execution context to the solver when the solver is ready.
The solution returned for a given execution context will be published together The solution returned for a given execution context will be published together
with the execution context and the maximal utility value found by the solver. with the execution context and the maximal utility value found by the solver.
The solver actor class is given as a template argument to the solution manager, The solver actor class is given as a template argument to the solver manager,
and at least one solver actor is instantiated at start up. This to allow and at least one solver actor is instantiated at start up. This to allow
multiple solvers to run in parallel should this be necessary to serve properly multiple solvers to run in parallel should this be necessary to serve properly
the queue of waiting application execution contexts. If there are multiple the queue of waiting application execution contexts. If there are multiple
@ -52,6 +52,8 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
#include <sstream> // For nice error messages #include <sstream> // For nice error messages
#include <stdexcept> // Standard exceptions #include <stdexcept> // Standard exceptions
#include <source_location> // Error location reporting #include <source_location> // Error location reporting
#include <condition_variable> // Execution stop management
#include <mutex> // Lock the condtion variable
// Other packages // Other packages
@ -72,6 +74,7 @@ using JSON = nlohmann::json; // Short form name space
// NebulOuS headers // NebulOuS headers
#include "ExecutionControl.hpp" // Shut down messages
#include "Solver.hpp" // The basic solver class #include "Solver.hpp" // The basic solver class
namespace NebulOuS namespace NebulOuS
@ -87,7 +90,8 @@ class SolverManager
: virtual public Theron::Actor, : virtual public Theron::Actor,
virtual public Theron::StandardFallbackHandler, virtual public Theron::StandardFallbackHandler,
virtual public Theron::NetworkingActor< virtual public Theron::NetworkingActor<
typename Theron::AMQ::Message::PayloadType > typename Theron::AMQ::Message::PayloadType >,
virtual public ExecutionControl
{ {
// There is a topic name used to publish solutions found by the solvers. This // There is a topic name used to publish solutions found by the solvers. This
// topic is given to the constructor and kept as a constant during the class // topic is given to the constructor and kept as a constant during the class
@ -104,8 +108,6 @@ private:
// The solution manager dispatches the application execution contexts as // The solution manager dispatches the application execution contexts as
// requests for solutions to a pool of solvers. // requests for solutions to a pool of solvers.
private:
std::list< SolverType > SolverPool; std::list< SolverType > SolverPool;
std::unordered_set< Address > ActiveSolvers, PassiveSolvers; std::unordered_set< Address > ActiveSolvers, PassiveSolvers;
@ -161,137 +163,140 @@ private:
ContextExecutionQueue.erase( ContextExecutionQueue.begin(), ContextExecutionQueue.erase( ContextExecutionQueue.begin(),
ContextExecutionQueue.begin() + DispatchedContexts ); ContextExecutionQueue.begin() + DispatchedContexts );
}
} }
}
// The handler function simply enqueues the received context, records its // The handler function simply enqueues the received context, records its
// timesamp and dispatch as many contexts as possible to the solvers. Note // timesamp and dispatch as many contexts as possible to the solvers. Note
// that the context identifiers must be unique and there is a logic error // that the context identifiers must be unique and there is a logic error
// if there is already a context with the same identifier. Then an invalid // if there is already a context with the same identifier. Then an invalid
// arguemtn exception will be thrown. This strategy should be reconsidered // arguemtn exception will be thrown. This strategy should be reconsidered
// if there will be multiple entities firing execution contexts. // if there will be multiple entities firing execution contexts.
void HandleApplicationExecutionContext( void HandleApplicationExecutionContext(
const Solver:: ApplicationExecutionContext & TheContext, const Solver:: ApplicationExecutionContext & TheContext,
const Address TheRequester ) const Address TheRequester )
{
auto [_, Success] = Contexts.try_emplace(
TheContext[ Solver::ContextIdentifier.data() ], TheContext );
if( Success )
{ {
ContextExecutionQueue.emplace( auto [_, Success] = Contexts.try_emplace(
TheContext[ Solver::TimeStamp.data() ], TheContext[ Solver::ContextIdentifier.data() ], TheContext );
TheContext[ Solver::ContextIdentifier.data() ] );
if( Success )
{
ContextExecutionQueue.emplace(
TheContext[ Solver::TimeStamp.data() ],
TheContext[ Solver::ContextIdentifier.data() ] );
DispatchToSolvers();
}
else
{
std::source_location Location = std::source_location::current();
std::ostringstream ErrorMessage;
ErrorMessage << "[" << Location.file_name() << " at line "
<< Location.line()
<< "in function " << Location.function_name() <<"] "
<< "An Application Execution Context with identifier "
<< TheContext[ Solver::ContextIdentifier.data() ]
<< " was received while there is already one with the same "
<< "identifer. The identifiers must be unique!";
throw std::invalid_argument( ErrorMessage.str() );
}
}
// --------------------------------------------------------------------------
// Solutions
// --------------------------------------------------------------------------
//
// When a solution is received from a solver, it will be dispatched to all
// entities subscribing to the solution topic, and the solver will be returned
// to the pool of passive solvers. The dispatch function will be called at the
// end to ensure that the solver starts working on queued application execution
// contexts, if any.
void PublishSolution( const Solver::Solution & TheSolution,
const Addres TheSolver )
{
Send( TheSolution, SolutionReceiver );
PassiveSolvers.insert( ActiveSolvers.extract( TheSolver ) );
DispatchToSolvers(); DispatchToSolvers();
} }
else
{
std::source_location Location = std::source_location::current();
std::ostringstream ErrorMessage;
ErrorMessage << "[" << Location.file_name() << " at line " // --------------------------------------------------------------------------
<< Location.line() // Constructor and destructor
<< "in function " << Location.function_name() <<"] " // --------------------------------------------------------------------------
<< "An Application Execution Context with identifier " //
<< TheContext[ Solver::ContextIdentifier.data() ] // The constructor takes the name of the Solution Mnager Actor, the name of
<< " was received while there is already one with the same " // the topic where the solutions should be published, and the topic where the
<< "identifer. The identifiers must be unique!"; // application execution contexts will be published. If the latter is empty,
// the manager will not listen to any externally generated requests, only those
// being sent from the Metric Updater supposed to exist on the same Actor
// system node as the manager.The final arguments to the constructor is a
// set of arguments to the solver type in the order expected by the solver
// type and repeated for the number of (local) solvers that should be created.
//
// Currently this manager does not support dispatching configurations to
// remote solvers and collect responses from these. However, this can be
// circumvented by creating a local "solver" transferring the requests to
// a remote solvers and collecting results from the remote solver.
public:
SolverManager( const std::string & TheActorName,
const Theron::AMQ::TopicName & SolutionTopic,
const Theron::AMQ::TopicName & ContextPublisherTopic,
const auto & ...SolverArguments )
: Actor( TheActorName ),
StandardFallbackHandler( Actor::GetAddress().AsString() ),
NetworkingActor( Actor::GetAddress().AsString() ),
ExecutionControl( Actor::GetAddress().AsString() ),
SolutionReceiver( SolutionTopic ),
SolverPool(), ActiveSolvers(), PassiveSolvers(),
Contexts(), ContextExecutionQueue()
{
// The solvers are created by expanding the arguments for the solvers
// one by one creating new elements in the solver pool
( SolverPool.emplace_back( std::forward( SolverArguments ) ), ... );
// If the solvers were successfully created, their addresses are recorded as
// passive servers, and a publisher is made for the solution channel, and
// optionally, a subscritpion is made for the alternative context publisher
// topic. If the solvers could not be created, then an invalid argument
// exception will be thrown.
if( !SolverPool.empty() )
{
std::ranges::transform( ServerPool, std::inserter( PassiveSolvers ),
[](const SolverType & TheSolver){ return TheSolver.GetAddress(); } );
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Publisher,
SolutionTopic ), GetSessionLayerAddress() );
if( !ContextPublisherTopic.empty() )
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription,
ContextPublisherTopic ), GetSessionLayerAddress() );
}
else
{
std::source_location Location = std::source_location::current();
std::ostringstream ErrorMessage;
ErrorMessage << "[" << Location.file_name() << " at line "
<< Location.line()
<< "in function " << Location.function_name() <<"] "
<< "It was not possible to construct any solver of type "
<< boost::core::demangle( typeid( SolverType ).name() )
<< " from the given constructor argument types: ";
(( ErrorMessage << boost::core::demangle( typeid( SolverArguments ).name() ) << " " ), ... );
throw std::invalid_argument( ErrorMessage.str() ); throw std::invalid_argument( ErrorMessage.str() );
}
} }
}
// --------------------------------------------------------------------------
// Solutions
// --------------------------------------------------------------------------
//
// When a solution is received from a solver, it will be dispatched to all
// entities subscribing to the solution topic, and the solver will be returned
// to the pool of passive solvers. The dispatch function will be called at the
// end to ensure that the solver starts working on queued application execution
// contexts, if any.
void PublishSolution( const Solver::Solution & TheSolution,
const Addres TheSolver )
{
Send( TheSolution, SolutionReceiver );
PassiveSolvers.insert( ActiveSolvers.extract( TheSolver ) );
DispatchToSolvers();
}
// --------------------------------------------------------------------------
// Constructor and destructor
// --------------------------------------------------------------------------
//
// The constructor takes the name of the Solution Mnager Actor, the name of
// the topic where the solutions should be published, and the topic where the
// application execution contexts will be published. If the latter is empty,
// the manager will not listen to any externally generated requests, only those
// being sent from the Metric Updater supposed to exist on the same Actor
// system node as the manager.The final arguments to the constructor is a
// set of arguments to the solver type in the order expected by the solver
// type and repeated for the number of (local) solvers that should be created.
//
// Currently this manager does not support dispatching configurations to
// remote solvers and collect responses from these. However, this can be
// circumvented by creating a local "solver" transferring the requests to
// a remote solvers and collecting results from the remote solver.
SolverManager( const std::string & TheActorName,
const Theron::AMQ::TopicName & SolutionTopic,
const Theron::AMQ::TopicName & ContextPublisherTopic,
const auto & ...SolverArguments )
: Actor( TheActorName ),
StandardFallbackHandler( Actor::GetAddress().AsString() ),
NetworkingActor( Actor::GetAddress().AsString() ),
SolutionReceiver( SolutionTopic ),
SolverPool(), ActiveSolvers(), PassiveSolvers(),
Contexts(), ContextExecutionQueue()
{
// The solvers are created by expanding the arguments for the solvers
// one by one creating new elements in the solver pool
( SolverPool.emplace_back( SolverArguments ), ... );
// If the solvers were successfully created, their addresses are recorded as
// passive servers, and a publisher is made for the solution channel, and
// optionally, a subscritpion is made for the alternative context publisher
// topic. If the solvers could not be created, then an invalid argument
// exception will be thrown.
if( !SolverPool.empty() )
{
std::ranges::transform( ServerPool, std::inserter( PassiveSolvers ),
[](const SolverType & TheSolver){ return TheSolver.GetAddress(); } );
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Publisher,
SolutionTopic ), GetSessionLayerAddress() );
if( !ContextPublisherTopic.empty() )
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription,
ContextPublisherTopic ), GetSessionLayerAddress() );
}
else
{
std::source_location Location = std::source_location::current();
std::ostringstream ErrorMessage;
ErrorMessage << "[" << Location.file_name() << " at line "
<< Location.line()
<< "in function " << Location.function_name() <<"] "
<< "It was not possible to construct any solver of type "
<< boost::core::demangle( typeid( SolverType ).name() )
<< " from the given constructor argument types: ";
(( ErrorMessage << boost::core::demangle( typeid( SolverArguments ).name() ) << " " ), ... );
throw std::invalid_argument( ErrorMessage.str() );
}
}
}; };