Kafka deployment Manual
1. Create a directory
mkdir -p /data/kafka
Copy the code
2. Download the installation package
Wget - O/data/kafka/kafka_2. 13 - server. TGZ https://apache-mirror.rbc.ru/pub/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgzCopy the code
Command mode may download failed, can visit the website to download: kafka.apache.org/downloads
3. Unzip
The tar - XZF kafka_2. 13 - server. TGZCopy the code
4. Go to the Kafka installation directory
CD kafka_2. 13 - serverCopy the code
5. Start the Kafak runtime environment
nohup bin/zookeeper-server-start.sh config/zookeeper.properties
nohup bin/kafka-server-start.sh config/server.properties
Copy the code
6. Create a topic
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Copy the code
7. WordCountDemo
To test this using the following Demo, you need to change Kafka-Broker1 to the host IP of the Linux where Kafka resides.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = newKafkaStreams(builder.build(), props); streams.start(); }}Copy the code