1. Overview

This article focuses on the slot module of Codis. It covers four stages: slot allocation, migration preparation, migration execution, and migration completion, as well as how reads and writes against a slot are handled while its data is being migrated. Fair warning: it is a long read.

2. Implementation mechanism

Slot management is handled by Topom. When redis-server nodes are scaled out, Topom assigns slots to them and formulates migration plans; it then periodically processes those plans and executes the migration logic.

3. Source code analysis

Slot allocation and migration planning

How do we evenly reassign slots to server nodes after scaling out or in? Any redistribution implies migrating slot data, so the plan should move as few slots as possible. The first step is therefore to compute a logical migration plan.

First, slot allocation. Codis supports automatic and manual allocation. Automatic allocation spreads the slots evenly across the current servers; manual allocation assigns specific slots to a target server.

SlotsRebalance (automatic rebalancing)

This method rebalances the slots. For example, with 1024 slots and two servers, an even split puts 512 slots on each. If two more servers are added, each should handle 256 on average, which requires migrating slots. It is a long method, so let's walk through it in pieces.

1. The method maintains the following bookkeeping data.

  • assigned: per group, the number of slots confirmed as assigned to it.
  • pendings: per group, the slot ids that may be moved out (the excess over the average).
  • moveout: per group, the number of slots that must be moved out (negative means moved in).
  • docking: all slot ids waiting to be assigned to a group.
    var (
        assigned = make(map[int]int)
        pendings = make(map[int][]int)
        moveout  = make(map[int]int)
        docking  []int
    )

ActionNothing means the slot is not involved in a migration; the remaining constants are the states a slot passes through while migrating. The entire migration process revolves around these states.

const (
    ActionNothing   = ""
    ActionPending   = "pending"
    ActionPreparing = "preparing"
    ActionPrepared  = "prepared"
    ActionMigrating = "migrating"
    ActionFinished  = "finished"
    ActionSyncing   = "syncing"
)

2. Next, fill in this bookkeeping. The first step iterates over ctx.slots:

If the slot's state is not models.ActionNothing, the slot is already mid-migration, so assigned is incremented for its target group (m.Action.TargetId).

lowerBound is the minimum number of slots each group should be responsible for (MaxSlotNum divided by the number of groups).

If the state is models.ActionNothing and the slot's GroupId is not 0 (meaning the slot is already assigned to a group): when that group's current size is below lowerBound, the slot stays where it is and assigned is incremented; otherwise the slot id is appended to pendings, the set of slots that may be moved out of that group.

// groupSize is the effective size of a group:
// confirmed assigned + movable out - confirmed to move out.
var groupSize = func(gid int) int {
    return assigned[gid] + len(pendings[gid]) - moveout[gid]
}

for _, m := range ctx.slots {
    if m.Action.State != models.ActionNothing {
        assigned[m.Action.TargetId]++
    }
}

var lowerBound = MaxSlotNum / len(groupIds)

// don't migrate a slot whose group is still below lowerBound
for _, m := range ctx.slots {
    if m.Action.State != models.ActionNothing {
        continue
    }
    if m.GroupId != 0 {
        if groupSize(m.GroupId) < lowerBound {
            assigned[m.GroupId]++
        } else {
            pendings[m.GroupId] = append(pendings[m.GroupId], m.Id)
        }
    }
}

The groupSize function computes how many slots a group effectively holds. The maps are adjusted throughout the planning, so groupSize keeps changing until the load is balanced. The formula is simple: confirmed-assigned + movable-out - confirmed-to-move-out. For example, with assigned = 260, ten ids in pendings, and moveout = 14, groupSize is 260 + 10 - 14 = 256.

3. Initialize a red-black tree with a custom comparator: a node sorts to the left of another when its groupSize is smaller.

var tree = rbtree.NewWith(func(x, y interface{}) int {
    var gid1 = x.(int)
    var gid2 = y.(int)
    if gid1 != gid2 {
        if d := groupSize(gid1) - groupSize(gid2); d != 0 {
            return d
        }
        return gid1 - gid2
    }
    return 0
})
for _, gid := range groupIds {
    tree.Put(gid, nil)
}

// assign offline slots to the smallest group
for _, m := range ctx.slots {
    if m.Action.State != models.ActionNothing {
        continue
    }
    if m.GroupId != 0 {
        continue
    }
    dest := tree.Left().Key.(int)
    tree.Remove(dest)

    docking = append(docking, m.Id)
    moveout[dest]--

    tree.Put(dest, nil)
}

This loop iterates over all slots that are neither mid-migration nor assigned to any group (GroupId == 0), and appends them to docking so they can be allocated later. For each such slot it picks the group with the smallest groupSize and decrements that group's moveout. moveout counts slots to be moved out of a group; these groups are short of slots rather than over, so the negative value means slots will be moved in. The docking slots are assigned to these groups later, not right away.

Decrementing moveout changes the group's groupSize, which is why the group is removed from the tree and re-inserted on every iteration; this case essentially covers all the unassigned slots.
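
The remove-then-Put pattern deserves a closer look: a red-black tree orders keys only when they are inserted, so after moveout changes a group's effective size, the group must be removed and re-inserted for the comparator to see the new groupSize. Below is a minimal runnable sketch using the gods red-black tree that Codis vendors; the group ids and sizes are made up for illustration.

package main

import (
    "fmt"

    "github.com/emirpasic/gods/trees/redblacktree"
)

func main() {
    size := map[int]int{1: 300, 2: 212} // hypothetical groupSize per group id

    tree := redblacktree.NewWith(func(x, y interface{}) int {
        gid1, gid2 := x.(int), y.(int)
        if d := size[gid1] - size[gid2]; d != 0 {
            return d
        }
        return gid1 - gid2 // tie-break on group id, as in the comparator above
    })
    tree.Put(1, nil)
    tree.Put(2, nil)
    fmt.Println(tree.Left().Key, tree.Right().Key) // "2 1": smallest group on the left

    // Mutating size alone would not reorder the tree: remove the node first,
    // adjust the size, then Put it back so the comparator runs on fresh data.
    gid := tree.Right().Key.(int)
    tree.Remove(gid)
    size[gid]-- // pretend one slot was moved out of the largest group
    tree.Put(gid, nil)
    fmt.Println(tree.Left().Key, tree.Right().Key) // still "2 1", now ordered by the new sizes
}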

4. Now drain the red-black tree, moving load from groups holding more slots than the average to groups holding fewer. The whole process keeps adjusting around the logical groupSize introduced at the start.

for tree.Size() >= 2 {
    from := tree.Right().Key.(int)
    tree.Remove(from)

    if len(pendings[from]) == moveout[from] {
        continue
    }
    dest := tree.Left().Key.(int)
    tree.Remove(dest)

    var (
        fromSize = groupSize(from)
        destSize = groupSize(dest)
    )
    if fromSize <= lowerBound {
        break
    }
    if destSize >= upperBound {
        break
    }
    if d := fromSize - destSize; d <= 1 {
        break
    }
    moveout[from]++
    moveout[dest]--

    tree.Put(from, nil)
    tree.Put(dest, nil)
}

If len(pendings[from]) equals moveout[from], the largest group has nothing more it can give: everything it could move out has already been claimed, so it is dropped from the tree.

Otherwise the loop shifts one slot's worth of moveout from the largest group (from) to the smallest (dest). Note the lowerBound and upperBound guards: they keep the loop from spinning forever. In particular, if there are more groups than slots, lowerBound is 0, and without the guards the loop would misbehave.
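
For reference, here is a small sketch of how such bounds can be computed; it mirrors the floor/ceiling arithmetic described above and is an illustration, not a verbatim copy of the Codis source.

const MaxSlotNum = 1024 // Codis's fixed slot count

// bounds returns the floor and ceiling of slots-per-group. With more groups
// than slots, lowerBound becomes 0, which is why upperBound is also checked.
func bounds(groups int) (lowerBound, upperBound int) {
    lowerBound = MaxSlotNum / groups
    upperBound = (MaxSlotNum + groups - 1) / groups
    return
}

// bounds(2) == (512, 512); bounds(3) == (341, 342); bounds(2048) == (0, 1)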

5. This step handles the groups that must shed slots: the slot ids to move out are taken from pendings and appended to docking.

for gid, n := range moveout {
    if n < 0 {
        continue
    }
    if n > 0 {
        sids := pendings[gid]
        sort.Sort(sort.Reverse(sort.IntSlice(sids)))


        docking = append(docking, sids[0:n]...)
        pendings[gid] = sids[n:]
    }
    delete(moveout, gid)
}
sort.Ints(docking)

6. Finally, assign the docking slots to their destination groups.

Each such slot's m.Action.TargetId is set to the destination group, and its state is updated to models.ActionPending (waiting to be migrated).

Index records the migration order: the newly planned slot gets the current maximum action index plus one.

m.Action.State = models.ActionPending
m.Action.Index = ctx.maxSlotActionIndex() + 1
m.Action.TargetId = plans[sid]
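
For context, here is a reconstructed sketch of the loop this fragment lives in. It assumes plans maps slot id to the destination group id chosen above; ctx.getSlotMapping is an assumed helper, while storeUpdateSlotMapping appears later in the state-machine code. Treat this as a sketch, not verbatim source.

for sid, gid := range plans {
    m, err := ctx.getSlotMapping(sid)
    if err != nil {
        return err
    }
    defer s.dirtySlotsCache(m.Id)

    m.Action.State = models.ActionPending
    m.Action.Index = ctx.maxSlotActionIndex() + 1
    m.Action.TargetId = gid
    // Persist the plan so it survives a dashboard restart.
    if err := s.storeUpdateSlotMapping(m); err != nil {
        return err
    }
}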

At this point only the slot's TargetId has been set; the migration has merely been planned. Where the plan is actually carried out is the subject of the next method.

Perform the migration

ProcessSlotAction

This method is called in a loop by a goroutine started with Topom. It walks all slots, drives the ones that need migrating, and updates their slot actions. Again, let's take the method apart piece by piece.
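
For reference, that background loop looks roughly like the sketch below; take it as a reconstruction rather than the exact source, with names and intervals approximate.

go func() {
    for !s.IsClosed() {
        if s.IsOnline() {
            // Drive pending slot actions; back off briefly on failure.
            if err := s.ProcessSlotAction(); err != nil {
                log.WarnErrorf(err, "process slot action failed")
                time.Sleep(time.Second * 5)
            }
        }
        time.Sleep(time.Second)
    }
}()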

1. Each pass is subject to the following constraints:

  • marks is keyed by group id. If a group already has a slot scheduled in this pass (as source or target), no further slots touching that group are selected.
  • plans is keyed by slot id. A slot already selected is not selected again, so each call of this method operates on a given slot only once; one call can therefore advance a slot's state by at most one round.

2. accept and update are the functions that enforce the constraints above. They are passed into s.SlotActionPrepareFilter, which uses them to screen slots against the conditions.

3. Slots are selected in ascending order of Action.Index, which grows monotonically (it is stamped when the slot's migration is planned). Three rules govern the selection loop:

  • Selection stops once the number of plans (slots picked by SlotActionPrepareFilter) reaches the configured parallel limit (100 by default).
  • A slot found in the pending, preparing, or prepared state is picked so it can be driven onward.
  • If no slot in the cluster needs migrating, the loop exits with nothing to do.

Presumably this is so that slots in the pending, preparing, or prepared states can be gathered up and executed as one batch.

var (
    marks = make(map[int]bool)
    plans = make(map[int]bool)
)
var accept = func(m *models.SlotMapping) bool {
    if marks[m.GroupId] || marks[m.Action.TargetId] {
        return false
    }
    if plans[m.Id] {
        return false
    }
    return true
}
var update = func(m *models.SlotMapping) bool {
    if m.GroupId != 0 {
        marks[m.GroupId] = true
    }
    marks[m.Action.TargetId] = true
    plans[m.Id] = true
    return true
}
var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots)

for parallel > len(plans) {
    _, ok, err := s.SlotActionPrepareFilter(accept, update)
    if err != nil {
        return err
    } else if !ok {
        break
    }
}

4. Finally, each selected slot is processed in its own goroutine. We will not dig into processSlotAction just yet; first let's look at the s.SlotActionPrepareFilter method.

for sid := range plans {
    fut.Add()
    go func(sid int) {
        log.Warnf("slot-[%d] process action", sid)
        var err = s.processSlotAction(sid)
        if err != nil {
            status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err)
            s.action.progress.status.Store(status)
        } else {
            s.action.progress.status.Store("")
        }
        fut.Done(strconv.Itoa(sid), err)
    }(sid)
}

SlotActionPrepareFilter

This method selects slots according to the conditions described above; in addition, each time a slot is selected, its state is advanced.

The selection rules were already covered, so we will skip that code; it is mostly nested function plumbing and not very interesting.

The code below runs once a slot has been selected. It is all about state handling.

switch m.Action.State {
case models.ActionPending:
    defer s.dirtySlotsCache(m.Id)
    m.Action.State = models.ActionPreparing
    if err := s.storeUpdateSlotMapping(m); err != nil {
        return 0, false, err
    }
    fallthrough
case models.ActionPreparing:
    defer s.dirtySlotsCache(m.Id)
    log.Warnf("slot-[%d] resync to prepared", m.Id)
    m.Action.State = models.ActionPrepared
    if err := s.resyncSlotMappings(ctx, m); err != nil {
        log.Warnf("slot-[%d] resync-rollback to preparing", m.Id)
        m.Action.State = models.ActionPreparing
        s.resyncSlotMappings(ctx, m)
        log.Warnf("slot-[%d] resync-rollback to preparing, done", m.Id)
        return 0, false, err
    }
    if err := s.storeUpdateSlotMapping(m); err != nil {
        return 0, false, err
    }
    fallthrough
case models.ActionPrepared:
    defer s.dirtySlotsCache(m.Id)
    log.Warnf("slot-[%d] resync to migrating", m.Id)
    m.Action.State = models.ActionMigrating
    if err := s.resyncSlotMappings(ctx, m); err != nil {
        log.Warnf("slot-[%d] resync to migrating failed", m.Id)
        return 0, false, err
    }
    if err := s.storeUpdateSlotMapping(m); err != nil {
        return 0, false, err
    }
    fallthrough
case models.ActionMigrating:
    return m.Id, true, nil
case models.ActionFinished:
    return m.Id, true, nil
default:
    return 0, false, errors.Errorf("slot-[%d] action state is invalid", m.Id)
}

models.ActionPending is switched straight to models.ActionPreparing, with only a store update.

The models.ActionPreparing -> models.ActionPrepared and models.ActionPrepared -> models.ActionMigrating transitions both call s.resyncSlotMappings, which notifies the proxies of the slot's state change; each proxy adjusts its handling of the slot according to the new state.

Once the state reaches models.ActionMigrating, the method returns the slot id so the migration can be executed.
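
To make the chain explicit, here is the forward path a slot takes, written as a small map over the Action* string values listed earlier; this is my own summary, not Codis code.

// One ProcessSlotAction pass drives a slot at most one round, but the
// fallthrough chain above can walk it from pending straight to migrating.
var nextState = map[string]string{
    "pending":   "preparing", // persisted to the store only
    "preparing": "prepared",  // resyncSlotMappings notifies every proxy
    "prepared":  "migrating", // proxies now route with MigrateFrom set
    "migrating": "finished",  // set by SlotActionComplete once the data moves
}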

resyncSlotMappings

This method calls each proxy's FillSlots. As analyzed earlier, the proxy resets the slot's information and creates backend connections for it.

When called during a migration, the slot's slot.MigrateFrom and slot.MigrateFromGroupId are filled in, marking the node the data must be migrated from.

slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
slot.BackendAddrGroupId = m.Action.TargetId
slot.MigrateFrom = ctx.getGroupMaster(m.GroupId)
slot.MigrateFromGroupId = m.GroupId

The core here is to tell every proxy to establish the slot's Redis connections and stand by for the migration; note that the slot's backend connection has already been switched to the target group's master.

Requests that hit a slot under migration are wrapped in the SLOTSMGRT-EXEC-WRAPPER command before being sent; more on that below.

processSlotAction

Going back to the flow above, this method is called once a slot has been selected. It essentially migrates the slot's data in every database (DB) of the source node, where the DB list is obtained by parsing the Keyspace section of the INFO command, as sketched below.
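
What "keyspace resolution" means, as a sketch: INFO's Keyspace section lists each non-empty database on a line like db0:keys=100,expires=0,avg_ttl=0, and those database indexes are the DBs the slot must be migrated in. The helper below is assumed, not taken from the Codis source.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseDatabases extracts the database indexes from INFO's Keyspace section.
func parseDatabases(info string) []int {
    var dbs []int
    for _, line := range strings.Split(info, "\n") {
        line = strings.TrimSpace(line)
        if !strings.HasPrefix(line, "db") {
            continue
        }
        if i := strings.IndexByte(line, ':'); i > 2 {
            if n, err := strconv.Atoi(line[2:i]); err == nil {
                dbs = append(dbs, n) // the slot must be migrated in each of these
            }
        }
    }
    return dbs
}

func main() {
    info := "# Keyspace\r\ndb0:keys=100,expires=0\r\ndb3:keys=7,expires=0\r\n"
    fmt.Println(parseDatabases(info)) // [0 3]
}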

There are two migration modes, synchronous and asynchronous, each backed by its own command (a sketch of driving the synchronous one follows this list):

  • SLOTSMGRTTAGSLOT-ASYNC: the asynchronous variant.
  • SLOTSMGRTTAGSLOT: synchronous; each call randomly migrates one key (together with any keys sharing its hash tag).
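
Below is a minimal sketch of driving the synchronous command from a Go client. The assumptions are worth flagging: it uses the redigo client rather than Codis's internal one, the addresses and slot id are made up, and the two-integer reply is read here as (migrated, remaining), with the second element used as the stop signal.

package main

import (
    "log"

    "github.com/gomodule/redigo/redis"
)

// migrateSlot keeps asking the source node to hand one key (plus any keys
// sharing its hash tag) to the destination until the slot reports empty.
func migrateSlot(conn redis.Conn, destHost string, destPort, timeoutMs, slotID int) error {
    for {
        reply, err := redis.Ints(conn.Do("SLOTSMGRTTAGSLOT", destHost, destPort, timeoutMs, slotID))
        if err != nil {
            return err
        }
        if len(reply) == 2 && reply[1] == 0 {
            return nil // nothing left to move in this slot
        }
    }
}

func main() {
    conn, err := redis.Dial("tcp", "127.0.0.1:6379") // hypothetical source codis-server
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    if err := migrateSlot(conn, "127.0.0.1", 6380, 3000, 42); err != nil {
        log.Fatal(err)
    }
}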

After the data is moved, SlotActionComplete is called to mark the migration finished, and resyncSlotMappings is called once more to notify the proxies that the migration is complete.

I have not studied the migration internals in depth; it is essentially a transfer of keys. One key point, though, is the atomicity of the migration: Codis migrates large keys in shards.

Handling of client requests during migration

Synchronous migration

If the slot is being migrated, d.slotsmgrt is called to migrate the requested key first, and only then is the request itself processed.

func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
    // If the slot is being migrated, migrate this key first
    // (a no-op if the key has already moved or never existed).
    if s.migrate.bc != nil && len(hkey) != 0 {
        if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
            log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
                s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
            return nil, err
        }
    }
    r.Group = &s.refs
    r.Group.Add(1)
    return d.forward2(s, r), nil
}

Semi-asynchronous migration

func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *BackendConn, retry bool, _ error) {
    if s.backend.bc == nil {
        log.Debugf("slot-%04d is not ready: hash key = '%s'",
            s.id, hkey)
        return nil, false, ErrSlotIsNotReady
    }
    if s.migrate.bc != nil && len(hkey) != 0 {
        resp, moved, err := d.slotsmgrtExecWrapper(s, hkey, r.Database, r.Seed16(), r.Multi)
        switch {
        case err != nil:
            log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', error = %s",
                s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, err)
            return nil, false, err
        case !moved:
            switch {
            case resp != nil:
                r.Resp = resp
                return nil, false, nil
            }
            return nil, true, nil
        }
    }
    r.Group = &s.refs
    r.Group.Add(1)
    return d.forward2(s, r), false, nil
}

d.slotsmgrtExecWrapper wraps the client's request in a SLOTSMGRT-EXEC-WRAPPER command and sends it to the node the slot is migrating from, asking it to migrate the key if necessary.

SLOTSMGRT-EXEC-WRAPPER behaves as follows:

  • If the key does not exist on the source node, moved is returned as true.
  • If the key exists, moved is false:
    • For a write command, the wrapper checks whether the key is currently being migrated; if it is and the client may not be blocked, an error is returned and the proxy must retry.
    • A read command is processed normally.

If moved is true, the request falls through to forward2 and is sent to the slot's current backend, i.e. the migration target group.

4. Summary

Overall, the slot module is interesting: first the allocation algorithm, then the migration implementation. Slot migration is built on a state machine, doing different work in each state. The migration itself is still driven by Topom, but as soon as it starts, every proxy is resynced so that it marks the slot as being migrated; while the migration is in flight, the proxy handles requests for that slot specially to keep the data consistent.