From d2324e42a3abf9c68974cd1e8d8512e1a9802a53 Mon Sep 17 00:00:00 2001 From: Geir Horn Date: Wed, 20 Mar 2024 19:13:27 +0100 Subject: [PATCH] Merge all outstanding changesets Commit messages as follows: Change-Id: I631d374144efc540b158868fa65a0bac232a7548 --- Changed the comments for the SLO Violation handler --- Performance update: Adding a boolean flag to indicate when all metrics have been set to avoid a linear scan of all metrics on each SLO Violation message. --- Metric list and reconfiguration wait Metric updater now listening for a metric list from the Optimiser Controller and not frmo the EMS, and discards SLO Violations until the Optimiser Controller sends a message indicating that the previous application reconfiguration has finished. --- Log message to indicate that the "reconfiguration done" even message has been received --- Added the right topic for the metric list --- New messages Metric list from the controller New message format for AMPL model definition Fixed the AMQ message property settings --- .gitignore | 2 + .vscode/c_cpp_properties.json | 12 ++- AMPLSolver.cpp | 105 ++++++++++--------- AMPLSolver.hpp | 80 +++++++++------ ExecutionControl.cpp | 8 +- ExecutionControl.hpp | 13 +-- MetricUpdater.cpp | 183 +++++++++++++++++++++------------- MetricUpdater.hpp | 98 +++++++++++++----- Solver.code-workspace | 3 +- Solver.hpp | 35 ++++--- SolverComponent.cpp | 26 ++--- SolverManager.hpp | 4 +- 12 files changed, 348 insertions(+), 221 deletions(-) diff --git a/.gitignore b/.gitignore index 0d0f370..580fd42 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ __pycache__/ *.d /SolverComponent /ampl.lic +*.~1NiBF4kGEKO5WetCRbF0UOTl6FtAH2hiz-f9e028.insyncdl +*.insyncdl diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index f9c1283..ff6b24a 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -7,8 +7,12 @@ "/home/GHo/Documents/Code/CxxOpts/include", "/opt/AMPL/amplapi/include/", "${workspaceFolder}/**", - "/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13", - "/home/GHo/Documents/Code/Theron++" + "/home/GHo/Documents/Code/NebulOuS/Solvers", + "/usr/include/c++/13", + "/home/GHo/Documents/Code/Theron++", + "/usr/include/c++/13/x86_64-redhat-linux", + "/usr/include/linux", + "/usr/include/c++/13/tr1" ], "defines": [], "cStandard": "c23", @@ -29,7 +33,9 @@ "/home/GHo/Documents/Code/Theron++/", "/home/GHo/Documents/Code/CxxOpts/include", "/opt/AMPL/amplapi/include/", - "/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13" + "/usr/include/c++/13", + "/usr/include/linux", + "/home/GHo/Documents/Code/NebulOuS/Solvers" ], "limitSymbolsToIncludedHeaders": false } diff --git a/AMPLSolver.cpp b/AMPLSolver.cpp index ab90ff8..3e24d31 100644 --- a/AMPLSolver.cpp +++ b/AMPLSolver.cpp @@ -31,50 +31,30 @@ namespace NebulOuS // is received updating AMPL model parameters. Hence the common file creation // is taken care of by a dedicated function. -std::string AMPLSolver::SaveFile( const JSON & TheMessage, +std::string AMPLSolver::SaveFile( std::string_view TheName, + std::string_view TheContent, const std::source_location & Location ) { - if( TheMessage.is_object() ) + std::string TheFileName = ProblemFileDirectory / TheName; + + std::fstream TheFile( TheFileName, std::ios::out | std::ios::binary ); + + if( TheFile.is_open() ) { - std::string TheFileName - = ProblemFileDirectory / TheMessage.at( AMPLSolver::FileName ); - - std::fstream ProblemFile( TheFileName, std::ios::out | std::ios::binary ); - - if( ProblemFile.is_open() ) - { - ProblemFile << TheMessage.at( AMPLSolver::FileContent ).get(); - ProblemFile.close(); - return TheFileName; - } - else - { - std::source_location Location = std::source_location::current(); - std::ostringstream ErrorMessage; - - ErrorMessage << "[" << Location.file_name() << " at line " - << Location.line() - << "in function " << Location.function_name() <<"] " - << "The AMPL file at " - << TheFileName - << " could not be opened for output!"; - - throw std::system_error( static_cast< int >( std::errc::io_error ), - std::system_category(), ErrorMessage.str() ); - } + TheFile << TheContent; + TheFile.close(); + return TheFileName; } else { - std::source_location Location = std::source_location::current(); std::ostringstream ErrorMessage; ErrorMessage << "[" << Location.file_name() << " at line " << Location.line() << "in function " << Location.function_name() <<"] " - << "The JSON message is not an object. The received " - << "message is " << std::endl - << TheMessage.dump(2) - << std::endl; + << "The AMPL file at " + << TheFileName + << " could not be opened for output!"; throw std::system_error( static_cast< int >( std::errc::io_error ), std::system_category(), ErrorMessage.str() ); @@ -163,13 +143,18 @@ void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem, // First storing the AMPL problem file from its definition in the message // and read the file back to the AMPL interpreter. - ProblemDefinition.read( SaveFile( TheProblem ) ); + ProblemDefinition.read( SaveFile( + TheProblem.at( + OptimisationProblem::Keys::ProblemFile ).get< std::string >() , + TheProblem.at( + OptimisationProblem::Keys::ProblemDescription ).get< std::string >() ) ); // The next is to read the label of the default objective function and // store this. An invalid argument exception is thrown if the field is missing - if( TheProblem.contains( Solver::ObjectiveFunctionLabel ) ) - DefaultObjectiveFunction = TheProblem.at( Solver::ObjectiveFunctionLabel ); + if( TheProblem.contains(OptimisationProblem::Keys::DefaultObjectiveFunction) ) + DefaultObjectiveFunction + = TheProblem.at( OptimisationProblem::Keys::DefaultObjectiveFunction ); else { std::source_location Location = std::source_location::current(); @@ -180,24 +165,44 @@ void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem, << "in function " << Location.function_name() <<"] " << "The problem definition must contain a default objective " << "function under the key [" - << Solver::ObjectiveFunctionLabel + << OptimisationProblem::Keys::DefaultObjectiveFunction << "]" << std::endl; throw std::invalid_argument( ErrorMessage.str() ); } - // After all the manatory fields have been processed, the set of constants - // will be processed storing the mapping from variable value to constant. + // The default values for the data will be loaded from the data file. This + // operation is the same as the one done for data messages, and to avoid + // code duplication the handler is just invoked using the address of this + // solver Actor as the the sender is not important for this update. - if( TheProblem.contains( ConstantsLabel ) && - TheProblem.at( ConstantsLabel ).is_object() ) + if( TheProblem.contains( DataFileMessage::Keys::DataFile ) && + TheProblem.contains( DataFileMessage::Keys::NewData ) ) + { + std::string FileContent + = TheProblem.at( DataFileMessage::Keys::NewData ).get< std::string >(); + + if( !FileContent.empty() ) + DataFileUpdate( DataFileMessage( + TheProblem.at( DataFileMessage::Keys::DataFile ).get< std::string >(), + FileContent ), + GetAddress() ); + } + + // The set of constants will be processed storing the mapping from a variable + // value to a constant. + + if( TheProblem.contains( OptimisationProblem::Keys::Constants ) && + TheProblem.at( OptimisationProblem::Keys::Constants ).is_object() ) for( const auto & [ ConstantName, ConstantRecord ] : - TheProblem.at( ConstantsLabel ).items() ) + TheProblem.at( OptimisationProblem::Keys::Constants ).items() ) { - VariablesToConstants.emplace( ConstantRecord.at( VariableName ), - ConstantName ); + VariablesToConstants.emplace( + ConstantRecord.at( OptimisationProblem::Keys::VariableName ), + ConstantName ); + SetAMPLParameter( ConstantName, - ConstantRecord.at( InitialConstantValue ) ); + ConstantRecord.at( OptimisationProblem::Keys::InitialConstantValue ) ); } // Finally, the problem has been defined and the flag is set to allow @@ -215,10 +220,12 @@ void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem, // the Define Problem message handler: The save file is used to store the // received file, which is then loaded as the data problem. -void AMPLSolver::DataFileUpdate( const DataFileMessage & TheDataFile, +void AMPLSolver::DataFileUpdate( const DataFileMessage & NewData, const Address TheOracle ) { - ProblemDefinition.readData( SaveFile( TheDataFile ) ); + ProblemDefinition.readData( SaveFile( + NewData.at( DataFileMessage::Keys::DataFile ).get< std::string >(), + NewData.at( DataFileMessage::Keys::NewData ).get< std::string >() ) ); } // ----------------------------------------------------------------------------- @@ -385,7 +392,7 @@ AMPLSolver::AMPLSolver( const std::string & TheActorName, Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - Theron::AMQ::TopicName( DataFileMessage::MessageIdentifier ) + DataFileMessage::AMQTopic ), GetSessionLayerAddress() ); } @@ -397,7 +404,7 @@ AMPLSolver::~AMPLSolver() if( HasNetwork() ) Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, - Theron::AMQ::TopicName( DataFileMessage::MessageIdentifier ) + DataFileMessage::AMQTopic ), GetSessionLayerAddress() ); } diff --git a/AMPLSolver.hpp b/AMPLSolver.hpp index 6efaf4f..631da61 100644 --- a/AMPLSolver.hpp +++ b/AMPLSolver.hpp @@ -85,21 +85,18 @@ class AMPLSolver // Utility methods // -------------------------------------------------------------------------- // - // Since both the optimisation problem file and the data file(s) will be sent - // as JSON messages with a single key-value pair where the key is the filename - // and the value is the file content, there is a common dfinition of the - // problem file directory and a function to read the file. The function will - // throw errors if the JSON message given is not an object, or of there are - // issues opening the file name given. If the file could be successfully - // saved, the functino will close the file and return the file name for - // further processing. + // Since both the optimisation problem file and the data file(s) will arrive + // as messages containing the file name and the then the content of the file + // as a long text string. The following file will open the file for writing + // and save the content string to this file. private: const std::filesystem::path ProblemFileDirectory; - std::string SaveFile( const JSON & TheMessage, - const std::source_location & Location + std::string SaveFile( std::string_view TheName, + std::string_view TheContent, + const std::source_location & Location = std::source_location::current() ); // There is also a utility function to look up a named AMPL parameter and @@ -130,28 +127,36 @@ protected: virtual void DefineProblem( const Solver::OptimisationProblem & TheProblem, const Address TheOracle ) override; - // The topic on which the problem file is posted is currently defined as a - // constant string - - static constexpr std::string_view AMPLProblemTopic - = "eu.nebulouscloud.optimiser.solver.model"; - // The JSON message received on this topic is supposed to contain several // keys in the JSON message // 1) The filename of the problem file // 2) The file content as a single string - // 3) The default objective function (defined in the Solver class) - // 4) An optional constants section containing constant names as keys + // 3) The name of the initial data file + // 4) The content of the initial data file as a single string + // 5) The default objective function (defined in the Solver class) + // 6) An optional constants section containing constant names as keys // and the values will be another map containing the variable // whose value should be passed to the constant, and the initial // value of the constant. + // Since these elements are parts of the optimisation problem message + // whose class cannot be extended to contain these directly, it is + // necessary to scope these keys differently for the compiler. - static constexpr std::string_view - FileName = "FileName", - FileContent = "FileContent", - ConstantsLabel = "Constants", - VariableName = "Variable", - InitialConstantValue = "Value"; + struct OptimisationProblem + { + struct Keys + { + static constexpr std::string_view + ProblemFile = "ModelFileName", + ProblemDescription = "ModelFileContent", + DataFile = "DataFileName", + InitialisationData = "DataFileContent", + DefaultObjectiveFunction = "ObjectiveFunction", + Constants = "Constants", + VariableName = "Variable", + InitialConstantValue = "Value"; + }; + }; // Finally, no solution will be produced unless the problem has been // defined. A flag is therefore set by the message handler indicating @@ -193,18 +198,27 @@ public: { public: - // The data files are assumed to be published on a dedicated topic for the - // optimiser + // The data files are assumed to be published by the performance module + // on a dedicated topic topic for the running solvers. - static constexpr std::string_view MessageIdentifier - = "eu.nebulouscloud.optimiser.solver.data"; + static constexpr std::string_view AMQTopic + = "eu.nebulouscloud.optimiser.performancemodule.data"; + // The received message will be a mapp supporting the following keys + // basically defining the data file name and its content. - DataFileMessage( const std::string & TheDataFileName, + struct Keys + { + static constexpr std::string_view + DataFile = "DataFileName", + NewData = "DataFileContent"; + }; + + DataFileMessage( const std::string_view & TheDataFileName, const JSON & DataFileContent ) - : JSONTopicMessage( std::string( MessageIdentifier ), - { { FileName, TheDataFileName }, - { FileContent, DataFileContent } } ) + : JSONTopicMessage( AMQTopic, + { { Keys::DataFile, TheDataFileName }, + { Keys::NewData, DataFileContent } } ) {} DataFileMessage( const DataFileMessage & Other ) @@ -212,7 +226,7 @@ public: {} DataFileMessage() - : JSONTopicMessage( std::string( MessageIdentifier ) ) + : JSONTopicMessage( AMQTopic ) {} virtual ~DataFileMessage() = default; diff --git a/ExecutionControl.cpp b/ExecutionControl.cpp index 7b57811..7b8fb80 100644 --- a/ExecutionControl.cpp +++ b/ExecutionControl.cpp @@ -58,7 +58,7 @@ void ExecutionControl::StopMessageHandler( const StopMessage & Command, std::lock_guard< std::mutex > Lock( TerminationLock ); Send( StatusMessage( StatusMessage::State::Stopped ), - Address( std::string( StatusTopic ) ) ); + Address( StatusMessage::AMQTopic ) ); Send( Theron::Network::ShutDown(), Theron::Network::GetAddress( Theron::Network::Layer::Session ) ); @@ -83,11 +83,11 @@ ExecutionControl::ExecutionControl( const std::string & TheActorName ) Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Publisher, - std::string( StatusTopic ) + StatusMessage::AMQTopic ), GetSessionLayerAddress() ); Send( StatusMessage( StatusMessage::State::Starting ), - Address( std::string( StatusTopic ) ) ); + Address( StatusMessage::AMQTopic ) ); } @@ -99,7 +99,7 @@ ExecutionControl::~ExecutionControl( void ) if( HasNetwork() ) Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::ClosePublisher, - std::string( StatusTopic ) + StatusMessage::AMQTopic ), GetSessionLayerAddress() ); } diff --git a/ExecutionControl.hpp b/ExecutionControl.hpp index 61f6ea8..b39895b 100644 --- a/ExecutionControl.hpp +++ b/ExecutionControl.hpp @@ -110,19 +110,20 @@ protected: public: + // The status of the solver is communicated on the dedicated status topic + + static constexpr std::string_view AMQTopic + = "eu.nebulouscloud.solver.state"; + + StatusMessage( State TheSituation, std::string AdditionalInformation = std::string() ) - : JSONMessage( std::string( StatusTopic ), + : JSONMessage( StatusMessage::AMQTopic, { {"when", UTCNow() }, {"state", ToString( TheSituation ) }, {"message", AdditionalInformation } } ) {} }; - // The status of the solver is communicated on the dedicated status topic - - static constexpr std::string_view StatusTopic - = "eu.nebulouscloud.solver.state"; - public: // The function used to wait for the termination message simply waits on the diff --git a/MetricUpdater.cpp b/MetricUpdater.cpp index c42b66f..1607081 100644 --- a/MetricUpdater.cpp +++ b/MetricUpdater.cpp @@ -30,67 +30,66 @@ namespace NebulOuS // Subscribing to metric prediction values // -------------------------------------------------------------------------- // -// The received message must be a JSON object with metric names as -// attribute (keys) and the topic name as the value. Multiple metrics maby be -// included in the same message and and the andler will iterate and set up a -// subcription for each of the provided metrics. It should be noted that -// initially the metric has no value, and it is a prerequisite that all -// metric values must be updated before the complete set of metrics will be -// used for finding a better configuration for the application's execution -// context given by the metric values. -// -// The message is just considered if the version number of the message is larger -// than the version of the current set of metrics. The complicating factor is -// to deal with metrics that have changed in the case the metric version is -// increased. Then new metrics must be subscribed, deleted metrics must be -// unsubscribed, and values for kept metrics must be kept. +// The Optimiser controller defines the metric names used in the optimisatoin +// model, and the metric subscription will subscribe to these. It is allowed +// that the metric list may change during run-time, and therefore the message +// hadler will make subscriptions for new metrics and remove subscriptions for +// metrics that are not included in the list, but currently having +// subscriptions. -void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, - const Address OptimiserController ) +void MetricUpdater::AddMetricSubscription( + const MetricTopic & MetricDefinitions, const Address OptimiserController ) { - if( TheMetrics.is_object() && - TheMetrics.at( NebulOuS::MetricList ).is_array() ) + JSON TheMetrics = MetricDefinitions.at( MetricList ); + + if( TheMetrics.is_array() ) { - if( MetricsVersion < TheMetrics.at( MetricVersionCounter ).get() ) + // The first step is to try inserting the metrics into the metric value + // map and if this is successful, a subscription is created for the + // publisherof this metric value. The metric names are recorded since + // some of them may correspond to known metrics, some of them may + // correspond to metrics that are new. + + std::set< std::string > TheMetricNames; + + for (auto & MetricRecord : TheMetrics ) { - // The first step is to try inserting the metrics into the metric value - // map and if this is successful, a subscription is created for the - // publisherof this metric value. The metric names are recorded since - // some of them may correspond to known metrics, some of them may - // correspond to metrics that are new. + auto [ MetricRecordPointer, MetricAdded ] = MetricValues.try_emplace( + MetricRecord.get(), JSON() ); - std::set< std::string > TheMetricNames; + TheMetricNames.insert( MetricRecordPointer->first ); - for (auto & MetricRecord : TheMetrics.at( NebulOuS::MetricList ) ) + // If a new metric was added, a subscription will be set up for this + // new metric, and the flag indicating that values have been received + // for all metrics will be reset since this new metric has yet to receive + // its first value + + if( MetricAdded ) { - auto [ MetricRecordPointer, MetricAdded ] = MetricValues.try_emplace( - MetricRecord.at( NebulOuS::MetricName ).get(), JSON() ); + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + std::string( MetricValueRootString ) + MetricRecordPointer->first ), + GetSessionLayerAddress() ); - TheMetricNames.insert( MetricRecordPointer->first ); - - if( MetricAdded ) - Send( Theron::AMQ::NetworkLayer::TopicSubscription( - Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - std::string( MetricValueRootString ) + MetricRecordPointer->first ), - GetSessionLayerAddress() ); + AllMetricValuesSet = false; } - - // There could be some metric value records that were defined by the - // previous metrics defined, but missing from the new metric set. If - // this is the case, the metric value records for the missing metrics - // should be unsubcribed and their metric records removed. - - for( const auto & TheMetric : std::views::keys( MetricValues ) ) - if( !TheMetricNames.contains( TheMetric ) ) - { - Send( Theron::AMQ::NetworkLayer::TopicSubscription( - Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, - std::string( MetricValueRootString ) + TheMetric ), - GetSessionLayerAddress() ); - - MetricValues.erase( TheMetric ); - } } + + // There could be some metric value records that were defined by the + // previous metrics defined, but missing from the new metric set. If + // this is the case, the metric value records for the missing metrics + // should be unsubcribed and their metric records removed. + + for( const auto & TheMetric : std::views::keys( MetricValues ) ) + if( !TheMetricNames.contains( TheMetric ) ) + { + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + std::string( MetricValueRootString ) + TheMetric ), + GetSessionLayerAddress() ); + + MetricValues.erase( TheMetric ); + } } else { @@ -99,7 +98,8 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, ErrorMessage << "[" << Location.file_name() << " at line " << Location.line() << " in function " << Location.function_name() <<"] " - << "The message to define a new metric subscription is given as " + << "The message to define the application's execution context " + << "was given as: " << std::endl << std::endl << TheMetrics.dump(2) << std::endl << "this is not as expected!"; @@ -163,8 +163,11 @@ void MetricUpdater::UpdateMetricValue( // must look for this identifier type on the solutions in order to decide // which solutions to deploy. // -// The message will be ignored if not all metric values have been received, -// and no error message indication will be given. +// The message will be ignored if not all metric values have been received +// or if there are no metric values defined. In both cases the SLO violation +// message will just be ignored. In order to avoid the scan over all metrics +// to see if they are set, a boolean flag will be used and set once all metrics +// have values. Then future scans will be avoided. void MetricUpdater::SLOViolationHandler( const SLOViolation & SeverityMessage, const Address TheSLOTopic ) @@ -173,23 +176,49 @@ void MetricUpdater::SLOViolationHandler( Output << "Metric Updater: SLO violation received " << std::endl << SeverityMessage.dump(2) << std::endl; - // The application context can then be sent to the solution manager - // using the application execution context message provided that none of - // metric values are null indicating that no value has been received (yet) - // Thus, only if all metrics have values will the message be sent. - - if( !MetricValues.empty() && - std::ranges::none_of( MetricValues, - [](const auto & MetricRecord){ return MetricRecord.second.is_null(); } )) + if( !ReconfigurationInProgress && + ( AllMetricValuesSet || + (!MetricValues.empty() && + std::ranges::none_of( std::views::values( MetricValues ), + [](const auto & MetricValue){ return MetricValue.is_null(); } ))) ) + { Send( Solver::ApplicationExecutionContext( SeverityMessage.at( NebulOuS::TimePoint ).get< Solver::TimePointType >(), MetricValues, true ), TheSolverManager ); + + AllMetricValuesSet = true; + ReconfigurationInProgress = true; + } else Output << "... failed to forward the application execution context (size: " << MetricValues.size() << ")" << std::endl; } +// -------------------------------------------------------------------------- +// Reconfigured application +// -------------------------------------------------------------------------- +// +// When the reconfiguration message is received it is an indication tha the +// Optimiser Controller has reconfigured the application and that the +// application is running in the new configuration found by the solver. +// It is the event that is important m not the content of the message, and +// it is therefore only used to reset the ongoing reconfiguration flag. + +void MetricUpdater::ReconfigurationDone( + const ReconfigurationMessage & TheReconfiguraton, + const Address TheReconfigurationTopic ) +{ + Theron::ConsoleOutput Output; + + ReconfigurationInProgress = false; + + Output << "Reconfiguration ongoing flag reset after receiving the following " + << "message indicating that the previous reconfiguration was" + << "completed: " << std::endl + << TheReconfiguraton.dump(2) << std::endl; +} + // -------------------------------------------------------------------------- // Constructor and destructor // -------------------------------------------------------------------------- @@ -199,28 +228,39 @@ void MetricUpdater::SLOViolationHandler( // The message handlers are registered, and the the updater will then subscribe // to the two topics published by the Optimisation Controller: One for the // initial message defining the metrics and the associated topics to subscribe -// to for their values, and the second for receiving the SLO violation message. +// to for their values, and the second to know when a reconfiguration has been +// enacted based on a previously sent application execution context. One +// subscritpion is also made to receive the SLO violation message indicating +// that the running configuration is no longer valid and that a reconfiguration +// must be made. MetricUpdater::MetricUpdater( const std::string UpdaterName, const Address ManagerOfSolvers ) : Actor( UpdaterName ), StandardFallbackHandler( Actor::GetAddress().AsString() ), NetworkingActor( Actor::GetAddress().AsString() ), - MetricValues(), ValidityTime(0), TheSolverManager( ManagerOfSolvers ), - MetricsVersion(-1) + MetricValues(), ValidityTime(0), AllMetricValuesSet(false), + TheSolverManager( ManagerOfSolvers ), + ReconfigurationInProgress( false ) { RegisterHandler( this, &MetricUpdater::AddMetricSubscription ); RegisterHandler( this, &MetricUpdater::UpdateMetricValue ); RegisterHandler( this, &MetricUpdater::SLOViolationHandler ); + RegisterHandler( this, &MetricUpdater::ReconfigurationDone ); Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - std::string( NebulOuS::MetricSubscriptions ) ), + NebulOuS::MetricSubscriptions ), GetSessionLayerAddress() ); Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - std::string( NebulOuS::SLOViolationTopic ) ), + NebulOuS::ReconfigurationTopic ), + GetSessionLayerAddress() ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + NebulOuS::SLOViolationTopic ), GetSessionLayerAddress() ); } @@ -235,12 +275,17 @@ MetricUpdater::~MetricUpdater() { Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, - std::string( NebulOuS::MetricSubscriptions ) ), + NebulOuS::MetricSubscriptions ), GetSessionLayerAddress() ); Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, - std::string( NebulOuS::SLOViolationTopic ) ), + NebulOuS::ReconfigurationTopic ), + GetSessionLayerAddress() ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, + NebulOuS::SLOViolationTopic ), GetSessionLayerAddress() ); std::ranges::for_each( std::views::keys( MetricValues ), diff --git a/MetricUpdater.hpp b/MetricUpdater.hpp index 423daad..51257f6 100644 --- a/MetricUpdater.hpp +++ b/MetricUpdater.hpp @@ -88,16 +88,14 @@ constexpr std::string_view TimePoint = "predictionTime"; // defined next. constexpr std::string_view MetricSubscriptions - = "eu.nebulouscloud.monitoring.metric_list"; + = "eu.nebulouscloud.optimiser.controller.metric_list"; // The JSON message attribute for the list of metrics is another JSON object // stored under the following key, see the Event type III defined in // https://158.39.75.54/projects/nebulous-collaboration-hub/wiki/slo-severity-based-violation-detector // where the name of the metric is defined under as sub-key. -constexpr std::string_view MetricList = "metric_list", - MetricName = "name", - MetricVersionCounter = "version"; +constexpr std::string_view MetricList = "metrics"; // The metric value messages will be published on different topics and to // check if an inbound message is from a metric value topic, it is necessary @@ -130,6 +128,14 @@ constexpr std::string_view MetricValueRootString constexpr std::string_view SLOViolationTopic = "eu.nebulouscloud.monitoring.slo.severity_value"; +// When a reconfiguration has been enacted by the Optimiser Controller and +// a new configuration is confirmed to be running on the new platofrm, it will +// send a message to inform all other components that the reconfiguration +// has happened on the following topic. + +constexpr std::string_view ReconfigurationTopic + = "eu.nebulouscloud.optimiser.adaptations"; + /*============================================================================== Metric Updater @@ -159,7 +165,7 @@ private: // assumed that same metric name is used both for the optimisation model // and for the metric topic. - std::unordered_map< Theron::AMQ::TopicName, JSON > MetricValues; + Solver::MetricValueType MetricValues; // The metric values should ideally be forecasted for the same future time // point, but this may not be assured, and as such a zero-order hold is @@ -172,24 +178,26 @@ private: Solver::TimePointType ValidityTime; - // When an SLO violation message is received the current vector of metric - // values should be sent as an application execution context (message) to the - // Solution Manager actor that will invoke a solver to find the optimal - // configuration for this configuration. The Metric Updater must therefore - // know the address of the Soler Manager, and this must be passed to - // the constructor. + // There is also a flag to indicate when all metric values have received + // values since optimising for a application execution context defiend all + // metrics requires that at least one value is received for each metric. This + // condition could be tested before sending the request to find a new + // solution, but this means testing all metrics in a linear scan for a + // condition that will only happen initially until all metrics have been seen + // and so it is better for the performance if there is a flag to check for + // this condition. - const Address TheSolverManager; + bool AllMetricValuesSet; // -------------------------------------------------------------------------- // Subscribing to metric prediction values // -------------------------------------------------------------------------- // // Initially, the Optimiser Controller will pass a message containing all - // optimiser metric names and the AMQ topic on which their values will be - // published. Essentially, these messages arrives as a JSON message with - // one attribute per metric, and where the value is the topic string for - // the value publisher. + // optimiser metric names that are used in the optimisation and therefore + // constitutes the application's execution context. This message is a simple + // JSON map containing an array since the Optimiser Controller is not able + // to send just an array. class MetricTopic : public Theron::AMQ::JSONTopicMessage @@ -211,22 +219,12 @@ private: virtual ~MetricTopic() = default; }; - // The metric definition message "Event type III" of the EMS is sent every - // 60 seconds in order to inform new components or crashed components about - // the metrics. The version number of the message is a counter that indicates - // if the set of metrics has changed. Thus the message should be ignored - // as long as the version number stays the same. The version number of the - // current set of metrics is therefore cached to avoid redefining the - // metrics. - - long int MetricsVersion; - // The handler for this message will check each attribute value of the // received JSON struct, and those not already existing in the metric // value map be added and a subscription made for the published // prediction values. - void AddMetricSubscription( const MetricTopic & TheMetrics, + void AddMetricSubscription( const MetricTopic & MetricDefinitions, const Address OptimiserController ); // -------------------------------------------------------------------------- @@ -296,6 +294,52 @@ private: void SLOViolationHandler( const SLOViolation & SeverityMessage, const Address TheSLOTopic ); + // The application execution context (message) will be sent to the + // Solution Manager actor that will invoke a solver to find the optimal + // configuration for this configuration. The Metric Updater must therefore + // know the address of the Solver Manager, and this must be passed to + // the constructor and stored for for the duration of the execution + + const Address TheSolverManager; + + // After the sending of the application's excution context, one should not + // initiate another reconfiguration because the state may the possibly be + // inconsistent with the SLO Violation Detector belieivng that the old + // configuration is still in effect while the new configuration is being + // enacted. It is therefore a flag that will be set by the SLO Violation + // handler indicating that a reconfiguration is ongoing. + + bool ReconfigurationInProgress; + + // When the reconfiguration has been done and the Optimizer Controller + // confirms that the application is running in a new configuration, it will + // send a reconfiguration completed message. This message will just be a + // JSON message. + + class ReconfigurationMessage + : public Theron::AMQ::JSONTopicMessage + { + public: + + ReconfigurationMessage( void ) + : JSONTopicMessage( std::string( ReconfigurationTopic ) ) + {} + + ReconfigurationMessage( const ReconfigurationMessage & Other ) + : JSONTopicMessage( Other ) + {} + + virtual ~ReconfigurationMessage() = default; + }; + + // The handler for this message will actually not use its contents, but only + // note that the reconfiguration has been completed to reset the + // reconfiguration in progress flag allowing future SLO Violation Events to + // triger new reconfigurations. + + void ReconfigurationDone( const ReconfigurationMessage & TheReconfiguraton, + const Address TheReconfigurationTopic ); + // -------------------------------------------------------------------------- // Constructor and destructor // -------------------------------------------------------------------------- diff --git a/Solver.code-workspace b/Solver.code-workspace index e95c076..dad863f 100644 --- a/Solver.code-workspace +++ b/Solver.code-workspace @@ -91,7 +91,8 @@ "valarray": "cpp", "bitset": "cpp", "regex": "cpp", - "syncstream": "cpp" + "syncstream": "cpp", + "expected": "cpp" }, "gerrit.gitRepo": "/home/GHo/Documents/Code/NebulOuS/Solvers" } diff --git a/Solver.hpp b/Solver.hpp index fa7059e..c9228a3 100644 --- a/Solver.hpp +++ b/Solver.hpp @@ -155,14 +155,20 @@ public: { public: - static constexpr std::string_view MessageIdentifier + // First the topic on which these messages will arrive is defined so that + // it can be used when subscribing. + + static constexpr std::string_view AMQTopic = "eu.nebulouscloud.optimiser.solver.context"; + // The full constructor takes the time point, the objective function to + // solve for, and the application's execution context as the metric map + ApplicationExecutionContext( const TimePointType MicroSecondTimePoint, const std::string ObjectiveFunctionID, const MetricValueType & TheContext, bool DeploySolution = false ) - : JSONTopicMessage( std::string( MessageIdentifier ), + : JSONTopicMessage( std::string( AMQTopic ), { { std::string( TimeStamp ), MicroSecondTimePoint }, { std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID }, { std::string( ExecutionContext ), TheContext }, @@ -170,12 +176,13 @@ public: }) {} // The constructor omitting the objective function identifier is similar - // but without the objective function string. + // but without the objective function string implying that the default + // objective function should be used. ApplicationExecutionContext( const TimePointType MicroSecondTimePoint, const MetricValueType & TheContext, bool DeploySolution = false ) - : JSONTopicMessage( std::string( MessageIdentifier ), + : JSONTopicMessage( std::string( AMQTopic ), { { std::string( TimeStamp ), MicroSecondTimePoint }, { std::string( ExecutionContext ), TheContext }, { std::string( DeploymentFlag ), DeploySolution } @@ -191,7 +198,7 @@ public: // The default constructor simply stores the message identifier ApplicationExecutionContext() - : JSONTopicMessage( std::string( MessageIdentifier ) ) + : JSONTopicMessage( std::string( AMQTopic ) ) {} // The default destrucor is used @@ -240,7 +247,7 @@ public: static constexpr std::string_view ObjectiveValues = "ObjectiveValues"; static constexpr std::string_view VariableValues = "VariableValues"; - static constexpr std::string_view MessageIdentifier + static constexpr std::string_view AMQTopic = "eu.nebulouscloud.optimiser.solver.solution"; Solution( const TimePointType MicroSecondTimePoint, @@ -248,7 +255,7 @@ public: const ObjectiveValuesType & TheObjectiveValues, const VariableValuesType & TheVariables, bool DeploySolution ) - : JSONTopicMessage( std::string( MessageIdentifier ) , + : JSONTopicMessage( std::string( AMQTopic ) , { { std::string( TimeStamp ), MicroSecondTimePoint }, { std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID }, { std::string( ObjectiveValues ) , TheObjectiveValues }, @@ -258,7 +265,7 @@ public: {} Solution() - : JSONTopicMessage( std::string( MessageIdentifier ) ) + : JSONTopicMessage( std::string( AMQTopic ) ) {} virtual ~Solution() = default; @@ -279,15 +286,15 @@ public: { public: - static constexpr std::string_view - MessageIdentifier = "eu.nebulouscloud.optimiser.solver.model"; + static constexpr std::string_view AMQTopic + = "eu.nebulouscloud.optimiser.controller.model"; OptimisationProblem( const JSON & TheProblem ) - : JSONTopicMessage( std::string( MessageIdentifier ), TheProblem ) + : JSONTopicMessage( std::string( AMQTopic ), TheProblem ) {} OptimisationProblem() - : JSONTopicMessage( std::string( MessageIdentifier ) ) + : JSONTopicMessage( std::string( AMQTopic ) ) {} virtual ~OptimisationProblem() = default; @@ -326,7 +333,7 @@ public: Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - Theron::AMQ::TopicName( OptimisationProblem::MessageIdentifier ) + OptimisationProblem::AMQTopic ), GetSessionLayerAddress() ); } @@ -337,7 +344,7 @@ public: if( HasNetwork() ) Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, - Theron::AMQ::TopicName( OptimisationProblem::MessageIdentifier ) + OptimisationProblem::AMQTopic ), GetSessionLayerAddress() ); } }; diff --git a/SolverComponent.cpp b/SolverComponent.cpp index 99b2eab..163e82c 100644 --- a/SolverComponent.cpp +++ b/SolverComponent.cpp @@ -95,6 +95,7 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) #include "proton/message.hpp" // AMQ messages definitions #include "proton/source_options.hpp" // App ID filters #include "proton/source.hpp" // The filter map +#include "proton/types.hpp" // Type definitions #include "Communication/AMQ/AMQMessage.hpp" // The AMQP messages #include "Communication/AMQ/AMQEndpoint.hpp" // The AMP endpoint #include "Communication/AMQ/AMQjson.hpp" // Transparent JSON-AMQP @@ -213,8 +214,7 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) virtual proton::connection_options ConnectionOptions(void) const override { - proton::connection_options Options( - Theron::AMQ::NetworkLayer::AMQProperties::ConnectionOptions() ); + proton::connection_options Options( AMQProperties::ConnectionOptions() ); Options.user( User ); Options.password( Password ); @@ -235,8 +235,7 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) proton::symbol FilterKey("selector"); proton::value FilterValue; proton::codec::encoder EncodedFilter( FilterValue ); - proton::receiver_options TheOptions( - Theron::AMQ::NetworkLayer::AMQProperties::ReceiverOptions() ); + proton::receiver_options TheOptions( AMQProperties::ReceiverOptions() ); std::ostringstream SelectorString; @@ -255,17 +254,18 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) } // The application identifier must also be provided in every message to - // allow other receivers to filter on this. + // allow other receivers to filter on this. First will the default + // properties from the base class be set before the new application + // identifier property will be added. - virtual proton::message::property_map MessageProperties( + virtual std::map MessageProperties( const proton::message::property_map & CurrentProperties = proton::message::property_map() ) const override { - proton::message::property_map TheProperties( - Theron::AMQ::NetworkLayer::AMQProperties::MessageProperties( - CurrentProperties )); - - TheProperties.put( "application", ApplicationID ); + std::map + TheProperties( AMQProperties::MessageProperties( CurrentProperties ) ); + + TheProperties["application"] = ApplicationID; return TheProperties; } @@ -332,8 +332,8 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) NebulOuS::SolverManager< NebulOuS::AMPLSolver > WorkloadMabager( CLIValues["Name"].as(), - std::string( NebulOuS::Solver::Solution::MessageIdentifier ), - std::string( NebulOuS::Solver::ApplicationExecutionContext::MessageIdentifier ), + NebulOuS::Solver::Solution::AMQTopic, + NebulOuS::Solver::ApplicationExecutionContext::AMQTopic, 1, "AMPLSolver", ampl::Environment( TheAMPLDirectory.native() ), ModelDirectory, CLIValues["Solver"].as() ); diff --git a/SolverManager.hpp b/SolverManager.hpp index de2406b..7e4c423 100644 --- a/SolverManager.hpp +++ b/SolverManager.hpp @@ -193,7 +193,7 @@ private: void PublishSolution( const Solver::Solution & TheSolution, const Address TheSolver ) { - Send( TheSolution, SolutionReceiver ); + Send( TheSolution, Address( SolutionReceiver ) ); PassiveSolvers.insert( ActiveSolvers.extract( TheSolver ) ); DispatchToSolvers(); } @@ -274,7 +274,7 @@ public: Send( ExecutionControl::StatusMessage( ExecutionControl::StatusMessage::State::Started - ), Address( std::string( ExecutionControl::StatusTopic ) ) ); + ), Address( ExecutionControl::StatusMessage::AMQTopic ) ); } else {