First release
- Added build script and AMPL license file - Fixed merge errors for the makefile - Extended the makefile header - Added initial AMQ message topics - Tested remote build - Removed AMPL license file - Validated build script - Accepting the metric definition message from the EMS - Execution control status messages + solver type command line option - Executing solver component - Added instructions on use to the Solver Component source file - Explicit close of subscriptions in destrcutors if network active. - Correct handling of metric list messages and subscriptions - Adding correct message and connection properties Change-Id: If02caff12aacf8a2181c96eb6dca4a19dc23c118
This commit is contained in:
parent
ae588d8955
commit
44d093d2f8
13
.vscode/c_cpp_properties.json
vendored
13
.vscode/c_cpp_properties.json
vendored
@ -3,19 +3,15 @@
|
||||
{
|
||||
"name": "Linux",
|
||||
"includePath": [
|
||||
"/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13",
|
||||
"/home/GHo/Documents/Code/CxxOpts/include",
|
||||
"/home/GHo/Documents/Code/Theron++",
|
||||
"/home/GHo/Documents/Code/Theron++/Utility",
|
||||
"/home/GHo/Documents/Code/Theron++/Communication",
|
||||
"/home/GHo/Documents/Code/Theron++/Communication/AMQ",
|
||||
"/opt/AMPL/amplapi/include/ampl",
|
||||
"${workspaceFolder}/**",
|
||||
"/home/GHo/Documents/Code/Theron++"
|
||||
"/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13"
|
||||
],
|
||||
"defines": [],
|
||||
"cStandard": "c23",
|
||||
"intelliSenseMode": "linux-gcc-x64",
|
||||
"intelliSenseMode": "${default}",
|
||||
"compilerPath": "/usr/bin/g++",
|
||||
"compilerArgs": [
|
||||
"-std=c++23",
|
||||
@ -28,12 +24,9 @@
|
||||
"configurationProvider": "ms-vscode.makefile-tools",
|
||||
"browse": {
|
||||
"path": [
|
||||
"/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13",
|
||||
"/home/GHo/Documents/Code/Theron++",
|
||||
"/home/GHo/Documents/Code/CxxOpts/include",
|
||||
"/home/GHo/Documents/Code/Theron++/Utility",
|
||||
"/home/GHo/Documents/Code/Theron++/Communication",
|
||||
"/home/GHo/Documents/Code/Theron++/Examples"
|
||||
"/usr/lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13"
|
||||
],
|
||||
"limitSymbolsToIncludedHeaders": false
|
||||
}
|
||||
|
1
AMPLTest/.gitignore
vendored
Normal file
1
AMPLTest/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
*.ampl
|
@ -68,7 +68,7 @@ void ExecutionControl::StopMessageHandler( const StopMessage & Command,
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Constructor
|
||||
// Constructor & destructor
|
||||
// -----------------------------------------------------------------------------
|
||||
//
|
||||
// The constructor registers the stop message handler and sets up a publisher
|
||||
@ -91,4 +91,16 @@ ExecutionControl::ExecutionControl( const std::string & TheActorName )
|
||||
|
||||
}
|
||||
|
||||
// The destructor simply closes the publisher if the network is still active
|
||||
// when the actor closes.
|
||||
|
||||
ExecutionControl::~ExecutionControl( void )
|
||||
{
|
||||
if( HasNetwork() )
|
||||
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
|
||||
Theron::AMQ::NetworkLayer::TopicSubscription::Action::ClosePublisher,
|
||||
std::string( StatusTopic )
|
||||
), GetSessionLayerAddress() );
|
||||
}
|
||||
|
||||
} // namespace NebulOuS
|
@ -167,7 +167,7 @@ public:
|
||||
ExecutionControl( const std::string & TheActorName );
|
||||
|
||||
ExecutionControl() = delete;
|
||||
virtual ~ExecutionControl() = default;
|
||||
virtual ~ExecutionControl();
|
||||
};
|
||||
|
||||
} // namespace NebulOuS
|
||||
|
@ -18,6 +18,7 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
|
||||
#include <ranges> // Container ranges
|
||||
#include <algorithm> // Algorithms
|
||||
|
||||
#include "Utility/ConsolePrint.hpp" // For logging
|
||||
#include "Communication/AMQ/AMQEndpoint.hpp" // For Topic subscriptions
|
||||
|
||||
#include "MetricUpdater.hpp"
|
||||
@ -41,36 +42,34 @@ namespace NebulOuS
|
||||
void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics,
|
||||
const Address OptimiserController )
|
||||
{
|
||||
if( TheMetrics.is_object() &&
|
||||
TheMetrics.at( NebulOuS::MetricList ).is_object() )
|
||||
{
|
||||
JSON MetricList = TheMetrics.at( NebulOuS::MetricList );
|
||||
|
||||
for( const JSON MetricDefinition : MetricList.items() )
|
||||
{
|
||||
auto [ MetricRecord, NewMetric ] = MetricValues.try_emplace(
|
||||
MetricDefinition.at( NebulOuS::MetricName ), JSON() );
|
||||
|
||||
if( NewMetric )
|
||||
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
|
||||
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription,
|
||||
MetricRecord->first ),
|
||||
Theron::AMQ::Network::GetAddress( Theron::Network::Layer::Session) );
|
||||
}
|
||||
}
|
||||
else
|
||||
if( TheMetrics.is_object() &&
|
||||
TheMetrics.at( NebulOuS::MetricList ).is_array() )
|
||||
{
|
||||
for (auto & MetricRecord : TheMetrics.at( NebulOuS::MetricList ) )
|
||||
{
|
||||
std::source_location Location = std::source_location::current();
|
||||
std::ostringstream ErrorMessage;
|
||||
auto [ MetricRecordPointer, NewMetric ] = MetricValues.try_emplace(
|
||||
MetricRecord.at( NebulOuS::MetricName ), JSON() );
|
||||
|
||||
ErrorMessage << "[" << Location.file_name() << " at line " << Location.line()
|
||||
<< "in function " << Location.function_name() <<"] "
|
||||
<< "The message to define a new metric subscription is given as "
|
||||
<< std::endl << TheMetrics.dump(2) << std::endl
|
||||
<< "this is not as expected!";
|
||||
|
||||
throw std::invalid_argument( ErrorMessage.str() );
|
||||
if( NewMetric )
|
||||
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
|
||||
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription,
|
||||
std::string( MetricValueRootString ) + MetricRecordPointer->first ),
|
||||
Theron::AMQ::Network::GetAddress( Theron::Network::Layer::Session) );
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
std::source_location Location = std::source_location::current();
|
||||
std::ostringstream ErrorMessage;
|
||||
|
||||
ErrorMessage << "[" << Location.file_name() << " at line " << Location.line()
|
||||
<< " in function " << Location.function_name() <<"] "
|
||||
<< "The message to define a new metric subscription is given as "
|
||||
<< std::endl << TheMetrics.dump(2) << std::endl
|
||||
<< "this is not as expected!";
|
||||
|
||||
throw std::invalid_argument( ErrorMessage.str() );
|
||||
}
|
||||
}
|
||||
|
||||
// The metric update value is received whenever any of subscribed forecasters
|
||||
@ -102,9 +101,16 @@ void MetricUpdater::AddMetricSubscription( const MetricTopic & TheMetrics,
|
||||
void MetricUpdater::UpdateMetricValue(
|
||||
const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic)
|
||||
{
|
||||
Theron::ConsoleOutput Output;
|
||||
|
||||
Output << "Metric topic: " << TheMetricTopic.AsString() << std::endl;
|
||||
|
||||
Theron::AMQ::TopicName TheTopic
|
||||
= TheMetricTopic.AsString().erase( 0,
|
||||
NebulOuS::MetricValueRootString.size() );
|
||||
|
||||
Output << "The metric: " << TheTopic << " has new value "
|
||||
<< TheMetricValue[ NebulOuS::ValueLabel ] << std::endl;
|
||||
|
||||
if( MetricValues.contains( TheTopic ) )
|
||||
{
|
||||
|
@ -88,7 +88,7 @@ constexpr std::string_view TimePoint = "predictionTime";
|
||||
// defined next.
|
||||
|
||||
constexpr std::string_view MetricSubscriptions
|
||||
= "eu.nebulouscloud.monitoring.metric_lists";
|
||||
= "eu.nebulouscloud.monitoring.metric_list";
|
||||
|
||||
// The JSON message attribute for the list of metrics is another JSON object
|
||||
// stored under the following key, see the Event type III defined in
|
||||
@ -211,6 +211,10 @@ private:
|
||||
: JSONTopicMessage( Other )
|
||||
{}
|
||||
|
||||
// MetricTopic( const JSONTopicMessage & Other )
|
||||
// : JSONTopicMessage( Other )
|
||||
// {}
|
||||
|
||||
virtual ~MetricTopic() = default;
|
||||
};
|
||||
|
||||
|
@ -54,6 +54,17 @@ be extended with the AMPL execution file path, e.g.,
|
||||
|
||||
export PATH=$PATH:/opt/AMPL
|
||||
|
||||
The parameters to the application are used as described above, and typically the
|
||||
endpoint is set to some unique identifier of the application for which this
|
||||
solver is used, e.g.,
|
||||
|
||||
./SolverComponent --AMPLDir /opt/AMPL \
|
||||
--Endpoint f81ee-b42a8-a13d56-e28ec9-2f5578 --ModelDir AMPLTest/
|
||||
|
||||
Debugging after a coredump
|
||||
|
||||
coredumpctl debug SolverComponent
|
||||
|
||||
Author and Copyright: Geir Horn, University of Oslo
|
||||
Contact: Geir.Horn@mn.uio.no
|
||||
License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
|
||||
@ -66,6 +77,7 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
|
||||
#include <sstream> // To format error messages
|
||||
#include <stdexcept> // standard exceptions
|
||||
#include <filesystem> // Access to the file system
|
||||
#include <map> // For extended AMQ properties
|
||||
|
||||
// Theron++ headers
|
||||
|
||||
@ -78,7 +90,9 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
|
||||
|
||||
// AMQ protocol related headers
|
||||
|
||||
#include "proton/symbol.hpp" // AMQ symbols
|
||||
#include "proton/connection_options.hpp" // Options for the Broker
|
||||
#include "proton/message.hpp" // AMQ messages definitions
|
||||
#include "Communication/AMQ/AMQMessage.hpp" // The AMQP messages
|
||||
#include "Communication/AMQ/AMQEndpoint.hpp" // The AMP endpoint
|
||||
#include "Communication/AMQ/AMQjson.hpp" // Transparent JSON-AMQP
|
||||
@ -127,6 +141,8 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings )
|
||||
cxxopts::value<unsigned int>()->default_value("5672") )
|
||||
("S,Solver", "Solver to use, devault Couenne",
|
||||
cxxopts::value<std::string>()->default_value("couenne") )
|
||||
("T,Tenant", "Tenant identifier for messages",
|
||||
cxxopts::value<std::string>()->default_value("TheTenant"))
|
||||
("U,User", "The user name used for the AMQ Broker connection",
|
||||
cxxopts::value<std::string>()->default_value("admin") )
|
||||
("Pw,Password", "The password for the AMQ Broker connection",
|
||||
@ -180,29 +196,51 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings )
|
||||
// The AMQ communication is managed by the standard communication actors of
|
||||
// the Theron++ Actor framewokr. Thus, it is just a matter of starting the
|
||||
// endpoint actors with the given command line parameters.
|
||||
//
|
||||
// The network endpoint takes the endpoint name as the first argument, then
|
||||
// the URL for the broker and the port number. The user name and the password
|
||||
// are defined in the AMQ Qpid Proton connection options, and the values are
|
||||
// therefore set for the connection options.
|
||||
|
||||
// There are certain properties defined for the NebulOuS communication
|
||||
// protocol where the endpoint name is taken as the application identifier
|
||||
// and the tenant identifier and the protocol used. The same information
|
||||
// has to be given as different maps for the connection options and for
|
||||
// the message properties.
|
||||
|
||||
std::map< proton::symbol, proton::value > NebulOuSProperties{
|
||||
{"x-tenant-id", CLIValues["Tenant"].as< std::string >() },
|
||||
{"x-application-id", CLIValues["Endpoint"].as< std::string >()},
|
||||
{"x-protocol", "AMQP"}
|
||||
};
|
||||
|
||||
proton::message::property_map MessageProperties{
|
||||
{"x-tenant-id", CLIValues["Tenant"].as< std::string >() },
|
||||
{"x-application-id", CLIValues["Endpoint"].as< std::string >()},
|
||||
{"x-protocol", "AMQP"}
|
||||
};
|
||||
|
||||
// The user name and the password are defined in the AMQ Qpid Proton
|
||||
// connection options, and the values are therefore set for the connection
|
||||
// options.
|
||||
|
||||
proton::connection_options AMQOptions;
|
||||
|
||||
AMQOptions.user( CLIValues["User"].as< std::string >() );
|
||||
AMQOptions.password( CLIValues["Password"].as< std::string >() );
|
||||
|
||||
// Then the network endpoint cna be constructed using the default names for
|
||||
// the various network endpoint servers in order to pass the defined
|
||||
// connection options.
|
||||
AMQOptions.properties( NebulOuSProperties );
|
||||
|
||||
// The network endpoint takes the endpoint name as the first argument, then
|
||||
// the URL for the broker and the port number. Then the network endpoint can
|
||||
// be constructed using the default names for the Session Layer and the
|
||||
// Presentation layer servers, but calling the endpoint for "Solver" to make
|
||||
// it more visible at the AMQ broker listing of subscribers. The endpoint
|
||||
// will be a unique application identifier. The server names are followed
|
||||
// by the defined connection options and the message options.
|
||||
|
||||
Theron::AMQ::NetworkEndpoint AMQNetWork(
|
||||
CLIValues["Endpoint"].as< std::string >(),
|
||||
CLIValues["Broker"].as< std::string >(),
|
||||
CLIValues["Port"].as< unsigned int >(),
|
||||
Theron::AMQ::Network::NetworkLayerLabel,
|
||||
"Solver",
|
||||
Theron::AMQ::Network::SessionLayerLabel,
|
||||
Theron::AMQ::Network::PresentationLayerLabel,
|
||||
AMQOptions
|
||||
AMQOptions, MessageProperties
|
||||
);
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
|
@ -97,11 +97,12 @@ class SolverManager
|
||||
{
|
||||
// There is a topic name used to publish solutions found by the solvers. This
|
||||
// topic is given to the constructor and kept as a constant during the class
|
||||
// execution.
|
||||
// execution. The same goes for the topic on which application execution
|
||||
// contexts will arrive for processing.
|
||||
|
||||
private:
|
||||
|
||||
const Theron::AMQ::TopicName SolutionReceiver;
|
||||
const Theron::AMQ::TopicName SolutionReceiver, ContextTopic;
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// Solver management
|
||||
@ -259,6 +260,7 @@ public:
|
||||
NetworkingActor( Actor::GetAddress().AsString() ),
|
||||
ExecutionControl( Actor::GetAddress().AsString() ),
|
||||
SolutionReceiver( SolutionTopic ),
|
||||
ContextTopic( ContextPublisherTopic ),
|
||||
SolverPool(), ActiveSolvers(), PassiveSolvers(),
|
||||
Contexts(), ContextExecutionQueue()
|
||||
{
|
||||
@ -322,6 +324,25 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// The destructor closes all the open topics if the network is still open
|
||||
// when the destructor is invoked.
|
||||
|
||||
virtual ~SolverManager( void )
|
||||
{
|
||||
if( HasNetwork() )
|
||||
{
|
||||
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
|
||||
Theron::AMQ::NetworkLayer::TopicSubscription::Action::ClosePublisher,
|
||||
SolutionReceiver
|
||||
), GetSessionLayerAddress() );
|
||||
|
||||
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
|
||||
Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription,
|
||||
ContextTopic
|
||||
), GetSessionLayerAddress() );
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
} // namespace NebulOuS
|
||||
|
4
makefile
4
makefile
@ -132,7 +132,7 @@ SOLVER_OBJECTS = $(addprefix $(OBJECTS_DIR)/, $(SOLVER_SOURCE:.cpp=.o) )
|
||||
# component's objective file, they can be built by a general rule
|
||||
|
||||
$(OBJECTS_DIR)/%.o : %.cpp
|
||||
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_DIRECTORIES)
|
||||
$(CC) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_DIRECTORIES)
|
||||
|
||||
#------------------------------------------------------------------------------
|
||||
# Solver component
|
||||
@ -143,7 +143,7 @@ $(OBJECTS_DIR)/%.o : %.cpp
|
||||
# the object files or the solver actors.
|
||||
|
||||
SolverComponent: $(SOLVER_OBJECTS) $(THERON)/Theron++.a
|
||||
$(CXX) -o SolverComponent $(CXXFLAGS) $(SOLVER_OBJECTS) $(LDFLAGS)
|
||||
$(CC) -o SolverComponent $(CXXFLAGS) $(SOLVER_OBJECTS) $(LDFLAGS)
|
||||
|
||||
# There is also a standard target to clean the automatically generated build
|
||||
# files
|
||||
|
Loading…
Reference in New Issue
Block a user