Offer to come, dig friends take it! I am participating in the 2022 Spring Recruit Punch card activity. Click here for details.

This paper is mainly based on redis 6.2 source code to analyze the data structure and common operations of timing events.

The data structure

In Redis, aeTimeEvent structure is used to create a scheduled task event, the code is as follows:

/* Time event structure */
typedef struct aeTimeEvent {
    / / identifier
    long long id; /* time event identifier. */
    // Timing nanoseconds
    monotime when;
    // The timer callback function
    aeTimeProc *timeProc;
    // Unregister the timer callback function
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being * freed in recursive time event calls. */
} aeTimeEvent;
Copy the code

Common operation

1. Create scheduled events

The most important timing function in Redis, which executes periodically, uses the serverCron function. Due to the small number of scheduled tasks in Redis, it is not strictly sorted according to the expiration time, but according to the id increment + header method to ensure the basic order.

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL.NULL) == AE_ERR) {
  serverPanic("Can't create event loop timers.");
  exit(1);
}

// Create a timer object
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    / / head interpolation
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}
Copy the code

2. A scheduled event is triggered

Redis uses IO multiplexing to perform scheduled tasks.

  • To find the nearest timing event, seeusUntilEarliestTimer
/* How many microseconds until the first timer should fire. * If there are no timers, -1 is returned. * * Note that's O(N) since time events are unsorted. * Possible optimizations (not needed by Redis so far, but...) : * 1) Insert the event in order, so that the nearest is just the head. * Much better but still insertion or deletion of timers is O(N). * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). */
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    if (te == NULL) return - 1;

    aeTimeEvent *earliest = NULL;
    while (te) {
        if(! earliest || te->when < earliest->when) earliest = te; te = te->next; } monotime now = getMonotonicUs();return (now >= earliest->when) ? 0 : earliest->when - now;
}
Copy the code

In this case, the time complexity may be high.

  • Update the remaining expiration time. Why? As we mentioned earlier, I/O reuse may need to be updated because I/O events return.
if(flags & AE_TIME_EVENTS && ! (flags & AE_DONT_WAIT)) usUntilTimer = usUntilEarliestTimer(eventLoop);if (usUntilTimer >= 0) {
  tv.tv_sec = usUntilTimer / 1000000;
  tv.tv_usec = usUntilTimer % 1000000;
  tvp = &tv;
} else {
  if (flags & AE_DONT_WAIT) {
    / / don't wait
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
  } else {
    /* Otherwise we can block */
    tvp = NULL; /* wait forever */}}Copy the code

3. Execute scheduled events

Delete one-time execution directly, and periodically add to the list again after execution.

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
  int processed = 0;
  aeTimeEvent *te;
  long long maxId;

  te = eventLoop->timeEventHead;
  maxId = eventLoop->timeEventNextId- 1;
  monotime now = getMonotonicUs();
  
  // Delete the timer
  while(te) {
    long long id;
		
    // Delete the event in the next round
    /* Remove events scheduled for deletion. */
    if (te->id == AE_DELETED_EVENT_ID) {
      aeTimeEvent *next = te->next;
      /* If a reference exists for this timer event, * don't free it. This is currently incremented * for recursive timerProc calls */
      if (te->refcount) {
        te = next;
        continue;
      }
      if (te->prev)
        te->prev->next = te->next;
      else
        eventLoop->timeEventHead = te->next;
      if (te->next)
        te->next->prev = te->prev;
      if (te->finalizerProc) {
        te->finalizerProc(eventLoop, te->clientData);
        now = getMonotonicUs();
      }
      zfree(te);
      te = next;
      continue;
    }
    
    if (te->id > maxId) {
      te = te->next;
      continue;
    }

    if (te->when <= now) {
      int retval;

      id = te->id;
      te->refcount++;
      The // timeProc function returns retVal for the interval at which time events are executed
      retval = te->timeProc(eventLoop, id, te->clientData);
      te->refcount--;
      processed++;
      now = getMonotonicUs();
      if(retval ! = AE_NOMORE) { te->when = now + retval *1000;
      } else {
        // If time out, mark it for deletionte->id = AE_DELETED_EVENT_ID; }}// Execute the next one
    te = te->next;
  }
  return processed;
}
Copy the code

conclusion

  1. Advantages: Simple implementation
  2. Disadvantages: If there are many scheduled tasks, the efficiency is low.