The Pulsar Spark Connector lets Spark read data from Pulsar and write the results back to Pulsar. This article describes how to use the connector.

🙋 The Pulsar Spark Connector was open-sourced on July 9, 2019. See the source code and user guide here.

Configure the environment

The following example uses the Homebrew package manager to download and install software on macOS. You can choose another package manager based on your needs and operating system.

  1. Install Homebrew.
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
  2. Install Java 8 or later. This example uses Homebrew to install JDK 8.
brew tap adoptopenjdk/openjdk
brew cask install adoptopenjdk8
  3. Install Apache Spark 2.4.0 or later. Download Spark 2.4.3 from the official website and decompress it.
tar xvfz spark-2.4.3-bin-hadoop2.7.tgz
  4. Install Apache Pulsar 2.4.0. Download Pulsar 2.4.0 from the official website and decompress it.
wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
tar xvfz apache-pulsar-2.4.0-bin.tar.gz
  5. Install Apache Maven.
brew install maven
  6. Set up the development environment. This example creates a Maven project named connector-test.

(1) Use the archetype provided by the Scala Maven Plugin to generate a Scala project skeleton.

mvn archetype:generate

In the list that appears, select net.alchim31.maven:scala-archetype-simple, choose its latest version (currently 1.7), and specify the groupId, artifactId, and version for the new project.

This example uses:

groupId: com.example
artifactId: connector-test
version: 1.0-SNAPSHOT

After these steps, the skeleton of a Maven Scala project is in place.
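
The interactive prompts can also be skipped by passing the same choices as properties. The following is a minimal sketch rather than a required step: `generate_project` is a made-up helper name, and the `-D` properties are the Maven Archetype Plugin's standard parameters, filled in with the values used in this example.

```shell
# Sketch: generate the connector-test project without interactive prompts.
# generate_project is a hypothetical helper name, not part of Maven.
generate_project() {
  mvn archetype:generate \
    -DinteractiveMode=false \
    -DarchetypeGroupId=net.alchim31.maven \
    -DarchetypeArtifactId=scala-archetype-simple \
    -DarchetypeVersion=1.7 \
    -DgroupId=com.example \
    -DartifactId=connector-test \
    -Dversion=1.0-SNAPSHOT
}
```

Calling `generate_project` from an empty working directory should create a `connector-test` directory there.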

(2) Add the Spark and Pulsar Spark Connector dependencies to _pom.xml_ in the project root directory, and use _maven-shade-plugin_ to package the project.

A. Define the versions of the dependencies.
  <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.4.3</spark.version>
        <pulsar-spark-connector.version>2.4.0</pulsar-spark-connector.version>
        <spec2.version>4.2.0</spec2.version>
        <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
  </properties>
B. Add the Spark and Pulsar Spark Connector dependencies.
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-spark-connector_${scala.compat.version}</artifactId>
        <version>${pulsar-spark-connector.version}</version>
    </dependency>
C. Add a Maven repository containing _pulsar-spark-connector_.
    <repositories>
      <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
      </repository>
      <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
      </repository>
    </repositories>
D. Package the sample class with _pulsar-spark-connector_ using _maven-shade-plugin_.
        <plugin>
          <!-- Shade all the dependencies to avoid conflicts -->
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>${maven-shade-plugin.version}</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
                <minimizeJar>false</minimizeJar>

                <artifactSet>
                  <includes>
                    <include>io.streamnative.connectors:*</include>
                  </includes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>META-INF/*.SF</exclude>
                      <exclude>META-INF/*.DSA</exclude>
                      <exclude>META-INF/*.RSA</exclude>
                    </excludes>
                  </filter>
                </filters>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>
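
Before moving on to the code, it can help to confirm that the command-line tools installed at the start of this section are actually on the PATH. A small sketch, assuming a POSIX shell; `check_tools` is a made-up helper name:

```shell
# Sketch: report which of the given commands are available on PATH.
# check_tools is a hypothetical helper name.
# Returns 0 if every tool was found, 1 if at least one is missing.
check_tools() {
  missing=0
  for tool in "$@"; do
    if command -v "$tool" >/dev/null 2>&1; then
      echo "found: $tool"
    else
      echo "missing: $tool"
      missing=1
    fi
  done
  return "$missing"
}

# Example call for the tools used in this article:
# check_tools java mvn wget tar
```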

Read from and write to Pulsar with Spark

The project in the example includes the following programs:

  1. Read data from Pulsar (this app is named StreamRead).
  2. Write data to Pulsar (this app is named BatchWrite).

Build a stream processing job that reads data from Pulsar

  1. In StreamRead, create a SparkSession.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("data-read")
  .config("spark.cores.max", 2)
  .getOrCreate()
  2. To connect to Pulsar, specify service.url and admin.url when building the DataFrame, and specify the topic to read.
val ds = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8088")
  .option("topic", "topic-test")
  .load()
ds.printSchema()  // Print the schema information of topic-test to verify that the data is read successfully
  3. Output ds to the console to start the job.
val query = ds.writeStream
    .outputMode("append")
    .format("console")
    .start()
query.awaitTermination()

Write the data to Pulsar

  1. Similarly, in BatchWrite, first create a SparkSession.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
    .builder()
    .appName("data-sink")
    .config("spark.cores.max", 2)
    .getOrCreate()
  2. Create a list of the numbers 1 to 10, convert it to a Spark Dataset, and write it to Pulsar.
import spark.implicits._
spark.createDataset(1 to 10)
    .write
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8088")
    .option("topic", "topic-test")
    .save()

Run the program

First configure and start single-node Spark and Pulsar clusters, then package the sample project, submit the two jobs with spark-submit, and observe the results.

  1. Change the Spark log level (optional).
cd ${spark.dir}/conf
cp log4j.properties.template log4j.properties

In a text editor, change the log level in log4j.properties to WARN.

log4j.rootCategory=WARN, console
  2. Start the Spark cluster.
cd ${spark.dir}
sbin/start-all.sh
  3. Change the Pulsar web service port to 8088 by editing ${pulsar.dir}/conf/standalone.conf, to avoid a conflict with the Spark port.
webServicePort=8088
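
This edit, like the earlier log-level change, can also be scripted instead of made by hand. A minimal sketch, assuming both files use plain `key=value` lines and that the value contains no `|`, `&`, or backslash characters; `set_property`, `PULSAR_HOME`, and `SPARK_HOME` are names made up for this example:

```shell
# Sketch: set "key=value" in a properties-style file, replacing the existing
# line for that key or appending a new line if the key is absent.
# set_property is a hypothetical helper name.
set_property() {
  file=$1
  key=$2
  value=$3
  tmp="$file.tmp"
  if grep -q "^$key=" "$file"; then
    # Write to a temporary file and move it back, because the -i flag of
    # BSD sed (macOS) and GNU sed take different arguments.
    sed "s|^$key=.*|$key=$value|" "$file" > "$tmp" && mv "$tmp" "$file"
  else
    echo "$key=$value" >> "$file"
  fi
}

# Example calls; PULSAR_HOME and SPARK_HOME are assumed to point at the
# unpacked Pulsar and Spark directories:
# set_property "$PULSAR_HOME/conf/standalone.conf" webServicePort 8088
# set_property "$SPARK_HOME/conf/log4j.properties" log4j.rootCategory "WARN, console"
```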

  4. Start the Pulsar cluster.
bin/pulsar standalone
  5. Package the sample project.
cd ${connector_test.dir}
mvn package
  6. Start StreamRead to monitor data changes in topic-test.
${spark.dir}/bin/spark-submit --class com.example.StreamRead --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  7. In another terminal window, start BatchWrite to write the numbers 1 to 10 to topic-test in a single batch.
${spark.dir}/bin/spark-submit --class com.example.BatchWrite --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar
  8. You should then see output similar to the following in the terminal where StreamRead is running.
root
 |-- value: integer (nullable = false)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)

Batch: 0
+-----+-----+-------+-----------+-------------+-----------+
|value|__key|__topic|__messageId|__publishTime|__eventTime|
+-----+-----+-------+-----------+-------------+-----------+
+-----+-----+-------+-----------+-------------+-----------+

Batch: 1
+-----+-----+--------------------+--------------------+--------------------+-----------+
|value|__key|             __topic|         __messageId|       __publishTime|__eventTime|
+-----+-----+--------------------+--------------------+--------------------+-----------+
|    6| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    7| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    8| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    9| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|   10| null|persistent://publ...|[08 86 01 10 02 2...|2019-07-08 14:51:...|       null|
|    1| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    2| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    3| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    4| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
|    5| null|persistent://publ...|[08 86 01 10 03 2...|2019-07-08 14:51:...|       null|
+-----+-----+--------------------+--------------------+--------------------+-----------+
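
Note that `${spark.dir}` and `${connector_test.dir}` in the commands above are placeholders for local paths: a shell variable name cannot contain a dot, so they cannot be pasted verbatim. One way to handle this is a small wrapper. This is a sketch, assuming the made-up environment variables `SPARK_HOME` and `PROJECT_DIR` hold those two paths; `submit_job` is likewise a hypothetical helper name.

```shell
# Sketch: submit one of the example classes to the local standalone master.
# SPARK_HOME and PROJECT_DIR are assumed variables standing in for the
# ${spark.dir} and ${connector_test.dir} placeholders.
# submit_job is a hypothetical helper name.
submit_job() {
  class_name=$1
  "$SPARK_HOME/bin/spark-submit" \
    --class "com.example.$class_name" \
    --master spark://localhost:7077 \
    "$PROJECT_DIR/target/connector-test-1.0-SNAPSHOT.jar"
}

# Terminal 1: submit_job StreamRead
# Terminal 2: submit_job BatchWrite
```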

At this point, we have set up Pulsar and Spark clusters, built the skeleton of the sample project, used the Pulsar Spark Connector to read Pulsar data from Spark and write Spark data to Pulsar, and submitted both programs to verify the result.
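
To check the topic independently of Spark, the broker can be queried with the `pulsar-admin` tool that ships with Pulsar. A minimal sketch, assuming the made-up variable `PULSAR_HOME` points at the unpacked Pulsar directory and that topic-test lives in the default `public/default` namespace; `topic_stats` is a hypothetical helper name:

```shell
# Sketch: ask the broker for the statistics of the example topic.
# PULSAR_HOME is an assumed variable; topic_stats is a hypothetical helper.
# The admin URL uses port 8088 because the web service port was changed
# from its default in this article.
topic_stats() {
  "$PULSAR_HOME/bin/pulsar-admin" \
    --admin-url http://localhost:8088 \
    topics stats "persistent://public/default/${1:-topic-test}"
}

# Example call: topic_stats
```

The command prints the topic's statistics as JSON, including its current publishers and subscriptions.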

🙋 For the complete example program, see here.