Building on the principles covered in the previous article, how does an Android client implement AIO socket programming? First, thanks to Sandao for smart-ioc, the Android version of the AIO framework. (The framework is written with Java NIO, but its whole processing mechanism is asynchronous and non-blocking, so we treat it as an AIO framework.) It gives us a simple and efficient base framework for IM. Any other framework would of course need its own introduction 😁. Let’s briefly introduce smart-ioc.
Smart-ioc overview
Smart-ioc is the Android version of the smart-socket communication framework, implemented with Java NIO. It is a simplified version of smart-socket, and good news for the many Android developers who want to use smart-socket but are held back by Android API restrictions. Thanks again to the author.
Preface
Although JDK 7 has been out for a long time, the open source community does not seem enthusiastic about its AIO features. When it comes to communication technology, everyone seems used to Netty and Mina. Almost every programmer has been taught not to reinvent the wheel, so we naturally settle into our comfort zones and enjoy what the open source community provides. A fitting example: when most Java programmers meet a communication-related requirement, Netty or Mina is the first thing that comes to mind. Even if they have never touched either one, they believe that only these two frameworks can solve their problem. Perhaps this is because we are too impatient. The pressure of work leaves no time to recharge, and we have no energy left for technical points that are rarely used and a little deep. Fortunately, a number of studious programmers spend their spare time on this seemingly mundane work, digging into these technical points and producing small projects that inject new life into the open source community. The Java AIO projects currently known on Gitee (Code Cloud) include Voovan, Baseio, and the protagonist of this article: smart-socket. None of them has yet gained enough influence to be recognized by the majority of Java developers, but I believe that through open source promotion and practice, the AIO open source ecosystem will be more prosperous in the future than it is now.
What is smart-socket
The name smart-socket (smart: agile, beautiful, neat) has carried high hopes for the project from the moment it was chosen. The project focuses on the research and development of communication components, abandons big, all-inclusive solutions, and provides only small and elegant basic services. Whether you work on IoT, IM, RPC, or other communication development, smart-socket is a cool choice. If there is one thing to say about smart-socket, it is this: meet smart-socket, and you’ve got a head start.
Smart-socket has been developed to strict standards since the start of the project and pursues perfection in every respect. First, it is very lightweight: it depends only on SLF4J for logging, and the released JAR is only 20 KB. Its simple interface design makes it very easy to plug in communication services. If smart-socket helps you in this area, then I will have done something meaningful.
Preparation
Smart-ioc code listing
Name | Type | Visibility | Description |
---|---|---|---|
Protocol | interface | public | Protocol interface |
MessageProcessor | interface | public | Message handler interface |
Filter | interface | public | Filter interface |
StateMachine | @interface | public | Service status codes |
AioQuickClient | class | public | AIO client |
AioSession | class | public | AIO transport session |
IoServerConfig | class | package | AIO service configuration |
ReadCompletionHandler | class | package | CompletionHandler implementation for AIO read operations |
WriteCompletionHandler | class | package | CompletionHandler implementation for AIO write operations |
FastBlockingQueue | class | package | Custom queue |
Core interfaces
Protocol
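public interface Protocol<T> {
    // Decodes one business message from the bytes read so far; returns null if it is incomplete.
    public T decode(final ByteBuffer data, AioSession<T> session, boolean eof);

    // Encodes a business message into the ByteBuffer that is written to the peer.
    public ByteBuffer encode(T msg, AioSession<T> session);
}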
Protocol is a generic interface whose type parameter T refers to the business message entity class. Protocol defines the interface for encoding and decoding messages. Let’s take a look at its two methods, decode and encode.
- decode: message decoding. AIO data transfer is carried by ByteBuffer. After data is read and filled into a ByteBuffer, the decode method of the Protocol implementation class is called with that ByteBuffer as the first parameter. The second parameter, AioSession, is the session object of the current socket connection, which is explained in detail later. The Protocol implementation class reads bytes from the ByteBuffer, decodes the message according to its protocol rules, wraps the result in a business object, and returns it. In practice, however, the bytes read into the ByteBuffer may not be enough to decode a complete message (known as a half packet or split packet). The Protocol implementation class may decode partially or wait until the ByteBuffer holds enough data, but it must return null as long as the message has not been fully decoded.
- encode: message encoding. Before a business message is sent to the network peer, it must be encoded into a byte stream, which is also carried by a ByteBuffer. The first parameter of this method, the generic T, is the business message object. The Protocol implementation class converts that object into a ByteBuffer according to the business rules and returns it. Smart-socket then outputs the encoded ByteBuffer.
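To make the half-packet rule concrete, here is a minimal sketch of a length-prefixed String codec that follows the decode/encode contract described above. The 4-byte length header and the class name `LengthPrefixedCodec` are assumptions made for illustration and are not part of smart-ioc. The AioSession and eof parameters are left out so that the example runs on its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical length-prefixed String codec; illustrative only, not smart-ioc code. */
final class LengthPrefixedCodec {
    private static final int HEADER_SIZE = Integer.BYTES; // assumed 4-byte body length

    /** Returns the decoded message, or null while the buffer holds only part of a frame. */
    static String decode(ByteBuffer data) {
        if (data.remaining() < HEADER_SIZE) {
            return null; // half packet: the header itself is incomplete
        }
        data.mark();
        int bodyLength = data.getInt();
        if (bodyLength < 0) {
            throw new IllegalStateException("negative frame length: " + bodyLength);
        }
        if (data.remaining() < bodyLength) {
            data.reset(); // half packet: leave every byte in place for the next read
            return null;
        }
        byte[] body = new byte[bodyLength];
        data.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Encodes the message as [length][UTF-8 body], ready to be written. */
    static ByteBuffer encode(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(HEADER_SIZE + body.length);
        out.putInt(body.length).put(body);
        out.flip();
        return out;
    }
}
```

Returning null without consuming any bytes is what lets the caller keep the partial frame in the read buffer and try again after the next read.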
MessageProcessor
public interface MessageProcessor<T> {
    // Handles one complete business message.
    public void process(AioSession<T> session, T msg);

    // Called by the framework when a session state event occurs.
    void stateEvent(AioSession<T> session, @StateMachine int stateMachine, Throwable throwable);
}
MessageProcessor defines the message handler interface. After a message has been decoded by Protocol, the message object is handed to the MessageProcessor implementation class for business processing.
- process: the message handler. It is executed for each complete business message received.
- stateEvent: the state callback. The MessageProcessor implementation class can handle the events it is interested in inside this method.
Filter
Filter is the communication-layer filter interface provided by the framework. Users can build extension services on top of it. It is not used often, but used well it can help you monitor the health of your server.
public interface Filter<T> {
    public void readFilter(AioSession<T> session, int readSize);
    public void processFilter(AioSession<T> session, T msg);
    public void processFailHandler(AioSession<T> session, T msg, Throwable e);
    public void writeFilter(AioSession<T> session, int writeSize);
}
- readFilter: read filter. It is triggered whenever a read operation occurs. The first parameter, AioSession, is the session on which the read event occurred, and the second parameter, readSize, is the number of bytes read.
- processFilter: message processing filter. For each business message, filter.processFilter is executed before messageProcessor.process.
- processFailHandler: triggered when a runtime exception occurs while a business message is being processed.
- writeFilter: write filter. It is triggered whenever a write operation occurs. The first parameter, AioSession, is the session on which the write event occurred, and the second parameter, writeSize, is the number of bytes output.
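As an example of how these callbacks can report on the health of a connection, the following sketch counts traffic and message outcomes. `TrafficCounter` is a hypothetical class, not part of smart-ioc. Its methods only mirror the shape of the Filter callbacks and omit the session and message parameters so that the example is self-contained. A real filter would implement `Filter<T>` and delegate to counters like these.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical traffic counter shaped like the Filter callbacks; not smart-ioc code. */
final class TrafficCounter {
    private final AtomicLong bytesRead = new AtomicLong();
    private final AtomicLong bytesWritten = new AtomicLong();
    private final AtomicLong messagesProcessed = new AtomicLong();
    private final AtomicLong messagesFailed = new AtomicLong();

    /** Mirrors readFilter: called after each read with the number of bytes read. */
    void readFilter(int readSize) {
        if (readSize > 0) { // assumption: a non-positive size carries no payload
            bytesRead.addAndGet(readSize);
        }
    }

    /** Mirrors processFilter: called once per decoded business message. */
    void processFilter() {
        messagesProcessed.incrementAndGet();
    }

    /** Mirrors processFailHandler: called when processing a message threw. */
    void processFailHandler(Throwable e) {
        messagesFailed.incrementAndGet();
    }

    /** Mirrors writeFilter: called after each write with the number of bytes output. */
    void writeFilter(int writeSize) {
        if (writeSize > 0) {
            bytesWritten.addAndGet(writeSize);
        }
    }

    /** One-line summary that could be logged periodically. */
    String snapshot() {
        return "read=" + bytesRead + "B written=" + bytesWritten
                + "B processed=" + messagesProcessed + " failed=" + messagesFailed;
    }
}
```

AtomicLong is used because read and write callbacks may run on different threads.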
Service status codes
When a specific event occurs, the framework notifies the MessageProcessor implementation class, and the user must implement the handling.
Status code | Description |
---|---|
NEW_SESSION | Triggered when the network connection is established and the transport-layer AioSession has been created. If the business layer needs to maintain its own session, it can do so in this state |
INPUT_SHUTDOWN | Triggered when all data has been read, that is, the traditional read() == -1 |
INPUT_EXCEPTION | Triggered when an exception occurs while reading data |
OUTPUT_EXCEPTION | Triggered when an exception occurs while writing data |
SESSION_CLOSING | Triggered when AioSession.close is called but the AioSession still has unfinished events, so it enters the SESSION_CLOSING state |
SESSION_CLOSED | Triggered after the AioSession completes the close operation |
PROCESS_EXCEPTION | Business processing exception |
SESSION_CONNECT_FAILED | Session connection failure |
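The following sketch shows one way a stateEvent implementation might react to these codes. The integer values below are placeholders chosen for this example, since the real constants come from smart-ioc's StateMachine annotation. The class name `StateEventSketch` is hypothetical, and the AioSession argument is omitted so that the code is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical state-event handler; constant values are placeholders, not smart-ioc's. */
final class StateEventSketch {
    static final int NEW_SESSION = 1;
    static final int INPUT_SHUTDOWN = 2;
    static final int INPUT_EXCEPTION = 3;
    static final int OUTPUT_EXCEPTION = 4;
    static final int SESSION_CLOSING = 5;
    static final int SESSION_CLOSED = 6;
    static final int PROCESS_EXCEPTION = 7;
    static final int SESSION_CONNECT_FAILED = 8;

    private boolean online;
    private final List<String> log = new ArrayList<>();

    /** Mirrors the shape of MessageProcessor.stateEvent, minus the session argument. */
    void stateEvent(int state, Throwable throwable) {
        switch (state) {
            case NEW_SESSION:
                online = true; // a business-level session could be created here
                log.add("connected");
                break;
            case SESSION_CLOSED:
                online = false;
                log.add("closed");
                break;
            case SESSION_CONNECT_FAILED:
                online = false; // a reconnect could be scheduled here
                log.add("connect failed");
                break;
            case INPUT_EXCEPTION:
            case OUTPUT_EXCEPTION:
            case PROCESS_EXCEPTION:
                log.add("error: " + (throwable == null ? "unknown" : throwable.getMessage()));
                break;
            default:
                // INPUT_SHUTDOWN and SESSION_CLOSING need no action in this sketch
                break;
        }
    }

    boolean isOnline() {
        return online;
    }

    List<String> log() {
        return log;
    }
}
```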
Service configuration IoServerConfig
Configuration item | Type | Default value | Notes |
---|---|---|---|
BANNER | String | – | Startup banner printed to the console |
VERSION | String | v1.3.11 | Current smart-socket version |
writeQueueSize | int | 4 | Length of the output cache queue in AioSession |
readBufferSize | int | 512 | Size of the ByteBuffer AioSession uses for reading, in bytes |
host | String | null | Address of the remote server the client connects to |
filters | Filter[] | [] | Array of filters |
port | int | 8088 | Port number opened by the server |
processor | MessageProcessor | null | Custom message handler |
protocol | Protocol | null | Custom protocol codec |
AsynchronousSocketChannel
An asynchronous SocketChannel that emulates JDK 7’s AIO handling; in other words, a wrapper class around SocketChannel. Its purpose is compatibility, so that AIO-style processing can be used across Android versions.
Property name | Type | Description |
---|---|---|
readBuffer | ByteBuffer | Byte buffer that receives the bytes read from the SocketChannel |
writeBuffer | ByteBuffer | Byte buffer whose contents are written to the SocketChannel |
readCompletionHandler | ReadCompletionHandler | Callback handler for read events |
writeCompletionHandler | WriteCompletionHandler | Callback handler for write events |
selectionKey | SelectionKey | Key obtained when the SocketChannel is registered with the Selector |
channel | SocketChannel | The socket channel actually operated on |
Core methods
Method | Description |
---|---|
public AsynchronousSocketChannel(SelectionKey selectionKey) | Constructor |
public final void read(ByteBuffer dst, A attachment, CompletionHandler handler) | Asynchronous read |
public final void write(ByteBuffer dst, A attachment, CompletionHandler handler) | Asynchronous write |
public void doRead() | Performs the read operation on the channel |
public void doWrite() | Performs the write operation on the channel |
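To illustrate the idea of emulating a callback-style read on top of a Selector, here is a minimal sketch. It is not the smart-ioc implementation: `EmulatedAsyncReader` and `ReadCallback` are hypothetical names, and the demo uses an in-process `java.nio.channels.Pipe` instead of a socket so that it runs without a network. The pattern is that `read` only records the request and registers OP_READ, while the selector loop later calls `doRead`, which performs the actual read and invokes the callback.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of a callback-style read layered on a Selector; not smart-ioc code. */
final class EmulatedAsyncReader {

    /** Stand-in for a completion handler. */
    interface ReadCallback {
        void completed(int bytesRead, ByteBuffer buffer);
        void failed(Throwable cause);
    }

    private final SelectionKey key;
    private ByteBuffer readBuffer;
    private ReadCallback callback;

    EmulatedAsyncReader(SelectionKey key) {
        this.key = key;
    }

    /** "Asynchronous" read: remember the request, register OP_READ, return at once. */
    void read(ByteBuffer dst, ReadCallback callback) {
        this.readBuffer = dst;
        this.callback = callback;
        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
        key.selector().wakeup(); // let a blocked select() notice the new interest
    }

    /** Called by the selector loop once the key is readable. */
    void doRead() {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        try {
            int n = ((ReadableByteChannel) key.channel()).read(readBuffer);
            callback.completed(n, readBuffer);
        } catch (IOException e) {
            callback.failed(e);
        }
    }

    /** Demo over an in-process Pipe; returns the text delivered to the callback. */
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, 0);
            EmulatedAsyncReader reader = new EmulatedAsyncReader(key);
            StringBuilder received = new StringBuilder();
            reader.read(ByteBuffer.allocate(32), new ReadCallback() {
                @Override
                public void completed(int bytesRead, ByteBuffer buffer) {
                    buffer.flip();
                    received.append(StandardCharsets.UTF_8.decode(buffer));
                }

                @Override
                public void failed(Throwable cause) {
                    received.append("failed: ").append(cause);
                }
            });
            pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
            // Tiny selector loop: wait (bounded) until the key fires, then dispatch.
            for (int i = 0; i < 20 && received.length() == 0; i++) {
                selector.select(100);
                if (selector.selectedKeys().remove(key) && key.isReadable()) {
                    reader.doRead();
                }
            }
            pipe.sink().close();
            pipe.source().close();
            return received.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Clearing OP_READ in `doRead` makes each `read` call a one-shot request, which matches the one-read-per-callback style of JDK AIO.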
Communication session AioSession
AioSession is the core class of smart-ioc and also its most complex.
Core member attributes
Property name | Type | Description |
---|---|---|
status | byte | One of three values. SESSION_STATUS_CLOSED (1): the AioSession cannot perform read or write operations. SESSION_STATUS_CLOSING (2): the transition state from SESSION_STATUS_ENABLED to SESSION_STATUS_CLOSED; the AioSession accepts no new read/write requests but still writes out the data pending in the cache, and changes its status to SESSION_STATUS_CLOSED once the output is complete. SESSION_STATUS_ENABLED (3): the default status of an AioSession; the session can be used for normal message communication, protocol encoding and decoding, and business processing |
writeCacheQueue | FastBlockingQueue | Output buffer queue |
readCompletionHandler | ReadCompletionHandler | Read callback |
writeCompletionHandler | WriteCompletionHandler | Write callback |
channel | AsynchronousSocketChannel | Network channel that the AioSession is currently mapped to |
semaphore | Semaphore | Semaphore that controls contention for the output resource |
ioServerConfig | IoServerConfig | Configuration passed through from AioQuickClient |
Core methods
Method name | Description |
---|---|
AioSession(AsynchronousSocketChannel, IoServerConfig, ReadCompletionHandler, WriteCompletionHandler) | The only constructor |
void readFromChannel() | Decode data -> business processing -> register the next read event |
public void write(final ByteBuffer buffer) | Writes the encoded business message to the buffer and triggers writeToChannel() |
void writeToChannel() | Writes buffered data to the network channel |
public void close(boolean immediate) | Closes the session |
write() outputs a data buffer to the network peer
public final void write(final ByteBuffer buffer) throws IOException {
    if (isInvalid()) {
        throw new IOException("session is " + (status == SESSION_STATUS_CLOSED ? "closed" : "invalid"));
    }
    if (!buffer.hasRemaining()) {
        throw new InvalidObjectException("buffer has no remaining");
    }
    // Whether an output queue is configured
    if (ioServerConfig.getWriteQueueSize() <= 0) {
        try {
            // Block until the previous write has finished
            semaphore.acquire();
            writeBuffer = buffer;
            writeToChannel0(writeBuffer);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e.getMessage());
        }
        return;
    } else if (semaphore.tryAcquire()) {
        // No data is waiting to be written, so output directly
        writeBuffer = buffer;
        writeToChannel0(writeBuffer);
        return;
    }
    try {
        // Store in the write queue
        writeCacheQueue.put(buffer);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // Writes are asynchronous, so the channel may already be idle again
    if (semaphore.tryAcquire()) {
        writeToChannel();
    }
}
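The combination of a one-permit semaphore and a queue in write() can be hard to follow, so here is a reduced sketch of the same idea. It is an illustration under assumptions, not the AioSession code: `SerializedWriter` is a hypothetical class, the channel is replaced by a list that records which buffers were handed to it, and `onWriteCompleted()` stands in for the write completion callback.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

/** Hypothetical single-writer sketch: at most one write in flight, the rest queued. */
final class SerializedWriter {
    private final Semaphore semaphore = new Semaphore(1);
    private final Queue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
    /** Stand-in for channel.write(...): every buffer whose write was started. */
    final List<ByteBuffer> started = new ArrayList<>();

    void write(ByteBuffer buffer) {
        if (semaphore.tryAcquire()) {
            startWrite(buffer); // channel idle: write directly, skipping the queue
            return;
        }
        queue.add(buffer); // a write is in flight: park the buffer
        // The in-flight write may have completed between the two steps above,
        // so try once more to become the writer and drain the queue.
        if (semaphore.tryAcquire()) {
            drain();
        }
    }

    /** Stand-in for the write completion callback; the permit is still held here. */
    void onWriteCompleted() {
        drain();
    }

    /** Must be called while holding the permit. */
    private void drain() {
        ByteBuffer next = queue.poll();
        if (next != null) {
            startWrite(next); // keep the permit until this write completes
            return;
        }
        semaphore.release();
        // A producer may have enqueued after poll() but before release().
        if (!queue.isEmpty() && semaphore.tryAcquire()) {
            drain();
        }
    }

    private void startWrite(ByteBuffer buffer) {
        started.add(buffer);
    }

    int queued() {
        return queue.size();
    }
}
```

The second tryAcquire after enqueueing, and the re-check after release, close the window in which a buffer could otherwise sit in the queue with nobody responsible for writing it.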
writeToChannel() triggers an AIO write operation. The caller must control synchronization.
void writeToChannel() {
    if (writeBuffer != null && writeBuffer.hasRemaining()) {
        writeToChannel0(writeBuffer);
        return;
    }
    if (writeCacheQueue == null || writeCacheQueue.size() == 0) {
        writeBuffer = null;
        semaphore.release();
        // The session may be in the CLOSING or CLOSED state at this point
        if (isInvalid()) {
            close();
        }
        // New messages may have been added to writeCacheQueue through write() in the meantime
        else if (writeCacheQueue != null && writeCacheQueue.size() > 0 && semaphore.tryAcquire()) {
            writeToChannel();
        }
        return;
    }
    int totalSize = writeCacheQueue.expectRemaining(MAX_WRITE_SIZE);
    ByteBuffer headBuffer = writeCacheQueue.poll();
    if (headBuffer.remaining() == totalSize) {
        writeBuffer = headBuffer;
    } else {
        if (writeBuffer == null || totalSize << 1 <= writeBuffer.capacity() || totalSize > writeBuffer.capacity()) {
            writeBuffer = ByteBuffer.allocate(totalSize);
        } else {
            writeBuffer.clear().limit(totalSize);
        }
        writeBuffer.put(headBuffer);
        writeCacheQueue.pollInto(writeBuffer);
        writeBuffer.flip();
    }
    writeToChannel0(writeBuffer);
}
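The second half of writeToChannel() merges several queued buffers into one before writing. The sketch below shows that idea with a plain `Deque`. It assumes that `expectRemaining(max)` returns the combined size of the leading buffers that fit within `max` and that `pollInto` moves those buffers into the target, which is an inference from the code above rather than documented behavior. `BufferCoalescer` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Deque;

/** Hypothetical sketch of merging queued buffers into a single write; not smart-ioc code. */
final class BufferCoalescer {

    /**
     * Removes the leading buffers of the queue whose combined size fits within maxSize
     * and returns them as one readable buffer. Returns null for an empty queue, and the
     * head buffer itself when nothing can be merged with it.
     */
    static ByteBuffer coalesce(Deque<ByteBuffer> queue, int maxSize) {
        ByteBuffer head = queue.poll();
        if (head == null) {
            return null;
        }
        // Sum the sizes of the head and of the following buffers that still fit.
        int total = head.remaining();
        for (ByteBuffer next : queue) {
            if (total + next.remaining() > maxSize) {
                break;
            }
            total += next.remaining();
        }
        if (total == head.remaining()) {
            return head; // nothing to merge: avoid the copy
        }
        ByteBuffer merged = ByteBuffer.allocate(total);
        merged.put(head);
        while (merged.hasRemaining()) {
            merged.put(queue.poll()); // each polled buffer was counted in total
        }
        merged.flip();
        return merged;
    }
}
```

Merging small messages reduces the number of channel writes at the cost of one copy, and returning the head buffer untouched keeps the single-message case copy-free.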
writeToChannel0() triggers a write operation on the channel
protected final void writeToChannel0(ByteBuffer buffer) {
    if (null != channel && channel.isOpen()) {
        channel.write(buffer, this, writeCompletionHandler);
    }
}
readFromChannel() triggers the read operation of the channel. When a serious message backlog is detected, flow control is triggered.
void readFromChannel(Integer readSize) {
    readBuffer.flip();
    T dataEntry;
    while ((dataEntry = ioServerConfig.getProtocol().decode(readBuffer, this, readSize)) != null) {
        // Handle the message
        try {
            for (Filter<T> h : ioServerConfig.getFilters()) {
                h.processFilter(this, dataEntry);
            }
            ioServerConfig.getProcessor().process(this, dataEntry);
        } catch (Exception e) {
            ioServerConfig.getProcessor().stateEvent(this, StateMachine.PROCESS_EXCEPTION, e);
            for (Filter<T> h : ioServerConfig.getFilters()) {
                h.processFail(this, dataEntry, e);
            }
        }
    }
    if (readSize == -1 || status == SESSION_STATUS_CLOSING) {
        close(false);
        return;
    }
    if (status == SESSION_STATUS_CLOSED) {
        return;
    }
    // All data has been consumed
    if (readBuffer.remaining() == 0) {
        readBuffer.clear();
    } else if (readBuffer.position() > 0) {
        // Call compact only when some data has been consumed, to reduce memory copying
        readBuffer.compact();
    } else {
        readBuffer.position(readBuffer.limit());
        readBuffer.limit(readBuffer.capacity());
    }
    readFromChannel0(readBuffer);
}
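The three-way branch at the end of readFromChannel() decides how the read buffer is prepared for the next read. The following sketch replays that cycle in isolation. It is a simplified illustration: `ReadBufferCycle` is a hypothetical class, messages are newline-terminated text rather than a real protocol, and incoming bytes are passed in directly instead of being read from a channel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of the flip / decode / clear-or-compact cycle; not smart-ioc code. */
final class ReadBufferCycle {
    private final ByteBuffer readBuffer = ByteBuffer.allocate(64);
    final List<String> messages = new ArrayList<>();

    /** Simulates one completed read that delivered the given chunk (must fit the buffer). */
    void onBytesRead(byte[] chunk) {
        readBuffer.put(chunk); // the channel appends to the buffer in write mode
        readBuffer.flip();     // switch to read mode for decoding
        String line;
        while ((line = decodeLine(readBuffer)) != null) {
            messages.add(line);
        }
        if (!readBuffer.hasRemaining()) {
            readBuffer.clear();   // everything consumed: reuse the whole buffer
        } else if (readBuffer.position() > 0) {
            readBuffer.compact(); // move the leftover half packet to the front
        } else {
            // Nothing consumed: return to write mode without copying any bytes.
            readBuffer.position(readBuffer.limit());
            readBuffer.limit(readBuffer.capacity());
        }
    }

    /** Bytes of an incomplete message currently held back for the next read. */
    int pendingBytes() {
        return readBuffer.position();
    }

    /** Returns one newline-terminated line, or null if no complete line is buffered. */
    private static String decodeLine(ByteBuffer buf) {
        for (int i = buf.position(); i < buf.limit(); i++) {
            if (buf.get(i) == '\n') {
                byte[] line = new byte[i - buf.position()];
                buf.get(line);
                buf.get(); // consume the newline itself
                return new String(line, StandardCharsets.UTF_8);
            }
        }
        return null;
    }
}
```

The last branch has the same effect as compact() when the position is 0, but skips the copy, which is the memory-copy saving the comment in the original code refers to.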
readFromChannel0() triggers a read operation on the channel
protected final void readFromChannel0(ByteBuffer buffer) {
    if (null != channel && channel.isOpen()) {
        channel.read(buffer, this, readCompletionHandler);
    }
}
AioQuickClient
Member attributes
Property name | Type | Description |
---|---|---|
mAsynchronousSocketChannel | AsynchronousSocketChannel | Asynchronous socket channel used for asynchronous reading and writing |
mSession | AioSession | Client session information |
mSelector | Selector | Selector that manages the channel |
mConfig | IoServerConfig | Stores the AioQuickClient service configuration items |
Core methods
Method | Description |
---|---|
public AioQuickClient(String host, int port, Protocol protocol, MessageProcessor messageProcessor) | Constructor |
public final void start() | Starts connecting to the server |
private void acceptConnect(SelectionKey key) | Completes the connection between client and server |
public final void timeout() | Handles a connection timeout; must be called manually |
public final void shutdown() | Shuts down the client |
Constructor
public AioQuickClient(String host, int port, Protocol<T> protocol, MessageProcessor<T> messageProcessor) {
    mConfig.setHost(host);
    mConfig.setPort(port);
    mConfig.setProtocol(protocol);
    mConfig.setProcessor(messageProcessor);
}
start() starts the connection to the server
public final void start() {
    try {
        mSelector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        SelectionKey selectionKey = socketChannel.register(mSelector, SelectionKey.OP_CONNECT);
        mAsynchronousSocketChannel = new AsynchronousSocketChannel(selectionKey);
        socketChannel.connect(new InetSocketAddress(mConfig.getHost(), mConfig.getPort()));
        // ToolLog.e(TAG, "Start connecting to server");
        ClientThread serverThread = new ClientThread();
        serverThread.start();
    } catch (final IOException | UnresolvedAddressException e) {
        e.printStackTrace();
        mConfig.getProcessor().stateEvent(mSession, StateMachine.SESSION_CONNECT_FAILED, e);
    }
}
The inner class ClientThread is the thread that handles transport-layer channel events
class ClientThread extends Thread {
    @Override
    public void run() {
        try {
            while (mAsynchronousSocketChannel.isOpen()) {
                // Take the already-selected keys first; block in selector.select() only when
                // no event of interest has fired, to reduce the number of select calls
                Set<SelectionKey> keySet = mSelector.selectedKeys();
                if (keySet.isEmpty()) {
                    mSelector.select();
                }
                Iterator<SelectionKey> keyIterator = keySet.iterator();
                // Handle the events that have fired
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // Read data
                    if (key.isReadable()) {
                        mAsynchronousSocketChannel.removeOps(SelectionKey.OP_READ);
                        mAsynchronousSocketChannel.doRead();
                    } else if (key.isWritable()) {
                        // Output data
                        mAsynchronousSocketChannel.removeOps(SelectionKey.OP_WRITE);
                        mAsynchronousSocketChannel.doWrite();
                    } else if (key.isConnectable()) {
                        // Establish a new connection: the client triggers connect, the server triggers accept
                        acceptConnect(key);
                    }
                    // Remove the handled event
                    keyIterator.remove();
                }
            }
        } catch (Exception e) {
            shutdown();
        }
    }
}
acceptConnect() handles the ready event for an established connection
private void acceptConnect(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    channel.finishConnect();
    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    // Create the AioSession
    mSession = new AioSession<>(mAsynchronousSocketChannel, mConfig, new ReadCompletionHandler(), new WriteCompletionHandler());
    mSession.initSession();
    mConfig.getProcessor().stateEvent(mSession, StateMachine.NEW_SESSION, null);
}
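The interestOps expression in acceptConnect() relies on Java operator precedence: `&` binds tighter than `|`, so OP_CONNECT is cleared first and OP_READ is added afterwards. The small sketch below spells this out with explicit parentheses. `InterestOpsSketch` is a hypothetical helper written for this explanation; only the `SelectionKey` constants come from the JDK.

```java
import java.nio.channels.SelectionKey;

/** Hypothetical helper that restates the interest-set update with explicit parentheses. */
final class InterestOpsSketch {

    /** Clears OP_CONNECT and adds OP_READ, leaving every other bit untouched. */
    static int afterConnect(int interestOps) {
        return (interestOps & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_READ;
    }
}
```

For example, an interest set of OP_CONNECT alone becomes OP_READ alone, and any OP_WRITE interest that was already registered is preserved.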
Closing
If you find any mistakes, corrections are welcome. Thank you 🙏