diff --git a/ems-core/control-service/src/main/java/gr/iccs/imu/ems/control/controller/ControlServiceCoordinator.java b/ems-core/control-service/src/main/java/gr/iccs/imu/ems/control/controller/ControlServiceCoordinator.java index 23cafad..ed3e726 100644 --- a/ems-core/control-service/src/main/java/gr/iccs/imu/ems/control/controller/ControlServiceCoordinator.java +++ b/ems-core/control-service/src/main/java/gr/iccs/imu/ems/control/controller/ControlServiceCoordinator.java @@ -17,6 +17,7 @@ import gr.iccs.imu.ems.brokercep.BrokerCepService; import gr.iccs.imu.ems.brokercep.BrokerCepStatementSubscriber; import gr.iccs.imu.ems.brokercep.event.EventMap; import gr.iccs.imu.ems.control.collector.netdata.ServerNetdataCollector; +import gr.iccs.imu.ems.control.plugin.MetasolverPlugin; import gr.iccs.imu.ems.control.plugin.PostTranslationPlugin; import gr.iccs.imu.ems.control.plugin.TranslationContextPlugin; import gr.iccs.imu.ems.control.properties.ControlServiceProperties; @@ -78,6 +79,8 @@ public class ControlServiceCoordinator implements InitializingBean { private final List translationContextPlugins; private final TranslationContextPrinter translationContextPrinter; + private final List metasolverPlugins; + private final List mvvServiceImplementations; private MetricVariableValuesService mvvService; // Will be populated in 'afterPropertiesSet()' @@ -115,6 +118,7 @@ public class ControlServiceCoordinator implements InitializingBean { log.debug("ControlServiceCoordinator.afterPropertiesSet(): Post-translation plugins: {}", postTranslationPlugins); log.debug("ControlServiceCoordinator.afterPropertiesSet(): TranslationContext plugins: {}", translationContextPlugins); + log.debug("ControlServiceCoordinator.afterPropertiesSet(): MetaSolver plugins: {}", metasolverPlugins); } private void initMvvService() { @@ -421,6 +425,7 @@ public class ControlServiceCoordinator implements InitializingBean { // Translate application model into a TranslationContext object log.info("ControlServiceCoordinator.translateAppModelAndStore(): Model translation: model-id={}", appModelId); _TC = translator.translate(appModelId); + _TC.populateTopLevelMetricNames(); log.debug("ControlServiceCoordinator.translateAppModelAndStore(): Model translation: RESULTS: {}", _TC); // Run post-translation plugins @@ -674,11 +679,16 @@ public class ControlServiceCoordinator implements InitializingBean { log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver configuration: scaling-topics: {}", scalingTopics); // Get top-level metric topics from _TC - Set metricTopics = _TC.getTopLevelMetricNames(true).stream() + Set topLevelMetrics = _TC.getTopLevelMetricNames(true); + log.debug("ControlServiceCoordinator.configureMetaSolver(): Top-Level metrics: {}", topLevelMetrics); + Set metricTopics = topLevelMetrics.stream() .filter(m -> !scalingTopics.contains(m)) .collect(Collectors.toSet()); log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver configuration: metric-topics: {}", metricTopics); + // Let Metasolver plugins modify topics sets + metasolverPlugins.forEach(p -> p.topicsCollected(_TC, scalingTopics, metricTopics)); + // Prepare subscription configurations String upperwareBrokerUrl = brokerCep != null ? brokerCep.getBrokerCepProperties().getBrokerUrlForClients() : null; boolean usesAuthentication = brokerCep.getBrokerCepProperties().isAuthenticationEnabled(); @@ -694,15 +704,18 @@ public class ControlServiceCoordinator implements InitializingBean { } List> subscriptionConfigs = new ArrayList<>(); for (String t : scalingTopics) - subscriptionConfigs.add(_prepareSubscriptionConfig(upperwareBrokerUrl, username, password, certificate, t, "", "SCALE")); + subscriptionConfigs.add(_prepareSubscriptionConfig(_TC, upperwareBrokerUrl, username, password, certificate, t, "", "SCALE")); for (String t : metricTopics) - subscriptionConfigs.add(_prepareSubscriptionConfig(upperwareBrokerUrl, username, password, certificate, t, "", "MVV")); + subscriptionConfigs.add(_prepareSubscriptionConfig(_TC, upperwareBrokerUrl, username, password, certificate, t, "", "MVV")); log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver subscriptions configuration: {}", subscriptionConfigs); // Retrieve MVV to Current-Config MVV map Map mvvMap = _TC.getMvvCP(); log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver MVV configuration: {}", mvvMap); + // Let Metasolver plugins modify MVV map + metasolverPlugins.forEach(p -> p.mvvsCollected(_TC, mvvMap)); + // Prepare MetaSolver configuration Map msConfig = new HashMap<>(); msConfig.put("subscriptions", subscriptionConfigs); @@ -756,7 +769,7 @@ public class ControlServiceCoordinator implements InitializingBean { return modelId; } - protected Map _prepareSubscriptionConfig(String url, String username, String password, String certificate, String topic, String clientId, String type) { + protected Map _prepareSubscriptionConfig(TranslationContext _TC, String url, String username, String password, String certificate, String topic, String clientId, String type) { Map map = new HashMap<>(); map.put("url", url); map.put("username", username); @@ -765,6 +778,10 @@ public class ControlServiceCoordinator implements InitializingBean { map.put("topic", topic); map.put("client-id", clientId); map.put("type", type); + + // Let Metasolver plugins modify subscription + metasolverPlugins.forEach(p -> p.prepareSubscription(_TC, map)); + return map; } diff --git a/ems-core/control-service/src/main/java/gr/iccs/imu/ems/control/plugin/MetasolverPlugin.java b/ems-core/control-service/src/main/java/gr/iccs/imu/ems/control/plugin/MetasolverPlugin.java new file mode 100644 index 0000000..8cebce4 --- /dev/null +++ b/ems-core/control-service/src/main/java/gr/iccs/imu/ems/control/plugin/MetasolverPlugin.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package gr.iccs.imu.ems.control.plugin; + +import gr.iccs.imu.ems.translate.TranslationContext; +import gr.iccs.imu.ems.util.Plugin; + +import java.util.Map; +import java.util.Set; + +/** + * Executed during Metasolver configuration generation + */ +public interface MetasolverPlugin extends Plugin { + default void topicsCollected(TranslationContext translationContext, Set scalingTopics, Set metricTopics) { } + + default void prepareSubscription(TranslationContext translationContext, Map subscriptionConfigMap) { } + + default void mvvsCollected(TranslationContext translationContext, Map mvvMap) { } +} diff --git a/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContext.java b/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContext.java index f63836a..07aeab1 100644 --- a/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContext.java +++ b/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContext.java @@ -21,6 +21,7 @@ import lombok.Setter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; import java.io.Serializable; import java.util.*; @@ -69,11 +70,6 @@ public class TranslationContext implements Serializable { // Grouping-to-Topics map private final Map> G2T = new HashMap<>(); - // Metric-to-Metric Context map - @Getter - @JsonIgnore - private final transient Map> M2MC = new HashMap<>(); - // Composite Metric Variables set @Getter @JsonIgnore @@ -114,10 +110,11 @@ public class TranslationContext implements Serializable { private final Set ifThenConstraints = new LinkedHashSet<>(); // Load-annotated Metric + protected final Map loadAnnotatedDestinationToMetricContextNameMap = new LinkedHashMap<>(); protected final Set loadAnnotatedMetricsSet = new LinkedHashSet<>(); // Top-Level metric names - protected Set topLevelMetricNames = new LinkedHashSet<>(); + protected final Set topLevelMetricNames = new LinkedHashSet<>(); // Export files @Getter @Setter @@ -190,6 +187,7 @@ public class TranslationContext implements Serializable { this.metricConstraints.addAll( cloneSet(_TC.metricConstraints) ); this.logicalConstraints.addAll( cloneSet(_TC.logicalConstraints) ); this.ifThenConstraints.addAll( cloneSet(_TC.ifThenConstraints) ); + this.loadAnnotatedDestinationToMetricContextNameMap.putAll(_TC.loadAnnotatedDestinationToMetricContextNameMap); this.loadAnnotatedMetricsSet.addAll(_TC.loadAnnotatedMetricsSet); this.topLevelMetricNames.addAll(_TC.topLevelMetricNames); this.exportFiles.addAll(_TC.exportFiles); @@ -279,48 +277,43 @@ public class TranslationContext implements Serializable { return newGroupingsMap; } - public MetricContext getMetricContextForMetric(Metric m) { - if (M2MC==null) return null; - Set set = M2MC.get(m); - return set == null ? null : set.iterator().next(); - } - public Set getMetricConstraints() { - return new HashSet<>(metricConstraints); + return metricConstraints!=null ? new HashSet<>(metricConstraints) : new HashSet<>(); } public Set getLogicalConstraints() { - return new HashSet<>(logicalConstraints); + return logicalConstraints!=null ? new HashSet<>(logicalConstraints) : new HashSet<>(); } public HashSet getCompositeMetricVariables() { - return new HashSet<>(CMVar_1); + return CMVar_1!=null ? new HashSet<>(CMVar_1) : new HashSet<>(); } public HashSet getCompositeMetricVariableNames() { - return new HashSet<>(CMVar); + return CMVar!=null ? new HashSet<>(CMVar) : new HashSet<>(); } public HashSet getRawMetricVariables() { - return new HashSet<>(RMVar_1); + return RMVar_1!=null ? new HashSet<>(RMVar_1) : new HashSet<>(); } public HashSet getRawMetricVariableNames() { - return new HashSet<>(RMVar); + return RMVar!=null ? new HashSet<>(RMVar) : new HashSet<>(); } public boolean isMVV(String name) { + if (MVV==null) return false; for (String mvv : MVV) if (mvv.equals(name)) return true; return false; } public Set getMVV() { - return new HashSet<>(MVV); + return MVV!=null ? new HashSet<>(MVV) : new HashSet<>(); } public Map getMvvCP() { - return new HashMap<>(MvvCP); + return MvvCP!=null ? new HashMap<>(MvvCP) : new HashMap<>(); } // ==================================================================================================================================================== @@ -396,14 +389,6 @@ public class TranslationContext implements Serializable { rules.forEach(rule -> addGroupingRulePair(grouping, topic, rule)); } - public void addMetricMetricContextPair(Metric m, MetricContext mc) { - _addPair(M2MC, m, mc); - } - - public void addMetricMetricContextPairs(Metric m, List mcs) { - _addPair(M2MC, m, mcs); - } - public void addCompositeMetricVariable(MetricVariable mv) { CMVar.add(mv.getName()); CMVar_1.add(mv); @@ -672,6 +657,23 @@ public class TranslationContext implements Serializable { // ==================================================================================================================================================== // Load-Metrics-related helper methods + public void addLoadAnnotatedDestinationNameToMetricContextName(@NonNull String metricContextName, @NonNull String destinationName) { + loadAnnotatedDestinationToMetricContextNameMap.put(destinationName, metricContextName); + } + + public void addLoadAnnotatedDestinationNameToMetricContextName(@NonNull Map map) { + loadAnnotatedDestinationToMetricContextNameMap.putAll(map); + } + + public Map getLoadAnnotatedDestinationNameToMetricContextNameMap() { + return loadAnnotatedDestinationToMetricContextNameMap!=null + ? new HashMap<>(loadAnnotatedDestinationToMetricContextNameMap) : new HashMap<>(); + } + + public String getLoadAnnotatedDestinationMetricContextName(@NonNull String key) { + return getLoadAnnotatedDestinationNameToMetricContextNameMap().get(key); + } + public void addLoadAnnotatedMetric(@NonNull String metricName) { loadAnnotatedMetricsSet.add(metricName); } @@ -681,7 +683,7 @@ public class TranslationContext implements Serializable { } public Set getLoadAnnotatedMetricsSet() { - return new HashSet<>(loadAnnotatedMetricsSet); + return loadAnnotatedMetricsSet!=null ? new HashSet<>(loadAnnotatedMetricsSet) : new HashSet<>(); } // ==================================================================================================================================================== @@ -692,11 +694,19 @@ public class TranslationContext implements Serializable { } public void populateTopLevelMetricNames() { - Set set = DAG.getTopLevelNodes().stream() + if (getDAG()==null) { + log.warn("TranslationContext.populateTopLevelMetricNames(): DAG is NULL"); + return; + } + log.trace("TranslationContext.populateTopLevelMetricNames(): DAG is *NOT* NULL"); + Set set = getDAG().getTopLevelNodes().stream() .map(DAGNode::getElementName) .collect(Collectors.toSet()); - topLevelMetricNames.clear(); - topLevelMetricNames.addAll(set); + log.trace("TranslationContext.populateTopLevelMetricNames(): set: {}", set); + synchronized (topLevelMetricNames) { + topLevelMetricNames.clear(); + topLevelMetricNames.addAll(set); + } } public Set getTopLevelMetricNames(boolean forcePopulate) { @@ -719,6 +729,9 @@ public class TranslationContext implements Serializable { return clazz.cast(result); } + public void printExtraInfo(Logger log) { + } + // ==================================================================================================================================================== /*public void prepareForSerialization() { diff --git a/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContextPrinter.java b/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContextPrinter.java index bc6edd3..59297e1 100644 --- a/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContextPrinter.java +++ b/ems-core/translator/src/main/java/gr/iccs/imu/ems/translate/TranslationContextPrinter.java @@ -100,8 +100,6 @@ public class TranslationContextPrinter { log.info("*********************************************************"); log.info("Topics-Connections map:\n{}", _TC.getTopicConnections()); log.info("*********************************************************"); - log.info("Metric-to-Metric Context map:\n{}", map2string(_TC.getM2MC())); - log.info("*********************************************************"); log.info("MVV set:\n{}", _TC.getMVV()); log.info("*********************************************************"); log.info("MVV_CP map:\n{}", _TC.getMvvCP()); @@ -114,9 +112,16 @@ public class TranslationContextPrinter { log.info("*********************************************************"); log.info("Metric Constraints:\n{}", _TC.getMetricConstraints()); log.info("*********************************************************"); + log.info("Load-Annotated Destination-to-Metric Context names map:\n{}", + _TC.getLoadAnnotatedDestinationNameToMetricContextNameMap()); + log.info("*********************************************************"); log.info("Load-Annotated Metrics:\n{}", _TC.getLoadAnnotatedMetricsSet()); log.info("*********************************************************"); log.info("Top-Level Metric names:\n{}", _TC.getTopLevelMetricNames()); + log.info("*********************************************************"); + + _TC.printExtraInfo(log); + log.info("*********************************************************"); log.info("Additional Results:\n{}", _TC.getAdditionalResults()); log.info("*********************************************************"); @@ -124,7 +129,11 @@ public class TranslationContextPrinter { log.info("*********************************************************"); } - public String prettifyG2R(Map>> map, String startIdent) { + public static void separator() { + log.info("*********************************************************"); + } + + public static String prettifyG2R(Map>> map, String startIdent) { StringBuilder sb = new StringBuilder(); String ident2 = startIdent+" "; String ident3 = startIdent+" "; @@ -147,7 +156,7 @@ public class TranslationContextPrinter { return sb.toString(); } - protected Map> map2string(Map map) { + public static Map> map2string(Map map) { if (map==null) return null; Map> newMap = new HashMap<>(); for (Object key : map.keySet()) { @@ -172,7 +181,7 @@ public class TranslationContextPrinter { return newMap; } - protected Collection getFunctionNames(Collection col) { + public static Collection getFunctionNames(Collection col) { return col.stream() .map(FunctionDefinition::getName) .collect(Collectors.toList());