preface

CocoaAsyncSocket CocoaAsyncSocket is an IM framework written by Google based on BSD-Socket. It provides an easy-to-use, powerful asynchronous Socket library for Mac and iOS, and encapsulates the easy-to-use OC interface upwards. Save us for Socket and data Stream and other complicated programming. This is the entire library class, one is based on TCP, one is based on UDP, this article is mainly introduced GCDAsyncSocket source code

GCDAsyncSocket source code analysis

The client and server processes of socket are as follows. In most cases, we are based on client development, so the flow of our analysis follows the flow of the client:

First to seeGCDAsyncSocketProperties of this class

@implementation asyncsocket {//flags, uint32_t flags; uint16_t config; // proxy __weak id<GCDAsyncSocketDelegate> delegate; // queue dispatch_queue_t delegateQueue; IPV4Socket int socket4FD; IPV6Socket int socket6FD; Locahost VS 127.0.0.1 // socket int socketUN; // Unix domain server url NSURL *socketUrl; // stateIndex int stateIndex; // IPV4 address of the host interface NSData * connectInterface4; // The local IPV6 address NSData * connectInterface6; // Local Unix domain address NSData * connectInterfaceUN; // All Socket operations of this class are in this queue, serial dispatch_queue_t socketQueue; // source --> mergdata get_data buffer SSL CFStream // data dispatch_source_t accept4Source; dispatch_source_t accept6Source; dispatch_source_t acceptUNSource; // Connect to dispatch_source_t connectTimer; dispatch_source_treadSource;
    dispatch_source_t writeSource;
    dispatch_source_t readTimer; dispatch_source_t writeTimer; // Read and write packet arrays are similar to queues, with a maximum of 5 packets - FIFO NSMutableArray *readQueue; NSMutableArray *writeQueue; GCDAsyncReadPacket *currentRead; GCDAsyncWritePacket *currentWrite; Unsigned long socketFDBytesAvailable; // Unsigned long socketFDBytesAvailable; // Global public preBuffer GCDAsyncSocketPreBuffer *preBuffer;#if TARGET_OS_IPHONECFStreamClientContext streamContext; // Read data streams ---- c CFReadStreamRefreadStream; // Write data stream CFWriteStreamRef writeStream;#endifSSLContextRef sslContext; SSLContextRef sslContext; GCDAsyncSocketPreBuffer *sslPreBuffer; size_t sslWriteCachedLength; OSStatus sslErrCode; OSStatus lastSSLHandshakeError; / / socket queue identifies the key - key - queue void * IsOnSocketQueueOrTargetQueueKey; id userData; // Delay in connecting to alternative server addresses (alternative IPV4 or IPV6) NSTimeInterval alternateAddressDelay; }Copy the code

Let’s look at what these properties do

  • Flags: indicates the current status of the socket
// Flags enumeration enum GCDAsyncSocketFlags {kSocketStarted = 1 << 0, // Ifset, socket has been started (accepting/connecting)
    kConnected                     = 1 <<  1,  // If set, the socket is connected
    kForbidReadsWrites             = 1 <<  2,  // If set, no new reads or writes are allowed
    kReadsPaused                   = 1 <<  3,  // If set, reads are paused due to possible timeout
    kWritesPaused                  = 1 <<  4,  // If set, writes are paused due to possible timeout
    kDisconnectAfterReads          = 1 <<  5,  // If set, disconnect after no more reads are queued
    kDisconnectAfterWrites         = 1 <<  6,  // If set, disconnect after no more writes are queued
    kSocketCanAcceptBytes          = 1 <<  7,  // If set, we know socket can accept bytes. If unset, it's unknown. kReadSourceSuspended = 1 << 8, // If set, the read source is suspended kWriteSourceSuspended = 1 << 9, // If set, the write source is suspended kQueuedTLS = 1 << 10, // If set, we've queued an upgrade to TLS
    kStartingReadTLS               = 1 << 11,  // If set, we're waiting for TLS negotiation to complete kStartingWriteTLS = 1 << 12, // If set, we're waiting for TLS negotiation to complete
    kSocketSecure                  = 1 << 13,  // If set, socket is using secure communication via SSL/TLS
    kSocketHasReadEOF              = 1 << 14,  // If set, we have read EOF from socket
    kReadStreamClosed              = 1 << 15,  // If set, we've read EOF plus prebuffer has been drained kDealloc = 1 << 16, // If set, the socket is being deallocated #if TARGET_OS_IPHONE kAddedStreamsToRunLoop = 1 << 17, // If set, CFStreams have been added to listener thread kUsingCFStreamForTLS = 1 << 18, // If set, we're forced to use CFStream instead of SecureTransport
    kSecureSocketHasBytesAvailable = 1 << 19,  // If set, CFReadStream has notified us of bytes available
#endif
};
Copy the code
  • Config: indicates the socket protocol status
// Config enumeration enum GCDAsyncSocketConfig {kIPv4Disabled = 1 << 0, // Ifset, IPv4 is disabled
    kIPv6Disabled              = 1 << 1,  // If set, IPv6 is disabled
    kPreferIPv6                = 1 << 2,  // If set, IPv6 is preferred over IPv4 / / duplex - half duplex kAllowHalfDuplexConnection = 1 < < 3, / / Ifset, the socket will stay open even if the read stream closes
};
Copy the code
  • delegateThe: proxy property, which is stored when a proxy is passed in, is used for some state callback
    • Succeeded in creating the socket. Procedure
    • Socket closed
    • Socket writes data
    • Socket reading data
    • The socket to disconnect
// proxy __weak id<GCDAsyncSocketDelegate> delegate;Copy the code
  • DelegateQueue: The queue used by callbacks
// queue dispatch_queue_t delegateQueue;Copy the code
  • Three kinds of socket
    • The first kind ofsocket4FD, stands foripv4The type of the socket
    • The second,socket6FD, stands foripv6The type of the socket
    • The second type of socketUN is a socket for communication between threads. The other two types of sockets can also implement communication between threads, but this type of socket does not go through the transport layer, which is relatively fast, but is used in fewer places
