This article walks through the Nebula Exchange SST writing process in a minimal setup (standalone, containerized Spark, Hadoop, and Nebula Graph). It applies to Nebula Exchange version 2.5 and above.

Original link:

  • siwei.io/nebula-exch…
  • In China: cn.siwei.io/nebula-exch…

What is Nebula Exchange?

Nebula Exchange is an open source Spark application for the Nebula Graph community, which I described earlier in Nebula Data Import Options. It is designed to support bulk or streaming data import into the Nebula Graph database.

Nebula Exchange supports a variety of data sources (from Apache Parquet, ORC, JSON, CSV, HBase, Hive and MaxCompute to Neo4j, MySQL, ClickHouse, Kafka and Pulsar), and more data sources are being added.

As shown above, inside Exchange, data read from the different sources by the readers is processed by the Processor and then written by the Writer to the sink, the Nebula Graph database. Besides the normal write path through ServerBaseWriter, Exchange can bypass the whole write process and use Spark’s computing power to generate SST files of the underlying RocksDB in parallel, achieving very high-performance data import. This SST file import scenario is the focus of this article.

For more information, see: Nebula Graph Manual: What is Nebula Exchange

The Nebula Graph blog also has more Nebula Exchange hands-on articles

Overview of the steps

  • Experimental environment
  • Configure Exchange
  • Generate the SST file
  • Write the SST file to Nebula Graph

Prepare the experimental environment

To exercise Nebula Exchange’s SST capabilities in a minimal way, we need to:

  • Build a Nebula Graph cluster and create a Schema for the imported data. We choose docker-compose here, deploy it quickly with Nebula Up, and then slightly modify its network so that it is reachable from the other containerized programs (Exchange, Spark, HDFS).
  • Set up a containerized Spark running environment
  • Set up HDFS in containers

1. Build the Nebula Graph cluster

With Nebula Up we can deploy a Nebula Graph cluster with a single command in a Linux environment:

curl -fsSL nebula-up.siwei.io/install.sh | bash

After the deployment is successful, we need to make some changes to the environment. Here are two changes:

  1. Keep only one metad service
  2. Use Docker’s external network

Please refer to Appendix I for detailed modifications

Then edit the docker-compose file and re-create the deployment:

cd ~/.nebula-up/nebula-docker-compose
vim docker-compose.yaml # Refer to Appendix I
docker network create nebula-net # create the external network
docker-compose up -d --remove-orphans
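Before moving on, it helps to confirm that the trimmed-down cluster (single metad) came up healthy. A minimal check, assuming the default container naming of nebula-docker-compose (the same names show up later in this article):

docker ps --filter name=nebula-docker-compose --format '{{.Names}}\t{{.Status}}'
# after a short while every container should report "Up ... (healthy)"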

After that, we create the graph space we want to test with, and create the Schema for the graph. For this we can use Nebula Console; conveniently, Nebula Up ships with a containerized Nebula Console.

  • Enter the container where Nebula-Console is located
~/.nebula-up/console.sh
/ #
  • Initiate a connection to the graph database from inside the console container. 192.168.x.y is the address of the first network card of my Linux VM; please change it to yours.
nebula-console -addr 192.168.x.y -port 9669 -user root -p password
[INFO] Connection pool is initialized successfully

Welcome to Nebula Graph!
  • Create a graph space (let’s call it sst), and its Schema
create space sst(partition_num=5,replica_factor=1,vid_type=fixed_string(32));
:sleep 20
use sst
create tag player(name string, age int);

Sample output

(root@nebula) [(none)]> create space sst(partition_num=5,replica_factor=1,vid_type=fixed_string(32));
Execution succeeded (time spent 1468/1918 us)

(root@nebula) [(none)]> :sleep 20

(root@nebula) [(none)]> use sst
Execution succeeded (time spent 1253/1566 us)

Wed, 18 Aug 2021 08:18:13 UTC

(root@nebula) [sst]> create tag player(name string, age int);
Execution succeeded (time spent 1312/1735 us)

Wed, 18 Aug 2021 08:18:23 UTC

2. Set up a containerized Spark environment

Using the work done by Big Data Europe, this process is very easy.

It is worth noting:

  • Today’s Nebula Exchange requires specific Spark versions; as of August 2021, I used spark-2.4.5 with Hadoop 2.7.
  • For convenience, I run Spark on the same machine as Nebula Graph, and attach it to the same Docker network
docker run --name spark-master --network nebula-net \
    -h spark-master -e ENABLE_INIT_DAEMON=false -d \
    bde2020/spark-master:2.4.5-hadoop2.7

Then, we can enter the environment:

docker exec -it spark-master bash
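Optionally, double-check that the Spark inside this image is the version Exchange expects; spark-submit lives under /spark in this image, the same path we use to submit the job later:

/spark/bin/spark-submit --version
# should report version 2.4.5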

Once inside the Spark container, you can install Maven like this:

export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH

wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
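To confirm Maven is usable in the current shell, mvn -version is enough. Note that Maven is only needed if you prefer to build Exchange from source; in this walkthrough we simply download the prebuilt JAR in the next step:

mvn -version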

You can also download the Nebula Exchange JAR package in the container as follows:

cd ~
wget https://repo1.maven.org/maven2/com/vesoft/nebula-exchange/2.1.0/nebula-exchange-2.1.0.jar
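If you want to be sure the JAR arrived intact, Maven Central publishes a .sha1 checksum next to each artifact; this optional check assumes sha1sum is available inside the container:

wget https://repo1.maven.org/maven2/com/vesoft/nebula-exchange/2.1.0/nebula-exchange-2.1.0.jar.sha1
echo "$(cat nebula-exchange-2.1.0.jar.sha1)  nebula-exchange-2.1.0.jar" | sha1sum -c -
# expected output: nebula-exchange-2.1.0.jar: OK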

3. Set up HDFS as a container

Again, with big-data-europe’s work this is very simple, but we need to modify its docker-compose.yml a little to use the nebula-net Docker network created earlier.

Refer to Appendix II for detailed modifications

git clone https://github.com/big-data-europe/docker-hadoop.git
cd docker-hadoop
vim docker-compose.yml
docker-compose up -d
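Once the containers are up, a quick way to confirm HDFS is healthy is to ask the namenode for a cluster report (the container is named namenode in docker-hadoop’s compose file, and we use the same name later in this article):

docker exec namenode hdfs dfsadmin -report | head -n 20
# the report should show a non-zero capacity and one live datanode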

Configure Exchange

The configuration mainly specifies the Nebula Graph cluster itself and the space name to write data into, the settings for the data source (here we use CSV as an example), and finally the output (sink), which is SST.

  • Nebula Graph
    • GraphD address
    • MetaD address
    • credential
    • Space Name
  • The data source
    • source: csv
      • path
      • fields etc.
    • sink: sst

For details, see Appendix III (the full nebula-exchange-sst.conf)

Note that the metad address can be obtained as shown below; the mapping 0.0.0.0:49377->9559 indicates that 49377 is the externally exposed port.

$ docker ps | grep meta
887740c15750   vesoft/nebula-metad:v2.0.0   "./bin/nebula-metad …"   6 hours ago   Up 6 hours (healthy)   9560/tcp, 0.0.0.0:49377->9559/tcp, :::49377->9559/tcp, 0.0.0.0:49376->19559/tcp, :::49376->19559/tcp, 0.0.0.0:49375->19560/tcp, :::49375->19560/tcp   nebula-docker-compose_metad0_1
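Instead of reading the docker ps output by eye, docker port can print the mapping for port 9559 directly; the container name comes from the listing above, and the host port will of course differ on your machine:

docker port nebula-docker-compose_metad0_1 9559
# 0.0.0.0:49377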

Generate the SST file

1. Prepare source files and configuration files

docker cp exchange-sst.conf spark-master:/root/
docker cp player.csv spark-master:/root/
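To make sure both files actually landed in the Spark master container, a quick listing of the paths used in the docker cp commands above is enough:

docker exec spark-master ls -l /root/exchange-sst.conf /root/player.csv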

Here is an example of player.csv:

1100,Tim Duncan,42
1101,Tony Parker,36
1102,LaMarcus Aldridge,33
1103,Rudy Gay,32
1104,Marco Belinelli,32
1105,Danny Green,31
1106,Kyle Anderson,25
1107,Aron Baynes,32
1108,Boris Diaw,36
1109,Tiago Splitter,34
1110,Cory Joseph,27
1111,David West,38

2. Run the Exchange program

Enter the spark-master container and submit the Exchange application.

docker exec -it spark-master bash
cd /root/
/spark/bin/spark-submit --master local \
    --class com.vesoft.nebula.exchange.Exchange nebula-exchange-2.1.0.jar \
    -c exchange-sst.conf

Check the execution result:

The spark-submit output:

21/08/17 03:37:43 INFO TaskSetManager: Finished task 31.0 in stage 2.0 (TID 33) in 1093 ms on localhost (executor driver) (32/32)
21/08/17 03:37:43 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
21/08/17 03:37:43 INFO DAGScheduler: ResultStage 2 (foreachPartition at VerticesProcessor.scala:179) finished in 22.336 s
21/08/17 03:37:43 INFO DAGScheduler: Job 1 finished: foreachPartition at VerticesProcessor.scala:179, took 22.500639 s
21/08/17 03:37:43 INFO Exchange$: SST-Import: failure.player: 0
21/08/17 03:37:43 WARN Exchange$: Edge is not defined
21/08/17 03:37:43 INFO SparkUI: Stopped Spark web UI at http://spark-master:4040
21/08/17 03:37:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

Verify SST files generated on HDFS:

docker exec -it namenode /bin/bash

root@2db58903fb53:/# hdfs dfs -ls /sst
Found 10 items
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/1
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/10
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/2
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/3
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/4
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/5
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/6
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/7
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/8
drwxr-xr-x   - root supergroup          0 2021-08-17 03:37 /sst/9
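If you also want a rough idea of how much data was generated per partition, hdfs dfs -du can summarize the directory sizes (optional, run inside the same namenode container):

hdfs dfs -du -h /sst
# prints the size of each partition directory /sst/1 ... /sst/10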

Write SST to Nebula Graph

The operations here follow the reference documentation: SST import. There are two steps, both issued from the console:

  • Download
  • Ingest

Download actually triggers Nebula Graph to invoke an HDFS client on the server side, fetch the SST files from HDFS, and place them in a local directory accessible to storaged. This requires HDFS dependencies to be deployed on the server side. Since this is a minimal practice, I was lazy and did the download step manually.

1. Manual download

Doing the download manually simply means placing the SST files under a path that storaged can access, namely the download directory of the corresponding graph space in its data path:

/data/storage/nebula/<Space ID>/download/

In this example, our space name is sst and its Space ID is 49, which can be obtained as follows.

(root@nebula) [sst]> DESC space sst
+----+-------+------------------+----------------+---------+------------+--------------------+-------------+-----------+
| ID | Name  | Partition Number | Replica Factor | Charset | Collate    | Vid Type           | Atomic Edge | Group     |
+----+-------+------------------+----------------+---------+------------+--------------------+-------------+-----------+
| 49 | "sst" | 10               | 1              | "utf8"  | "utf8_bin" | "FIXED_STRING(32)" | "false"     | "default" |
+----+-------+------------------+----------------+---------+------------+--------------------+-------------+-----------+

Therefore, the following operations manually fetch the SST files from HDFS and copy them into each storaged container.

docker exec -it namenode /bin/bash

$ hdfs dfs -get /sst /sst
exit
docker cp namenode:/sst .
docker exec -it nebula-docker-compose_storaged0_1 mkdir -p /data/storage/nebula/49/download/
docker exec -it nebula-docker-compose_storaged1_1 mkdir -p /data/storage/nebula/49/download/
docker exec -it nebula-docker-compose_storaged2_1 mkdir -p /data/storage/nebula/49/download/
docker cp sst nebula-docker-compose_storaged0_1:/data/storage/nebula/49/download/
docker cp sst nebula-docker-compose_storaged1_1:/data/storage/nebula/49/download/
docker cp sst nebula-docker-compose_storaged2_1:/data/storage/nebula/49/download/
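Before running INGEST, it does not hurt to confirm the files are where storaged expects them. Note that the exact layout under download/ depends on how you copied the directory; adjust the path below accordingly:

docker exec nebula-docker-compose_storaged0_1 ls -lR /data/storage/nebula/49/download/ | head -n 20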

2. Import the SST file

  • Enter the container where Nebula-Console is located
~/.nebula-up/console.sh
/ #
  • Initiate a connection to the graph database from inside the console container. 192.168.x.y is the address of the first network card of my Linux VM; please change it to yours.
nebula-console -addr 192.168.x.y -port 9669 -user root -p password
[INFO] Connection pool is initialized successfully

Welcome to Nebula Graph!
  • Execute INGEST to have storaged read the SST files
(root@nebula) [(none)]> use sst
(root@nebula) [sst]> INGEST;

You can view the Nebula Graph server logs in real time as follows

tail -f ~/.nebula-up/nebula-docker-compose/logs/*/*
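Since that directory collects logs from all services, filtering for ingest-related lines makes the interesting part easier to spot:

grep -i ingest ~/.nebula-up/nebula-docker-compose/logs/*/*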

Successful INGEST log:

I0817 08:03:28.611877   169 EventListner.h:96] Ingest external SST file: column family default, the external file path /data/storage/nebula/49/download/8/8-6.sst, the internal file path /data/storage/nebula/49/data/000023.sst, the properties of the table: # data blocks=1; # entries=1; # deletions=0; # merge operands=0; # range deletions=0; raw key size=48; raw average key size=48.000000; raw value size=40; raw average value size=40.000000; data block size=75; index block size (user-key? 0, delta-value? 0)=66; filter block size=0; (estimated) table size=141; filter policy name=N/A; prefix extractor name=nullptr; column family ID=N/A; column family name=N/A; comparator name=leveldb.BytewiseComparator; merge operator name=nullptr; property collectors names=[]; SST file compression algo=Snappy; SST file compression options=window_bits=-14; level=32767; strategy=0; max_dict_bytes=0; zstd_max_train_bytes=0; enabled=0; ; creation time=0; time stamp of earliest key=0; file creation time=0;
E0817 08:03:28.611912   169 StorageHttpIngestHandler.cpp:63] SSTFile ingest successfully

Appendix

Appendix I

docker-compose.yaml

diff --git a/docker-compose.yaml b/docker-compose.yaml
index 48854de..cfeaedb 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -6,11 +6,13 @@ services:
       USER: root
       TZ:   "${TZ}"
     command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
+ - --meta_server_addrs=metad0:9559
       - --local_ip=metad0
       - --ws_ip=metad0
       - --port=9559
       - --ws_http_port=19559
+ - --ws_storage_http_port=19779
       - --data_path=/data/meta
       - --log_dir=/logs
       - --v=0
@@ -34 +36,14 @@ services:
     cap_add:
       - SYS_PTRACE

- metad1:
-    image: vesoft/nebula-metad:v2.0.0
- environment:
- USER: root
- TZ: "${TZ}"
- command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
- - --local_ip=metad1
- - --ws_ip=metad1
- - --port=9559
- - --ws_http_port=19559
- - --data_path=/data/meta
- - --log_dir=/logs
- - --v=0
- - --minloglevel=0
- healthcheck:
- test: ["CMD", "curl", "-sf", "http://metad1:19559/status"]
- interval: 30s
- timeout: 10s
- retries: 3
- start_period: 20s
- ports:
- - 9559.
- - 19559.
- - 19560.
- volumes:
- - ./data/meta1:/data/meta
- - ./logs/meta1:/logs
- networks:
- - nebula-net
- restart: on-failure
- cap_add:
- - SYS_PTRACE
-
- metad2:
-    image: vesoft/nebula-metad:v2.0.0
- environment:
- USER: root
- TZ: "${TZ}"
- command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
- - --local_ip=metad2
- - --ws_ip=metad2
- - --port=9559
- - --ws_http_port=19559
- - --data_path=/data/meta
- - --log_dir=/logs
- - --v=0
- - --minloglevel=0
- healthcheck:
- test: ["CMD", "curl", "-sf", "http://metad2:19559/status"]
- interval: 30s
- timeout: 10s
- retries: 3
- start_period: 20s
- ports:
- - 9559.
- - 19559.
- - 19560.
- volumes:
- - ./data/meta2:/data/meta
- - ./logs/meta2:/logs
- networks:
- - nebula-net
- restart: on-failure
- cap_add:
- - SYS_PTRACE
-
   storaged0:
     image: vesoft/nebula-storaged:v2.0.0
     environment:
       USER: root
       TZ:   "${TZ}"
     command:
-      - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
+ - --meta_server_addrs=metad0:9559
       - --local_ip=storaged0
       - --ws_ip=storaged0
       - --port=9779
@@ -119,8 +54,8 @@ services:
       - --minloglevel=0
     depends_on:
       - metad0
- - metad1
- - metad2
     healthcheck:
       test: ["CMD", "curl", "-sf", "http://storaged0:19779/status"]
       interval: 30s
@@ -146,7 +81,7 @@ services:
       USER: root
       TZ:   "${TZ}"
     command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
+ - --meta_server_addrs=metad0:9559
       - --local_ip=storaged1
       - --ws_ip=storaged1
       - --port=9779
@@ -157,8 +92,8 @@ services:
       - --minloglevel=0
     depends_on:
       - metad0
- - metad1
- - metad2
     healthcheck:
       test: ["CMD", "curl", "-sf", "http://storaged1:19779/status"]
       interval: 30s
@@ -184,7 +119,7 @@ services:
       USER: root
       TZ:   "${TZ}"
     command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
+ - --meta_server_addrs=metad0:9559
       - --local_ip=storaged2
       - --ws_ip=storaged2
       - --port=9779
@@ -195,8 +130,8 @@ services:
       - --minloglevel=0
     depends_on:
       - metad0
- - metad1
- - metad2
     healthcheck:
       test: ["CMD", "curl", "-sf", "http://storaged2:19779/status"]
       interval: 30s
@@ -222 +157 @@ services:
       USER: root
       TZ:   "${TZ}"
     command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
+ - --meta_server_addrs=metad0:9559
       - --port=9669
       - --ws_ip=graphd
       - --ws_http_port=19669
+ - --ws_meta_http_port=19559
       - --log_dir=/logs
       - --v=0
       - --minloglevel=0
     depends_on:
       - metad0
- - metad1
- - metad2
     healthcheck:
       test: ["CMD", "curl", "-sf", "http://graphd:19669/status"]
       interval: 30s
@@ -257 +194 @@ services:
       USER: root
       TZ:   "${TZ}"
     command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
+ - --meta_server_addrs=metad0:9559
       - --port=9669
       - --ws_ip=graphd1
       - --ws_http_port=19669
+ - --ws_meta_http_port=19559
       - --log_dir=/logs
       - --v=0
       - --minloglevel=0
     depends_on:
       - metad0
- - metad1
- - metad2
     healthcheck:
       test: ["CMD", "curl", "-sf", "http://graphd1:19669/status"]
       interval: 30s
@@ -292,17 +231,21 @@ services:
       USER: root
       TZ:   "${TZ}"
     command:
- - --meta_server_addrs=metad0:9559,metad1:9559,metad2:9559
+ - --meta_server_addrs=metad0:9559
       - --port=9669
       - --ws_ip=graphd2
       - --ws_http_port=19669
+ - --ws_meta_http_port=19559
       - --log_dir=/logs
       - --v=0
       - --minloglevel=0
+ - --storage_client_timeout_ms=60000
+ - --local_config=true
     depends_on:
       - metad0
- - metad1
- - metad2
     healthcheck:
       test: ["CMD", "curl", "-sf", "http://graphd2:19669/status"]
       interval: 30s
@@ -323,3 +266,4 @@ services:

 networks:
   nebula-net:
+ external: true

Appendix II

github.com/big-data-eu… docker-compose.yml

diff --git a/docker-compose.yml b/docker-compose.yml
index ed40dc6..66ff1f4 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -14,6 +14,8 @@ services:
       - CLUSTER_NAME=test
     env_file:
       - ./hadoop.env
+ networks:
+      - nebula-net

   datanode:
     image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
@@ -25,6 +27,8 @@ services:
       SERVICE_PRECONDITION: "namenode:9870"
     env_file:
       - ./hadoop.env
+ networks:
+      - nebula-net

   resourcemanager:
     image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
@@ -34,6 +38,8 @@ services:
       SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864"
     env_file:
       - ./hadoop.env
+ networks:
+      - nebula-net

   nodemanager1:
     image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8
@@ -43,6 +49,8 @@ services:
       SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
     env_file:
       - ./hadoop.env
+ networks:
+      - nebula-net

   historyserver:
     image: bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8
@@ -54,8 +62,14 @@ services:
       - hadoop_historyserver:/hadoop/yarn/timeline
     env_file:
       - ./hadoop.env
+ networks:
+ - nebula-net

 volumes:
   hadoop_namenode:
   hadoop_datanode:
   hadoop_historyserver:
+
+networks:
+ nebula-net:
+ external: true

Appendix III

nebula-exchange-sst.conf

{
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange 2.1
    }

    master: local

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
      memory: 1G
    }

    cores: {
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["192.168.8.128:9669"]
      meta:["192.168.8.128:49377"]
    }
    user: root
    pswd: nebula
    space: sst

    # parameters for SST import, not required
    path: {
        local: "/tmp"
        remote: "/sst"
        hdfs.namenode: "hdfs://192.168.8.128:9000"
    }
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000
    }

    error: {
      # max number of failures, if the number of failures is bigger than max, then exit the application.
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors
    }

    # use google's RateLimiter to limit the requests send to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [
    # HDFS csv
    # Import mode is sst, just change type.sink to client if you want to use client import mode.
    {
      name: player
      type: {
        source: csv
        sink: sst
      }
      path: "file:///root/player.csv"
      # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
      fields: [_c1, _c2]
      nebula.fields: [name, age]
      vertex: {
        field:_c0
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }

  ]
}

If there are any errors or omissions in this article, please go to GitHub: github.com/vesoft-inc/… and submit an issue to us in the Issues section, or leave a suggestion in the suggestion-and-feedback category of the official forum: discuss.nebula-graph.com.cn/ 👏 Want to exchange ideas on graph database technology? To join the Nebula Exchange group, please fill out your Nebula card and the Nebula Assistant will add you to the group.