preface
Com. RabbitMQ :amqp-client:4.8.3
Ps: Recently, I received that the company’s task is to read and analyze Spring-Rabbit and AMQP-Client, so I would like to share the AMQP-Client with you. Since RabbitMQ is developed in the Erlang language (there are no plans to share this)
Warning: this is for those who have some knowledge of RabbitMQ
- RabbitMQ Getstarted: www.rabbitmq.com/#getstarted
- Java Client API Guide: www.rabbitmq.com/api-guide.h…
Nonsense no more words, open whole!!
Java Client Connection Demo
Java Client Connecting to RabbitMQ Demo
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
Channel channel = connection.createChannel();
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();
Copy the code
AMQP protocol interaction flow
Establish a Connection and Channel with RabbitMQ Broker, send a message, then close the Connection and Channel. The following figure shows the AMQP interaction process using Wireshark to capture packets. 172.30.0.74 is the local IP address of the AMQP client. 192.168.17.160 is the IP address of the RabbitMQ Broker.
A client and a broker create a Connection, Channel, and send messages
The client and broker send the Heartbeat and close the Connection and Channel
The AMQP command is used in pairs. The command in the Info protocol is -, and the command in the code is humped.
-
Write the AMQP 0-9-1 connection header to the underlying socket, containing the specified version information (the client tells the broker which protocol and version it is using. The underlying socket uses Java)
-
The client waits for connection.start to be sent by the broker (the broker tells the client about the protocol and version to communicate with, SASL authentication (see details), locale and RabbitMQ version information and support)
-
When received, the client sends connection. StartOk (The client tells the broker the account and password to use for the Connection, authentication mechanism, locale, client information, and capabilities)
-
The client waits for connection.tune to be sent by the broker (broker negotiates parameters with the client)
-
TuneOk (tell the broker when client parameters [ChannelMax, FrameMax, Heartbeat] are negotiated)
-
Client sends connection. Open (Client tells broker to Open a Connection and requests setting _virtualHost [vhost])
-
The broker returns connection. OpenOk after receiving the message.
-
The client sends channel. Open and the broker returns channel. OpenOk (the client creates the Channel; Broker received and created channel completed)
-
The client sends confirm. Select, and the broker returns confirm. SelectOk. (The client tells the broker that the message needs to be sent using the Confirm mechanism.)
-
The client sends the message basic. Publish, and the broker replies basic. Ack
-
During this time, the client and broker check each other’s heartbeat
-
The client closes Channel channel. Close and the broker replies with channel. CloseOk
-
The client closes the Connection connection.close and the broker replies back with connection.closeok
Source code analysis
Familiar with the closer interaction of the agreement process is easy to understand the source code, introduced the main start Connection related source: ConnectionFactory. NewConnection – > AMQConnection. Start
ConnectionFactory.newConnection()
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
throws IOException, TimeoutException {
if(this.metricsCollector == null) {
this.metricsCollector = new NoOpMetricsCollector();
}
// make sure we respect the provided thread factory
// Create socketFactory and initialize the corresponding configuration
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
// Initializes the parameters involved in Connection
ConnectionParams params = params(executor);
// set client-provided via a client property
if(clientProvidedName ! =null) {
Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
// This logic belongs to the logic provided by Rabbit to automatically restore connections
if (isAutomaticRecoveryEnabled()) {
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
return conn;
} else {
List<Address> addrs = addressResolver.getAddresses();
Exception lastException = null;
for (Address addr : addrs) {
try {
// Create, connect, and encapsulate a socket to return a SocketFrameHandler.
FrameHandler handler = fhFactory.create(addr);
// Initialize the configuration, _channel0, _channelManager, etc
AMQConnection conn = createConnection(params, handler, metricsCollector);
// Starting AMQConnection will be described later
conn.start();
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
lastException = e;
} catch(TimeoutException te) { lastException = te; }}if(lastException ! =null) {
if (lastException instanceof IOException) {
throw (IOException) lastException;
} else if (lastException instanceof TimeoutException) {
throw(TimeoutException) lastException; }}throw new IOException("failed to connect"); }}Copy the code
The logic 1-6 in the interaction process of AMQP protocol belongs to the key logic of amqConnection.start (), which is also the main point introduced to you this time
public void start(a)
throws IOException, TimeoutException {
// Initialize the worker thread
initializeConsumerWorkService();
// Initialize heartbeat sending
initializeHeartbeatSender();
// Start the Connection flag bit
this._running = true;
// Verify that the client sends the header protocol first
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
new AMQChannel.SimpleBlockingRpcContinuation();
// Enter the Rpc queue and block, waiting for the broker to return connection.start method
_channel0.enqueueRpc(connStartBlocker);
try {
// The following two lines are akin to AMQChannel's
// transmit() method for this pseudo-RPC.
_frameHandler.setTimeout(handshakeTimeout);
// 1. Send header protocol AMQP 0-9-1
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}
// Initialize startMainLoop to receive and process messages sent by the broker
this._frameHandler.initialize(this);
AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;
try {
// 2. The client waits for the connection.start message sent by the broker
connStart =
(AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();
// Communication protocol and version, SASL authentication mechanism (see details), locale, and RabbitMQ version and support capabilities
_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
Version serverVersion =
new Version(connStart.getVersionMajor(),
connStart.getVersionMinor());
// Version comparison
if(! Version.checkVersion(clientVersion, serverVersion)) {throw new ProtocolVersionMismatchException(clientVersion,
serverVersion);
}
String[] mechanisms = connStart.getMechanisms().toString().split("");
SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
if (sm == null) {
throw new IOException("No compatible authentication mechanism found - " +
"server offered [" + connStart.getMechanisms() + "]");
}
String username = credentialsProvider.getUsername();
String password = credentialsProvider.getPassword();
LongString challenge = null;
LongString response = sm.handleChallenge(null, username, password);
do {
// 3. The client sends' connection. StartOk '
Method method = (challenge == null)?new AMQP.Connection.StartOk.Builder()
.clientProperties(_clientProperties)
.mechanism(sm.getName())
.response(response)
.build()
: new AMQP.Connection.SecureOk.Builder().response(response).build();
try {
Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
if (serverResponse instanceof AMQP.Connection.Tune) {
// 4. The client waits for the connection.tune sent by the broker
connTune = (AMQP.Connection.Tune) serverResponse;
} else{ challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge(); response = sm.handleChallenge(challenge, username, password); }}catch (ShutdownSignalException e) {
Method shutdownMethod = e.getReason();
if (shutdownMethod instanceof AMQP.Connection.Close) {
AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
throw newAuthenticationFailureException(shutdownClose.getReplyText()); }}throw newPossibleAuthenticationFailureException(e); }}while (connTune == null);
} catch (TimeoutException te) {
_frameHandler.close();
throw te;
} catch (ShutdownSignalException sse) {
_frameHandler.close();
throw AMQChannel.wrap(sse);
} catch(IOException ioe) {
_frameHandler.close();
throw ioe;
}
try {
// Maximum number of channels
int channelMax =
negotiateChannelMax(this.requestedChannelMax,
connTune.getChannelMax());
_channelManager = instantiateChannelManager(channelMax, threadFactory);
// Maximum frame size
int frameMax =
negotiatedMaxValue(this.requestedFrameMax,
connTune.getFrameMax());
this._frameMax = frameMax;
/ / the heart
int heartbeat =
negotiatedMaxValue(this.requestedHeartbeat,
connTune.getHeartbeat());
setHeartbeat(heartbeat);
// 5. The client sends connection.tuneok
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
.channelMax(channelMax)
.frameMax(frameMax)
.heartbeat(heartbeat)
.build());
// 6. Client sends channel.open
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
.virtualHost(_virtualHost)
.build());
} catch (IOException ioe) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw ioe;
} catch (ShutdownSignalException sse) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw AMQChannel.wrap(sse);
}
// We can now respond to errors having finished tailoring the connection
this._inConnectionNegotiation = false;
}
Copy the code
The last
The RabbitMQ Client and RabbitMQ Broker will interact with each other through the AMQP protocol. There are also a lot of Connection details source need to slowly understand the reader
My wechat official account: Java Architect advanced programming focus on sharing Java technology dry goods, looking forward to your attention!