IPV4Socket int socket4FD; IPV6Socket int socket6FD; Locahost VS 127.0.0.1 // socket int socketUN; // Unix domain server url NSURL *socketUrl; // stateIndex int stateIndex; // IPV4 address of the host interface NSData * connectInterface4; // The local IPV6 address NSData * connectInterface6; // Local Unix domain address NSData * connectInterfaceUN;Copy the code
  • Define objects of type dispatch_source_t to process data for high write performance
// source --> mergdata get_data buffer SSL CFStream // data dispatch_source_t accept4Source; dispatch_source_t accept6Source; dispatch_source_t acceptUNSource; // Connect to dispatch_source_t connectTimer; dispatch_source_treadSource;
dispatch_source_t writeSource;
dispatch_source_t readTimer;
dispatch_source_t writeTimer;
Copy the code
  • Define arrays to read and write so that each piece of data is sequential
// Read and write packet arrays are similar to queues, with a maximum of 5 packets - FIFO NSMutableArray *readQueue;
NSMutableArray *writeQueue;
Copy the code
  • AlternateAddressDelay: Delay for connecting to alternative addresses
// Delay in connecting to alternative server addresses (alternative IPV4 or IPV6) NSTimeInterval alternateAddressDelay;Copy the code

Using the process

/ / create a socketif(self.socket == nil) self.socket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_global_queue(0, 0)]; / / a socket connectionif(! self.socket.isConnected){ NSError *error; [self.socket connectToHost:@"127.0.0.1" onPort:8090 withTimeout:-1 error:&error];
    if (error) NSLog(@"% @",error);
}
Copy the code
  • initWithDelegate:delegateQueue:Source code analysis
- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq
{
    return [self initWithDelegate:aDelegate delegateQueue:dq socketQueue:NULL];
}

- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq socketQueue:(dispatch_queue_t)sq
{
    if((self = [super init])) { delegate = aDelegate; delegateQueue = dq; OS_OBJECT_USE_OBJC = 0; // This macro was created after sdk6.0. 0 is executedifIf it is less than 6.0, then retain release will be removed. ARC also manages GCD after 6.0#if ! OS_OBJECT_USE_OBJC
        
        if (dq) dispatch_retain(dq);
        #endifSocket4FD = SOCKET_NULL; //ipv6 socket6FD = SOCKET_NULL; // UnixSocket socketUN = SOCKET_NULL; //url socketUrl = nil; // stateIndex = 0;if(sq) {// An error is reported if the scoketQueue is global. An assertion must have a non-parallel queue. NSAssert(sq ! = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), @"The given socketQueue parameter must not be a concurrent queue."); NSAssert(sq ! = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), @"The given socketQueue parameter must not be a concurrent queue."); NSAssert(sq ! = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), @"The given socketQueue parameter must not be a concurrent queue."); SocketQueue = sq; / / retain under the iOS6#if ! OS_OBJECT_USE_OBJC
            dispatch_retain(sq);
            #endif
        }
        else{// Create a socket with the name GCDAsyncSocket, serial socketQueue = dispatch_queue_create([GCDAsyncSocketQueueName UTF8String], NULL); } // The pointer is equal to the original pointer, which is a secondary pointer. IsOnSocketQueueOrTargetQueueKey = &IsOnSocketQueueOrTargetQueueKey; void *nonNullUnusedPointer = (__bridge void *)self; Dispatch_get_specific dispatch_get_specific dispatch_get_specific dispatch_get_specific dispatch_get_specific dispatch_get_specific dispatch_get_specific dispatch_get_specific dispatch_get_specific Context object? The fourth parameter is for destruction, You can specify a function dispatch_queue_set_specific (socketQueue, IsOnSocketQueueOrTargetQueueKey nonNullUnusedPointer, NULL); // The read array is limited to 5readQueue = [[NSMutableArray alloc] initWithCapacity:5]; currentRead = nil; WriteQueue = [[NSMutableArray alloc] initWithCapacity:5]; writeQueue = [NSMutableArray alloc] initWithCapacity:5]; currentWrite = nil; PreBuffer = [[GCDAsyncSocketPreBuffer alloc] initWithCapacity:(1024 * 4)];#pragma mark alternateAddressDelay??// Alternate address delay?? WTF alternateAddressDelay = 0.3; }return self;
}
Copy the code

InitWithDelegate: delegateQueue function is relatively simple, mainly to do some initialization work

ConnectToHost: onPort: withTimeout: error: connection socket is the main function of the entrance

- (BOOL)connectToHost:(NSString *)host
                onPort:(uint16_t)port
                withTimeout:(NSTimeInterval)timeout
                error:(NSError **)errPtr
{
    return[self connectToHost:host onPort:port viaInterface:nil withTimeout:timeout error:errPtr]; } // One morein- (BOOL)connectToHost:(NSString *)inHost
                onPort:(uint16_t)port
                viaInterface:(NSString *)inInterface withTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr { LogTrace(); / / {}???? What's the point? -- Tracking current behavior // Justin caseNSString *host = [immutable objects are passed.inHost copy]; / / interface? Interface? NSString *interface = [inInterface copy]; // Declare two __block BOOL result = NO; // Error message __block NSError *preConnectErr = nil; //gcdBlock, all wrapped in the automatic release pool: // 1: a large number of temporary variables connect: reconnect // 2: custom thread management: nsoperation // 3: Non-ui command tool dispatch_block_t block = ^{@autoreleasepool {// Checkfor problems with host parameter
        
        if ([host length] == 0)
        {
            NSString *msg = @"Invalid host parameter (nil or \"\"). Should be a domain name or IP address string."; preConnectErr = [self badParamError:msg]; // This is the casereturnDaniel's code is really full of gray-ret // asynchronyreturn--> function -- compiled // macro definition -- precompiled --return- return_from_block in advance; } // Run through standard pre-connect checks // Run through standard pre-connect checks // Run through standard pre-connect checks // Run through standard pre-connect checks // // Arguments: Pointers operate on the same memory spaceif(! [self preConnectWithInterface:interface error:&preConnectErr]) { return_from_block; } // We've made it past all the checks. // It'S time to start the connection process. //flags Flags are identified as start Socket connection flags | = kSocketStarted; // Another {}? Is it just for marking? LogVerbose(@"Dispatching DNS lookup...");
        
        // It's possible that the given host parameter is actually a NSMutableString. // So we want to copy It now, within this block that will be executed synchronously. This way the asynchronous lookup Block below doesn'tNSString *hostCpy = [host copy]; Int aStateIndex = stateIndex; __weak GCDAsyncSocket *weakSelf = self; Server <-- > server dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); // Execute dispatch_async(globalConcurrentQueue, ^{@autoreleasepool {// ignore circular references#pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"// Find error NSError *lookupErr = nil; // Server address array (including IPV4 and IPV6 addresses sockaddr_in6 and sockaddr_in types) NSMutableArray *addresses = [[self class] lookupHost:hostCpy port:port error:&lookupErr]; //strongSelf __strong GCDAsyncSocket *strongSelf = weakSelf; // Complete Block security form, add aif
        if(strongSelf == nil) return_from_block; // If there is an errorif(lookupErr) {// cocketQueue dispatch_async(strongSelf->socketQueue, ^{@autoreleasepool {// Some error handling, StrongSelf Lookup :aStateIndex didFail:lookupErr }}); } / / normalelse{ NSData *address4 = nil; NSData *address6 = nil; // Iterate over the address arrayfor (NSData *address inAddresses) {// Check whether address4 is empty and address is IPV4if(! address4 && [[self class] isIPv4Address:address]) { address4 = address; } // Determine if address6 is empty and address is IPV6else if(! address6 && [[self class] isIPv6Address:address]) { address6 = address; } // Async dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool { [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6]; }}); }#pragma clang diagnostic pop}}); / / open the connection timeout [self startConnectTimeout: timeout]; result = YES; }}; // Execute this Block in socketQueueif(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey)) block(); // If not, the queue will be synchronizedelsedispatch_sync(socketQueue, block); // If there is an error, the assignment is incorrectif(errPtr) *errPtr = preConnectErr; // Return result on whether the connection was successfulreturn result;
}
Copy the code

So let’s break it down, so LogTrace() is a macro that has two cases

// In debug state#define LogTrace() LogObjc(LOG_FLAG_VERBOSE, @"%@: %@", THIS_FILE, THIS_METHOD)// No debugging state#define LogTrace() {}
Copy the code

If you fold up the code inside the block, it’s pretty simple, so let’s focus on the code inside the block

dispatch_block_t block = ^{ @autoreleasepool {
    
    // Check for problems with host parameter
    
    if ([host length] == 0)
    {
        NSString *msg = @"Invalid host parameter (nil or \"\"). Should be a domain name or IP address string."; preConnectErr = [self badParamError:msg]; / / asynchronicityreturn--> function -- compiled // macro definition -- precompiled --return- return_from_block in advance; } // Run through standard pre-connect checks // Run through standard pre-connect checks // Run through standard pre-connect checks // Run through standard pre-connect checks // // Arguments: Pointers operate on the same memory spaceif(! [self preConnectWithInterface:interface error:&preConnectErr]) { return_from_block; } // We've made it past all the checks. // It'S time to start the connection process. //flags Flags are identified as start Socket connection flags | = kSocketStarted; // Another {}? Is it just for marking? LogVerbose(@"Dispatching DNS lookup...");
    
    // It's possible that the given host parameter is actually a NSMutableString. // So we want to copy It now, within this block that will be executed synchronously. This way the asynchronous lookup Block below doesn'tNSString *hostCpy = [host copy]; Int aStateIndex = stateIndex; __weak GCDAsyncSocket *weakSelf = self; Server <-- > server dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); // Execute dispatch_async(globalConcurrentQueue, ^{@autoreleasepool {// ignore circular references#pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"// Find error NSError *lookupErr = nil; // Server address array (including IPV4 and IPV6 addresses sockaddr_in6 and sockaddr_in types) NSMutableArray *addresses = [[self class] lookupHost:hostCpy port:port error:&lookupErr]; //strongSelf __strong GCDAsyncSocket *strongSelf = weakSelf; // Complete Block security form, add aif
        if(strongSelf == nil) return_from_block; // If there is an errorif(lookupErr) {// cocketQueue dispatch_async(strongSelf->socketQueue, ^{@autoreleasepool {// Some error handling, StrongSelf Lookup :aStateIndex didFail:lookupErr }}); } / / normalelse{ NSData *address4 = nil; NSData *address6 = nil; // Iterate over the address arrayfor (NSData *address inAddresses) {// Check whether address4 is empty and address is IPV4if(! address4 && [[self class] isIPv4Address:address]) { address4 = address; } // Determine if address6 is empty and address is IPV6else if(! address6 && [[self class] isIPv6Address:address]) { address6 = address; } // Async dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool { [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6]; }}); }#pragma clang diagnostic pop}}); / / open the connection timeout [self startConnectTimeout: timeout]; result = YES; }};Copy the code

Return_from_block macro definition, why do we need to define a new macro to represent a return, because then we can precompile the return into it, so that we don’t have to exit the block before we compile it, but that’s rare

/**
 * Seeing a return statements within an inner block
 * can sometimes be mistaken for a return point of the enclosing method.
 * This makes inline blocks a bit easier to read.
**/
#define return_from_block return
Copy the code

PreConnectWithInterface: error: Prechecks if the interface has a value

// Check the interface before connecting, - (BOOL)preConnectWithInterface:(NSString *)interface error:(NSError **)errPtr {// socket - connect / / assertions first, if the queue is not initialized quueue, direct error NSAssert (dispatch_get_specific (IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); / / no agentif (delegate == nil) // Must have delegate set
    {
        if (errPtr)
        {
            NSString *msg = @"Attempting to connect without a delegate. Set a delegate first.";
            *errPtr = [self badConfigError:msg];
        }
        returnNO; } // There is no proxy queueif (delegateQueue == NULL) // Must have delegate queue set
    {
        if (errPtr)
        {
            NSString *msg = @"Attempting to connect without a delegate queue. Set a delegate queue first.";
            *errPtr = [self badConfigError:msg];
        }
        returnNO; } // The current state is not disconnectedif(! [self isDisconnected]) // Must be disconnected {if (errPtr)
        {
            NSString *msg = @"Attempting to connect while connected or accepting connections. Disconnect first.";
            *errPtr = [self badConfigError:msg];
        }
        returnNO; } // Check whether IPV4 IPV6 & is supported. Since enumerations are defined with the left shift << operation, it can be used to check that the config package does not contain an enumeration. BOOL isIPv4Disabled = (config & kIPv4Disabled)? BOOL isIPv4Disabled = (config & kIPv4Disabled) YES : NO; BOOL isIPv6Disabled = (config & kIPv6Disabled) ? YES : NO; // Whether neither is supportedif (isIPv4Disabled && isIPv6Disabled) // Must have IPv4 or IPv6 enabled
    {
        if (errPtr)
        {
            NSString *msg = @"Both IPv4 and IPv6 have been disabled. Must enable at least one protocol first.";
            *errPtr = [self badConfigError:msg];
        }
        returnNO; } // If there is interface, the local addressif(interface) { NSMutableData *interface4 = nil; NSMutableData *interface6 = nil; / / get the native IPV4 IPV6 address [self getInterfaceAddress4: & interface4 address6: & interface6 fromDescription: interface port: 0]; // If both are nilif ((interface4 == nil) && (interface6 == nil))
        {
            if (errPtr)
            {
                NSString *msg = @"Unknown interface. Specify valid interface by name (e.g. \"en1\") or IP address.";
                *errPtr = [self badParamError:msg];
            }
            return NO;
        }
        
        if (isIPv4Disabled && (interface6 == nil))
        {
            if (errPtr)
            {
                NSString *msg = @"IPv4 has been disabled and specified interface doesn't support IPv6.";
                *errPtr = [self badParamError:msg];
            }
            return NO;
        }
        
        if (isIPv6Disabled && (interface4 == nil))
        {
            if (errPtr)
            {
                NSString *msg = @"IPv6 has been disabled and specified interface doesn't support IPv4.";
                *errPtr = [self badParamError:msg];
            }
            returnNO; } // connectInterface4 = interface4; connectInterface6 = interface6; } // Clear queues (spuriousread// Write requests post disconnect) // Clear queue (false read /write requests post disconnect)readQueue removeAllObjects];
    [writeQueue removeAllObjects];
    
    return YES;
}
Copy the code

After detection, the global concurrent queue is retrieved asynchronously to create a thread, and lookupHost:port:error: function to retrieve the server address, recursively

Server <-- > server dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); // Execute dispatch_async(globalConcurrentQueue, ^{@autoreleasepool {// ignore circular references#pragma clang diagnostic push
    #pragma clang diagnostic warning "-Wimplicit-retain-self"// Find error NSError *lookupErr = nil; // Server address array (including IPV4 and IPV6 addresses sockaddr_in6 and sockaddr_in types) NSMutableArray *addresses = [[self class] lookupHost:hostCpy port:port error:&lookupErr]; //strongSelf __strong GCDAsyncSocket *strongSelf = weakSelf; // Complete Block security form, add aif
    if(strongSelf == nil) return_from_block; // If there is an errorif(lookupErr) {// cocketQueue dispatch_async(strongSelf->socketQueue, ^{@autoreleasepool {// Some error handling, StrongSelf Lookup :aStateIndex didFail:lookupErr }}); } / / normalelse{ NSData *address4 = nil; NSData *address6 = nil; // Iterate over the address arrayfor (NSData *address inAddresses) {// Check whether address4 is empty and address is IPV4if(! address4 && [[self class] isIPv4Address:address]) { address4 = address; } // Determine if address6 is empty and address is IPV6else if(! address6 && [[self class] isIPv6Address:address]) { address6 = address; } // Async dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool { [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6]; }}); }#pragma clang diagnostic pop
}});
Copy the code

