Skip to content

Commit

Permalink
Add forEach, processRelation function to MAL Expression, and add …
Browse files Browse the repository at this point in the history
…`expPrefix` and `initExp` in MAL config (apache#9299)
  • Loading branch information
mrproliu authored Jul 4, 2022
1 parent d462682 commit 7bb8c69
Show file tree
Hide file tree
Showing 32 changed files with 949 additions and 41 deletions.
2 changes: 2 additions & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* Add component ID(128) for Java Hutool plugin.
* Add Zipkin query exception handler, response error message for illegal arguments.
* Fix a NullPointerException in the endpoint analysis, which would cause missing MQ-related `LocalSpan` in the trace.
* Add `forEach`, `processRelation` function to MAL expression.
* Add `expPrefix`, `initExp` in MAL config.

#### UI

Expand Down
6 changes: 6 additions & 0 deletions docs/en/concepts-and-designs/mal.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ Examples:
#### time
`time()`: Returns the number of seconds since January 1, 1970 UTC.

#### foreach
`forEach([string_array], Closure<Void> each)`: Iterates all samples according to the first array argument, and provide two parameters in the second closure argument:
1. `element`: element in the array.
2. `tags`: tags in each sample.

## Down Sampling Operation
MAL should instruct meter-system on how to downsample for metrics. It doesn't only refer to aggregate raw samples to
Expand Down Expand Up @@ -245,6 +249,8 @@ They extract level relevant labels from metric labels, then informs the meter-sy
extracts endpoint level labels from the second array argument, extracts layer from `Layer` argument.
- `serviceRelation(DetectPoint, [source_svc_label1...], [dest_svc_label1...], Layer)` DetectPoint including `DetectPoint.CLIENT` and `DetectPoint.SERVER`,
extracts `sourceService` labels from the first array argument, extracts `destService` labels from the second array argument, extracts layer from `Layer` argument.
- `processRelation(detect_point_label, [service_label1...], [instance_label1...], source_process_id_label, dest_process_id_label)` extracts `DetectPoint` labels from first argument, the label value should be `client` or `server`.
extracts `Service` labels from the first array argument, extracts `Instance` labels from the second array argument, extracts `ProcessID` labels from the fourth and fifth arguments of the source and destination.

## More Examples

Expand Down
4 changes: 4 additions & 0 deletions docs/en/setup/backend/backend-meter.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ If you're using Spring Sleuth, see [Spring Sleuth Setup](spring-sleuth-setup.md)
### Meters configuration

```yaml
# initExp is the expression that initializes the current configuration file
initExp: <string>
# filter the metrics, only those metrics that satisfy this condition will be passed into the `metricsRules` below.
filter: <closure> # example: '{ tags -> tags.job_name == "vm-monitoring" }'
# expPrefix is executed before the metrics executes other functions.
expPrefix: <string>
# expSuffix is appended to all expression in this file.
expSuffix: <string>
# insert metricPrefix into metric name: <metricPrefix>_<raw_metric_name>
Expand Down
4 changes: 4 additions & 0 deletions docs/en/setup/backend/backend-zabbix.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ You can find details on Zabbix agent items from [Zabbix Agent documentation](htt
### Configuration file

```yaml
# initExp is the expression that initializes the current configuration file
initExp: <string>
# insert metricPrefix into metric name: <metricPrefix>_<raw_metric_name>
metricPrefix: <string>
# expPrefix is executed before the metrics executes other functions.
expPrefix: <string>
# expSuffix is appended to all expression in this file.
expSuffix: <string>
# Datasource from Zabbix Item keys.
Expand Down
4 changes: 4 additions & 0 deletions docs/en/setup/backend/prometheus-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ staticConfig:
# Labels assigned to all metrics fetched from the targets.
labels:
[ <labelname>: <labelvalue> ... ]
# initExp is the expression that initializes the current configuration file
initExp: <string>
# filter the metrics, only those metrics that satisfy this condition will be passed into the `metricsRules` below.
filter: <closure> # example: '{ tags -> tags.job_name == "vm-monitoring" }'
# expPrefix is executed before the metrics executes other functions.
expPrefix: <string>
# expSuffix is appended to all expression in this file.
expSuffix: <string>
# insert metricPrefix into metric name: <metricPrefix>_<raw_metric_name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
public class MeterConfig implements MetricRuleConfig {
private String metricPrefix;
private String expSuffix;
private String expPrefix;
private String filter;
private List<Rule> metricsRules;
private String initExp;

@Data
@NoArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@
import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
Expand Down Expand Up @@ -241,10 +242,6 @@ private void init() {
}
}
createMetric(ctx.getScopeType(), metricType.literal, ctx.getDownsampling());

if (ctx.isRetagByK8sMeta()) {
K8sInfoRegistry.getInstance().start();
}
}

