Preface

In this article we start analyzing the source code of the Kafka consumer client, beginning with PartitionAssignor, to see how partitions are assigned to consumers.

Main text

In Kafka, a topic can have multiple partitions, and within a consumer group each partition is consumed by only one consumer. One or more consumers form a consumer group. When several consumers in a group subscribe to a topic, the topic's partitions should be spread over them as evenly as possible: for example, with nine partitions and three consumers, each consumer gets 3 partitions.

Partition assignment is decided on the consumer client side (by the leader of the consumer group), not by the Kafka server. When consuming, we can either choose which partitions to consume ourselves or let a built-in strategy assign them. The three built-in strategies are covered below.

Abstract class: AbstractPartitionAssignor

@Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
        Set<String> allSubscribedTopics = new HashSet<>();
        // Get all topics
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
            allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());

        // Get the number of partitions for topic
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions != null && numPartitions > 0)
                partitionsPerTopic.put(topic, numPartitions);
            else
                log.debug("Skipping assignment for topic {} since no metadata is available", topic);
        }

        // Let the subclass compute each member's list of assigned partitions
        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, subscriptions);

        // this class maintains no user data, so just wrap the results
        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
        return new GroupAssignment(assignments);
    }

The abstract class converts the cluster metadata and the group subscription into two maps: one from topic to partition count, and one from consumer to subscription (which tells us which topics each consumer subscribed to). Together they give, for every consumer, its topics and their partition counts, and they are handed to the subclass's assign method, which does the actual assignment.

RangeAssignor

RangeAssignor works topic by topic: it divides the number of partitions by the number of consumers to get the base share each consumer receives, and hands the leftover partitions out, one each, to the consumers in sorted order.

For example, in the figure above: 7 partitions / 3 consumers = 2 with remainder 1, so every consumer gets 2 partitions and the 1 extra partition goes to “consumer 1”. If there were 2 extra partitions, the second one would go to “consumer 2”.
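The start index and length computed for each consumer in the code below can be written as a formula. Here P is the topic's partition count, C the number of consumers subscribed to it, and i the consumer's 0-based position after sorting (symbols chosen for this illustration; Kafka numbers partitions from 0, while the figure counts from 1):

$$
q = \left\lfloor \frac{P}{C} \right\rfloor, \qquad r = P \bmod C, \qquad
\mathrm{start}_i = q\,i + \min(i, r), \qquad
\mathrm{len}_i = \begin{cases} q + 1, & i < r \\ q, & i \ge r \end{cases}
$$

With P = 7 and C = 3 we get q = 2 and r = 1. Consumer 0 gets start 0 and length 3 (partitions 0, 1, 2), consumer 1 gets start 3 and length 2 (partitions 3, 4), and consumer 2 gets start 5 and length 2 (partitions 5, 6). In the figure's 1-based numbering that is 1, 2, 3 / 4, 5 / 6, 7.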

 @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        // Initialize the Map
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<>());

        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<MemberInfo> consumersForTopic = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            // Sort the consumers so partitions are handed out in a fixed order
            Collections.sort(consumersForTopic);

            // Base number of partitions each consumer gets
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // Number of consumers that get one extra partition
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            // Give each consumer a contiguous range of partitions
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }

The source is straightforward: sort the consumers, then use integer division and the remainder to compute each consumer's range.
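Because the loop runs once per topic and the remainder always goes to the first consumers in sorted order, the same consumers pick up the extra partition of every topic. The self-contained sketch below mirrors the loop above with plain strings instead of Kafka types, so this effect can be checked directly. RangeSketch, the "topic-index" partition naming and the id-only consumers are illustrative assumptions, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of RangeAssignor's per-topic logic (not the Kafka class).
// Assumptions: partitions are "topic-index" strings, consumers are plain ids.
final class RangeSketch {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());

        // topic -> consumers subscribed to it (mirrors consumersPerTopic)
        Map<String, List<String>> consumersPerTopic = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet())
            for (String topic : e.getValue())
                consumersPerTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(e.getKey());

        for (Map.Entry<String, List<String>> e : consumersPerTopic.entrySet()) {
            String topic = e.getKey();
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null)
                continue; // no metadata for this topic
            List<String> consumers = e.getValue();
            Collections.sort(consumers);
            int perConsumer = numPartitions / consumers.size();
            int withExtra = numPartitions % consumers.size();
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++)
                    assignment.get(consumers.get(i)).add(topic + "-" + p);
            }
        }
        return assignment;
    }
}
```

With two topics t1 and t2 of 3 partitions each and consumers c1 and c2 subscribed to both, c1 ends up with [t1-0, t1-1, t2-0, t2-1] and c2 with only [t1-2, t2-2]. Each topic is split fairly, but the total is not.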

RoundRobinAssignor

RoundRobinAssignor assigns partitions in round-robin fashion, and the principle is just as simple.

@Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        List<MemberInfo> memberInfoList = new ArrayList<>();
        for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {
            assignment.put(memberSubscription.getKey(), new ArrayList<>());
            memberInfoList.add(new MemberInfo(memberSubscription.getKey(),
                                              memberSubscription.getValue().groupInstanceId()));
        }
        // Cycle through the sorted consumers with a circular iterator
        CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));

        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            // Skip consumers that are not subscribed to this partition's topic
            while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic))
                assigner.next();
            assignment.get(assigner.next().memberId).add(partition);
        }
        return assignment;
    }

    private List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
        // Collect all subscribed topics in sorted order
        SortedSet<String> topics = new TreeSet<>();
        for (Subscription subscription : subscriptions.values())
            topics.addAll(subscription.topics());

        // Create TopicPartition objects in sequence
        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

The core is a circular iterator over the sorted consumers: each partition, taken in sorted order, goes to the next consumer that subscribes to its topic, until all partitions have been assigned.
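The loop above can be reproduced in a self-contained sketch. It assumes partitions are "topic-index" strings and members are plain ids; RoundRobinSketch is an illustrative name, not a Kafka class. A simple index stands in for CircularIterator: peek() corresponds to reading members.get(cursor) and next() to advancing the cursor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified sketch of RoundRobinAssignor (not the Kafka class).
final class RoundRobinSketch {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());
        List<String> members = new ArrayList<>(assignment.keySet()); // sorted ids

        // All subscribed topics in sorted order (like allPartitionsSorted)
        TreeSet<String> topics = new TreeSet<>();
        for (List<String> subscribed : subscriptions.values())
            topics.addAll(subscribed);

        int cursor = 0; // plays the role of the CircularIterator
        for (String topic : topics) {
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null)
                continue;
            for (int p = 0; p < numPartitions; p++) {
                // Skip members not subscribed to this topic (peek() + next()).
                // Terminates because at least one member subscribes to it.
                while (!subscriptions.get(members.get(cursor)).contains(topic))
                    cursor = (cursor + 1) % members.size();
                assignment.get(members.get(cursor)).add(topic + "-" + p);
                cursor = (cursor + 1) % members.size(); // next()
            }
        }
        return assignment;
    }
}
```

Take c1 subscribed to t1 and t2, c2 only to t1, and c3 only to t2, with 3 and 2 partitions respectively. The result is c1: [t1-0, t1-2, t2-1], c2: [t1-1], c3: [t2-0]. The cursor simply skips consumers that cannot take the current topic.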

StickyAssignor

StickyAssignor is the “sticky” strategy. RangeAssignor and RoundRobinAssignor try to make each assignment as balanced as possible, but they ignore the previous assignment. StickyAssignor keeps the previous assignment and rebalances fairly starting from it.

The partitions are still as shown in the figure: consumer 1: 1, 2, 3; consumer 2: 4, 5; consumer 3: 6, 7.

If consumer 1 now goes offline, RangeAssignor produces:

consumer 2: 1, 2, 3, 4; consumer 3: 5, 6, 7

StickyAssignor instead takes the previous assignment into account, giving for example:

consumer 2: 4, 5, 1, 2; consumer 3: 6, 7, 3

It balances the load while preserving as much of the original assignment as possible.
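One way to make “sticky” concrete is to count how many partitions change owner. The hypothetical helper below is not part of Kafka. It compares two assignments, and it is applied here to the numbers from the example above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Kafka API): counts partitions whose owner changed.
final class MovementSketch {
    static int movedPartitions(Map<String, List<Integer>> before,
                               Map<String, List<Integer>> after) {
        Map<Integer, String> previousOwner = new HashMap<>();
        before.forEach((consumer, partitions) ->
                partitions.forEach(p -> previousOwner.put(p, consumer)));
        int moved = 0;
        for (Map.Entry<String, List<Integer>> e : after.entrySet())
            for (Integer p : e.getValue())
                if (!e.getKey().equals(previousOwner.get(p)))
                    moved++; // new owner differs (or the partition had no owner)
        return moved;
    }
}
```

RangeAssignor's result moves 4 partitions: 1, 2 and 3 from the departed consumer 1, plus partition 5 from consumer 2 to consumer 3. The sticky result moves only the 3 partitions that consumer 1 left behind.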

@Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        partitionMovements = new PartitionMovements();
        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
        // All consumers in the group subscribe to the same set of topics
        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
                          + "optimized assignment algorithm");
            partitionsTransferringOwnership = new HashMap<>();
            return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
        } else {
            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
                          + "general case assignment algorithm");
            partitionsTransferringOwnership = null;
            return generalAssign(partitionsPerTopic, subscriptions);
        }
    }

Reassignment can be triggered by many events: subscribing to or unsubscribing from topics, consumers joining or going offline, and changes in a topic's partition count. StickyAssignor splits reassignment into two scenarios:

  • All consumers in the group subscribe to the same list of topics
  • Consumers subscribe to different topics

Case one: all consumers subscribe to the same topics

    /**
     * Returns true if all consumers have the same subscription. Also fills the passed-in
     * consumerToOwnedPartitions with each consumer's previously owned partitions that it is still subscribed to.
     */
    private boolean allSubscriptionsEqual(Set<String> allTopics,
                                          Map<String, Subscription> subscriptions,
                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
        Set<String> membersWithOldGeneration = new HashSet<>();
        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
        int maxGeneration = DEFAULT_GENERATION;

        Set<String> subscribedTopics = new HashSet<>();

        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            String consumer = subscriptionEntry.getKey();
            Subscription subscription = subscriptionEntry.getValue();
            
            if (subscribedTopics.isEmpty()) {
                subscribedTopics.addAll(subscription.topics());
            } else if (!(subscription.topics().size() == subscribedTopics.size()
                && subscribedTopics.containsAll(subscription.topics()))) {
                // This consumer's topics differ from the others': not all subscriptions are equal
                return false;
            }

            // Read the previous assignment (owned partitions and generation) from the subscription
            MemberData memberData = memberData(subscription);

            List<TopicPartition> ownedPartitions = new ArrayList<>();
            consumerToOwnedPartitions.put(consumer, ownedPartitions);

            // Only members of the highest generation seen so far keep their owned partitions
            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
                // A newer generation: every member recorded so far holds stale ownership
                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
                    membersOfCurrentHighestGeneration.clear();
                    maxGeneration = memberData.generation.get();
                }

                membersOfCurrentHighestGeneration.add(consumer);
                for (final TopicPartition tp : memberData.partitions) {
                    // Filter out partitions of topics that no longer exist
                    if (allTopics.contains(tp.topic())) {
                        ownedPartitions.add(tp);
                    }
                }
            }
        }

        // Partitions owned by members of an older generation are no longer valid
        for (String consumer : membersWithOldGeneration) {
            consumerToOwnedPartitions.get(consumer).clear();
        }
        return true;
    }

The method iterates over the consumers' subscriptions to check whether they all subscribe to the same topics. If they do, constrainedAssign is used. First, the previous assignment is compared with the current situation and invalid entries are removed: partitions of topics that no longer exist, and partitions owned by members of an older generation. The result is a valid, but possibly unbalanced, starting assignment.
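The generation check can be isolated in a small sketch. It assumes partitions are "topic-index" strings and that DEFAULT_GENERATION is -1. The value of the sentinel is not shown in this article, so treat it as an assumption. GenerationFilterSketch, Member and validOwned are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch of the generation filtering in allSubscriptionsEqual (not Kafka code).
final class GenerationFilterSketch {
    static final int DEFAULT_GENERATION = -1; // assumed sentinel value

    static final class Member {
        final String id;
        final Optional<Integer> generation;
        final List<String> owned; // "topic-index" strings
        Member(String id, Optional<Integer> generation, List<String> owned) {
            this.id = id;
            this.generation = generation;
            this.owned = owned;
        }
    }

    // Keeps owned partitions only for members of the highest generation,
    // and only for topics that still exist.
    static Map<String, List<String>> validOwned(List<Member> members, Set<String> allTopics) {
        Map<String, List<String>> result = new HashMap<>();
        Set<String> oldGeneration = new HashSet<>();
        Set<String> highestGeneration = new HashSet<>();
        int maxGeneration = DEFAULT_GENERATION;

        for (Member m : members) {
            List<String> owned = new ArrayList<>();
            result.put(m.id, owned);
            boolean present = m.generation.isPresent();
            if (present && m.generation.get() >= maxGeneration
                    || !present && maxGeneration == DEFAULT_GENERATION) {
                if (present && m.generation.get() > maxGeneration) {
                    // A newer generation appears: everyone seen so far is stale
                    oldGeneration.addAll(highestGeneration);
                    highestGeneration.clear();
                    maxGeneration = m.generation.get();
                }
                highestGeneration.add(m.id);
                for (String tp : m.owned) {
                    String topic = tp.substring(0, tp.lastIndexOf('-'));
                    if (allTopics.contains(topic))
                        owned.add(tp);
                }
            }
        }
        for (String id : oldGeneration)
            result.get(id).clear();
        return result;
    }
}
```

Consider members processed in order a (generation 3, owns t-0 and t-1), b (generation 5, owns t-2 and gone-0) and c (generation 4, owns t-3), where only topic t still exists. Then only b keeps anything, namely t-2. a is cleared once the newer generation 5 is seen. c is skipped because it arrives after generation 5. gone-0 is dropped because its topic is gone.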

        // All partitions, sorted by topic and then by partition number within a topic
        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);

        Set<TopicPartition> allRevokedPartitions = new HashSet<>();

        // Consumers that are still below minQuota
        List<String> unfilledMembers = new LinkedList<>();
        // Consumers that hold exactly maxQuota partitions
        Queue<String> maxCapacityMembers = new LinkedList<>();
        // Consumers that hold exactly minQuota partitions
        Queue<String> minCapacityMembers = new LinkedList<>();

        int numberOfConsumers = consumerToOwnedPartitions.size();
        // minQuota and maxQuota differ by at most 1
        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
        // Start every consumer with an empty list (initial capacity minQuota)
        Map<String, List<TopicPartition>> assignment = new HashMap<>(
            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));

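Since all consumers subscribe to the same topics, in theory each consumer gets at least [number of partitions / number of consumers] rounded down (minQuota) and at most that value rounded up (maxQuota). maxQuota is minQuota + 1 when the division leaves a remainder, and equal to minQuota otherwise.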
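A worked example of the two bounds, with P partitions and N consumers (symbols introduced here for brevity):

$$
\mathrm{minQuota} = \left\lfloor \frac{P}{N} \right\rfloor, \qquad
\mathrm{maxQuota} = \left\lceil \frac{P}{N} \right\rceil
$$

When P is not a multiple of N and every consumer ends with either minQuota or maxQuota partitions, exactly P mod N consumers hold maxQuota. For the sticky example with consumer 1 offline (P = 7, N = 2): minQuota = 3, maxQuota = 4, and 7 mod 2 = 1 consumer gets 4 partitions.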

for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
            String consumer = consumerEntry.getKey();
            List<TopicPartition> ownedPartitions = consumerEntry.getValue();

            List<TopicPartition> consumerAssignment = assignment.get(consumer);
            int i = 0;
            // Keep at most maxQuota previously owned partitions; revoke the rest
            for (TopicPartition tp : ownedPartitions) {
                if (i < maxQuota) {
                    consumerAssignment.add(tp);
                    unassignedPartitions.remove(tp);
                } else {
                    allRevokedPartitions.add(tp);
                }
                ++i;
            }

            // The consumer previously owned fewer than minQuota partitions
            if (ownedPartitions.size() < minQuota) {
                unfilledMembers.add(consumer);
            } else {
                // Record whether the consumer sits exactly at minQuota and/or maxQuota
                if (consumerAssignment.size() == minQuota)
                    minCapacityMembers.add(consumer);
                if (consumerAssignment.size() == maxQuota)
                    maxCapacityMembers.add(consumer);
            }
        }

Step 2: go through the previous assignment, mainly to check whether any consumer previously owned more partitions than this round's maxQuota allows. If so, the excess is revoked, and each consumer is then put into the matching set.

After this step we have four sets:

  • unfilledMembers: consumers below minQuota. This is the set to focus on: each consumer in it must be brought up to minQuota partitions
  • minCapacityMembers: consumers holding exactly minQuota partitions; they already have their guaranteed share, so they need no attention for now
  • maxCapacityMembers: consumers holding exactly maxQuota partitions; partitions can be taken from them if needed
  • unassignedPartitions: partitions not yet assigned, mainly to be given to the consumers in unfilledMembers

Collections.sort(unfilledMembers);
        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();

        // Hand unassigned partitions, one at a time, to the consumers below minQuota
        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
            while (unfilledConsumerIter.hasNext()) {
                String consumer = unfilledConsumerIter.next();
                List<TopicPartition> consumerAssignment = assignment.get(consumer);

                if (unassignedPartitionsIter.hasNext()) {
                    TopicPartition tp = unassignedPartitionsIter.next();
                    consumerAssignment.add(tp);
                    unassignedPartitionsIter.remove();
                    // A revoked partition is being transferred from another consumer
                    if (allRevokedPartitions.contains(tp))
                        partitionsTransferringOwnership.put(tp, consumer);
                } else {
                    break;
                }

                if (consumerAssignment.size() == minQuota) {
                    minCapacityMembers.add(consumer);
                    unfilledConsumerIter.remove();
                }
            }
        }

Step 3: take partitions from the unassigned set and hand them to the consumers below minQuota, one per consumer per round, until every such consumer reaches minQuota or no unassigned partitions are left.

// If the unassigned partitions ran out while some consumers are still below minQuota, take partitions from consumers at maxQuota
        for (String consumer : unfilledMembers) {
            List<TopicPartition> consumerAssignment = assignment.get(consumer);
            int remainingCapacity = minQuota - consumerAssignment.size();
            while (remainingCapacity > 0) {
                String overloadedConsumer = maxCapacityMembers.poll();
                // No consumer at maxQuota is left to take from: the assignment state is inconsistent
                if (overloadedConsumer == null) {
                    throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned");
                }
                TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0);
                consumerAssignment.add(swappedPartition);
                --remainingCapacity;
                partitionsTransferringOwnership.put(swappedPartition, consumer);
            }
            minCapacityMembers.add(consumer);
        }

Step 4: if, after step 3, some consumers are still below minQuota, take partitions one at a time from consumers in maxCapacityMembers and give them to those consumers.

// Any partitions still unassigned go, one each, to consumers currently at minQuota
        for (TopicPartition unassignedPartition : unassignedPartitions) {
            String underCapacityConsumer = minCapacityMembers.poll();
            if (underCapacityConsumer == null) {
                throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity");
            }
            // We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
            assignment.get(underCapacityConsumer).add(unassignedPartition);

            if (allRevokedPartitions.contains(unassignedPartition))
                partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
        }

        return assignment;

Step 5: after the steps above every consumer has at least minQuota partitions, but some partitions may still be unassigned. These are handed out one each to the consumers at minQuota, in the order they were added to minCapacityMembers.

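++ (At first glance step 5 seems unreachable, but it runs whenever the number of partitions is not a multiple of the number of consumers: step 3 only fills consumers up to minQuota, so the remaining partitions are handed out here, at most one per consumer.) ++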

Summary: because the subscribed topics are identical, the assignment can be computed directly from the quotas. The basic logic is to start from the previous assignment and adjust it.
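Steps 2 to 5 can be condensed into a sketch. The sketch makes several simplifications that are its own, not Kafka's: it uses integer partitions, assumes owned partitions were already validated, walks consumers in id order, and drops the partitionsTransferringOwnership bookkeeping. Running it on the StickyAssignor example with consumer 1 gone also shows step 5 in action. Partitions 1, 2 and 3 are free, step 3 brings consumers 2 and 3 up to minQuota = 3, and step 5 hands out the one leftover partition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Condensed sketch of constrainedAssign steps 2-5 (not the Kafka method).
final class ConstrainedAssignSketch {
    static Map<String, List<Integer>> assign(SortedSet<Integer> allPartitions,
                                             Map<String, List<Integer>> owned) {
        SortedSet<Integer> unassigned = new TreeSet<>(allPartitions);
        int n = owned.size();
        int minQuota = allPartitions.size() / n;           // floor
        int maxQuota = (allPartitions.size() + n - 1) / n; // ceil
        Map<String, List<Integer>> assignment = new TreeMap<>();
        List<String> unfilled = new ArrayList<>();
        Deque<String> minCapacity = new ArrayDeque<>();
        Deque<String> maxCapacity = new ArrayDeque<>();

        // Step 2: keep up to maxQuota owned partitions; the rest stay unassigned (revoked)
        for (Map.Entry<String, List<Integer>> e : new TreeMap<>(owned).entrySet()) {
            List<Integer> mine = new ArrayList<>();
            assignment.put(e.getKey(), mine);
            for (Integer tp : e.getValue()) {
                if (mine.size() < maxQuota) {
                    mine.add(tp);
                    unassigned.remove(tp);
                }
            }
            if (e.getValue().size() < minQuota) {
                unfilled.add(e.getKey());
            } else {
                if (mine.size() == minQuota) minCapacity.add(e.getKey());
                if (mine.size() == maxQuota) maxCapacity.add(e.getKey());
            }
        }

        // Step 3: one partition per round to each consumer below minQuota
        Collections.sort(unfilled);
        Iterator<Integer> free = unassigned.iterator();
        while (!unfilled.isEmpty() && free.hasNext()) {
            Iterator<String> consumers = unfilled.iterator();
            while (consumers.hasNext() && free.hasNext()) {
                String c = consumers.next();
                List<Integer> mine = assignment.get(c);
                mine.add(free.next());
                free.remove();
                if (mine.size() == minQuota) {
                    minCapacity.add(c);
                    consumers.remove();
                }
            }
        }

        // Step 4: still below minQuota, so take partitions from consumers at maxQuota
        for (String c : unfilled) {
            List<Integer> mine = assignment.get(c);
            while (mine.size() < minQuota) {
                String overloaded = maxCapacity.poll();
                if (overloaded == null)
                    throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned");
                mine.add(assignment.get(overloaded).remove(0));
            }
            minCapacity.add(c);
        }

        // Step 5: leftover partitions go, one each, to consumers at minQuota
        for (Integer tp : unassigned) {
            String c = minCapacity.poll();
            if (c == null)
                throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity");
            assignment.get(c).add(tp);
        }
        return assignment;
    }
}
```

With partitions 1 to 7 and previous ownership consumer 2: [4, 5], consumer 3: [6, 7], this sketch yields consumer 2: [4, 5, 1, 3] and consumer 3: [6, 7, 2]. Which freed partition lands where differs slightly from the illustrative result in the StickyAssignor section, but partitions 4 to 7 stay with their previous owners either way.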

Case two: consumers subscribe to different topics

private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic,
                                                            Map<String, Subscription> subscriptions) {
        // Consumer -> partitions it was assigned last time
        Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
        // Partition -> consumer (and generation) it was previously assigned to
        Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new HashMap<>();

        prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment);

This step likewise reads the previous assignment. prepopulateCurrentAssignments works the same way as in case one, so it is not shown here.

        // Subscriptions differ, so record which consumers each partition may be assigned to
        final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
        // ... and which partitions each consumer may be assigned
        final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();

        // Initialize partition2AllPotentialConsumers with every existing partition
        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
            for (int i = 0; i < entry.getValue(); ++i)
                partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
        }

        for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
            String consumerId = entry.getKey();
            consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
            entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
                    TopicPartition topicPartition = new TopicPartition(topic, i);
                    consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
                    partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
                }
            });
            // A newly joined consumer has no previous assignment: start it with an empty list
            if (!currentAssignment.containsKey(consumerId))
                currentAssignment.put(consumerId, new ArrayList<>());
        }

Because consumers subscribe to different topics, two maps are built from the current subscriptions and partition metadata:

  • partition2AllPotentialConsumers: which consumers each topic partition can be assigned to
  • consumer2AllPotentialPartitions: which topic partitions each consumer can be assigned

Newly joined consumers are also added to currentAssignment, with an empty list. currentAssignment starts out as the previous assignment, and this round's assignment is produced by modifying it.
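The two maps can be built as in the sketch below. It uses "topic-index" strings in place of TopicPartition and fills caller-supplied maps the way the source does. PotentialMapsSketch and build are illustrative names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of building partition2AllPotentialConsumers / consumer2AllPotentialPartitions.
final class PotentialMapsSketch {
    static void build(Map<String, Integer> partitionsPerTopic,
                      Map<String, List<String>> subscriptions,
                      Map<String, List<String>> partition2Consumers,
                      Map<String, List<String>> consumer2Partitions) {
        // Every existing partition starts with an empty list of potential consumers
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet())
            for (int i = 0; i < e.getValue(); i++)
                partition2Consumers.put(e.getKey() + "-" + i, new ArrayList<>());

        // Walk consumers in id order so the lists come out deterministic
        for (Map.Entry<String, List<String>> e : new TreeMap<>(subscriptions).entrySet()) {
            String consumer = e.getKey();
            List<String> mine = new ArrayList<>();
            consumer2Partitions.put(consumer, mine);
            for (String topic : e.getValue()) {
                Integer n = partitionsPerTopic.get(topic);
                if (n == null)
                    continue; // subscribed topic without metadata is skipped
                for (int i = 0; i < n; i++) {
                    String tp = topic + "-" + i;
                    mine.add(tp);
                    partition2Consumers.get(tp).add(consumer);
                }
            }
        }
    }
}
```

Take topic t1 with 2 partitions and t2 with 1, with c1 subscribed to t1 and c2 to both. Then t1-0 and t1-1 can go to [c1, c2] while t2-0 can only go to [c2], and c2 may be assigned [t1-0, t1-1, t2-0].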

// Invert the previous assignment into a partition -> consumer map
        Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())
            for (TopicPartition topicPartition: entry.getValue())
                currentPartitionConsumer.put(topicPartition, entry.getKey());

        List<TopicPartition> sortedPartitions = sortPartitions(partition2AllPotentialConsumers);

Invert the previous assignment into a partition-to-consumer map, then sort all the topic partitions that can currently be assigned.

// Start with every partition marked as unassigned
        List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
        boolean revocationRequired = false;
        for (Iterator<Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, List<TopicPartition>> entry = it.next();
            // The consumer has left the group: drop its previous assignment
            if (!subscriptions.containsKey(entry.getKey())) {
                for (TopicPartition topicPartition: entry.getValue())
                    currentPartitionConsumer.remove(topicPartition);
                it.remove();
            } else {
                for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
                    TopicPartition partition = partitionIter.next();
                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
                        // The partition no longer exists (e.g. its topic was deleted): remove it
                        partitionIter.remove();
                        currentPartitionConsumer.remove(partition);
                    } else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
                        // The consumer no longer subscribes to this topic: revoke the partition
                        partitionIter.remove();
                        revocationRequired = true;
                    } else {
                        // Still valid: the consumer keeps it, so it is not unassigned
                        unassignedPartitions.remove(partition);
                    }
                }
            }
        }

Then the previous assignment is cleaned up: the original assignment is kept, minus any topics or partitions that are no longer valid.

Everything above is a pre-assignment step built on the previous assignment, just like in case one. The difference is that, because subscriptions differ, more information has to be derived first.
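The clean-up loop can be exercised on its own. The sketch assumes "topic-index" partition strings, passes the set of existing partitions in place of partition2AllPotentialConsumers.keySet(), and leaves out the currentPartitionConsumer bookkeeping. PruneSketch is an illustrative name.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the clean-up step in generalAssign (not Kafka code).
// Returns the value of revocationRequired.
final class PruneSketch {
    static boolean prune(Map<String, List<String>> currentAssignment,
                         Map<String, List<String>> subscriptions,
                         Set<String> existingPartitions,
                         List<String> unassigned) {
        boolean revocationRequired = false;
        Iterator<Map.Entry<String, List<String>>> it = currentAssignment.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<String>> entry = it.next();
            List<String> topics = subscriptions.get(entry.getKey());
            if (topics == null) {
                it.remove(); // consumer left the group
                continue;
            }
            Iterator<String> partitions = entry.getValue().iterator();
            while (partitions.hasNext()) {
                String tp = partitions.next();
                if (!existingPartitions.contains(tp)) {
                    partitions.remove(); // partition (or its topic) no longer exists
                } else if (!topics.contains(topicOf(tp))) {
                    partitions.remove(); // consumer unsubscribed from the topic
                    revocationRequired = true;
                } else {
                    unassigned.remove(tp); // still valid, so not unassigned
                }
            }
        }
        return revocationRequired;
    }

    static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }
}
```

Suppose the previous assignment was c1: [t1-0, t2-0], c2: [t1-1], c3: [t1-2]. Now c3 has left and c1 subscribes only to t1. c3's entry disappears, and t2-0 is revoked from c1, so revocationRequired becomes true. t1-2 and t2-0 remain unassigned.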

// Order consumers by how many partitions they currently hold
        TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
        sortedCurrentSubscriptions.addAll(currentAssignment.keySet());

        balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
        return currentAssignment;

The consumers are sorted by the number of partitions currently assigned, and then balance is called to do the rebalancing.

private void balance(Map<String, List<TopicPartition>> currentAssignment,
                         Map<TopicPartition, ConsumerGenerationPair> prevAssignment,
                         List<TopicPartition> sortedPartitions,
                         List<TopicPartition> unassignedPartitions,
                         TreeSet<String> sortedCurrentSubscriptions,
                         Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                         Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                         Map<TopicPartition, String> currentPartitionConsumer,
                         boolean revocationRequired) {
        boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();
        boolean reassignmentPerformed = false;

        // Traverses unallocated partitions
        for (TopicPartition partition: unassignedPartitions) {
            if (partition2AllPotentialConsumers.get(partition).isEmpty())
                continue;
            // Consumers are sorted by how many partitions they hold, so the least-loaded eligible consumer is tried first
            // assignPartition only has to find the first consumer allowed to take this partition
            assignPartition(partition, sortedCurrentSubscriptions, currentAssignment,
                consumer2AllPotentialPartitions, currentPartitionConsumer);
        }


private void assignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions,
                             Map<String, List<TopicPartition>> currentAssignment,
                             Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                             Map<TopicPartition, String> currentPartitionConsumer) {
        for (String consumer: sortedCurrentSubscriptions) {
            if (consumer2AllPotentialPartitions.get(consumer).contains(partition)) {
                // Remove and re-add so the TreeSet re-sorts the consumer by its new size
                sortedCurrentSubscriptions.remove(consumer);
                currentAssignment.get(consumer).add(partition);
                currentPartitionConsumer.put(partition, consumer);
                sortedCurrentSubscriptions.add(consumer);
                break;
            }
        }
    }

initializing looks at the assignment of the consumer that currently holds the most partitions (the last element of the sorted set). If even that list is empty, there is no current assignment at all: the previous assignment has nothing in common with this one, because the topics or consumers have changed completely. In that case the balancing result will not be rolled back later.

reassignmentPerformed records whether any partition was actually moved during balancing.

++ First allocate all partitions that have not been allocated, regardless of balance ++
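The reason assignPartition removes the consumer from the TreeSet before changing its list, and re-adds it afterwards, is easiest to see in isolation. The set is ordered by assignment size, so changing a consumer's size while it sits inside the set would corrupt the ordering. Breaking out of the loop right after the modification also avoids a ConcurrentModificationException from the set's iterator. The sketch below uses a comparator of size first, then consumer id; that tie-break is an assumption modeled on SubscriptionComparator, and AssignPartitionSketch is an illustrative name.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Sketch of "assign every unassigned partition to the least-loaded eligible consumer".
final class AssignPartitionSketch {
    static void assignAll(List<String> unassigned,
                          Map<String, List<String>> consumer2Potential,
                          Map<String, List<String>> currentAssignment) {
        // Order by current load, then by consumer id (assumed tie-break)
        Comparator<String> byLoadThenId = (a, b) -> {
            int diff = Integer.compare(currentAssignment.get(a).size(), currentAssignment.get(b).size());
            return diff != 0 ? diff : a.compareTo(b);
        };
        TreeSet<String> sorted = new TreeSet<>(byLoadThenId);
        sorted.addAll(currentAssignment.keySet());

        for (String tp : unassigned) {
            for (String consumer : sorted) { // least-loaded consumers first
                if (consumer2Potential.get(consumer).contains(tp)) {
                    // The sort key is about to change: remove, mutate, re-insert
                    sorted.remove(consumer);
                    currentAssignment.get(consumer).add(tp);
                    sorted.add(consumer);
                    break; // the set was modified, so stop iterating over it
                }
            }
        }
    }
}
```

Suppose c1 may take t1-0, t1-1 and t2-0, c2 may take only t1-0 and t1-1, and both start empty. The partitions land as c1: [t1-0, t2-0] and c2: [t1-1]: each partition goes to the least-loaded consumer that is allowed to take it.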

// At this point every partition is assigned, but the assignment may be unbalanced.
        // Next, find the partitions that cannot take part in reassignment
        Set<TopicPartition> fixedPartitions = new HashSet<>();
        for (TopicPartition partition: partition2AllPotentialConsumers.keySet())
            // A partition that only one consumer may own can never move
            if (!canParticipateInReassignment(partition, partition2AllPotentialConsumers))
                fixedPartitions.add(partition);
        sortedPartitions.removeAll(fixedPartitions);
        unassignedPartitions.removeAll(fixedPartitions);
        Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment);
        Map<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);

// A partition can be reassigned only if at least 2 consumers may own it
private boolean canParticipateInReassignment(TopicPartition partition,
                                             Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
        return partition2AllPotentialConsumers.get(partition).size() >= 2;
    }

The next step narrows down the partitions that can take part in rebalancing by filtering out those that only one consumer may be assigned. For example, if only one consumer subscribes to a topic, that topic's partitions cannot be moved.

Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
        for (String consumer: consumer2AllPotentialPartitions.keySet())
            if (!canParticipateInReassignment(consumer, currentAssignment,
                consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) {
                sortedCurrentSubscriptions.remove(consumer);
                fixedAssignments.put(consumer, currentAssignment.remove(consumer));
            }

private boolean canParticipateInReassignment(String consumer,
                                             Map<String, List<TopicPartition>> currentAssignment,
                                             Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                                             Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
        List<TopicPartition> currentPartitions = currentAssignment.get(consumer);
        int currentAssignmentSize = currentPartitions.size();
        int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
        if (currentAssignmentSize > maxAssignmentSize)
            log.error("The consumer {} is assigned more partitions than the maximum possible.", consumer);

        // The consumer can still take more partitions
        if (currentAssignmentSize < maxAssignmentSize)
            return true;

        // The consumer already holds every partition it could hold:
        // it participates only if one of those partitions could move to another consumer
        for (TopicPartition partition: currentPartitions)
            if (canParticipateInReassignment(partition, partition2AllPotentialConsumers))
                return true;

        return false;
    }

This step filters the consumers that can take part in reassignment. A consumer qualifies if it currently holds fewer partitions than the maximum it could be assigned. If it already holds that maximum, it qualifies only if at least one of its partitions can be assigned to more than one consumer, using the check from the previous step.

This is a bit convoluted, but the idea is to exclude consumers whose partitions all belong to topics that no other consumer subscribes to.
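Both canParticipateInReassignment checks fit in a short sketch. It uses "topic-index" strings instead of TopicPartition, which is an assumption of this sketch. ParticipationSketch and its method names are illustrative.

```java
import java.util.List;
import java.util.Map;

// Sketch of the partition-level and consumer-level participation checks.
final class ParticipationSketch {
    // A partition can move only if at least two consumers may own it
    static boolean partitionCanMove(String tp, Map<String, List<String>> partition2Potential) {
        return partition2Potential.get(tp).size() >= 2;
    }

    static boolean consumerCanParticipate(String consumer,
                                          Map<String, List<String>> currentAssignment,
                                          Map<String, List<String>> consumer2Potential,
                                          Map<String, List<String>> partition2Potential) {
        List<String> current = currentAssignment.get(consumer);
        // It can still take more partitions
        if (current.size() < consumer2Potential.get(consumer).size())
            return true;
        // It is full: participate only if one of its partitions could go to someone else
        for (String tp : current)
            if (partitionCanMove(tp, partition2Potential))
                return true;
        return false;
    }
}
```

Suppose c1 and c2 may both own t1-0, which c1 currently holds, while only c3 may own t2-0, which c3 holds. Then c1 and c2 can participate, but c3 cannot: it already holds everything it could, and nobody else can take its partition.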

Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment);
        Map<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);

        // If no consumer dropped a topic, first run reassignment over the previously unassigned partitions only
        if (!revocationRequired) {
            performReassignments(unassignedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions,
                consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);
        }
        // Then run reassignment over all reassignable partitions
        reassignmentPerformed = performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions,
                   consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);

The two calls differ only in their first argument (unassignedPartitions versus sortedPartitions). The reason becomes clear from the source of performReassignments:

private boolean performReassignments(List<TopicPartition> reassignablePartitions,
                                         Map<String, List<TopicPartition>> currentAssignment,
                                         Map<TopicPartition, ConsumerGenerationPair> prevAssignment,
                                         TreeSet<String> sortedCurrentSubscriptions,
                                         Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                                         Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                         Map<TopicPartition, String> currentPartitionConsumer) {
        boolean reassignmentPerformed = false;
        boolean modified;

        // Repeat passes until a full pass moves nothing
        do {
            modified = false;
            Iterator<TopicPartition> partitionIterator = reassignablePartitions.iterator();
            // Stop as soon as the assignment is balanced
            while (partitionIterator.hasNext() && !isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) {
                TopicPartition partition = partitionIterator.next();

                // A partition must have at least two potential consumers to be reassigned
                if (partition2AllPotentialConsumers.get(partition).size() <= 1)
                    log.error("Expected more than one potential consumer for partition '{}'", partition);

                // A partition must currently be assigned to a consumer
                String consumer = currentPartitionConsumer.get(partition);
                if (consumer == null)
                    log.error("Expected partition '{}' to be assigned to a consumer", partition);

                // Stickiness: if the partition had an owner in a previous generation and its current owner
                // holds more than (previous owner's count + 1) partitions, give it back to the previous owner
                if (prevAssignment.containsKey(partition) &&
                        currentAssignment.get(consumer).size() > currentAssignment.get(prevAssignment.get(partition).consumer).size() + 1) {
                    reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment.get(partition).consumer);
                    reassignmentPerformed = true;
                    modified = true;
                    continue;
                }

                // Otherwise, if any other potential consumer holds at least two fewer partitions
                // than the current owner, reassign the partition
                for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
                    if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) {
                        reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions);
                        reassignmentPerformed = true;
                        modified = true;
                        break;
                    }
                }
            }
        } while (modified);

        return reassignmentPerformed;
    }

The method walks through the partitions and stops as soon as the assignment is balanced. So how does it decide whether a partition should move?

For example, suppose “partition 1” is currently assigned to “consumer 1” and could be assigned to either “consumer 1” or “consumer 2”. If consumer 1 holds more partitions than consumer 2's count + 1, partition 1 can be moved to consumer 2.
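The move rule can be tried out on a toy model. This is a hypothetical sketch, not Kafka's code: MoveRule and its methods are illustrative names, partitions are strings, previous-generation owners are ignored, "balanced" is reduced to max - min <= 1 (the real isBalanced does more), and the target is the least-loaded eligible consumer, which is my reading of what reassignPartition does and should be treated as an assumption.

```java
import java.util.*;

// Hypothetical, simplified model of the loop in performReassignments.
// Assumptions: partitions are strings, no previous-generation owners,
// and "balanced" means the largest and smallest assignments differ by at most 1.
class MoveRule {

    static boolean roughlyBalanced(Map<String, List<String>> assignment) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (List<String> parts : assignment.values()) {
            min = Math.min(min, parts.size());
            max = Math.max(max, parts.size());
        }
        return max - min <= 1;
    }

    static String ownerOf(String partition, Map<String, List<String>> assignment) {
        for (Map.Entry<String, List<String>> e : assignment.entrySet())
            if (e.getValue().contains(partition))
                return e.getKey();
        throw new IllegalStateException(partition + " is not assigned");
    }

    // Among the consumers allowed to own the partition, pick the one with the fewest partitions.
    static String leastLoaded(List<String> candidates, Map<String, List<String>> assignment) {
        String best = candidates.get(0);
        for (String c : candidates)
            if (assignment.get(c).size() < assignment.get(best).size())
                best = c;
        return best;
    }

    // Returns true if at least one partition moved. Lists in "assignment" must be mutable.
    static boolean rebalance(List<String> partitions,
                             Map<String, List<String>> assignment,
                             Map<String, List<String>> partition2AllPotentialConsumers) {
        boolean performed = false;
        boolean modified;
        do {
            modified = false;
            for (String partition : partitions) {
                if (roughlyBalanced(assignment))
                    break;
                String owner = ownerOf(partition, assignment);
                List<String> candidates = partition2AllPotentialConsumers.get(partition);
                for (String other : candidates) {
                    // Move only if the owner holds at least two more partitions than "other"
                    if (assignment.get(owner).size() > assignment.get(other).size() + 1) {
                        String target = leastLoaded(candidates, assignment);
                        assignment.get(owner).remove(partition);
                        assignment.get(target).add(partition);
                        performed = true;
                        modified = true;
                        break;
                    }
                }
            }
        } while (modified);
        return performed;
    }
}
```

With p0, p1 and p2 all on c1 and each one eligible for c1 or c2, the first pass moves p0 to c2 (3 > 0 + 1), after which the sizes are 2 and 1, the model counts that as balanced, and the loop ends.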

private boolean isBalanced(Map<String, List<TopicPartition>> currentAssignment,
                               TreeSet<String> sortedCurrentSubscriptions,
                               Map<String, List<TopicPartition>> allSubscriptions) {
        // sortedCurrentSubscriptions is ordered by assignment size, so first()/last() give the min and max.
        // If they differ by at most 1, every consumer is balanced.
        // This is the ideal case; with multiple topics and differing subscriptions it is often unreachable
        int min = currentAssignment.get(sortedCurrentSubscriptions.first()).size();
        int max = currentAssignment.get(sortedCurrentSubscriptions.last()).size();
        if (min >= max - 1)
            return true;

        // Build a partition -> owning consumer map from the current assignment
        final Map<TopicPartition, String> allPartitions = new HashMap<>();
        Set<Entry<String, List<TopicPartition>>> assignments = currentAssignment.entrySet();
        for (Map.Entry<String, List<TopicPartition>> entry: assignments) {
            List<TopicPartition> topicPartitions = entry.getValue();
            for (TopicPartition topicPartition: topicPartitions) {
                if (allPartitions.containsKey(topicPartition))
                    log.error("{} is assigned to more than one consumer.", topicPartition);
                allPartitions.put(topicPartition, entry.getKey());
            }
        }

        for (String consumer: sortedCurrentSubscriptions) {
            List<TopicPartition> consumerPartitions = currentAssignment.get(consumer);
            int consumerPartitionCount = consumerPartitions.size();

            // Skip if the current consumer already holds every partition it can own
            if (consumerPartitionCount == allSubscriptions.get(consumer).size())
                continue;

            // Walk every partition this consumer could own. If one it does not hold is owned by a
            // consumer with more partitions, it could be moved here, so the assignment is not balanced
            List<TopicPartition> potentialTopicPartitions = allSubscriptions.get(consumer);
            for (TopicPartition topicPartition: potentialTopicPartitions) {
                if (!currentAssignment.get(consumer).contains(topicPartition)) {
                    String otherConsumer = allPartitions.get(topicPartition);
                    int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
                    if (consumerPartitionCount < otherConsumerPartitionCount) {
                        log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.",
                            topicPartition, otherConsumer, consumer);
                        return false;
                    }
                }
            }
        }
        return true;
    }

The method above decides whether the assignment is balanced. For each consumer it takes the number of partitions currently assigned, then walks through every partition that consumer could own. If one of those partitions is held by another consumer that has more partitions, the partition could be moved over, so the assignment is not balanced.
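The same check can be reproduced on plain strings. This is a hypothetical sketch (BalanceCheck is an illustrative name): the size-sorted TreeSet is replaced by a simple min/max scan, and the null check on the owner is my addition, since the real method assumes every partition is assigned.

```java
import java.util.*;

// Hypothetical, simplified model of isBalanced with partitions as strings.
class BalanceCheck {

    static boolean isBalanced(Map<String, List<String>> current,
                              Map<String, List<String>> allSubscriptions) {
        // Shortcut: assignment sizes differ by at most one
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (List<String> parts : current.values()) {
            min = Math.min(min, parts.size());
            max = Math.max(max, parts.size());
        }
        if (min >= max - 1)
            return true;

        // partition -> consumer that currently owns it
        Map<String, String> owner = new HashMap<>();
        for (Map.Entry<String, List<String>> e : current.entrySet())
            for (String p : e.getValue())
                owner.put(p, e.getKey());

        for (Map.Entry<String, List<String>> e : current.entrySet()) {
            String consumer = e.getKey();
            int count = e.getValue().size();
            // Already holds everything it can own: nothing could move here
            if (count == allSubscriptions.get(consumer).size())
                continue;
            for (String p : allSubscriptions.get(consumer)) {
                if (e.getValue().contains(p))
                    continue;
                String other = owner.get(p);
                // A partition it could own sits on a busier consumer: not balanced
                if (other != null && count < current.get(other).size())
                    return false;
            }
        }
        return true;
    }
}
```

Sizes of 1 and 3 can still count as balanced: if c1 subscribes only to a (one partition) and c2 only to b (three partitions), neither consumer can take anything from the other. If both subscribe to a and b, c1 with 1 partition could take one of c2's 3, so the check fails.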

// If partitions were moved but the result is not more balanced than before, roll back to the pre-balance assignment
        if (!initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment)) {
            deepCopy(preBalanceAssignment, currentAssignment);
            currentPartitionConsumer.clear();
            currentPartitionConsumer.putAll(preBalancePartitionConsumers);
        }

        // Add back the fixed assignments (those that could not change)
        for (Entry<String, List<TopicPartition>> entry: fixedAssignments.entrySet()) {
            String consumer = entry.getKey();
            currentAssignment.put(consumer, entry.getValue());
            sortedCurrentSubscriptions.add(consumer);
        }

        fixedAssignments.clear();

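After all these maneuvers, if the new assignment turns out to be no better than the original one, the assignor rolls it back. Finally, the fixed assignments that were set aside earlier are added back.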
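A minimal sketch of that rollback decision, assuming getBalanceScore (its body is not shown in this article) sums the absolute difference in assignment size over every pair of consumers, so that lower means more balanced. BalanceScore and keepOrRollBack are hypothetical names, and partitions are plain strings.

```java
import java.util.*;

// Hypothetical sketch of a balance score in the spirit of getBalanceScore:
// sum of |size(a) - size(b)| over every unordered pair of consumers (0 = perfectly even).
class BalanceScore {

    static int score(Map<String, List<String>> assignment) {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> parts : assignment.values())
            sizes.add(parts.size());
        int score = 0;
        for (int i = 0; i < sizes.size(); i++)
            for (int j = i + 1; j < sizes.size(); j++)
                score += Math.abs(sizes.get(i) - sizes.get(j));
        return score;
    }

    // Keep the rebalanced assignment only if it scores strictly lower; otherwise roll back.
    static Map<String, List<String>> keepOrRollBack(Map<String, List<String>> before,
                                                    Map<String, List<String>> after) {
        return score(after) < score(before) ? after : before;
    }
}
```

With c1 holding 3 partitions and c2 none, the score is 3; after one partition moves it drops to 1, so the new assignment is kept. An equal score counts as "not better", which matches the `>=` in the rollback condition.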

conclusion

The code is long-winded, but most of it just builds and transforms mappings. The core idea is:

  • Take the previous assignment, prune it against the current subscriptions to drop stale entries, and keep the remaining valid assignment.
  • Balance that valid assignment: a partition that several consumers could own should go to whichever of them currently holds the fewest partitions; if it does not, it is moved.
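The two points above can be sketched together on plain strings. This is a hypothetical illustration (StickySketch is an illustrative name), not the assignor's code: it keeps still-valid pairs from the previous assignment and gives every other partition to the eligible consumer with the fewest partitions. The "move" part of the second point is left out here.

```java
import java.util.*;

// Hypothetical sketch: "previous" is last generation's consumer -> partitions,
// "potential" is consumer -> partitions it may own under its current subscription.
class StickySketch {

    static Map<String, List<String>> assign(Map<String, List<String>> previous,
                                            Map<String, List<String>> potential) {
        Map<String, List<String>> result = new TreeMap<>();
        Set<String> taken = new HashSet<>();

        // Step 1: keep previous pairs that are still valid, dropping stale ones
        for (String consumer : new TreeSet<>(potential.keySet())) {
            List<String> kept = new ArrayList<>();
            for (String p : previous.getOrDefault(consumer, Collections.emptyList()))
                if (potential.get(consumer).contains(p) && taken.add(p))
                    kept.add(p);
            result.put(consumer, kept);
        }

        // Step 2: give each partition not kept to the eligible consumer with the fewest partitions
        SortedSet<String> all = new TreeSet<>();
        for (List<String> parts : potential.values())
            all.addAll(parts);
        for (String p : all) {
            if (taken.contains(p))
                continue;
            String best = null;
            for (String consumer : result.keySet())
                if (potential.get(consumer).contains(p)
                        && (best == null || result.get(consumer).size() < result.get(best).size()))
                    best = consumer;
            result.get(best).add(p);
            taken.add(p);
        }
        return result;
    }
}
```

If c1 previously held a-0, a-1 and b-0 but no longer subscribes to b, the stale b-0 entry is dropped, c1 keeps a-0 and a-1, and b-0 goes to c2, the only remaining subscriber to b.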

CooperativeStickyAssignor

Kafka's rebalance protocol comes in two kinds: EAGER and COOPERATIVE. The three reassignment strategies described above all use EAGER, whereas CooperativeStickyAssignor uses COOPERATIVE.

In a large cluster with hundreds or thousands of members, consumers join, leave and change their subscriptions all the time. Under EAGER, every rebalance makes each consumer revoke all of its partitions before the new assignment is computed, so each one is a large-scale redistribution; even with the sticky strategy this is slow. The COOPERATIVE protocol replaces one global rebalance with a series of smaller, incremental rebalances that eventually converge to a balanced assignment.
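The difference between the two protocols shows up in what a single consumer has to give up. The sketch below is a hypothetical model (RevocationSketch is an illustrative name, partitions are strings), not Kafka's API: under EAGER everything owned is revoked, under COOPERATIVE only the partitions missing from the new assignment are.

```java
import java.util.*;

// Hypothetical model of what one consumer revokes during a rebalance.
class RevocationSketch {

    // EAGER: everything owned is revoked before rejoining, whatever the new assignment will be
    static Set<String> eagerRevoked(Set<String> owned) {
        return new TreeSet<>(owned);
    }

    // COOPERATIVE: only the partitions that are not in the new assignment are revoked
    static Set<String> cooperativeRevoked(Set<String> owned, Set<String> newAssignment) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);
        return revoked;
    }
}
```

A consumer that owns p0, p1 and p2 and keeps p0 and p1 stops consuming all three partitions under EAGER, but only p2 under COOPERATIVE.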

@Override
    protected MemberData memberData(Subscription subscription) {
        return new MemberData(subscription.ownedPartitions(), Optional.empty());
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);

        Map<TopicPartition, String> partitionsTransferringOwnership = super.partitionsTransferringOwnership == null ?
            computePartitionsTransferringOwnership(subscriptions, assignments) :
            super.partitionsTransferringOwnership;

        adjustAssignment(assignments, partitionsTransferringOwnership);
        return assignments;
    }

When a rebalance happens, each consumer sends the partitions it currently owns along with its subscription (ownedPartitions), and memberData simply reads them back.

(This is odd: why does StickyAssignor recover the previous assignment by decoding binary user data, while CooperativeStickyAssignor uses the partitions the consumers report directly?)

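assign first delegates to AbstractStickyAssignor, the sticky logic analyzed above, then takes the partitionsTransferringOwnership that AbstractStickyAssignor recorded (or computes it when that field is null) and passes it to adjustAssignment: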

 private void adjustAssignment(Map<String, List<TopicPartition>> assignments,
                                  Map<TopicPartition, String> partitionsTransferringOwnership) {
        for (Map.Entry<TopicPartition, String> partitionEntry : partitionsTransferringOwnership.entrySet()) {
            assignments.get(partitionEntry.getValue()).remove(partitionEntry.getKey());
        }
    }

Delete the partition assigned to…
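To see why removing these partitions works, here is a small model of one partition moving between two consumers across two rebalances. It is a hypothetical sketch (CooperativeSketch is an illustrative name, partitions are strings) and assumes a partition counts as "transferring" when it is newly added to one consumer and also revoked from a consumer that currently owns it; that is my reading of computePartitionsTransferringOwnership, whose body is not shown here.

```java
import java.util.*;

// Hypothetical model of the cooperative adjustment, with partitions as strings.
// "owned": what each consumer reported as ownedPartitions; "assigned": the sticky result.
class CooperativeSketch {

    // Transferring = newly added to some consumer AND revoked from a consumer that owns it.
    static Map<String, String> transferring(Map<String, List<String>> owned,
                                            Map<String, List<String>> assigned) {
        Map<String, String> added = new HashMap<>();   // partition -> new owner
        Set<String> revoked = new HashSet<>();
        for (Map.Entry<String, List<String>> e : assigned.entrySet()) {
            List<String> mine = owned.getOrDefault(e.getKey(), Collections.emptyList());
            for (String p : e.getValue())
                if (!mine.contains(p))
                    added.put(p, e.getKey());
            for (String p : mine)
                if (!e.getValue().contains(p))
                    revoked.add(p);
        }
        added.keySet().retainAll(revoked);
        return added;
    }

    // Like adjustAssignment: withhold each transferring partition from its new owner this round.
    static void adjust(Map<String, List<String>> assigned, Map<String, String> transferring) {
        for (Map.Entry<String, String> e : transferring.entrySet())
            assigned.get(e.getValue()).remove(e.getKey());
    }
}
```

Suppose c1 owns p0 and p1, c2 has just joined, and the sticky result is c1 = [p0], c2 = [p1]. In the first round p1 is transferring, so c2 receives nothing while c1's new assignment lacks p1, which c1 therefore revokes. In the next rebalance nobody reports owning p1, nothing is transferring, and c2 finally receives it.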

summary

That covers the allocation process of all four partition assignors. The last one, CooperativeStickyAssignor, can only be fully understood together with the server side; that is a hole left to fill later.