Spark Streaming reads Kafka in Direct mode. The work is split into two parts: the driver side computes an offset range (fromOffset to untilOffset) for each partition of each batch, and the executor side pulls data from the Kafka brokers according to the ranges the driver assigned.
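For orientation, a typical direct-stream setup looks roughly like this; the broker address, group id and topic name are placeholders, and ssc is assumed to be an existing StreamingContext:

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010._

  // Placeholder Kafka parameters for illustration only.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "example-group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Creates the DirectKafkaInputDStream analyzed below.
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams)
  )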

The driver-side entry point is the start method of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream. It is invoked when the streaming job starts and is used to obtain the starting offsets:

  override def start(): Unit = {
    val c = consumer
    paranoidPoll(c)
    if (currentOffsets.isEmpty) {
      currentOffsets = c.assignment().asScala.map { tp =>
        tp -> c.position(tp)
      }.toMap
    }

    // don't actually want to consume any messages, so pause all partitions
    c.pause(currentOffsets.keySet.asJava)
  }

Get the consumer object first:

  def consumer(): Consumer[K, V] = this.synchronized {
    if (null == kc) {
      kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
    }
    kc
  }

consumer first checks whether the KafkaConsumer (the kc field) already exists; if not, it is created by calling the onStart method of the ConsumerStrategy. ConsumerStrategy has three implementation classes: Subscribe, SubscribePattern and Assign.
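The three strategies correspond to the three ways of getting partitions and are normally created through the ConsumerStrategies factory object. A rough sketch, reusing the kafkaParams map from the first example; topic and partition names are illustrative:

  import org.apache.kafka.common.TopicPartition
  import org.apache.spark.streaming.kafka010.ConsumerStrategies

  // Subscribe: a fixed list of topics, partitions assigned dynamically by Kafka.
  val byTopics = ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams)

  // SubscribePattern: topics matched by a regex, also assigned dynamically by Kafka.
  val byPattern = ConsumerStrategies.SubscribePattern[String, String]("example-.*".r.pattern, kafkaParams)

  // Assign: an explicit list of TopicPartitions, no dynamic assignment.
  val byPartitions = ConsumerStrategies.Assign[String, String](
    Seq(new TopicPartition("example-topic", 0)), kafkaParams)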

Take the commonly used Subscribe strategy as an example and look at its onStart implementation:

  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics)
    val toSeek = if (currentOffsets.isEmpty) {
      offsets
    } else {
      currentOffsets
    }
    if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no explicit position
      // but cant seek to a position before poll, because poll is what gets subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress =
        aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
        consumer.seek(topicPartition, offset)
      }
      // we've called poll, we must pause or next poll may consume messages and set position
      consumer.pause(consumer.assignment())
    }

    consumer
  }

A new KafkaConsumer object is created and subscribe is called with the list of topics. If the offsets passed in are not empty, poll(0) is used to contact the Kafka cluster and trigger automatic partition assignment for the subscribed topics; seek is then called to move the consumer group's position on each partition to the supplied offset, and pause is called so the next poll does not actually consume. The KafkaConsumer object is then returned.
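The offsets consulted here are the ones supplied when the strategy was constructed. If you want the stream to start from known positions (for example, offsets checkpointed externally), you can pass them in when building the strategy. A hedged sketch with made-up offsets, again reusing the kafkaParams map from above:

  import org.apache.kafka.common.TopicPartition

  // Hypothetical starting offsets: resume partition 0 at offset 42 and partition 1 at offset 100.
  val startingOffsets = Map(
    new TopicPartition("example-topic", 0) -> 42L,
    new TopicPartition("example-topic", 1) -> 100L
  )

  val strategy = ConsumerStrategies.Subscribe[String, String](
    Seq("example-topic"), kafkaParams, startingOffsets)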

Back in start, paranoidPoll is called to correct the offset of each partition in case the poll consumed anything:

  /**
   * The concern here is that poll might consume messages despite being paused,
   * which would throw off consumer position. Fix position if this happens.
   */
  private def paranoidPoll(c: Consumer[K, V]): Unit = {
    val msgs: ConsumerRecords[K, V] = c.poll(0)
    if (!msgs.isEmpty) {
      // position should be minimum offset per topicpartition
      msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
        val tp = new TopicPartition(m.topic, m.partition)
        val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
        acc + (tp -> off)
      }.foreach { case (tp, off) =>
        logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
        c.seek(tp, off)
      }
    }
  }

Finally, start calls KafkaConsumer's pause method to pause consumption on every partition (i.e., the next poll will not return any records).
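The effect of pause is purely client-side: a paused partition stays assigned, but subsequent polls return no records for it until resume is called. A minimal sketch against the plain Kafka consumer API illustrating the behaviour start relies on; consumer is assumed to be an already-subscribed org.apache.kafka.clients.consumer.KafkaConsumer:

  consumer.poll(0)                          // triggers partition assignment
  consumer.pause(consumer.assignment())     // pause every assigned partition
  val records = consumer.poll(0)            // returns no records while paused
  assert(records.isEmpty)
  consumer.resume(consumer.assignment())    // consumption would continue from the current position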


When the driver starts scheduling batch jobs, each batch generates a KafkaRDD by calling the compute method of DirectKafkaInputDStream. Let's analyze that method:

  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo: Long = currentOffsets(tp)
      if (fo > uo) {
        logWarning(s"Beginning offset ${fo} is after the ending offset ${uo} " +
          s"for topic ${tp.topic()} partition ${tp.partition()}. " +
          "You either provided an invalid fromOffset, or the Kafka topic has been damaged")
        OffsetRange(tp.topic, tp.partition, fo, fo)
      } else {
        OffsetRange(tp.topic, tp.partition, fo, uo)
      }
    }
    val useConsumerCache = context.conf.getBoolean(
      "spark.streaming.kafka.consumer.cache.enabled", true)
    logInfo(s"create KafkaRDD for this batch ${validTime}")
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
      getPreferredHosts, useConsumerCache)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }
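The untilOffsets come from clamp(latestOffsets()): latestOffsets polls Kafka for the newest position of each partition, and clamp caps it according to rate control (spark.streaming.kafka.maxRatePerPartition and backpressure). A rough paraphrase of that capping step; the exact code varies slightly across Spark versions:

  // Paraphrased sketch: for each partition, never advance further than
  // currentOffset + the number of messages allowed for this batch.
  protected def clamp(offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    maxMessagesPerPartition(offsets).map { mmp =>
      mmp.map { case (tp, messages) =>
        val uo = offsets(tp)
        tp -> Math.min(currentOffsets(tp) + messages, uo)
      }
    }.getOrElse(offsets)
  }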


When the KafkaRDD is created, offsetRanges is passed in, and the RDD's partitions are generated from this parameter; they correspond one-to-one with the partitions of the subscribed topics.
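The mapping from offset ranges to RDD partitions is direct; getPartitions is essentially a one-to-one conversion, roughly paraphrased as:

  // Each OffsetRange becomes one KafkaRDDPartition, so RDD partition i reads
  // exactly offsetRanges(i).fromOffset until offsetRanges(i).untilOffset.
  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }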

Now that the driver logic is analyzed, let’s look at the Executor side logic.

Looking at KafkaRDD's compute method, it simply builds an iterator object: CompactedKafkaRDDIterator is used when reading topics whose offsets are not consecutive (e.g. compacted topics), otherwise KafkaRDDIterator is used.
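In outline, the decision looks roughly like this; this is a paraphrase, and the exact iterator constructor arguments (consumer-cache sizing, poll timeout, etc.) differ slightly between Spark versions:

  // Rough outline of KafkaRDD.compute (logging omitted). `compacted` reflects
  // the spark.streaming.kafka.allowNonConsecutiveOffsets setting.
  override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    require(part.fromOffset <= part.untilOffset,
      s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset}")
    if (part.fromOffset == part.untilOffset) {
      Iterator.empty    // empty range: nothing to read for this partition in this batch
    } else if (compacted) {
      new CompactedKafkaRDDIterator[K, V](part, context, kafkaParams, useConsumerCache,
        pollTimeout, cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    } else {
      new KafkaRDDIterator[K, V](part, context, kafkaParams, useConsumerCache,
        pollTimeout, cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    }
  }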

The core of KafkaRDDIterator is its next method:

  override def next(): ConsumerRecord[K, V] = {
    if (!hasNext) {
      throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
    }
    val r = consumer.get(requestOffset, pollTimeout)
    requestOffset += 1
    r
  }
next delegates to the cached consumer's get method:
  /**
   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
   * Sequential forward access will use buffers, but random access will be horribly inefficient.
   */
  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
    logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
    if (offset != nextOffset) {
      logInfo(s"Initial fetch for $groupId $topic $partition $offset")
      seek(offset)
      poll(timeout)
    }

    if (!buffer.hasNext()) {
      poll(timeout)
    }
    require(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    var record = buffer.next()

    if (record.offset != offset) {
      logInfo(s"Buffer miss for $groupId $topic $partition $offset")
      seek(offset)
      poll(timeout)
      require(buffer.hasNext(),
        s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
      record = buffer.next()
      require(record.offset == offset,
        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
          "spark.streaming.kafka.allowNonConsecutiveOffsets"
      )
    }

    nextOffset = offset + 1
    record
  }

As you can see, the executor pulls data starting from the specified offset and buffers a batch of records (an in-memory iterator); each call to next takes one record from the buffer, and when the buffer is exhausted another batch is polled.
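The buffer is refilled by a small poll helper on the cached consumer, roughly like this (paraphrased):

  // Paraphrased: poll the underlying KafkaConsumer and replace the buffer with the
  // records returned for this TopicPartition.
  private def poll(timeout: Long): Unit = {
    val p = consumer.poll(timeout)
    val r = p.records(topicPartition)
    logDebug(s"Polled ${p.partitions()}  ${r.size}")
    buffer = r.listIterator
  }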

At this point, the whole data pulling process ends.