Time is always moving on, tick-tock, tick-tock.

Scheduled tasks are needed in many scenarios. For example, crontab on Linux essentially builds its scheduling on the idea of a time wheel. Time wheels come in two variants, simple and hierarchical; these notes cover the hierarchical kind. Kafka itself needs delayed operations: with acks=all configured, after the producer sends a request, it is marked complete only once the ISR (all replicas in sync with the partition leader) reports success; until then the request is considered incomplete and must be parked somewhere.

Consider this: if you had to implement delayed requests yourself, how would you do it? A first conjecture: keep a data structure that sorts scheduled tasks by expiration time, with tasks dynamically inserted and removed. Meanwhile, a timer polls the structure to check whether the earliest task is due: if it is, execute it; if not, wait for the next polling cycle. With multiple instances, the data structure has to be stored in a shared place, and the instances must coordinate so that only one of them fetches and executes any given task.
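A minimal single-process sketch of that conjecture, using a min-heap (Scala's mutable.PriorityQueue) as the sorted structure; ScheduledTask and NaiveTimer are illustrative names, not from Kafka:

import scala.collection.mutable

case class ScheduledTask(runAtMs: Long, action: () => Unit)

object NaiveTimer {
  // Min-heap by expiration time (PriorityQueue is a max-heap, hence .reverse)
  private val queue =
    mutable.PriorityQueue.empty[ScheduledTask](Ordering.by[ScheduledTask, Long](_.runAtMs).reverse)

  def add(task: ScheduledTask): Unit = synchronized { queue.enqueue(task) }

  // Called periodically by a timer thread: run everything that is due
  def tick(nowMs: Long): Unit = synchronized {
    while (queue.nonEmpty && queue.head.runAtMs <= nowMs)
      queue.dequeue().action()
  }
}

A timer thread would call NaiveTimer.tick(System.currentTimeMillis()) on every polling cycle; the two problems discussed below are exactly the polling interval and the heap's insertion cost.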

This brought to mind an existing project that runs tasks at fixed intervals. Its solution: store the task configuration and task information in a database, and run a timer service that periodically queries the database for configured tasks, checking whether each is due. If a task is due it is executed, with optimistic locking used to prevent multiple instances from competing over the same task.
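The optimistic-locking step might look like the sketch below, assuming a version column on a task table; the table and column names are hypothetical:

import java.sql.Connection

// Claim a due task: only one instance's UPDATE matches the old version,
// so only that instance gets to run the task.
def tryClaimTask(conn: Connection, taskId: Long, seenVersion: Long): Boolean = {
  val stmt = conn.prepareStatement(
    "UPDATE scheduled_task SET status = 'RUNNING', version = version + 1 " +
    "WHERE id = ? AND version = ? AND status = 'PENDING'")
  try {
    stmt.setLong(1, taskId)
    stmt.setLong(2, seenVersion)
    stmt.executeUpdate() == 1 // exactly one row updated means we won the race
  } finally stmt.close()
}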

But what problems could such a scheme run into?

First, what should the polling interval be? Suppose there are two tasks: one due in 1 millisecond and one due in 100 days. The scan interval must be 1 millisecond to catch the first, but then nearly every scan over the following 100 days is wasted work. The database also eventually needs to be split into multiple tables; if you don't plan for that at the beginning, retrofitting it later is very complicated. And once scheduled tasks reach a large order of magnitude, the single database easily becomes a bottleneck.

Second, if the data structure storing the scheduled tasks is a min-heap, adding or removing a task costs O(log n), which is unsatisfactory when the number of tasks is large; a time wheel brings this down to O(1).

We’ll see how Kafka’s time wheel solves these two problems later.

In Kafka, not every replica of a topic partition runs timers; there is only one leader, and when it fails the leader role is transferred to another replica through election. The time wheel lives on the leader, so there is no case of multiple instances timing the same task simultaneously, which is the difference from the project scheme above. That scheme could therefore be improved: store the timing information of scheduled tasks separately from the general task information, and give one instance a dedicated role in charge of timing management. But then what happens if that one service fails?

Time wheel introduction

The time wheel is like a watch dial with second, minute, and hour hands: 60 seconds = 1 minute, 60 minutes = 1 hour, each hand ticking forward. The corresponding time wheel data structure and the relationship between its parts are shown in the figure below.

To introduce the concepts: Kafka's timer holds only the first-layer time wheel, TimingWheel, and each layer holds an overflowWheel reference to the next layer up. Each slot of a time wheel is called a bucket, and every layer has the same number of buckets. Inside a bucket is a doubly linked list of the tasks that expire within that same tick. The linked list is implemented with a sentinel (dummy root) node, and each element is a TimerTaskEntry.
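A worked example of where a task lands, assuming the default tickMs = 1 and wheelSize = 20 with currentTime = 0 (the arithmetic mirrors TimingWheel.add, shown at the end of this post):

val tickMs = 1L
val wheelSize = 20L

// A task expiring at 15 ms fits in the first layer (span 20 ms):
val virtualId = 15L / tickMs                    // 15
val bucketIndex = (virtualId % wheelSize).toInt // bucket 15 of layer 1

// A task expiring at 350 ms overflows layer 1 (span 20 ms) into layer 2,
// whose tick is 20 ms and span is 400 ms: 350 / 20 = 17, so bucket 17 of layer 2.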

A DelayQueue is not as good as the TimingWheel at inserting and deleting elements, so why is one used here? Recall problem one: if the gap between two tasks is long, the timer would otherwise "empty-tick" through the gap. Kafka uses a DelayQueue to store TimerTaskList objects. DelayQueue is backed by a PriorityQueue, so looking up the minimum costs O(1). Crucially, not every TimerTaskEntry goes into the DelayQueue: its elements are the doubly linked lists (buckets), and a bucket is offered to the DelayQueue only when its expiration time is newly set, i.e. when a first element lands in a fresh or reused bucket. A Kafka thread then takes the bucket with the smallest expiration from the DelayQueue and advances the time wheel straight to that point, instead of ticking once per time unit.
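A minimal sketch of how a DelayQueue lets the thread sleep through the gap; the DelayedBucket class is illustrative, not Kafka's TimerTaskList:

import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

class DelayedBucket(val expirationMs: Long) extends Delayed {
  // Remaining delay relative to now, in the requested unit
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)

  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
}

val queue = new DelayQueue[DelayedBucket]()
queue.offer(new DelayedBucket(System.currentTimeMillis() + 500))

// poll blocks until the earliest element is due (or the timeout elapses),
// so the calling thread waits through the gap instead of busy-ticking
val due = queue.poll(1000, TimeUnit.MILLISECONDS)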

The specific code is in the source-code reading section at the end of this post.

How Kafka manages the time wheel

The Timer interface and the SystemTimer class in Kafka are responsible for managing the time wheel and integrating it into the framework.

The Timer interface

trait Timer {
  // Schedule a task to be executed after its delay
  def add(timerTask: TimerTask): Unit
  // Wait up to timeoutMs for a bucket to expire and advance the wheel; returns true if the clock advanced
  def advanceClock(timeoutMs: Long): Boolean
  // Number of tasks still pending execution
  def size: Int
  // Release resources
  def shutdown(): Unit
}
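In Kafka the timer is driven by a background thread, the purgatory's ExpiredOperationReaper, which repeatedly calls advanceClock. A simplified sketch of that driver loop (the loop and the "demo" name are illustrative; the 200 ms timeout matches Kafka's reaper):

val timer: Timer = new SystemTimer("demo")
while (running) {
  // Block up to 200 ms for the next expiring bucket, then advance the wheel
  timer.advanceClock(200L)
}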

SystemTimer class

SystemTimer is the implementation class of the Timer interface: a timer that encapsulates the time wheel object and provides delayed request management for Purgatory, the delayed request buffer.

executorName: the Purgatory name. There are different Purgatories in Kafka, such as Produce for buffering producer delayed requests and Fetch for buffering consumer delayed requests; Produce and Fetch are executorNames.

startMs: the start time of the SystemTimer, in milliseconds.

class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {

  // Single-thread executor that actually runs expired tasks
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))

  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  private[this] val taskCounter = new AtomicInteger(0)
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )

  // Locks used to protect data structures while ticking
  private[this] val readWriteLock = new ReentrantReadWriteLock()
  private[this] val readLock = readWriteLock.readLock()
  private[this] val writeLock = readWriteLock.writeLock()

  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
    } finally {
      readLock.unlock()
    }
  }

  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // The entry could not go into the wheel: it has already expired or been cancelled.
      // If it is not cancelled, execute the task immediately.
      if (!timerTaskEntry.cancelled)
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }

  private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

  def advanceClock(timeoutMs: Long): Boolean = {
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      writeLock.lock()
      try {
        while (bucket != null) {
          timingWheel.advanceClock(bucket.getExpiration()) // Advance to the expiration time of the bucket
          bucket.flush(reinsert)                           // Write all tasks under the bucket back to the time wheel
          bucket = delayQueue.poll()                       // Get the next expired bucket, if any
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
}

Tasks on a layer-2 time wheel can be demoted to the layer-1 time wheel; this is what the reinsert function implements: when a bucket expires, flush writes all of its tasks back through addTimerTaskEntry, so each task either re-enters a finer-grained wheel or, if already due, executes immediately.

So far, the bottom layer has encapsulated the time wheel and given the upper layer a convenient way to operate it. How do the upper-layer classes use it? The code follows.

DelayedOperation class

DelayedOperation inherits the TimerTask class; it supports cancelling a delayed request and binding to a TimerTaskEntry.

completed: whether the delayed request has been completed.

tryCompletePending: a flag that tells the thread holding the lock to check once more whether the operation has become completable.

There are seven methods; since this is an abstract class meant to be subclassed, some are hooks for subclasses to implement: forceComplete, isCompleted, onExpiration, onComplete, tryComplete, maybeTryComplete, and run.

So let's focus on the maybeTryComplete() method. Suppose thread A holds the lock and finds the operation not yet completable, while thread B cannot get the lock and gives up, even though by then the operation has actually become completable. Both threads miss the completion and the request times out. The maybeTryComplete method was introduced to optimize away exactly this race.

abstract class DelayedOperation(override val delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask with Logging {

  private val completed = new AtomicBoolean(false)
  private val tryCompletePending = new AtomicBoolean(false)
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

  private[server] def maybeTryComplete(): Boolean = {
    var retry = false
    var done = false
    do {
      if (lock.tryLock()) {
        try {
          tryCompletePending.set(false)
          done = tryComplete()
        } finally {
          lock.unlock()
        }
        // While we held the lock, another thread may have set tryCompletePending; retry in that case
        retry = tryCompletePending.get()
      } else {
        // Could not get the lock: another thread is holding it.
        // Set tryCompletePending to true to give the lock-holding thread a chance to retry;
        // if the flag was already set, that thread is guaranteed to see it, so we need not retry ourselves
        retry = !tryCompletePending.getAndSet(true)
      }
    } while (!isCompleted && retry)
    done
  }

  // ... forceComplete, isCompleted, onExpiration, onComplete, tryComplete, run omitted
}

DelayedOperationPurgatory class

purgatoryName: the name of this Purgatory, the buffer that stores delayed requests.

brokerId: the ID of the broker this Purgatory belongs to.

There are also two built-in classes, Watchers and WatcherList. Watchers is a key-based list of delayed requests: the key can be a String (for example a consumer group) or a TopicPartitionOperationKey (a topic partition), and so on, and it is used to monitor whether the delayed requests saved under it can be completed. WatcherList has a watchersByKey field, which is a Pool, essentially a ConcurrentHashMap: the key is of any type, and the value is the Watchers object corresponding to that key.

final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
                                                             timeoutTimer: Timer,
                                                             brokerId: Int = 0,
                                                             purgeInterval: Int = 1000,
                                                             reaperEnabled: Boolean = true,
                                                             timerEnabled: Boolean = true)
  extends Logging with KafkaMetricsGroup {

  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    assert(watchKeys.nonEmpty, "The watch key list can't be empty")

    // First attempt: if the operation can already be completed, return true
    var isCompletedByMe = operation.tryComplete()
    if (isCompletedByMe)
      return true

    var watchCreated = false
    for (key <- watchKeys) {
      // If the operation was completed by another thread, stop adding it to the remaining watcher lists
      if (operation.isCompleted)
        return false
      watchForOperation(key, operation)

      // Set the watchCreated flag to indicate that the operation has been added to a watch list
      if (!watchCreated) {
        watchCreated = true
        // Update the estimated total number of requests in purgatory
        estimatedTotalOperations.incrementAndGet()
      }
    }

    // Try again to complete the delayed request
    isCompletedByMe = operation.maybeTryComplete()
    if (isCompletedByMe)
      return true

    // If it still cannot complete, add it to the expiration queue
    if (!operation.isCompleted) {
      if (timerEnabled)
        timeoutTimer.add(operation)
      if (operation.isCompleted) {
        // It completed in the meantime; cancel the timer task
        operation.cancel()
      }
    }

    false
  }

  // ... other methods (checkAndComplete, watchForOperation, etc.) omitted
}
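To see the API from the caller's side, here is a toy subclass; DelayedWait, its flag-based completion condition, and the "demo" names are illustrative, not from Kafka:

import java.util.concurrent.atomic.AtomicBoolean

class DelayedWait(delayMs: Long, flag: AtomicBoolean) extends DelayedOperation(delayMs) {
  // Completable once the flag is set; forceComplete guarantees only one thread wins
  override def tryComplete(): Boolean = if (flag.get()) forceComplete() else false
  override def onComplete(): Unit = println("completed")
  override def onExpiration(): Unit = println("expired before the flag was set")
}

val flag = new AtomicBoolean(false)
val purgatory = DelayedOperationPurgatory[DelayedWait]("demo")
purgatory.tryCompleteElseWatch(new DelayedWait(30000L, flag), Seq("demo-key"))
// later, when the watched state changes elsewhere:
flag.set(true)
purgatory.checkAndComplete("demo-key") // re-checks the watchers under this key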

Trying to connect the dots

Time wheel basic classes: source code reading (bottom-up)

TimerTask class

trait TimerTask extends Runnable {

  val delayMs: Long // delay of this task in milliseconds

  // The entry in the time wheel that currently holds this task
  private[this] var timerTaskEntry: TimerTaskEntry = null

  private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = synchronized {
    // if this timerTask is already held by an existing timer task entry,
    // remove such an entry first
    if (timerTaskEntry != null && timerTaskEntry != entry)
      timerTaskEntry.remove()
    timerTaskEntry = entry
  }

  private[timer] def getTimerTaskEntry: TimerTaskEntry = timerTaskEntry
}

TimerTaskEntry class

private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {

  @volatile
  var list: TimerTaskList = null
  var next: TimerTaskEntry = null
  var prev: TimerTaskEntry = null

  // if this timerTask is already held by an existing timer task entry,
  // setTimerTaskEntry will remove it
  if (timerTask != null) timerTask.setTimerTaskEntry(this)

  def cancelled: Boolean = {
    timerTask.getTimerTaskEntry != this
  }

  def remove(): Unit = {
    var currentList = list
    // Retry until list becomes null: another thread may be moving this entry
    // from one list to another while we remove it
    while (currentList != null) {
      currentList.remove(this)
      currentList = list
    }
  }

  override def compare(that: TimerTaskEntry): Int = {
    java.lang.Long.compare(expirationMs, that.expirationMs)
  }
}

TimerTaskList class

taskCounter records how many scheduled tasks are in the linked list; expiration is the expiration time of the bucket (the start of the bucket's time range).

The setExpiration method uses an atomic getAndSet and returns true if the expiration time actually changed. This matters because Kafka advances TimerTaskLists through the DelayQueue: when a bucket expires, Kafka reuses it by resetting its expiration time and inserting it back into the DelayQueue.
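The return value thus answers "did this bucket just start a new round?" (a small sketch using the TimerTaskList class shown just below):

import java.util.concurrent.atomic.AtomicInteger

val bucket = new TimerTaskList(new AtomicInteger(0))
assert(bucket.setExpiration(120L))  // changed (-1 -> 120): offer the bucket to the DelayQueue
assert(!bucket.setExpiration(120L)) // unchanged: the bucket is already queued, don't offer again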

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {

  // sentinel (dummy root) of the circular doubly linked list:
  // root.next points to the head, root.prev points to the tail
  private[this] val root = new TimerTaskEntry(null, -1)
  root.next = root
  root.prev = root

  private[this] val expiration = new AtomicLong(-1L)

  // Atomically set the expiration time; true only if the value actually changed
  def setExpiration(expirationMs: Long): Boolean = {
    expiration.getAndSet(expirationMs) != expirationMs
  }

  def getExpiration(): Long = expiration.get()
}

TimingWheel class

tickMs: the length of one tick, i.e. how long one "tick" of this wheel lasts.

wheelSize: the number of buckets in the wheel.

startMs: the creation time of the time wheel object.

taskCounter: the total number of scheduled tasks in the time wheel.

queue: holds the TimerTaskList objects and is used to advance the time wheel.

interval: the total time span of this layer. For example, the first layer has 20 buckets of 1 ms each, so its span is 20 ms; the second layer then has 20 buckets of 20 ms each, so its span is 400 ms.

buckets: the array underlying the time wheel.

currentTime: the current timestamp, rounded down to the largest multiple of tickMs not exceeding it.

overflowWheel: the next-layer wheel, created on demand. If the first layer cannot hold a task, a second layer is created; if the second cannot either, keep going up until one can.

Since each layer's span is the previous one multiplied by wheelSize, it does not take many layers to cover any realistic delay, as the quick computation below shows.
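With the defaults tickMs = 1 and wheelSize = 20, the span of layer n is tickMs * 20^n milliseconds:

val tickMs = 1L
val wheelSize = 20
// Span of layers 1..9 in milliseconds
val spans = (1 to 9).map(n => BigInt(wheelSize).pow(n) * tickMs)
// layer 1: 20 ms, layer 2: 400 ms, layer 3: 8 s, ...
// layer 9: 512,000,000,000 ms, roughly 16 years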

private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs

  // overflowWheel can potentially be updated and read by two concurrent threads through add().
  // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
  @volatile private[this] var overflowWheel: TimingWheel = null

  private[this] def addOverflowWheel(): Unit = {
    synchronized {
      if (overflowWheel == null) {
        // The next layer's tick is this layer's whole span
        overflowWheel = new TimingWheel(
          tickMs = interval,
          wheelSize = wheelSize,
          startMs = currentTime,
          taskCounter = taskCounter,
          queue
        )
      }
    }
  }

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      // Cancelled
      false
    } else if (expiration < currentTime + tickMs) {
      // Already expired
      false
    } else if (expiration < currentTime + interval) {
      // Put it into its own bucket on this layer
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // If the expiration time changed, the bucket starts a fresh round: enqueue it
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // Out of this layer's range: put it into the overflow wheel, creating it if needed
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)

      // If there is an upper time wheel, update its currentTime recursively as well.
      // The advance is initiated by the background Reaper thread.
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
}