Preface

com.rabbitmq:amqp-client:4.8.3

PS: I was recently tasked at work with reading and analyzing Spring-Rabbit and AMQP-Client, so I would like to share my notes on AMQP-Client with you. The RabbitMQ broker itself is written in Erlang, and I have no plans to cover it.

Note: this article assumes some familiarity with RabbitMQ.

  • RabbitMQ Get Started: www.rabbitmq.com/#getstarted
  • Java Client API Guide: www.rabbitmq.com/api-guide.h…

Enough talk, let's get started!

Java Client Connection Demo

Java Client Connecting to RabbitMQ Demo

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();

AMQP protocol interaction flow

The demo establishes a Connection and a Channel with the RabbitMQ broker, sends a message, and then closes the Channel and the Connection. The following figures show the AMQP interaction as captured with Wireshark. 172.30.0.74 is the local IP address of the AMQP client, and 192.168.17.160 is the IP address of the RabbitMQ broker.

The client and broker create a Connection and a Channel, and the client sends a message

The client and broker send the Heartbeat and close the Connection and Channel

AMQP methods are used in pairs. In the protocol, as shown in Wireshark's Info column, method names are joined with "-" (for example start-ok), while in the code they are camel-cased (for example StartOk).
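To make the naming convention concrete, here is a minimal sketch that maps the hyphenated protocol spelling to the camel-cased spelling used in the code. The class `AmqpMethodNames` and its methods are hypothetical helpers written for this article, not part of amqp-client, and the pair table only lists the request/reply pairs that appear in the flow below.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of amqp-client.
class AmqpMethodNames {

    // Converts one hyphenated segment, e.g. "start-ok", to "StartOk".
    static String toCamelCase(String segment) {
        StringBuilder sb = new StringBuilder();
        for (String part : segment.split("-")) {
            if (part.isEmpty()) {
                continue;
            }
            sb.append(Character.toUpperCase(part.charAt(0))).append(part.substring(1));
        }
        return sb.toString();
    }

    // Converts "class.method" in protocol spelling to code spelling,
    // e.g. "connection.start-ok" becomes "Connection.StartOk".
    static String toCodeName(String protocolName) {
        String[] segments = protocolName.split("\\.");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < segments.length; i++) {
            if (i > 0) {
                sb.append('.');
            }
            sb.append(toCamelCase(segments[i]));
        }
        return sb.toString();
    }

    // Request/reply pairs seen in the captured flow, in protocol spelling.
    static Map<String, String> pairs() {
        Map<String, String> pairs = new LinkedHashMap<>();
        pairs.put("connection.start", "connection.start-ok");
        pairs.put("connection.tune", "connection.tune-ok");
        pairs.put("connection.open", "connection.open-ok");
        pairs.put("channel.open", "channel.open-ok");
        pairs.put("confirm.select", "confirm.select-ok");
        pairs.put("channel.close", "channel.close-ok");
        pairs.put("connection.close", "connection.close-ok");
        return pairs;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : pairs().entrySet()) {
            System.out.println(toCodeName(e.getKey()) + " -> " + toCodeName(e.getValue()));
        }
    }
}
```

Running `main` prints one line per pair, starting with `Connection.Start -> Connection.StartOk`.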

  1. The client writes the AMQP 0-9-1 protocol header to the underlying socket. It carries the version information (the client tells the broker which protocol and version it is using; the underlying socket is a standard Java socket).

  2. The client waits for the broker to send Connection.Start (the broker tells the client the protocol version to communicate with, the SASL authentication mechanisms, the locales, and the RabbitMQ version and supported capabilities).

  3. On receiving it, the client sends Connection.StartOk (the client tells the broker the username and password for the Connection, the chosen authentication mechanism, the locale, client information, and client capabilities).

  4. The client waits for the broker to send Connection.Tune (the broker proposes the connection parameters to negotiate with the client).

  5. The client sends Connection.TuneOk (once the parameters [ChannelMax, FrameMax, Heartbeat] are negotiated, the client tells the broker the result).

  6. The client sends Connection.Open (the client asks the broker to open a Connection on the requested _virtualHost [vhost]).

  7. The broker returns Connection.OpenOk after receiving it.

  8. The client sends Channel.Open and the broker returns Channel.OpenOk (the client asks for a Channel; the broker creates it and confirms).

  9. The client sends Confirm.Select and the broker returns Confirm.SelectOk (the client tells the broker that messages will be published using the Confirm mechanism).

  10. The client publishes the message with Basic.Publish, and the broker replies with Basic.Ack.

  11. During this time, the client and broker check each other's heartbeats.

  12. The client closes the Channel with Channel.Close and the broker replies with Channel.CloseOk.

  13. The client closes the Connection with Connection.Close and the broker replies with Connection.CloseOk.
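Steps 1 and 11 put very small, fixed byte sequences on the wire, which makes them easy to spot in a packet capture. The sketch below builds the 8-byte AMQP 0-9-1 protocol header and an empty heartbeat frame using only the JDK. The class `AmqpWireSketch` is a hypothetical name for illustration and does not use or reproduce amqp-client code; the byte values follow the AMQP 0-9-1 specification as I understand it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of two fixed AMQP 0-9-1 byte sequences.
class AmqpWireSketch {

    // Protocol header: the literal "AMQP", then 0, then version 0-9-1.
    static byte[] protocolHeader() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put("AMQP".getBytes(StandardCharsets.US_ASCII));
        buf.put((byte) 0);  // protocol id
        buf.put((byte) 0);  // major version
        buf.put((byte) 9);  // minor version
        buf.put((byte) 1);  // revision
        return buf.array();
    }

    // Heartbeat frame: type 8, channel 0, empty payload, frame-end 0xCE.
    static byte[] heartbeatFrame() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put((byte) 8);        // frame type: heartbeat
        buf.putShort((short) 0);  // heartbeats always use channel 0
        buf.putInt(0);            // payload size
        buf.put((byte) 0xCE);     // frame-end marker
        return buf.array();
    }

    // Formats bytes as space-separated lowercase hex for comparison with a capture.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", bytes[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("header:    " + hex(protocolHeader()));
        System.out.println("heartbeat: " + hex(heartbeatFrame()));
    }
}
```

The header prints as `41 4d 51 50 00 00 09 01` and the heartbeat frame as `08 00 00 00 00 00 00 ce`.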

Source code analysis

Once you are familiar with the protocol interaction above, the source code is easy to follow. This section walks through the main source involved in starting a Connection: ConnectionFactory.newConnection -> AMQConnection.start

ConnectionFactory.newConnection()

public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
        throws IOException, TimeoutException {
        if (this.metricsCollector == null) {
            this.metricsCollector = new NoOpMetricsCollector();
        }
        // make sure we respect the provided thread factory
        // Create the FrameHandlerFactory (socket factory) and initialize its configuration
        FrameHandlerFactory fhFactory = createFrameHandlerFactory();
        // Initialize the parameters used by the Connection
        ConnectionParams params = params(executor);
        // set client-provided via a client property
        if (clientProvidedName != null) {
            Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
            properties.put("connection_name", clientProvidedName);
            params.setClientProperties(properties);
        }

        // This branch is the automatic connection recovery provided by the RabbitMQ client
        if (isAutomaticRecoveryEnabled()) {
            // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
            AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);

            conn.init();
            return conn;
        } else {
            List<Address> addrs = addressResolver.getAddresses();
            Exception lastException = null;
            for (Address addr : addrs) {
                try {
                    // Create and connect a socket, wrapped and returned as a SocketFrameHandler
                    FrameHandler handler = fhFactory.create(addr);
                    // Initialize the configuration, _channel0, _channelManager, etc.
                    AMQConnection conn = createConnection(params, handler, metricsCollector);
                    // Start the AMQConnection (described later)
                    conn.start();
                    this.metricsCollector.newConnection(conn);
                    return conn;
                } catch (IOException e) {
                    lastException = e;
                } catch (TimeoutException te) {
                    lastException = te;
                }
            }
            if (lastException != null) {
                if (lastException instanceof IOException) {
                    throw (IOException) lastException;
                } else if (lastException instanceof TimeoutException) {
                    throw (TimeoutException) lastException;
                }
            }
            throw new IOException("failed to connect");
        }
    }
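The non-recovery branch above follows a common pattern: try each resolved address in order, remember the last failure, and rethrow it only if every address fails. Here is a minimal JDK-only sketch of that pattern. `FailoverSketch`, the `Connector` interface, and the broker addresses are hypothetical names used for illustration; this is a simplified restatement, not the library's code.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of "try each address, keep the last exception".
class FailoverSketch {

    // Stands in for "create a frame handler and start a connection".
    interface Connector<T> {
        T connect(String address) throws IOException;
    }

    static <T> T connectToFirstAvailable(List<String> addresses, Connector<T> connector)
            throws IOException {
        IOException lastException = null;
        for (String address : addresses) {
            try {
                // The first successful attempt wins and ends the loop.
                return connector.connect(address);
            } catch (IOException e) {
                // Remember the failure and move on to the next address.
                lastException = e;
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        // Reached only when the address list is empty.
        throw new IOException("failed to connect");
    }

    public static void main(String[] args) throws IOException {
        List<String> addresses = Arrays.asList("broker-a:5672", "broker-b:5672");
        String result = FailoverSketch.<String>connectToFirstAvailable(addresses, addr -> {
            if (addr.startsWith("broker-a")) {
                throw new IOException(addr + " unreachable");
            }
            return "connected to " + addr;
        });
        System.out.println(result);
    }
}
```

One consequence of this design is that the caller only ever sees the failure from the last address tried, so earlier errors are lost unless they are logged separately.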

Steps 1-6 of the AMQP interaction flow make up the core logic of AMQConnection.start(), which is the main focus of this article.

public void start()
            throws IOException, TimeoutException {
        // Initialize the consumer worker threads
        initializeConsumerWorkService();
        // Initialize the heartbeat sender
        initializeHeartbeatSender();
        // Mark the Connection as running
        this._running = true;

        // Continuation that will receive the broker's reply to the protocol header the client sends first
        AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
            new AMQChannel.SimpleBlockingRpcContinuation();

        // Register it as the pending RPC; getReply() below blocks until the broker returns Connection.Start
        _channel0.enqueueRpc(connStartBlocker);
        try {
            // The following two lines are akin to AMQChannel's
            // transmit() method for this pseudo-RPC.
            _frameHandler.setTimeout(handshakeTimeout);
            // 1. Send the AMQP 0-9-1 protocol header
            _frameHandler.sendHeader();
        } catch (IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }

        // Start the main loop (startMainLoop) that receives and processes frames sent by the broker
        this._frameHandler.initialize(this);

        AMQP.Connection.Start connStart;
        AMQP.Connection.Tune connTune = null;
        try {
            // 2. The client waits for the Connection.Start method sent by the broker
            connStart =
                    (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

            // Connection.Start carries the protocol version, SASL mechanisms, locales,
            // and the server properties (RabbitMQ version and supported capabilities)
            _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

            Version serverVersion =
                    new Version(connStart.getVersionMajor(),
                                       connStart.getVersionMinor());

            // Version comparison
            if (!Version.checkVersion(clientVersion, serverVersion)) {
                throw new ProtocolVersionMismatchException(clientVersion,
                                                                  serverVersion);
            }

            // The broker sends the mechanisms as a space-separated list
            String[] mechanisms = connStart.getMechanisms().toString().split(" ");
            SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
            if (sm == null) {
                throw new IOException("No compatible authentication mechanism found - " +
                                              "server offered [" + connStart.getMechanisms() + "]");
            }

            String username = credentialsProvider.getUsername();
            String password = credentialsProvider.getPassword();
            LongString challenge = null;
            LongString response = sm.handleChallenge(null, username, password);

            do {
                // 3. The client sends Connection.StartOk (or Connection.SecureOk when answering a challenge)
                Method method = (challenge == null)
                                        ? new AMQP.Connection.StartOk.Builder()
                                                  .clientProperties(_clientProperties)
                                                  .mechanism(sm.getName())
                                                  .response(response)
                                                  .build()
                                        : new AMQP.Connection.SecureOk.Builder().response(response).build();

                try {
                    Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
                    if (serverResponse instanceof AMQP.Connection.Tune) {
                        // 4. The client waits for the Connection.Tune sent by the broker
                        connTune = (AMQP.Connection.Tune) serverResponse;
                    } else {
                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                        response = sm.handleChallenge(challenge, username, password);
                    }
                } catch (ShutdownSignalException e) {
                    Method shutdownMethod = e.getReason();
                    if (shutdownMethod instanceof AMQP.Connection.Close) {
                        AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                            throw new AuthenticationFailureException(shutdownClose.getReplyText());
                        }
                    }
                    throw new PossibleAuthenticationFailureException(e);
                }
            } while (connTune == null);
        } catch (TimeoutException te) {
            _frameHandler.close();
            throw te;
        } catch (ShutdownSignalException sse) {
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        } catch (IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }

        try {
            // Maximum number of channels
            int channelMax =
                negotiateChannelMax(this.requestedChannelMax,
                                    connTune.getChannelMax());
            _channelManager = instantiateChannelManager(channelMax, threadFactory);

            // Maximum frame size
            int frameMax =
                negotiatedMaxValue(this.requestedFrameMax,
                                   connTune.getFrameMax());
            this._frameMax = frameMax;

            // Heartbeat interval
            int heartbeat =
                negotiatedMaxValue(this.requestedHeartbeat,
                                   connTune.getHeartbeat());

            setHeartbeat(heartbeat);

            // 5. The client sends Connection.TuneOk
            _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                                .channelMax(channelMax)
                                .frameMax(frameMax)
                                .heartbeat(heartbeat)
                              .build());
            // 6. The client sends Connection.Open and waits for the broker's reply
            _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                      .virtualHost(_virtualHost)
                                    .build());
        } catch (IOException ioe) {
            _heartbeatSender.shutdown();
            _frameHandler.close();
            throw ioe;
        } catch (ShutdownSignalException sse) {
            _heartbeatSender.shutdown();
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        }

        // We can now respond to errors having finished tailoring the connection
        this._inConnectionNegotiation = false;
    }
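The frame size and heartbeat values above are negotiated from a client-requested value and the value the broker proposes in Connection.Tune. As a rough sketch of how such a negotiation can work, the rule below treats 0 as "no limit or no preference", so the other side's value wins, and otherwise picks the smaller of the two. This is my reading of the behaviour, written as a standalone hypothetical class `TuneNegotiationSketch`; check the AMQConnection source for the authoritative logic.

```java
// Hypothetical, standalone restatement of a "0 means unlimited" negotiation rule.
class TuneNegotiationSketch {

    static int negotiatedMaxValue(int clientValue, int serverValue) {
        // If either side says 0 (no limit), take the other side's value;
        // otherwise both sides have a limit and the stricter (smaller) one applies.
        return (clientValue == 0 || serverValue == 0)
                ? Math.max(clientValue, serverValue)
                : Math.min(clientValue, serverValue);
    }

    public static void main(String[] args) {
        // Client has no preference, broker proposes 2047 channels.
        System.out.println(negotiatedMaxValue(0, 2047));
        // Client asks for a 60 s heartbeat, broker proposes 30 s.
        System.out.println(negotiatedMaxValue(60, 30));
        // Client asks for 60 s, broker has no preference.
        System.out.println(negotiatedMaxValue(60, 0));
    }
}
```

With these inputs the three calls print 2047, 30, and 60.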

Finally

The RabbitMQ client and the RabbitMQ broker interact through the AMQP protocol, following the flow described above. The Connection source contains many more details, which readers will need to work through at their own pace.

