Preface
This article takes a look at cheddar-tx, the transactional messaging module of Cheddar.
MessageAction
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java
public class MessageAction { private final TypedMessage typedMessage; private final int delaySeconds; public MessageAction(final TypedMessage typedMessage, final int delaySeconds) { this.typedMessage = typedMessage; this.delaySeconds = delaySeconds; } public TypedMessage message() { return typedMessage; } public int delay() { return delaySeconds; } public void apply(final MessageSender<TypedMessage> messageSender) { if (delay() > 0) { messageSender.sendDelayedMessage(typedMessage, delaySeconds); } else { messageSender.send(typedMessage); } } public void apply(final MessagePublisher<TypedMessage> messagePublisher) { messagePublisher.publish(typedMessage); }}Copy the code
MessageAction holds two properties, typedMessage and delaySeconds, and provides two apply methods. The apply method that takes a MessageSender calls messageSender.sendDelayedMessage when the delay is greater than 0, and messageSender.send(typedMessage) otherwise. The apply method that takes a MessagePublisher calls messagePublisher.publish.
MessageSender
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java
public interface MessageSender<T extends Message> { /** * Send a message * @param message Message to send * @throws MessageSendException */ void send(T message) throws MessageSendException; /** * Send a message, where the message is not visible to receivers for the specified delay duration * @param message Message to send * @param delaySeconds Duration for which sent message is invisible to receivers * @throws MessageSendException */ void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException; }Copy the code
The MessageSender interface defines send and sendDelayedMessage methods
TransactionalResource
Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java
/**
 * A resource whose work is bracketed by a transaction: {@link #begin()} opens it, and either
 * {@link #commit()} or {@link #abort()} ends it.
 */
public interface TransactionalResource {

    /**
     * Begins a transaction on this resource.
     *
     * @throws TransactionException failure type declared for this operation
     */
    void begin() throws TransactionException;

    /**
     * Commits the transaction on this resource.
     *
     * @throws TransactionException failure type declared for this operation
     */
    void commit() throws TransactionException;

    /**
     * Aborts the transaction on this resource.
     *
     * @throws TransactionException failure type declared for this operation
     */
    void abort() throws TransactionException;

}
Copy the code
The TransactionalResource interface defines the begin, commit, and abort methods.
TransactionalMessageSender
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java
public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource { private final Logger logger = LoggerFactory.getLogger(getClass()); private final MessageSender<TypedMessage> messageSender; private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>(); public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) { this.messageSender = messageSender; } private MessagingTransaction getCurrentTransaction() { if (currentTransaction.get() == null) { throw new NonExistentTransactionException(); } return currentTransaction.get(); } @Override public void begin() throws TransactionException { if (currentTransaction.get() ! = null) { throw new NestedTransactionException(currentTransaction.get()); } currentTransaction.set(new MessagingTransaction()); logger.trace("Beginning transaction: " + currentTransaction.get().transactionId()); } @Override public void commit() throws TransactionException { final MessagingTransaction transaction = getCurrentTransaction(); logger.trace("Committing transaction: " + transaction.transactionId()); transaction.applyActions(messageSender); currentTransaction.remove(); logger.trace("Transaction successfully committed: " + transaction.transactionId()); } @Override public void send(final TypedMessage typedMessage) throws MessageSendException { final MessagingTransaction transaction = getCurrentTransaction(); transaction.addMessage(typedMessage); } @Override public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException { final MessagingTransaction transaction = getCurrentTransaction(); transaction.addDelayedMessage(typedMessage, delay); } @Override public void abort() throws TransactionException { currentTransaction.remove(); }}Copy the code
TransactionalMessageSender implements both the MessageSender and TransactionalResource interfaces. Its begin method creates a new MessagingTransaction and stores it in currentTransaction; commit obtains the current MessagingTransaction, calls its applyActions method, and finally calls currentTransaction.remove(); abort calls currentTransaction.remove(); send calls transaction.addMessage; and sendDelayedMessage calls transaction.addDelayedMessage.
MessagePublisher
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java
/**
 * Contract for publishing messages of type {@code T}.
 *
 * @param <T> kind of {@link Message} this publisher accepts
 */
public interface MessagePublisher<T extends Message> {

    /**
     * Forwards the given message for publication.
     *
     * @param message the message to publish
     * @throws MessagePublishException failure type declared for a publish
     */
    void publish(T message) throws MessagePublishException;

}
Copy the code
The MessagePublisher interface defines the publish method
TransactionalMessagePublisher
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java
/**
 * A {@link MessagePublisher} decorator that buffers published messages inside a per-thread
 * {@link MessagingTransaction} and forwards them to the wrapped publisher only on
 * {@link #commit()}.
 */
public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final MessagePublisher<TypedMessage> messagePublisher;
    // Held in a ThreadLocal, so each thread sees only its own transaction.
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<>();

    /**
     * @param messagePublisher the underlying publisher that receives the buffered messages on commit
     */
    public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    /**
     * Looks up the calling thread's transaction, failing when there is none.
     */
    private MessagingTransaction getCurrentTransaction() {
        final MessagingTransaction active = currentTransaction.get();
        if (active != null) {
            return active;
        }
        throw new NonExistentTransactionException();
    }

    /**
     * Opens a fresh transaction for the calling thread; rejects the call when one is already open.
     */
    @Override
    public void begin() throws TransactionException {
        final MessagingTransaction alreadyOpen = currentTransaction.get();
        if (alreadyOpen != null) {
            throw new NestedTransactionException(alreadyOpen);
        }
        final MessagingTransaction opened = new MessagingTransaction();
        currentTransaction.set(opened);
        logger.trace("Beginning transaction: " + opened.transactionId());
    }

    /**
     * Applies the buffered actions to the wrapped publisher and then clears the calling thread's
     * transaction.
     */
    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction active = getCurrentTransaction();
        logger.trace("Committing transaction: " + active.transactionId());
        active.applyActions(messagePublisher);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed: " + active.transactionId());
    }

    /**
     * Buffers the message in the calling thread's transaction; nothing is published yet.
     *
     * @param typedMessage message to buffer
     */
    @Override
    public void publish(final TypedMessage typedMessage) throws MessagePublishException {
        getCurrentTransaction().addMessage(typedMessage);
    }

    /**
     * Clears the calling thread's transaction, if any, without publishing its buffered messages.
     */
    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}
Copy the code
TransactionalMessagePublisher implements both the MessagePublisher and TransactionalResource interfaces. Its begin method creates a new MessagingTransaction and stores it in currentTransaction; commit obtains the current MessagingTransaction, calls its applyActions method, and finally calls currentTransaction.remove(); abort calls currentTransaction.remove(); and publish calls transaction.addMessage.
Transaction
Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java
/**
 * A transaction that can be referred to by a string identifier.
 */
public interface Transaction {

    /**
     * @return the identifier of this transaction
     */
    String transactionId();

}
Copy the code
The Transaction interface defines the transactionId method
MessagingTransaction
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java
public class MessagingTransaction implements Transaction { private final Queue<MessageAction> messageActions; private final String transactionId; public MessagingTransaction() { messageActions = new LinkedList<>(); transactionId = UUID.randomUUID().toString(); } @Override public String transactionId() { return transactionId; } public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) { while (! messageActions.isEmpty()) { final MessageAction messageAction = messageActions.remove(); messagePublisher.publish(messageAction.message()); } } public void applyActions(final MessageSender<TypedMessage> messageSender) { while (! messageActions.isEmpty()) { final MessageAction messageAction = messageActions.remove(); messageAction.apply(messageSender); } } public void addMessage(final TypedMessage typedMessage) { messageActions.add(new MessageAction(typedMessage, 0)); } public void addDelayedMessage(final TypedMessage typedMessage, final int delay) { messageActions.add(new MessageAction(typedMessage, delay)); }}Copy the code
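The MessagingTransaction class implements the Transaction interface. Its transactionId method returns the UUID generated in the constructor. It provides two applyActions methods: the one that takes a MessagePublisher calls messagePublisher.publish for each queued MessageAction, and the one that takes a MessageSender calls messageAction.apply(messageSender).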
summary
cheddar-tx provides TransactionalMessagePublisher and TransactionalMessageSender, both of which implement the TransactionalResource interface. Both commit methods call transaction.applyActions. MessageAction provides two apply methods: the one that takes a MessageSender calls messageSender.sendDelayedMessage when the delay is greater than 0, and messageSender.send(typedMessage) otherwise; the one that takes a MessagePublisher calls messagePublisher.publish.
doc
- Cheddar