private void createMetric(final ScopeType scopeType,
Expand All @@ -262,16 +259,12 @@ private void send(final AcceptableValue<?> v, final long time) {

private void generateTraffic(MeterEntity entity) {
if (entity.getDetectPoint() != null) {
switch (entity.getDetectPoint()) {
case SERVER:
entity.setServiceName(entity.getDestServiceName());
toService(requireNonNull(entity.getDestServiceName()), entity.getLayer());
serverSide(entity);
switch (entity.getScopeType()) {
case SERVICE_RELATION:
serviceRelationTraffic(entity);
break;
case CLIENT:
entity.setServiceName(entity.getSourceServiceName());
toService(requireNonNull(entity.getSourceServiceName()), entity.getLayer());
clientSide(entity);
case PROCESS_RELATION:
processRelationTraffic(entity);
break;
default:
}
Expand Down Expand Up @@ -309,7 +302,23 @@ private void toService(String serviceName, Layer layer) {
MetricsStreamProcessor.getInstance().in(s);
}

private void serverSide(MeterEntity entity) {
private void serviceRelationTraffic(MeterEntity entity) {
switch (entity.getDetectPoint()) {
case SERVER:
entity.setServiceName(entity.getDestServiceName());
toService(requireNonNull(entity.getDestServiceName()), entity.getLayer());
serviceRelationServerSide(entity);
break;
case CLIENT:
entity.setServiceName(entity.getSourceServiceName());
toService(requireNonNull(entity.getSourceServiceName()), entity.getLayer());
serviceRelationClientSide(entity);
break;
default:
}
}

private void serviceRelationServerSide(MeterEntity entity) {
ServiceRelationServerSideMetrics metrics = new ServiceRelationServerSideMetrics();
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setSourceServiceId(entity.sourceServiceId());
Expand All @@ -319,7 +328,7 @@ private void serverSide(MeterEntity entity) {
MetricsStreamProcessor.getInstance().in(metrics);
}

private void clientSide(MeterEntity entity) {
private void serviceRelationClientSide(MeterEntity entity) {
ServiceRelationClientSideMetrics metrics = new ServiceRelationClientSideMetrics();
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setSourceServiceId(entity.sourceServiceId());
Expand All @@ -328,4 +337,37 @@ private void clientSide(MeterEntity entity) {
metrics.setEntityId(entity.id());
MetricsStreamProcessor.getInstance().in(metrics);
}

private void processRelationTraffic(MeterEntity entity) {
switch (entity.getDetectPoint()) {
case SERVER:
processRelationServerSide(entity);
break;
case CLIENT:
processRelationClientSide(entity);
break;
default:
}
}

private void processRelationServerSide(MeterEntity entity) {
ProcessRelationServerSideMetrics metrics = new ProcessRelationServerSideMetrics();
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setServiceInstanceId(entity.serviceInstanceId());
metrics.setSourceProcessId(entity.getSourceProcessId());
metrics.setDestProcessId(entity.getDestProcessId());
metrics.setEntityId(entity.id());
MetricsStreamProcessor.getInstance().in(metrics);
}

private void processRelationClientSide(MeterEntity entity) {
ProcessRelationClientSideMetrics metrics = new ProcessRelationClientSideMetrics();
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setServiceInstanceId(entity.serviceInstanceId());
metrics.setSourceProcessId(entity.getSourceProcessId());
metrics.setDestProcessId(entity.getDestProcessId());
metrics.setEntityId(entity.id());
MetricsStreamProcessor.getInstance().in(metrics);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import java.util.StringJoiner;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.oap.meter.analyzer.dsl.DSL;
import org.apache.skywalking.oap.meter.analyzer.dsl.Expression;
import org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingException;
import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;

Expand All @@ -48,14 +53,28 @@ public static <T> Stream<T> log(Try<T> t, String debugMessage) {

public MetricConvert(MetricRuleConfig rule, MeterSystem service) {
Preconditions.checkState(!Strings.isNullOrEmpty(rule.getMetricPrefix()));
// init expression script
if (StringUtils.isNotEmpty(rule.getInitExp())) {
handleInitExp(rule.getInitExp());
}

this.analyzers = rule.getMetricsRules().stream().map(
r -> Analyzer.build(
formatMetricName(rule, r.getName()),
rule.getFilter(),
Strings.isNullOrEmpty(rule.getExpSuffix()) ?
r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
service
)
r -> {
String exp = r.getExp();
if (!Strings.isNullOrEmpty(rule.getExpPrefix())) {
exp = String.format("(%s.%s).%s", StringUtils.substringBefore(exp, "."), rule.getExpPrefix(),
StringUtils.substringAfter(exp, "."));
}
if (!Strings.isNullOrEmpty(rule.getExpSuffix())) {
exp = String.format("(%s).%s", exp, rule.getExpSuffix());
}
return Analyzer.build(
formatMetricName(rule, r.getName()),
rule.getFilter(),
exp,
service
);
}
).collect(toList());
}

Expand Down Expand Up @@ -83,4 +102,13 @@ private String formatMetricName(MetricRuleConfig rule, String meterRuleName) {
metricName.add(rule.getMetricPrefix()).add(meterRuleName);
return metricName.toString();
}

private void handleInitExp(String exp) {
Expression e = DSL.parse(exp);
final Result result = e.run(ImmutableMap.of());
if (!result.isSuccess() && result.isThrowable()) {
throw new ExpressionParsingException(
"failed to execute init expression: " + exp + ", error:" + result.getError());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@ public interface MetricRuleConfig {
*/
String getExpSuffix();

/**
* Get MAL expression prefix
*/
String getExpPrefix();

/**
* Get all rules
*/
List<? extends RuleConfig> getMetricsRules();

String getFilter();

/**
* Get the init expression script
*/
String getInitExp();

interface RuleConfig {
/**
* Get definition metrics name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import java.lang.reflect.Array;
import java.util.List;
import java.util.Map;

import org.apache.skywalking.oap.meter.analyzer.dsl.registry.ProcessRegistry;
import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
import org.apache.skywalking.oap.meter.analyzer.k8s.Kubernetes;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.codehaus.groovy.ast.stmt.DoWhileStatement;
Expand Down Expand Up @@ -54,6 +57,8 @@ public static Expression parse(final String expression) {
icz.addImport("K8sRetagType", K8sRetagType.class.getName());
icz.addImport("DetectPoint", DetectPoint.class.getName());
icz.addImport("Layer", Layer.class.getName());
icz.addImport("ProcessRegistry", ProcessRegistry.class.getName());
icz.addImport("Kubernetes", Kubernetes.class.getName());
cc.addCompilationCustomizers(icz);

final SecureASTCustomizer secureASTCustomizer = new SecureASTCustomizer();
Expand All @@ -73,7 +78,9 @@ public static Expression parse(final String expression) {
.add(K8sRetagType.class)
.add(DetectPoint.class)
.add(Layer.class)
.build());
.add(ProcessRegistry.class)
.add(Kubernetes.class)
.build());
cc.addCompilationCustomizers(secureASTCustomizer);

GroovyShell sh = new GroovyShell(new Binding(), cc);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription;

import com.google.common.collect.ImmutableList;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;

import java.util.List;

@Getter
@RequiredArgsConstructor
@ToString
public class ProcessRelationEntityDescription implements EntityDescription {
private final ScopeType scopeType = ScopeType.PROCESS_RELATION;
private final List<String> serviceKeys;
private final List<String> instanceKeys;
private final String sourceProcessIdKey;
private final String destProcessIdKey;
private final String detectPointKey;
private final String delimiter;

@Override
public List<String> getLabelKeys() {
return ImmutableList.<String>builder()
.addAll(serviceKeys)
.addAll(instanceKeys)
.add(detectPointKey, sourceProcessIdKey, destProcessIdKey).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ static Optional<ExpressionParsingContext> get() {

ScopeType scopeType;

/**
* Mark whether the retagByK8sMeta func in expressions is active
*/
boolean isRetagByK8sMeta;

/**
* Get labels no scope related.
*
Expand Down
Loading

0 comments on commit 7bb8c69

Please sign in to comment.