Transaction messages illustrated

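Transaction messages solve distributed transaction problems through a re-commit mechanism: the message is first sent to the broker as a prepared message, and is committed or rolled back only once the outcome of the local transaction is known.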

How to send transaction messages

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.junit.Test;

    /**
     * Send a transaction message.
     */
    @Test
    public void sendTransactionMessage() throws Exception {
        // Create the transaction message producer
        TransactionMQProducer producer = new TransactionMQProducer("group");
        // Set the namesrv address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Set the local transaction listener
        producer.setTransactionListener(new TransactionListener() {
            /**
             * Execute the local transaction.
             *
             * @param message the message object
             * @param o       the argument passed to sendMessageInTransaction
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                if (message.getTags().equals("TAG1")) {
                    // Local transaction completed: commit the message
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (message.getTags().equals("TAG2")) {
                    // Local transaction failed: roll back the message
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (message.getTags().equals("TAG3")) {
                    // Message state unknown
                    return LocalTransactionState.UNKNOW;
                }
                return null;
            }

            /**
             * Called when the broker checks back on a message whose status is unknown.
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                // getBody() returns a byte[], so decode it before printing
                System.out.println(new String(messageExt.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        // Start the producer
        producer.start();
        Message message = new Message("Topic", "TAG1", "myMsg".getBytes());
        // Send the transaction message
        producer.sendMessageInTransaction(message, null);
        producer.shutdown();
    }
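The listener above always answers the check-back with COMMIT_MESSAGE, which is enough for a demo. In a real service the check-back usually has to report what actually happened to the local transaction. The following is a minimal sketch of one way to do that, assuming the outcome is recorded in memory under a business key. `TxState` and `LocalTxStore` are hypothetical names used for illustration only and are not RocketMQ classes; treating an exception as a rollback is likewise an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for RocketMQ's LocalTransactionState.
enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical helper: remembers the outcome of each local transaction
// so that a later check-back can report it truthfully.
class LocalTxStore {
    private final Map<String, TxState> outcomes = new ConcurrentHashMap<>();

    // Intended to be called from executeLocalTransaction.
    // The supplier returns true if the local transaction succeeded.
    TxState execute(String businessKey, BooleanSupplier localTransaction) {
        TxState state;
        try {
            state = localTransaction.getAsBoolean()
                    ? TxState.COMMIT_MESSAGE
                    : TxState.ROLLBACK_MESSAGE;
        } catch (RuntimeException e) {
            // Assumption: an exception means the local transaction was rolled back.
            state = TxState.ROLLBACK_MESSAGE;
        }
        outcomes.put(businessKey, state);
        return state;
    }

    // Intended to be called from checkLocalTransaction.
    // Keys with no recorded outcome stay UNKNOW so the broker can check again later.
    TxState check(String businessKey) {
        return outcomes.getOrDefault(businessKey, TxState.UNKNOW);
    }
}
```

With a store like this, `executeLocalTransaction` would delegate to `execute(...)` and `checkLocalTransaction` to `check(...)`, mapping `TxState` onto `LocalTransactionState`. An in-memory map is lost on restart, so a durable record (for example a row written in the same database transaction) is likely the better choice in production.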

Transaction states

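    COMMIT_MESSAGE: the local transaction succeeded, and the broker commits the message.
    ROLLBACK_MESSAGE: the local transaction was rolled back, and the broker deletes the message.
    UNKNOW: the transaction status is unknown, and the broker needs to check back with the producer about the local transaction.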

Notes:

1. Delayed delivery is not supported. The DelayTimeLevel parameter is ignored when a transaction message is sent:

    // Ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

2. Messages are not guaranteed to be checked or consumed only once, so the consumer must process them idempotently.

3. Batch sending is not supported.
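Since a message may be delivered more than once, the consumer has to make repeated deliveries harmless. A minimal sketch of one common approach follows: remember a unique key per message and skip keys that were already handled. `IdempotentHandler` is a hypothetical helper, and the in-memory set is an assumption made to keep the example self-contained; a real consumer would more likely rely on a durable store such as a unique database key.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: runs the business handler at most once per message key.
class IdempotentHandler {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    // Returns true if the handler ran, false if the key was already processed.
    boolean handle(String messageKey, String body, Consumer<String> handler) {
        // add() returns false when the key is already present: duplicate delivery.
        if (!processedKeys.add(messageKey)) {
            return false;
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a redelivery can retry the failed message.
            processedKeys.remove(messageKey);
            throw e;
        }
    }
}
```

Calling `handle("k1", body, handler)` twice runs `handler` only the first time. The key should be something stable across redeliveries, such as a business identifier carried in the message.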

Transaction message recommit mechanism (source code analysis):

1. The producer clears the delay level, marks the message as a prepared message, and sends it to the broker.

    // Ignore the DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    SendResult sendResult = null;
    // Mark the message as a prepared message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

2. If the prepared message was sent successfully, the producer executes the local transaction. A null result is treated as UNKNOW.

    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            if (null != localTransactionExecuter) {
                // Execute the local transaction
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
        }
    }

3. The producer passes the local transaction state to the broker in the request header.

    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // Send the transaction state to the broker
    endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());

4. If the local transaction commits, the broker commits the message.
The broker pushes the transaction message to the real queue. Commit and rollback are handled on the broker side (here, the endTransaction section); the code entry is org.apache.rocketmq.broker.processor.EndTransactionProcessor.

    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        RemotingCommand sendResult = sendFinalMessage(msgInner);
        if (sendResult.getCode() == ResponseCode.SUCCESS) {
            // Delete the message from the original transaction queue
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return sendResult;
    }

5. If the local transaction rolls back, the broker deletes the message from the dedicated transaction queue.

    else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }

6. If the transaction state is unknown, due to network problems or other reasons, the broker checks back on the message.

7. The broker checks the transaction status periodically. If the transaction failed, the message is deleted; if it succeeded, the message is committed to the real queue.
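The whole flow can be hard to follow from source excerpts alone, so here is a toy in-memory model of it. This is only a sketch of the behavior described above and not RocketMQ code: `ToyBroker`, its `State` enum, and all method names are hypothetical, and the periodic check-back is reduced to a method that the caller invokes explicitly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of the broker side: prepared messages wait in a separate
// store and become visible to consumers only after a commit.
class ToyBroker {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, String> preparedMessages = new LinkedHashMap<>(); // msgId -> body
    private final List<String> realQueue = new ArrayList<>();

    // Phase 1: store the prepared message where consumers cannot see it.
    void putPrepared(String msgId, String body) {
        preparedMessages.put(msgId, body);
    }

    // Phase 2: apply the transaction state reported by the producer.
    void endTransaction(String msgId, State state) {
        switch (state) {
            case COMMIT: {
                // Move the message to the real queue and drop the prepared copy.
                String body = preparedMessages.remove(msgId);
                if (body != null) {
                    realQueue.add(body);
                }
                break;
            }
            case ROLLBACK:
                // Delete the prepared message; consumers never see it.
                preparedMessages.remove(msgId);
                break;
            default:
                // UNKNOWN: keep the prepared message for a later check-back.
                break;
        }
    }

    // Check-back: ask the producer about every message that is still pending.
    void checkBack(Function<String, State> producerCheck) {
        for (String msgId : new ArrayList<>(preparedMessages.keySet())) {
            endTransaction(msgId, producerCheck.apply(msgId));
        }
    }

    List<String> visibleMessages() {
        return new ArrayList<>(realQueue);
    }

    int pendingCount() {
        return preparedMessages.size();
    }
}
```

In this model, a message that ends with `UNKNOWN` stays pending until a later `checkBack` call resolves it, which mirrors the last two steps above. A real broker would run the check on a timer; this sketch leaves scheduling to the caller.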