Background

A two-level coordinated scheduling architecture based on the Kafka message queue:

  1. Each worker (consumer or connect) reports its topic offset and other metadata to the Kafka broker, which completes the Leader/Follower election
  2. The worker elected Leader reads the partition and member information stored in Kafka and performs a second-level assignment, balancing the load according to the specific service

The first level is responsible for electing the Leader, while the second level is responsible for assigning tasks to each worker node.

The main purpose here is to learn the idea behind this kind of architecture design, even though the scenarios where it applies are fairly limited.

Distributed coordination design based on message queue

Code implementation

The core design

MemoryQueue: a simulated message queue that distributes messages and plays the role of the Kafka broker
Worker: the task-execution node, which implements the service-specific second-level assignment algorithm
Coordinator: a coordinator located inside the message queue, used for Leader/Follower election
GroupRequest: the request to join the cluster
GroupResponse: the response information

MemoryQueue

Core data structure

// MemoryQueue Memory message queue
type MemoryQueue struct {
	done             chan struct{}
	queue            chan interface{}
	wg               sync.WaitGroup
	coordinator      map[string]*Coordinator
	worker           map[string]*Worker
}

The coordinator map identifies the coordinator of each Group; a coordinator is created for every Group.

The node joins the cluster
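The code for this step is not included above. As a minimal sketch (assumed, not the author's exact implementation): the test code at the end of the article uses NewMemoryQueue, Start, send and addWorker, so the join path plausibly looks like this, with send publishing a GroupRequest and addWorker registering the worker so events can be pushed back to it.

// NewMemoryQueue builds a queue with a bounded event channel (sketch).
func NewMemoryQueue(size int) *MemoryQueue {
	return &MemoryQueue{
		done:        make(chan struct{}),
		queue:       make(chan interface{}, size),
		coordinator: map[string]*Coordinator{},
		worker:      map[string]*Worker{},
	}
}

// send publishes any event (GroupRequest, Task, GroupResponse, Assignment).
func (mq *MemoryQueue) send(event interface{}) {
	mq.wg.Add(1)
	mq.queue <- event
}

// addWorker registers a worker so broker-side events can be delivered to it.
func (mq *MemoryQueue) addWorker(id string, w *Worker) {
	mq.worker[id] = w
}

// Start runs the event loop until Stop closes the done channel.
func (mq *MemoryQueue) Start() {
	go func() {
		for {
			select {
			case <-mq.done:
				return
			case event := <-mq.queue:
				mq.handleEvent(event)
			}
		}
	}()
}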

Event Distribution processing

func (mq *MemoryQueue) handleEvent(event interface{}) {
	switch e := event.(type) {
	case GroupRequest:
		mq.handleGroupRequest(&e)
	case Task:
		mq.handleTask(&e)
	default:
		mq.Notify(event)
	}
	mq.wg.Done()
}
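The default branch above calls Notify, which is not shown in the article. A minimal sketch, assuming it simply fans broker-side events (GroupResponse, Assignment) out to every registered worker via the Worker.Execute method described later:

// Notify (sketch, assumed): deliver non-request events to all workers.
func (mq *MemoryQueue) Notify(event interface{}) {
	for _, w := range mq.worker {
		w.Execute(event)
	}
}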

Group request processing

// getGroupCoordinator obtains the coordinator of the specified group
func (mq *MemoryQueue) getGroupCoordinator(group string) *Coordinator {
	coordinator, ok := mq.coordinator[group]
	if ok {
		return coordinator
	}
	coordinator = NewCoordinator(group)
	mq.coordinator[group] = coordinator
	return coordinator
}

func (mq *MemoryQueue) handleGroupRequest(request *GroupRequest) {
	coordinator := mq.getGroupCoordinator(request.Group)
	exist := coordinator.addMember(request.ID, &request.Metadata)
	// If the worker is already in the group, nothing is done
	if exist {
		return
	}
	// Build the response information
	groupResponse := mq.buildGroupResponse(coordinator)
	mq.send(groupResponse)
}

func (mq *MemoryQueue) buildGroupResponse(coordinator *Coordinator) GroupResponse {
	return GroupResponse{
		Tasks:       coordinator.Tasks,
		Group:       coordinator.Group,
		Members:     coordinator.AllMembers(),
		LeaderID:    coordinator.getLeaderID(),
		Generation:  coordinator.Generation,
		Coordinator: coordinator,
	}
}
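handleTask is dispatched by handleEvent but its body is not shown. A sketch under the assumption that Task carries Name and Group (as in the test code) and that adding a task re-sends a GroupResponse so the leader can rebalance:

// handleTask (sketch, assumed): record the task and trigger a reassignment.
func (mq *MemoryQueue) handleTask(task *Task) {
	coordinator := mq.getGroupCoordinator(task.Group)
	coordinator.Tasks = append(coordinator.Tasks, task.Name)
	// If workers have already joined, push a fresh GroupResponse so the
	// leader redistributes the enlarged task list.
	if len(coordinator.Members) > 0 {
		mq.send(mq.buildGroupResponse(coordinator))
	}
}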

Coordinator

Core data structure

// Coordinator
type Coordinator struct {
	Group      string
	Generation int
	Members    map[string]*Metadata
	Tasks      []string
	Heartbeats map[string]int64
}

The Coordinator uses Members to store the metadata of each worker node, Tasks to store all the tasks of the current group, and Heartbeats to store the heartbeat information of each worker. Generation is a generation counter that is incremented on every membership change.
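addMember and AllMembers are called from the queue code above but are not shown. A minimal sketch of what they could look like, assuming NewCoordinator initializes the maps, addMember returns whether the worker had already joined, and the generation is bumped on each new join:

// addMember (sketch, assumed): register a worker's metadata; return true if
// the worker had already joined the group.
func (c *Coordinator) addMember(id string, metadata *Metadata) bool {
	if _, ok := c.Members[id]; ok {
		return true
	}
	c.Members[id] = metadata
	c.Generation++ // every membership change increments the generation
	return false
}

// AllMembers (sketch, assumed): list the IDs of all joined workers.
func (c *Coordinator) AllMembers() []string {
	members := make([]string, 0, len(c.Members))
	for id := range c.Members {
		members = append(members, id)
	}
	return members
}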

The Leader is elected by offset

The primary node is elected through the stored worker metadata information

// getLeaderID Retrieves the Leader node based on the current information
func (c *Coordinator) getLeaderID() string {
	leaderID, maxOffset := "", 0
	// The size of the offset is used to determine the leader
	for wid, metadata := range c.Members {
		if leaderID == "" || metadata.offset() > maxOffset {
			leaderID = wid
			maxOffset = metadata.offset()
		}
	}
	return leaderID
}
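The Metadata type itself is not shown. Since getLeaderID only compares offsets and the test code passes an offset to worker.start, a minimal assumed shape (the taskOffset field name is hypothetical) is:

// Metadata (sketch, assumed): only the offset matters for leader election.
type Metadata struct {
	taskOffset int
}

// offset exposes the stored offset compared in getLeaderID above.
func (m *Metadata) offset() int {
	return m.taskOffset
}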

Worker

Core data structure

// Worker
type Worker struct {
	ID          string
	Group       string
	Tasks       []string
	done        chan struct{}
	queue       *MemoryQueue
	Coordinator *Coordinator
}

The worker node holds a reference to its coordinator, which it uses to send heartbeat information to that node.
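NewWorker, start and stop appear in the test code but are not shown here. A sketch of one plausible shape, reusing the hypothetical Metadata from the earlier sketch: start records the worker's offset and announces the join by sending a GroupRequest, while stop closes the done channel (a full implementation would also remove the member so the group rebalances, as the sample output later shows).

// NewWorker (sketch, assumed): create a worker bound to the queue.
func NewWorker(id, group string, queue *MemoryQueue) *Worker {
	return &Worker{
		ID:    id,
		Group: group,
		done:  make(chan struct{}),
		queue: queue,
	}
}

// start (sketch, assumed): join the group, announcing this worker's offset.
func (w *Worker) start(offset int) {
	w.queue.send(GroupRequest{
		ID:       w.ID,
		Group:    w.Group,
		Metadata: Metadata{taskOffset: offset},
	})
}

// stop (sketch, assumed): signal the heartbeat goroutine to exit.
func (w *Worker) stop() {
	close(w.done)
}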

Distribute request message

The worker receives different event types and processes them according to type. handleGroupResponse handles the information returned by the server-side Coordinator, which contains the leader node and the task list; based on it the worker performs the second-level assignment. handleAssign processes the task information received after that assignment.

// Execute receives the dispatched event and handles it by type
func (w *Worker) Execute(event interface{}) {
	switch e := event.(type) {
	case GroupResponse:
		w.handleGroupResponse(&e)
	case Assignment:
		w.handleAssign(&e)
	}
}

handleGroupResponse carries out the subsequent business logic based on the node's role.

A GroupResponse divides nodes into two roles: Leader and Follower. After receiving the GroupResponse, the Leader node must go on to assign tasks, while a Follower only needs to listen for events and send heartbeats.

func (w *Worker) handleGroupResponse(response *GroupResponse) {
	if w.isLeader(response.LeaderID) {
		w.onLeaderJoin(response)
	} else {
		w.onFollowerJoin(response)
	}
}

Followers node

The Follower node sends heartbeat messages

// onFollowerJoin The current role is follower
func (w *Worker) onFollowerJoin(response *GroupResponse) {
	w.Coordinator = response.Coordinator
	go w.heartbeat()
}
// heartbeat sends the heartbeat periodically
func (w *Worker) heartbeat() {
	// timer := time.NewTimer(time.Second)
	// for {
	// 	select {
	// 	case <-timer.C:
	// 		w.Coordinator.heartbeat(w.ID, time.Now().Unix())
	// 		timer.Reset(time.Second)
	// 	case <-w.done:
	// 		return
	// 	}
	// }
}
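The commented-out loop above calls w.Coordinator.heartbeat, which is not shown. Given the Heartbeats map[string]int64 field, a minimal sketch just records the latest timestamp per worker; a real broker would also expire members whose heartbeat goes stale and trigger a rebalance.

// heartbeat (sketch, assumed): record the latest heartbeat timestamp.
func (c *Coordinator) heartbeat(id string, now int64) {
	c.Heartbeats[id] = now
}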

The Leader node

// onLeaderJoin the current role is leader: perform the task assignment and send it to the MQ
func (w *Worker) onLeaderJoin(response *GroupResponse) {
	fmt.Printf("Generation [%d] leaderID [%s]\n", response.Generation, w.ID)
	w.Coordinator = response.Coordinator
	go w.heartbeat()
	// Perform task sharding
	taskSlice := w.performAssign(response)

	// Assign tasks to each worker
	memberTasks, index := make(map[string][]string), 0
	for _, name := range response.Members {
		memberTasks[name] = taskSlice[index]
		index++
	}

	// Dispatch the assignment request
	assign := Assignment{LeaderID: w.ID, Generation: response.Generation, result: memberTasks}
	w.queue.send(assign)
}

// performAssign splits the task list based on the current number of members and tasks
func (w *Worker) performAssign(response *GroupResponse) [][]string {

	perWorker := len(response.Tasks) / len(response.Members)
	leftOver := len(response.Tasks) - len(response.Members)*perWorker

	result := make([][]string, len(response.Members))

	taskIndex, memberTaskCount := 0, 0
	for index := range result {
		if index < leftOver {
			memberTaskCount = perWorker + 1
		} else {
			memberTaskCount = perWorker
		}
		for i := 0; i < memberTaskCount; i++ {
			result[index] = append(result[index], response.Tasks[taskIndex])
			taskIndex++
		}
	}

	return result
}
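handleAssign is dispatched from Execute but is not shown above. A sketch under the assumption that it looks up this worker's slice in the Assignment's result map (the field name comes from onLeaderJoin) and prints it, which matches the shape of the sample output below:

// handleAssign (sketch, assumed): pick out and run this worker's tasks.
// (Requires the "strings" package in the import list.)
func (w *Worker) handleAssign(assign *Assignment) {
	tasks := assign.result[w.ID]
	w.Tasks = tasks
	fmt.Printf("Generation [%d] worker [%s]  run tasks: [%s]\n",
		assign.Generation, w.ID, strings.Join(tasks, " || "))
}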

The test data

Start a queue, then add tasks and workers, and observe the assignment results

	// Build the queue
	queue := NewMemoryQueue(10)
	queue.Start()

	// Send the task
	queue.send(Task{Name: "test1", Group: "test"})
	queue.send(Task{Name: "test2", Group: "test"})
	queue.send(Task{Name: "test3", Group: "test"})
	queue.send(Task{Name: "test4", Group: "test"})
	queue.send(Task{Name: "test5", Group: "test"})

	// Start the workers, giving each a different offset, to observe whether the leader is elected correctly
	workerOne := NewWorker("test-1", "test", queue)
	workerOne.start(1)
	queue.addWorker(workerOne.ID, workerOne)

	workerTwo := NewWorker("test-2", "test", queue)
	workerTwo.start(2)
	queue.addWorker(workerTwo.ID, workerTwo)

	workerThree := NewWorker("test-3", "test", queue)
	workerThree.start(3)
	queue.addWorker(workerThree.ID, workerThree)

	time.Sleep(time.Second)
	workerThree.stop()
	time.Sleep(time.Second)
	workerTwo.stop()
	time.Sleep(time.Second)
	workerOne.stop()
	queue.Stop()

Running results: as the workers join, the leader is re-elected based on offset, so test-3 ends up as the leader. Looking at the task allocation, two nodes get two tasks each and one node gets one task. Then, as workers exit, the tasks are reassigned.

Generation [1] leaderID [test-1]
Generation [2] leaderID [test-2]
Generation [3] leaderID [test-3]
Generation [1] worker [test-1]  run tasks: [test1 || test2 || test3 || test4 || test5]
Generation [1] worker [test-2]  run tasks: []
Generation [1] worker [test-3]  run tasks: []
Generation [2] worker [test-1]  run tasks: [test1 || test2 || test3]
Generation [2] worker [test-2]  run tasks: [test4 || test5]
Generation [2] worker [test-3]  run tasks: []
Generation [3] worker [test-1]  run tasks: [test1 || test2]
Generation [3] worker [test-2]  run tasks: [test3 || test4]
Generation [3] worker [test-3]  run tasks: [test5]
Generation [4] leaderID [test-2]
Generation [4] worker [test-1]  run tasks: [test1 || test2 || test3]
Generation [4] worker [test-2]  run tasks: [test4 || test5]
Generation [5] leaderID [test-1]
Generation [5] worker [test-1]  run tasks: [test1 || test2 || test3 || test4 || test5]

Conclusion

In distributed scenarios, Leader/Follower election is more often built on CP systems such as Consul, etcd, and ZooKeeper. The design in this article is closely tied to Kafka's own business scenario. If there is time in the future, we will continue to look at other designs. That is all for this design borrowed from Kafka Connect.

For more articles, follow the author's WeChat official account.

www.sreguide.com