Tested and validated (app ID filtering removed as it does not work in the EXE middleware yet)

Change-Id: If7ecb996fecba1ed1cbe78a633b38659a4b27c6f
This commit is contained in:
Geir Horn 2024-02-14 17:53:39 +01:00
parent 294eb0e62f
commit 4486393c02
6 changed files with 57 additions and 102 deletions

View File

@ -16,6 +16,7 @@
"compilerPath": "/usr/bin/g++", "compilerPath": "/usr/bin/g++",
"compilerArgs": [ "compilerArgs": [
"-std=c++23", "-std=c++23",
"-I/usr/include/",
"-I/home/GHo/Documents/Code/Theron++/", "-I/home/GHo/Documents/Code/Theron++/",
"-I/home/GHo/Documents/Code/CxxOpts/include", "-I/home/GHo/Documents/Code/CxxOpts/include",
"-I/opt/AMPL/amplapi/include/" "-I/opt/AMPL/amplapi/include/"

View File

@ -43,7 +43,7 @@ std::string AMPLSolver::SaveFile( const JSON & TheMessage,
if( ProblemFile.is_open() ) if( ProblemFile.is_open() )
{ {
ProblemFile << TheMessage.at( AMPLSolver::FileContent ); ProblemFile << TheMessage.at( AMPLSolver::FileContent ).get<std::string>();
ProblemFile.close(); ProblemFile.close();
return TheFileName; return TheFileName;
} }
@ -156,11 +156,6 @@ void AMPLSolver::SetAMPLParameter( const std::string & ParameterName,
void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem, void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem,
const Address TheOracle) const Address TheOracle)
{ {
Theron::ConsoleOutput Output;
Output << "AMPL Solver received the AMPL problem: " << std::endl
<< TheProblem.dump(2)
<< std::endl;
// First storing the AMPL problem file from its definition in the message // First storing the AMPL problem file from its definition in the message
// and read the file back to the AMPL interpreter. // and read the file back to the AMPL interpreter.
@ -200,8 +195,6 @@ void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem,
SetAMPLParameter( ConstantName, SetAMPLParameter( ConstantName,
ConstantRecord.at( InitialConstantValue ) ); ConstantRecord.at( InitialConstantValue ) );
} }
Output << "Problem loaded!" << std::endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -217,8 +210,6 @@ void AMPLSolver::DataFileUpdate( const DataFileMessage & TheDataFile,
const Address TheOracle ) const Address TheOracle )
{ {
ProblemDefinition.readData( SaveFile( TheDataFile ) ); ProblemDefinition.readData( SaveFile( TheDataFile ) );
//Theron::ConsoleOutput Output;
//Output << "Message recevied is: " << std::endl << TheDataFile.dump(2) << std::endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -59,14 +59,14 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics,
// some of them may correspond to known metrics, some of them may // some of them may correspond to known metrics, some of them may
// correspond to metrics that are new. // correspond to metrics that are new.
std::set< std::string > MetricNames; std::set< std::string > TheMetricNames;
for (auto & MetricRecord : TheMetrics.at( NebulOuS::MetricList ) ) for (auto & MetricRecord : TheMetrics.at( NebulOuS::MetricList ) )
{ {
auto [ MetricRecordPointer, MetricAdded ] = MetricValues.try_emplace( auto [ MetricRecordPointer, MetricAdded ] = MetricValues.try_emplace(
MetricRecord.at( NebulOuS::MetricName ), JSON() ); MetricRecord.at( NebulOuS::MetricName ).get<std::string>(), JSON() );
MetricNames.insert( MetricRecordPointer->first ); TheMetricNames.insert( MetricRecordPointer->first );
if( MetricAdded ) if( MetricAdded )
Send( Theron::AMQ::NetworkLayer::TopicSubscription( Send( Theron::AMQ::NetworkLayer::TopicSubscription(
@ -81,7 +81,7 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics,
// should be unsubcribed and their metric records removed. // should be unsubcribed and their metric records removed.
for( const auto & TheMetric : std::views::keys( MetricValues ) ) for( const auto & TheMetric : std::views::keys( MetricValues ) )
if( !MetricName.contains( TheMetric ) ) if( !TheMetricNames.contains( TheMetric ) )
{ {
Send( Theron::AMQ::NetworkLayer::TopicSubscription( Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription,
@ -136,17 +136,10 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics,
void MetricUpdater::UpdateMetricValue( void MetricUpdater::UpdateMetricValue(
const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic) const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic)
{ {
Theron::ConsoleOutput Output;
Output << "Metric topic: " << TheMetricTopic.AsString() << std::endl;
Theron::AMQ::TopicName TheTopic Theron::AMQ::TopicName TheTopic
= TheMetricTopic.AsString().erase( 0, = TheMetricTopic.AsString().erase( 0,
NebulOuS::MetricValueRootString.size() ); NebulOuS::MetricValueRootString.size() );
Output << "The metric: " << TheTopic << " has new value "
<< TheMetricValue[ NebulOuS::ValueLabel ] << std::endl;
if( MetricValues.contains( TheTopic ) ) if( MetricValues.contains( TheTopic ) )
{ {
MetricValues.at( TheTopic ) = TheMetricValue[ NebulOuS::ValueLabel ]; MetricValues.at( TheTopic ) = TheMetricValue[ NebulOuS::ValueLabel ];

View File

@ -83,11 +83,6 @@ public:
// object. The attributes expected are defined as constant strings so that // object. The attributes expected are defined as constant strings so that
// the actual textual representation can be changed without changing the code // the actual textual representation can be changed without changing the code
// //
// "Identifier" It can be anything corresponding to the need of the sender
// and returned to the sender with the found solution
static constexpr std::string_view ContextIdentifier = "Identifier";
// "Timestamp" : This is the field giving the implicit order of the // "Timestamp" : This is the field giving the implicit order of the
// different application execution execution contexts waiting for being // different application execution execution contexts waiting for being
// solved when there are more requests than there are solvers available to // solved when there are more requests than there are solvers available to

View File

@ -59,7 +59,7 @@ endpoint is set to some unique identifier of the application for which this
solver is used, e.g., solver is used, e.g.,
./SolverComponent --AMPLDir /opt/AMPL \ ./SolverComponent --AMPLDir /opt/AMPL \
--Endpoint f81ee-b42a8-a13d56-e28ec9-2f5578 --ModelDir AMPLTest/ --ModelDir AMPLTest/ --Endpoint f81ee-b42a8-a13d56-e28ec9-2f5578
Debugging after a coredump Debugging after a coredump
@ -228,31 +228,31 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings )
// example for an earlier Proton version (0.32.0) and the example at // example for an earlier Proton version (0.32.0) and the example at
// https://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/selected_recv.cpp.html // https://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/selected_recv.cpp.html
virtual proton::receiver_options ReceiverOptions( void ) const override // virtual proton::receiver_options ReceiverOptions( void ) const override
{ // {
proton::source::filter_map TheFilter; // proton::source::filter_map TheFilter;
proton::source_options TheSourceOptions; // proton::source_options TheSourceOptions;
proton::symbol FilterKey("selector"); // proton::symbol FilterKey("selector");
proton::value FilterValue; // proton::value FilterValue;
proton::codec::encoder EncodedFilter( FilterValue ); // proton::codec::encoder EncodedFilter( FilterValue );
proton::receiver_options TheOptions( // proton::receiver_options TheOptions(
Theron::AMQ::NetworkLayer::AMQProperties::ReceiverOptions() ); // Theron::AMQ::NetworkLayer::AMQProperties::ReceiverOptions() );
std::ostringstream SelectorString; // std::ostringstream SelectorString;
SelectorString << "application = '" << ApplicationID << "'"; // SelectorString << "application = '" << ApplicationID << "'";
EncodedFilter << proton::codec::start::described() // EncodedFilter << proton::codec::start::described()
<< proton::symbol("apache.org:selector-filter:string") // << proton::symbol("apache.org:selector-filter:string")
<< SelectorString.str() // << SelectorString.str()
<< proton::codec::finish(); // << proton::codec::finish();
TheFilter.put( FilterKey, FilterValue ); // TheFilter.put( FilterKey, FilterValue );
TheSourceOptions.filters( TheFilter ); // TheSourceOptions.filters( TheFilter );
TheOptions.source( TheSourceOptions ); // TheOptions.source( TheSourceOptions );
return TheOptions; // return TheOptions;
} // }
// The application identifier must also be provided in every message to // The application identifier must also be provided in every message to
// allow other receivers to filter on this. // allow other receivers to filter on this.

View File

@ -120,17 +120,10 @@ private:
// //
// The contexts are dispatched in time sorted order. However, the time // The contexts are dispatched in time sorted order. However, the time
// to solve a problem depends on the complexity of the the context and the // to solve a problem depends on the complexity of the the context and the
// results may therefore become available out-of-order. Each application // results may therefore become available out-of-order.
// execution context should carry a unique identifier, and this is used as
// the index key for quickly finding the right execution context. There is
// a second view of the queue of application context where the identifiers
// are sorted based on their time stamp.
std::unordered_map< Solver::ContextIdentifierType, std::multimap< Solver::TimePointType,
Solver:: ApplicationExecutionContext > Contexts; Solver::ApplicationExecutionContext > ContextQueue;
std::multimap< Solver::TimePointType, Solver::ContextIdentifierType >
ContextExecutionQueue;
// When the new applicaton execution context message arrives, it will be // When the new applicaton execution context message arrives, it will be
// queued, and its time point recoreded. If there are passive solvers, // queued, and its time point recoreded. If there are passive solvers,
@ -138,77 +131,54 @@ private:
// time order. Essentially, it implements a 'riffle' for the passive solvers // time order. Essentially, it implements a 'riffle' for the passive solvers
// and the pending contexts.The issue is that there are likely different // and the pending contexts.The issue is that there are likely different
// cardinalities of the two sets, and the solvers should be marked as // cardinalities of the two sets, and the solvers should be marked as
// active after the dispatch and the context identifiers should be // active after the dispatch and the contexts should be removed from the
// removed from the queue after the dispatch. // queue after the dispatch.
void DispatchToSolvers( void ) void DispatchToSolvers( void )
{ {
if( !PassiveSolvers.empty() && !ContextExecutionQueue.empty() ) if( !PassiveSolvers.empty() && !ContextQueue.empty() )
{ {
for( const auto & [ SolverAddress, ContextElement ] : for( const auto & [ SolverAddress, ContextElement ] :
std::ranges::views::zip( PassiveSolvers, ContextExecutionQueue ) ) std::ranges::views::zip( PassiveSolvers, ContextQueue ) )
Send( Contexts.at( ContextElement.second ), SolverAddress ); Send( ContextElement.second, SolverAddress );
// The number of contexts dispatched must equal the minimum of the // The number of contexts dispatched must equal the minimum of the
// available solvers and the available contexts. // available solvers and the available contexts.
std::size_t DispatchedContexts std::size_t DispatchedContexts
= std::min( PassiveSolvers.size(), ContextExecutionQueue.size() ); = std::min( PassiveSolvers.size(), ContextQueue.size() );
// Then move the passive solver addresses used to active solver addresses // Then move the passive solver addresses used to active solver addresses
std::ranges::move( std::ranges::move(
std::ranges::subrange( PassiveSolvers.begin(), std::ranges::subrange( PassiveSolvers.begin(),
std::ranges::next( PassiveSolvers.begin(), DispatchedContexts, std::ranges::next( PassiveSolvers.begin(),
DispatchedContexts,
PassiveSolvers.end() ) ), PassiveSolvers.end() ) ),
std::inserter( ActiveSolvers, ActiveSolvers.end() ) ); std::inserter( ActiveSolvers, ActiveSolvers.end() ) );
// Then the dispatched context identifiers are removed from queue // Then the dispatched context identifiers are removed from queue
ContextExecutionQueue.erase( ContextExecutionQueue.begin(), ContextQueue.erase( ContextQueue.begin(),
std::ranges::next( ContextExecutionQueue.begin(), DispatchedContexts, std::ranges::next( ContextQueue.begin(),
ContextExecutionQueue.end() ) ); DispatchedContexts,
ContextQueue.end() ) );
} }
} }
// The handler function simply enqueues the received context, records its // The handler function simply enqueues the received context, records its
// timesamp and dispatch as many contexts as possible to the solvers. Note // timesamp and dispatch as many contexts as possible to the solvers.
// that the context identifiers must be unique and there is a logic error
// if there is already a context with the same identifier. Then an invalid
// arguemtn exception will be thrown. This strategy should be reconsidered
// if there will be multiple entities firing execution contexts.
void HandleApplicationExecutionContext( void HandleApplicationExecutionContext(
const Solver:: ApplicationExecutionContext & TheContext, const Solver:: ApplicationExecutionContext & TheContext,
const Address TheRequester ) const Address TheRequester )
{ {
auto [_, Success] = Contexts.try_emplace( ContextQueue.emplace(
TheContext[ Solver::ContextIdentifier.data() ], TheContext ); TheContext.at( Solver::TimeStamp ).get< Solver::TimePointType >(),
TheContext );
if( Success )
{
ContextExecutionQueue.emplace(
TheContext[ Solver::TimeStamp.data() ],
TheContext[ Solver::ContextIdentifier.data() ] );
DispatchToSolvers(); DispatchToSolvers();
} }
else
{
std::source_location Location = std::source_location::current();
std::ostringstream ErrorMessage;
ErrorMessage << "[" << Location.file_name() << " at line "
<< Location.line()
<< "in function " << Location.function_name() <<"] "
<< "An Application Execution Context with identifier "
<< TheContext[ Solver::ContextIdentifier.data() ]
<< " was received while there is already one with the same "
<< "identifer. The identifiers must be unique!";
throw std::invalid_argument( ErrorMessage.str() );
}
}
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// Solutions // Solutions
@ -262,7 +232,7 @@ public:
SolutionReceiver( SolutionTopic ), SolutionReceiver( SolutionTopic ),
ContextTopic( ContextPublisherTopic ), ContextTopic( ContextPublisherTopic ),
SolverPool(), ActiveSolvers(), PassiveSolvers(), SolverPool(), ActiveSolvers(), PassiveSolvers(),
Contexts(), ContextExecutionQueue() ContextQueue()
{ {
// The solvers are created by expanding the arguments for the solvers // The solvers are created by expanding the arguments for the solvers
// one by one creating new elements in the solver pool. The solvers // one by one creating new elements in the solver pool. The solvers
@ -322,6 +292,11 @@ public:
throw std::invalid_argument( ErrorMessage.str() ); throw std::invalid_argument( ErrorMessage.str() );
} }
// Finally, the handlers for the messages are registered
RegisterHandler(this, &SolverManager::HandleApplicationExecutionContext );
RegisterHandler(this, &SolverManager::PublishSolution );
} }
// The destructor closes all the open topics if the network is still open // The destructor closes all the open topics if the network is still open