
Kafka

Github.com/joekiller/l…

The plugin has been merged into the official repository. The following description is based on Logstash 1.4; for version 1.5 and later, usage will follow the official documentation as it is updated.

The plugin itself is very simple and relies on the jruby-kafka module, written by the same author. Note that this module only supports Kafka 0.8; if you are running Kafka 0.7, neither the jruby-kafka module nor the logstash-kafka plugin can be used directly.

Installation

  • Installation can be fully automated by following the official documentation, or you can install the plugin manually as described below; just pay attention to the Kafka version, as noted above.
  1. Download Logstash, unzip it, and rename the directory to ./logstash-1.4.0.
  2. Download the Kafka release, for example kafka_2.8.0-0.8.1.1-src, unzip it, and rename the directory to ./kafka_2.8.0-0.8.1.1.
  3. Download logstash-kafka v0.4.2 from releases, unzip it, and rename the directory to ./logstash-kafka-0.4.2.
  4. Copy all jar files from ./kafka_2.8.0-0.8.1.1/libs into ./logstash-1.4.0/vendor/jars/kafka_2.8.0-0.8.1.1/libs; you have to create the kafka_2.8.0-0.8.1.1/libs folders and their parent directories yourself.
  5. Copy the kafka.rb files under inputs and outputs in ./logstash-kafka-0.4.2/logstash into the corresponding inputs and outputs directories under ./logstash-1.4.0/lib/logstash.
  6. Switch into ./logstash-1.4.0 and install the jruby-kafka library by running the logstash-kafka gembag.rb script: GEM_HOME=vendor/bundle/jruby/1.9 GEM_PATH= java -jar vendor/jar/jruby-complete-1.7.11.jar --1.9 ../logstash-kafka-0.4.2/gembag.rb ../logstash-kafka-0.4.2/logstash-kafka.gemspec.
  7. Now you can run Logstash with the logstash-kafka plugin, for example: bin/logstash agent -f logstash.conf (a minimal test configuration is sketched below).
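
For a quick smoke test of the installation, a minimal logstash.conf could pair the Kafka input with a stdout output. This is only a sketch; zk_connect, group_id and topic_id have to match your environment.

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "test"
    }
}
output {
    stdout {
        codec => rubydebug
    }
}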

Input Configuration Example

The following configuration enables basic use of the Kafka reader (Consumer).

For more detailed consumer configuration, see the consumer section of the official Kafka documentation: kafka.apache.org/documentati…

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "test"
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5 # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
    }
}

Input Explained

Some useful configuration items on the consumer side:

  • group_id

The consumer group is specified by the group ID. Consumption in different groups does not affect one another; the groups are isolated from each other.

  • topic_id

Specifies the topic to consume and is also required. Specifying the topic effectively subscribes to it, after which it is consumed.

  • reset_beginning

The default is the end position, meaning the Logstash process continues reading from the offset where the previous run stopped, or starts reading from the beginning if this group has never consumed the topic before. If you are importing old data, set this to true so that the Logstash process reads from the beginning. It is similar to cat, except that it does not terminate after the last line; instead it turns into tail -f and keeps listening for new data. A configuration sketch for this re-import case appears at the end of this list.

  • decorate_events

When enabled, the output message carries its own metadata, including the size of the consumed message, the source topic, and consumer group information.

  • rebalance_max_retries

When a new consumer (Logstash) joins the same group, a rebalance is triggered, after which some partitions are handed over to the new consumer. When a consumer obtains a partition it registers it with the Partition Owner Registry in ZooKeeper, but the original consumer may not have released the partition yet. This value controls the number of retries for registering that node.

  • consumer_timeout_ms

An exception is thrown if no message has arrived within the specified time.
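
For example, to re-import all historical data from a topic (see reset_beginning above), the consumer configuration might look like the following sketch; the topic name old_logs is purely illustrative:

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "old_logs"
        reset_beginning => true # read from the earliest offset instead of continuing where the group left off
    }
}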

The above covers the more important parameters. For the full list of optional parameters, see the input defaults at github.com/joekiller/l…

Notes

1. If you want multiple Logstash instances to consume the same topic, configure two or more Logstash instances with the same group_id and topic_id (see the sketch after these notes). Note, however, that the topic must then be split into multiple partitions, and with multiple consumers the order of messages can no longer be guaranteed.

To explain why multiple partitions are needed: Kafka's message model partitions topics to achieve a distributed effect. Each partition under a topic can only be consumed by one owner at a time, so only when there are multiple partitions can multiple consumers be started, each consuming a different partition. Which consumer reads which partition is coordinated by the server side, and the user does not need to think about it much; however, message consumption across partitions is unordered.

Conclusion: to guarantee message ordering, use a single partition. Each partition in Kafka can only be consumed by one consumer of the same group at a time.
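
To illustrate note 1: every Logstash instance that shares the work uses an identical kafka input section, as in the following sketch (the topic is assumed to already have multiple partitions):

# the same input section is used on each Logstash instance
input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash" # same group on every instance
        topic_id => "test" # a topic split into multiple partitions
    }
}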

Output Configuration Example

The following configuration implements basic use of the Kafka producer.

For more detailed producer configuration, see the producer section of the official Kafka documentation: kafka.apache.org/documentati…

output {
    kafka {
        broker_list => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
    }
}

Output Explained

The producer side has many more settings than the consumer side. Some of the more important ones are listed here:

  • compression_codec

The message compression mode. The default is none; it can be set to gzip or snappy (weigh performance with and without compression, the amount of data transferred, and so on).

  • compressed_topics

Compression can be enabled for specific topics. Setting this to a topic name indicates that messages for that topic will be compressed.

  • request_required_acks

Acknowledgement mode for messages:

Set to 0: the producer does not wait for an acknowledgement from the broker and just keeps sending. This gives the lowest latency and the weakest guarantee (messages are lost if the server fails).

Set to 1: the producer receives an acknowledgement after the leader has written the message (messages may be lost if the leader fails before the data has been replicated).

Set to -1: the producer receives an acknowledgement only after all replicas have been written.

  • partitioner_class

The partitioning strategy, which is hash-based by default.

  • send_buffer_bytes

The socket send buffer size, i.e. the size of the TCP buffer.

Settings related to the message format:

  • serializer_class

The serialization class for the message body, which converts the message into a byte stream for transmission. Note that this encoder must be of the same type as key_serializer_class below.

  • key_serializer_class

Defaults to the same value as serializer_class.

  • producer_type

The type of producer: async sends messages asynchronously, sync sends them synchronously (a combined example appears at the end of this list).

  • queue_buffering_max_ms

In asynchronous mode, messages are buffered for the configured time and then sent in a single batch.

  • queue_buffering_max_messages

In asynchronous mode, the maximum number of messages that may be buffered while waiting.

  • queue_enqueue_timeout_ms

In asynchronous mode, how long a message may wait to enter the queue; if set to 0, the message is either enqueued immediately or dropped.

  • batch_num_messages

In asynchronous mode, the maximum number of messages sent in a single batch once the queue_buffering_max_messages or queue_enqueue_timeout_ms limit is triggered.
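
Putting several of these options together, a sketch of an asynchronous producer configuration might look like the following; the values are illustrative, not recommendations:

output {
    kafka {
        broker_list => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy"
        request_required_acks => 1 # wait for the leader's acknowledgement
        producer_type => "async" # buffer messages and send them in batches
        queue_buffering_max_ms => 5000 # flush the buffer at least every 5 seconds
        batch_num_messages => 200 # send at most 200 messages per batch
    }
}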

The above covers the more important parameters. For the full list of optional parameters, see the output defaults at github.com/joekiller/l…

Tips:

By default, the plugin uses JSON encoding for the messages it reads and writes. During transport, Logstash by default adds information such as the timestamp and hostname to the encoded message. If you do not want this extra information (typically when simply forwarding messages), you can use a configuration like the following:

 output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
    }
}

HDFS

  • Github.com/dstore-dbap…

This plugin is based on Hadoop's WebHDFS API; it simply POSTs data to the WebHDFS port, so it is pure Ruby code.

output {
    hadoop_webhdfs {
        workers => 2
        server => "your.nameno.de:14000"
        user => "flume"
        path => "/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log"
        flush_size => 500
        compress => "snappy"
        idle_flush_time => 10
        retry_interval => 0.5
    }
}
  • Github.com/avishai-ish…

This plugin is based on Hadoop's HDFS API; it imports Java classes such as org.apache.hadoop.fs.FileSystem, etc.

Configuration

output {
    hdfs {
        path => "/path/to/output_file.log"
        enable_append => true
    }
}

Howto run

CLASSPATH=$(find /path/to/hadoop -name '*.jar' | tr '\n' ':'):/etc/hadoop/conf:/path/to/logstash-1.1.7-mon…

scribe

Github.com/EverythingM…

input { scribe { host => "localhost" port => 8000 } }

java -Xmx400M -server \
  -cp scribe_server.jar:logstash-1.2.1-flatjar.jar \
  logstash.runner agent \
    -p /where/did/I/put/this/downloaded/plugin \
    -f logstash.conf

Write your own plugin

As mentioned earlier, when running Logstash you can use the --pluginpath parameter to load your own plugins. So how is a plugin actually written?

Plug-in format

A standard Logstash input plug-in has the following format:

require 'logstash/namespace'
require 'logstash/inputs/base'
class LogStash::Inputs::MyPlugin < LogStash::Inputs::Base
  config_name 'myplugin'
  milestone 1
  config :myoption_key, :validate => :string, :default => 'myoption_value'
  public def register
  end
  public def run(queue)
  end
end

Most of these statements are common in the filter and output phases.

  • config_name defines the name under which the plugin is used in the Logstash configuration file;
  • milestone marks the plugin's development maturity; it is normally 1, 2 or 3, and 0 if the plugin is no longer maintained;
  • config directives can be defined many times; they are the plugin parameters that can be set in the Logstash configuration file. The validation is a convenient way for Logstash to verify that the data it receives is of the expected type;
  • register is a function that Logstash runs at startup. Anything that needs to stay in memory can be prepared in this step, such as object initialization, similar to the init statement in the filters/ruby plugin, and so on.

Tips:

Logstash by default considers any plugin below milestone 3 not stable enough. During the startup phase, when such a plugin is loaded, it prints a message like the following at the WARN log level. This does not mean a runtime error occurred! It is merely a hint that, should you run into a bug, you are welcome to provide clues.

{:timestamp=>"2015-02-06T10:….312000+0800", :message=>"Using milestone 2 input plugin 'file'. This plugin should be stable, but if you see strange behavior, please let us know! For more information on plugin milestones, see logstash.net/docs/1.4.2-…", :level=>:warn}

Key plugin methods

The run method is unique to input plugins. Inside run you must implement a long-running program (the simplest being the loop instruction), and each time data is received and processed into an event, the queue << event statement must be called. That completes one input flow.
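
For example, a run method for the myplugin skeleton above could look like the following sketch; the one-second interval and the use of myoption_key as the message are purely illustrative:

public def run(queue)
  loop do
    # build an event and hand it over to the pipeline
    event = LogStash::Event.new("message" => @myoption_key)
    queue << event
    sleep 1
  end
end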

If it is a filter plug-in, change it to:

require 'logstash/filters/base'
class LogStash::Filters::MyPlugin < LogStash::Filters::Base
  public def filter(event)
  end
end
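
Inside filter you typically read or modify fields on the event and then call filter_matched, so that any add_field / add_tag options configured by the user are applied. A minimal sketch, with an illustrative field name:

public def filter(event)
  return unless filter?(event) # respect the plugin's type/tags conditions
  event["myfield"] = "some value" # illustrative field manipulation
  filter_matched(event) # apply add_field / add_tag / remove_tag etc.
end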

The output plug-in is:

require 'logstash/outputs/base'
class LogStash::Outputs::MyPlugin < LogStash::Outputs::Base
  public def receive(event)
  end
end

In addition, so that data is not lost when the process is terminated, it is recommended to implement the following method; once implemented, Logstash will call it automatically on shutdown:

public def teardown
end

Recommended reading

  • Extending logstash
  • Plugin Milestones

Why JRuby? Can you run it with MRI?

Those who know a bit about logging frameworks know that most of them are written in Java, which has natural advantages for large-scale systems; another new-generation product, Fluentd, is a standard Ruby (MRI, Matz's Ruby Interpreter) product. Logstash, however, is implemented with JRuby.

Jordan Sissel has written about this issue several times. To save space, here are his gist addresses:

  • Time sucks: a performance test of Time objects. The fastest way to generate them is the sprintf method; MRI reached 82,600 calls/sec, JRuby1.6.7 131,000 calls/sec, and JRuby1.7.0 215,000 calls/sec.
  • Comparing regexp patterns speeds: a performance test of regular expressions, using the same pattern (?-mix:('(?:[^\\']+|(?:\\.)'))+)* to compare MRI1.9.2 and JRuby1.6.5 in matches/sec.
  • Logstash performance under ruby: a performance test of the Logstash data pipeline, using the inputs/generator plugin to generate data and outputs/stdout piped into the pv tool to record statistics. The result: MRI1.9.3 reached 4,000 events/sec and JRuby1.7.0 25,000 events/sec.

Chances are you’re already running LogStash and find yourself with far more online data than this test — that’s because Jordan Cisse was developing LogStash in his spare time until 2013 and never used it on his own. So a lot of the tests were done on his own computer.

After Logstash attracted wide attention, the author published "Logstash needs full time love", describing this situation, asking for a full-time position to develop Logstash, and laying out the roadmap for releases after 1.1.0. (It turned out these needs were not that urgent at the time, because apart from Kibana most of the items are still not implemented.)

Going further back along the timeline, to 2011, you will find that Logstash was originally written for MRI 1.8.7 as well! Only after the grok module was rewritten from a C extension into an FFI extension did it officially switch to JRuby.

Around the time of the language switch, Jordan Sissel published "logstash, why jruby?", which is worth reading.

In fact, to this day, traces of support for multiple Ruby implementations (RUBY_ENGINE checks scattered everywhere) remain in the Logstash code, and the author tries to ensure that as much of the code as possible can run on MRI.

As a simple illustration: in the plugin-independent core code, only the @timestamp field of LogStash::Event, generated with Java's Joda library, is JRuby-specific; with a small modification to use Ruby's own Time library, it could run on MRI. Among the plugins, only filters/date and outputs/elasticsearch are Java-specific.