As mentioned in Spark Connector Reader Principles and Practices, the Spark Connector is a Spark data connector that can be used to read from and write to external data systems. The Spark Connector consists of two parts, a Reader and a Writer, and this article describes how to write data to Nebula Graph using the Spark Connector Writer.

Spark Connector Writer Principles

Spark SQL allows users to implement custom data sources, which makes it possible to extend Spark to external data systems.

Single-record writing in Nebula's Spark Connector is implemented on top of the DataSourceV2 API and requires the following steps:

  1. Inherit WriteSupport and override createWriter to create a custom DataSourceWriter.
  2. Inherit DataSourceWriter to create the NebulaDataSourceVertexWriter and NebulaDataSourceEdgeWriter classes. Override the createWriterFactory method to return a custom DataWriterFactory, override the commit method to commit the whole transaction, and override the abort method to roll the transaction back. Nebula Graph 1.x does not support transactions, so in this implementation commit and abort perform no substantive operation.
  3. Inherit DataWriterFactory to create the NebulaVertexWriterFactory and NebulaEdgeWriterFactory classes, and override the createWriter method to return a custom DataWriter.
  4. Inherit DataWriter to create the NebulaVertexWriter and NebulaEdgeWriter classes. Override the write method to write the data out, override the commit method to commit the transaction, and override the abort method to roll the transaction back. As above, the commit and abort methods of DataWriter perform no substantive operation.

The implementation class diagram for Nebula's Spark Connector Writer is as follows:
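In code form, the four steps above map onto a class hierarchy like the minimal sketch below, written against Spark 2.4's DataSourceV2 write API. The class names follow the article, while constructor parameters and method bodies are illustrative assumptions rather than the connector's actual code:

```scala
import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType

// Step 1: inherit WriteSupport and override createWriter to return a custom DataSourceWriter
class NebulaSource extends DataSourceV2 with WriteSupport {
  override def createWriter(writeUUID: String,
                            schema: StructType,
                            mode: SaveMode,
                            options: DataSourceOptions): Optional[DataSourceWriter] =
    Optional.of(new NebulaDataSourceVertexWriter(schema))
}

// Step 2: inherit DataSourceWriter; commit/abort stay empty because Nebula Graph 1.x
// does not support transactions (NebulaDataSourceEdgeWriter is analogous)
class NebulaDataSourceVertexWriter(schema: StructType) extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[InternalRow] =
    new NebulaVertexWriterFactory(schema)
  override def commit(messages: Array[WriterCommitMessage]): Unit = {} // no substantive operation
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}  // no substantive operation
}

// Step 3: inherit DataWriterFactory and return a custom DataWriter for each partition
class NebulaVertexWriterFactory(schema: StructType) extends DataWriterFactory[InternalRow] {
  override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new NebulaVertexWriter(schema)
}

// Step 4: inherit DataWriter; write() does the real work, commit/abort stay empty
class NebulaVertexWriter(schema: StructType) extends DataWriter[InternalRow] {
  override def write(record: InternalRow): Unit = {
    // build an nGQL INSERT statement from the row and submit it (see the write logic below)
  }
  override def commit(): WriterCommitMessage = new WriterCommitMessage {} // no substantive operation
  override def abort(): Unit = {}                                         // no substantive operation
}
```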

The write logic in the write methods of NebulaVertexWriter and NebulaEdgeWriter is as follows (a sketch follows the list):

  1. Create a client connection to Nebula's graphd service;
  2. Switch to the target graph space before writing data;
  3. Construct the Nebula insert statement for the data;
  4. Submit the statement to perform the write operation;
  5. Define a callback function to receive the result of the write operation.
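A minimal sketch of these five steps, using the sample player tag. GraphClient here is a hypothetical stand-in for the nebula-java graph client, so its method names and signatures are assumptions:

```scala
import org.apache.spark.sql.Row

// Hypothetical client interface standing in for the nebula-java graph client
trait GraphClient {
  def connect(user: String, password: String): Int
  def switchSpace(space: String): Int
  def execute(statement: String): Int
}

object NebulaWriteSketch {
  def writeVertex(client: GraphClient, space: String, row: Row): Unit = {
    // 1. create a client connection to Nebula's graphd service
    client.connect("user", "password")
    // 2. switch to the target graph space before writing
    client.switchSpace(space)
    // 3. construct the nGQL insert statement for the row, e.g.
    //    INSERT VERTEX player(name, age) VALUES 100:("Tom", 21)
    val statement =
      s"""INSERT VERTEX player(name, age) VALUES ${row.getLong(0)}:("${row.getString(1)}", ${row.getInt(2)})"""
    // 4. submit the statement to perform the write
    val code = client.execute(statement)
    // 5. receive the result of the write (the connector does this via a callback)
    if (code != 0) throw new RuntimeException(s"vertex write failed: $statement")
  }
}
```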

Batch writing in Nebula's Spark Connector works much like the Exchange tool: it maps over the DataFrame, accumulates records into batches, and submits each batch in one go.
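A rough sketch of this batch-accumulation idea (illustrative, not the connector's actual code): within each DataFrame partition, rows are grouped into batches, and each batch becomes a single multi-value INSERT statement.

```scala
import org.apache.spark.sql.{DataFrame, Row}

object NebulaBatchSketch {
  def batchWriteVertices(vertexDF: DataFrame, batch: Int): Unit =
    vertexDF.foreachPartition { rows: Iterator[Row] =>
      rows.grouped(batch).foreach { chunk =>
        // accumulate `batch` rows, then build one multi-value INSERT statement
        val values = chunk
          .map(row => s"""${row.getLong(0)}:("${row.getString(1)}")""")
          .mkString(", ")
        val statement = s"INSERT VERTEX player(name) VALUES $values"
        // submit `statement` through the graph client here
      }
    }
}
```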

Spark Connector Writer Practices

The Writer of the Spark Connector lets users write data programmatically, with a DataFrame as the data source. It provides two kinds of interfaces: single-record write and batch write.

Grab the Spark Connector code on GitHub:

```bash
git clone -b v1.0 https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
```

The compiled package is then installed into your local Maven repository.

The following is an example:

  1. Add the nebula-spark dependency to the pom.xml file of your Maven project:
```xml
<dependency>
  <groupId>com.vesoft</groupId>
  <artifactId>nebula-spark</artifactId>
  <version>1.0.1</version>
</dependency>
```
  2. Write the DataFrame data to Nebula in your Spark program.
  • 2.1 Write to Nebula record by record:
```scala
// Construct the vertex and edge DataFrames; the sample data is in the
// nebula-java/examples/src/main/resources directory
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()

// Write vertices
vertexDF.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "vertexId", "hash")

// Write edges
edgeDF.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeEdges("follow", "source", "target")
```

Configuration description:

  • nebula(address: String, space: String, partitionNum: String)
    • address: the addresses of Nebula's graphd services; multiple addresses are separated by commas, for example ip1:3699,ip2:3699.
    • space: Nebula's graph space.
    • partitionNum: the partitionNum specified when creating the space in Nebula. If it was not specified, the default value 100 is used.
  • writeVertices(tag: String, vertexField: String, policy: String = “”)
    • tag: the tag in Nebula.
    • vertexField: the column in the DataFrame that serves as the Nebula vertex ID. For example, if the DataFrame has three columns a, b, and c and column a is to be used as the vertex ID, pass a for this parameter.
    • policy: if the type of the vertexField column in the DataFrame is not numeric, you need to set the mapping policy for the VID in Nebula (see the example after this list).
  • writeEdges(edge: String, srcVertexField: String, dstVertexField: String, policy: String = “”)
    • edge: the edge type in Nebula.
    • srcVertexField: the column in the DataFrame that serves as the edge's source vertex.
    • dstVertexField: the column in the DataFrame that serves as the edge's destination vertex.
    • policy: if the srcVertexField or dstVertexField column in the DataFrame is not numeric, you need to set the mapping policy for edge IDs in Nebula.
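For example, if the vertex ID column holds strings such as player names, the write needs a policy; hash maps each string to a numeric VID. The name column used below is an assumption about the sample data:

```scala
// "name" is a string column, so it cannot serve as a Nebula 1.x VID directly;
// the "hash" policy tells the connector to hash it into a numeric VID.
vertexDF.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "name", "hash")
```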
  • 2.2 Write to Nebula in batches:
```scala
// Construct the vertex and edge DataFrames; the sample data is in the
// nebula-java/examples/src/main/resources directory
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()

// Batch write vertices
new NebulaBatchWriterUtils()
  .batchInsert("127.0.0.1:3699", "nb", 2000)
  .batchToNebulaVertex(vertexDF, "player", "vertexId")

// Batch write edges
new NebulaBatchWriterUtils()
  .batchInsert("127.0.0.1:3699", "nb", 2000)
  .batchToNebulaEdge(edgeDF, "follow", "source", "target")
```

Configuration description:

  • batchInsert(address: String, space: String, batch: Int = 2000)
    • address: the addresses of Nebula's graphd services; multiple addresses are separated by commas, for example ip1:3699,ip2:3699.
    • space: Nebula's graph space.
    • batch: the number of records written per batch; optional, with a default value of 2000.
  • batchToNebulaVertex(data: DataFrame, tag: String, vertexField: String, policy: String = “”)
    • data: the DataFrame to be written to Nebula.
    • tag: the tag in Nebula.
    • vertexField: the column in the DataFrame that serves as the Nebula vertex ID.
    • policy: the mapping policy for the VID in Nebula; optional when the vertexField column is numeric.
  • batchToNebulaEdge(data: DataFrame, edge: String, srcVertexField: String, dstVertexField: String, rankField: String = “”, policy: String = “”)
    • data: the DataFrame to be written to Nebula.
    • edge: the edge type in Nebula.
    • srcVertexField: the column in the DataFrame that serves as the edge's source vertex.
    • dstVertexField: the column in the DataFrame that serves as the edge's destination vertex.
    • rankField: the column in the DataFrame that serves as the edge's rank; optional.
    • policy: the mapping policy for the edge's vertex IDs; optional when srcVertexField and dstVertexField are numeric (see the example after this list).
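For example, batch-writing edges with a rank column and the hash policy might look as follows; the degree rank column is an assumption about the sample data:

```scala
// "degree" is used as the edge rank column here (an assumed column in the
// sample data); "hash" maps non-numeric source/target columns to numeric VIDs.
new NebulaBatchWriterUtils()
  .batchInsert("127.0.0.1:3699", "nb", 2000)
  .batchToNebulaEdge(edgeDF, "follow", "source", "target", "degree", "hash")
```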

That wraps up the Nebula Spark Connector Writer. Head over to GitHub: github.com/vesoft-inc/… to try it out.

Like this article? Give our GitHub repo a star 🙇♂️🙇♀️

Want to discuss graph database technology? Let NebulaGraphbot take you into the NebulaGraph community.