Determine if the returned address is incorrect, if yes, assign the value to the member variable

// If there is an errorif(lookupErr) {// cocketQueue dispatch_async(strongSelf->socketQueue, ^{@autoreleasepool {// Some error handling, StrongSelf Lookup :aStateIndex didFail:lookupErr }}); } / / normalelse{ NSData *address4 = nil; NSData *address6 = nil; // Iterate over the address arrayfor (NSData *address inAddresses) {// Check whether address4 is empty and address is IPV4if(! address4 && [[self class] isIPv4Address:address]) { address4 = address; } // Determine if address6 is empty and address is IPV6else if(! address6 && [[self class] isIPv6Address:address]) { address6 = address; } // Async dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool { [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6]; }}); }Copy the code

Asynchronous serial queue to connect

Dispatch_async (strongSelf->socketQueue, ^{ @autoreleasepool { [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6]; }});Copy the code

A series of misjudgments and then connecting

- (void)lookup:(int)aStateIndex didSucceedWithAddress4:(NSData *)address4 address6:(NSData *)address6
{
    LogTrace();
    
    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); / / at least one server address NSAssert (address4 | | address6, @"Expected at least one valid address"); // If the status is inconsistent, the connection is disconnectedif(aStateIndex ! = stateIndex) { LogInfo(@"Ignoring lookupDidSucceed, already disconnected");
        
        // The connect operation has been cancelled.
        // That is, socket was disconnected, or connection has already timed out.
        return;
    }
    
    // Check forProblems // Separate judge. BOOL isIPv4Disabled = (config & kIPv4Disabled) ? YES : NO; BOOL isIPv6Disabled = (config & kIPv6Disabled) ? YES : NO;if (isIPv4Disabled && (address6 == nil))
    {
        NSString *msg = @"IPv4 has been disabled and DNS lookup found no IPv6 address.";
        
        [self closeWithError:[self otherError:msg]];
        return;
    }
    
    if (isIPv6Disabled && (address4 == nil))
    {
        NSString *msg = @"IPv6 has been disabled and DNS lookup found no IPv4 address.";
        
        [self closeWithError:[self otherError:msg]];
        return; } // Start the normal connection process NSError *err = nil; // Calls the connection method, and returns an error if it failsif (![self connectWithAddress4:address4 address6:address6 error:&err])
    {
        [self closeWithError:err];
    }
}
Copy the code

