From 4486393c0292ffd3266e61e10deffa556202cd57 Mon Sep 17 00:00:00 2001 From: Geir Horn Date: Wed, 14 Feb 2024 17:53:39 +0100 Subject: [PATCH] Tested and validated (app ID filtering removed as it does not work in the EXE middleware yet) Change-Id: If7ecb996fecba1ed1cbe78a633b38659a4b27c6f --- .vscode/c_cpp_properties.json | 1 + AMPLSolver.cpp | 11 +---- MetricUpdater.cpp | 15 ++----- Solver.hpp | 5 --- SolverComponent.cpp | 42 ++++++++--------- SolverManager.hpp | 85 +++++++++++++---------------------- 6 files changed, 57 insertions(+), 102 deletions(-) diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index d6b4621..18f837d 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -16,6 +16,7 @@ "compilerPath": "/usr/bin/g++", "compilerArgs": [ "-std=c++23", + "-I/usr/include/", "-I/home/GHo/Documents/Code/Theron++/", "-I/home/GHo/Documents/Code/CxxOpts/include", "-I/opt/AMPL/amplapi/include/" diff --git a/AMPLSolver.cpp b/AMPLSolver.cpp index 0e6ae8e..30ad9b6 100644 --- a/AMPLSolver.cpp +++ b/AMPLSolver.cpp @@ -43,7 +43,7 @@ std::string AMPLSolver::SaveFile( const JSON & TheMessage, if( ProblemFile.is_open() ) { - ProblemFile << TheMessage.at( AMPLSolver::FileContent ); + ProblemFile << TheMessage.at( AMPLSolver::FileContent ).get(); ProblemFile.close(); return TheFileName; } @@ -156,11 +156,6 @@ void AMPLSolver::SetAMPLParameter( const std::string & ParameterName, void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem, const Address TheOracle) { - Theron::ConsoleOutput Output; - Output << "AMPL Solver received the AMPL problem: " << std::endl - << TheProblem.dump(2) - << std::endl; - // First storing the AMPL problem file from its definition in the message // and read the file back to the AMPL interpreter. @@ -200,8 +195,6 @@ void AMPLSolver::DefineProblem(const Solver::OptimisationProblem & TheProblem, SetAMPLParameter( ConstantName, ConstantRecord.at( InitialConstantValue ) ); } - - Output << "Problem loaded!" << std::endl; } // ----------------------------------------------------------------------------- @@ -217,8 +210,6 @@ void AMPLSolver::DataFileUpdate( const DataFileMessage & TheDataFile, const Address TheOracle ) { ProblemDefinition.readData( SaveFile( TheDataFile ) ); - //Theron::ConsoleOutput Output; - //Output << "Message recevied is: " << std::endl << TheDataFile.dump(2) << std::endl; } // ----------------------------------------------------------------------------- diff --git a/MetricUpdater.cpp b/MetricUpdater.cpp index 19e8b38..cefda49 100644 --- a/MetricUpdater.cpp +++ b/MetricUpdater.cpp @@ -59,14 +59,14 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, // some of them may correspond to known metrics, some of them may // correspond to metrics that are new. - std::set< std::string > MetricNames; + std::set< std::string > TheMetricNames; for (auto & MetricRecord : TheMetrics.at( NebulOuS::MetricList ) ) { auto [ MetricRecordPointer, MetricAdded ] = MetricValues.try_emplace( - MetricRecord.at( NebulOuS::MetricName ), JSON() ); + MetricRecord.at( NebulOuS::MetricName ).get(), JSON() ); - MetricNames.insert( MetricRecordPointer->first ); + TheMetricNames.insert( MetricRecordPointer->first ); if( MetricAdded ) Send( Theron::AMQ::NetworkLayer::TopicSubscription( @@ -81,7 +81,7 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, // should be unsubcribed and their metric records removed. for( const auto & TheMetric : std::views::keys( MetricValues ) ) - if( !MetricName.contains( TheMetric ) ) + if( !TheMetricNames.contains( TheMetric ) ) { Send( Theron::AMQ::NetworkLayer::TopicSubscription( Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription, @@ -136,17 +136,10 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics, void MetricUpdater::UpdateMetricValue( const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic) { - Theron::ConsoleOutput Output; - - Output << "Metric topic: " << TheMetricTopic.AsString() << std::endl; - Theron::AMQ::TopicName TheTopic = TheMetricTopic.AsString().erase( 0, NebulOuS::MetricValueRootString.size() ); - Output << "The metric: " << TheTopic << " has new value " - << TheMetricValue[ NebulOuS::ValueLabel ] << std::endl; - if( MetricValues.contains( TheTopic ) ) { MetricValues.at( TheTopic ) = TheMetricValue[ NebulOuS::ValueLabel ]; diff --git a/Solver.hpp b/Solver.hpp index 1be3d75..fa7059e 100644 --- a/Solver.hpp +++ b/Solver.hpp @@ -83,11 +83,6 @@ public: // object. The attributes expected are defined as constant strings so that // the actual textual representation can be changed without changing the code // - // "Identifier" It can be anything corresponding to the need of the sender - // and returned to the sender with the found solution - - static constexpr std::string_view ContextIdentifier = "Identifier"; - // "Timestamp" : This is the field giving the implicit order of the // different application execution execution contexts waiting for being // solved when there are more requests than there are solvers available to diff --git a/SolverComponent.cpp b/SolverComponent.cpp index e5e436c..0f06968 100644 --- a/SolverComponent.cpp +++ b/SolverComponent.cpp @@ -59,7 +59,7 @@ endpoint is set to some unique identifier of the application for which this solver is used, e.g., ./SolverComponent --AMPLDir /opt/AMPL \ - --Endpoint f81ee-b42a8-a13d56-e28ec9-2f5578 --ModelDir AMPLTest/ + --ModelDir AMPLTest/ --Endpoint f81ee-b42a8-a13d56-e28ec9-2f5578 Debugging after a coredump @@ -228,31 +228,31 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) // example for an earlier Proton version (0.32.0) and the example at // https://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/selected_recv.cpp.html - virtual proton::receiver_options ReceiverOptions( void ) const override - { - proton::source::filter_map TheFilter; - proton::source_options TheSourceOptions; - proton::symbol FilterKey("selector"); - proton::value FilterValue; - proton::codec::encoder EncodedFilter( FilterValue ); - proton::receiver_options TheOptions( - Theron::AMQ::NetworkLayer::AMQProperties::ReceiverOptions() ); + // virtual proton::receiver_options ReceiverOptions( void ) const override + // { + // proton::source::filter_map TheFilter; + // proton::source_options TheSourceOptions; + // proton::symbol FilterKey("selector"); + // proton::value FilterValue; + // proton::codec::encoder EncodedFilter( FilterValue ); + // proton::receiver_options TheOptions( + // Theron::AMQ::NetworkLayer::AMQProperties::ReceiverOptions() ); - std::ostringstream SelectorString; + // std::ostringstream SelectorString; - SelectorString << "application = '" << ApplicationID << "'"; + // SelectorString << "application = '" << ApplicationID << "'"; - EncodedFilter << proton::codec::start::described() - << proton::symbol("apache.org:selector-filter:string") - << SelectorString.str() - << proton::codec::finish(); + // EncodedFilter << proton::codec::start::described() + // << proton::symbol("apache.org:selector-filter:string") + // << SelectorString.str() + // << proton::codec::finish(); - TheFilter.put( FilterKey, FilterValue ); - TheSourceOptions.filters( TheFilter ); - TheOptions.source( TheSourceOptions ); + // TheFilter.put( FilterKey, FilterValue ); + // TheSourceOptions.filters( TheFilter ); + // TheOptions.source( TheSourceOptions ); - return TheOptions; - } + // return TheOptions; + // } // The application identifier must also be provided in every message to // allow other receivers to filter on this. diff --git a/SolverManager.hpp b/SolverManager.hpp index 362c779..de2406b 100644 --- a/SolverManager.hpp +++ b/SolverManager.hpp @@ -111,7 +111,7 @@ private: // The solution manager dispatches the application execution contexts as // requests for solutions to a pool of solvers. - std::list< SolverType > SolverPool; + std::list< SolverType > SolverPool; std::unordered_set< Address > ActiveSolvers, PassiveSolvers; // -------------------------------------------------------------------------- @@ -120,94 +120,64 @@ private: // // The contexts are dispatched in time sorted order. However, the time // to solve a problem depends on the complexity of the the context and the - // results may therefore become available out-of-order. Each application - // execution context should carry a unique identifier, and this is used as - // the index key for quickly finding the right execution context. There is - // a second view of the queue of application context where the identifiers - // are sorted based on their time stamp. + // results may therefore become available out-of-order. - std::unordered_map< Solver::ContextIdentifierType, - Solver:: ApplicationExecutionContext > Contexts; + std::multimap< Solver::TimePointType, + Solver::ApplicationExecutionContext > ContextQueue; - std::multimap< Solver::TimePointType, Solver::ContextIdentifierType > - ContextExecutionQueue; - // When the new applicaton execution context message arrives, it will be // queued, and its time point recoreded. If there are passive solvers, // the handler will immediately dispatch the contexts to each of these in // time order. Essentially, it implements a 'riffle' for the passive solvers // and the pending contexts.The issue is that there are likely different // cardinalities of the two sets, and the solvers should be marked as - // active after the dispatch and the context identifiers should be - // removed from the queue after the dispatch. + // active after the dispatch and the contexts should be removed from the + // queue after the dispatch. void DispatchToSolvers( void ) { - if( !PassiveSolvers.empty() && !ContextExecutionQueue.empty() ) + if( !PassiveSolvers.empty() && !ContextQueue.empty() ) { for( const auto & [ SolverAddress, ContextElement ] : - std::ranges::views::zip( PassiveSolvers, ContextExecutionQueue ) ) - Send( Contexts.at( ContextElement.second ), SolverAddress ); + std::ranges::views::zip( PassiveSolvers, ContextQueue ) ) + Send( ContextElement.second, SolverAddress ); // The number of contexts dispatched must equal the minimum of the // available solvers and the available contexts. std::size_t DispatchedContexts - = std::min( PassiveSolvers.size(), ContextExecutionQueue.size() ); + = std::min( PassiveSolvers.size(), ContextQueue.size() ); // Then move the passive solver addresses used to active solver addresses std::ranges::move( std::ranges::subrange( PassiveSolvers.begin(), - std::ranges::next( PassiveSolvers.begin(), DispatchedContexts, - PassiveSolvers.end() ) ), + std::ranges::next( PassiveSolvers.begin(), + DispatchedContexts, + PassiveSolvers.end() ) ), std::inserter( ActiveSolvers, ActiveSolvers.end() ) ); // Then the dispatched context identifiers are removed from queue - ContextExecutionQueue.erase( ContextExecutionQueue.begin(), - std::ranges::next( ContextExecutionQueue.begin(), DispatchedContexts, - ContextExecutionQueue.end() ) ); + ContextQueue.erase( ContextQueue.begin(), + std::ranges::next( ContextQueue.begin(), + DispatchedContexts, + ContextQueue.end() ) ); } } // The handler function simply enqueues the received context, records its - // timesamp and dispatch as many contexts as possible to the solvers. Note - // that the context identifiers must be unique and there is a logic error - // if there is already a context with the same identifier. Then an invalid - // arguemtn exception will be thrown. This strategy should be reconsidered - // if there will be multiple entities firing execution contexts. + // timesamp and dispatch as many contexts as possible to the solvers. void HandleApplicationExecutionContext( const Solver:: ApplicationExecutionContext & TheContext, const Address TheRequester ) { - auto [_, Success] = Contexts.try_emplace( - TheContext[ Solver::ContextIdentifier.data() ], TheContext ); + ContextQueue.emplace( + TheContext.at( Solver::TimeStamp ).get< Solver::TimePointType >(), + TheContext ); - if( Success ) - { - ContextExecutionQueue.emplace( - TheContext[ Solver::TimeStamp.data() ], - TheContext[ Solver::ContextIdentifier.data() ] ); - - DispatchToSolvers(); - } - else - { - std::source_location Location = std::source_location::current(); - std::ostringstream ErrorMessage; - - ErrorMessage << "[" << Location.file_name() << " at line " - << Location.line() - << "in function " << Location.function_name() <<"] " - << "An Application Execution Context with identifier " - << TheContext[ Solver::ContextIdentifier.data() ] - << " was received while there is already one with the same " - << "identifer. The identifiers must be unique!"; - - throw std::invalid_argument( ErrorMessage.str() ); - } + DispatchToSolvers(); } // -------------------------------------------------------------------------- @@ -262,7 +232,7 @@ public: SolutionReceiver( SolutionTopic ), ContextTopic( ContextPublisherTopic ), SolverPool(), ActiveSolvers(), PassiveSolvers(), - Contexts(), ContextExecutionQueue() + ContextQueue() { // The solvers are created by expanding the arguments for the solvers // one by one creating new elements in the solver pool. The solvers @@ -318,10 +288,15 @@ public: << boost::core::demangle( typeid( SolverType ).name() ) << " from the given constructor argument types: "; - (( ErrorMessage << boost::core::demangle( typeid( SolverArguments ).name() ) << " " ), ... ); + (( ErrorMessage << boost::core::demangle( typeid( SolverArguments ).name() ) << " " ), ... ); - throw std::invalid_argument( ErrorMessage.str() ); + throw std::invalid_argument( ErrorMessage.str() ); } + + // Finally, the handlers for the messages are registered + + RegisterHandler(this, &SolverManager::HandleApplicationExecutionContext ); + RegisterHandler(this, &SolverManager::PublishSolution ); } // The destructor closes all the open topics if the network is still open