EMS: Control Service: Added 'TopicBeacon.transmitBasicMetrics()' to periodically send basic metrics collected from all active clients. Added 'TopicBeaconProperties.basicMetricsTopics' property and updated ems-server.yml
This commit is contained in:
parent
54477507d2
commit
bfa97e9291
@ -225,6 +225,7 @@ beacon.instance-topics = _ui_instance_info
|
||||
beacon.prediction-topics = metrics_to_predict
|
||||
beacon.prediction-rate = 60000
|
||||
beacon.slo-violation-detector-topics = metric.metric_list
|
||||
beacon.basic-metrics-topics = _client_metrics
|
||||
|
||||
|
||||
################################################################################
|
||||
|
@ -231,6 +231,7 @@ beacon:
|
||||
prediction-topics: metrics_to_predict
|
||||
prediction-rate: 60000
|
||||
slo-violation-detector-topics: metric.metric_list
|
||||
basic-metrics-topics: _client_metrics
|
||||
|
||||
################################################################################
|
||||
### Info Service settings
|
||||
|
@ -41,6 +41,7 @@ public class TopicBeaconProperties implements InitializingBean {
|
||||
@Min(1) private long predictionMinAllowedRate = 1;
|
||||
@Min(1) private long predictionMaxAllowedRate = 365*24*3600*1000L;
|
||||
private Set<String> sloViolationDetectorTopics = new HashSet<>();
|
||||
private Set<String> basicMetricsTopics = new HashSet<>();
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
|
@ -13,6 +13,7 @@ import com.google.gson.*;
|
||||
import com.google.gson.stream.JsonReader;
|
||||
import com.google.gson.stream.JsonToken;
|
||||
import com.google.gson.stream.JsonWriter;
|
||||
import gr.iccs.imu.ems.baguette.server.ClientShellCommand;
|
||||
import gr.iccs.imu.ems.baguette.server.NodeRegistryEntry;
|
||||
import gr.iccs.imu.ems.brokercep.BrokerCepService;
|
||||
import gr.iccs.imu.ems.brokercep.event.EventMap;
|
||||
@ -151,6 +152,7 @@ public class TopicBeacon implements InitializingBean {
|
||||
transmitHeartbeat();
|
||||
transmitThresholdInfo();
|
||||
transmitInstanceInfo();
|
||||
transmitBasicMetrics();
|
||||
|
||||
// Call Beacon plugins
|
||||
beaconPlugins.stream().filter(Objects::nonNull).forEach(plugin -> {
|
||||
@ -211,6 +213,35 @@ public class TopicBeacon implements InitializingBean {
|
||||
}
|
||||
}
|
||||
|
||||
public void transmitBasicMetrics() {
|
||||
if (emptyIfNull(properties.getBasicMetricsTopics()).isEmpty()) return;
|
||||
|
||||
if (coordinator.getBaguetteServer().isServerRunning()) {
|
||||
for (ClientShellCommand csc : ClientShellCommand.getActive()) {
|
||||
Map<String, Object> stats = csc.getClientStatistics();
|
||||
try {
|
||||
String clientId = csc.getClientId();
|
||||
String clientIpAddress = csc.getClientIpAddress();
|
||||
if (StringUtils.isBlank(clientId) || StringUtils.isBlank(clientIpAddress))
|
||||
continue;
|
||||
long timestamp = Long.parseLong(StringUtils.defaultIfBlank(
|
||||
stats.getOrDefault("_received_at_server_timestamp", "").toString().trim(),
|
||||
"-1"
|
||||
));
|
||||
stats.put("clientId", clientId);
|
||||
stats.put("ipAddress", clientIpAddress);
|
||||
stats.put("receivedAtServer", Instant.ofEpochMilli(timestamp).toString());
|
||||
log.debug("Topic Beacon: Transmitting Basic Metrics for: instance={}, ip-address={}, message={}, topics={}",
|
||||
clientId, clientIpAddress, stats, properties.getInstanceTopics());
|
||||
sendEventToTopics(stats, properties.getBasicMetricsTopics());
|
||||
} catch (Exception e) {
|
||||
log.error("Topic Beacon: Transmitting Basic Metrics for: EXCEPTION while preparing basic metrics for client: id={}, ip-address={}, metrics={}\n",
|
||||
csc.getId(), csc.getClientIpAddress(), stats, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
private void sendEventToTopics(String message, Set<String> topics) throws JMSException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user