First complete version of the Metric Updater

Change-Id: I029afeebaa98af6ae86c95cef4fcc460aea10dbd
This commit is contained in:
Geir Horn 2023-12-29 18:52:13 +01:00
parent 25c4a4fd7d
commit 285b64a8fe
5 changed files with 275 additions and 39 deletions

View File

@ -10,11 +10,11 @@ Contact: Geir.Horn@mn.uio.no
License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
==============================================================================*/
#include "ranges" // Better containers
#include <source_location> // Making informative error messages
#include <ranges> // Better containers
#include <source_location> // Informative error messages
#include <sstream> // To format error messages
#include <stdexcept> // standard exceptions
#include <iterator> // Iterator support
#include "Communication/AMQ/AMQEndpoint.hpp" // For Topic subscriptions
@ -42,8 +42,8 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics,
if( TheMetrics.is_object() )
for( const auto & [MetricName, TopicName] : TheMetrics.items() )
{
auto [ MetricRecord, NewMetric ] = MetricValues.try_emplace( TopicName,
MetricName, JSON() );
auto [ MetricRecord, NewMetric ] = MetricValues.try_emplace(
TopicName, MetricName, JSON() );
if( NewMetric )
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
@ -66,4 +66,117 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics,
}
}
// The metric update value is received whenever any of the subscribed
// forecasters has a new value for its metric. The format of the message is
// described in
// the project wiki page [1], with an example message given as
// {
// "metricValue": 12.34,
// "level": 1,
// "timestamp": 163532341,
// "probability": 0.98,
// "confidence_interval": [8,15],
// "predictionTime": 163532342
// }
//
// Currently only the metric value and the prediction time will be used from
// this record. It would be interesting in the future to explore ways to use
// the confidence interval in some Bayesian reasoning about the true value.
//
// The sender address will contain the metric topic, but this will contain the
// generic metric prediction root string, and this string must be removed
// before the metric name can be updated.
void MetricUpdater::UpdateMetricValue(
const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic)
{
Theron::AMQ::TopicName TheTopic
= TheMetricTopic.AsString().erase(0, MetricValueRootString.size() );
if( MetricValues.contains( TheTopic ) )
{
MetricValues[ TheTopic ].Value = TheMetricValue[ NebulOuS::ValueLabel ];
ValidityTime = std::max( ValidityTime,
TheMetricValue[ NebulOuS::TimePoint ].get< Solver::TimePointType >() );
}
}
// When an SLO Violation is predicted a message is received from the SLO
// violation detector and this will trigger the definition of a new
// application execution context and a request to the Solution Manager to
// generate a new solution for this context.
//
// Note that the identifier of the application execution context is defined
// based on the time point of the severity message. The Optimiser controller
// must look for this identifier type on the solutions in order to decide
// which solutions to deploy.
void MetricUpdater::SLOViolationHandler(
const SLOViolation & SeverityMessage, const Address TheSLOTopic )
{
// The application execution context is constructed first
// as it represents the name and the current values of the recorded
// metrics. Note the construction has to be done conditionally based
// on whether the standard library containers supports the range based
// constructors defined for C++23
#ifdef __cpp_lib_containers_ranges
#pragma message("C++23: Range inserters available! Rewrite MetricUpdater.hpp!")
Solver::MetricValueType TheApplicationExecutionContext(
std::views::transform( MetricValues, [](const auto & MetricRecord){
return std::make_pair( MetricRecord.second.OptimisationName,
MetricRecord.second.Value );
}) );
#else
Solver::MetricValueType TheApplicationExecutionContext;
for( const auto & [_, MetricRecord ] : MetricValues )
TheApplicationExecutionContext.emplace( MetricRecord.OptimisationName,
MetricRecord.Value );
#endif
// The application context can then be sent to the solution manager
// using the corresponding message, and the time stamp of the severity
// message,
Send( Solver::ApplicationExecutionContext(
SeverityMessage[ NebulOuS::SLOIdentifier ],
SeverityMessage[ NebulOuS::TimePoint ].get< Solver::TimePointType >(),
SeverityMessage[ NebulOuS::ObjectiveFunctionName ],
TheApplicationExecutionContext
), TheSolutionManger );
}
// The constructor initialises the base classes and sets the validity time
// to zero so that it will be initialised by the first metric values received.
// The message handlers are registered, and the updater will then subscribe
// to the two topics published by the Optimisation Controller: One for the
// initial message defining the metrics and the associated topics to subscribe
// to for their values, and the second for receiving the SLO violation message.
MetricUpdater::MetricUpdater( const std::string UpdaterName,
                              const Address ManagerForSolutions )
