As mentioned in Spark Connector Reader Principles and Practices, the Spark Connector is a Spark data connector for reading from and writing to external data systems. It consists of two parts, a Reader and a Writer, and this article describes how to write data into Nebula Graph using the Spark Connector.
Spark Connector Writer principle
Spark SQL allows users to define custom data sources, making it possible to extend Spark to external data systems.
Nebula's Spark Connector single-record write is based on the DataSourceV2 API and involves the following steps:
- Inherit WriteSupport and override createWriter to create a custom DataSourceWriter.
- Inherit DataSourceWriter to create the NebulaDataSourceVertexWriter and NebulaDataSourceEdgeWriter classes. Override createWriterFactory to return a custom DataWriterFactory, override commit to commit the whole transaction, and override abort to roll it back. Nebula Graph 1.x does not support transactions, so in this implementation commit and abort perform no substantive operation.
- Inherit DataWriterFactory to create the NebulaVertexWriterFactory and NebulaEdgeWriterFactory classes, and override createWriter to return a custom DataWriter.
- Inherit DataWriter to create the NebulaVertexWriter and NebulaEdgeWriter classes. Override write to write the data out, and override commit and abort for transaction commit and rollback; as above, commit and abort in DataWriter perform no substantive operation.
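The steps above can be sketched as a class skeleton. This is an illustration only, assuming the Spark 2.4 DataSourceV2 interfaces; the WriteSupport plumbing is omitted and the method bodies are placeholders, not the connector's real implementation (only the vertex side is shown, the edge classes mirror it):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer._

// Driver-side writer: creates the factory and handles commit/abort.
class NebulaDataSourceVertexWriter extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[InternalRow] =
    new NebulaVertexWriterFactory()
  // Nebula Graph 1.x has no transactions, so these do nothing substantive.
  override def commit(messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
}

// Serialized to executors; creates one DataWriter per partition task.
class NebulaVertexWriterFactory extends DataWriterFactory[InternalRow] {
  // Spark 2.4 signature; Spark 2.3 uses (partitionId, attemptNumber).
  override def createDataWriter(partitionId: Int, taskId: Long,
                                epochId: Long): DataWriter[InternalRow] =
    new NebulaVertexWriter()
}

// Executor-side writer: receives rows one by one.
class NebulaVertexWriter extends DataWriter[InternalRow] {
  override def write(record: InternalRow): Unit = {
    // Build an INSERT VERTEX statement from the row and submit it to Graphd.
  }
  override def commit(): WriterCommitMessage = new WriterCommitMessage {}
  override def abort(): Unit = {}
}
```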
The class diagram of Nebula's Spark Connector Writer implementation is as follows:
Write logic
In the write methods of NebulaVertexWriter and NebulaEdgeWriter, the write logic is as follows:
- Create a client and connect to Nebula's Graphd service;
- Switch to the target graph space before writing data;
- Construct the Nebula write statement from the data;
- Submit the statement to perform the write operation;
- Define a callback function to receive the result of the write operation.
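The steps above can be sketched as follows. The client calls in the comments are pseudo-code, since the actual class and method names in the nebula-java 1.x client may differ; only the statement-construction step is shown concretely, and insertVertexStatement is a hypothetical helper, not a function in the connector:

```scala
// Per-record write path (pseudo-code for the client steps):
//   1. val client = connectToGraphd("127.0.0.1", 3699)  // connect to Graphd
//   2. client.execute("USE nb")                          // switch graph space
//   3. val stmt = insertVertexStatement(...)             // build the statement
//   4. client.execute(stmt)                              // submit the write
//   5. check the returned error code to learn whether the write succeeded

// Step 3, building the nGQL statement for one vertex:
def insertVertexStatement(tag: String,
                          propNames: Seq[String],
                          vertexId: String,
                          values: Seq[Any]): String = {
  // String properties are quoted; numeric properties are emitted as-is.
  val vals = values.map {
    case s: String => "\"" + s + "\""
    case other     => other.toString
  }.mkString(", ")
  s"INSERT VERTEX $tag(${propNames.mkString(", ")}) VALUES $vertexId: ($vals)"
}
```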
The bulk write in Nebula's Spark Connector is similar to the Exchange tool: it maps over the DataFrame, accumulates records, and submits them in batches.
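The accumulate-and-flush idea can be sketched independently of Spark. This is a minimal illustration; BatchBuffer is a hypothetical helper, not a class in the connector:

```scala
// Hypothetical helper illustrating cumulative batch submission: statements are
// buffered and flushed once the batch size is reached.
class BatchBuffer(batchSize: Int, flush: Seq[String] => Unit) {
  private val buf = scala.collection.mutable.ArrayBuffer.empty[String]

  def add(statement: String): Unit = {
    buf += statement
    if (buf.size >= batchSize) flushAll()
  }

  // Flush any remainder at the end of the partition.
  def close(): Unit = if (buf.nonEmpty) flushAll()

  private def flushAll(): Unit = {
    flush(buf.toSeq) // in the connector this would submit one batched INSERT
    buf.clear()
  }
}
```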
Spark Connector Writer practice
The Writer of the Spark Connector provides two interfaces for users to write data, both taking a DataFrame as the data source: single-record write and batch write.
Grab the Spark Connector code on GitHub:
```shell
git clone -b v1.0 https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
```
This compiles the package and installs it into your local Maven repository.
The following is an example:
- Add the nebula-spark dependency to your Maven project's pom.xml:

```xml
<dependency>
    <groupId>com.vesoft</groupId>
    <artifactId>nebula-spark</artifactId>
    <version>1.0.1</version>
</dependency>
```
- Write the DataFrame data to Nebula in the Spark program
- 2.1 Write to Nebula record by record:
```scala
// Construct vertex and edge DataFrames. The sample data is in the
// nebula-java/examples/src/main/resources directory.
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()

// Write vertices
vertexDF.write.nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "vertexId")
// Write edges
edgeDF.write.nebula("127.0.0.1:3699", "nb", "100")
  .writeEdges("follow", "source", "target")
```
Configuration description:
- nebula(address: String, space: String, partitionNum: String)
- address: the Graphd addresses; multiple addresses are separated by commas, for example "ip1:3699,ip2:3699"
- space: Nebula's graphSpace
- partitionNum: the partitionNum specified when creating the Nebula space; if not specified there, the default is 100
- writeVertices(tag: String, vertexField: String, policy: String = "")
- tag: the tag in Nebula
- vertexField: the DataFrame column that serves as the Nebula vertex ID. For example, if the DataFrame has three columns a, b, and c and column a is to be the vertex ID column, this parameter is set to "a"
- policy: if the data type of the vertexField column in the DataFrame is not numeric, a mapping policy for the VID in Nebula must be configured
- writeEdges(edge: String, srcVertexField: String, dstVertexField: String, policy: String = "")
- edge: the edge type in Nebula
- srcVertexField: the DataFrame column that serves as the edge's source vertex ID
- dstVertexField: the DataFrame column that serves as the edge's destination vertex ID
- policy: if the srcVertexField or dstVertexField column in the DataFrame is of a non-numeric type, a mapping policy for the edge's vertex IDs in Nebula must be configured
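For illustration, a hypothetical call with a non-numeric ID column might look like the following; this assumes "hash" is an accepted policy value, matching the hash function Nebula Graph 1.x offers for generating numeric VIDs:

```scala
// Column "name" holds strings, so a hash policy maps it to a numeric VID.
vertexDF.write.nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "name", "hash")
```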
- 2.2 Batch write to Nebula:
```scala
// Construct vertex and edge DataFrames. The sample data is in the
// nebula-java/examples/src/main/resources directory.
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()

// Batch-write vertices
new NebulaBatchWriterUtils()
  .batchInsert("127.0.0.1:3699", "nb", 2000)
  .batchToNebulaVertex(vertexDF, "player", "vertexId")
// Batch-write edges
new NebulaBatchWriterUtils()
  .batchInsert("127.0.0.1:3699", "nb", 2000)
  .batchToNebulaEdge(edgeDF, "follow", "source", "target")
```
Configuration description:
- batchInsert(address: String, space: String, batch: Int = 2000)
- address: the Graphd addresses; multiple addresses are separated by commas, for example "ip1:3699,ip2:3699"
- space: Nebula's graphSpace
- batch: the number of records written per batch; optional, default 2000
- batchToNebulaVertex(data: DataFrame, tag: String, vertexField: String, policy: String = "")
- data: the DataFrame to be written to Nebula
- tag: the tag in Nebula
- vertexField: the DataFrame column that serves as the Nebula vertex ID
- policy: the mapping policy for the VID in Nebula; optional when the vertexField column is numeric
- batchToNebulaEdge(data: DataFrame, edge: String, srcVertexField: String, dstVertexField: String, rankField: String = "", policy: String = "")
- data: the DataFrame to be written to Nebula
- edge: the edge type in Nebula
- srcVertexField: the DataFrame column that serves as the edge's source vertex ID
- dstVertexField: the DataFrame column that serves as the edge's destination vertex ID
- rankField: the DataFrame column that serves as the edge rank; optional
- policy: the mapping policy for the edge's vertex IDs; optional when srcVertexField and dstVertexField are numeric
That's all for the Nebula Spark Connector Writer. Head over to GitHub (github.com/vesoft-inc/…) and give it a try.
Like this article? Give our GitHub repo a star to show your support 🙇♂️🙇♀️
Want to discuss graph database technology? Add NebulaGraphbot to join the Nebula community.