diff --git a/MetricUpdater.cpp b/MetricUpdater.cpp index 2ff9fff..29fbb8d 100644 --- a/MetricUpdater.cpp +++ b/MetricUpdater.cpp @@ -10,11 +10,11 @@ Contact: Geir.Horn@mn.uio.no License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) ==============================================================================*/ -#include "ranges" // Better containers -#include // Making informative error messages -#include // To format error messages -#include // standard exceptions - +#include // Better containers +#include // Informative error messages +#include // To format error messages +#include // standard exceptions +#include // Iterator support #include "Communication/AMQ/AMQEndpoint.hpp" // For Topic subscriptions @@ -42,9 +42,9 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, if( TheMetrics.is_object() ) for( const auto & [MetricName, TopicName] : TheMetrics.items() ) { - auto [ MetricRecord, NewMetric ] = MetricValues.try_emplace( TopicName, - MetricName, JSON() ); - + auto [ MetricRecord, NewMetric ] = MetricValues.try_emplace( + TopicName, MetricName, JSON() ); + if( NewMetric ) Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, @@ -66,4 +66,117 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, } } +// The metric update value is received whenever any of subscribed forecasters +// has a new value for its metric. The format of the message is described in +// the project wiki page [1], with an example message given as +// { +// "metricValue": 12.34, +// "level": 1, +// "timestamp": 163532341, +// "probability": 0.98, +// "confidence_interval " : [8,15] +// "predictionTime": 163532342, +// } +// +// Currently only the metric value and the timestamp will be used from this +// record. It would be interesting in the future to explore ways to use the +// confidence interval in some Bayesian resoning about the true value. +// +// The sender address will contain the metric topic, but this will contain the +// generic metric prediction root string, and this string must be removed +// before the metric name can be updated. + +void MetricUpdater::UpdateMetricValue( + const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic) +{ + Theron::AMQ::TopicName TheTopic + = TheMetricTopic.AsString().erase(0, MetricValueRootString.size() ); + + if( MetricValues.contains( TheTopic ) ) + { + MetricValues[ TheTopic ].Value = TheMetricValue[ NebulOuS::ValueLabel ]; + + ValidityTime = std::max( ValidityTime, + TheMetricValue[ NebulOuS::TimePoint ].get< Solver::TimePointType >() ); + } +} + +// When an SLO Violation is predicted a message is received from the SLO +// violation detector and this will trigger the definition of a new +// application execution context and a request to the Solution Manager to +// generate a new solution for this context. +// +// Note that the identifier of the application execution context is defined +// based on the time point of the severity message. The Optimiser controller +// must look for this identifier type on the solutions in order to decide +// which solutions to deploy. + +void MetricUpdater::SLOViolationHandler( + const SLOViolation & SeverityMessage, const Address TheSLOTopic ) +{ + // The application execution context is constructed first + // as it represents the name and the current values of the recorded + // metrics. Note the construction has to be done conditionally based + // on whether the standard library containers supports the range based + // constructors defined for C++23 + + #ifdef __cpp_lib_containers_ranges + #pragma message("C++23: Range inserters available! Rewrite MetricUpdater.hpp!") + + Solver::MetricValueType TheApplicationExecutionContext( + std::views::transform( MetricValues, [](const auto & MetricRecord){ + return std::make_pair( MetricRecord.second.OptimisationName, + MetricRecord.second.Value ); + }) ); + #else + + Solver::MetricValueType TheApplicationExecutionContext; + + for( const auto & [_, MetricRecord ] : MetricValues ) + TheApplicationExecutionContext.emplace( MetricRecord.OptimisationName, + MetricRecord.Value ); + + #endif + + // The application context can then be sent to the solution manager + // using the corresponding message, and the time stamp of the severity + // message, + + Send( Solver::ApplicationExecutionContext( + SeverityMessage[ NebulOuS::SLOIdentifier ], + SeverityMessage[ NebulOuS::TimePoint ].get< Solver::TimePointType >(), + SeverityMessage[ NebulOuS::ObjectiveFunctionName ], + TheApplicationExecutionContext + ), TheSolutionManger ); +} + +// The constructor initialises the base classes and sets the validity time +// to zero so that it will be initialised by the first metric values received. +// The message handlers are registered, and the the updater will then subscribe +// to the two topics published by the Optimisation Controller: One for the +// initial message defining the metrics and the associated topics to subscribe +// to for their values, and the second for receiving the SLO violation message. + +MetricUpdater::MetricUpdater( const std::string UpdaterName, + const Address ManagerForSolutions ) +: Actor( UpdaterName ), + StandardFallbackHandler( Actor::GetAddress().AsString() ), + NetworkingActor( Actor::GetAddress().AsString() ), + MetricValues(), ValidityTime(0), TheSolutionManger( ManagerForSolutions ) +{ + RegisterHandler( this, &MetricUpdater::AddMetricSubscription ); + RegisterHandler( this, &MetricUpdater::UpdateMetricValue ); + RegisterHandler( this, &MetricUpdater::SLOViolationHandler ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + std::string( MetricSubscriptions ) ), + Theron::Network::GetAddress( Theron::Network::Layer::Session ) ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + std::string( SLOViolationTopic ) ), + Theron::Network::GetAddress( Theron::Network::Layer::Session ) ); +} + } // End name space NebulOuS \ No newline at end of file diff --git a/MetricUpdater.hpp b/MetricUpdater.hpp index 0b4d636..18cd2a2 100644 --- a/MetricUpdater.hpp +++ b/MetricUpdater.hpp @@ -62,30 +62,71 @@ using JSON = nlohmann::json; // Short form name space #include "Communication/AMQ/AMQEndpoint.hpp" // AMQ endpoint #include "Communication/AMQ/AMQSessionLayer.hpp" // For topic subscriptions +// NebulOuS files + +#include "Solver.hpp" // The generic solver base + namespace NebulOuS { +/*============================================================================== + + Basic interface definitions + +==============================================================================*/ +// // Definitions for the terminology to facilitate changing the lables of the // various message labels without changing the code. The definitions are // compile time constants and as such should not lead to any run-time overhead. +// The JSON attribute names may be found under the "Predicted monitoring +// metrics" section on the Wiki page [1]. -constexpr std::string_view ValueLabel{ "metricValue" }; -constexpr std::string_view TimePoint { "predictionTime" }; +constexpr std::string_view ValueLabel = "metricValue"; +constexpr std::string_view TimePoint = "predictionTime"; // The topic used for receiving the message(s) defining the metrics of the // application execution context as published by the Optimiser Controller is // defined next. -constexpr std::string_view MetricSubscriptions{ "ApplicationContext" }; +constexpr std::string_view MetricSubscriptions = "ApplicationContext"; // The metric value messages will be published on different topics and to // check if an inbound message is from a metric value topic, it is necessary // to test against the base string for the metric value topics according to -// the Wiki-page at -// https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/monitoringdata-interface +// the Wiki-page [1] -constexpr std::string_view MetricValueRootString{ - "eu.nebulouscloud.monitoring.predicted" -}; +constexpr std::string_view MetricValueRootString + = "eu.nebulouscloud.monitoring.predicted"; + +// The SLO violation detector will publish a message when a reconfiguration is +// deamed necessary for a future time point called "Event type V" on the wiki +// page [3]. The event contains a probability for at least one of the SLOs +// being violated at the predicted time point. It is not clear if the assessment +// is being made by the SLO violation detector at every new metric prediction, +// or if this event is only signalled when the probability is above some +// internal threshold of the SLO violation detector. +// +// The current implementation assumes that the latter is the case, and hence +// just receiving the message indicates that a new application configuration +// should be found given the application execution context as predicted by the +// metric values recorded by the Metric Updater. Should this assumption be +// wrong, the probability must be compared with some user set threshold for +// each message, and to cater for this the probability field will always be +// compared to a threshold, currently set to zero to ensure that every event +// message will trigger a reconfiguration. +// +// However, the Metric updater will get this message from the Optimiser +// Controller component only if an update must be made. The message must +// contain a unique identifier, a time point for the solution, and the objective +// function to be maximised. + +constexpr std::string_view SLOIdentifier = "Identifier"; +constexpr std::string_view ObjectiveFunctionName = "ObjectiveFunction"; + +// The messages from the Optimizer Controller will be sent on a topic that +// should follow some standard topic convention. + +constexpr std::string_view SLOViolationTopic + = "eu.nebulouscloud.optimiser.slo.violation"; /*============================================================================== @@ -117,8 +158,6 @@ private: // they arrive as JSON values this avoids converting the values on input and // output. The metric optimisation name is just a string. -private: - class MetricValueRecord { public: @@ -142,6 +181,26 @@ private: std::unordered_map< Theron::AMQ::TopicName, MetricValueRecord > MetricValues; + // The metric values should ideally be forecasted for the same future time + // point, but this may not be assured, and as such a zero-order hold is + // assumed for all metric values. This means that the last value received + // for a metric is taken to be valid until the next update. The implication + // is that the whole vector of metric values is valid for the largest time + // point of any of the predictions. Hence, the largest prediction time point + // must be stored for being able to associate a time point of validity to + // the retruned metric vector. + + Solver::TimePointType ValidityTime; + + // When an SLO violation message is received the current vector of metric + // values should be sent as an application execution context (message) to the + // Solution Manager actor that will invoke a solver to find the optimal + // configuration for this configuration. The Metric Updater must therefore + // know the address of the Solution Manager, and this must be passed to + // the constructor. + + const Address TheSolutionManger; + // -------------------------------------------------------------------------- // JSON messages: Type by topic // -------------------------------------------------------------------------- @@ -189,7 +248,7 @@ private: TypeByTopic( const TypeByTopic & Other ) : JSONMessage( Other.GetMessageIdentifier(), Other ) {} - + virtual ~TypeByTopic() = default; }; @@ -209,9 +268,14 @@ private: public: MetricTopic( void ) - : TypeByTopic( MetricSubscriptions.data() ) + : TypeByTopic( std::string( MetricSubscriptions ) ) {} + MetricTopic( const MetricTopic & Other ) + : TypeByTopic( Other ) + {} + + virtual ~MetricTopic() = default; }; // The handler for this message will check each attribute value of the @@ -226,6 +290,10 @@ private: // Metric values // -------------------------------------------------------------------------- // + // The metric value message is defined as a topic message where the message + // identifier is the root of the metric value topic name string. This is + // identical to a wildcard operation matching all topics whose name start + // with this string. class MetricValueUpdate : public TypeByTopic @@ -233,19 +301,71 @@ private: public: MetricValueUpdate( void ) - : TypeByTopic( MetricValueRootString.data() ) + : TypeByTopic( std::string( MetricValueRootString ) ) {} + MetricValueUpdate( const MetricValueUpdate & Other ) + : TypeByTopic( Other ) + {} + + virtual ~MetricValueUpdate() = default; }; - // The handler function will check the sender address against the subscribed - // topics and if a match is found it will update the value of the metric. - // if no subscribed metric corresponds to the received message, the message - // will just be discarded. + // The handler function will update the value of the subscribed metric + // based on the given topic name. If there is no such metric known, then the + // message will just be discarded. void UpdateMetricValue( const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic ); + // -------------------------------------------------------------------------- + // SLO violations + // -------------------------------------------------------------------------- + // + // The SLO Violation detector publishes an event to indicate that at least + // one of the constraints for the application deployment will be violated in + // the predicted future, and that the search for a new solution should start. + + class SLOViolation + : public TypeByTopic + { + public: + + SLOViolation( void ) + : TypeByTopic( std::string( SLOViolationTopic ) ) + {} + + SLOViolation( const SLOViolation & Other ) + : TypeByTopic( Other ) + {} + + virtual ~SLOViolation() = default; + }; + + // The handler for this message will generate an Application Execution + // Context message to the Solution Manager passing the values of all + // the metrics currently kept by the Metric Updater. + + void SLOViolationHandler( const SLOViolation & SeverityMessage, + const Address TheSLOTopic ); + + // -------------------------------------------------------------------------- + // Constructor and destructor + // -------------------------------------------------------------------------- + // + // The constructor requires the name of the Metric Updater Actor, and the + // actor address of the Solution Manager Actor. It registers the handlers + // for all the message types + +public: + + MetricUpdater( const std::string UpdaterName, + const Address ManagerForSolutions ); + + // The destructor is just the default destructor + + virtual ~MetricUpdater() = default; + }; // Class Metric Updater } // Name space NebulOuS #endif // NEBULOUS_METRIC_UPDATE \ No newline at end of file diff --git a/SolutionManager.hpp b/SolutionManager.hpp index 4d01f10..fb07ee3 100644 --- a/SolutionManager.hpp +++ b/SolutionManager.hpp @@ -250,7 +250,7 @@ SolverManager( const std::string & TheActorName, SolverPool(), ActiveSolvers(), PassiveSolvers(), Contexts(), ContextExecutionQueue() { - // The solvers are created by the expanding the arguments for the solvers + // The solvers are created by expanding the arguments for the solvers // one by one creating new elements in the solver pool ( SolverPool.emplace_back( SolverArguments ), ... ); diff --git a/Solver.code-workspace b/Solver.code-workspace index 1f301f8..e95c076 100644 --- a/Solver.code-workspace +++ b/Solver.code-workspace @@ -88,7 +88,10 @@ "variant": "cpp", "any": "cpp", "forward_list": "cpp", - "valarray": "cpp" + "valarray": "cpp", + "bitset": "cpp", + "regex": "cpp", + "syncstream": "cpp" }, "gerrit.gitRepo": "/home/GHo/Documents/Code/NebulOuS/Solvers" } diff --git a/Solver.hpp b/Solver.hpp index dcaefa3..f52f8c0 100644 --- a/Solver.hpp +++ b/Solver.hpp @@ -141,11 +141,11 @@ public: const TimePointType MicroSecondTimePoint, const std::string ObjectiveFunctionID, const MetricValueType & TheContext ) - : JSONMessage( MessageIdentifier.data(), - { { ContextIdentifier.data(), TheIdentifier }, - { TimeStamp.data(), MicroSecondTimePoint }, - { ObjectiveFunctionLabel.data(), ObjectiveFunctionID }, - { ExecutionContext.data(), TheContext } } + : JSONMessage( std::string( MessageIdentifier ), + { { std::string( ContextIdentifier ), TheIdentifier }, + { std::string( TimeStamp ), MicroSecondTimePoint }, + { std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID }, + { std::string( ExecutionContext ), TheContext } } ) {} ApplicationExecutionContext( const ApplicationExecutionContext & Other ) @@ -201,12 +201,12 @@ public: const std::string ObjectiveFunctionID, const ObjectiveValuesType & TheObjectiveValues, const MetricValueType & TheContext ) - : JSONMessage( MessageIdentifier.data() , - { { ContextIdentifier.data(), TheIdentifier }, - { TimeStamp.data(), MicroSecondTimePoint }, - { ObjectiveFunctionLabel.data(), ObjectiveFunctionID }, - { ObjectiveValues.data(), TheObjectiveValues }, - { ExecutionContext.data(), TheContext } } ) + : JSONMessage( std::string( MessageIdentifier ) , + { { std::string( ContextIdentifier ), TheIdentifier }, + { std::string( TimeStamp ), MicroSecondTimePoint }, + { std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID }, + { std::string( ObjectiveValues ) , TheObjectiveValues }, + { std::string( ExecutionContext ), TheContext } } ) {} Solution() = delete; @@ -232,7 +232,7 @@ public: std::string_view MessageIdentifier = "Solver::OptimisationProblem"; OptimisationProblem( const JSON & TheProblem ) - : JSONMessage( MessageIdentifier.data(), TheProblem ) + : JSONMessage( std::string( MessageIdentifier ), TheProblem ) {} OptimisationProblem() = delete;