This article introduces basic Flink concepts, focusing on Flink's optimizations for high performance. This chapter mainly covers Flink's memory model and memory management.

Flink memory hierarchy

MemorySegment

A memory segment is the most basic memory allocation unit in Flink. The default size is 32 KB. It can be either on-heap memory (a Java byte array) or off-heap memory (a Netty-based DirectByteBuffer), and it provides methods for reading and writing binary data.
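As a quick illustration of the read/write API (a minimal sketch; MemorySegmentFactory and the put/get methods below are part of Flink's core memory package):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

// Allocate a 32 KB on-heap segment and read/write primitives at byte offsets.
MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
segment.putInt(0, 42);           // write an int at offset 0
segment.putLong(4, 123456789L);  // write a long right after it
int i = segment.getInt(0);       // 42
long l = segment.getLong(4);     // 123456789L
segment.free();                  // release the segment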

Memory pages

A memory page is a data access view on top of MemorySegments: reads are abstracted as DataInputView and writes as DataOutputView. Callers don't need to worry about MemorySegment details; reading and writing across MemorySegments is handled automatically.

Buffer

Task operators use Buffers to transfer data at the network layer. Buffer allocation and release are managed by Flink itself; the implementation class is NetworkBuffer. A NetworkBuffer wraps a MemorySegment.

Buffer resource pool

A BufferPool is used to manage Buffers, including Buffer allocation, release, destruction, and available-Buffer notification. The implementation class is LocalBufferPool, and each task has its own LocalBufferPool.

A BufferPoolFactory provides creation and destruction of BufferPools. The only implementation class is NetworkBufferPool; each TaskManager has exactly one NetworkBufferPool, which is created and allocated when the TaskManager starts and shared by all tasks of that TaskManager.
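A minimal sketch of how the two pools relate (constructor and method signatures may differ slightly across Flink versions):

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

// One NetworkBufferPool per TaskManager: here, 1024 segments of 32 KB each.
NetworkBufferPool networkBufferPool = new NetworkBufferPool(1024, 32 * 1024);
// Each task carves its own LocalBufferPool out of the shared pool.
BufferPool localBufferPool = networkBufferPool.createBufferPool(8, 128);

Buffer buffer = localBufferPool.requestBuffer(); // may return null when the pool is exhausted
if (buffer != null) {
    // ... write data through the wrapped MemorySegment ...
    buffer.recycleBuffer(); // return the buffer to the pool
}

localBufferPool.lazyDestroy();
networkBufferPool.destroy();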

Memory manager

The MemoryManager manages the memory Flink uses for sorting, hash tables, and caching intermediate results, as well as the off-heap memory used by state backends such as RocksDB. Since Flink 1.10, managed memory is scoped to slots.

Flink segment allocation flow chart

Several important classes

MemoryManager.java
public class MemoryManager {
    public static final int DEFAULT_PAGE_SIZE = 32 * 1024;
    /** The minimal memory page size. Currently set to 4 KiBytes. */
    public static final int MIN_PAGE_SIZE = 4 * 1024;
    // ------------------------------------------------------------------------
    /** Memory segments allocated per memory owner. */
    private final Map<Object, Set<MemorySegment>> allocatedSegments;
    /** Reserved memory per memory owner. */
    private final Map<Object, Long> reservedMemory;
    private final long pageSize;
    private final long totalNumberOfPages;
    private final UnsafeMemoryBudget memoryBudget;
    private final SharedResources sharedResources;

	MemoryManager(long memorySize, int pageSize, int verifyEmptyWaitGcMaxSleeps) {
        sanityCheck(memorySize, pageSize);

        this.pageSize = pageSize;
        this.memoryBudget = new UnsafeMemoryBudget(memorySize, verifyEmptyWaitGcMaxSleeps);
        this.totalNumberOfPages = memorySize / pageSize;
        this.allocatedSegments = new ConcurrentHashMap<>();
        this.reservedMemory = new ConcurrentHashMap<>();
        this.sharedResources = new SharedResources();
        verifyIntTotalNumberOfPages(memorySize, totalNumberOfPages);

        LOG.debug(
                "Initialized MemoryManager with total memory size {} and page size {}.",
                memorySize,
                pageSize);
    }
    
    public List<MemorySegment> allocatePages(Object owner, int numPages)
            throws MemoryAllocationException {
        List<MemorySegment> segments = new ArrayList<>(numPages);
        allocatePages(owner, segments, numPages);
        return segments;
    }
    
    public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)
            throws MemoryAllocationException {
        // sanity check
        Preconditions.checkNotNull(owner, "The memory owner must not be null.");
        Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
        Preconditions.checkArgument(
                numberOfPages <= totalNumberOfPages,
                "Cannot allocate more segments %d than the max number %d",
                numberOfPages,
                totalNumberOfPages);

        // reserve array space, if applicable
        if (target instanceof ArrayList) {
            ((ArrayList<MemorySegment>) target).ensureCapacity(numberOfPages);
        }

        long memoryToReserve = numberOfPages * pageSize;
        try {
            memoryBudget.reserveMemory(memoryToReserve);
        } catch (MemoryReservationException e) {
            throw new MemoryAllocationException(
                    String.format("Could not allocate %d pages", numberOfPages), e);
        }

        Runnable gcCleanup = memoryBudget.getReleaseMemoryAction(getPageSize());
        allocatedSegments.compute(
                owner,
                (o, currentSegmentsForOwner) -> {
                    Set<MemorySegment> segmentsForOwner =
                            currentSegmentsForOwner == null
                                    ? new HashSet<>(numberOfPages)
                                    : currentSegmentsForOwner;
                    for (long i = numberOfPages; i > 0; i--) {
                        MemorySegment segment =
                                allocateOffHeapUnsafeMemory(getPageSize(), owner, gcCleanup);
                        target.add(segment);
                        segmentsForOwner.add(segment);
                    }
                    return segmentsForOwner;
                });

        Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
    }
    
    public void release(MemorySegment segment) {
        Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");

        // check if segment is null or has already been freed
        if (segment == null || segment.getOwner() == null) {
            return;
        }

        // remove the reference in the map for the owner
        try {
            allocatedSegments.computeIfPresent(
                    segment.getOwner(),
                    (o, segsForOwner) -> {
                        segment.free();
                        segsForOwner.remove(segment);
                        return segsForOwner.isEmpty() ? null : segsForOwner;
                    });
        } catch (Throwable t) {
            throw new RuntimeException(
                    "Error removing book-keeping reference to allocated memory segment.", t);
        }
    }

    /**
     * Tries to release many memory segments together.
     *
     * <p>The segment is only freed and made eligible for reclamation by the GC. Each segment will
     * be returned to the memory pool, increasing its available limit for the later allocations.
     *
     * @param segments The segments to be released.
     */
    public void release(Collection<MemorySegment> segments) {
        if (segments == null) {
            return;
        }

        Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");

        boolean successfullyReleased = false;
        do {
          	Iterator<MemorySegment> segmentsIterator = segments.iterator();

            try {
                MemorySegment segment = null;
                while (segment == null && segmentsIterator.hasNext()) {
                    segment = segmentsIterator.next();
                }
                while (segment != null) {
                    segment = releaseSegmentsForOwnerUntilNextOwner(segment, segmentsIterator);
                }
                segments.clear();
                // the only way to exit the loop
                successfullyReleased = true;
            } catch (ConcurrentModificationException | NoSuchElementException e) {
                // this may happen in the case where an asynchronous
                // call releases the memory. fall through the loop and try again
            }
        } while (!successfullyReleased);
    }
    
    public void releaseAll(Object owner) {
        if (owner == null) {
            return;
        }

        Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");

        // get all segments
        Set<MemorySegment> segments = allocatedSegments.remove(owner);

        // all segments may have been freed previously individually
        if (segments == null || segments.isEmpty()) {
            return;
        }

        // free each segment
        for (MemorySegment segment : segments) {
            segment.free();
        }

        segments.clear();
    }
    
    public void releaseMemory(Object owner, long size) {
        checkMemoryReservationPreconditions(owner, size);
        if (size == 0L) {
            return;
        }

        reservedMemory.compute(
                owner,
                (o, currentlyReserved) -> {
                    long newReservedMemory = 0;
                    if (currentlyReserved != null) {
                        if (currentlyReserved < size) {
                            LOG.warn(
                                    "Trying to release more memory {} than it was reserved {} so far for the owner {}",
                                    size,
                                    currentlyReserved,
                                    owner);
                        }

                        newReservedMemory =
                                releaseAndCalculateReservedMemory(size, currentlyReserved);
                    }

                    return newReservedMemory == 0 ? null : newReservedMemory;
                });
    }

    private long releaseAndCalculateReservedMemory(long memoryToFree, long currentlyReserved) {
        final long effectiveMemoryToRelease = Math.min(currentlyReserved, memoryToFree);
        memoryBudget.releaseMemory(effectiveMemoryToRelease);

        return currentlyReserved - effectiveMemoryToRelease;
    }

    private void checkMemoryReservationPreconditions(Object owner, long size) {
        Preconditions.checkNotNull(owner, "The memory owner must not be null.");
        Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
        Preconditions.checkArgument(
                size >= 0L, "The memory size (%s) has to have non-negative size", size);
    }

    /**
     * Releases all reserved memory chunks from an owner to this memory manager.
     *
     * @param owner The owner to associate with the memory reservation, for the fallback release.
     */
    public void releaseAllMemory(Object owner) {
        checkMemoryReservationPreconditions(owner, 0L);
        Long memoryReservedForOwner = reservedMemory.remove(owner);
        if (memoryReservedForOwner != null) {
            memoryBudget.releaseMemory(memoryReservedForOwner);
        }
    }
}


UnsafeMemoryBudget.java
class UnsafeMemoryBudget {
    private static final int MAX_SLEEPS = 11; // 2^11 - 1 = (2 x 1024) - 1 ms ~ 2 s total sleep duration
    static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // 2^17 - 1 = (128 x 1024) - 1 ms ~ 2 min total sleep duration
    private static final int RETRIGGER_GC_AFTER_SLEEPS = 9; // ~ 0.5 sec

    private final long totalMemorySize;
    private final AtomicLong availableMemorySize;
    private final int verifyEmptyWaitGcMaxSleeps;

    UnsafeMemoryBudget(long totalMemorySize, int verifyEmptyWaitGcMaxSleeps) {
        this.totalMemorySize = totalMemorySize;
        this.availableMemorySize = new AtomicLong(totalMemorySize);
        this.verifyEmptyWaitGcMaxSleeps = verifyEmptyWaitGcMaxSleeps;
    }

    long getTotalMemorySize() {
        return totalMemorySize;
    }

    long getAvailableMemorySize() {
        return availableMemorySize.get();
    }

    boolean verifyEmpty() {
        try {
            // we wait longer than during the normal reserveMemory as we have to GC all memory,
            // allocated by task, to perform the verification
            reserveMemory(totalMemorySize, verifyEmptyWaitGcMaxSleeps);
        } catch (MemoryReservationException e) {
            return false;
        }
        releaseMemory(totalMemorySize);
        return availableMemorySize.get() == totalMemorySize;
    }

    /**
     * Reserve memory of certain size if it is available.
     *
     * <p>Adjusted version of {@link java.nio.Bits#reserveMemory(long, int)} taken from Java 11.
     */
    void reserveMemory(long size) throws MemoryReservationException {
        reserveMemory(size, MAX_SLEEPS);
    }

    @SuppressWarnings({"OverlyComplexMethod", "JavadocReference", "NestedTryStatement"})
    void reserveMemory(long size, int maxSleeps) throws MemoryReservationException {
        long availableOrReserved = tryReserveMemory(size);
        // optimist!
        if (availableOrReserved >= size) {
            return;
        }

        boolean interrupted = false;
        try {
            // Retry allocation until success or there are no more
            // references (including Cleaners that might free direct
            // buffer memory) to process and allocation still fails.
            boolean refprocActive;
            do {
                try {
                    refprocActive = JavaGcCleanerWrapper.tryRunPendingCleaners();
                } catch (InterruptedException e) {
                    // Defer interrupts and keep trying.
                    interrupted = true;
                    refprocActive = true;
                }
                availableOrReserved = tryReserveMemory(size);
                if (availableOrReserved >= size) {
                    return;
                }
            } while (refprocActive);

            // trigger VM's Reference processing
            System.gc();

            // A retry loop with exponential back-off delays.
            // Sometimes it would suffice to give up once reference
            // processing is complete. But if there are many threads
            // competing for memory, this gives more opportunities for
            // any given thread to make progress. In particular, this
            // seems to be enough for a stress test like
            // DirectBufferAllocTest to (usually) succeed, while
            // without it that test likely fails. Since failure here
            // ends in MemoryReservationException, there's no need to hurry.
            long sleepTime = 1;
            int sleeps = 0;
            while (true) {
                availableOrReserved = tryReserveMemory(size);
                if (availableOrReserved >= size) {
                    return;
                }
                if (sleeps >= maxSleeps) {
                    break;
                }
                try {
                    if (!JavaGcCleanerWrapper.tryRunPendingCleaners()) {
                        if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
                            // trigger again VM's Reference processing if we have to wait longer
                            System.gc();
                        }
                        Thread.sleep(sleepTime);
                        sleepTime <<= 1;
                        sleeps++;
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }

            // no luck
            throw new MemoryReservationException(
                    String.format(
                            "Could not allocate %d bytes, only %d bytes are remaining. This usually indicates "
                                    + "that you are requesting more memory than you have reserved. "
                                    + "However, when running an old JVM version it can also be caused by slow garbage collection. "
                                    + "Try to upgrade to Java 8u72 or higher if running on an old Java version.",
                            size, availableOrReserved));
        } finally {
            if (interrupted) {
                // don't swallow interrupts
                Thread.currentThread().interrupt();
            }
        }
    }

    private long tryReserveMemory(long size) {
        long currentAvailableMemorySize;
        while (size <= (currentAvailableMemorySize = availableMemorySize.get())) {
            if (availableMemorySize.compareAndSet(
                    currentAvailableMemorySize, currentAvailableMemorySize - size)) {
                return size;
            }
        }
        return currentAvailableMemorySize;
    }

    void releaseMemory(@Nonnegative long size) {
        if (size == 0) {
            return;
        }
        boolean released = false;
        long currentAvailableMemorySize = 0L;
        while (!released
                && totalMemorySize
                        >= (currentAvailableMemorySize = availableMemorySize.get()) + size) {
            released =
                    availableMemorySize.compareAndSet(
                            currentAvailableMemorySize, currentAvailableMemorySize + size);
        }
        if (!released) {
            throw new IllegalStateException(
                    String.format(
                            "Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",
                            size, currentAvailableMemorySize, totalMemorySize));
        }
    }

    /**
     * Generates a release memory action that can be performed later.
     *
     * <p>The generated runnable could be safely referenced by possible gc cleaner action without
     * worrying about cycle reference back to memory manager.
     */
    Runnable getReleaseMemoryAction(@Nonnegative long size) {
        return () -> releaseMemory(size);
    }
}
SegmentsUtil.java
public class SegmentsUtil {

    /** Constant that flags the byte order. */
    public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

    private static final int ADDRESS_BITS_PER_WORD = 3;

    private static final int BIT_BYTE_INDEX_MASK = 7;

    /**
     * SQL execution threads is limited, not too many, so it can bear the overhead of 64K per
     * thread.
     */
    private static final int MAX_BYTES_LENGTH = 1024 * 64;

    private static final int MAX_CHARS_LENGTH = 1024 * 32;

    private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    private static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>();
    private static final ThreadLocal<char[]> CHARS_LOCAL = new ThreadLocal<>();

    /**
     * Allocate bytes that is only for temporary usage, it should not be stored in somewhere else.
     * Use a {@link ThreadLocal} to reuse bytes to avoid overhead of byte[] new and gc.
     *
     * <p>If there are methods that can only accept a byte[], instead of a MemorySegment[]
     * parameter, we can allocate a reuse bytes and copy the MemorySegment data to byte[], then call
     * the method. Such as String deserialization.
     */
    public static byte[] allocateReuseBytes(int length) {
        byte[] bytes = BYTES_LOCAL.get();

        if (bytes == null) {
            if (length <= MAX_BYTES_LENGTH) {
                bytes = new byte[MAX_BYTES_LENGTH];
                BYTES_LOCAL.set(bytes);
            } else {
                bytes = new byte[length];
            }
        } else if (bytes.length < length) {
            bytes = new byte[length];
        }

        return bytes;
    }
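    // Example (illustrative): allocateReuseBytes(100) returns the 64 KB thread-local
    // array (allocated once per thread and reused), while allocateReuseBytes(100000)
    // exceeds MAX_BYTES_LENGTH and falls back to a fresh byte[100000] that is not cached.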

    public static char[] allocateReuseChars(int length) {
        ...
    }

    /**
     * Copy segments to a new byte[].
     *
     * @param segments Source segments.
     * @param offset Source segments offset.
     * @param numBytes the number bytes to copy.
     */
    public static byte[] copyToBytes(MemorySegment[] segments, int offset, int numBytes) {
        return copyToBytes(segments, offset, new byte[numBytes], 0, numBytes);
    }

    /**
     * Copy segments to target byte[].
     *
     * @param segments Source segments.
     * @param offset Source segments offset.
     * @param bytes target byte[].
     * @param bytesOffset target byte[] offset.
     * @param numBytes the number bytes to copy.
     */
    public static byte[] copyToBytes(
            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
        if (inFirstSegment(segments, offset, numBytes)) {
            segments[0].get(offset, bytes, bytesOffset, numBytes);
        } else {
            copyMultiSegmentsToBytes(segments, offset, bytes, bytesOffset, numBytes);
        }
        return bytes;
    }

    public static void copyMultiSegmentsToBytes(
            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
        int remainSize = numBytes;
        for (MemorySegment segment : segments) {
            int remain = segment.size() - offset;
            if (remain > 0) {
                int nCopy = Math.min(remain, remainSize);
                segment.get(offset, bytes, numBytes - remainSize + bytesOffset, nCopy);
                remainSize -= nCopy;
                // next new segment.
                offset = 0;
                if (remainSize == 0) {
                    return;
                }
            } else {
                // remain is negative, let's advance to next segment
                // now the offset = offset - segmentSize (-remain)
                offset = -remain;
            }
        }
    }
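    // Example (illustrative): with two 32 KB segments and offset = 32 * 1024 - 50,
    // copyToBytes(segments, offset, 100) sees that the 100-byte region does not fit in
    // the first segment and falls back to copyMultiSegmentsToBytes, which copies the
    // last 50 bytes of segment 0 followed by the first 50 bytes of segment 1.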

    /**
     * Copy segments to target unsafe pointer.
     *
     * @param segments Source segments.
     * @param offset The position where the bytes are started to be read from these memory segments.
     * @param target The unsafe memory to copy the bytes to.
     * @param pointer The position in the target unsafe memory to copy the chunk to.
     * @param numBytes the number bytes to copy.
     */
    public static void copyToUnsafe(
            MemorySegment[] segments, int offset, Object target, int pointer, int numBytes) {
        if (inFirstSegment(segments, offset, numBytes)) {
            segments[0].copyToUnsafe(offset, target, pointer, numBytes);
        } else {
            copyMultiSegmentsToUnsafe(segments, offset, target, pointer, numBytes);
        }
    }

    private static void copyMultiSegmentsToUnsafe(
            MemorySegment[] segments, int offset, Object target, int pointer, int numBytes) {
        ...
    }

    /**
     * Copy bytes of segments to output view. Note: It just copies the data in, not include the
     * length.
     *
     * @param segments source segments
     * @param offset offset for segments
     * @param sizeInBytes size in bytes
     * @param target target output view
     */
    public static void copyToView(
            MemorySegment[] segments, int offset, int sizeInBytes, DataOutputView target)
            throws IOException {
        ...
    }

    /**
     * Copy target segments from source byte[].
     *
     * @param segments target segments.
     * @param offset target segments offset.
     * @param bytes source byte[].
     * @param bytesOffset source byte[] offset.
     * @param numBytes the number bytes to copy.
     */
    public static void copyFromBytes(
            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
        if (segments.length == 1) {
            segments[0].put(offset, bytes, bytesOffset, numBytes);
        } else {
            copyMultiSegmentsFromBytes(segments, offset, bytes, bytesOffset, numBytes);
        }
    }

    private static void copyMultiSegmentsFromBytes(
            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
        ...
    }

    /** Maybe not copied, if want copy, please use copyTo. */
    public static byte[] getBytes(MemorySegment[] segments, int baseOffset, int sizeInBytes) {
        // avoid copy if `base` is `byte[]`
        if (segments.length == 1) {
            byte[] heapMemory = segments[0].getHeapMemory();
            if (baseOffset == 0 && heapMemory != null && heapMemory.length == sizeInBytes) {
                return heapMemory;
            } else {
                byte[] bytes = new byte[sizeInBytes];
                segments[0].get(baseOffset, bytes, 0, sizeInBytes);
                return bytes;
            }
        } else {
            byte[] bytes = new byte[sizeInBytes];
            copyMultiSegmentsToBytes(segments, baseOffset, bytes, 0, sizeInBytes);
            return bytes;
        }
    }

    /**
     * Equals two memory segments regions.
     *
     * @param segments1 Segments 1
     * @param offset1 Offset of segments1 to start equaling
     * @param segments2 Segments 2
     * @param offset2 Offset of segments2 to start equaling
     * @param len Length of the equaled memory region
     * @return true if equal, false otherwise
     */
    public static boolean equals(
            MemorySegment[] segments1,
            int offset1,
            MemorySegment[] segments2,
            int offset2,
            int len) {
        if (inFirstSegment(segments1, offset1, len) && inFirstSegment(segments2, offset2, len)) {
            return segments1[0].equalTo(segments2[0], offset1, offset2, len);
        } else {
            return equalsMultiSegments(segments1, offset1, segments2, offset2, len);
        }
    }

    @VisibleForTesting
    static boolean equalsMultiSegments(
            MemorySegment[] segments1,
            int offset1,
            MemorySegment[] segments2,
            int offset2,
            int len) {
        if (len == 0) {
            // quick way and avoid segSize is zero.
            return true;
        }

        int segSize1 = segments1[0].size();
        int segSize2 = segments2[0].size();

        // find first segIndex and segOffset of segments.
        int segIndex1 = offset1 / segSize1;
        int segIndex2 = offset2 / segSize2;
        int segOffset1 = offset1 - segSize1 * segIndex1; // equal to %
        int segOffset2 = offset2 - segSize2 * segIndex2; // equal to %

        while (len > 0) {
            int equalLen = Math.min(Math.min(len, segSize1 - segOffset1), segSize2 - segOffset2);
            if (!segments1[segIndex1].equalTo(
                    segments2[segIndex2], segOffset1, segOffset2, equalLen)) {
                return false;
            }
            len -= equalLen;
            segOffset1 += equalLen;
            if (segOffset1 == segSize1) {
                segOffset1 = 0;
                segIndex1++;
            }
            segOffset2 += equalLen;
            if (segOffset2 == segSize2) {
                segOffset2 = 0;
                segIndex2++;
            }
        }
        return true;
    }

    /**
     * hash segments to int, numBytes must be aligned to 4 bytes.
     *
     * @param segments Source segments.
     * @param offset Source segments offset.
     * @param numBytes the number bytes to hash.
     */
    public static int hashByWords(MemorySegment[] segments, int offset, int numBytes) {
        if (inFirstSegment(segments, offset, numBytes)) {
            return MurmurHashUtil.hashBytesByWords(segments[0], offset, numBytes);
        } else {
            return hashMultiSegByWords(segments, offset, numBytes);
        }
    }

    private static int hashMultiSegByWords(MemorySegment[] segments, int offset, int numBytes) {
        byte[] bytes = allocateReuseBytes(numBytes);
        copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
        return MurmurHashUtil.hashUnsafeBytesByWords(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes);
    }

    /**
     * hash segments to int.
     *
     * @param segments Source segments.
     * @param offset Source segments offset.
     * @param numBytes the number bytes to hash.
     */
    public static int hash(MemorySegment[] segments, int offset, int numBytes) {
        if (inFirstSegment(segments, offset, numBytes)) {
            return MurmurHashUtil.hashBytes(segments[0], offset, numBytes);
        } else {
            return hashMultiSeg(segments, offset, numBytes);
        }
    }

    private static int hashMultiSeg(MemorySegment[] segments, int offset, int numBytes) {
        byte[] bytes = allocateReuseBytes(numBytes);
        copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
        return MurmurHashUtil.hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes);
    }

    /** Is it just in first MemorySegment, we use quick way to do something. */
    private static boolean inFirstSegment(MemorySegment[] segments, int offset, int numBytes) {
        return numBytes + offset <= segments[0].size();
    }

    /**
     * Given a bit index, return the byte index containing it.
     *
     * @param bitIndex the bit index.
     * @return the byte index.
     */
    private static int byteIndex(int bitIndex) {
        return bitIndex >>> ADDRESS_BITS_PER_WORD;
    }

    /**
     * unset bit.
     *
     * @param segment target segment.
     * @param baseOffset bits base offset.
     * @param index bit index from base offset.
     */
    public static void bitUnSet(MemorySegment segment, int baseOffset, int index) {
        int offset = baseOffset + byteIndex(index);
        byte current = segment.get(offset);
        current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
        segment.put(offset, current);
    }

    /**
     * set bit.
     *
     * @param segment target segment.
     * @param baseOffset bits base offset.
     * @param index bit index from base offset.
     */
    public static void bitSet(MemorySegment segment, int baseOffset, int index) {
        int offset = baseOffset + byteIndex(index);
        byte current = segment.get(offset);
        current |= (1 << (index & BIT_BYTE_INDEX_MASK));
        segment.put(offset, current);
    }

    /**
     * read bit.
     *
     * @param segment target segment.
     * @param baseOffset bits base offset.
     * @param index bit index from base offset.
     */
    public static boolean bitGet(MemorySegment segment, int baseOffset, int index) {
        int offset = baseOffset + byteIndex(index);
        byte current = segment.get(offset);
        return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
    }
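    // Example (illustrative): bitSet(segment, 0, 9) locates byte 0 + byteIndex(9) = 1
    // (since 9 >>> 3 = 1) and sets bit 9 & 7 = 1 in it; bitGet(segment, 0, 9) then
    // returns true, and bitUnSet(segment, 0, 9) clears the bit again.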

    /**
     * unset bit from segments.
     *
     * @param segments target segments.
     * @param baseOffset bits base offset.
     * @param index bit index from base offset.
     */
    public static void bitUnSet(MemorySegment[] segments, int baseOffset, int index) {
        if (segments.length == 1) {
            MemorySegment segment = segments[0];
            int offset = baseOffset + byteIndex(index);
            byte current = segment.get(offset);
            current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
            segment.put(offset, current);
        } else {
            bitUnSetMultiSegments(segments, baseOffset, index);
        }
    }

    private static void bitUnSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) {
        int offset = baseOffset + byteIndex(index);
        int segSize = segments[0].size();
        int segIndex = offset / segSize;
        int segOffset = offset - segIndex * segSize; // equal to %
        MemorySegment segment = segments[segIndex];

        byte current = segment.get(segOffset);
        current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
        segment.put(segOffset, current);
    }

    /**
     * set bit from segments.
     *
     * @param segments target segments.
     * @param baseOffset bits base offset.
     * @param index bit index from base offset.
     */
    public static void bitSet(MemorySegment[] segments, int baseOffset, int index) {
        if (segments.length == 1) {
            int offset = baseOffset + byteIndex(index);
            MemorySegment segment = segments[0];
            byte current = segment.get(offset);
            current |= (1 << (index & BIT_BYTE_INDEX_MASK));
            segment.put(offset, current);
        } else {
            bitSetMultiSegments(segments, baseOffset, index);
        }
    }

    private static void bitSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) {
        int offset = baseOffset + byteIndex(index);
        int segSize = segments[0].size();
        int segIndex = offset / segSize;
        int segOffset = offset - segIndex * segSize; // equal to %
        MemorySegment segment = segments[segIndex];

        byte current = segment.get(segOffset);
        current |= (1 << (index & BIT_BYTE_INDEX_MASK));
        segment.put(segOffset, current);
    }

    /**
     * read bit from segments.
     *
     * @param segments target segments.
     * @param baseOffset bits base offset.
     * @param index bit index from base offset.
     */
    public static boolean bitGet(MemorySegment[] segments, int baseOffset, int index) {
        int offset = baseOffset + byteIndex(index);
        byte current = getByte(segments, offset);
        return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
    }

    /**
     * get boolean from segments.
     *
     * @param segments target segments.
     * @param offset value offset.
     */
    public static boolean getBoolean(MemorySegment[] segments, int offset) {
        ...
    }

    private static boolean getBooleanMultiSegments(MemorySegment[] segments, int offset) {
        ...
    }

    /**
     * set boolean from segments.
     *
     * @param segments target segments.
     * @param offset value offset.
     */
    public static void setBoolean(MemorySegment[] segments, int offset, boolean value) {
        ...
    }

    private static void setBooleanMultiSegments(
            MemorySegment[] segments, int offset, boolean value) {
        ...
    }

    /**
     * get byte from segments.
     *
     * @param segments target segments.
     * @param offset value offset.
     */
    public static byte getByte(MemorySegment[] segments, int offset) {
        ...
    }

    private static byte getByteMultiSegments(MemorySegment[] segments, int offset) {
        ...
    }

    /**
     * set byte from segments.
     *
     * @param segments target segments.
     * @param offset value offset.
     */
    public static void setByte(MemorySegment[] segments, int offset, byte value) {
        ...
    }

    private static void setByteMultiSegments(MemorySegment[] segments, int offset, byte value) {
        ...
    }

    /**
     * get int from segments.
     *
     * @param segments target segments.
     * @param offset value offset.
     */
    public static int getInt(MemorySegment[] segments, int offset) {
        ...
    }

    private static int getIntMultiSegments(MemorySegment[] segments, int offset) {
        ...
    }

    private static int getIntSlowly(
            MemorySegment[] segments, int segSize, int segNum, int segOffset) {
        ...
    }

    /**
     * set int from segments.
     *
     * @param segments target segments.
     * @param offset value offset.
     */
    public static void setInt(MemorySegment[] segments, int offset, int value) {
        ...
    }

    private static void setIntMultiSegments(MemorySegment[] segments, int offset, int value) {
        ...
    }

    private static void setIntSlowly(
            MemorySegment[] segments, int segSize, int segNum, int segOffset, int value) {
        ...
    }

    /**
     * get long from segments.
     *
     * @param segments target segments.
     * @param offset value offset.
     */
    public static long getLong(MemorySegment[] segments, int offset) {
        ...
    }

    private static long getLongMultiSegments(MemorySegment[] segments, int offset) {
        ...
    }

    private static long getLongSlowly(
            MemorySegment[] segments, int segSize, int segNum, int segOffset) {
        ...
    }

    public static void setLong(MemorySegment[] segments, int offset, long value) {
        ...
    }

    private static void setLongMultiSegments(MemorySegment[] segments, int offset, long value) {
        ...
    }

    private static void setLongSlowly(
            MemorySegment[] segments, int segSize, int segNum, int segOffset, long value) {
        ...
    }

   
    public static short getShort(MemorySegment[] segments, int offset) {
        ...
    }

    private static short getShortMultiSegments(MemorySegment[] segments, int offset) {
        ...
    }

    
    public static void setShort(MemorySegment[] segments, int offset, short value) {
        ...
    }

    private static void setShortMultiSegments(MemorySegment[] segments, int offset, short value) {
        ...
    }

    
    public static float getFloat(MemorySegment[] segments, int offset) {
        ...
    }

    private static float getFloatMultiSegments(MemorySegment[] segments, int offset) {
        ...
    }

    
    public static void setFloat(MemorySegment[] segments, int offset, float value) {
        ...
    }

    private static void setFloatMultiSegments(MemorySegment[] segments, int offset, float value) {
        ...
    }

    public static double getDouble(MemorySegment[] segments, int offset) {
        ...
    }

    private static double getDoubleMultiSegments(MemorySegment[] segments, int offset) {
        ...
    }

    public static void setDouble(MemorySegment[] segments, int offset, double value) {
        ...
    }

    private static void setDoubleMultiSegments(MemorySegment[] segments, int offset, double value) {
        ...
    }

    private static int getTwoByteSlowly(
            MemorySegment[] segments, int segSize, int segNum, int segOffset) {
        ...
    }

    private static void setTwoByteSlowly(
            MemorySegment[] segments, int segSize, int segNum, int segOffset, int b1, int b2) {
        ...
    }

    public static int find(
            MemorySegment[] segments1,
            int offset1,
            int numBytes1,
            MemorySegment[] segments2,
            int offset2,
            int numBytes2) {
        ...
    }

    private static int findInMultiSegments(
        ...
    }
}
Flink MemorySegment structure diagram

MemorySegment:
  • UNSAFE: the unsafe API used to operate on heap/off-heap memory
  • BYTE_ARRAY_BASE_OFFSET: the starting offset of the binary byte array, relative to the byte array object
  • byte[] heapMemory: a reference to the accessed memory if it is heap memory, or null if it is off-heap memory
  • long address; final long addressLimit; int size: the start address, end address, and size in bytes of the memory segment
  • Object owner: the owner of this segment
DataView

DataInputView.java interface definition

void skipBytesToRead(int numBytes) throws IOException;

int read(byte[] b, int off, int len) throws IOException;

int read(byte[] b) throws IOException;
DataOutputView.java interface definition
void skipBytesToWrite(int numBytes) throws IOException;

void write(DataInputView source, int numBytes) throws IOException;
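A small sketch of the view abstraction in use (DataOutputSerializer and DataInputDeserializer are Flink's byte-array-backed implementations of the two views):

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// Write typed values through a DataOutputView, then read them back through a
// DataInputView, without touching any segment internals.
DataOutputSerializer out = new DataOutputSerializer(64);
out.writeInt(7);
out.writeUTF("flink");

DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
int n = in.readInt();    // 7
String s = in.readUTF(); // "flink"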

AbstractPagedOutputView && AbstractPagedInputView

  • Provides a further implementation of view based on page
  • Provides input/output views across multiple memory pages.
  • Includes decoding/encoding methods for reading/writing data from the page and checking across page boundaries

Parameter description:

currentSegment: the memory segment currently being operated on

headerLength: each memory segment is preceded by a header that stores metadata; this header must be skipped when accessing data. The paged view requires all memory segments it references to have the same header length

positionInSegment: a pointer to the current position within the segment (relative to the segment)

limitInSegment: a pointer to the end of valid data in the segment; each block has a fixed size of 32 KB

numSegments: the number of segments managed in a page

AbstractPagedInputView.java

Several important interface definitions and implementations

Interface definitions: how to obtain the next segment in the page, and how to determine the read limit of a given segment

protected abstract MemorySegment nextSegment(MemorySegment current)
            throws EOFException, IOException;
            
protected abstract int getLimitForSegment(MemorySegment segment);
protected void doAdvance() throws IOException {
    // note: this code ensures that in case of EOF, we stay at the same position such that
    // EOF is reproducible (if nextSegment throws a reproducible EOFException)
    this.currentSegment = nextSegment(this.currentSegment);
    this.limitInSegment = getLimitForSegment(this.currentSegment);
    this.positionInSegment = this.headerLength; // skip the header; no data is stored there
}
public int read(byte[] b, int off, int len) throws IOException {
    if (off < 0 || len < 0 || off + len > b.length) {
        throw new IndexOutOfBoundsException();
    }

    int remaining = this.limitInSegment - this.positionInSegment;
    if (remaining >= len) {
        // the remaining space of the current segment is enough: copy the data
        // and advance the current position pointer
        this.currentSegment.get(this.positionInSegment, b, off, len);
        this.positionInSegment += len;
        return len;
    } else {
        if (remaining == 0) {
            try {
                advance();
            } catch (EOFException eof) {
                return -1;
            }
            remaining = this.limitInSegment - this.positionInSegment;
        }

        int bytesRead = 0;
        while (true) {
            // read min(remaining, len - bytesRead) at a time. Two cases:
            // 1) remaining < len - bytesRead: the current segment is not enough;
            //    copy what is left, then move on to the next segment;
            // 2) len - bytesRead <= remaining: the request completes in this segment.
            int toRead = Math.min(remaining, len - bytesRead);
            this.currentSegment.get(this.positionInSegment, b, off, toRead);
            off += toRead;
            bytesRead += toRead;

            if (len > bytesRead) {
                // advance to the next segment
                try {
                    advance();
                } catch (EOFException eof) {
                    this.positionInSegment += toRead;
                    return bytesRead;
                }
                remaining = this.limitInSegment - this.positionInSegment;
            } else {
                // update positionInSegment and finish
                this.positionInSegment += toRead;
                break;
            }
        }
        return len;
    }
}
AbstractPagedOutputView.java
protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent)
        throws IOException;
protected final MemorySegment nextSegment(MemorySegment current, int posInSegment)
        throws IOException {
    if (current != null) {
        writeSegment(current, posInSegment, false);
    }
    final MemorySegment next = this.writer.getNextReturnedBlock();
    this.blockCount++;
    return next;
}

@Override
public void write(MemorySegment segment, int off, int len) throws IOException {
    int remaining = this.segmentSize - this.positionInSegment;
    if (remaining >= len) {
        segment.copyTo(off, currentSegment, positionInSegment, len);
        this.positionInSegment += len;
    } else {
        if (remaining == 0) {
            advance();
            remaining = this.segmentSize - this.positionInSegment;
        }
        while (true) {
            int toPut = Math.min(remaining, len);
            segment.copyTo(off, currentSegment, positionInSegment, toPut);
            off += toPut;
            len -= toPut;
            if (len > 0) {
                this.positionInSegment = this.segmentSize;
                advance();
                remaining = this.segmentSize - this.positionInSegment;
            } else {
                this.positionInSegment += toPut;
                break;
            }
        }
    }
}
MemoryManager

From the total amount of managed memory and the size of each page, the number of pages is derived; that many pages are made available as managed memory.
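For example, with 1 GiB of managed memory and the default 32 KiB page size, totalNumPages = (1024 * 1024 * 1024) / (32 * 1024) = 32768 pages.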

private final MemoryType memoryType; // The storage form of the managed memory: on-heap or off-heap; only one form can be used at a time.

private final long memorySize; // The total amount of memory governed by this memory manager.

private final int pageSize; // The size of each page. The default is 32 KB, the minimum 4 KB.

public static final int DEFAULT_PAGE_SIZE = 32 * 1024;
public static final int MIN_PAGE_SIZE = 4 * 1024;

private final int totalNumPages; // Total pages = memorySize / pageSize.

private final boolean isPreAllocated; // Whether to pre-allocate; defaults to true for off-heap memory, enabling preallocation.

private int numNonAllocatedPages; // Number of memory pages not yet allocated, because Flink allocates memory lazily on use.

public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)
        throws MemoryAllocationException {
    // sanity check
    Preconditions.checkNotNull(owner, "The memory owner must not be null.");
    Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
    Preconditions.checkArgument(
            numberOfPages <= totalNumberOfPages,
            "Cannot allocate more segments %d than the max number %d",
            numberOfPages,
            totalNumberOfPages);

    // reserve array space, if applicable
    if (target instanceof ArrayList) {
        ((ArrayList<MemorySegment>) target).ensureCapacity(numberOfPages);
    }

    long memoryToReserve = numberOfPages * pageSize;
    try {
        memoryBudget.reserveMemory(memoryToReserve);
    } catch (MemoryReservationException e) {
        throw new MemoryAllocationException(
                String.format("Could not allocate %d pages", numberOfPages), e);
    }

    // bind the owner to the release action of the actually allocated memory
    Runnable gcCleanup = memoryBudget.getReleaseMemoryAction(getPageSize());
    allocatedSegments.compute(
            owner,
            (o, currentSegmentsForOwner) -> {
                Set<MemorySegment> segmentsForOwner =
                        currentSegmentsForOwner == null
                                ? new HashSet<>(numberOfPages)
                                : currentSegmentsForOwner;
                // allocate the segments and fill them into target
                for (long i = numberOfPages; i > 0; i--) {
                    MemorySegment segment =
                            allocateOffHeapUnsafeMemory(getPageSize(), owner, gcCleanup);
                    target.add(segment);
                    segmentsForOwner.add(segment);
                }
                return segmentsForOwner;
            });

    Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
}

/**
 * Tries to release the memory for the specified segment.
 *
 * <p>If the segment has already been released, it is only freed. If it is null or has no owner,
 * the request is simply ignored. The segment is only freed and made eligible for reclamation by
 * the GC. The segment will be returned to the memory pool, increasing its available limit for
 * the later allocations.
 *
 * @param segment The segment to be released.
 */
public void release(MemorySegment segment) {
    Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");

    // check if segment is null or has already been freed
    if (segment == null || segment.getOwner() == null) {
        return;
    }

    // remove the reference in the map for the owner
    try {
        allocatedSegments.computeIfPresent(
                segment.getOwner(),
                (o, segsForOwner) -> {
                    segment.free();
                    segsForOwner.remove(segment);
                    return segsForOwner.isEmpty() ? null : segsForOwner;
                });
    } catch (Throwable t) {
        throw new RuntimeException(
                "Error removing book-keeping reference to allocated memory segment.", t);
    }
}
MemorySegmentPool

A MemorySegmentPool is a pool for requesting and managing MemorySegments: it allocates segments and recycles and caches them.

private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024; // Memory requested from the MemoryManager per batch: 16 MB

private final MemoryManager memoryManager; // The actual memory manager

private final ArrayList<MemorySegment> cachePages; // The cache of recycled pages

private final int maxPages; // Maximum number of managed pages

private int pageUsage; // Total number of pages currently in use

@Override
public void returnAll(List<MemorySegment> memory) {
    this.pageUsage -= memory.size();
    if (this.pageUsage < 0) {
        throw new RuntimeException("Return too more memories.");
    }
    this.cachePages.addAll(memory);
}

@Override
public MemorySegment nextSegment() {
    int freePages = freePages();
    if (freePages == 0) {
        return null;
    }

    if (this.cachePages.isEmpty()) {
        int numPages = Math.min(freePages, this.perRequestPages);
        try {
            this.memoryManager.allocatePages(owner, this.cachePages, numPages);
        } catch (MemoryAllocationException e) {
            throw new RuntimeException(e);
        }
    }
    this.pageUsage++;
    return this.cachePages.remove(this.cachePages.size() - 1);
}

public List<MemorySegment> allocateSegments(int required) {
    int freePages = freePages();
    if (freePages < required) {
        return null;
    }

    List<MemorySegment> ret = new ArrayList<>(required);
    for (int i = 0; i < required; i++) {
        MemorySegment segment;
        try {
            segment = nextSegment();
            Preconditions.checkNotNull(segment);
        } catch (Throwable t) {
            // unexpected, we should first return all temporary segments
            returnAll(ret);
            throw t;
        }
        ret.add(segment);
    }
    return ret;
}
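A minimal usage sketch (assuming an already-constructed pool implementation, e.g. one backed by the MemoryManager; the method useOnePage is hypothetical):

import java.util.Collections;
import org.apache.flink.core.memory.MemorySegment;

// Borrow a page from the pool, use it, and recycle it back into cachePages.
void useOnePage(MemorySegmentPool pool) {
    MemorySegment page = pool.nextSegment(); // served from cachePages, or freshly allocated
    if (page == null) {
        return; // maxPages reached and nothing cached
    }
    page.putInt(0, 1); // ... write records into the page ...
    pool.returnAll(Collections.singletonList(page)); // pageUsage--, page returns to cachePages
}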
ResultPartition

A ResultPartition (RP) represents a chunk of data written by a BufferWriter; an RP is a collection of ResultSubpartitions (RS).

A ResultPartition has three phases in its life cycle: production, consumption, and release.

ResultSubpartition

Represents one partition of the data created by an operator, together with the logic for transferring that data to the receiving operator. The concrete RS implementation determines the final data transfer logic; it is designed as a plug-in mechanism to meet the system's various data transfer requirements.

InputGate: on the receiving end, logically equivalent to an RP; it processes and collects data from upstream buffers.

InputChannel: on the receiving end, logically equivalent to an RS; it receives the data of one particular partition.

Buffer: serializers and deserializers reliably convert typed data into pure binary data and handle data that spans buffers.

ResultSubpartitionView

Summary:

  • Build a segment object pool to improve reuse and reduce the overhead of repeated allocation and reclamation
  • Standardize the segment (page) size to improve operation efficiency
  • Abstract away the complexity of access across segments (views)

Sort shuffle read and write process

Flink Sort Shuffle flowchart

A few core class descriptions:

SortMergeResultPartition: the core class handling sort shuffle. It contains a SortBuffer instance and a PartitionedFileWriter instance.

SortBuffer sortBuffer: the sort buffer instance

PartitionedFileWriter fileWriter: the instance that actually writes the file

PartitionSortedBuffer: the in-memory buffer structure that actually implements SortBuffer

  • ArrayList<MemorySegment> buffers: the segments used to store records
  • long[] firstIndexEntryAddresses: the address of the first record in each partition
  • long[] lastIndexEntryAddresses: the address of the last record in each partition
  • int bufferSize: the size of each requested buffer
  • long numTotalRecords: total number of records in the sort buffer
  • long numTotalBytes: total number of bytes appended to the sort buffer
  • long numTotalBytesRead: number of bytes already read from the sort buffer
  • boolean isFinished && boolean isReleased
  • int writeSegmentIndex: index (into the buffers array) of the segment currently being written
  • int writeSegmentOffset: write offset within the current segment
  • int[] subpartitionReadOrder: the order in which subpartitions are read
  • long readIndexEntryAddress: the address of the index entry currently being read
  • int recordRemainingBytes: remaining bytes of the record currently being read
  • int readOrderIndex: index of the currently readable channel; it selects the channel of the partition being read
  • BufferPool bufferPool: the BufferPool used to request Buffers

There are three sets of concepts involved:

Partition: the target subpartition (channel) that a record is appended to.

Index entry: a 64-bit long; the high 32 bits record the data length and the low 32 bits record the data type.

Index entry address: also a 64-bit long; the high 32 bits are the index of the segment in the buffers list, and the low 32 bits are the offset within that segment.
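A worked example of the bit packing (mirroring writeIndex and the two pointer helpers shown below):

// Pack (segmentIndex, segmentOffset) into one long index entry address.
int segmentIndex = 3;
int segmentOffset = 1024;
long indexEntryAddress = ((long) segmentIndex << 32) | segmentOffset;

int recoveredIndex = (int) (indexEntryAddress >>> 32); // 3
int recoveredOffset = (int) indexEntryAddress;         // 1024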

public boolean append(ByteBuffer source, int targetChannel, DataType dataType)
        throws IOException {
    checkArgument(source.hasRemaining(), "Cannot append empty data.");
    checkState(!isFinished, "Sort buffer is already finished.");
    checkState(!isReleased, "Sort buffer is already released.");

    // bytes to append = ByteBuffer limit - current position
    int totalBytes = source.remaining();

    // return false directly if it can not allocate enough buffers for the given record
    if (!allocateBuffersForRecord(totalBytes)) {
        return false;
    }

    // write the index entry and record or event data
    writeIndex(targetChannel, totalBytes, dataType);
    writeRecord(source);

    ++numTotalRecords;
    numTotalBytes += totalBytes;

    return true;
}

private boolean allocateBuffersForRecord(int numRecordBytes) throws IOException {
    // required bytes = index entry size (INDEX_ENTRY_SIZE) + record bytes
    int numBytesRequired = INDEX_ENTRY_SIZE + numRecordBytes;
    // if the segment is written for the first time, the available bytes equal the
    // configured bufferSize; otherwise bufferSize - writeSegmentOffset, where
    // writeSegmentOffset is the write offset within the current segment
    int availableBytes =
            writeSegmentIndex == buffers.size() ? 0 : bufferSize - writeSegmentOffset;

    // return directly if current available bytes is adequate
    if (availableBytes >= numBytesRequired) {
        return true;
    }

    // skip the remaining free space if the available bytes is not enough for an index
    // entry: an index entry (16 bytes) must not cross segments, so writeSegmentIndex is
    // incremented and writeSegmentOffset reset to 0, i.e. the start of the next segment
    if (availableBytes < INDEX_ENTRY_SIZE) {
        updateWriteSegmentIndexAndOffset(availableBytes);
        availableBytes = 0;
    }

    // allocate exactly enough buffers (of bufferSize each) for the appended record
    do {
        MemorySegment segment = requestBufferFromPool();
        if (segment == null) {
            // return false if we can not allocate enough buffers for the appended record
            return false;
        }
        availableBytes += bufferSize;
        addBuffer(segment);
    } while (availableBytes < numBytesRequired);

    return true;
}

private void updateWriteSegmentIndexAndOffset(int numBytes) {
    // advance the write offset within the current segment
    writeSegmentOffset += numBytes;

    // using the next available free buffer if the current is full
    if (writeSegmentOffset == bufferSize) {
        ++writeSegmentIndex;
        writeSegmentOffset = 0;
    }
}

private void writeIndex(int channelIndex, int numRecordBytes, Buffer.DataType dataType) {
    // locate the segment to write into; writeSegmentIndex indexes the buffers list
    MemorySegment segment = buffers.get(writeSegmentIndex);

    // an index entry is a 64-bit long: the high 32 bits record the data length
    // and the low 32 bits record the data type
    segment.putLong(writeSegmentOffset, ((long) numRecordBytes << 32) | dataType.ordinal());

    // indexEntryAddress is also a long: the high 32 bits are the index of the segment
    // in the buffers list and the low 32 bits the offset inside that segment
    long indexEntryAddress = ((long) writeSegmentIndex << 32) | writeSegmentOffset;

    // chain the new index entry after the previous entry of the same channel; if there
    // is no previous entry, record it in firstIndexEntryAddresses, meaning this record
    // is the first one of the partition
    long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
    lastIndexEntryAddresses[channelIndex] = indexEntryAddress;

    if (lastIndexEntryAddress >= 0) {
        // +8 skips the length/type word of the previous index entry and writes the
        // address of the new entry into its pointer field
        segment = buffers.get(getSegmentIndexFromPointer(lastIndexEntryAddress));
        segment.putLong(
                getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, indexEntryAddress);
    } else {
        firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
    }

    // move the write position forward so as to write the corresponding record
    updateWriteSegmentIndexAndOffset(INDEX_ENTRY_SIZE);
}

// get the index of the segment in the buffers list
private int getSegmentIndexFromPointer(long value) {
    return (int) (value >>> 32);
}

// get the offset inside the segment
private int getSegmentOffsetFromPointer(long value) {
    return (int) (value);
}

private void writeRecord(ByteBuffer source) {
    // a record may span multiple segments
    while (source.hasRemaining()) {
        MemorySegment segment = buffers.get(writeSegmentIndex);
        // toCopy = min(remaining space of the current segment, remaining bytes of source)
        int toCopy = Math.min(bufferSize - writeSegmentOffset, source.remaining());
        segment.put(writeSegmentOffset, source, toCopy);

        // move the write position forward so as to write the remaining bytes or next record
        updateWriteSegmentIndexAndOffset(toCopy);
    }
}

source is the ByteBuffer holding the record to be written; targetChannel is the partition channel to write to; dataType currently has two kinds: data buffer and event buffer. Reading is the mirror image of writing, so it is not walked through in full detail here.

@Override
public BufferWithChannel copyIntoSegment(MemorySegment target) {
    checkState(hasRemaining(), "No data remaining.");
    checkState(isFinished, "Should finish the sort buffer first before coping any data.");
    checkState(!isReleased, "Sort buffer is already released.");

    int numBytesCopied = 0;
    DataType bufferDataType = DataType.DATA_BUFFER;
    int channelIndex = subpartitionReadOrder[readOrderIndex];

    do {
        // read the metadata (index entry) part first
        int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
        int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
        MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);

        long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
        int length = getSegmentIndexFromPointer(lengthAndDataType);
        DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];

        // return the data read directly if the next to read is an event
        if (dataType.isEvent() && numBytesCopied > 0) {
            break;
        }
        bufferDataType = dataType;

        // get the next index entry address and move the read position forward
        long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
        sourceSegmentOffset += INDEX_ENTRY_SIZE;

        // allocate a temp buffer for the event if the target buffer is not big enough
        if (bufferDataType.isEvent() && target.size() < length) {
            target = MemorySegmentFactory.allocateUnpooledSegment(length);
        }

        numBytesCopied +=
                copyRecordOrEvent(
                        target, numBytesCopied, sourceSegmentIndex, sourceSegmentOffset, length);

        if (recordRemainingBytes == 0) {
            // move to next channel if the current channel has been finished
            if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
                updateReadChannelAndIndexEntryAddress();
                break;
            }
            readIndexEntryAddress = nextReadIndexEntryAddress;
        }
    } while (numBytesCopied < target.size() && bufferDataType.isBuffer());

    numTotalBytesRead += numBytesCopied;
    Buffer buffer = new NetworkBuffer(target, (buf) -> {}, bufferDataType, numBytesCopied);
    return new BufferWithChannel(buffer, channelIndex);
}

private int copyRecordOrEvent(
        MemorySegment targetSegment,
        int targetSegmentOffset,
        int sourceSegmentIndex,
        int sourceSegmentOffset,
        int recordLength) {
    if (recordRemainingBytes > 0) {
        // skip the data already read if there is remaining partial record after the previous
        // copy
        long position = (long) sourceSegmentOffset + (recordLength - recordRemainingBytes);
        sourceSegmentIndex += (position / bufferSize);
        sourceSegmentOffset = (int) (position % bufferSize);
    } else {
        recordRemainingBytes = recordLength;
    }

    int targetSegmentSize = targetSegment.size();
    int numBytesToCopy = Math.min(targetSegmentSize - targetSegmentOffset, recordRemainingBytes);
    do {
        // move to next data buffer if all data of the current buffer has been copied
        if (sourceSegmentOffset == bufferSize) {
            ++sourceSegmentIndex;
            sourceSegmentOffset = 0;
        }

        int sourceRemainingBytes =
                Math.min(bufferSize - sourceSegmentOffset, recordRemainingBytes);
        int numBytes = Math.min(targetSegmentSize - targetSegmentOffset, sourceRemainingBytes);
        MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);
        sourceSegment.copyTo(sourceSegmentOffset, targetSegment, targetSegmentOffset, numBytes);

        recordRemainingBytes -= numBytes;
        targetSegmentOffset += numBytes;
        sourceSegmentOffset += numBytes;
    } while ((recordRemainingBytes > 0 && targetSegmentOffset < targetSegmentSize));

    return numBytesToCopy;
}

PartitionedFileWriter

It mainly writes the index file and the data file.

public void writeBuffer(Buffer target, int targetSubpartition) throws IOException {
    checkState(!isFinished, "File writer is already finished.");
    checkState(!isClosed, "File writer is already closed.");

    if (targetSubpartition != currentSubpartition) {
        checkState(
                subpartitionBuffers[targetSubpartition] == 0,
                "Must write data of the same channel together.");
        subpartitionOffsets[targetSubpartition] = totalBytesWritten;
        currentSubpartition = targetSubpartition;
    }

    totalBytesWritten += writeToByteChannel(dataFileChannel, target, writeDataCache, header);
    ++subpartitionBuffers[targetSubpartition];
}

private void writeIndexEntry(long subpartitionOffset, int numBuffers) throws IOException {
    if (!indexBuffer.hasRemaining()) {
        if (!extendIndexBufferIfPossible()) {
            flushIndexBuffer();
            indexBuffer.clear();
            allIndexEntriesCached = false;
        }
    }

    indexBuffer.putLong(subpartitionOffset);
    indexBuffer.putInt(numBuffers);
}

static long writeToByteChannel(
        FileChannel channel, Buffer buffer, ByteBuffer[] arrayWithHeaderBuffer)
        throws IOException {
    // write the buffer header
    final ByteBuffer headerBuffer = arrayWithHeaderBuffer[0];
    headerBuffer.clear();
    headerBuffer.putShort(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
    headerBuffer.putShort(buffer.isCompressed() ? BUFFER_IS_COMPRESSED : BUFFER_IS_NOT_COMPRESSED);
    headerBuffer.putInt(buffer.getSize());
    headerBuffer.flip();

    final ByteBuffer dataBuffer = buffer.getNioBufferReadable();
    arrayWithHeaderBuffer[1] = dataBuffer;

    final long bytesExpected = HEADER_LENGTH + dataBuffer.remaining();

    // The file channel implementation guarantees that all bytes are written when invoked
    // because it is a blocking channel (the implementation mentioned it as guaranteed).
    // However, the api docs leave it somewhat open, so it seems to be an undocumented contract
    // in the JRE. We build this safety net to be on the safe side.
    if (bytesExpected < channel.write(arrayWithHeaderBuffer)) {
        writeBuffers(channel, arrayWithHeaderBuffer);
    }
    return bytesExpected;
}

static Buffer readFromByteChannel(
        FileChannel channel,
        ByteBuffer headerBuffer,
        MemorySegment memorySegment,
        BufferRecycler bufferRecycler)
        throws IOException {
    headerBuffer.clear();
    if (!tryReadByteBuffer(channel, headerBuffer)) {
        return null;
    }
    headerBuffer.flip();

    final ByteBuffer targetBuf;
    final boolean isEvent;
    final boolean isCompressed;
    final int size;

    try {
        isEvent = headerBuffer.getShort() == HEADER_VALUE_IS_EVENT;
        isCompressed = headerBuffer.getShort() == BUFFER_IS_COMPRESSED;
        size = headerBuffer.getInt();
        targetBuf = memorySegment.wrap(0, size);
    } catch (BufferUnderflowException | IllegalArgumentException e) {
        // buffer underflow if header buffer is undersized
        // IllegalArgumentException if size is outside memory segment size
        throwCorruptDataException();
        return null; // silence the compiler
    }

    readByteBufferFully(channel, targetBuf);

    Buffer.DataType dataType = isEvent ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
    return new NetworkBuffer(memorySegment, bufferRecycler, dataType, isCompressed, size);
}
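Two small on-disk layouts fall out of this code: every buffer in the data file is preceded by an 8-byte header (2 bytes for the buffer/event flag, 2 bytes for the compression flag, 4 bytes for the length), and every index entry takes 12 bytes. A minimal sketch of the index entry that writeIndexEntry produces; the helper name is mine, not Flink's:

import java.nio.ByteBuffer;

// One index entry per subpartition region: where the region starts in the
// data file (8 bytes) and how many buffers were written into it (4 bytes).
static ByteBuffer indexEntry(long subpartitionOffset, int numBuffers) {
    ByteBuffer entry = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); // 12 bytes
    entry.putLong(subpartitionOffset);
    entry.putInt(numBuffers);
    entry.flip(); // ready to be written to the index file channel
    return entry;
}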

Flink memory model

JobManager memory model
  • jobmanager.memory.process.size: the total process memory of the JobManager (heap, off-heap, metaspace and JVM overhead)
TaskManager memory model
  • Flink memory = heap memory + off-heap memory
  • Heap memory = framework heap memory + task heap memory
  • Off-heap memory = framework off-heap memory + task off-heap memory + network buffer memory
Per component, heap and off-heap memory break down as follows:
  • JobManager. Heap: framework heap memory. Off-heap: framework off-heap memory.
  • TaskManager. Heap: taskmanager.memory.framework.heap.size (heap memory used by the TaskManager framework itself) and taskmanager.memory.task.heap.size (heap memory used by user code during task execution). Off-heap: taskmanager.memory.framework.off-heap.size (off-heap memory occupied by the framework itself) and taskmanager.memory.task.off-heap.size (off-heap memory used by user code during task execution).
  • Network buffer. Heap: none. Off-heap: taskmanager.memory.network.fraction, taskmanager.memory.network.min, taskmanager.memory.network.max.
  • JVM Metaspace. Heap: the JVM metaspace itself. Off-heap: none.
  • JVM overhead. Heap: taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max, taskmanager.memory.jvm-overhead.fraction. Off-heap: none.
  • Managed memory. Heap: none. Off-heap: taskmanager.memory.managed.fraction and taskmanager.memory.managed.size; used for sorting, hash tables and caching intermediate results.
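To make these options concrete, the sketch below sets a few of them on a Configuration object, as one might for a local MiniCluster test; in a real deployment the same keys go into flink-conf.yaml as plain key/value lines. The sizes are illustrative assumptions, not recommendations:

import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// heap: the framework's own heap plus the heap available to user code in tasks
conf.setString("taskmanager.memory.framework.heap.size", "128m");
conf.setString("taskmanager.memory.task.heap.size", "1g");
// off-heap counterparts of the two options above
conf.setString("taskmanager.memory.framework.off-heap.size", "128m");
conf.setString("taskmanager.memory.task.off-heap.size", "0m");
// network buffers: a fraction of total Flink memory, clamped to [min, max]
conf.setString("taskmanager.memory.network.fraction", "0.1");
conf.setString("taskmanager.memory.network.min", "64m");
conf.setString("taskmanager.memory.network.max", "1g");
// managed memory for sorting, hash tables, caching and RocksDB
conf.setString("taskmanager.memory.managed.fraction", "0.4");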

Total memory

Total process memory:

The total memory consumed by the Flink Java application (including user code) and by the JVM running the entire process.

Total process memory = Flink total memory + JVM metaspace + JVM execution overhead

taskmanager.memory.process.size

Flink total memory

The memory consumed by the Flink Java application, including user code, but excluding the memory the JVM allocates to run it

Flink total memory = framework memory (heap and off-heap) + task memory (heap and off-heap) + network memory + managed memory

taskmanager.memory.flink.size 
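In practice, only one of taskmanager.memory.process.size and taskmanager.memory.flink.size should be configured explicitly. The former is the natural choice for containerized deployments, since it caps everything the container sees, metaspace and JVM overhead included; with the latter, the JVM-level components are added on top of a fixed Flink budget.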

Having seen Flink's overall architecture and memory management mechanism, what can be done to optimize the memory configuration of a Flink cluster and its jobs?

1. Number of records and size of each record

The first step in sizing a cluster is to estimate the number of records per second expected to enter the stream processing system (commonly called throughput), as well as the size of each record. Different record types have different sizes, which ultimately affects the resources required for a Flink application to run smoothly.
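As a hypothetical example: 100,000 records per second at 2 KB per record means roughly 200 MB/s entering the system, before any shuffle or replication traffic is added on top.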

2. The number of different keys and the state size of each key

The number of different keys in your application, and the amount of state each key needs to store, will affect the resources a Flink application needs to run efficiently and avoid any backpressure.
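For instance, 100 million distinct keys with roughly 1 KB of state each add up to about 100 GB of state, which by itself rules out a purely heap-based backend on most clusters and points toward RocksDB.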

3. State update frequency and the access pattern of the state backend

A third consideration is the frequency of state updates, since updating state is usually an expensive operation. The access patterns of the available state backends (such as RocksDB and the Java heap backends) also differ greatly: every read and update in RocksDB involves serialization, deserialization and JNI calls, while the Java heap backends do not support incremental checkpoints, so in large-state scenarios a large amount of data must be persisted on every checkpoint. These factors can significantly affect the size of the cluster and the resources a Flink job requires.
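As a sketch of that trade-off, the snippet below enables RocksDB with incremental checkpoints, which upload only the SST files changed since the last checkpoint. It assumes Flink 1.13+, where EmbeddedRocksDBStateBackend replaced the older RocksDBStateBackend, and the checkpoint path is a placeholder:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 'true' enables incremental checkpoints: only changed SST files are uploaded
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// placeholder path; any DFS supported by Flink works here
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
env.enableCheckpointing(60_000); // checkpoint every 60 seconds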

4. Network capacity

Network capacity is affected not only by the Flink application itself, but also by external services it interacts with, such as Kafka and HDFS, which cause additional network traffic. For example, enabling replication might generate extra traffic between the message brokers on the network.
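As a rough illustration with made-up numbers: a job that shuffles 400 MB/s across a five-node cluster pushes most of that volume over the network between nodes, and if the Kafka source topic is replicated over the same links, the bandwidth left per NIC shrinks quickly.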

5. Disk bandwidth

If your application relies on a disk-based state back end such as RocksDB, or if you are considering using Kafka or HDFS, disk bandwidth also needs to be taken into consideration.
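As a rule of thumb, a single spinning disk sustains sequential throughput on the order of 100 to 150 MB/s, while SSDs are several times faster; with RocksDB, checkpoint uploads and background compaction compete for the same bandwidth as regular state access.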

6. Number of machines and their available CPU and memory

Last but not least, before starting application deployment, you need to consider the number of machines available in the cluster and the amount of CPU and memory available. This ultimately ensures that the cluster has sufficient processing power once the application is put into production.