Welcome to my GitHub

Github.com/zq2599/blog…

Content: a categorized index of all my original articles plus the supporting source code, covering Java, Docker, Kubernetes, DevOps, and more;

Overview

This is the third article in the Flink Sink series. It walks through Flink's official Cassandra connector: data is read from Kafka, run through a word-count job, and the results are printed and written to Cassandra at the same time, as shown in the figure below:

Series links

  1. Flink Sink in Action (1): A First Look
  2. Flink Sink in Action (2): Kafka
  3. Flink Sink in Action (3): Cassandra3
  4. Flink Sink in Action (4): Custom

Software version

The software versions used in this exercise are as follows:

  1. Cassandra: 3.11.6
  2. Kafka: 2.4.0 (Scala: 2.12)
  3. JDK: 1.8.0_191
  4. Flink: 1.9.2
  5. Maven: 3.6.0
  6. OS: CentOS Linux release 7.7.1908
  7. IDEA: 2018.3.5 (Ultimate Edition)

About Cassandra

This time Cassandra is deployed as a three-node cluster. For details on setting it up, see "Quickly Deploying a Cassandra3 Cluster with Ansible".

Prepare the Keyspace and table for Cassandra

Create keyspace and table first:

  1. Log in to Cassandra with cqlsh:
cqlsh 192.168.133.168
  2. Create the keyspace (3 replicas):
CREATE KEYSPACE IF NOT EXISTS example
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};
  3. Create the table:
CREATE TABLE IF NOT EXISTS example.wordcount (
    word text,
    count bigint,
    PRIMARY KEY(word)
    );
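Optionally, you can confirm the keyspace and table exist before moving on; a quick check in cqlsh (standard CQL statements, not part of the original steps):

DESCRIBE KEYSPACE example;
SELECT * FROM example.wordcount;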

Prepare the Kafka topic

  1. Start the Kafka service;
  2. Create a topic named test001 with the following command:
./kafka-topics.sh \
--create \
--bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test001
  3. Enter the console producer's session mode for sending messages with the following command:
./kafka-console-producer.sh \
--broker-list kafka:9092 \
--topic test001
  4. In this session, type any string and press Enter to send it as a message to the broker; you can optionally attach a consumer to verify delivery, as shown below.
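A minimal verification sketch, assuming the command is run from Kafka's bin directory (this step is optional and not part of the original walkthrough):

./kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic test001 \
--from-beginning

Every string sent from the producer session should appear in this consumer's output.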

Download the source code

If you don't want to write the code yourself, the source for the entire series can be downloaded from GitHub; the address and link information are as follows (github.com/zq2599/blog…

Name | Link | Note
Project home page | Github.com/zq2599/blog… | The project's home page on GitHub
Git repository address (HTTPS) | Github.com/zq2599/blog… | Source repository address, HTTPS protocol
Git repository address (SSH) | [email protected]:zq2599/blog_demos.git | Source repository address, SSH protocol

This git project contains multiple folders; the application for this article is in the flinksinkdemo folder, as shown in the red box below:

Two ways to write to Cassandra

Flink's official connector supports two ways of writing to Cassandra:

  1. Tuple writes: the fields of the Tuple object are bound, in order, to the placeholders of a specified CQL statement;
  2. POJO writes: the DataStax mapper maps POJO objects to the table and columns declared in annotations;

Both methods are used below; the sketch that follows previews the API difference.
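A minimal side-by-side sketch of the two builder shapes, assuming tupleStream is a DataStream<Tuple2<String, Long>> and pojoStream is a DataStream<WordCount> (the complete jobs appear later in this article):

// Tuple write: the Tuple2 field order must match the "?" placeholders in the CQL
CassandraSink.addSink(tupleStream)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("192.168.133.168")
        .build();

// POJO write: no CQL needed, the DataStax mapper reads the @Table/@Column annotations
CassandraSink.addSink(pojoStream)
        .setHost("192.168.133.168")
        .setMapperOptions(() -> new Mapper.Option[] { Mapper.Option.saveNullFields(true) })
        .build();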

Development (Tuple write)

  1. The flinksinkdemo project was created in "Flink Sink in Action (2): Kafka" and is reused here;
  2. Add the Cassandra connector dependency to pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
  3. Also add the flink-streaming-scala dependency, otherwise the CassandraSink.addSink code will not compile:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
  4. Create the Job class CassandraTuple2Sink.java, which reads from Kafka, does a word count, and writes the resulting Tuple2 records to Cassandra:
package com.bolingcavalry.addsink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;


public class CassandraTuple2Sink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism
        env.setParallelism(1);

        // Properties used to connect to Kafka
        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // Consumer groupId
        properties.setProperty("group.id", "flink-connector");
        // Instantiate the consumer class
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );

        // Specify that consumption starts from the latest location, which is equivalent to discarding historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // Get the DataSource using the addSource method
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        DataStream<Tuple2<String, Long>> result = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                             @Override
                             public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                 String[] words = value.toLowerCase().split("\\s");

                                 for (String word : words) {
                                     // In Cassandra's table, each word is the primary key and therefore cannot be empty
                                     if (!word.isEmpty()) {
                                         out.collect(new Tuple2<String, Long>(word, 1L));
                                     }
                                 }
                             }
                         }
                )
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        result.addSink(new PrintSinkFunction<>())
                .name("print Sink")
                .disableChaining();

        CassandraSink.addSink(result)
                .setQuery("INSERT INTO example.wordcount(word, count) values (? ,?) ;")
                .setHost("192.168.133.168")
                .build()
                .name("cassandra Sink")
                .disableChaining();

        env.execute("kafka-2.4 source, cassandra-3.11.6 sink, tuple2"); }}Copy the code
  5. In the code above, data is read from Kafka, processed into word counts, and written to Cassandra. Note the CassandraSink.addSink API (which carries the database connection parameters); this is the approach recommended by Flink. In addition, disableChaining is called to break the operator chain so the DAG is easier to inspect in the Flink web UI; in production this line can be removed.
  6. After coding, run mvn clean package -U -DskipTests; flinksinkdemo-1.0-SNAPSHOT.jar is generated in the target directory.
  7. Upload flinksinkdemo-1.0-SNAPSHOT.jar via Flink's web UI and specify the entry class, as shown in the red box below:

  8. The DAG after the task starts looks like this:
  9. Go to the producer session window and send the string "aaa BBB CCC AAA AAA AAA aaa".
  10. Check the data in Cassandra: three records have been added, and the content matches expectations:
  11. Check the TaskManager console output: the printed Tuple2 results are consistent with what was written to Cassandra:
  12. The record counts of all subtasks on the DAG also match expectations:

Development (POJO writing)

Next, try POJO writes: instances of the data structures in the business logic are written to Cassandra directly, without specifying any CQL:

  1. To enable writing POJOs to the database, add the following dependency to pom.xml:
<dependency>
  <groupId>com.datastax.cassandra</groupId>
  <artifactId>cassandra-driver-core</artifactId>
  <version>3.1.4</version>
  <classifier>shaded</classifier>
  <!-- Because the shaded JAR uses the original POM, you still need to exclude this dependency explicitly: -->
  <exclusions>
    <exclusion>
	<groupId>io.netty</groupId>
	<artifactId>*</artifactId>
	</exclusion>
  </exclusions>
</dependency>
  2. Note the exclusions node above: following DataStax's official guidance, the transitive Netty dependency of the DataStax driver is excluded; the Netty-related documentation is at docs.datastax.com/en/develope… (an optional way to verify the exclusion follows below).
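If you want to confirm that no Netty artifacts are still pulled in through this dependency, a quick check with Maven's dependency plugin can help (an optional step, not part of the original walkthrough):

mvn dependency:tree -Dincludes=io.netty

If nothing appears under the cassandra-driver-core node, the exclusion took effect.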
  3. Create the entity class WordCount, with the database-related annotations:
package com.bolingcavalry.addsink;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    public WordCount() {
    }

    public WordCount(String word, long count) {
        this.setWord(word);
        this.setCount(count);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return getWord() + ":" + getCount();
    }
}
  4. Then create the Job class CassandraPojoSink:
package com.bolingcavalry.addsink;

import com.datastax.driver.mapping.Mapper;
import com.datastax.shaded.netty.util.Recycler;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class CassandraPojoSink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism
        env.setParallelism(1);

        // Properties used to connect to Kafka
        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // Consumer groupId
        properties.setProperty("group.id", "flink-connector");
        // Instantiate the consumer class
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );

        // Specify that consumption starts from the latest location, which is equivalent to discarding historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // Get the DataSource using the addSource method
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        DataStream<WordCount> result = dataStream
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordCount> collector) throws Exception {
                        String[] words = s.toLowerCase().split("\\s");

                        for (String word : words) {
                            // In Cassandra's table, each word is the primary key and therefore cannot be empty
                            if (!word.isEmpty()) {
                                collector.collect(new WordCount(word, 1L));
                            }
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordCount>() {
                    @Override
                    public WordCount reduce(WordCount wordCount, WordCount t1) throws Exception {
                        return new WordCount(wordCount.getWord(), wordCount.getCount() + t1.getCount());
                    }
                });

        result.addSink(new PrintSinkFunction<>())
                .name("print Sink")
                .disableChaining();

        CassandraSink.addSink(result)
                .setHost("192.168.133.168")
                .setMapperOptions(() -> new Mapper.Option[] { Mapper.Option.saveNullFields(true) })
                .build()
                .name("cassandra Sink")
                .disableChaining();

        env.execute("Kafka-2.4 source, Cassandra -3.11.6 sink, pojo"); }}Copy the code
  5. As the code above shows, this differs noticeably from the previous Tuple version: to prepare a POJO data set, the flatMap anonymous class now emits WordCount objects, a reduce anonymous class is written instead of sum, and setMapperOptions is called to set the mapping options.
  6. After compiling and packaging, upload the jar to Flink and set the entry class to CassandraPojoSink:

  7. Clear the previous data by running TRUNCATE example.wordcount; in Cassandra's cqlsh.
  8. Send a string message to Kafka as before:
  9. Check the database: the result meets expectations:

  10. The DAG and subtask details are as follows:

This completes the hands-on exercise of writing Flink results to Cassandra; I hope it gives you a useful reference.

Welcome to follow my WeChat official account: programmer Xin Chen

Search "programmer Xin Chen" on WeChat. I am Xin Chen, looking forward to exploring the Java world with you…

Github.com/zq2599/blog…