Skip to content

Commit

Permalink
Support eBPF Network Profiling (apache#9337)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Jul 13, 2022
1 parent bf74b66 commit 5ea7c0d
Show file tree
Hide file tree
Showing 35 changed files with 1,021 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,34 @@

package org.apache.skywalking.oap.server.network.trace.component.command;

import com.google.common.base.Joiner;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.skywalking.apm.network.common.v3.Command;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;

import java.util.List;

/**
* eBPF profiling task command. The OAP uses this to send a task to the eBPF agent side.
*/
public class EBPFProfilingTaskCommand extends BaseCommand implements Serializable {
public static final String NAME = "EBPFProfilingTaskQuery";

private String taskId;
private String processId;
private List<String> processIdList;
private long taskStartTime;
private long taskUpdateTime;
private String triggerType;
private FixedTrigger fixedTrigger;
private String targetType;

public EBPFProfilingTaskCommand(String serialNumber, String taskId, String processId, long taskStartTime,
public EBPFProfilingTaskCommand(String serialNumber, String taskId, List<String> processIdList, long taskStartTime,
long taskUpdateTime, String triggerType, FixedTrigger fixedTrigger,
String targetType) {
super(NAME, serialNumber);
this.taskId = taskId;
this.processId = processId;
this.processIdList = processIdList;
this.taskStartTime = taskStartTime;
this.taskUpdateTime = taskUpdateTime;
this.triggerType = triggerType;
Expand All @@ -54,7 +57,7 @@ public EBPFProfilingTaskCommand(String serialNumber, String taskId, String proce
public Command.Builder serialize() {
final Command.Builder builder = commandBuilder();
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
.addArgs(KeyStringValuePair.newBuilder().setKey("ProcessId").setValue(processId))
.addArgs(KeyStringValuePair.newBuilder().setKey("ProcessId").setValue(Joiner.on(",").join(processIdList)))
.addArgs(KeyStringValuePair.newBuilder().setKey("TaskUpdateTime").setValue(String.valueOf(taskUpdateTime)))
.addArgs(KeyStringValuePair.newBuilder().setKey("TriggerType").setValue(triggerType))
.addArgs(KeyStringValuePair.newBuilder().setKey("TargetType").setValue(targetType))
Expand Down
3 changes: 3 additions & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
* Remove legacy OAL `percentile` functions, `p99`, `p95`, `p90`, `p75`, `p50` func(s).
* Revert [#8066](https://github.com/apache/skywalking/pull/8066). Keep all metrics persistent even if it is the default value.
* Skip loading UI templates if folder is empty or doesn't exist.
* Support the `NETWORK` type of eBPF Profiling task.
* Support `sumHistogram` in `MAL`.
* [Breaking Change] Make the eBPF Profiling task support the service instance level, index/table `ebpf_profiling_task` is required to be re-created when upgrading from previous releases.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(
NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig));

this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager(), storageModels));
this.registerServiceImplementation(MetricsMetadataQueryService.class, new MetricsMetadataQueryService());
this.registerServiceImplementation(MetricsQueryService.class, new MetricsQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* Histogram includes data range buckets and the amount matched/grouped in the buckets. This is for original histogram
* graph visualization
*/
@MeterFunction(functionName = "histogram")
@MeterFunction(functionName = "sumHistogram")
@Slf4j
@ToString
public abstract class HistogramFunction extends Meter implements AcceptableValue<BucketedValues> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.skywalking.oap.server.core.command;

import java.util.List;
import java.util.Objects;
import java.util.UUID;

Expand Down Expand Up @@ -49,13 +50,13 @@ public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
/**
* Used to notify the eBPF Profiling task to the eBPF agent side
*/
public EBPFProfilingTaskCommand newEBPFProfilingTaskCommand(EBPFProfilingTask task, String processId) {
public EBPFProfilingTaskCommand newEBPFProfilingTaskCommand(EBPFProfilingTask task, List<String> processId) {
final String serialNumber = UUID.randomUUID().toString();
EBPFProfilingTaskCommand.FixedTrigger fixedTrigger = null;
if (Objects.equals(task.getTriggerType(), EBPFProfilingTriggerType.FIXED_TIME)) {
fixedTrigger = new EBPFProfilingTaskCommand.FixedTrigger(task.getFixedTriggerDuration());
}
return new EBPFProfilingTaskCommand(serialNumber, task.getTaskId(), processId, task.getCreateTime(),
return new EBPFProfilingTaskCommand(serialNumber, task.getTaskId(), processId, task.getTaskStartTime(),
task.getLastUpdateTime(), task.getTriggerType().name(), fixedTrigger, task.getTargetType().name());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,21 @@
import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import org.apache.skywalking.oap.server.core.query.input.EBPFProfilingNetworkTaskRequest;
import org.apache.skywalking.oap.server.core.query.input.EBPFProfilingTaskFixedTimeCreationRequest;
import org.apache.skywalking.oap.server.core.query.type.EBPFNetworkKeepProfilingResult;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTaskCreationResult;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
Expand All @@ -40,18 +45,23 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

@RequiredArgsConstructor
public class EBPFProfilingMutationService implements Service {
private static final Gson GSON = new Gson();
public static final int FIXED_TIME_MIN_DURATION = (int) TimeUnit.SECONDS.toSeconds(60);
public static final int NETWORK_PROFILING_DURATION = (int) TimeUnit.MINUTES.toSeconds(10);
public static final int NETWORK_KEEP_ALIVE_THRESHOLD = (int) TimeUnit.SECONDS.toSeconds(60);

private final ModuleManager moduleManager;
private IEBPFProfilingTaskDAO processProfilingTaskDAO;
private IServiceLabelDAO serviceLabelDAO;
private IMetadataQueryDAO metadataQueryDAO;

private IEBPFProfilingTaskDAO getProcessProfilingTaskDAO() {
if (processProfilingTaskDAO == null) {
Expand All @@ -71,6 +81,15 @@ public IServiceLabelDAO getServiceLabelDAO() {
return serviceLabelDAO;
}

/**
 * Returns the metadata query DAO, looking it up from the storage module on first use
 * and reusing the cached reference afterwards.
 */
private IMetadataQueryDAO getMetadataQueryDAO() {
    if (metadataQueryDAO != null) {
        return metadataQueryDAO;
    }
    // first access: resolve the DAO through the storage module provider and cache it
    metadataQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetadataQueryDAO.class);
    return metadataQueryDAO;
}

/**
* Create eBPF Profiling task with {@link EBPFProfilingTriggerType#FIXED_TIME}
*/
Expand Down Expand Up @@ -101,15 +120,92 @@ public EBPFProfilingTaskCreationResult createTask(EBPFProfilingTaskFixedTimeCrea
task.setCreateTime(current);
task.setLastUpdateTime(current);
task.setTimeBucket(TimeBucket.getMinuteTimeBucket(current));
task.generateLogicalId();
NoneStreamProcessor.getInstance().in(task);

return EBPFProfilingTaskCreationResult.builder().status(true).id(task.id()).build();
return EBPFProfilingTaskCreationResult.builder().status(true).id(task.getLogicalId()).build();
}

/**
 * Create an eBPF Profiling task with the {@link EBPFProfilingTargetType#NETWORK} target
 * for the service instance named in the request.
 *
 * The task uses the {@link EBPFProfilingTriggerType#FIXED_TIME} trigger, starts at the
 * current time and runs for {@link #NETWORK_PROFILING_DURATION} seconds.
 *
 * @param request carries the id of the instance to profile
 * @return a failed result holding the validation message when the request is rejected,
 *         otherwise a successful result holding the logical id of the new task
 */
public EBPFProfilingTaskCreationResult createTask(EBPFProfilingNetworkTaskRequest request) throws IOException {
    final long now = System.currentTimeMillis();

    // reject invalid requests before building anything
    final String validationError = checkCreateRequest(request);
    if (StringUtil.isNotEmpty(validationError)) {
        return buildError(validationError);
    }

    // the service id is derived from the instance id
    final String instanceId = request.getInstanceId();
    final IDManager.ServiceInstanceID.InstanceIDDefinition instanceDefinition =
        IDManager.ServiceInstanceID.analysisId(instanceId);

    // build the task record
    final EBPFProfilingTaskRecord record = new EBPFProfilingTaskRecord();
    // target of the task
    record.setServiceId(instanceDefinition.getServiceId());
    record.setInstanceId(instanceId);
    record.setProcessLabelsJson(Const.EMPTY_STRING);
    record.setTargetType(EBPFProfilingTargetType.NETWORK.value());
    // trigger of the task
    record.setTriggerType(EBPFProfilingTriggerType.FIXED_TIME.value());
    record.setFixedTriggerDuration(NETWORK_PROFILING_DURATION);
    // timestamps of the task
    record.setStartTime(now);
    record.setCreateTime(now);
    record.setLastUpdateTime(now);
    record.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
    record.generateLogicalId();
    NoneStreamProcessor.getInstance().in(record);

    return EBPFProfilingTaskCreationResult.builder()
                                          .status(true)
                                          .id(record.getLogicalId())
                                          .build();
}

/**
 * Keep a running "NETWORK" eBPF profiling task alive by extending its duration.
 *
 * The task is only extended when it is about to end: if more than
 * {@link #NETWORK_KEEP_ALIVE_THRESHOLD} seconds remain, nothing is written and success is
 * returned. Otherwise a record with the same logical id is emitted whose duration is the
 * current duration plus {@link #NETWORK_PROFILING_DURATION} seconds.
 *
 * @param taskId id of the task to keep alive
 * @return an error result when the task does not exist, is not a "NETWORK" task or has
 *         already finished; a success result otherwise
 * @throws IOException declared by this method; presumably raised by the task lookup
 */
public EBPFNetworkKeepProfilingResult keepEBPFNetworkProfiling(String taskId) throws IOException {
    final EBPFProfilingTask task = getProcessProfilingTaskDAO().queryById(taskId);
    // task not exists
    if (task == null) {
        return buildKeepProfilingError("profiling task not exists");
    }
    // target type not "NETWORK"
    if (!Objects.equals(task.getTargetType(), EBPFProfilingTargetType.NETWORK)) {
        return buildKeepProfilingError("current task is not a \"NETWORK\" task");
    }

    // Work in milliseconds with long arithmetic: the duration is not narrowed to int, and
    // the "finished" check is not subject to the truncation toward zero of a seconds
    // conversion (which made a task that ended less than one second ago look alive).
    final long now = System.currentTimeMillis();
    final long taskEndTime = task.getTaskStartTime() + TimeUnit.SECONDS.toMillis(task.getFixedTriggerDuration());
    final long remainingMillis = taskEndTime - now;
    // task already finished
    if (remainingMillis < 0) {
        return buildKeepProfilingError("profiling task has been finished");
    }
    // if the remaining time has not yet dropped to the threshold, there is nothing to do
    if (TimeUnit.MILLISECONDS.toSeconds(remainingMillis) > NETWORK_KEEP_ALIVE_THRESHOLD) {
        return buildKeepProfilingSuccess();
    }

    // copy the task and extend the task time
    final EBPFProfilingTaskRecord record = new EBPFProfilingTaskRecord();
    record.setLogicalId(task.getTaskId());
    record.setServiceId(task.getServiceId());
    record.setProcessLabelsJson(Const.EMPTY_STRING);
    record.setInstanceId(task.getServiceInstanceId());
    record.setStartTime(task.getTaskStartTime());
    record.setTriggerType(task.getTriggerType().value());
    record.setFixedTriggerDuration(task.getFixedTriggerDuration() + NETWORK_PROFILING_DURATION);
    record.setTargetType(EBPFProfilingTargetType.NETWORK.value());
    record.setCreateTime(now);
    record.setLastUpdateTime(now);
    // set the time bucket the same way both createTask variants do for the records they emit
    record.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
    NoneStreamProcessor.getInstance().in(record);
    return buildKeepProfilingSuccess();
}

/**
 * Build a failed task creation result.
 *
 * @param msg the reason reported as the error reason of the result
 */
private EBPFProfilingTaskCreationResult buildError(String msg) {
    return EBPFProfilingTaskCreationResult.builder()
                                          .status(false)
                                          .errorReason(msg)
                                          .build();
}

/**
 * Build a failed keep-alive result.
 *
 * @param msg the reason reported as the error reason of the result
 */
private EBPFNetworkKeepProfilingResult buildKeepProfilingError(String msg) {
    return EBPFNetworkKeepProfilingResult.builder()
                                         .status(false)
                                         .errorReason(msg)
                                         .build();
}

/**
 * Build a successful keep-alive result (status is true, no error reason is set).
 */
private EBPFNetworkKeepProfilingResult buildKeepProfilingSuccess() {
    return EBPFNetworkKeepProfilingResult.builder()
                                         .status(true)
                                         .build();
}

private String checkCreateRequest(EBPFProfilingTaskFixedTimeCreationRequest request) throws IOException {
String err = null;

Expand Down Expand Up @@ -150,8 +246,8 @@ private String checkCreateRequest(EBPFProfilingTaskFixedTimeCreationRequest requ
}

// query exist processes
final List<EBPFProfilingTask> tasks = getProcessProfilingTaskDAO().queryTasks(
Arrays.asList(request.getServiceId()), request.getTargetType(), request.getStartTime(), 0);
final List<EBPFProfilingTask> tasks = getProcessProfilingTaskDAO().queryTasksByTargets(
request.getServiceId(), null, Arrays.asList(request.getTargetType()), request.getStartTime(), 0);
if (CollectionUtils.isNotEmpty(tasks)) {
final EBPFProfilingTask mostRecentTask = tasks.stream()
.min(Comparator.comparingLong(EBPFProfilingTask::getTaskStartTime)).get();
Expand All @@ -162,6 +258,22 @@ private String checkCreateRequest(EBPFProfilingTaskFixedTimeCreationRequest requ
return null;
}

/**
 * Validate a network profiling task creation request.
 *
 * @param request the request to validate
 * @return {@code null} when the request is acceptable, otherwise the message describing
 *         why it is rejected
 */
private String checkCreateRequest(EBPFProfilingNetworkTaskRequest request) throws IOException {
    // the instance id is mandatory; there is no earlier error to carry into the check
    final String noPriorError = null;
    final String missingField = requiredNotEmpty(noPriorError, "instance", request.getInstanceId());
    if (StringUtil.isNotEmpty(missingField)) {
        return missingField;
    }

    // the instance must own at least one process
    final long processCount = getMetadataQueryDAO().getProcessesCount(null, request.getInstanceId(), null, null, 0, 0);
    return processCount > 0 ? null : "The instance doesn't have processes.";
}

/**
 * Subtract the request duration (converted from seconds to milliseconds) from the
 * request start time.
 */
private long calculateStartTime(EBPFProfilingTaskFixedTimeCreationRequest request) {
    final long durationMillis = TimeUnit.SECONDS.toMillis(request.getDuration());
    return request.getStartTime() - durationMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.profiling.ebpf.analyze.EBPFProfilingAnalyzer;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus;
import org.apache.skywalking.oap.server.core.query.type.Attribute;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -171,8 +173,22 @@ public EBPFProfilingTaskPrepare queryPrepareCreateEBPFProfilingTaskData(String s
return prepare;
}

public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId) throws IOException {
return getTaskDAO().queryTasks(Arrays.asList(serviceId), null, 0, 0);
/**
 * Query the eBPF profiling tasks of a service and/or service instance.
 *
 * @param serviceId         passed through to the task DAO
 * @param serviceInstanceId passed through to the task DAO
 * @param targets           target types to query; when empty, every
 *                          {@link EBPFProfilingTargetType} is queried
 * @return one task per task id, in the order the ids are first returned by the DAO;
 *         tasks sharing an id are combined into the first one seen
 */
public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId, String serviceInstanceId, List<EBPFProfilingTargetType> targets) throws IOException {
    final List<EBPFProfilingTargetType> effectiveTargets = CollectionUtils.isEmpty(targets)
        ? Arrays.asList(EBPFProfilingTargetType.values())
        : targets;
    final List<EBPFProfilingTask> found = getTaskDAO().queryTasksByTargets(serviceId, serviceInstanceId, effectiveTargets, 0, 0);

    // merge the tasks with the same id, keeping the first occurrence as the merge target
    final Map<String, EBPFProfilingTask> mergedById = new LinkedHashMap<>();
    for (EBPFProfilingTask candidate : found) {
        final EBPFProfilingTask first = mergedById.putIfAbsent(candidate.getTaskId(), candidate);
        if (first != null) {
            first.combine(candidate);
        }
    }
    return new ArrayList<>(mergedById.values());
}

public List<EBPFProfilingSchedule> queryEBPFProfilingSchedules(String taskId) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public enum EBPFProfilingTargetType {
ON_CPU(1),

OFF_CPU(2),

NETWORK(3),
;
private final int value;
private static final Map<Integer, EBPFProfilingTargetType> DICTIONARY = new HashMap<>();
Expand Down
Loading

0 comments on commit 5ea7c0d

Please sign in to comment.