ConnectWithAddress4: address6: error: The function is used to check whether the socket is connected

- (BOOL)connectWithAddress4:(NSData *)address4 address6:(NSData *)address6 error:(NSError **)errPtr
{
    LogTrace();
    
    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); // Output something? LogVerbose(@"IPv4: %@:%hu", [[self class] hostFromAddress:address4], [[self class] portFromAddress:address4]);
    LogVerbose(@"IPv6: %@:%hu", [[self class] hostFromAddress:address6], [[self class] portFromAddress:address6]);
    
    // Determine socket typeBOOL preferIPv6 = (config & kPreferIPv6)? YES : NO; // Create andbind// If an IPV4 address is available, create an IPV4 Socketif (address4)
    {
        LogVerbose(@"Creating IPv4 socket"); socket4FD = [self createSocket:AF_INET connectInterface:connectInterface4 errPtr:errPtr]; } // If there is an IPV6 address, create an IPV6 Socketif (address6)
    {
        LogVerbose(@"Creating IPv6 socket"); socket6FD = [self createSocket:AF_INET6 connectInterface:connectInterface6 errPtr:errPtr]; } // If both are empty, return directlyif (socket4FD == SOCKET_NULL && socket6FD == SOCKET_NULL)
    {
        returnNO; } int alternateSocketFD, alternateSocketFD; NSData *address, *alternateAddress; //IPV6if((preferIPv6 && socket6FD) || socket4FD == SOCKET_NULL) { socketFD = socket6FD; alternateSocketFD = socket4FD; address = address6; alternateAddress = address4; } // Primary IPV4else{ socketFD = socket4FD; alternateSocketFD = socket6FD; address = address4; alternateAddress = address6; } int aStateIndex = stateIndex; // socket & connect [self connectSocket:socketFD address:address stateIndex:aStateIndex]; // If there is an alternate addressif(alternateAddress) {// Delay to connect to alternative address dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{ [self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex]; }); }return YES;
    }
Copy the code

CreateSocket: connectInterface: errPtr: Creates a socket

// createSocket - (int)createSocket:(int)family connectInterface:(NSData *)connectInterface errPtr:(NSError **)errPtr { Int socketFD = socket(family, SOCK_STREAM, 0); // If the creation failsif (socketFD == SOCKET_NULL)
    {
        if (errPtr)
        *errPtr = [self errnoErrorWithReason:@"Error in socket() function"];
        
        returnsocketFD; } // Bind to connectInterfaceif(! [selfbindSockets: socketFD toInterface: connectInterface error: errPtr]) {/ / bind failure, directly close returns [self closeSocket: socketFD];returnSOCKET_NULL; } // Prevent SIGPIPE signals // int nosigpipe = 1; //SO_NOSIGPIPE is used to prevent the process from exiting due to network errors. Use this to avoid sending signal setsockopt(socketFD, SOL_SOCKET, SO_NOSIGPIPE, & nosigPIPE, sizeof(nosigPIPE));return socketFD;
}
Copy the code

ConnectSocket: address: stateIndex: function creates a global concurrent queue to call the connect () function to connect

// Connect final method 3 finnal... - (void)connectSocket:(int)socketFD address:(NSData *)address stateIndex:(int)aStateIndex { // If there already is a socket connected, we close socketFD andreturn// If yes, close the connectionif (self.isConnected)
    {
        [self closeSocket:socketFD];
        return;
    }
    
    // Start the connection process inA background queue __weak GCDAsyncSocket *weakSelf = self; Dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); New thread dispatch_async(globalConcurrentQueue, ^{#pragma clang diagnostic push
    #pragma clang diagnostic warning "-Wimplicit-retain-self"// The client sends a connection request to the server at a specific network address. Returns 0 on success, -1 on failure. int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]); __strong GCDAsyncSocket *strongSelf = weakSelf;if(strongSelf == nil) return_from_block; // In socketQueue, open thread dispatch_async(strongSelf->socketQueue, ^{@autoreleasepool {// If the status is connected, close the connection returnif(strongSelf.isConnected) { [strongSelf closeSocket:socketFD]; return_from_block; } // The connection is successfulif(the result = = 0) {/ / shut off another useless socket [self closeUnusedSocket: socketFD]; // call didConnect, generate stream, change state, etc! [strongSelf didConnect:aStateIndex]; } // Connection failedelse{// Close current socket [strongSelf closeSocket:socketFD]; // If there are no more sockets trying to connect, we inform the error to the delegateif (strongSelf.socket4FD == SOCKET_NULL && strongSelf.socket6FD == SOCKET_NULL)
                {
                    NSError *error = [strongSelf errnoErrorWithReason:@"Error in connect() function"]; [strongSelf didNotConnect:aStateIndex error:error]; }}}});#pragma clang diagnostic pop}); // The output is in connection LogVerbose(@"Connecting...");
}
Copy the code

If the connection is successful, change the state of the current socket and call the didConnect: method

- (void)didConnect:(int)aStateIndex {LogTrace(); NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); // State is differentif(aStateIndex ! = stateIndex) { LogInfo(@"Ignoring didConnect, already disconnected");
        
        // The connect operation has been cancelled.
        // That is, socket was disconnected, or connection has already timed out.
        return; } / / kConnected merged into the current flag flags | = kConnected; // Stop connection timeout [self endConnectTimeout];#if TARGET_OS_IPHONE// The endConnectTimeout Method executed above incremented The stateIndex. So we need to reassign aStateIndex = stateIndex;#endif
    
    // Setup read/write streams (as workaround for specific shortcomings in the iOS platform)
    // 
    // Note:
    // There may be configuration options that must be setBy the delegate before opening the streams. // The delegate must be set with the relevant configuration before opening the stream kCFStreamNetworkServiceTypeVoIP flag,whichOnly works on an unopened stream. / / the main examples are kCFStreamNetworkServiceTypeVoIP tag, can only work in not open stream? // // Thus wewaituntil after the socket:didConnectToHost:port: The delegate method has completed. // The delegate method has completed. // The delegate method has completedifNeeded. // These give the agent time to properly configure the Stream if necessary // create a Block to initialize the Stream dispatch_block_t SetupStreamsPart1 = ^{NSLog(@"hello~");
