Skip to content

Commit

Permalink
feat: support path in nGQL, optimized nGQL edge reader (#133)
Browse files Browse the repository at this point in the history
* feat: support path in nGQL, not raise exception for other types

close #132

* fix typing and error handling as commented

* handle List Value typing

* wording of the exception and logging

* fix typing, added test for ngql

* test: add GET SUBGRAPH and FIND PATH

* fix ngql example and test cases

* fix typo in withConnectionRetry

* add back withConnectionRetry in ngql tests

* fix missing operateType

* fix null pointer

* test: add with prop for prop reading

* docs: update examples for with prop clause
  • Loading branch information
wey-gu authored Dec 21, 2023
1 parent 49306d0 commit 4666866
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ object NebulaSparkReaderExample {
//.withNoColumn(true)
.withReturnCols(List("degree"))
// Make sure your nGQL statement returns edges or paths; the connector does not validate the statement itself.
.withNgql("match (v)-[e:friend]-(v2) return e")
// other examples of supported nGQL:
// - GET SUBGRAPH WITH PROP 3 STEPS FROM 2 YIELD EDGES AS relationships;
// - FIND ALL PATH WITH PROP FROM 2 TO 4 OVER friend YIELD path AS p;
.withNgql("match (v)-[e:friend]->(v2) return e")
.build()
val edge = spark.read.nebula(config, nebulaReadConfig).loadEdgesToDfByNgql()
edge.printSchema()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ object NebulaConnectionConfig {
/**
* Sets connectionRetry on this builder; connectionRetry is optional.
*
* This is the misspelled legacy name. It is kept next to withConnectionRetry,
* which performs the same assignment, and is marked deprecated in its favour.
*
* @param connectionRetry value stored in the builder's connectionRetry field
* @return this builder, so further builder calls can be chained
*/
@deprecated("use withConnectionRetry instead", "3.7.0")
def withConenctionRetry(connectionRetry: Int): ConfigBuilder = {
this.connectionRetry = connectionRetry
this
}

def withConnectionRetry(connectionRetry: Int): ConfigBuilder = {
this.connectionRetry = connectionRetry
this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ package object connector {
val dfReader = reader
.format(classOf[NebulaDataSource].getName)
.option(NebulaOptions.TYPE, DataTypeEnum.EDGE.toString)
.option(NebulaOptions.OPERATE_TYPE, OperaType.READ.toString)
.option(NebulaOptions.SPACE_NAME, readConfig.getSpace)
.option(NebulaOptions.LABEL, readConfig.getLabel)
.option(NebulaOptions.RETURN_COLS, readConfig.getReturnCols.mkString(","))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,19 @@ class NebulaNgqlEdgePartitionReader extends InputPartitionReader[InternalRow] {
val list: mutable.Buffer[ValueWrapper] = value.asList()
edges.appendAll(
list.toStream
.filter(e => checkLabel(e.asRelationship()))
.filter(e => e != null && e.isEdge() && checkLabel(e.asRelationship()))
.map(e => convertToEdge(e.asRelationship(), properties))
)
} else {
LOG.error(s"Exception convert edge type ${valueType} ")
throw new RuntimeException(" convert value type failed");
} else if (valueType == Value.PVAL){
val list: java.util.List[Relationship] = value.asPath().getRelationships()
edges.appendAll(
list.toStream
.filter(e => checkLabel(e))
.map(e => convertToEdge(e, properties))
)
} else if (valueType != Value.NVAL && valueType != 0) {
LOG.error(s"Unexpected edge type encountered: ${valueType}. Only edge or path should be returned.")
throw new RuntimeException("Invalid nGQL return type. Value type conversion failed.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,85 @@ class ReadSuite extends AnyFunSuite with BeforeAndAfterAll {
})(Encoders.STRING)
}

test("read edge from nGQL: MATCH ()-[e:friend]->() RETURN e LIMIT 1000") {
  // Reads `friend` edges through a MATCH statement, then verifies the row count, the
  // schema width and the content of every returned edge whose source id is 1.
  val config =
    NebulaConnectionConfig
      .builder()
      .withMetaAddress("127.0.0.1:9559")
      .withGraphAddress("127.0.0.1:9669")
      .withConnectionRetry(2)
      .build()
  val nebulaReadConfig: ReadNebulaConfig = ReadNebulaConfig
    .builder()
    .withSpace("test_int")
    .withLabel("friend")
    .withNoColumn(false)
    .withReturnCols(List("col1"))
    .withNgql("MATCH ()-[e:friend]->() RETURN e LIMIT 1000")
    .build()
  val edge = sparkSession.read.nebula(config, nebulaReadConfig).loadEdgesToDfByNgql()
  edge.printSchema()
  edge.show(truncate = false)
  assert(edge.count() == 12)
  assert(edge.schema.fields.length == 4)

  // Dataset.map is a lazy transformation: with no action invoked on its result, assertions
  // placed inside it are never evaluated. Collect the rows to the driver and check them
  // here so a failing expectation actually fails the test.
  edge.collect().foreach { row =>
    // Only the edge starting at vertex 1 has pinned expectations; other rows are skipped
    // explicitly instead of relying on a single-case match that would throw MatchError.
    if (row.getAs[Long]("_srcId") == 1L) {
      assert(row.getAs[Long]("_dstId") == 2)
      assert(row.getAs[Long]("_rank") == 0)
      assert(row.getAs[String]("col1").equals("friend1"))
    }
  }
}


test("read edge from nGQL: GET SUBGRAPH WITH PROP 3 STEPS FROM 2 YIELD EDGES AS relationships") {
  // Connection settings used by this test: meta and graph addresses plus a retry count.
  val connectionConfig = NebulaConnectionConfig
    .builder()
    .withMetaAddress("127.0.0.1:9559")
    .withGraphAddress("127.0.0.1:9669")
    .withConnectionRetry(2)
    .build()

  // Read `friend` edges of space `test_int`, keeping column `col1`, via a GET SUBGRAPH
  // statement that yields a list of edges.
  val subgraphReadConfig: ReadNebulaConfig = ReadNebulaConfig
    .builder()
    .withSpace("test_int")
    .withNoColumn(false)
    .withLabel("friend")
    .withReturnCols(List("col1"))
    .withNgql("GET SUBGRAPH WITH PROP 3 STEPS FROM 2 YIELD EDGES AS relationships")
    .build()

  val subgraphEdges =
    sparkSession.read.nebula(connectionConfig, subgraphReadConfig).loadEdgesToDfByNgql()

  // Print the schema and the full rows for easier diagnosis, then check the edge count.
  subgraphEdges.printSchema()
  subgraphEdges.show(truncate = false)
  assert(subgraphEdges.count() == 6)
}

test("read edge from nGQL: FIND ALL PATH WITH PROP FROM 2 TO 4 OVER friend YIELD path AS p") {
  // Connection settings used by this test: meta and graph addresses plus a retry count.
  val connectionConfig = NebulaConnectionConfig
    .builder()
    .withMetaAddress("127.0.0.1:9559")
    .withGraphAddress("127.0.0.1:9669")
    .withConnectionRetry(2)
    .build()

  // Read `friend` edges of space `test_int`, keeping column `col1`, via a FIND ALL PATH
  // statement that yields paths.
  val pathReadConfig: ReadNebulaConfig = ReadNebulaConfig
    .builder()
    .withSpace("test_int")
    .withNoColumn(false)
    .withLabel("friend")
    .withReturnCols(List("col1"))
    .withNgql("FIND ALL PATH WITH PROP FROM 2 TO 4 OVER friend YIELD path AS p")
    .build()

  val pathEdges =
    sparkSession.read.nebula(connectionConfig, pathReadConfig).loadEdgesToDfByNgql()

  // Print the schema and the full rows for easier diagnosis, then check the edge count.
  pathEdges.printSchema()
  pathEdges.show(truncate = false)
  assert(pathEdges.count() == 2)
}

}

0 comments on commit 4666866

Please sign in to comment.