This article describes how to import data from Neo4j into the Nebula Graph Database using the Nebula Graph Exchange data import tool. Before looking at how the import works in practice, let's take a look at how the import function is implemented inside Nebula Graph.

Nebula Graph Exchange data processing principles

The import tool, named Nebula Graph Exchange, uses Spark as the import platform to support massive data imports and to ensure performance. Spark itself provides a nice abstraction, the DataFrame, that makes it easy to support multiple data sources. Adding a new data source with DataFrame support requires only the code that reads the configuration file and a Reader class that returns a DataFrame.
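As a minimal sketch of what that abstraction looks like (the names below are illustrative, not Exchange's actual classes), a new data source only has to implement one method that returns a DataFrame:

import org.apache.spark.sql.{DataFrame, SparkSession}

// A new data source only needs a Reader that produces a DataFrame.
trait Reader {
  def read(): DataFrame
}

// Example: a CSV-backed reader that leans on Spark's built-in DataFrame support.
class CsvReader(spark: SparkSession, path: String) extends Reader {
  override def read(): DataFrame =
    spark.read.option("header", "true").csv(path)
}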

A DataFrame can be thought of as a distributed table. A DataFrame is divided into multiple partitions, and the partitions can be stored on different machines, enabling parallel operation. Spark also provides a concise API that makes it easy for users to manipulate DataFrames as if they were local data sets. Most databases now provide the ability to export data directly to a DataFrame, and even if a database does not provide this capability, you can build a DataFrame manually using the database driver.
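For instance, building a DataFrame by hand from rows fetched through a driver might look like the following sketch (the schema and rows here are made up):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ManualDataFrame {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("manual-df").master("local[*]").getOrCreate()
    // In a real reader, these rows would come from the database driver's result set.
    val rows = Seq(Row(1L, "1"), Row(2L, "2"))
    val schema = StructType(Seq(
      StructField("idInt", LongType, nullable = false),
      StructField("idString", StringType, nullable = false)
    ))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
    df.show()
    spark.stop()
  }
}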

After Nebula Graph Exchange has processed the data from the data source into a DataFrame, it iterates over each row, retrieving values by column name according to the field mapping in the configuration file. After iterating through batchSize rows, Exchange writes the collected data to Nebula Graph in a single batch. Currently, Exchange generates nGQL statements and writes them asynchronously through the Nebula client. A planned next step is to directly generate the SST files used by Nebula Graph's underlying storage, for better performance.
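As an illustration, a generated batch write looks roughly like the following nGQL (the vertex IDs and property values are made up):

INSERT VERTEX tagA(idInt, idString, tboolean, tdouble) VALUES 1:(1, "1", true, 1.0), 2:(2, "2", false, 2.0);

Next, we will introduce the implementation of the Neo4j data source import.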

Neo4j data import concrete implementation

Although Neo4j officially provides a library that can export data directly to a DataFrame, it is difficult to make it read data in a way that supports resuming an interrupted import (breakpoint continuation). So instead of using this library, we use the official Neo4j driver to read the data. Exchange achieves better performance by invoking the Neo4j driver in each partition to execute Cypher statements with different SKIP and LIMIT clauses, distributing the data across the partitions. The number of partitions is specified by the configuration item partition.

The Neo4jReader class in Exchange first replaces the clause after RETURN in the user-configured exec Cypher statement with count(*) to obtain the total amount of data, and then calculates the starting offset and size of each partition based on the number of partitions. If the user has configured a check_point_path directory, the files in that directory are read, and if they indicate an import being resumed, Exchange recalculates the offset and size of each partition accordingly. Each partition then appends its own SKIP and LIMIT to the Cypher statement and calls the driver to execute it. Finally, the returned data is processed into a DataFrame, completing the data import from Neo4j.
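For example (the numbers are made up), with 1,000,000 rows in total and partition set to 4, each partition would execute the user's statement with its own offsets appended, along these lines:

// partition 0
match (n:tagA) return n.idInt as idInt, n.idString as idString, n.tdouble as tdouble, n.tboolean as tboolean order by n.idInt SKIP 0 LIMIT 250000
// partition 1
match (n:tagA) return n.idInt as idInt, n.idString as idString, n.tdouble as tdouble, n.tboolean as tboolean order by n.idInt SKIP 250000 LIMIT 250000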

The process is shown in the figure below:

Neo4j data import practices

The system environment for the import demo is as follows:

  • CPU model: Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
  • CPU cores: 14
  • Memory: 251 GB

The software environment is as follows:

  • Neo4j: 3.5.20 Community edition
  • Nebula Graph: Docker-compose deployment, default configuration
  • Spark: single-machine standalone deployment, 2.4.6 pre-built for Hadoop 2.7

Because Nebula Graph is a strong-schema database, you need to create the space, tags, and edge types before importing the data. See the syntax here.

Here we create a space named test with a replica factor of 1, two tags, tagA and tagB, each with four properties, and an edge type edgeAB, which also has four properties. The nGQL statements are as follows:

# Create the graph space
CREATE SPACE test(replica_factor=1);
# Use the graph space test
USE test;
# Create the tag tagA
CREATE TAG tagA(idInt int, idString string, tboolean bool, tdouble double);
# Create the tag tagB
CREATE TAG tagB(idInt int, idString string, tboolean bool, tdouble double);
# Create the edge type edgeAB
CREATE EDGE edgeAB(idInt int, idString string, tboolean bool, tdouble double);

We also imported mock data into Neo4j: 1 million points labeled tagA and tagB, and 10 million edges of type edgeAB connecting points of the two types. It is also important to note that the property types of the data exported from Neo4j must match the types of the corresponding properties in Nebula Graph.
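For reference, points of this shape could be generated in Neo4j with Cypher along the following lines (a sketch, not the exact script used for this test; the edges can be generated similarly):

UNWIND range(1, 1000000) AS i
CREATE (:tagA {idInt: i, idString: toString(i), tboolean: i % 2 = 0, tdouble: i + 0.5});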

Finally, to speed up both writing the mock data into Neo4j and reading it back out, we created an index on the idInt property of tagA and tagB in Neo4j. With respect to indexes, note that Exchange does not import indexes, constraints, or other such metadata from Neo4j into Nebula Graph, so you need to create and REBUILD indexes yourself after the data has been written to Nebula Graph.
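The Neo4j side of this is standard Cypher; the Nebula Graph statements below are a sketch of the 1.x index DDL (the index name is illustrative):

In Neo4j:

CREATE INDEX ON :tagA(idInt);
CREATE INDEX ON :tagB(idInt);

In Nebula Graph, after the import finishes:

CREATE TAG INDEX tagA_idInt ON tagA(idInt);
REBUILD TAG INDEX tagA_idInt OFFLINE;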

To import the Neo4j data into Nebula Graph, we need to download and package the project, which lives in the tools/exchange folder of the nebula-java repository. Run the following commands:

git clone https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/exchange
mvn package -DskipTests

You can then see the target/exchange-1.0.1.jar file.

Next, write a configuration file. Its format is HOCON (Human-Optimized Config Object Notation); you can start from the src/main/resources/server_application.conf file and modify it. First configure the address, user, pswd, and space settings under the nebula section. The test environment uses the default configuration throughout, so no further changes are needed here. Then configure the two tags, tagA and tagB. Only the tagA configuration is shown here; the tagB configuration is the same.

{
  # ====== Neo4j connection settings ======
  name: tagA  # must match the tag name in Nebula Graph; the tag must already be created in Nebula Graph
  server: "bolt://127.0.0.1:7687"  # address of the Neo4j server
  user: neo4j  # user name for Neo4j
  password: neo4j  # password for Neo4j
  database: graph.db  # (optional) Neo4j database name; not supported by the community edition

  # ====== import settings ======
  type: {
    source: neo4j  # PARQUET, ORC, JSON, CSV, HIVE, MYSQL, PULSAR, KAFKA... are also supported
    sink: client   # with sink: client, data is written to Nebula Graph through the Nebula Graph client
  }

  fields: [idInt, idString, tdouble, tboolean]  # the order of fields must be preserved; the future direct export of the Nebula underlying storage file will require it
  vertex: idInt  # the Neo4j field used as the Nebula Graph VID; its type must be long or int
  check_point_path: "file:///tmp/test"  # directory where the import progress information is stored, used for breakpoint continuation
  exec: "match (n:tagA) return n.idInt as idInt, n.idString as idString, n.tdouble as tdouble, n.tboolean as tboolean order by n.idInt"
}

The settings for edges are mostly the same as for points, but because an edge in Nebula Graph is identified by the VID of its source point and the VID of its target point, you need to specify which field supplies the source VID and which field supplies the target VID.

The special configuration of edges is given below.

# VID settings for the source point
source: {
  field: a.idInt
  # policy: "hash"
}

# VID settings for the target point
target: {
  field: b.idInt
  # policy: "uuid"
}

# (optional) field used as the rank of the edge
ranking: idInt

# the number of partitions is set to 1 here, for a reason explained below
partition: 1

exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) return a.idInt, b.idInt, r.idInt as idInt, r.idString as idString, r.tdouble as tdouble, r.tboolean as tboolean order by id(r)"

A policy of hash or uuid can be set under a tag's vertex configuration and under an edge's source and target configurations. It uses the hash or uuid function to map a string to an integer, which is then used as the VID of the point.

The example above does not need a policy because the field used as the VID is already an integer. See the difference between hash and uuid here.
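If, hypothetically, the string field idString were used as the VID instead, the vertex setting in the tag configuration might look like this sketch:

vertex: {
  field: idString
  policy: "hash"  # maps the string value to an integer VID
}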

The Cypher standard does not guarantee a consistent ordering of results unless the query contains an ORDER BY clause. Although Neo4j appears to return results in an unchanged order even without ORDER BY, to prevent possible data loss during the import it is still strongly recommended to include ORDER BY in the Cypher statements, even though this increases the import time. To improve import efficiency, it is best to sort on an indexed property. If there is no index, you can observe the default order and pick a suitable sort property; if no pattern can be found in the default order, use the ID of the node or relationship as the sort key and set partition as small as possible to reduce the sorting pressure on Neo4j. That is why, in this article, the partition for the edge edgeAB is set to 1.

In addition, Nebula Graph uses the VID as the unique primary key when creating points and edges, and overwrites any existing data under that key. So if a Neo4j property value is used as the Nebula Graph VID and that value is duplicated in Neo4j, exactly one of the rows with the duplicate VID will survive in Nebula Graph and the others will be overwritten. Because the import writes data to Nebula Graph concurrently, the row that survives is not guaranteed to be the latest one in Neo4j.
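As a small made-up illustration of the overwrite behavior in nGQL:

# both statements write to VID 1; whichever executes last wins
INSERT VERTEX tagA(idInt, idString, tboolean, tdouble) VALUES 1:(1, "first", true, 1.0);
INSERT VERTEX tagA(idInt, idString, tboolean, tdouble) VALUES 1:(1, "second", true, 1.0);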

When resuming an import from a breakpoint, the state of the database must not have changed in the meantime, for example by adding or deleting data, and the number of partitions must stay the same; otherwise data may be lost.

Finally, because Exchange needs to execute Cypher statements with different SKIP and LIMIT clauses on different partitions, the user-supplied Cypher statement must not itself contain SKIP or LIMIT clauses.
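For example, a hypothetical exec like the following would conflict with the clauses Exchange appends and therefore cannot be used:

exec: "match (n:tagA) return n.idInt as idInt order by n.idInt SKIP 100 LIMIT 10"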

To run the Exchange import job, execute the following command:

$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local[10]" target/exchange-1.0.1.jar -c /path/to/conf/neo4j_application.conf

With this configuration, importing the 1 million points took 13s and importing the 10 million edges took 213s, 226s in total.

Bonus: Some comparisons between Neo4j 3.5 Community and Nebula Graph 1.0.1

Neo4j and Nebula Graph differ in system architecture, data model, and access method. The following table lists the common similarities and differences.

Hi, I'm Lee Mong-Je, the Nebula Graph developer behind this article. If you have any questions about Nebula Graph, please check out our Nebula Graph forum.

Like this article? Give us a star on GitHub 🙇♂️🙇♀️

Want to discuss graph database technology? NebulaGraphbot will take you into the Nebula Graph community.