#if TARGET_OS_IPHONE// Fail to create read/write streamif(! [self createReadAndWriteStream]) { [self closeWithError:[self otherError:@"Error creating CFStreams"]].return; } // The argument is given to NO, meaning that the callback function is not called when bytes are availableif(! [self registerForStreamCallbacksIncludingReadWrite:NO]) { [self closeWithError:[self otherError:@"Error in CFStreamSetClient"]].return;
    }
    
#endif}; //part2 set stream dispatch_block_t SetupStreamsPart2 = ^{#if TARGET_OS_IPHONE// Return if the status is differentif(aStateIndex ! = stateIndex) { // The socket has been disconnected.return; } // If adding to runloop failsif(! [self addStreamsToRunLoop]) {// Error returns [self closeWithError:[self otherError:@"Error in CFStreamScheduleWithRunLoop"]].return; } // Read stream openif(! [self openStreams]) {// Enable error returns [self closeWithError:[self otherError:@"Error creating CFStreams"]].return;
        }
    
#endif}; // Notify delegate // host port NSString *host = [self connectedHost]; uint16_t port = [self connectedPort]; NSURL *url = [self connectedUrl]; __strong id theDelegate = delegate; // Proxy queue and Host are not nil and respond to didConnectToHost proxy methodif(delegateQueue && host ! = nil && [theDelegate respondsToSelector: @ the selector (socket: didConnectToHost: port:)]) {/ / call the initialize stream1 SetupStreamsPart1(); dispatch_async(delegateQueue, ^{@autoreleasepool {// call theDelegate method to theDelegate queue successfully connected [theDelegate socket:self didConnectToHost:host port:port]; Stream2 dispatch_async(socketQueue, ^{autoreleasepool {SetupStreamsPart2(); }}); }}); } // This is a Unix domain request callbackelse if(delegateQueue && url ! = nil && [theDelegate respondsToSelector:@selector(socket:didConnectToUrl:)]) { SetupStreamsPart1(); dispatch_async(delegateQueue, ^{ @autoreleasepool { [theDelegate socket:self didConnectToUrl:url]; dispatch_async(socketQueue, ^{ @autoreleasepool { SetupStreamsPart2(); }}); }}); } // Otherwise, only stream is initializedelse{ SetupStreamsPart1(); SetupStreamsPart2(); } // Get the connected socket int socketFD = (socket4FD ! = SOCKET_NULL) ? socket4FD : (socket6FD ! = SOCKET_NULL) ? socket6FD : socketUN; // FCNTL, function description: according to the file descriptor to operate on the file feature. HTTP: / / http://blog.csdn.net/pbymw8iwm/article/details/7974789 / / Enable non - blocking IO on the socket / / socket supports non-blocking IO int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);if(result == -1) {// error NSString *errMsg = @"Error enabling non-blocking IO on socket (fcntl)";
        [self closeWithError:[self otherError:errMsg]];
        
        return;
    }
    
    // Setup our read/write sources // Initialize read and writesource
    [self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];
    
    // Dequeue any pending read// Write requests // start the next task [self maybeDequeueRead]; [self maybeDequeueWrite]; }Copy the code

This method does the following things:

  • Add the current state flags to connected and turn off the connection timeout timer that we started with
/ / kConnected merged into the current flag flags | = kConnected; // Stop connection timeout [self endConnectTimeout];Copy the code
  • Two blocks are initialized:SetupStreamsPart1,SetupStreamsPart2Both blocks do things related to read and write flows. SetupStreamsPart1 is used to create read/write streams and register callbacks. The other SetupStreamsPart2 is used to add the stream to the runloop of the current thread and open the stream.
// create a Block to initialize Stream dispatch_block_t SetupStreamsPart1 = ^{... }; //part2 set stream dispatch_block_t SetupStreamsPart2 = ^{... };Copy the code
  • Determine if there is an agentqueue,hostorurlWhether these parameters are null and whether the proxy respondsdidConnectToHostordidConnectToUrlProxy, which correspond to normal socket connection and socket connection respectivelyunix domin socketThe connection. If the corresponding proxy is implemented, the successfully connected proxy is invoked.
// Proxy queue and Host are not nil and respond to didConnectToHost proxy methodif(delegateQueue && host ! = nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)]) { ... } // This is a Unix domain request callbackelse if(delegateQueue && url ! = nil && [theDelegate respondsToSelector:@selector(socket:didConnectToUrl:)]) { ... } // Otherwise, only stream is initializedelse{... }Copy the code
  • While calling the proxy, we call the two read-write flow-related blocks that we initialized earlier.
// call to initialize stream1 SetupStreamsPart1(); dispatch_async(delegateQueue, ^{@autoreleasepool {// call theDelegate method to theDelegate queue successfully connected [theDelegate socket:self didConnectToHost:host port:port]; Stream2 dispatch_async(socketQueue, ^{autoreleasepool {SetupStreamsPart2(); }}); }});Copy the code
  • Enable the socket to support non-blocking I/O, which blocks I/O by default
IO int result = FCNTL (socketFD, F_SETFL, O_NONBLOCK);Copy the code
  • Initialize read and write source
[self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];
Copy the code

The createReadAndWriteStream() function inside block (SetupStreamsPart1) creates a read-write stream

Stream - (BOOL)createReadAndWriteStream {LogTrace(); NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); // If one of them has a value, return itif (readStream || writeStream)
    {
        // Streams already created
        returnYES; Int socketFD = (socket4FD! = SOCKET_NULL) ? socket4FD : (socket6FD ! = SOCKET_NULL) ? socket6FD : socketUN; // If both are empty, return NOif (socketFD == SOCKET_NULL)
    {
        // Cannot create streams without a file descriptor
        returnNO; } // If it is not connected, return NOif(! [self isConnected]) { // Cannot create streams until file descriptor is connectedreturn NO;
    }
    
    LogVerbose(@"Creating read and write stream...");
    
