Preface

This article takes a look at Cheddar's tx module (cheddar-tx).

MessageAction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java

public class MessageAction {

    private final TypedMessage typedMessage;
    private final int delaySeconds;

    public MessageAction(final TypedMessage typedMessage, final int delaySeconds) {
        this.typedMessage = typedMessage;
        this.delaySeconds = delaySeconds;
    }

    public TypedMessage message() {
        return typedMessage;
    }

    public int delay() {
        return delaySeconds;
    }

    public void apply(final MessageSender<TypedMessage> messageSender) {
        if (delay() > 0) {
            messageSender.sendDelayedMessage(typedMessage, delaySeconds);
        } else {
            messageSender.send(typedMessage);
        }
    }

    public void apply(final MessagePublisher<TypedMessage> messagePublisher) {
        messagePublisher.publish(typedMessage);
    }
}

MessageAction holds two fields, typedMessage and delaySeconds, and provides two apply methods. The overload that takes a MessageSender calls messageSender.sendDelayedMessage(typedMessage, delaySeconds) when the delay is greater than 0 and messageSender.send(typedMessage) otherwise. The overload that takes a MessagePublisher calls messagePublisher.publish(typedMessage); the delay plays no part on that path.
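
The branching in apply(MessageSender) can be exercised without a real queue. The following is a minimal sketch, not Cheddar code: SketchSender, RecordingSender and SketchMessageAction are hypothetical stand-ins, and messages are plain Strings instead of TypedMessage.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageSender<TypedMessage>; messages are plain Strings here.
interface SketchSender {
    void send(String message);

    void sendDelayedMessage(String message, int delaySeconds);
}

// Test double that records which method was invoked instead of sending anything.
class RecordingSender implements SketchSender {
    final List<String> calls = new ArrayList<>();

    @Override
    public void send(final String message) {
        calls.add("send:" + message);
    }

    @Override
    public void sendDelayedMessage(final String message, final int delaySeconds) {
        calls.add("delayed:" + message + ":" + delaySeconds);
    }
}

// Mirrors the delay check of MessageAction.apply(MessageSender) in the listing above.
class SketchMessageAction {
    private final String message;
    private final int delaySeconds;

    SketchMessageAction(final String message, final int delaySeconds) {
        this.message = message;
        this.delaySeconds = delaySeconds;
    }

    void apply(final SketchSender sender) {
        if (delaySeconds > 0) {
            sender.sendDelayedMessage(message, delaySeconds);
        } else {
            sender.send(message);
        }
    }
}
```

With this sketch, an action built with a delay of 0 ends up in send, while one built with a delay of 30 ends up in sendDelayedMessage with that delay passed through.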

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

public interface MessageSender<T extends Message> {

    /**
     * Send a message
     * @param message Message to send
     * @throws MessageSendException
     */
    void send(T message) throws MessageSendException;

    /**
     * Send a message, where the message is not visible to receivers for the specified delay duration
     * @param message Message to send
     * @param delaySeconds Duration for which sent message is invisible to receivers
     * @throws MessageSendException
     */
    void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;

}

The MessageSender interface defines send and sendDelayedMessage methods

TransactionalResource

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java

public interface TransactionalResource {

    void begin() throws TransactionException;

    void commit() throws TransactionException;

    void abort() throws TransactionException;
}

The TransactionalResource interface defines the begin, commit, and abort methods
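
A caller would typically drive these three methods in a begin / work / commit-or-abort pattern. The sketch below illustrates that pattern under stated assumptions: SketchResource is a simplified copy of the interface with the checked TransactionException left out, and SketchTransactionRunner and LoggingResource are hypothetical helpers that are not part of Cheddar.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified copy of TransactionalResource; the checked TransactionException is omitted.
interface SketchResource {
    void begin();

    void commit();

    void abort();
}

// Hypothetical helper (not part of Cheddar): commit if the work succeeds, abort if it throws.
class SketchTransactionRunner {
    static void run(final SketchResource resource, final Runnable work) {
        resource.begin();
        try {
            work.run();
        } catch (final RuntimeException e) {
            resource.abort();
            throw e;
        }
        resource.commit();
    }
}

// Resource that only records the lifecycle calls it receives.
class LoggingResource implements SketchResource {
    final List<String> events = new ArrayList<>();

    @Override
    public void begin() {
        events.add("begin");
    }

    @Override
    public void commit() {
        events.add("commit");
    }

    @Override
    public void abort() {
        events.add("abort");
    }
}
```

Running work that completes normally records begin followed by commit; work that throws records begin followed by abort, and the exception is rethrown to the caller.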

TransactionalMessageSender

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java

public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessageSender<TypedMessage> messageSender;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) {
        this.messageSender = messageSender;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction: " + transaction.transactionId());
        transaction.applyActions(messageSender);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed: " + transaction.transactionId());
    }

    @Override
    public void send(final TypedMessage typedMessage) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addDelayedMessage(typedMessage, delay);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}

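TransactionalMessageSender implements both the MessageSender and TransactionalResource interfaces. The begin method creates a new MessagingTransaction and stores it in the currentTransaction ThreadLocal, throwing NestedTransactionException if the current thread already has one. The commit method fetches the current MessagingTransaction, calls applyActions(messageSender) to send the queued messages, and finally calls currentTransaction.remove(). The abort method only calls currentTransaction.remove(), which discards whatever was queued. The send method calls transaction.addMessage and sendDelayedMessage calls transaction.addDelayedMessage, so neither reaches the underlying sender before commit; both throw NonExistentTransactionException if no transaction has been started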
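
The buffer-until-commit behaviour can be reproduced in a few lines. This is a simplified sketch rather than the Cheddar implementation: PlainSender and BufferingSender are hypothetical names, messages are Strings, delayed sends are left out, and IllegalStateException stands in for NonExistentTransactionException and NestedTransactionException.

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical stand-in for the wrapped MessageSender; messages are plain Strings.
interface PlainSender {
    void send(String message);
}

// Queues messages per thread and forwards them to the delegate only on commit.
class BufferingSender implements PlainSender {
    private final PlainSender delegate;
    private final ThreadLocal<Queue<String>> current = new ThreadLocal<>();

    BufferingSender(final PlainSender delegate) {
        this.delegate = delegate;
    }

    private Queue<String> currentBuffer() {
        final Queue<String> buffer = current.get();
        if (buffer == null) {
            // Stand-in for NonExistentTransactionException
            throw new IllegalStateException("no transaction in progress");
        }
        return buffer;
    }

    void begin() {
        if (current.get() != null) {
            // Stand-in for NestedTransactionException
            throw new IllegalStateException("transaction already in progress");
        }
        current.set(new LinkedList<>());
    }

    @Override
    public void send(final String message) {
        currentBuffer().add(message); // queued only, nothing is sent yet
    }

    void commit() {
        final Queue<String> buffer = currentBuffer();
        while (!buffer.isEmpty()) {
            delegate.send(buffer.remove()); // forward in the order the messages were queued
        }
        current.remove();
    }

    void abort() {
        current.remove(); // queued messages are dropped
    }
}
```

In this sketch, as in the listing above, the thread-local entry is cleared only after every queued message has been forwarded; if the delegate throws part-way through, the transaction stays attached to the thread with the remaining messages still queued.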

MessagePublisher

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java

public interface MessagePublisher<T extends Message> {

    /**
     * Forward a message for publication
     * @param message
     * @throws MessagePublishException
     */
    void publish(T message) throws MessagePublishException;

}

The MessagePublisher interface defines the publish method

TransactionalMessagePublisher

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java

public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessagePublisher<TypedMessage> messagePublisher;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction: " + transaction.transactionId());
        transaction.applyActions(messagePublisher);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed: " + transaction.transactionId());
    }

    @Override
    public void publish(final TypedMessage typedMessage) throws MessagePublishException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}

TransactionalMessagePublisher implements both the MessagePublisher and TransactionalResource interfaces. The begin method creates a new MessagingTransaction and stores it in the currentTransaction ThreadLocal, throwing NestedTransactionException if the current thread already has one. The commit method fetches the current MessagingTransaction, calls applyActions(messagePublisher), and finally calls currentTransaction.remove(). The abort method only calls currentTransaction.remove(). The publish method calls transaction.addMessage, so the message is queued rather than published until commit
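
Because currentTransaction is a ThreadLocal, a transaction is visible only to the thread that began it. The sketch below demonstrates that property with a bare ThreadLocal; ThreadLocalTransactionSketch and the "tx-1" id are hypothetical and only stand in for the field and the MessagingTransaction it holds.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ThreadLocalTransactionSketch {

    // Stand-in for the currentTransaction field; holds a transaction id instead of a MessagingTransaction.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Returns true if a worker thread sees no transaction while the calling thread still has one.
    static boolean workerSeesNoTransaction() {
        CURRENT.set("tx-1");
        final AtomicBoolean workerSawNothing = new AtomicBoolean(false);
        final Thread worker = new Thread(() -> workerSawNothing.set(CURRENT.get() == null));
        worker.start();
        try {
            worker.join();
            return workerSawNothing.get() && "tx-1".equals(CURRENT.get());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove(); // same cleanup that commit and abort perform
        }
    }
}
```

One consequence for the classes above is that begin, publish and commit need to run on the same thread; a publish call made from a different thread would find no transaction and end up in the NonExistentTransactionException branch of getCurrentTransaction.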

Transaction

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java

public interface Transaction {

    String transactionId();

}

The Transaction interface defines the transactionId method

MessagingTransaction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java

public class MessagingTransaction implements Transaction {

    private final Queue<MessageAction> messageActions;
    private final String transactionId;

    public MessagingTransaction() {
        messageActions = new LinkedList<>();
        transactionId = UUID.randomUUID().toString();
    }

    @Override
    public String transactionId() {
        return transactionId;
    }

    public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messagePublisher.publish(messageAction.message());
        }
    }

    public void applyActions(final MessageSender<TypedMessage> messageSender) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messageAction.apply(messageSender);
        }
    }

    public void addMessage(final TypedMessage typedMessage) {
        messageActions.add(new MessageAction(typedMessage, 0));
    }

    public void addDelayedMessage(final TypedMessage typedMessage, final int delay) {
        messageActions.add(new MessageAction(typedMessage, delay));
    }
}

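The MessagingTransaction class implements the Transaction interface. Its transactionId method returns the UUID generated in the constructor. addMessage queues a MessageAction with a delay of 0 and addDelayedMessage queues one with the given delay. Both applyActions overloads drain the queue in FIFO order: the MessagePublisher overload calls messagePublisher.publish(messageAction.message()) for each action, and the MessageSender overload calls messageAction.apply(messageSender)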
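
Since applyActions removes each action as it goes, the queue is empty afterwards and a second call has nothing left to deliver. A minimal sketch of that drain loop follows; SketchTransaction is a hypothetical reduction of MessagingTransaction that queues Strings and hands them to a java.util.function.Consumer instead of a publisher or sender.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical reduction of MessagingTransaction: Strings instead of MessageAction objects.
class SketchTransaction {
    private final Queue<String> actions = new LinkedList<>();
    private final String transactionId = UUID.randomUUID().toString();

    String transactionId() {
        return transactionId;
    }

    void addMessage(final String message) {
        actions.add(message); // appended at the tail of the queue
    }

    // Removes every queued message, oldest first, and hands it to the sink.
    void applyActions(final Consumer<String> sink) {
        while (!actions.isEmpty()) {
            sink.accept(actions.remove());
        }
    }
}
```

Calling applyActions twice in a row on this sketch delivers each message once, in insertion order; the second call finds the queue empty and returns immediately.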

Summary

cheddar-tx provides TransactionalMessagePublisher and TransactionalMessageSender, both of which implement the TransactionalResource interface. Messages are queued in a thread-local MessagingTransaction, and the commit method of each class hands them over by calling transaction.applyActions. MessageAction provides two apply methods: the one that takes a MessageSender calls messageSender.sendDelayedMessage when the delay is greater than 0 and messageSender.send(typedMessage) otherwise, and the one that takes a MessagePublisher calls messagePublisher.publish.

doc

  • Cheddar