Introduction: This article focuses on the design and implementation of the proxy gateway itself, rather than on the management and maintenance of proxy resources.

Author: Xin Yuan | Source: Alibaba Technology official account

I. Background

The platform purchased a batch of bare proxies to audit how ads are displayed in different regions. The externally purchased proxies are used as follows:

  1. Extract proxy IP:PORT addresses through the provided HTTP API. The response gives each proxy's validity period (3-5 minutes) and its region;
  2. Select a proxy in the required region from the extracted ones, add authentication information, and send the request to obtain the result.
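The extract-then-select flow above can be sketched as a small in-memory pool. This is a minimal illustration under stated assumptions, not the author's code: the names `ProxyPool` and `ExtractedProxy` are hypothetical, the HTTP extraction API itself is not modelled (entries are simply added with the validity period the API reports), and authentication is left out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of one proxy returned by the extraction API.
final class ExtractedProxy {
    final String host;
    final int port;
    final String region;
    final Instant expiresAt;

    ExtractedProxy(String host, int port, String region, Instant expiresAt) {
        this.host = host;
        this.port = port;
        this.region = region;
        this.expiresAt = expiresAt;
    }
}

// Hypothetical pool: keeps extracted proxies and picks a live one in a given region.
final class ProxyPool {
    private final List<ExtractedProxy> proxies = new ArrayList<>();

    // Record a proxy with the validity period reported by the API (e.g. 3-5 minutes).
    synchronized void add(String host, int port, String region, Duration validFor, Instant now) {
        proxies.add(new ExtractedProxy(host, port, region, now.plus(validFor)));
    }

    // Drop expired entries, then return any remaining proxy in the requested region.
    synchronized Optional<ExtractedProxy> pick(String region, Instant now) {
        proxies.removeIf(p -> !p.expiresAt.isAfter(now));
        return proxies.stream().filter(p -> p.region.equals(region)).findFirst();
    }
}
```

Passing `now` explicitly keeps expiry deterministic and easy to test; a real gateway would call `Instant.now()` and refill the pool from the API when `pick` comes back empty.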

To this end, this article designs and implements a proxy gateway that:

  1. Manages and maintains proxy resources and handles proxy authentication;
  2. Exposes a single, unified proxy entry point instead of dynamically changing proxy IP:PORT addresses;
  3. Filters and rate-limits traffic, for example keeping static resources from going through the proxies.

This article focuses on the design and implementation of the proxy gateway itself rather than on the management and maintenance of proxy resources.

Note: this article contains a lot of runnable Java code to explain the principles behind proxies.

II. Technical Route

Before implementing the proxy gateway, this article first introduces the principles behind proxies and how to implement:

  1. A transparent proxy;
  2. A non-transparent proxy;
  3. A transparent upstream proxy;
  4. A non-transparent upstream proxy.

Finally, it builds the proxy gateway, which is essentially a non-transparent upstream proxy, and gives a detailed design and implementation.

1 Transparent Proxy

The transparent proxy is the foundation of the proxy gateway, and this section explains it in detail using Java's native NIO. The gateway itself is actually built on the Netty framework; the native NIO implementation helps in understanding the Netty one.

A transparent proxy involves three interacting parties: the client, the proxy service, and the server. It works as follows:

  1. When the proxy service receives a connection request, it checks whether it is a CONNECT request; if so, it has to reply to the client that the proxy connection has been established;
  2. For a CONNECT request, the proxy service connects to the remote server named in the request and then forwards traffic directly between the client and the remote server;
  3. For a non-CONNECT request, the proxy service resolves the remote server from the request, connects to it, and then forwards traffic directly between the client and the remote server.

Points to note:

  1. An HTTPS client normally sends a CONNECT request to the proxy first. Once the tunnel is established, the TLS handshake runs over it and all subsequent traffic is encrypted. The proxy therefore has to connect to the remote server when it receives the CONNECT request, because it cannot read anything from the encrypted data that follows;
  2. The transparent proxy does not forward the CONNECT request itself to the remote server (the remote server would not recognize it);
  3. When the transparent proxy receives a non-CONNECT request, it forwards it unconditionally.
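The CONNECT handling in these notes can be isolated into a small sketch: parse the target from the request line, and build the reply the proxy sends once its own TCP connection to the target is up. The class name `ConnectRequest` is hypothetical, and unlike the full implementation below, this sketch takes the target from the request line (the `host:port` authority form used by CONNECT) rather than from the Host header.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: recognizes "CONNECT host:port HTTP/1.1" and builds the tunnel reply.
final class ConnectRequest {
    final String host;
    final int port;

    private ConnectRequest(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Returns null when the first line is not a CONNECT request.
    static ConnectRequest parse(String rawRequest) {
        String requestLine = rawRequest.split("\r\n", 2)[0];
        String[] parts = requestLine.split(" ");
        if (parts.length < 3 || !"CONNECT".equalsIgnoreCase(parts[0])) {
            return null;
        }
        String authority = parts[1];
        int colon = authority.lastIndexOf(':');
        if (colon < 0) {
            return new ConnectRequest(authority, 443); // no explicit port: assume HTTPS
        }
        return new ConnectRequest(authority.substring(0, colon),
                Integer.parseInt(authority.substring(colon + 1)));
    }

    // Sent to the client after the proxy has connected to host:port.
    // The CONNECT request itself is never forwarded to the remote server.
    static byte[] establishedReply() {
        return "HTTP/1.1 200 Connection Established\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
    }
}
```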

The complete transparent proxy implementation is under 300 lines of code and is reproduced in full below:

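import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j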
public class SimpleTransProxy {

    public static void main(String[] args) throws IOException {
        int port = 8006;
        ServerSocketChannel localServer = ServerSocketChannel.open();
        localServer.bind(new InetSocketAddress(port));
        Reactor reactor = new Reactor();
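        // Reactor thread: dispatches readable channels
        GlobalThreadPool.REACTOR_EXECUTOR.submit(reactor::run);

        // Accept loop; each accepted connection is handed to a worker thread
        while (localServer.isOpen()) {
            // Block here until a client connects
            SocketChannel remoteClient = localServer.accept();

            // Worker thread
            GlobalThreadPool.WORK_EXECUTOR.submit(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    // Parse the request and connect to the remote server
                    SocketChannel remoteServer = new ProxyHandler().proxy(remoteClient);

                    // Transparent forwarding in both directions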
                    reactor.pipe(remoteClient, remoteServer)
                            .pipe(remoteServer, remoteClient);
                }
            });
        }
    }
}

@Data
class ProxyHandler {
    private String method;
    private String host;
    private int port;
    private SocketChannel remoteServer;
    private SocketChannel remoteClient;

    /**
     * Raw request data
     */
    private List<ByteBuffer> buffers = new ArrayList<>();
    private StringBuilder stringBuilder = new StringBuilder();

    /**
     * Connect to the remote server
     * @param remoteClient
     * @return
     * @throws IOException
     */
    public SocketChannel proxy(SocketChannel remoteClient) throws IOException {
        this.remoteClient = remoteClient;
        connect();
        return this.remoteServer;
    }

    public void connect() throws IOException {
        // Parse METHOD, HOST and PORT
        beforeConnected();

        // Connect to the remote server
        createRemoteServer();

        // Answer CONNECT requests; write other requests through
        afterConnected();
    }

    protected void beforeConnected() throws IOException {
        // Read the request header
        readAllHeader();

        // Parse HOST and PORT
        parseRemoteHostAndPort();
    }

    /**
     * Create the connection to the remote server
     * @throws IOException
     */
    protected void createRemoteServer() throws IOException {
        remoteServer = SocketChannel.open(new InetSocketAddress(host, port));
    }

    /**
     * Post-processing once the connection is established
     * @throws IOException
     */
    protected void afterConnected() throws IOException {
        // For a CONNECT request, reply 200 to the client
        if ("CONNECT".equalsIgnoreCase(method)) {
            // CONNECT defaults to port 443; the actual port was parsed from HOST
            remoteClient.write(ByteBuffer.wrap("HTTP/1.0 200 Connection Established\r\nProxy-agent: nginx\r\n\r\n".getBytes()));
        } else {
            writeThrouth();
        }
    }

    protected void writeThrouth() {
        buffers.forEach(byteBuffer -> {
            try {
                remoteServer.write(byteBuffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * Read the request content
     * @throws IOException
     */
    protected void readAllHeader() throws IOException {
        while (true) {
            ByteBuffer clientBuffer = newByteBuffer();
            int read = remoteClient.read(clientBuffer);
            clientBuffer.flip();
            appendClientBuffer(clientBuffer);
            if (read < clientBuffer.capacity()) {
                break;
            }
        }
    }

    /**
     * Parse HOST and PORT
     * @throws IOException
     */
    protected void parseRemoteHostAndPort() throws IOException {
        // The METHOD comes from the first line of the request
        method = parseRequestMethod(stringBuilder.toString());

        // Default to port 80 (443 for CONNECT); a port in HOST overrides it
        port = 80;
        if ("CONNECT".equalsIgnoreCase(method)) {
            port = 443;
        }

        this.host = parseHost(stringBuilder.toString());

        URI remoteServerURI = URI.create(host);
        host = remoteServerURI.getHost();

        if (remoteServerURI.getPort() > 0) {
            port = remoteServerURI.getPort();
        }
    }

    protected void appendClientBuffer(ByteBuffer clientBuffer) {
        buffers.add(clientBuffer);
        stringBuilder.append(new String(clientBuffer.array(), clientBuffer.position(), clientBuffer.limit()));
    }

    protected static ByteBuffer newByteBuffer() {
        // The buffer must be larger than 7 bytes so the method is read in one go
        return ByteBuffer.allocate(128);
    }

    private static String parseRequestMethod(String rawContent) {
        // The method is the first token of the request line
        return rawContent.split("\r\n")[0].split(" ")[0];
    }

    private static String parseHost(String rawContent) {
        String[] headers = rawContent.split("\r\n");
        String host = "host:";
        for (String header : headers) {
            if (header.length() > host.length()) {
                String key = header.substring(0, host.length());
                String value = header.substring(host.length()).trim();
                if (host.equalsIgnoreCase(key)) {
                    if (!value.startsWith("http://") && !value.startsWith("https://")) {
                        value = "http://" + value;
                    }
                    return value;
                }
            }
        }
        return "";
    }

}

@Slf4j
@Data
class Reactor {

    private Selector selector;

    private volatile boolean finish = false;

    @SneakyThrows
    public Reactor() {
        selector = Selector.open();
    }

    @SneakyThrows
    public Reactor pipe(SocketChannel from, SocketChannel to) {
        from.configureBlocking(false);
        from.register(selector, SelectionKey.OP_READ, new SocketPipe(this, from, to));
        return this;
    }

    @SneakyThrows
    public void run() {
        try {
            while (!finish) {
                if (selector.selectNow() > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey selectionKey = it.next();
                        if (selectionKey.isValid() && selectionKey.isReadable()) {
                            ((SocketPipe) selectionKey.attachment()).pipe();
                        }
                        it.remove();
                    }
                }
            }
        } finally {
            close();
        }
    }

    @SneakyThrows
    public synchronized void close() {
        if (finish) {
            return;
        }
        finish = true;
        if (!selector.isOpen()) {
            return;
        }
        for (SelectionKey key : selector.keys()) {
            closeChannel(key.channel());
            key.cancel();
        }
        if (selector != null) {
            selector.close();
        }
    }

    public void cancel(SelectableChannel channel) {
        SelectionKey key = channel.keyFor(selector);
        if (Objects.isNull(key)) {
            return;
        }
        key.cancel();
    }

    @SneakyThrows
    public void closeChannel(Channel channel) {
        SocketChannel socketChannel = (SocketChannel)channel;
        if (socketChannel.isConnected() && socketChannel.isOpen()) {
            socketChannel.shutdownOutput();
            socketChannel.shutdownInput();
        }
        socketChannel.close();
    }
}

@Data
@AllArgsConstructor
class SocketPipe {

    private Reactor reactor;

    private SocketChannel from;

    private SocketChannel to;

    @SneakyThrows
    public void pipe() {
        // Stop listening while this pipe is being processed
        clearInterestOps();

        GlobalThreadPool.PIPE_EXECUTOR.submit(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                int totalBytesRead = 0;
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                while (valid(from) && valid(to)) {
                    byteBuffer.clear();
                    int bytesRead = from.read(byteBuffer);
                    totalBytesRead = totalBytesRead + bytesRead;
                    byteBuffer.flip();
                    to.write(byteBuffer);
                    if (bytesRead < byteBuffer.capacity()) {
                        break;
                    }
                }
                if (totalBytesRead < 0) {
                    reactor.closeChannel(from);
                    reactor.cancel(from);
                } else {
                    // Resume listening
                    resetInterestOps();
                }
            }
        });
    }

    protected void clearInterestOps() {
        from.keyFor(reactor.getSelector()).interestOps(0);
        to.keyFor(reactor.getSelector()).interestOps(0);
    }

    protected void resetInterestOps() {
        from.keyFor(reactor.getSelector()).interestOps(SelectionKey.OP_READ);
        to.keyFor(reactor.getSelector()).interestOps(SelectionKey.OP_READ);
    }

    private boolean valid(SocketChannel channel) {
        return channel.isConnected() && channel.isRegistered() && channel.isOpen();
    }
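}

/**
 * Thread pools referenced above. The article does not show this class; this is a minimal
 * stand-in, assuming one Reactor thread plus cached pools for workers and pipes.
 */
class GlobalThreadPool {
    static final ExecutorService REACTOR_EXECUTOR = Executors.newSingleThreadExecutor();
    static final ExecutorService WORK_EXECUTOR = Executors.newCachedThreadPool();
    static final ExecutorService PIPE_EXECUTOR = Executors.newCachedThreadPool();
}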

The design above borrows from Netty:

  1. The Reactor thread is started first, then the proxy starts listening and handles each proxy request as it arrives;
  2. When the proxy service receives a proxy request, it first preprocesses it, and SocketPipe then forwards data in both directions between the client and the remote server;
  3. Preprocessing reads the first HTTP request and parses METHOD, HOST, and PORT;
  4. For a CONNECT request, it connects to the remote server, replies Connection Established to the client, and returns the remote SocketChannel;
  5. For any other request, it connects to the remote server, writes the original request through, and returns the remote SocketChannel;
  6. Bidirectional forwarding is set up by registering the client and server SocketChannels with the Reactor, each with a SocketPipe attached;
  7. The Reactor detects readable channels and hands each one to its SocketPipe, which forwards the data.

Test

Once the code is running, the proxy service listens on port 8006.

Test an HTTP request: curl -x 'localhost:8006' http://httpbin.org/get

Test an HTTPS request: curl -x 'localhost:8006' https://httpbin.org/get

Note that although the proxy service is proxying an HTTPS request, curl does not need the -k option (which allows insecure connections). This is because the proxy does not act as a man in the middle: it never decrypts the traffic between the client and the remote server. Non-transparent proxies have to solve this problem.
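The two curl checks can also be run from Java through the JDK's built-in `java.net.Proxy` support. This is a sketch assuming the transparent proxy above is running on localhost:8006; the class name `ProxyCheck` is hypothetical. For the https URL, `HttpURLConnection` sends a CONNECT to the proxy and then performs the TLS handshake with httpbin.org itself, which mirrors why curl needs no -k.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;

final class ProxyCheck {
    // An HTTP proxy pointing at the local proxy service.
    static Proxy localProxy(int port) {
        return new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", port));
    }

    // Fetch a URL through the proxy and return the HTTP status code.
    static int fetch(String url, Proxy proxy) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(proxy);
        try (InputStream in = conn.getInputStream()) {
            in.readAllBytes(); // drain the response body
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        Proxy proxy = localProxy(8006);
        System.out.println("HTTP:  " + fetch("http://httpbin.org/get", proxy));
        System.out.println("HTTPS: " + fetch("https://httpbin.org/get", proxy));
    }
}
```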

2 Non-transparent proxy

A non-transparent proxy has to parse the data exchanged between the client and the remote server and process it accordingly.

With HTTP, the data SocketPipe carries is plaintext and can be intercepted and processed directly.

With HTTPS, the payload SocketPipe carries is encrypted and cannot be processed. Moreover, for both HTTP and HTTPS, a single SocketPipe read may return only part of a request, so the data has to be aggregated first. This leaves two problems:

  1. Aggregation can be solved with a relatively simple pattern, similar to how BufferedInputStream wraps an InputStream; see Netty's HttpObjectAggregator for details;
  2. Decrypting HTTPS requests and encrypting responses requires an SSL-capable NIO SocketChannel.
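The aggregation idea in point 1 can be sketched as a buffer that accumulates whatever each read returns and only hands out a request once the header terminator `\r\n\r\n` and any `Content-Length` body have arrived. This is a simplified, hypothetical `RequestAccumulator`, not Netty's HttpObjectAggregator: it ignores chunked transfer encoding and decodes bytes as ISO-8859-1 so that characters and bytes stay one-to-one.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical accumulator: gathers partial reads until one HTTP request is complete.
final class RequestAccumulator {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Append the readable bytes of one chunk (e.g. a flipped ByteBuffer from a SocketChannel read).
    void append(ByteBuffer chunk) {
        byte[] bytes = new byte[chunk.remaining()];
        chunk.get(bytes);
        pending.write(bytes, 0, bytes.length);
    }

    // Returns one complete request (header plus Content-Length body), or null if more data is needed.
    String pollRequest() {
        String data = new String(pending.toByteArray(), StandardCharsets.ISO_8859_1);
        int headerEnd = data.indexOf("\r\n\r\n");
        if (headerEnd < 0) {
            return null;
        }
        int total = headerEnd + 4 + contentLength(data.substring(0, headerEnd));
        if (data.length() < total) {
            return null;
        }
        // Keep bytes that already belong to the next request.
        byte[] rest = data.substring(total).getBytes(StandardCharsets.ISO_8859_1);
        pending.reset();
        pending.write(rest, 0, rest.length);
        return data.substring(0, total);
    }

    private static int contentLength(String header) {
        for (String line : header.split("\r\n")) {
            int colon = line.indexOf(':');
            if (colon > 0 && line.substring(0, colon).trim().equalsIgnoreCase("Content-Length")) {
                return Integer.parseInt(line.substring(colon + 1).trim());
            }
        }
        return 0;
    }
}
```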

How SslSocketChannel works

The JDK's SocketChannel does not support SSL, and the existing SSLSocket uses blocking I/O (OIO), as shown in the figure.

From the figure:

  1. Both inbound and outbound data pass through the SSL session, which must first complete its handshake;
  2. Inbound data is decrypted (unwrap) and outbound data is encrypted (wrap);
  3. The handshake, encryption, and decryption are all driven by a single state machine.

The following code implements SslSocketChannel:
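The state machine can be observed directly with the JDK's SSLEngine, without any network I/O. The sketch below (hypothetical class `HandshakeFirstStep`) builds a client-mode engine from the JVM's default trust material, starts the handshake, and performs the first NEED_WRAP step, which produces the ClientHello bytes that SslSocketChannel would write to the socket. The host name only sets SNI; no connection is made.

```java
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;

final class HandshakeFirstStep {
    // Client-mode engine built from the JVM's default key and trust material.
    static SSLEngine newClientEngine() throws Exception {
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, null, null);
        SSLEngine engine = context.createSSLEngine("httpbin.org", 443);
        engine.setUseClientMode(true);
        return engine;
    }

    // One wrap() with no application data: during the handshake it emits handshake records.
    static SSLEngineResult firstWrap(SSLEngine engine, ByteBuffer netOut) throws Exception {
        return engine.wrap(ByteBuffer.allocate(0), netOut);
    }

    public static void main(String[] args) throws Exception {
        SSLEngine engine = newClientEngine();
        engine.beginHandshake();
        System.out.println("after beginHandshake: " + engine.getHandshakeStatus());
        ByteBuffer netOut = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
        SSLEngineResult result = firstWrap(engine, netOut);
        System.out.println(result.getStatus() + ", ClientHello bytes: " + result.bytesProduced());
    }
}
```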

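import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SslSocketChannel {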

    /**
     * The four buffers needed for the handshake, encryption and decryption
     */
    protected ByteBuffer myAppData; // our plaintext
    protected ByteBuffer myNetData; // our ciphertext
    protected ByteBuffer peerAppData; // peer's plaintext
    protected ByteBuffer peerNetData; // peer's ciphertext

    /**
     * Executor for the delegated tasks run during the handshake
     */
    protected ExecutorService executor = Executors.newSingleThreadExecutor();

    /**
     * The underlying NIO channel
     */
    protected SocketChannel socketChannel;

    /**
     * SSL engine
     */
    protected SSLEngine engine;

    public SslSocketChannel(SSLContext context, SocketChannel socketChannel, boolean clientMode) throws Exception {
        // The original NIO socket
        this.socketChannel = socketChannel;

        // Size the buffers from a throwaway session
        SSLSession dummySession = context.createSSLEngine().getSession();
        myAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
        myNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
        peerAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
        peerNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
        dummySession.invalidate();

        engine = context.createSSLEngine();
        engine.setUseClientMode(clientMode);
        engine.beginHandshake();
    }

    /**
     * SSL handshake, implemented following
     * https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
     * @return
     * @throws IOException
     */
    protected boolean doHandshake() throws IOException {
        SSLEngineResult result;
        HandshakeStatus handshakeStatus;

        int appBufferSize = engine.getSession().getApplicationBufferSize();
        ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
        ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
        myNetData.clear();
        peerNetData.clear();

        handshakeStatus = engine.getHandshakeStatus();
        while (handshakeStatus != HandshakeStatus.FINISHED && handshakeStatus != HandshakeStatus.NOT_HANDSHAKING) {
            switch (handshakeStatus) {
                case NEED_UNWRAP:
                    if (socketChannel.read(peerNetData) < 0) {
                        if (engine.isInboundDone() && engine.isOutboundDone()) {
                            return false;
                        }
                        try {
                            engine.closeInbound();
                        } catch (SSLException e) {
                            log.debug("Received END OF STREAM, closing the connection.", e);
                        }
                        engine.closeOutbound();
                        handshakeStatus = engine.getHandshakeStatus();
                        break;
                    }
                    peerNetData.flip();
                    try {
                        result = engine.unwrap(peerNetData, peerAppData);
                        peerNetData.compact();
                        handshakeStatus = result.getHandshakeStatus();
                    } catch (SSLException sslException) {
                        engine.closeOutbound();
                        handshakeStatus = engine.getHandshakeStatus();
                        break;
                    }
                    switch (result.getStatus()) {
                        case OK:
                            break;
                        case BUFFER_OVERFLOW:
                            peerAppData = enlargeApplicationBuffer(engine, peerAppData);
                            break;
                        case BUFFER_UNDERFLOW:
                            peerNetData = handleBufferUnderflow(engine, peerNetData);
                            break;
                        case CLOSED:
                            if (engine.isOutboundDone()) {
                                return false;
                            } else {
                                engine.closeOutbound();
                                handshakeStatus = engine.getHandshakeStatus();
                                break;
                            }
                        default:
                            throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
                    }
                    break;
                case NEED_WRAP:
                    myNetData.clear();
                    try {
                        result = engine.wrap(myAppData, myNetData);
                        handshakeStatus = result.getHandshakeStatus();
                    } catch (SSLException sslException) {
                        engine.closeOutbound();
                        handshakeStatus = engine.getHandshakeStatus();
                        break;
                    }
                    switch (result.getStatus()) {
                        case OK :
                            myNetData.flip();
                            while (myNetData.hasRemaining()) {
                                socketChannel.write(myNetData);
                            }
                            break;
                        case BUFFER_OVERFLOW:
                            myNetData = enlargePacketBuffer(engine, myNetData);
                            break;
                        case BUFFER_UNDERFLOW:
                            throw new SSLException("Buffer underflow after wrapping: the encrypted message is empty.");
                        case CLOSED:
                            try {
                                myNetData.flip();
                                while (myNetData.hasRemaining()) {
                                    socketChannel.write(myNetData);
                                }
                                peerNetData.clear();
                            } catch (Exception e) {
                                handshakeStatus = engine.getHandshakeStatus();
                            }
                            break;
                        default:
                            throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
                    }
                    break;
                case NEED_TASK:
                    Runnable task;
                    while ((task = engine.getDelegatedTask()) != null) {
                        executor.execute(task);
                    }
                    handshakeStatus = engine.getHandshakeStatus();
                    break;
                case FINISHED:
                    break;
                case NOT_HANDSHAKING:
                    break;
                default:
                    throw new IllegalStateException("Invalid handshake status: " + handshakeStatus);
            }
        }

        return true;
    }

    /**
     * SSL read path (decryption), implemented following
     * https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
     * @param consumer
     * @throws IOException
     */
    public void read(Consumer<ByteBuffer> consumer) throws IOException {
        // Reset the buffer
        peerNetData.clear();
        int bytesRead = socketChannel.read(peerNetData);
        if (bytesRead > 0) {
            peerNetData.flip();
            while (peerNetData.hasRemaining()) {
                peerAppData.clear();
                SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
                switch (result.getStatus()) {
                    case OK:
                        log.debug("Received from remote: " + new String(peerAppData.array(), 0, peerAppData.position()));
                        consumer.accept(peerAppData);
                        peerAppData.flip();
                        break;
                    case BUFFER_OVERFLOW:
                        peerAppData = enlargeApplicationBuffer(engine, peerAppData);
                        break;
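                    case BUFFER_UNDERFLOW:
                        // Incomplete TLS record: keep the unread bytes (growing the buffer if needed),
                        // read more from the socket, then retry the unwrap
                        peerNetData.compact();
                        peerNetData = handleBufferUnderflow(engine, peerNetData);
                        if (socketChannel.read(peerNetData) < 0) {
                            handleEndOfStream();
                            return;
                        }
                        peerNetData.flip();
                        break;
                    case CLOSED:
                        log.debug("Received a close message from the remote side.");
                        closeConnection();
                        return;
                    default:
                        throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
                }
            }
        } else if (bytesRead < 0) {
            log.debug("Received END OF STREAM, closing the connection.");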
            handleEndOfStream();
        }
    }

    public void write(String message) throws IOException {
        write(ByteBuffer.wrap(message.getBytes()));
    }

    /**
     * SSL write path (encryption), implemented following
     * https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
     * @param message
     * @throws IOException
     */
    public void write(ByteBuffer message) throws IOException {
        myAppData.clear();
        myAppData.put(message);
        myAppData.flip();
        while (myAppData.hasRemaining()) {
            myNetData.clear();
            SSLEngineResult result = engine.wrap(myAppData, myNetData);
            switch (result.getStatus()) {
                case OK:
                    myNetData.flip();
                    while (myNetData.hasRemaining()) {
                        socketChannel.write(myNetData);
                    }
                    log.debug("Wrote message to remote: {}", message);
                    break;
                case BUFFER_OVERFLOW:
                    myNetData = enlargePacketBuffer(engine, myNetData);
                    break;
                case BUFFER_UNDERFLOW:
                    throw new SSLException("Buffer underflow after wrapping: the encrypted message is empty.");
                case CLOSED:
                    closeConnection();
                    return;
                default:
                    throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
            }
        }
    }

    /**
     * Close the connection
     * @throws IOException
     */
    public void closeConnection() throws IOException  {
        engine.closeOutbound();
        doHandshake();
        socketChannel.close();
        executor.shutdown();
    }

    /**
     * END OF STREAM (-1) means the peer closed the connection
     * @throws IOException
     */
    protected void handleEndOfStream() throws IOException  {
        try {
            engine.closeInbound();
        } catch (Exception e) {
            log.error("Failed to close inbound on END OF STREAM.", e);
        }
        closeConnection();
    }

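    /*
     * Buffer helpers used above but not shown in the original article. This is a sketch,
     * assuming they follow the JSSE reference guide's advice: grow to the session's
     * proposed size, otherwise double the current capacity.
     */
    protected ByteBuffer enlargePacketBuffer(SSLEngine engine, ByteBuffer buffer) {
        return enlargeBuffer(buffer, engine.getSession().getPacketBufferSize());
    }

    protected ByteBuffer enlargeApplicationBuffer(SSLEngine engine, ByteBuffer buffer) {
        return enlargeBuffer(buffer, engine.getSession().getApplicationBufferSize());
    }

    protected ByteBuffer enlargeBuffer(ByteBuffer buffer, int sessionProposedCapacity) {
        int capacity = sessionProposedCapacity > buffer.capacity()
                ? sessionProposedCapacity : buffer.capacity() * 2;
        return ByteBuffer.allocate(capacity);
    }

    /**
     * BUFFER_UNDERFLOW: the buffer (in write mode) holds an incomplete record. Keep it if it
     * can hold a full packet, otherwise copy its contents into a larger buffer.
     */
    protected ByteBuffer handleBufferUnderflow(SSLEngine engine, ByteBuffer buffer) {
        if (engine.getSession().getPacketBufferSize() < buffer.limit()) {
            return buffer;
        }
        ByteBuffer replaceBuffer = enlargePacketBuffer(engine, buffer);
        buffer.flip();
        replaceBuffer.put(buffer);
        return replaceBuffer;
    }
}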

The code above:

  1. Implements the SSL handshake in a single place (doHandshake);
  2. Provides a read method that decrypts and a write method that encrypts;
  3. Makes SslSocketChannel a decorator around SocketChannel.

SslSocketChannel test server

A simple test server built on this wrapper:

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NioSslServer {

    public static void main(String[] args) throws Exception {
        NioSslServer sslServer = new NioSslServer("127.0.0.1", 8006);
        sslServer.start();
        // Test with curl, e.g. curl -k https://127.0.0.1:8006
    }

    private SSLContext context;

    private Selector selector;

    public NioSslServer(String hostAddress, int port) throws Exception {
        // Initialize the SSLContext (serverSSLContext() is a helper the article does not show)
        context = serverSSLContext();

        // Register the listener
        selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(hostAddress, port));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void start() throws Exception {
        log.debug("Waiting for connections.");
        while (true) {
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    SslSocketChannel sslSocketChannel = (SslSocketChannel) key.attachment();
                    sslSocketChannel.read(buf -> {});
                    sslSocketChannel.write("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nOK\r\n\r\n");
                    sslSocketChannel.closeConnection();
                }
            }
        }
    }

    private void accept(SelectionKey key) throws Exception {
        log.debug("Received a new connection.");
        SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
        socketChannel.configureBlocking(false);
        SslSocketChannel sslSocketChannel = new SslSocketChannel(context, socketChannel, false);
        if (sslSocketChannel.doHandshake()) {
            socketChannel.register(selector, SelectionKey.OP_READ, sslSocketChannel);
        } else {
            socketChannel.close();
            log.debug("Handshake failed, connection closed.");
        }
    }
}

Notes on the code above:

  1. Because the channels are non-blocking, even a simple test has to use NIO's Selector;
  2. It initializes a ServerSocketChannel listening on port 8006;
  3. When a connection arrives, it wraps the SocketChannel in an SslSocketChannel, completes the handshake, and registers it with the Selector;
  4. When data arrives, the SslSocketChannel reads (decrypts) it and writes (encrypts) the response.
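The server above calls serverSSLContext(), which the article does not show. One plausible shape, assuming the server certificate and private key live in a PKCS12 keystore, is sketched below; the class name `SslContexts`, the keystore file, and the password handling are all hypothetical.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

// Hypothetical stand-in for the article's serverSSLContext() helper.
final class SslContexts {
    // Build a server SSLContext from a loaded keystore holding the server certificate and key.
    static SSLContext serverContext(KeyStore keyStore, char[] keyPassword) throws Exception {
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyPassword);
        SSLContext context = SSLContext.getInstance("TLS");
        // Default trust managers: the server does not request client certificates here.
        context.init(kmf.getKeyManagers(), null, null);
        return context;
    }

    // Load a PKCS12 keystore file, e.g. one generated with keytool.
    static KeyStore loadPkcs12(Path file, char[] storePassword) throws Exception {
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (InputStream in = Files.newInputStream(file)) {
            keyStore.load(in, storePassword);
        }
        return keyStore;
    }
}
```

With a self-signed certificate, clients must either trust it explicitly or skip verification (curl's -k), which is why the server test above is exercised with -k.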

SslSocketChannel test client

A simple test client, built the same way as the server:

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NioSslClient {

    public static void main(String[] args) throws Exception {
        NioSslClient sslClient = new NioSslClient("httpbin.org", 443);
        // Requests https://httpbin.org/get
        sslClient.connect();
    }

    private String remoteAddress;

    private int port;

    private SSLEngine engine;

    private SocketChannel socketChannel;

    private SSLContext context;

    /**
     * Requires the remote HOST and PORT
     * @param remoteAddress
     * @param port
     * @throws Exception
     */
    public NioSslClient(String remoteAddress, int port) throws Exception {
        this.remoteAddress = remoteAddress;
        this.port = port;
        // clientSSLContext() is a helper the article does not show
        context = clientSSLContext();
        engine = context.createSSLEngine(remoteAddress, port);
        engine.setUseClientMode(true);
    }

    public boolean connect() throws Exception {
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress(remoteAddress, port));
        while (!socketChannel.finishConnect()) {
            // With a Reactor there would be no busy waiting here
            // log.debug("Connecting..");
        }
        SslSocketChannel sslSocketChannel = new SslSocketChannel(context, socketChannel, true);
        sslSocketChannel.doHandshake();

        // Handshake finished: open the Selector
        Selector selector = SelectorProvider.provider().openSelector();
        socketChannel.register(selector, SelectionKey.OP_READ, sslSocketChannel);

        // Write the request
        sslSocketChannel.write("GET /get HTTP/1.1\r\n"
                + "Host: httpbin.org:443\r\n"
                + "User-Agent: curl/7.62.0\r\n"
                + "Accept: */*\r\n"
                + "\r\n");

        while (true) {
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();
                if (key.isValid() && key.isReadable()) {
                    ((SslSocketChannel) key.attachment()).read(buf -> {
                        log.info("{}", new String(buf.array(), 0, buf.position()));
                    });
                    ((SslSocketChannel) key.attachment()).closeConnection();
                    return true;
                }
            }
        }
    }
}

Notes on the code above:

  1. The client-side test verifies that the SSL wrapper works in both directions;
  2. In the non-transparent upstream proxy that follows, SslSocketChannel is used both as a server and as a client;
  3. The client is built like the server, except that its SocketChannel is initialized with connect instead of bind.

Summary

To sum up:

  1. A non-transparent proxy needs the complete request data, which can be obtained with the Decorator pattern;
  2. A non-transparent proxy needs the decrypted HTTPS data, which SslSocketChannel provides by wrapping the original SocketChannel;
  3. Once it has the request, the proxy processes it as needed, which completes the non-transparent proxy.

3 Transparent upstream proxy

A transparent upstream proxy is even simpler than a transparent proxy. The differences are:

  1. A transparent proxy has to answer CONNECT requests itself; a transparent upstream proxy does not, and simply forwards them;
  2. A transparent proxy parses HOST and PORT from the CONNECT request and connects to that server; a transparent upstream proxy only needs to connect to the downstream proxy's IP:PORT and forward requests to it directly;
  3. A transparent upstream proxy is just a plain SocketChannel pipe to a fixed downstream proxy server.

You can achieve a transparent upstream proxy by making these simple changes to the transparent proxy.
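The three differences can be expressed as a small routing decision. This is a hypothetical sketch (`UpstreamRoute` and its fields are not from the article): a transparent proxy connects to the target parsed from the request and answers CONNECT itself, while a transparent upstream proxy always connects to the configured downstream proxy and forwards every byte, including the CONNECT request, unchanged.

```java
import java.net.InetSocketAddress;

// Hypothetical description of how one proxied connection should be handled.
final class UpstreamRoute {
    final InetSocketAddress connectTo;  // where the proxy opens its outgoing socket (unresolved; resolve before connecting)
    final boolean answerConnect;        // reply "200 Connection Established" locally?
    final boolean forwardRawRequest;    // write the client's first request through unchanged?

    private UpstreamRoute(InetSocketAddress connectTo, boolean answerConnect, boolean forwardRawRequest) {
        this.connectTo = connectTo;
        this.answerConnect = answerConnect;
        this.forwardRawRequest = forwardRawRequest;
    }

    // Transparent proxy: connect to the parsed target; answer CONNECT, pass other requests through.
    static UpstreamRoute transparent(String method, String host, int port) {
        boolean isConnect = "CONNECT".equalsIgnoreCase(method);
        return new UpstreamRoute(InetSocketAddress.createUnresolved(host, port), isConnect, !isConnect);
    }

    // Transparent upstream proxy: always connect to the downstream proxy and forward everything.
    static UpstreamRoute transparentUpstream(InetSocketAddress downstreamProxy) {
        return new UpstreamRoute(downstreamProxy, false, true);
    }
}
```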

4 Non-transparent upstream proxy

A non-transparent upstream proxy is more complex than a non-transparent proxy.

As shown in the figure above, there are four components: the client, the proxy service's server side (ServerHandler), the proxy service's client side (ClientHandler), and the server.

  1. For an HTTP request, data flows directly through client <-> ServerHandler <-> ClientHandler <-> server; the gateway only needs to aggregate the request and can then apply the relevant management policies;
  2. For an HTTPS request, a proxy sitting between client and server only sees encrypted data. The gateway therefore has to act as the HTTPS server toward the client and as the HTTPS client toward the server;
  3. When acting as the HTTPS server, the proxy behaves as a non-transparent proxy and implements the corresponding protocol handling;
  4. When acting as the HTTPS client, the proxy treats its downstream as a transparent proxy, and the real server is the one the client requested.

III. Design and Implementation

This article builds a non-transparent upstream proxy; the detailed design and implementation below use the Netty framework. The unified proxy gateway is divided into two parts, ServerHandler and ClientHandler:

  1. The implementation of the proxy gateway server;
  2. The implementation of the proxy gateway client.

1 Proxy gateway server

This part mainly covers:

  1. Initializing the proxy gateway server;
  2. Initializing the server-side handlers;
  3. Server-side protocol upgrade and processing.

Initializing the proxy gateway server

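    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;

    public void start() {
        // Run the blocking server loop on its own thread so start() returns immediately
        HookedExecutors.newSingleThreadExecutor().submit(() -> {
            log.info("Starting the proxy server, listening on port: {}", auditProxyConfig.getProxyServerPort());
            // bossGroup accepts connections; workerGroup handles I/O on the accepted connections
            EventLoopGroup bossGroup = new NioEventLoopGroup(auditProxyConfig.getBossThreadCount());
            EventLoopGroup workerGroup = new NioEventLoopGroup(auditProxyConfig.getWorkThreadCount());
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ServerChannelInitializer(auditProxyConfig))
                    // Bind, then block until the server channel is closed
                    .bind(auditProxyConfig.getProxyServerPort()).sync().channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("The proxy server was interrupted.", e);
                Thread.currentThread().interrupt();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        });
    }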

Proxy gateway initialization is relatively simple:

  1. bossGroup is the thread group that accepts incoming connections;
  2. workerGroup is the thread group that processes data on the accepted connections; the processing logic is encapsulated in ServerChannelInitializer.

ServerChannelInitializer defines the request-processing pipeline of the proxy gateway server:

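    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
            // Decode raw bytes into HttpRequest / HttpContent objects
            .addLast(new HttpRequestDecoder())
            // Aggregate them into one FullHttpRequest, with at most maxRequestSize bytes of content
            .addLast(new HttpObjectAggregator(auditProxyConfig.getMaxRequestSize()))
            // Implement the proxy gateway protocol on the aggregated request
            .addLast(new ServerChannelHandler(auditProxyConfig));
    }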

The pipeline first decodes the HTTP request, then aggregates its parts into a single FullHttpRequest, and finally hands it to ServerChannelHandler, which implements the proxy gateway protocol.
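To illustrate what the aggregation step provides, here is a plain-Java analogue of HttpObjectAggregator's contract, not Netty's implementation: request body chunks are buffered until the request is complete, and a body larger than the configured limit (maxRequestSize above) is refused. Netty itself answers an oversized request with 413 Request Entity Too Large; the exception here only stands in for that. BodyAggregator is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;

// Plain-Java analogue of aggregating a request body up to a size limit (not Netty code).
final class BodyAggregator {
    private final int maxContentLength;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    BodyAggregator(int maxContentLength) {
        this.maxContentLength = maxContentLength;
    }

    /** Appends one chunk; refuses it if the aggregated body would exceed the limit. */
    void append(byte[] chunk) {
        if ((long) body.size() + chunk.length > maxContentLength) {
            // Stand-in for Netty's 413 response to an oversized request.
            throw new IllegalStateException("request body exceeds " + maxContentLength + " bytes");
        }
        body.write(chunk, 0, chunk.length);
    }

    /** Returns the complete body once the last chunk has been appended. */
    byte[] complete() {
        return body.toByteArray();
    }
}
```

The limit matters for memory as well as correctness: each request the gateway holds can never exceed maxRequestSize bytes of body.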

Proxy gateway protocol:

  1. Check whether the request is a CONNECT request. If so, store it, pause reading, send the proxy's success response, and upgrade the protocol once that response has been sent;
  2. In essence, the upgrade transparently wraps the original SocketChannel, just as SslSocketChannel does;
  3. Finally, connect to the remote server specified by the CONNECT request.

The detailed implementation is:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.FullHttpRequest;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest request = (FullHttpRequest) msg;
        try {
            if (isConnectRequest(request)) {
                // CONNECT request: store it for use when connecting to the remote server
                saveConnectRequest(ctx, request);
                // Pause reading until the tunnel has been acknowledged
                ctx.channel().config().setAutoRead(false);
                // Send the proxy's success response
                connectionEstablished(ctx, ctx.newPromise().addListener(future -> {
                    if (future.isSuccess()) {
                        // Upgrade if this is an SSL request that has not been upgraded yet
                        if (isSslRequest(request) && !isUpgraded(ctx)) {
                            upgrade(ctx);
                        }
                        // Resume reading messages
                        ctx.channel().config().setAutoRead(true);
                        ctx.read();
                    }
                }));
            } else {
                // Other requests: upgrade if not already upgraded
                if (!isUpgraded(ctx)) {
                    // Upgrade the engine
                    upgrade(ctx);
                }
                // Connect to the remote server
                connectRemote(ctx, request);
            }
        } finally {
            ctx.fireChannelRead(msg);
        }
    }
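Step 1 of the protocol boils down to recognizing the CONNECT request line and writing back a success status before the tunnel (and, for HTTPS, the TLS upgrade) begins. The sketch below shows those pieces in plain Java; it is not the body of the article's isConnectRequest or connectionEstablished, which are not shown. looksLikeTls is only one possible way to implement a check such as isSslRequest, relying on the fact that a TLS connection opens with a handshake record whose content-type byte is 0x16.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of the CONNECT handshake pieces; not the article's Netty helpers.
final class ConnectHandshake {

    /** A CONNECT request line looks like "CONNECT host:port HTTP/1.1"; methods are case-sensitive. */
    static boolean isConnectRequest(String requestLine) {
        return requestLine.startsWith("CONNECT ");
    }

    /** 2xx reply that turns the connection into a tunnel; the reason phrase is only conventional. */
    static byte[] connectionEstablished() {
        return "HTTP/1.1 200 Connection Established\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
    }

    /** First byte sent through the tunnel: 0x16 (TLS handshake record) versus e.g. 'G' of "GET". */
    static boolean looksLikeTls(byte firstByte) {
        return firstByte == 0x16;
    }
}
```

Pausing reads (setAutoRead(false)) between the CONNECT and the success response matters because the client may start its TLS ClientHello immediately; those bytes must not reach the HTTP decoder before the pipeline has been upgraded.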

2 Proxy gateway client

The proxy gateway server needs to connect to the remote service; that is the job of the proxy gateway client.

Proxy gateway client initialization:

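    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.FullHttpRequest;
    import java.net.InetSocketAddress;

    /**
     * Connect to the remote server.
     *
     * @param ctx         context of the inbound (client-facing) channel
     * @param httpRequest the request received by the proxy gateway server
     */
    protected void connectRemote(ChannelHandlerContext ctx, FullHttpRequest httpRequest) {
        Bootstrap b = new Bootstrap();
        b.group(ctx.channel().eventLoop()) // use the same EventLoop as the inbound channel
            .channel(ctx.channel().getClass())
            .handler(new ClientChannelInitializer(auditProxyConfig, ctx, safeCopy(httpRequest)));

        // HTTPS: the target comes from the stored CONNECT request; otherwise use the current request
        FullHttpRequest originRequest = ctx.channel().attr(CONNECT_REQUEST).get();
        if (originRequest == null) {
            originRequest = httpRequest;
        }
        ChannelFuture cf = b.connect(new InetSocketAddress(calculateHost(originRequest), calculatePort(originRequest)));
        Channel cch = cf.channel();
        // Remember the outbound channel on the inbound channel
        ctx.channel().attr(CLIENT_CHANNEL).set(cch);
    }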

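In the code above:

  1. The client reuses the proxy gateway server's workerGroup; more precisely, it runs on the same EventLoop as the inbound channel;
  2. Request and response handling is encapsulated in ClientChannelInitializer;
  3. The HOST and PORT of the remote server are resolved from the request received by the server side: the stored CONNECT request for HTTPS, otherwise the current request.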
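Point 3 can be made concrete with a small resolver. It assumes the request-target forms a proxy typically receives: the authority form host:port of a CONNECT request, the absolute form http://host[:port]/path of a plain HTTP proxy request, and, as a fallback for an origin-form target such as /path, the Host header. TargetResolver and Target are hypothetical stand-ins for the article's calculateHost/calculatePort, whose bodies are not shown; defaulting a port-less CONNECT target to 443 is a lenient assumption, since CONNECT is supposed to carry an explicit port.

```java
import java.net.URI;

// Hypothetical host/port holder returned by the resolver.
final class Target {
    final String host;
    final int port;

    Target(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

final class TargetResolver {

    /** Resolves the remote host and port from the method, request-target and Host header. */
    static Target resolve(String method, String requestTarget, String hostHeader) {
        if ("CONNECT".equals(method)) {
            // Authority form: "host:port"
            return parseAuthority(requestTarget, 443);
        }
        URI uri = URI.create(requestTarget);
        if (uri.getHost() != null) {
            // Absolute form: "http://host[:port]/path"
            int defaultPort = "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
            return new Target(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : defaultPort);
        }
        // Origin form ("/path"): fall back to the Host header
        return parseAuthority(hostHeader, 80);
    }

    /** Splits "host[:port]", handling bracketed IPv6 literals such as "[::1]:443". */
    static Target parseAuthority(String authority, int defaultPort) {
        int colon = authority.lastIndexOf(':');
        boolean hasPort = colon > authority.lastIndexOf(']');
        String host = hasPort ? authority.substring(0, colon) : authority;
        int port = hasPort ? Integer.parseInt(authority.substring(colon + 1)) : defaultPort;
        if (host.startsWith("[") && host.endsWith("]")) {
            host = host.substring(1, host.length() - 1); // strip IPv6 brackets
        }
        return new Target(host, port);
    }
}
```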

Initialization logic for the proxy gateway client handler:

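    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.proxy.HttpProxyHandler;
    import io.netty.handler.ssl.SslHandler;
    import java.net.SocketAddress;
    import org.apache.commons.lang3.StringUtils;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        SocketAddress socketAddress = calculateProxy();
        if (socketAddress != null) {
            // Route through the downstream proxy; reuse the address computed above, since calling
            // calculateProxy() a second time may pick a different proxy
            ch.pipeline().addLast(new HttpProxyHandler(socketAddress,
                auditProxyConfig.getUserName(), auditProxyConfig.getPassword()));
        }
        if (isSslRequest()) {
            String host = host();
            int port = port();
            if (StringUtils.isNoneBlank(host) && port > 0) {
                // TLS toward the real server: here the gateway acts as the HTTPS client
                ch.pipeline().addLast(new SslHandler(sslEngine(host, port)));
            }
        }
        // Forwards messages, starting with the intercepted first request
        ch.pipeline().addLast(new ClientChannelHandler(clientContext, httpRequest));
    }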

In the code above:

  1. If a downstream proxy is configured, HttpProxyHandler is added so that the client reaches the remote server through that downstream proxy, authenticating with the configured username and password;
  2. If SSL is required, an SslHandler transparently wraps the SocketChannel to provide SSL communication;
  3. Finally, ClientChannelHandler simply forwards messages. The one exception is that, because the proxy gateway intercepted the first request, it must forward that intercepted request to the server itself.
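The sslEngine(host, port) helper is not shown in the article. One plausible JDK-only shape for the gateway's HTTPS-client role is sketched below: a client-mode SSLEngine that sends SNI and verifies the server certificate against the requested host. Treat it as an assumption-laden sketch; in particular, SNI is meant for DNS names, so a real helper should skip it when host is an IP literal.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

// Hypothetical body for sslEngine(host, port): TLS client toward the real server.
final class ClientSsl {

    static SSLEngine sslEngine(String host, int port) throws NoSuchAlgorithmException {
        // host/port become the engine's peer identity (also used for session reuse).
        SSLEngine engine = SSLContext.getDefault().createSSLEngine(host, port);
        engine.setUseClientMode(true); // the gateway talks to the real server as a client
        SSLParameters params = engine.getSSLParameters();
        // Check the server certificate against the requested host name.
        params.setEndpointIdentificationAlgorithm("HTTPS");
        // Send SNI so virtual-hosted servers present the right certificate.
        params.setServerNames(Collections.<SNIServerName>singletonList(new SNIHostName(host)));
        engine.setSSLParameters(params);
        return engine;
    }
}
```

Because HttpProxyHandler is added before SslHandler, the TLS handshake with the real server travels through the downstream proxy's tunnel, which matches point 4 of the non-transparent upstream proxy design: the downstream proxy only ever sees encrypted bytes.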

IV. Other Issues

Problems that proxy gateway implementations may face:

1 Memory Problem

The most common problem a proxy faces is OOM. The proxy gateway in this article only keeps in memory the bodies of the HTTP/HTTPS requests currently being processed, so in theory its memory usage is bounded by the number of requests in flight multiplied by the average request body size. HTTP/HTTPS responses are forwarded directly, using off-heap memory and zero copy.
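To make the bound concrete: if N is the number of in-flight requests, S̄ their average body size, and S_max the maxRequestSize passed to HttpObjectAggregator (which caps every buffered body), the buffered request memory is roughly as follows. The numbers are purely illustrative, not measurements from the gateway.

```latex
\begin{aligned}
M_{\text{buffered}} &\approx N \times \bar{S} \;\le\; N \times S_{\max} \\
2000 \times 64\,\text{KiB} &= 128{,}000\,\text{KiB} = 125\,\text{MiB} \\
2000 \times 1\,\text{MiB} &= 2000\,\text{MiB} \approx 1.95\,\text{GiB}
\end{aligned}
```

The average gives the typical footprint; sizing the memory budget against N × S_max is the safer choice, since a burst of large requests can approach that worst case.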

2 Performance Problems

Performance did not need to be optimized up front: the proxy gateway is built on the Netty framework, which makes heavy use of off-heap memory and zero-copy forwarding internally and so avoids most performance problems.

After the first phase of the proxy gateway went live, it ran into a performance problem caused by long-lived connections:

  1. Normally, after the client and server establish a TCP connection (kept alive, for example, by TCP heartbeats), one of the two sides eventually closes it;
  2. If both sides hold the connection for a long time and neither closes it, socket resources leak: the CPU is saturated handling idle connections, and new connections cannot be established.

The fix was to use IdleStateHandler to periodically detect idle TCP connections and forcibly close them, which solved the problem.
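The idea behind the IdleStateHandler fix can be sketched without Netty: record the last activity time per connection, and periodically evict those idle past a threshold so the caller can close them. The connection ids, the explicit millisecond clock, and single-threaded use are assumptions made for the sketch; IdleTracker is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of idle-connection detection (single-threaded; not Netty's IdleStateHandler).
final class IdleTracker {
    private final long maxIdleMillis;
    private final Map<String, Long> lastActivity = new HashMap<>();

    IdleTracker(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Call on every read or write seen on the connection. */
    void touch(String connectionId, long nowMillis) {
        lastActivity.put(connectionId, nowMillis);
    }

    /** Removes and returns connections idle for at least maxIdleMillis; the caller closes them. */
    List<String> evictIdle(long nowMillis) {
        List<String> idle = new ArrayList<>();
        lastActivity.entrySet().removeIf(e -> {
            boolean expired = nowMillis - e.getValue() >= maxIdleMillis;
            if (expired) {
                idle.add(e.getKey());
            }
            return expired;
        });
        return idle;
    }
}
```

In Netty itself, the equivalent is adding new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, unit) to the pipeline and closing the channel in userEventTriggered when an IdleStateEvent arrives; because each handler runs on its channel's event loop, no shared map or locking is needed.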

V. Summary

This article focuses on the core of the unified proxy gateway and explains the principles of the related proxy techniques in detail.

The management logic of the proxy gateway can live in either the ServerHandler part or the ClientHandler part:

  1. ServerHandler can intercept and transform requests;
  2. ClientHandler controls how requests leave the gateway.

Note: this article uses Netty's zero copy and buffers requests so they can be parsed, but it does not process responses. Responses pass straight through the gateway, which avoids the memory-leak and OOM problems common in proxy implementations.

Finally, once the proxy gateway was in place, controls were applied to the proxy resources and to the requests flowing through the gateway, mainly:

  1. When a static resource is requested, the proxy gateway requests the remote server directly instead of going through a downstream proxy;
  2. When the request header contains a region identifier, the proxy gateway tries to route the request through a proxy in that region to reach the remote server.
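The two controls above can be expressed as one egress decision. The sketch assumes a static resource is recognized by its file extension and that the region identifier has already been read from a request header (the article does not name the header); the extension list, the pool layout, and the random choice within a pool are all illustrative. An empty result means "connect directly".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the gateway's egress policy: static resources direct, region-aware proxy choice.
final class EgressPolicy {
    // Hypothetical list of "static" extensions.
    private static final Set<String> STATIC_EXTENSIONS = new HashSet<>(
        Arrays.asList(".js", ".css", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".woff2"));

    private final Map<String, List<String>> proxiesByRegion; // region id -> downstream "ip:port"
    private final List<String> fallbackProxies;              // used when the region has none

    EgressPolicy(Map<String, List<String>> proxiesByRegion, List<String> fallbackProxies) {
        this.proxiesByRegion = proxiesByRegion;
        this.fallbackProxies = fallbackProxies;
    }

    static boolean isStaticResource(String path) {
        int query = path.indexOf('?');
        String p = (query >= 0 ? path.substring(0, query) : path).toLowerCase(Locale.ROOT);
        int dot = p.lastIndexOf('.');
        return dot >= 0 && STATIC_EXTENSIONS.contains(p.substring(dot));
    }

    /** Empty = connect directly; otherwise the downstream proxy to use. regionHeader may be null. */
    Optional<String> chooseProxy(String path, String regionHeader) {
        if (isStaticResource(path)) {
            return Optional.empty();
        }
        List<String> pool = regionHeader == null ? null : proxiesByRegion.get(regionHeader);
        if (pool == null || pool.isEmpty()) {
            pool = fallbackProxies; // best effort: the region is honoured only when it has proxies
        }
        if (pool.isEmpty()) {
            throw new IllegalStateException("no downstream proxy available");
        }
        return Optional.of(pool.get(ThreadLocalRandom.current().nextInt(pool.size())));
    }
}
```

In the gateway, a decision like this would naturally sit in the ClientHandler part, where calculateProxy() picks the downstream proxy.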


This article is the original content of Aliyun and shall not be reproduced without permission.