Pragma mark - binds Socket and CFStream/ / the following interface is used to create a socket stream, a used to read and write: one for CFStreamCreatePairWithSocket (NULL, CFSocketNativeHandle socketFD, &readStream, &writeStream);
    
    // The kCFStreamPropertyShouldCloseNativeSocket property should be false by default (for our case).
    // But let's not take any chances. // Both read and write streams are set to not close or release with the bound socket. KCFBooleanFalse not together, kCFBooleanTrue togetherif (readStream)
        CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
    if(writeStream) CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); // If one of them is nullif ((readStream == NULL) || (writeStream == NULL))
    {
        LogWarn(@"Unable to create read and write stream..."); // Close the corresponding streamif (readStream)
        {
            CFReadStreamClose(readStream);
            CFRelease(readStream);
            readStream = NULL;
        }
        if(writeStream) { CFWriteStreamClose(writeStream); CFRelease(writeStream); writeStream = NULL; } // Create failedreturnNO; } // Created successfullyreturn YES;
}
Copy the code

The code in the block (SetupStreamsPart1 registerForStreamCallbacksIncludingReadWrite () function: registered stream

/ / register Stream callback - (BOOL) registerForStreamCallbacksIncludingReadWrite: (BOOL) includeReadWrite {LogVerbose (@"% @ % @", THIS_METHOD, (includeReadWrite ? @"YES" : @"NO"));
    
    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); NSAssert(()readStream ! = NULL && writeStream ! = NULL), @"Read/Write stream is null"); // The client stream context object streamContext.version = 0; streamContext.info = (__bridge void *)(self); streamContext.retain = nil; streamContext.release = nil; streamContext.copyDescription = nil; // The open has completed successfully. // The stream has bytes to be read. // The stream can accept bytesforWriting. // An error has occurred on the stream. // The end of the stream has been reached. One is that the stream event ends CFOptionFlagsreadStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered ; // If it contains read and writeif(includeReadWrite) // The stream has Bytes to be read.readStreamEvents |= kCFStreamEventHasBytesAvailable; // Setting up the client to read the stream will call CFReadStreamCallback on the previously set flags. If the setting fails, return NOif(! CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))
    {
        returnNO; } / / write flag, as well as CFOptionFlags writeStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;if (includeReadWrite)
    writeStreamEvents |= kCFStreamEventCanAcceptBytes;
    
    if(! CFWriteStreamSetClient(writeStream, writeStreamEvents, &CFWriteStreamCallback, &streamContext)) {returnNO; } // The callback is set to YESreturn YES;
}
Copy the code

The addStreamsToRunLoop function inside block (SetupStreamsPart2) adds stream to runloop

// Add stream to runloop - (BOOL)addStreamsToRunLoop {LogTrace(); NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
    NSAssert((readStream ! = NULL && writeStream ! = NULL), @"Read/Write stream is null"); // Check whether flag contains kAddedStreamsToRunLoop.if(! (flags & kAddedStreamsToRunLoop)) { LogVerbose(@"Adding streams to runloop..."); [[self class] startCFStreamThreadIfNeeded]; [[self class] performSelector:@selector(scheduleCFStreams:) onThread:cfstreamThread withObject:selfwaitUntilDone:YES]; / / add logo flags | = kAddedStreamsToRunLoop; }return YES;
}
Copy the code

The openStreams function inside block (SetupStreamsPart2) opens the stream

// Open stream - (BOOL)openStreams {LogTrace(); NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); NSAssert()readStream ! = NULL && writeStream ! = NULL), @"Read/Write stream is null"); /* kCFStreamStatusNotOpen = 0, kCFStreamStatusOpening, // open isin-progress
    kCFStreamStatusOpen,
    kCFStreamStatusReading,
    kCFStreamStatusWriting,
    kCFStreamStatusAtEnd,    // no further bytes can be read/written
    kCFStreamStatusClosed,
    kCFStreamStatusError
    */
    CFStreamStatus readStatus = CFReadStreamGetStatus(readStream); CFStreamStatus writeStatus = CFWriteStreamGetStatus(writeStream); // If any of them are not enabledif ((readStatus == kCFStreamStatusNotOpen) || (writeStatus == kCFStreamStatusNotOpen))
    {
        LogVerbose(@"Opening read and write stream..."); -nsstream BOOL r1 = CFReadStreamOpen(readStream); BOOL r2 = CFWriteStreamOpen(writeStream); // One startup failedif(! r1 || ! r2) { LogError(@"Error in CFStreamOpen");
            returnNO; }}return YES;
}
Copy the code

To register a stream callback, look at CFReadStreamCallback and call doReadData when the data is read