: Actor( UpdaterName ),
  StandardFallbackHandler( Actor::GetAddress().AsString() ),
  NetworkingActor( Actor::GetAddress().AsString() ),
  MetricValues(), ValidityTime(0), TheSolutionManger( ManagerForSolutions )
{
  // One handler is registered for each of the three message types

  RegisterHandler( this, &MetricUpdater::AddMetricSubscription );
  RegisterHandler( this, &MetricUpdater::UpdateMetricValue );
  RegisterHandler( this, &MetricUpdater::SLOViolationHandler );

  // A subscription request is then sent to the session layer for each of
  // the two topics: first the topic for the metric definitions, and then the
  // topic for the SLO violation messages.

  using SubscriptionRequest = Theron::AMQ::NetworkLayer::TopicSubscription;

  for( const std::string_view TopicToFollow :
       { MetricSubscriptions, SLOViolationTopic } )
    Send( SubscriptionRequest( SubscriptionRequest::Action::Subscription,
                               std::string( TopicToFollow ) ),
          Theron::Network::GetAddress( Theron::Network::Layer::Session ) );
}
} // End name space NebulOuS

View File

@ -62,30 +62,71 @@ using JSON = nlohmann::json; // Short form name space
#include "Communication/AMQ/AMQEndpoint.hpp" // AMQ endpoint
#include "Communication/AMQ/AMQSessionLayer.hpp" // For topic subscriptions
// NebulOuS files
#include "Solver.hpp" // The generic solver base
namespace NebulOuS
{
/*==============================================================================
Basic interface definitions
==============================================================================*/
//
// Definitions for the terminology to facilitate changing the various message
// labels without changing the code. The definitions are
// compile time constants and as such should not lead to any run-time overhead.
// The JSON attribute names may be found under the "Predicted monitoring
// metrics" section on the Wiki page [1].
constexpr std::string_view ValueLabel{ "metricValue" };
constexpr std::string_view TimePoint { "predictionTime" };
constexpr std::string_view ValueLabel = "metricValue";
constexpr std::string_view TimePoint = "predictionTime";
// The topic used for receiving the message(s) defining the metrics of the
// application execution context as published by the Optimiser Controller is
// defined next.
constexpr std::string_view MetricSubscriptions{ "ApplicationContext" };
constexpr std::string_view MetricSubscriptions = "ApplicationContext";
// The metric value messages will be published on different topics and to
// check if an inbound message is from a metric value topic, it is necessary
// to test against the base string for the metric value topics according to
// the Wiki-page at
// https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/monitoringdata-interface
// the Wiki-page [1]
constexpr std::string_view MetricValueRootString{
"eu.nebulouscloud.monitoring.predicted"
};
constexpr std::string_view MetricValueRootString
= "eu.nebulouscloud.monitoring.predicted";
// The SLO violation detector will publish a message when a reconfiguration is
// deemed necessary for a future time point called "Event type V" on the wiki
// page [3]. The event contains a probability for at least one of the SLOs
// being violated at the predicted time point. It is not clear if the assessment
// is being made by the SLO violation detector at every new metric prediction,
// or if this event is only signalled when the probability is above some
// internal threshold of the SLO violation detector.
//
// The current implementation assumes that the latter is the case, and hence
// just receiving the message indicates that a new application configuration
// should be found given the application execution context as predicted by the
// metric values recorded by the Metric Updater. Should this assumption be
// wrong, the probability must be compared with some user set threshold for
// each message, and to cater for this the probability field will always be
// compared to a threshold, currently set to zero to ensure that every event
// message will trigger a reconfiguration.
//
// However, the Metric updater will get this message from the Optimiser
// Controller component only if an update must be made. The message must
// contain a unique identifier, a time point for the solution, and the objective
// function to be maximised.
constexpr std::string_view SLOIdentifier = "Identifier";
constexpr std::string_view ObjectiveFunctionName = "ObjectiveFunction";
// The messages from the Optimiser Controller will be sent on a topic that
// should follow some standard topic convention.
constexpr std::string_view SLOViolationTopic
= "eu.nebulouscloud.optimiser.slo.violation";
/*==============================================================================
@ -117,8 +158,6 @@ private:
// they arrive as JSON values this avoids converting the values on input and
// output. The metric optimisation name is just a string.
private:
class MetricValueRecord
{
public:
@ -142,6 +181,26 @@ private:
std::unordered_map< Theron::AMQ::TopicName, MetricValueRecord > MetricValues;
// The metric values should ideally be forecasted for the same future time
// point, but this may not be assured, and as such a zero-order hold is
// assumed for all metric values. This means that the last value received
// for a metric is taken to be valid until the next update. The implication
// is that the whole vector of metric values is valid for the largest time
// point of any of the predictions. Hence, the largest prediction time point
// must be stored for being able to associate a time point of validity to
// the returned metric vector.
Solver::TimePointType ValidityTime;
// When an SLO violation message is received the current vector of metric
// values should be sent as an application execution context (message) to the
// Solution Manager actor that will invoke a solver to find the optimal
// configuration for this context. The Metric Updater must therefore
// know the address of the Solution Manager, and this must be passed to
// the constructor.
const Address TheSolutionManger;
// --------------------------------------------------------------------------
// JSON messages: Type by topic
// --------------------------------------------------------------------------
@ -209,9 +268,14 @@ private:
public:
MetricTopic( void )
: TypeByTopic( MetricSubscriptions.data() )
: TypeByTopic( std::string( MetricSubscriptions ) )
{}
MetricTopic( const MetricTopic & Other )
: TypeByTopic( Other )
{}
virtual ~MetricTopic() = default;
};
// The handler for this message will check each attribute value of the
@ -226,6 +290,10 @@ private:
// Metric values
// --------------------------------------------------------------------------
//
// The metric value message is defined as a topic message where the message
// identifier is the root of the metric value topic name string. This is
// identical to a wildcard operation matching all topics whose name start
// with this string.
class MetricValueUpdate
: public TypeByTopic
@ -233,19 +301,71 @@ private:
public:
MetricValueUpdate( void )
: TypeByTopic( MetricValueRootString.data() )
: TypeByTopic( std::string( MetricValueRootString ) )
{}
MetricValueUpdate( const MetricValueUpdate & Other )
: TypeByTopic( Other )
{}
virtual ~MetricValueUpdate() = default;
};
// The handler function will check the sender address against the subscribed
// topics and if a match is found it will update the value of the metric.
// if no subscribed metric corresponds to the received message, the message
// will just be discarded.
// The handler function will update the value of the subscribed metric
// based on the given topic name. If there is no such metric known, then the
// message will just be discarded.
void UpdateMetricValue( const MetricValueUpdate & TheMetricValue,
const Address TheMetricTopic );
// --------------------------------------------------------------------------
// SLO violations
// --------------------------------------------------------------------------
//
// The SLO Violation detector publishes an event to indicate that at least
// one of the constraints for the application deployment will be violated in
// the predicted future, and that the search for a new solution should start.
// Message type for the SLO violation event. It derives from TypeByTopic and
// adds no data members or functions of its own beyond the constructors and
// the destructor.
class SLOViolation
: public TypeByTopic
{
public:
// The default constructor passes the SLO violation topic name, converted
// from the compile time string view to a standard string, to the base class.
// NOTE(review): presumably the base class uses this string as the message
// identifier for matching inbound topics - confirm against TypeByTopic.
SLOViolation( void )
: TypeByTopic( std::string( SLOViolationTopic ) )
{}
// The copy constructor hands the other message to the base class constructor
SLOViolation( const SLOViolation & Other )
: TypeByTopic( Other )
{}
// The destructor is virtual and defaulted
virtual ~SLOViolation() = default;
};
// The handler for this message will generate an Application Execution
// Context message to the Solution Manager passing the values of all
// the metrics currently kept by the Metric Updater.
void SLOViolationHandler( const SLOViolation & SeverityMessage,
const Address TheSLOTopic );
// --------------------------------------------------------------------------
// Constructor and destructor
// --------------------------------------------------------------------------
//
// The constructor requires the name of the Metric Updater Actor, and the
// actor address of the Solution Manager Actor. It registers the handlers
// for all the message types
public:
MetricUpdater( const std::string UpdaterName,
const Address ManagerForSolutions );
// The destructor is just the default destructor
virtual ~MetricUpdater() = default;
}; // Class Metric Updater
} // Name space NebulOuS
#endif // NEBULOUS_METRIC_UPDATE

View File

@ -250,7 +250,7 @@ SolverManager( const std::string & TheActorName,
SolverPool(), ActiveSolvers(), PassiveSolvers(),
Contexts(), ContextExecutionQueue()
{
// The solvers are created by the expanding the arguments for the solvers
// The solvers are created by expanding the arguments for the solvers
// one by one creating new elements in the solver pool
( SolverPool.emplace_back( SolverArguments ), ... );

View File

@ -88,7 +88,10 @@
"variant": "cpp",
"any": "cpp",
"forward_list": "cpp",
"valarray": "cpp"
"valarray": "cpp",
"bitset": "cpp",
"regex": "cpp",
"syncstream": "cpp"
},
"gerrit.gitRepo": "/home/GHo/Documents/Code/NebulOuS/Solvers"
}

View File

@ -141,11 +141,11 @@ public:
const TimePointType MicroSecondTimePoint,
const std::string ObjectiveFunctionID,
const MetricValueType & TheContext )
: JSONMessage( MessageIdentifier.data(),
{ { ContextIdentifier.data(), TheIdentifier },
{ TimeStamp.data(), MicroSecondTimePoint },
{ ObjectiveFunctionLabel.data(), ObjectiveFunctionID },
{ ExecutionContext.data(), TheContext } }
: JSONMessage( std::string( MessageIdentifier ),
{ { std::string( ContextIdentifier ), TheIdentifier },
{ std::string( TimeStamp ), MicroSecondTimePoint },
{ std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID },
{ std::string( ExecutionContext ), TheContext } }
) {}
ApplicationExecutionContext( const ApplicationExecutionContext & Other )
@ -201,12 +201,12 @@ public:
const std::string ObjectiveFunctionID,
const ObjectiveValuesType & TheObjectiveValues,
const MetricValueType & TheContext )
: JSONMessage( MessageIdentifier.data() ,
{ { ContextIdentifier.data(), TheIdentifier },
{ TimeStamp.data(), MicroSecondTimePoint },
{ ObjectiveFunctionLabel.data(), ObjectiveFunctionID },
{ ObjectiveValues.data(), TheObjectiveValues },
{ ExecutionContext.data(), TheContext } } )
: JSONMessage( std::string( MessageIdentifier ) ,
{ { std::string( ContextIdentifier ), TheIdentifier },
{ std::string( TimeStamp ), MicroSecondTimePoint },
{ std::string( ObjectiveFunctionLabel ), ObjectiveFunctionID },
{ std::string( ObjectiveValues ) , TheObjectiveValues },
{ std::string( ExecutionContext ), TheContext } } )
{}
Solution() = delete;
@ -232,7 +232,7 @@ public:
std::string_view MessageIdentifier = "Solver::OptimisationProblem";
OptimisationProblem( const JSON & TheProblem )
: JSONMessage( MessageIdentifier.data(), TheProblem )
: JSONMessage( std::string( MessageIdentifier ), TheProblem )
{}
OptimisationProblem() = delete;