Intro: The notes on Logstash problems I recorded last year are messy and cover a lot of ground. When I sat down to write them up, the result was far too long for one post, so I plan to split it into 2-4 articles and work through the issues one small piece at a time.

System overview: 1) Each service generates events (logs), which are handled by a separate event mechanism; this article discusses how they are transmitted across data centers (machine rooms). 2) The design has to be universal and fit implementations in every language, so it is file based: a service writes rolling logs locally, and a component tails those files and ships the data immediately to Kafka in another machine room. 3) The Logstash file input reads the rolling log files and the output writes to Kafka 0.10.0.1 (a version the Logstash in use did not support out of the box); no parsing is done in Logstash. 4) The metrics plugin and the persistent (disk-backed) queue are enabled. 5) The file system is ext4, and the system reuses inodes. 6) The configuration and data shown in this article have been desensitized; if anything looks odd as a result, please ignore it.

This article discusses the first two problems encountered: 1) data is lost after a Logstash restart, even when Logstash is stopped with kill -15; 2) performance: the maximum upload rate tops out at roughly 12,000 events/sec (the exact number depends on message size).

Logstash configuration

input {
  file {
    path => ["/data/xxx-*.csv"]
    start_position => "beginning"
    type => "xyz"
    sincedb_write_interval => "1"
    sincedb_path => ".sincedbxxx"
    discover_interval => "1"
    tags => "xxx"
  }
  file {
    path => ["/data/yyy-*.csv"]
    start_position => "beginning"
    type => "xyz"
    sincedb_write_interval => "1"
    sincedb_path => ".sincedbyyy"
    discover_interval => "1"
    tags => "yyy"
  }
}
filter {
  mutate {
    gsub => [
      "message", "[\\\\]{2,}n", "",
      "message", "[\\\\]{2,}r", ""
    ]
  }
  metrics {
    meter => "events@%{[type]}@%{[path]}"
    meter => "events@%{[type]}@sum"
    ...
    clear_interval => 86401
  }
}
output {
  if "metric" in [tags] {
    stdout { codec => rubydebug }
    ...
  } else {
    kafka { ... }
  }
}

Problem 1 – Data loss after a restart

Because the file system on these busy machines reuses inodes, the sincedb entries loaded on a Logstash restart may map an inode to the reporting offset of the previous file that happened to use that inode, not the file that currently owns it, and data is lost. sincedb records pairs of inode and byte offset, and on restart Logstash loads those pairs; when inodes are reused this breaks down. For example, suppose sincedb contains the record "111222 20460", meaning the file file-2018030403 with inode 111222 had been reported up to offset 20460 (bytes). That file is rotated away, inode 111222 is reused, and the inode now belongs to file-2018030503, whose size is 1020460 bytes. On restart, Logstash loads the progress recorded for inode 111222, so file-2018030503 is read and reported starting from byte 20460 instead of from byte 0 as the start_position => "beginning" configuration suggests. The first 20460 bytes of file-2018030503 are therefore lost (reads are line oriented, so the exact amount differs slightly).
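
To make the failure mode concrete, here is a hypothetical sketch in Ruby (not the plugin's actual code) of what an inode-only sincedb lookup does on restart, reusing the numbers from the example above:

# Hypothetical illustration, not the plugin's real code: the old sincedb
# effectively persisted only "inode => offset".
sincedb = { 111222 => 20_460 }   # left over from file-2018030403

# After rotation, inode 111222 belongs to a brand-new file.
new_file = { path: "file-2018030503", inode: 111222, size: 1_020_460 }

# On restart the offset is looked up by inode alone, so the new file is
# resumed at byte 20460 instead of the configured "beginning".
start_offset = sincedb.fetch(new_file[:inode], 0)
puts "start at #{start_offset}: the first #{start_offset} bytes of #{new_file[:path]} are never reported"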

Why, then, does inode reuse only cause this loss across a restart, and not while Logstash keeps running?

This is because, while running, whenever Logstash detects a change it compares the current size of the file behind an inode with the size recorded for that inode; if the current size is smaller than the recorded value, the record is reset to zero:

1) While running, Logstash holds the in-memory record "111222 20460" for file-2018030403 (inode 111222). 2) When file-2018030503 is created and takes over inode 111222, the filewatch plugin stats the inode and sees a file size of 0. 3) 0 is smaller than 20460, so Logstash updates the in-memory record "111222 20460" to "111222 0"; the 0 comes from the start_position => "beginning" configuration.
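
The idea can be sketched like this (a simplification of the behaviour described above, not the real filewatch code):

# Simplified sketch of the shrink detection described above (illustrative only).
def offset_after_stat(recorded_offset, current_size, start_position)
  if current_size < recorded_offset
    # The file behind this inode is smaller than what was already read:
    # it must be a different (new) file, so restart per start_position.
    start_position == :beginning ? 0 : current_size
  else
    recorded_offset
  end
end

offset_after_stat(20_460, 0, :beginning)  # => 0, i.e. "111222 20460" becomes "111222 0"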

The fix: upgrade the logstash-input-file plugin


The upgraded plugin records the file path alongside the inode in sincedb. When the inodes are equal, it additionally checks whether the file paths are equal; if they are not, the file is treated as a new file and read from the configured start position.
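
Roughly, the difference looks like this (illustrative only; the paths are made up, and the real plugin tracks this through WatchedFile/SincedbValue objects, as the code walk-through below shows):

# Illustrative only, not the plugin's real data structures: the upgraded
# plugin also remembers the path seen for an inode and compares it.
SincedbRecord = Struct.new(:inode, :path, :offset)

record   = SincedbRecord.new(111222, "/data/xxx-2018030403.csv", 20_460)
new_file = { inode: 111222, path: "/data/xxx-2018030503.csv" }

start_offset =
  if record.inode == new_file[:inode] && record.path == new_file[:path]
    record.offset   # same file as before: resume where we left off
  else
    0               # inode reused by a different path: treat as a new file
  end
# start_offset => 0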

With this change, restarts no longer lose data because of inode reuse. The code analysis below is fairly long; skip it if you are not interested.

attr_reader :discoverer, :watched_files_collection

----
The processor is in tail_mode -> processor.rb:
    def process_all_states(watched_files)
      process_closed(watched_files)
      return if watch.quit?
      process_ignored(watched_files)
      return if watch.quit?
      process_delayed_delete(watched_files)
      return if watch.quit?
      process_restat_for_watched_and_active(watched_files)
      return if watch.quit?
      process_rotation_in_progress(watched_files)
      return if watch.quit?
      process_watched(watched_files)
      return if watch.quit?
      process_active(watched_files)
    end
    ...
    def process_watched(watched_files)
      logger.trace("Watched processing")
      to_take = @settings.max_active - watched_files.count{|wf| wf.active?}
      if to_take > 0
        watched_files.select {|wf| wf.watched?}.take(to_take).each do |watched_file|
          watched_file.activate
          if watched_file.initial?
            create_initial(watched_file)
          else
            create(watched_file)
          end
          break if watch.quit?
        end
      else
        now = Time.now.to_i
        if (now - watch.lastwarn_max_files) > MAX_FILES_WARN_INTERVAL
          waiting = watched_files.size - @settings.max_active
          logger.warn(@settings.max_warn_msg + ", files yet to open: #{waiting}")
          watch.lastwarn_max_files = now
        end
      end
    end
    ...
    def create_initial(watched_file)
      @create_initial.handle(watched_file)
    end
------ the discover entry point
discover.rb
def discover_any_files(path, ongoing)
  fileset = Dir.glob(path).select{|f| File.file?(f)}
  logger.trace("discover_files",  "count" => fileset.size)
  logger.warn("discover_files",  "count" => fileset.size)
  fileset.each do |file|
    pathname = Pathname.new(file)
    new_discovery = false
    watched_file = @watched_files_collection.watched_file_by_path(file)
    if watched_file.nil?
      begin
        path_stat = PathStatClass.new(pathname)
      rescue Errno::ENOENT
        next
      end
      watched_file = WatchedFile.new(pathname, path_stat, @settings)
      new_discovery = true
      logger.info("discover_files handling:", "new:"=> new_discovery, "watched_file:" => watched_file.details)
    end
    # if it already unwatched or its excluded then we can skip
    next if watched_file.unwatched? || can_exclude?(watched_file, new_discovery)
    logger.trace("discover_files handling:", "new discovery"=> new_discovery, "watched_file details" => watched_file.details)
    if new_discovery
      watched_file.initial_completed if ongoing
      # initially when the sincedb collection is filled with records from the persistence file
      # each value is not associated with a watched file
      # a sincedb_value can be:
      #   unassociated
      #   associated with this watched_file
      #   associated with a different watched_file
      if @sincedb_collection.associate(watched_file)
        if watched_file.file_ignorable?
          logger.trace("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago")
          logger.info("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago")
          # on discovery ignorable watched_files are put into the ignored state and that
          # updates the size from the internal stat
          # so the existing contents are not read.
          # because, normally, a newly discovered file will
          # have a watched_file size of zero
          # they are still added to the collection so we know they are there for the next periodic discovery
          watched_file.ignore_as_unread
        end
        # now add the discovered file to the watched_files collection and adjust the sincedb collections
        @watched_files_collection.add(watched_file)
      end
    end
    # at this point the watched file is created, is in the db but not yet opened or being processed
  end
end
------ the call that ultimately handles the renamed/reused-inode file is in tail_mode -> handlers -> base.rb
base.rb
def handle(watched_file)
  logger.trace("handling: #{watched_file.filename}")
  logger.info("handling: #{watched_file.filename}")
  unless watched_file.has_listener?
    watched_file.set_listener(@observer)
  end
  handle_specifically(watched_file)
end
def add_new_value_sincedb_collection(watched_file)
  sincedb_value = get_new_value_specifically(watched_file)
  logger.trace("add_new_value_sincedb_collection", "position" => sincedb_value.position, "watched_file details" => watched_file.details)
  sincedb_collection.set(watched_file.sincedb_key, sincedb_value)
  sincedb_value
end
def get_new_value_specifically(watched_file)
  position = watched_file.position_for_new_sincedb_value
  value = SincedbValue.new(position)
  value.set_watched_file(watched_file)
  watched_file.update_bytes_read(position)
  value
end
--->
create_initial.rb
module FileWatch module TailMode module Handlers
  class CreateInitial < Base
    def handle_specifically(watched_file)
      if open_file(watched_file)
        logger.trace("handle_specifically opened file handle: #{watched_file.file.fileno}, path: #{watched_file.filename}")
        logger.info("handle_specifically opened file handle: #{watched_file.file.fileno}, path: #{watched_file.filename}")
        add_or_update_sincedb_collection(watched_file)
      end
    end
    def update_existing_specifically(watched_file, sincedb_value)
      position = watched_file.last_stat_size
      if @settings.start_new_files_at == :beginning
        position = 0
      end
      logger.trace("update_existing_specifically - #{watched_file.path}: seeking to #{position}")
      logger.info("update_existing_specifically - #{watched_file.path}: seeking to #{position}")
      watched_file.update_bytes_read(position)
      sincedb_value.update_position(position)
    end
  end
end end end
--->
base.rb
def add_or_update_sincedb_collection(watched_file)
  sincedb_value = @sincedb_collection.find(watched_file)
  if sincedb_value.nil?
    sincedb_value = add_new_value_sincedb_collection(watched_file)
    watched_file.initial_completed
  elsif sincedb_value.watched_file == watched_file
    update_existing_sincedb_collection_value(watched_file, sincedb_value)
    watched_file.initial_completed
  else
    msg = "add_or_update_sincedb_collection: found sincedb record"
    logger.trace(msg,
      "sincedb key" => watched_file.sincedb_key,
      "sincedb value" => sincedb_value
    )
    # detected a rotation, Discoverer can't handle this because this watched file is not a new discovery.
    # we must handle it here, by transferring state and have the sincedb value track this watched file
    # rotate_as_file and rotate_from will switch the sincedb key to the inode that the path is now pointing to
    # and pickup the sincedb_value from before.
    msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file"
    logger.trace(msg)
    existing_watched_file = sincedb_value.watched_file
    if existing_watched_file.nil?
      sincedb_value.set_watched_file(watched_file)
      logger.trace("add_or_update_sincedb_collection: switching as new file")
      watched_file.rotate_as_file
      watched_file.update_bytes_read(sincedb_value.position)
    else
      sincedb_value.set_watched_file(watched_file)
      logger.trace("add_or_update_sincedb_collection: switching from...", "watched_file details" => watched_file.details)
      watched_file.rotate_from(existing_watched_file)
    end
  end
  sincedb_value
end
watched_file.rb-->
def position_for_new_sincedb_value
  if @initial
    # this file was found in first discovery
    @settings.start_new_files_at == :beginning ? 0 : last_stat_size
  else
    # always start at the beginning if found after first discovery
    0
  end
end

The code in 4.1.10 (and nearby versions) is reworked somewhat, but the corresponding logic can be found and summarized as follows:

  1. The discovery mechanism in watch.rb / discover.rb scans the files in the watched directories and tries to associate each one with a sincedb record: 1) if both the inode and the recorded path match, the stored offset is kept; 2) if the inode matches but the path does not (the inode was reused), the old key is removed from sincedb, and the file is still added to watched_files with its state set to watched; 3) other cases...
  2. In tail_mode -> processor.rb, process_all_states periodically runs process_watched over the discovered files: it calls watched_file.activate and then create_initial. create_initial delegates to Handlers::CreateInitial.new(self, sincedb_collection, observer, @settings); its handle (in handlers -> base.rb) calls handle_specifically in create_initial.rb, which calls add_or_update_sincedb_collection back in base.rb, which finally calls sincedb_value = add_new_value_sincedb_collection(watched_file). add_new_value_sincedb_collection obtains the sincedb_value, i.e. the current file offset, via get_new_value_specifically (whose logic is: 0 if start_position is beginning, otherwise the size from the last stat).

Once that is done, the next step is process_active, which decides whether a file has grown by checking whether its @size is greater than @bytes_read.
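
For intuition, a stripped-down sketch of that check (illustrative only; the real process_active in processor.rb also handles rotation, read errors, and so on):

# Simplified sketch of the decision process_active makes for each active file.
def classify(size, bytes_read)
  if size > bytes_read
    :grown       # read and report the new bytes
  elsif size < bytes_read
    :shrunk      # truncation/rotation: reset according to start_position
  else
    :unchanged
  end
end

classify(1_020_460, 20_460)  # => :grown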

One caveat: when upgrading logstash-input-file, it is a good idea to manually fix up the .sincedb file before restarting; otherwise the problem described above can still occur until every relevant row in .sincedb carries the file path column.

More to think about: Logstash resolves inode reuse by comparing the inode plus the file path. If a reused inode ends up with a file of the same name, or the file's location changes, data loss or duplication can still occur. In principle the file creation time could also be checked, but that cannot handle a system clock rollback; only forward-moving time can be handled.

Performance issues

We initially suspected that the ~9 ms cross-datacenter network latency or the Kafka parameter configuration was responsible for the low write throughput. That would not have been easy to optimize, but it was ruled out as the cause: against a Kafka cluster on the same network segment the throughput still topped out at around 12,000 events/s. The investigation was quite lengthy, so I will not walk through it here and will simply record the conclusion:

The bottleneck is the persistent ack queue, i.e. queue.type: persisted configured in config/logstash.yml.
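
For reference, a sketch of the relevant knobs in config/logstash.yml (the values here are placeholders, not the ones we used; check the documentation for your Logstash version before copying them):

# config/logstash.yml (example values only)
queue.type: persisted            # the disk-backed ack queue discussed above
queue.max_bytes: 4gb             # total on-disk capacity of the queue
queue.page_capacity: 64mb        # size of each queue page file
queue.checkpoint.writes: 1024    # events written between checkpoints (1 = safest, slowest)
queue.checkpoint.acks: 1024      # acks between checkpoints
# queue.type: memory             # in-memory queue: much faster, but events are lost on a crash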

The test machine was my Mac (SSD, 8 GB memory). With the in-memory queue the rate is at least 36,000 events/s, but since data loss is not acceptable, that setting could not be changed during optimization. What did help was splitting the input into two file inputs: on the production machine this reaches at least 20,000 events/s, an improvement of roughly 50%+. For comparison, on SSD (my development machine) a single input reaches up to about 28,000 events/s and two inputs up to about 40,000 events/s.

I also initially thought that increasing the number of worker threads in the configuration would improve performance, but it did not: worker threads only add parallelism to the pipeline stage that processes events, while the bottleneck is the interaction between the input and the acked queue. I could not find a configuration parameter for multi-threaded input; if you find one, please email aXRob21hc2xhdUBxcS5jb20= (base64).

Other

Follow-up articles will look at a strange machine problem encountered while running Logstash, at data integrity and safety during transmission, at the Logstash WrappedSynchronousQueue/WrappedAckedQueue mechanism, and at how to ship data to Kafka without losing any of it.

To be clear: although the Logstash queue is called a persistent ack queue, which sounds as if a failed transfer would be retried until the data arrives intact, that is not what it does. The ack does not even cover a single end-to-end transfer. The mechanism works like Flume's source-channel-sink Transaction: even though Flume supports a persistent file channel, the Transaction only makes the individual hand-off (source to channel, or channel to sink) transactional. It guarantees the hand-off action, not that the data is ultimately delivered across the whole path. Assuming otherwise is as fundamentally wrong as the common misreading of the newer Kafka's exactly-once support.

Conclusion:

  1. You, like many articles, may think it would not be hard to build a universal real-time log shipping tool; I thought so too at first, even if it only had to read local files and push them to Kafka. But once you see how much work the Logstash team puts into it, you can drop that idea, unless you have more time than they do.
  2. You can find many online comparisons of the pros and cons of the popular open-source log shippers. Given that teams differ in languages, habits, and experience, and may even use container-based reporting mechanisms, it is not appropriate to recommend one in particular. But Logstash's biggest (perhaps only) disadvantage compared with the other implementations is almost certainly that it is written in JRuby.
  3. If a recommendation is required: pick one, work through all of its pitfalls and defects (unless the component is no longer maintained), and understand the problems before deciding which one to keep using.
  4. Try not to use Logstash to parse data unless absolutely necessary. Repeat three times! First, performance becomes hard to control; more importantly, parsing exceptions become hard to control, which is painful for non-Ruby developers. So I recommend leaving parsing to the upstream and downstream as much as possible and using Logstash purely as a transport for unstructured data, despite its rich parsing plugins. That said, if you are not very sensitive to data exceptions, using the Logstash parsing plugins is fine.