Static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventTypetype, void *pInfo) {// Get the trigger callback sokcet GCDAsyncSocket *asyncSocket = (__bridge GCDAsyncSocket *)pInfo; switch(type) {// If it is a callback to readable datacasekCFStreamEventHasBytesAvailable: {// Call dispatch_async(asyncSocket->socketQueue, ^{@autoreleasepool {LogCVerbose(@)"CFReadStreamCallback - HasBytesAvailable"); // If it is not the same stream, return it directlyif (asyncSocket->readStream ! = stream) return_from_block; // If TLS is being initialized, go to the handshake firstif ((asyncSocket->flags & kStartingReadTLS) && (asyncSocket->flags & kStartingWriteTLS))
                {
                    // If we set kCFStreamPropertySSLSettings before we opened the streams, this might be a lie.
                    // (A callback related to the tcp stream, but not to the SSL layer).
                    
                    if (CFReadStreamHasBytesAvailable(asyncSocket->readStream)) { asyncSocket->flags |= kSecureSocketHasBytesAvailable; [asyncSocket cf_finishSSLHandshake]; }} // to read dataelse
                {
                    asyncSocket->flags |= kSecureSocketHasBytesAvailable;
                    [asyncSocket doReadData]; }}});break; } // This is an error callback default: {// get error NSError *error = (__bridge_transfer NSError *)CFReadStreamCopyError(stream); // Error reaching stream tailif (error == nil && type == kCFStreamEventEndEncountered)
            {
                error = [asyncSocket connectionClosedError];
            }
            
            dispatch_async(asyncSocket->socketQueue, ^{ @autoreleasepool {
            
                LogCVerbose(@"CFReadStreamCallback - Other");
                
                if (asyncSocket->readStream ! = stream) return_from_block; // If SSL authentication is being performedif((asyncSocket->flags & kStartingReadTLS) && (asyncSocket->flags & kStartingWriteTLS)) {// Then disable SSL, An error [asyncSocket cf_abortSSLHandshake: error]; }else{// Directly close [asyncSocket closeWithError:error]; }}});break; }}}Copy the code

The doReadData function here processes the read data, and then calls the completeCurrentRead method to delegate the data out

// Read data - (void)doReadData
{
    ...
    if (done) {// Complete the data read [self completeCurrentRead]; // If there is no error, there is still readable data in the prebufferif(! error && (! SocketEOF | | [preBuffer availableBytes] > 0)) {/ / to read operation leave, continue to the next read [self maybeDequeueRead]; }}... }Copy the code
// Finish reading data this time - (void)completeCurrentRead {LogTrace(); // Assert currentRead NSAssert(currentRead, @"Trying to complete current read when there is no current read."); NSData *result = nil; // If the Buffer is created by ourselvesif(currentRead->bufferOwner) { // We created the buffer on behalf of the user. // Trim our buffer to be the proper size. // Trim the buffer to the appropriate size // set the size to the size we readsetLength:currentRead->bytesDone]; // assign a value to result result = currentRead->buffer; }else
    {
        // We did NOT create the buffer.
        // The buffer is owned by the caller.
        // Only trim the buffer ifWe had to increase its size. // This is the caller's data, we just increase the sizeif([currentRead->buffer Length] > currentRead->originalBufferLength) {// Get the read size NSUIntegerreadSize = currentRead->startOffset + currentRead->bytesDone; NSUInteger origSize = currentRead->originalBufferLength; // Get maximum NSUInteger buffSize = MAX(readSize, origSize); // Set the buffer to a larger size [currentRead->buffer]setLength:buffSize]; // Uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset; //reslut is from the beginning of the pointer to the length of the write freeWhenDone is YES, Create the release of buffer result = [NSData dataWithBytesNoCopy: buffer length: currentRead - > bytesDone freeWhenDone: NO]; } __strong id theDelegate = delegate;#pragma mark - Finally called the proxy method and received the data
    if(delegateQueue && [theDelegate respondsToSelector: @ the selector (socket: didReadData: withTag:)]) {/ / get the current data packets GCDAsyncReadPacket *theRead = currentRead; // Ensure currentRead retained since result may not own buffer dispatch_async(delegateQueue, ^{@autoreleasepool {// Call result back from the agent queue. [theDelegate socket:self didReadData:result withTag:theRead->tag]; }}); } // Cancel read timeout [self endCurrentRead]; }Copy the code

CFWriteStreamCallback is handled much the same way as CFReadStreamCallback

Static void CFWriteStreamCallback (CFWriteStreamRef stream, CFStreamEventTypetype, void *pInfo)
{
    GCDAsyncSocket *asyncSocket = (__bridge GCDAsyncSocket *)pInfo;
    
    switch(type) {// if you can write sectioncase kCFStreamEventCanAcceptBytes:
        {
            dispatch_async(asyncSocket->socketQueue, ^{ @autoreleasepool {
            
                    LogCVerbose(@"CFWriteStreamCallback - CanAcceptBytes");
                    
                    if(asyncSocket->writeStream ! = stream) return_from_block;if ((asyncSocket->flags & kStartingReadTLS) && (asyncSocket->flags & kStartingWriteTLS))
                    {
                        // If we setkCFStreamPropertySSLSettings before we opened the streams, This might be a lie. // (a callback related to the TCP stream, but not to the SSL layer). // Check whether sokcet can write data without blockingif(CFWriteStreamCanAcceptBytes(asyncSocket->writeStream)) { asyncSocket->flags |= kSocketCanAcceptBytes; [asyncSocket cf_finishSSLHandshake]; }}else
                    {
                        asyncSocket->flags |= kSocketCanAcceptBytes;
                        [asyncSocket doWriteData]; }}});break;
        }
        default:
        {
            NSError *error = (__bridge_transfer NSError *)CFWriteStreamCopyError(stream);
            
            if (error == nil && type == kCFStreamEventEndEncountered)
            {
                error = [asyncSocket connectionClosedError];
            }
            
            dispatch_async(asyncSocket->socketQueue, ^{ @autoreleasepool {
            
                LogCVerbose(@"CFWriteStreamCallback - Other");
                
                if(asyncSocket->writeStream ! = stream) return_from_block;if ((asyncSocket->flags & kStartingReadTLS) && (asyncSocket->flags & kStartingWriteTLS))
                {
                    [asyncSocket cf_abortSSLHandshake:error];
                }
                else{ [asyncSocket closeWithError:error]; }}});break; }}}Copy the code

Call doWriteData to perform the write operation, and call completeCurrentWrite to call the data back out

// Start writing data, the current task - (void)doWriteData { ... // If doneif (done) {// Complete the operation [self completeCurrentWrite];if(! Error) {dispatch_async(socketQueue, ^{@autoreleasepool{// Start the next read task [self maybeDequeueWrite]; }}); }}... }Copy the code
// Complete the current write - (void)completeCurrentWrite {LogTrace(); NSAssert(currentWrite, @"Trying to complete current write when there is no current write.");
    
    
    __strong id theDelegate = delegate;
    
    if(delegateQueue && [theDelegate respondsToSelector:@selector(socket:didWriteDataWithTag:)]) { long theWriteTag = currentWrite->tag; // call the callback dispatch_async(delegateQueue, delegateQueue, delegateQueue) ^{ @autoreleasepool { [theDelegate socket:self didWriteDataWithTag:theWriteTag]; }}); } // Close the current write [self endCurrentWrite]; }Copy the code

Refer to the article

IOS IM Advanced – CocoaAsyncSocket source code parsing (Connect) iOS IM Advanced – CocoaAsyncSocket source code parsing (Connect) iOS IM Advanced – CocoaAsyncSocket source code parsing (Connect IOS Instant Messaging – CocoaAsyncSocket Source code parsing (End of Read)