Building on the principles from the previous article, how does an Android client implement AIO socket programming? First, thanks to Sandao for smart-ioc, the Android version of his AIO framework. (Strictly speaking the framework is written with Java NIO, but its whole processing mechanism is asynchronous and non-blocking, so we treat it as an AIO framework.) It gives us a simple and efficient basic framework for IM. Of course, other frameworks can be used as well 😁. Let's briefly introduce smart-ioc.

smart-ioc overview

smart-ioc is the Android version of the smart-socket communication framework, implemented with Java NIO. It is a simplified smart-socket, and a real boon for the many Android developers who want to use smart-socket but are held back by API restrictions. Thanks again to the author.

Preface

Although JDK 7 has been out for a long time, the open source community does not seem enthusiastic about its new AIO features. When it comes to communication technology, everyone seems used to Netty and Mina. Almost every programmer has been taught not to reinvent the wheel, and so naturally settles into a comfort zone and enjoys what the open source community provides. To take an example that fits this article: when most Java programmers meet a communication-related requirement, Netty or Mina is the first thing that comes to mind. Even those who have never touched Netty or Mina believe that only these two frameworks can solve their problem. Perhaps this is because we are too impatient: work pressure leaves us no time to recharge, and we have no energy left for technical points that are rarely used and somewhat deep.

Fortunately, a number of studious programmers spend their spare time on seemingly mundane things, working hard on these technical points and producing small works that inject new life into the open source community. At present, the known Java AIO projects on Gitee (Code Cloud) include Voovan, Baseio, and the protagonist of this article, smart-socket. None of them is yet influential enough to be recognized by most Java developers, but I believe that through open source promotion and practice, the AIO open source ecosystem will become more prosperous than it is now.

What is smart-socket

The word "smart" (clever; agile; beautiful; neat) shows the high hopes placed on the project from the moment it was named. smart-socket focuses on the research and development of communication components. It abandons all large, all-in-one solutions and provides only small and beautiful basic services. Whether you work on IoT, IM, RPC, or other communication development, smart-socket is a cool choice. If there is one thing to say about smart-socket, it is this: meet smart-socket and you have a head start.

smart-socket has been developed under strict requirements since the project began and pursues perfection in every aspect. First, it is a very lightweight project that depends only on SLF4J as its logging component. The JAR released by smart-socket is only 20 KB. Its simple interface design makes it very convenient to plug in communication services. If smart-socket can help you in this area, then I will have done something meaningful.

Preparation

smart-ioc class listing

Name | Type | Visibility | Description
Protocol | interface | public | Protocol interface
MessageProcessor | interface | public | Message processor interface
Filter | interface | public | Filter interface
StateMachine | @interface | public | Service status codes
AioQuickClient | class | public | AIO client
AioSession | class | public | AIO transport session
IoServerConfig | class | package-private | AIO service configuration
ReadCompletionHandler | class | package-private | CompletionHandler implementation for AIO read operations
WriteCompletionHandler | class | package-private | CompletionHandler implementation for AIO write operations
FastBlockingQueue | class | package-private | Custom queue

Core interfaces

Protocol

public interface Protocol<T> {

    public T decode(final ByteBuffer data, AioSession<T> session, boolean eof);

    public ByteBuffer encode(T msg, AioSession<T> session);
}

Protocol is a generic interface whose type parameter T is the business message entity class. It defines the interface for encoding and decoding messages. Let's look at its two methods, decode and encode.

  • decode: message decoding. AIO transfers data through ByteBuffer. After data has been read into the ByteBuffer, the decode method of the Protocol implementation class is called with that ByteBuffer as the first parameter. The second parameter, AioSession, is the session object of the current socket connection, which is explained in detail later. The Protocol implementation class reads bytes from the ByteBuffer, decodes the message according to its protocol rules, wraps the result in a business object, and returns it. In practice, however, the bytes read into the ByteBuffer may not be enough to decode a complete message (a so-called half packet or split packet). The Protocol implementation class may decode partially or wait until the ByteBuffer holds enough data, but it must return null as long as the message has not been fully decoded.
  • encode: message encoding. Before a business message is output to the network peer, it has to be encoded into a byte stream, which is also carried by a ByteBuffer. The first parameter of this method, the generic T, is the business message object. The Protocol implementation class converts that object to a ByteBuffer according to the business rules and returns it, and smart-socket then outputs the encoded ByteBuffer.
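
To make the half-packet rule concrete, here is a minimal sketch of a length-prefixed string codec. It is not part of smart-ioc: the class name LengthPrefixedStringCodec and the 4-byte length header are assumptions chosen for illustration, and the AioSession and eof parameters of the real Protocol interface are left out so that the example runs on its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec: each message is a 4-byte big-endian length followed by UTF-8 bytes.
final class LengthPrefixedStringCodec {

    // Returns one decoded message, or null if the buffer does not yet hold a complete one.
    static String decode(ByteBuffer data) {
        if (data.remaining() < Integer.BYTES) {
            return null; // not even the length header has arrived
        }
        // Absolute read: peek at the length without moving the buffer position.
        int length = data.getInt(data.position());
        if (data.remaining() < Integer.BYTES + length) {
            return null; // half packet: leave the bytes untouched for the next read
        }
        data.position(data.position() + Integer.BYTES);
        byte[] body = new byte[length];
        data.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    // Encodes a message into a buffer that is ready to be written (already flipped).
    static ByteBuffer encode(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + body.length);
        buffer.putInt(body.length);
        buffer.put(body);
        buffer.flip();
        return buffer;
    }
}
```

Because decode only peeks at the header until the whole body is available, returning null never consumes bytes, which is what lets the framework call it again after the next read.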

MessageProcessor

public interface MessageProcessor<T> {

    public void process(AioSession<T> session, T msg);

    void stateEvent(AioSession<T> session, @StateMachine int stateMachine, Throwable throwable);
}

MessageProcessor defines the message-handling interface. After a message has been decoded by Protocol, the message object is handed to the MessageProcessor implementation class for business processing.

  • process: the message handler, executed for each complete business message received.
  • stateEvent: the state callback. The MessageProcessor implementation class can handle the events it is interested in inside this method.

Filter

Filter is the communication-layer filter interface provided by the framework. Users can build extension services on top of it. The interface is not used often, but used well it helps you monitor the health of your service.

public interface Filter<T> {

    public void readFilter(AioSession<T> session, int readSize);

    public void processFilter(AioSession<T> session, T msg);

    public void processFailHandler(AioSession<T> session, T msg, Throwable e);

    public void writeFilter(AioSession<T> session, int writeSize);

}

  • readFilter: read filter. This method is triggered whenever a read operation occurs. The first parameter, AioSession, is the session on which the read event occurred, and the second parameter, readSize, is the number of bytes read.
  • processFilter: message-processing filter. For each business message, filter.processFilter is executed before messageProcessor.process.
  • processFailHandler: triggered when a runtime exception occurs while a business message is being processed.
  • writeFilter: write filter. This method is triggered whenever a write operation occurs. The first parameter, AioSession, is the session on which the write event occurred, and the second parameter, writeSize, is the number of bytes output.
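
As an illustration of what such a filter might do, the sketch below counts traffic. It is only a sketch: TrafficCounter is a hypothetical class, and its four methods mirror the Filter callbacks above without the AioSession parameter so that the example is self-contained. A real filter would implement Filter<T> and keep the session argument.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical traffic statistics collector shaped like the Filter callbacks.
final class TrafficCounter {
    private final AtomicLong bytesRead = new AtomicLong();
    private final AtomicLong bytesWritten = new AtomicLong();
    private final AtomicLong processed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    // Called after each read; a negative size (end of stream) is ignored.
    void readFilter(int readSize) {
        if (readSize > 0) {
            bytesRead.addAndGet(readSize);
        }
    }

    // Called before each business message is processed.
    void processFilter(Object msg) {
        processed.incrementAndGet();
    }

    // Called when processing a business message threw an exception.
    void processFailHandler(Object msg, Throwable e) {
        failed.incrementAndGet();
    }

    // Called after each write; a negative size is ignored.
    void writeFilter(int writeSize) {
        if (writeSize > 0) {
            bytesWritten.addAndGet(writeSize);
        }
    }

    String summary() {
        return "read=" + bytesRead.get() + "B written=" + bytesWritten.get()
                + "B processed=" + processed.get() + " failed=" + failed.get();
    }
}
```

AtomicLong is used because the callbacks may run on an I/O thread while another thread reads the summary.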

Service status codes

When a specific event occurs, the framework notifies the MessageProcessor implementation class through stateEvent with one of the status codes below. The user has to implement the handling.

Status code | Description
NEW_SESSION | The network connection has been established and the transport-layer AioSession has been created. If a session has to be maintained at the business layer, handle it in this state
INPUT_SHUTDOWN | Triggered when the input has reached its end, i.e. read() == -1 in the traditional sense
INPUT_EXCEPTION | Triggered when an exception occurs while reading data
OUTPUT_EXCEPTION | Triggered when an exception occurs while writing data
SESSION_CLOSING | AioSession.close has been called, but because the AioSession still has unfinished events it enters the SESSION_CLOSING state
SESSION_CLOSED | Triggered after the AioSession has completed the close operation
PROCESS_EXCEPTION | Exception during business processing
SESSION_CONNECT_FAILED | Session connection failure
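
The following sketch shows one way a stateEvent implementation could branch on these codes. Everything in it is an assumption made for illustration: the class StateEventHandler, the numeric values of the constants (the real values live in StateMachine and are not shown in this article), and the idea of reconnecting after a close or a failed connect.

```java
// Hypothetical handler; the constant values below are placeholders, not the framework's.
final class StateEventHandler {
    static final int NEW_SESSION = 1;
    static final int INPUT_SHUTDOWN = 2;
    static final int INPUT_EXCEPTION = 3;
    static final int OUTPUT_EXCEPTION = 4;
    static final int SESSION_CLOSING = 5;
    static final int SESSION_CLOSED = 6;
    static final int PROCESS_EXCEPTION = 7;
    static final int SESSION_CONNECT_FAILED = 8;

    // Turns a state event into a log line.
    static String describe(int state, Throwable error) {
        String reason = (error == null) ? "unknown" : String.valueOf(error.getMessage());
        switch (state) {
            case NEW_SESSION:
                return "connected";
            case INPUT_SHUTDOWN:
                return "peer closed the stream";
            case INPUT_EXCEPTION:
            case OUTPUT_EXCEPTION:
            case PROCESS_EXCEPTION:
                return "error: " + reason;
            case SESSION_CLOSING:
                return "closing";
            case SESSION_CLOSED:
                return "closed";
            case SESSION_CONNECT_FAILED:
                return "connect failed: " + reason;
            default:
                return "unhandled state " + state;
        }
    }

    // An IM client would typically schedule a reconnect in these two states.
    static boolean shouldReconnect(int state) {
        return state == SESSION_CLOSED || state == SESSION_CONNECT_FAILED;
    }
}
```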

Service configuration IoServerConfig

Configuration item | Type | Default value | Description
BANNER | String | | Startup banner printed to the console
VERSION | String | v1.3.11 | Current smart-socket version
writeQueueSize | int | 4 | Length of the output cache queue in AioSession
readBufferSize | int | 512 | Size of the read ByteBuffer in AioSession, in bytes
host | String | null | Address of the remote server the client connects to
filters | Filter[] | [] | Filter array
port | int | 8088 | Port number opened by the server
processor | MessageProcessor | null | Custom message processor
protocol | Protocol | null | Custom protocol codec

AsynchronousSocketChannel

An asynchronous SocketChannel that emulates JDK 7's AIO handling; in other words, a wrapper class around SocketChannel. The goal is compatibility, so that Android can use AIO-style processing.

Property name | Type | Description
readBuffer | ByteBuffer | Byte buffer that receives the bytes read from the SocketChannel
writeBuffer | ByteBuffer | Byte buffer holding the bytes to be written to the SocketChannel
readCompletionHandler | ReadCompletionHandler | Callback handler for read events
writeCompletionHandler | WriteCompletionHandler | Callback handler for write events
selectionKey | SelectionKey | Key obtained by registering the SocketChannel with the Selector
channel | SocketChannel | The socket channel actually operated on

Core methods

Method | Description
public AsynchronousSocketChannel(SelectionKey selectionKey) | Constructor
public final void read(ByteBuffer dst, A attachment, CompletionHandler handler) | Asynchronous read
public final void write(ByteBuffer dst, A attachment, CompletionHandler handler) | Asynchronous write
public void doRead() | Performs the read operation on the channel
public void doWrite() | Performs the write operation on the channel
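
The idea behind this wrapper can be sketched in a few lines: an "asynchronous" read only records the buffer and the callback and registers interest in OP_READ, and the selector thread later performs the real read and fires the callback. The code below is a simplified illustration under stated assumptions, not the smart-ioc source. EmulatedAsyncReader and ReadCallback are hypothetical names, and an in-process java.nio.channels.Pipe stands in for the socket so that the demo needs no network.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

final class EmulatedAsyncReader {
    // Hypothetical callback standing in for the framework's completion handler.
    interface ReadCallback {
        void completed(int bytesRead, ByteBuffer buffer);
        void failed(Throwable error);
    }

    private final SelectionKey key;
    private ByteBuffer readBuffer;
    private ReadCallback callback;

    EmulatedAsyncReader(SelectionKey key) {
        this.key = key;
    }

    // "Asynchronous" read: remember buffer and callback, then ask the selector for OP_READ.
    void read(ByteBuffer dst, ReadCallback callback) {
        this.readBuffer = dst;
        this.callback = callback;
        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
        key.selector().wakeup(); // make a blocked select() notice the new interest
    }

    // Called by the selector loop once the channel is readable.
    void doRead() {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        try {
            int n = ((ReadableByteChannel) key.channel()).read(readBuffer);
            callback.completed(n, readBuffer);
        } catch (IOException e) {
            callback.failed(e);
        }
    }

    // Pushes three bytes through a Pipe and returns how many the callback received.
    static int demo() {
        final int[] delivered = {-2};
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                final EmulatedAsyncReader reader =
                        new EmulatedAsyncReader(source.register(selector, 0));
                reader.read(ByteBuffer.allocate(16), new ReadCallback() {
                    @Override
                    public void completed(int bytesRead, ByteBuffer buffer) {
                        delivered[0] = bytesRead;
                    }

                    @Override
                    public void failed(Throwable error) {
                        delivered[0] = -1;
                    }
                });
                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                long deadline = System.currentTimeMillis() + 2000;
                while (delivered[0] == -2 && System.currentTimeMillis() < deadline) {
                    selector.select(200);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey ready = it.next();
                        it.remove();
                        if (ready.isReadable()) {
                            reader.doRead();
                        }
                    }
                }
            }
        } catch (IOException e) {
            return -1;
        }
        return delivered[0];
    }
}
```

Clearing OP_READ inside doRead means the channel stays quiet until the caller asks for the next read, which mirrors the one-shot behaviour of a JDK 7 AIO read.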

Communication session AioSession

AioSession is the core class of smart-ioc, and also its most complex.

Core member attributes

Property name | Type | Description
status | byte | Session status, one of three values. SESSION_STATUS_CLOSED (1): an AioSession in this state cannot perform read or write operations. SESSION_STATUS_CLOSING (2): the transition state from SESSION_STATUS_ENABLED to SESSION_STATUS_CLOSED; in this state AioSession accepts no new read/write requests but keeps writing out the data still waiting in the cache, and changes to SESSION_STATUS_CLOSED once the output is complete. SESSION_STATUS_ENABLED (3): the default status of AioSession; the session can be used normally for message communication, protocol encoding and decoding, and business processing
writeCacheQueue | FastBlockingQueue | Output buffer queue
readCompletionHandler | ReadCompletionHandler | Read callback
writeCompletionHandler | WriteCompletionHandler | Write callback
channel | AsynchronousSocketChannel | The network channel this AioSession is mapped to
semaphore | Semaphore | Semaphore that controls competition for the output resource
ioServerConfig | IoServerConfig | Configuration items passed through from AioQuickClient

Core methods

Method | Description
AioSession(AsynchronousSocketChannel, IoServerConfig, ReadCompletionHandler, WriteCompletionHandler) | The only constructor
void readFromChannel() | Data decoding -> business processing -> register the next read event
public void write(final ByteBuffer buffer) | Writes the encoded business message to the buffer and triggers writeToChannel()
void writeToChannel() | Writes the buffered data to the network channel
public void close(boolean immediate) | Closes the session

write() outputs the data buffer to the network peer

public final void write(final ByteBuffer buffer) throws IOException {
    if (isInvalid()) {
        throw new IOException("session is " + (status == SESSION_STATUS_CLOSED ? "closed" : "invalid"));
    }
    if (!buffer.hasRemaining()) {
        throw new InvalidObjectException("buffer has no remaining");
    }
    // Whether an output queue is configured
    if (ioServerConfig.getWriteQueueSize() <= 0) {
        try {
            // Block until the semaphore is acquired
            semaphore.acquire();
            // No data is currently being written
            writeBuffer = buffer;
            writeToChannel0(writeBuffer);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e.getMessage());
        }
        return;
    } else if (semaphore.tryAcquire()) {
        writeBuffer = buffer;
        writeToChannel0(writeBuffer);
        return;
    }
    try {
        // Store the buffer in the write queue
        writeCacheQueue.put(buffer);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // The write is asynchronous, so the channel may have become idle again in the meantime
    if (semaphore.tryAcquire()) {
        writeToChannel();
    }
}
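
The pattern in write() can be isolated as follows: a single-permit semaphore marks whether a write is in flight, and callers that lose the race enqueue their data and then try the semaphore once more. This is a simplified sketch, not the framework's code. SingleWriterGate is a hypothetical class, strings stand in for ByteBuffers, and the sent list stands in for the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

final class SingleWriterGate {
    private final Semaphore semaphore = new Semaphore(1);                 // one write in flight at most
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();  // plays the role of writeCacheQueue
    final List<String> sent = new ArrayList<>();                          // plays the role of the channel

    void write(String msg) {
        if (semaphore.tryAcquire()) {
            startWrite(msg);      // channel idle: output directly
            return;
        }
        pending.add(msg);         // channel busy: queue the message
        // The in-flight write may have finished between the two steps above.
        if (semaphore.tryAcquire()) {
            flush();
        }
    }

    // Called when the in-flight write has finished (the write completion callback's job).
    void onWriteCompleted() {
        flush();
    }

    // Must be called while holding the permit.
    private void flush() {
        String next = pending.poll();
        if (next != null) {
            startWrite(next);     // keep the permit, another write is now in flight
            return;
        }
        semaphore.release();
        // A writer may have enqueued after poll() but before release().
        if (!pending.isEmpty() && semaphore.tryAcquire()) {
            flush();
        }
    }

    private void startWrite(String msg) {
        sent.add(msg);
    }

    boolean idle() {
        return semaphore.availablePermits() == 1;
    }
}
```

The second tryAcquire in write() and the re-check after release() close the same window from both sides, so a queued message cannot be stranded with no write in flight.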

writeToChannel() triggers an AIO write operation; the caller is responsible for controlling synchronization

void writeToChannel() {
    if (writeBuffer != null && writeBuffer.hasRemaining()) {
        writeToChannel0(writeBuffer);
        return;
    }

    if (writeCacheQueue == null || writeCacheQueue.size() == 0) {
        writeBuffer = null;
        semaphore.release();
        // The session may be in the Closing or Closed state at this point
        if (isInvalid()) {
            close();
        }
        // New messages may have been added to writeCacheQueue through write() in the meantime
        else if (writeCacheQueue != null && writeCacheQueue.size() > 0 && semaphore.tryAcquire()) {
            writeToChannel();
        }
        return;
    }
    int totalSize = writeCacheQueue.expectRemaining(MAX_WRITE_SIZE);
    ByteBuffer headBuffer = writeCacheQueue.poll();
    if (headBuffer.remaining() == totalSize) {
        writeBuffer = headBuffer;
    } else {
        if (writeBuffer == null || totalSize << 1 <= writeBuffer.capacity() || totalSize > writeBuffer.capacity()) {
            writeBuffer = ByteBuffer.allocate(totalSize);
        } else {
            writeBuffer.clear().limit(totalSize);
        }
        writeBuffer.put(headBuffer);
        writeCacheQueue.pollInto(writeBuffer);
        writeBuffer.flip();
    }
    writeToChannel0(writeBuffer);

}
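
The second half of writeToChannel() merges several queued buffers into one so that a single channel write can carry them. The sketch below shows that idea with a plain java.util.Queue. WriteBatcher and nextBatch are hypothetical names, and unlike the original it always allocates a fresh buffer instead of reusing writeBuffer.

```java
import java.nio.ByteBuffer;
import java.util.Queue;

final class WriteBatcher {
    // Returns the next buffer to write: the head alone, or several queued buffers merged
    // into one batch of at most maxSize bytes. Returns null when the queue is empty.
    static ByteBuffer nextBatch(Queue<ByteBuffer> queue, int maxSize) {
        ByteBuffer head = queue.poll();
        if (head == null) {
            return null;
        }
        ByteBuffer next = queue.peek();
        if (next == null || head.remaining() + next.remaining() > maxSize) {
            return head; // nothing to merge with: send the head as it is, without copying
        }
        ByteBuffer batch = ByteBuffer.allocate(maxSize);
        batch.put(head);
        // Keep appending whole buffers while they still fit.
        while ((next = queue.peek()) != null && next.remaining() <= batch.remaining()) {
            batch.put(queue.poll());
        }
        batch.flip(); // switch the batch to read mode for the channel write
        return batch;
    }
}
```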

writeToChannel0() triggers a write operation on the channel

protected final void writeToChannel0(ByteBuffer buffer) {
    if (null != channel && channel.isOpen()) {
        channel.write(buffer, this, writeCompletionHandler);
    }
}

readFromChannel() triggers the read operation on the channel. When a serious message backlog is detected, flow control is triggered

void readFromChannel(Integer readSize) {
    readBuffer.flip();

    T dataEntry;
    while ((dataEntry = ioServerConfig.getProtocol().decode(readBuffer, this, readSize)) != null) {
        // Handle the message
        try {
            for (Filter<T> h : ioServerConfig.getFilters()) {
                h.processFilter(this, dataEntry);
            }
            ioServerConfig.getProcessor().process(this, dataEntry);
        } catch (Exception e) {
            ioServerConfig.getProcessor().stateEvent(this, StateMachine.PROCESS_EXCEPTION, e);
            for (Filter<T> h : ioServerConfig.getFilters()) {
                h.processFail(this, dataEntry, e);
            }
        }
    }

    if (readSize == -1 || status == SESSION_STATUS_CLOSING) {
        close(false);
        return;
    }
    if (status == SESSION_STATUS_CLOSED) {
        return;
    }

    // All data has been consumed
    if (readBuffer.remaining() == 0) {
        readBuffer.clear();
    } else if (readBuffer.position() > 0) {
        // Call compact only when data has actually been consumed, to reduce memory copying
        readBuffer.compact();
    } else {
        readBuffer.position(readBuffer.limit());
        readBuffer.limit(readBuffer.capacity());
    }
    readFromChannel0(readBuffer);
}
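
The buffer handling at the end of readFromChannel() is easier to follow in isolation. The sketch below feeds byte chunks into a read buffer, decodes every complete message, and then prepares the buffer for the next read using the same three cases (clear, compact, or just reopen the free space). It is an illustration only: ReadBufferLoop is a hypothetical class, and the 1-byte length prefix is an assumed toy protocol.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReadBufferLoop {
    private final ByteBuffer readBuffer = ByteBuffer.allocate(64);
    final List<String> messages = new ArrayList<>();

    // Simulates one completed channel read that delivered the given chunk.
    // The sketch assumes the chunk fits into the free space of the buffer.
    void onBytesRead(byte[] chunk) {
        readBuffer.put(chunk);  // the channel read fills the buffer in write mode
        readBuffer.flip();      // switch to read mode for decoding

        String msg;
        while ((msg = decode(readBuffer)) != null) {
            messages.add(msg);
        }

        if (!readBuffer.hasRemaining()) {
            readBuffer.clear();                       // everything consumed: start over
        } else if (readBuffer.position() > 0) {
            readBuffer.compact();                     // move the undecoded tail to the front
        } else {
            readBuffer.position(readBuffer.limit());  // nothing consumed: no copy needed,
            readBuffer.limit(readBuffer.capacity());  // just reopen the free space
        }
    }

    // Toy protocol: 1-byte length followed by that many ASCII bytes.
    private static String decode(ByteBuffer data) {
        if (data.remaining() < 1) {
            return null;
        }
        int length = data.get(data.position()) & 0xFF;
        if (data.remaining() < 1 + length) {
            return null; // half packet: wait for more bytes
        }
        data.get(); // consume the length byte
        byte[] body = new byte[length];
        data.get(body);
        return new String(body, StandardCharsets.US_ASCII);
    }
}
```

Feeding the chunks {2, 'h'}, {'i', 3, 'y'} and {'o', 'u'} exercises all three branches in turn.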

readFromChannel0() triggers a read operation on the channel

protected final void readFromChannel0(ByteBuffer buffer) {
    if (null != channel && channel.isOpen()) {
        channel.read(buffer, this, readCompletionHandler);
    }
}

AioQuickClient

Member attributes

Property name | Type | Description
mAsynchronousSocketChannel | AsynchronousSocketChannel | Asynchronous socket channel used for asynchronous reads and writes
mSession | AioSession | Client session
mSelector | Selector | Selector that manages the channel
mConfig | IoServerConfig | Stores the AioQuickClient service configuration items

Core methods

Method | Description
public AioQuickClient(String host, int port, Protocol protocol, MessageProcessor messageProcessor) | Constructor
public final void start() | Starts connecting to the server
private void acceptConnect(SelectionKey key) | Completes the connection between client and server
public final void timeout() | Handles a connection timeout; must be called manually
public final void shutdown() | Shuts the client down

Constructor

public AioQuickClient(String host, int port, Protocol<T> protocol, MessageProcessor<T> messageProcessor) {
    mConfig.setHost(host);
    mConfig.setPort(port);
    mConfig.setProtocol(protocol);
    mConfig.setProcessor(messageProcessor);
}

start() starts connecting to the server

public final void start() {
    try {

        mSelector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        SelectionKey selectionKey = socketChannel.register(mSelector, SelectionKey.OP_CONNECT);

        mAsynchronousSocketChannel = new AsynchronousSocketChannel(selectionKey);
        socketChannel.connect(new InetSocketAddress(mConfig.getHost(), mConfig.getPort()));
        // ToolLog.e(TAG, "Start connecting to server");
        ClientThread serverThread = new ClientThread();
        serverThread.start();
    } catch (final IOException | UnresolvedAddressException e) {
        e.printStackTrace();
        mConfig.getProcessor().stateEvent(mSession, StateMachine.SESSION_CONNECT_FAILED, e);
    }
}

The inner class ClientThread is the thread that handles the transport-layer channel

class ClientThread extends Thread {

    @Override
    public void run() {

        try {
            while (mAsynchronousSocketChannel.isOpen()) {
                // Check the selected keys first and block in selector.select() only if no
                // event of interest has fired, which reduces the number of select calls
                Set<SelectionKey> keySet = mSelector.selectedKeys();
                if (keySet.isEmpty()) {
                    mSelector.select();
                }
                Iterator<SelectionKey> keyIterator = keySet.iterator();
                // Handle the events that have fired
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // Read data
                    if (key.isReadable()) {
                        mAsynchronousSocketChannel.removeOps(SelectionKey.OP_READ);
                        mAsynchronousSocketChannel.doRead();
                    } else if (key.isWritable()) {
                        // Output data
                        mAsynchronousSocketChannel.removeOps(SelectionKey.OP_WRITE);
                        mAsynchronousSocketChannel.doWrite();
                    } else if (key.isConnectable()) {
                        // Establish a new connection: the client triggers Connect, the server triggers Accept
                        acceptConnect(key);
                    }
                    // Remove the handled event
                    keyIterator.remove();
                }
            }
        } catch (Exception e) {
            shutdown();
        }
    }
}

acceptConnect() handles the connection-ready event

private void acceptConnect(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    channel.finishConnect();
    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    // Create the AioSession
    mSession = new AioSession<>(mAsynchronousSocketChannel, mConfig, new ReadCompletionHandler(), new WriteCompletionHandler());
    mSession.initSession();
    mConfig.getProcessor().stateEvent(mSession, StateMachine.NEW_SESSION, null);
}
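
Taken together, start() and acceptConnect() follow the standard NIO recipe for a non-blocking connect: register OP_CONNECT, call connect, wait for the selector, call finishConnect, then swap the interest set to OP_READ. The sketch below walks through that recipe with plain JDK classes. NonBlockingConnectDemo and its methods are hypothetical, and the demo connects to a throwaway loopback server so it does not depend on any real endpoint.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class NonBlockingConnectDemo {

    // Returns the key's interest set once connected, or -1 if the timeout expires first.
    static int connect(InetSocketAddress address, long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            if (channel.connect(address)) {
                // A local connection can complete immediately.
                key.interestOps(SelectionKey.OP_READ);
                return key.interestOps();
            }
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                selector.select(100);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey ready = it.next();
                    it.remove();
                    if (ready.isConnectable() && channel.finishConnect()) {
                        // Same bit arithmetic as acceptConnect(): drop OP_CONNECT, add OP_READ.
                        ready.interestOps(ready.interestOps() & ~SelectionKey.OP_CONNECT
                                | SelectionKey.OP_READ);
                        return ready.interestOps();
                    }
                }
            }
            return -1;
        }
    }

    // Connects to a temporary server bound to an ephemeral loopback port.
    static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            return connect(address, 2000) == SelectionKey.OP_READ;
        } catch (IOException e) {
            return false;
        }
    }
}
```

The explicit timeout loop also hints at why AioQuickClient exposes timeout() as a manual call: a selector by itself never gives up on a pending connect.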

Closing

If you spot any mistakes, corrections are welcome. Thank you 🙏