Skip to content

Commit

Permalink
Merge branch 'master' of github.com:OpenSPG/openspg
Browse files Browse the repository at this point in the history
  • Loading branch information
northmachine committed Jan 8, 2025
2 parents 04adb9a + e53b20a commit f0a2853
Show file tree
Hide file tree
Showing 265 changed files with 17,282 additions and 912 deletions.
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,15 @@ OpenSPG Core Capabilities:

## Get Started

* [Install OpenSPG](https://openspg.yuque.com/ndx6g9/ns5nw2/yl9p847hcfpluv46)
* [Install OpenSPG](https://openspg.yuque.com/ndx6g9/wc9oyq/yexegklu44bqqicm)
* Quick start with examples:
* [Enterprise Supply Chain Knowledge Graph](https://openspg.yuque.com/ndx6g9/ns5nw2/gyd703vk4l5qqb9y)
* [Risk Mining Knowledge Graph](https://openspg.yuque.com/ndx6g9/ns5nw2/yoleogat9akvyqnz)
* [Medical Knowledge Graph](https://openspg.yuque.com/ndx6g9/ns5nw2/kadwgc3iarqqemo1)
* [Enterprise Supply Chain Knowledge Graph](https://openspg.yuque.com/ndx6g9/wc9oyq/wni2suux7g2tt6s2)
* [Risk Mining Knowledge Graph](https://openspg.yuque.com/ndx6g9/wc9oyq/da4h0fbphifg3dpe)
* [Medical Knowledge Graph](https://openspg.yuque.com/ndx6g9/wc9oyq/iiktuogkwigoegcv)

## Advanced tutorials

* [OpenSPG User Guide](https://openspg.yuque.com/ndx6g9/ps5q6b)
* [OneKE User Guide](https://openspg.yuque.com/ndx6g9/ps5q6b/vfoi61ks3mqwygvy)
* [OpenSPG User Guide](https://openspg.yuque.com/ndx6g9/wc9oyq)

# How to contribute

Expand Down
11 changes: 5 additions & 6 deletions README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ OpenSPG核心能力模型包括:

## Get Started

* [安装说明](https://openspg.yuque.com/ndx6g9/ooil9x/xht6kkegvs33cwwr)
* [安装说明](https://openspg.yuque.com/ndx6g9/0.5/nbb1bn3wegwue6yo)
* 通过案例快速上手:
* [企业供应链图谱](https://openspg.yuque.com/ndx6g9/ooil9x/bf5kppyi5w0g7se5)
* [黑产挖掘图谱](https://openspg.yuque.com/ndx6g9/ooil9x/pst9v980k2u2p17o)
* [医疗知识图谱](https://openspg.yuque.com/ndx6g9/ooil9x/odbzpk4694lc7yfd)
* [企业供应链图谱](https://openspg.yuque.com/ndx6g9/0.5/cefh3sufvay63tb0)
* [黑产挖掘图谱](https://openspg.yuque.com/ndx6g9/0.5/haf99dg5w8wrkvop)
* [医疗知识图谱](https://openspg.yuque.com/ndx6g9/0.5/otced6rb3z4vtq2y)

## 进阶教程

* [OpenSPG用户手册](https://openspg.yuque.com/ndx6g9/nmwkzz)
* [OneKE用户手册](https://openspg.yuque.com/ndx6g9/nmwkzz/dht0wtgycuw032gd)
* [OpenSPG用户手册](https://openspg.yuque.com/ndx6g9/0.5)

# 如何贡献代码

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ private static BaseLogicalNode<?> parse(Node node) {
case BUILDER_INDEX:
return new BuilderIndexNode(
node.getId(), node.getName(), (BuilderIndexNodeConfig) node.getNodeConfig());
case PYTHON:
return new PythonNode(
node.getId(), node.getName(), (PythonNodeConfig) node.getNodeConfig());
case PARAGRAPH_SPLIT:
return new ParagraphSplitNode(
node.getId(), node.getName(), (ParagraphSplitNodeConfig) node.getNodeConfig());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/

package com.antgroup.openspg.builder.core.logical;

import com.antgroup.openspg.builder.model.pipeline.config.PythonNodeConfig;
import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum;

/**
 * Logical pipeline node whose node type is {@link NodeTypeEnum#PYTHON}.
 *
 * <p>Binds a {@link PythonNodeConfig} to the PYTHON node type. The class adds no state or
 * behavior of its own; everything is delegated to {@link BaseLogicalNode}.
 */
public class PythonNode extends BaseLogicalNode<PythonNodeConfig> {

/**
 * Creates a Python logical node.
 *
 * @param id node identifier, passed through to the base class
 * @param name node name, passed through to the base class
 * @param nodeConfig configuration for this node, passed through to the base class
 */
public PythonNode(String id, String name, PythonNodeConfig nodeConfig) {
// The node type is always PYTHON for this class, so callers do not supply it.
super(id, name, NodeTypeEnum.PYTHON, nodeConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ private static BaseProcessor<?> parse(BaseLogicalNode<?> node) {
case BUILDER_INDEX:
return new BuilderIndexProcessor(
node.getId(), node.getName(), (BuilderIndexNodeConfig) node.getNodeConfig());
case PYTHON:
return new PythonProcessor(
node.getId(), node.getName(), (PythonNodeConfig) node.getNodeConfig());
case LLM_NL_EXTRACT:
return new LLMNlExtractProcessor(
node.getId(), node.getName(), (LLMNlExtractNodeConfig) node.getNodeConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,99 +15,49 @@

import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.model.pipeline.config.OperatorConfig;
import java.util.*;
import java.util.stream.Collectors;
import com.antgroup.openspg.common.util.pemja.PemjaUtils;
import com.antgroup.openspg.common.util.pemja.model.PemjaConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import pemja.core.PythonInterpreter;
import pemja.core.PythonInterpreterConfig;

@Slf4j
public class PythonOperatorFactory implements OperatorFactory {

private String pythonExec;
private String[] pythonPaths;
private String pythonKnextPath;
private String pythonPaths;
private String hostAddr;
private Long projectId;

private PythonOperatorFactory() {}

public static OperatorFactory getInstance() {
return new PythonOperatorFactory();
}

private PythonInterpreter newPythonInterpreter() {

PythonInterpreterConfig.PythonInterpreterConfigBuilder builder =
PythonInterpreterConfig.newBuilder();
if (pythonExec != null) {
builder.setPythonExec(pythonExec);
}
if (pythonPaths != null) {
builder.addPythonPaths(pythonPaths);
}
return new PythonInterpreter(builder.build());
}

@Override
public void init(BuilderContext context) {
pythonExec = context.getPythonExec();
pythonPaths = (context.getPythonPaths() != null ? context.getPythonPaths().split(";") : null);
pythonKnextPath = context.getPythonKnextPath();
log.info("pythonExec={}, pythonPaths={}", pythonExec, Arrays.toString(pythonPaths));
}

public PythonInterpreter getPythonInterpreter(OperatorConfig config) {
PythonInterpreter interpreter = newPythonInterpreter();
loadOperatorObject(config, interpreter);
return interpreter;
pythonPaths = context.getPythonPaths();
hostAddr = context.getSchemaUrl();
projectId = context.getProjectId();
log.info("pythonExec={}, pythonPaths={}", pythonExec, pythonPaths);
}

@Override
public void loadOperator(OperatorConfig config) {}

@Override
public Object invoke(OperatorConfig config, Object... input) {
PythonInterpreter interpreter = getPythonInterpreter(config);
String pythonObject = getPythonOperatorObject(config);
try {
return interpreter.invokeMethod(pythonObject, config.getMethod(), input);
} finally {
interpreter.close();
}
}

private void loadOperatorObject(OperatorConfig config, PythonInterpreter interpreter) {
if (StringUtils.isNotBlank(pythonKnextPath)) {
interpreter.exec(String.format("import sys; sys.path.append(\"%s\")", pythonKnextPath));
}
String pythonOperatorObject = getPythonOperatorObject(config);
interpreter.exec(
String.format("from %s import %s", config.getModulePath(), config.getClassName()));
interpreter.exec(
String.format(
"%s=%s(%s)",
pythonOperatorObject,
PemjaConfig pemjaConfig =
new PemjaConfig(
pythonExec,
pythonPaths,
hostAddr,
projectId,
config.getModulePath(),
config.getClassName(),
paramToPythonString(config.getParams(), config.getParamsPrefix())));
}

private String getPythonOperatorObject(OperatorConfig config) {
String pythonOperatorObject = config.getClassName() + "_" + config.getUniqueKey();
return pythonOperatorObject;
}

private String paramToPythonString(Map<String, String> params, String paramsPrefix) {
if (MapUtils.isEmpty(params)) {
return "";
}
if (StringUtils.isBlank(paramsPrefix)) {
paramsPrefix = "";
}
String keyValue =
params.entrySet().stream()
.map(entry -> String.format("'%s': '%s'", entry.getKey(), entry.getValue()))
.collect(Collectors.joining(","));
return String.format("%s{%s}", paramsPrefix, keyValue);
config.getMethod(),
config.getParams(),
config.getParamsPrefix());
return PemjaUtils.invoke(pemjaConfig, input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

public class BuilderIndexProcessor extends BaseProcessor<BuilderIndexNodeConfig> {

private ExecuteNode node;
private ExecuteNode node = new ExecuteNode();
private SearchEngineClient searchEngineClient;
private CacheClient cacheClient;

Expand All @@ -48,7 +48,9 @@ public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
searchEngineClient = SearchEngineClientDriverManager.getClient(context.getSearchEngineUrl());
cacheClient = CacheClientDriverManager.getClient(context.getCacheUrl());
this.node = context.getExecuteNodes().get(getId());
if (context.getExecuteNodes() != null) {
this.node = context.getExecuteNodes().get(getId());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@
package com.antgroup.openspg.builder.core.physical.process;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.model.exception.BuilderException;
import com.antgroup.openspg.builder.model.pipeline.ExecuteNode;
import com.antgroup.openspg.builder.model.pipeline.config.ExtractPostProcessorNodeConfig;
import com.antgroup.openspg.builder.model.pipeline.enums.StatusEnum;
import com.antgroup.openspg.builder.model.record.BaseRecord;
import com.antgroup.openspg.builder.model.record.SubGraphRecord;
import com.google.common.collect.Lists;
import com.antgroup.openspg.common.constants.BuilderConstant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ExtractPostProcessor extends BasePythonProcessor<ExtractPostProcessorNodeConfig> {

private ExecuteNode node;
private ExecuteNode node = new ExecuteNode();

public ExtractPostProcessor(String id, String name, ExtractPostProcessorNodeConfig config) {
super(id, name, config);
Expand All @@ -37,39 +39,42 @@ public ExtractPostProcessor(String id, String name, ExtractPostProcessorNodeConf
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
this.node = context.getExecuteNodes().get(getId());
if (context.getExecuteNodes() != null) {
this.node = context.getExecuteNodes().get(getId());
}
}

@Override
public List<BaseRecord> process(List<BaseRecord> inputs) {
node.setStatus(StatusEnum.RUNNING);
node.addTraceLog("Start post processor...");
JSONObject pyConfig = new JSONObject();
pyConfig.put(BuilderConstant.TYPE, BuilderConstant.BASE);
node.addTraceLog("Start alignment...");
List<BaseRecord> results = new ArrayList<>();

List<Map> lists = Lists.newArrayList();
for (BaseRecord record : inputs) {
SubGraphRecord spgRecord = (SubGraphRecord) record;
lists.add(mapper.convertValue(spgRecord, Map.class));
Map map = mapper.convertValue(spgRecord, Map.class);
node.addTraceLog("invoke alignment operator:%s", config.getOperatorConfig().getClassName());
List<Object> result =
(List<Object>)
operatorFactory.invoke(
config.getOperatorConfig(),
BuilderConstant.POSTPROCESSOR_ABC,
pyConfig.toJSONString(),
map);
node.addTraceLog(
"invoke alignment operator:%s succeed", config.getOperatorConfig().getClassName());
List<SubGraphRecord> records =
JSON.parseObject(JSON.toJSONString(result), new TypeReference<List<SubGraphRecord>>() {});
for (SubGraphRecord subGraph : records) {
node.addTraceLog(
"alignment succeed node:%s edge%s",
subGraph.getResultNodes().size(), subGraph.getResultEdges().size());
results.add(subGraph);
}
}

node.addTraceLog(
"invoke post processor operator:%s", config.getOperatorConfig().getClassName());
Object result = operatorFactory.invoke(config.getOperatorConfig(), lists);
node.addTraceLog(
"invoke post processor operator:%s succeed", config.getOperatorConfig().getClassName());
SubGraphRecord subGraph = JSON.parseObject(JSON.toJSONString(result), SubGraphRecord.class);
node.addTraceLog(
"post processor succeed node:%s edge%s",
subGraph.getResultNodes().size(), subGraph.getResultEdges().size());

/*ProjectSchema projectSchema = CommonUtils.getProjectSchema(context);
List<BaseSPGRecord> nodes = CommonUtils.convertNodes(subGraph, projectSchema);
List<BaseSPGRecord> edges = CommonUtils.convertEdges(subGraph, projectSchema);
results.addAll(nodes);
results.addAll(edges);*/
results.add(subGraph);
node.addTraceLog("post processor complete...");
node.setOutputs(subGraph);
node.addTraceLog("alignment complete...");
node.setStatus(StatusEnum.FINISH);
return results;
}
Expand Down
Loading

0 comments on commit f0a2853

Please sign in to comment.