Hmily framework features [Github.com/yu199195/hm…]

  • Seamless integration with Spring and Spring Boot Starter.
  • Seamless integration with RPC frameworks such as Dubbo, SpringCloud and Motan.
  • Multiple transaction log storage options (Redis, MongoDB, MySQL, etc.).
  • Multiple log serialization methods (Kryo, Protostuff, Hessian).
  • Automatic transaction recovery.
  • Dependency propagation for nested transactions is supported.
  • Zero code intrusion; simple and flexible configuration.

Why is Hmily so high-performance?

1. Transaction logs are read and written asynchronously with Disruptor (a lock-free, GC-free concurrent programming framework).

package com.hmily.tcc.core.disruptor.publisher;

import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.EventTypeEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
import com.hmily.tcc.core.disruptor.factory.HmilyTransactionEventFactory;
import com.hmily.tcc.core.disruptor.handler.HmilyConsumerDataHandler;
import com.hmily.tcc.core.disruptor.translator.HmilyTransactionEventTranslator;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * event publisher.
 *
 * @author xiaoyu(Myth)
 */
@Component
public class HmilyTransactionEventPublisher implements DisposableBean {

    private Disruptor<HmilyTransactionEvent> disruptor;

    private final CoordinatorService coordinatorService;

    @Autowired
    public HmilyTransactionEventPublisher(final CoordinatorService coordinatorService) {
        this.coordinatorService = coordinatorService;
    }

    /**
     * disruptor start.
     *
     * @param bufferSize this is disruptor buffer size.
     * @param threadSize this is disruptor consumer thread size.
     */
    public void start(final int bufferSize, final int threadSize) {
        disruptor = new Disruptor<>(new HmilyTransactionEventFactory(), bufferSize, r -> {
            AtomicInteger index = new AtomicInteger(1);
            return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
        }, ProducerType.MULTI, new BlockingWaitStrategy());

        final Executor executor = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                HmilyThreadFactory.create("hmily-log-disruptor", false),
                new ThreadPoolExecutor.AbortPolicy());

        HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
        for (int i = 0; i < threadSize; i++) {
            consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
    }

    /**
     * publish disruptor event.
     *
     * @param tccTransaction {@linkplain com.hmily.tcc.common.bean.entity.TccTransaction }
     * @param type           {@linkplain EventTypeEnum}
     */
    public void publishEvent(final TccTransaction tccTransaction, final int type) {
        final RingBuffer<HmilyTransactionEvent> ringBuffer = disruptor.getRingBuffer();
        ringBuffer.publishEvent(new HmilyTransactionEventTranslator(type), tccTransaction);
    }

    @Override
    public void destroy() {
        disruptor.shutdown();
    }
}
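
The publisher above follows a simple pattern: the business thread hands a log event to a bounded buffer and returns, and a fixed set of consumers persists the events in the background. The sketch below shows that pattern with JDK classes only. It swaps Disruptor's ring buffer for an `ArrayBlockingQueue`, so it illustrates the shape of the design rather than its performance, and `AsyncLogSketch`, `LogEvent` and the in-memory `store` are hypothetical names, not Hmily classes.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an asynchronous transaction-log publisher. */
final class AsyncLogSketch {

    /** One queued log operation; transId and status are illustrative fields. */
    static final class LogEvent {
        final String transId;
        final String status;

        LogEvent(final String transId, final String status) {
            this.transId = transId;
            this.status = status;
        }
    }

    /** Marker object that tells a worker to exit. */
    private static final LogEvent STOP = new LogEvent("", "");

    private final BlockingQueue<LogEvent> queue;
    /** In-memory map standing in for the real log repository. */
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final Thread[] workers;

    AsyncLogSketch(final int bufferSize, final int threadSize) {
        queue = new ArrayBlockingQueue<>(bufferSize);
        workers = new Thread[threadSize];
        for (int i = 0; i < threadSize; i++) {
            workers[i] = new Thread(this::drain, "log-worker-" + (i + 1));
            workers[i].start();
        }
    }

    /** Called on the business thread: enqueue and return (blocks only when the buffer is full). */
    void publish(final String transId, final String status) {
        enqueue(new LogEvent(transId, status));
    }

    /** Sends one stop marker per worker, then waits until every worker has exited. */
    void shutdown() {
        for (int i = 0; i < workers.length; i++) {
            enqueue(STOP);
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while shutting down", e);
        }
    }

    Map<String, String> stored() {
        return store;
    }

    private void enqueue(final LogEvent event) {
        try {
            queue.put(event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueueing", e);
        }
    }

    /** Worker loop: persist events until the stop marker arrives. */
    private void drain() {
        try {
            for (LogEvent e = queue.take(); e != STOP; e = queue.take()) {
                store.put(e.transId, e.status);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With several workers, two events for the same transaction can be persisted out of order in this sketch, so a real implementation has to take care that a later status is not overwritten by an earlier one.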
  • The default value of bufferSize is 4096 x 4 (written as 4096 * 2 * 2 in TccConfig); users can configure it as required.

HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
for (int i = 0; i < threadSize; i++) {
    consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
}
disruptor.handleEventsWithWorkerPool(consumers);

  • Here, multiple consumers are used to process the tasks in the queue.

2. Execute the confirm and cancel methods asynchronously.

package com.hmily.tcc.core.service.handler;

import com.hmily.tcc.common.bean.context.TccTransactionContext;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.TccActionEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.service.HmilyTransactionHandler;
import com.hmily.tcc.core.service.executor.HmilyTransactionExecutor;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * this is transaction starter.
 *
 * @author xiaoyu
 */
@Component
public class StarterHmilyTransactionHandler implements HmilyTransactionHandler {

    private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;

    private final HmilyTransactionExecutor hmilyTransactionExecutor;

    private final Executor executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            HmilyThreadFactory.create("hmily-execute", false),
            new ThreadPoolExecutor.AbortPolicy());

    @Autowired
    public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor) {
        this.hmilyTransactionExecutor = hmilyTransactionExecutor;
    }

    @Override
    public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context) throws Throwable {
        Object returnValue;
        try {
            TccTransaction tccTransaction = hmilyTransactionExecutor.begin(point);
            try {
                //execute try
                returnValue = point.proceed();
                tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
                hmilyTransactionExecutor.updateStatus(tccTransaction);
            } catch (Throwable throwable) {
                //if exception, execute cancel
                final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
                executor.execute(() -> hmilyTransactionExecutor.cancel(currentTransaction));
                throw throwable;
            }
            //execute confirm
            final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
            executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
        } finally {
            hmilyTransactionExecutor.remove();
        }
        return returnValue;
    }
}
  • When the try method, intercepted by the AOP aspect, throws an exception, the thread pool executes cancel asynchronously; when there is no exception, it executes confirm asynchronously.
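
The control flow described above can be reduced to a few lines. The following is a minimal sketch using only the JDK, with the assumption that phase two is represented by appending a string to a list; `AsyncPhaseTwoSketch`, `handle` and `awaitPhaseTwo` are hypothetical names introduced for illustration and are not part of Hmily.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch: run try on the caller's thread, then confirm or cancel on a pool. */
final class AsyncPhaseTwoSketch {

    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    /** Records which phase-two action ran for which transaction. */
    private final List<String> phaseTwoLog = new CopyOnWriteArrayList<>();

    /** Runs the try action synchronously and schedules confirm or cancel asynchronously. */
    <T> T handle(final String transId, final Supplier<T> tryAction) {
        final T result;
        try {
            result = tryAction.get();
        } catch (RuntimeException e) {
            // try failed: cancel in the background, but still report the failure to the caller
            executor.execute(() -> phaseTwoLog.add("cancel:" + transId));
            throw e;
        }
        // try succeeded: confirm in the background, the caller gets its result right away
        executor.execute(() -> phaseTwoLog.add("confirm:" + transId));
        return result;
    }

    /** Stops accepting work, waits for the queued phase-two tasks, and returns what ran. */
    List<String> awaitPhaseTwo() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return phaseTwoLog;
    }
}
```

The caller never waits for confirm or cancel, which is where the latency saving comes from; the price is that a failed phase two has to be repaired later from the transaction log.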

One might ask here: what if the cancel method or the confirm method throws an exception?

A: First, this is very rare, because the try has only just succeeded. Second, if it does happen, the log was already saved during the try phase, and Hmily has a built-in scheduled thread pool that performs recovery, so there is no need to worry.
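
To illustrate what log-based recovery can look like, here is a minimal sketch under stated assumptions. The parameter names `retryMax`, `recoverDelayTime` and `scheduledDelay` mirror fields of `TccConfig`; everything else (`RecoverySketch`, `LoggedTx`, the in-memory `logStore`, and the choice of `scheduleWithFixedDelay`) is hypothetical and should not be read as Hmily's actual recovery code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical sketch of log-based recovery; not the Hmily implementation. */
final class RecoverySketch {

    /** A transaction log entry left behind by the try phase (illustrative fields). */
    static final class LoggedTx {
        final String transId;
        final long createdAtMillis;
        int retriedCount;
        boolean finished;

        LoggedTx(final String transId, final long createdAtMillis) {
            this.transId = transId;
            this.createdAtMillis = createdAtMillis;
        }
    }

    /** In-memory list standing in for the real log repository. */
    private final List<LoggedTx> logStore = new CopyOnWriteArrayList<>();
    private final int retryMax;
    private final long recoverDelayMillis;

    RecoverySketch(final int retryMax, final int recoverDelaySeconds) {
        this.retryMax = retryMax;
        this.recoverDelayMillis = TimeUnit.SECONDS.toMillis(recoverDelaySeconds);
    }

    void save(final LoggedTx tx) {
        logStore.add(tx);
    }

    /**
     * One recovery pass: re-run phase two for every unfinished entry that is old
     * enough and still has retries left. Returns how many entries were retried.
     */
    synchronized int recoverOnce(final long nowMillis, final Predicate<LoggedTx> phaseTwo) {
        int retried = 0;
        for (LoggedTx tx : logStore) {
            final boolean oldEnough = nowMillis - tx.createdAtMillis >= recoverDelayMillis;
            if (tx.finished || !oldEnough || tx.retriedCount >= retryMax) {
                continue;
            }
            tx.retriedCount++;
            retried++;
            if (phaseTwo.test(tx)) {
                tx.finished = true;
            }
        }
        return retried;
    }

    /** Runs a recovery pass every scheduledDelaySeconds on a single scheduler thread. */
    ScheduledExecutorService start(final int scheduledDelaySeconds, final Predicate<LoggedTx> phaseTwo) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> recoverOnce(System.currentTimeMillis(), phaseTwo),
                scheduledDelaySeconds, scheduledDelaySeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

The age check keeps the recovery job away from transactions that are still in flight, and the retry cap stops a permanently failing confirm or cancel from being retried forever.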

One might ask: what if saving the log fails here?

A: This is a tricky question. First, the log storage parameters have to be configured when the framework starts. Second, even if a log exception occurs at run time, the framework reads the transaction from the cache, so correct execution of the program is not affected. Finally, if saving the log fails and, in an extreme case, the system goes down at the same time, then congratulations, you can go and buy a lottery ticket; the best solution is not to solve it.

3. Use of the ThreadLocal cache.

    /**
     * transaction begin.
     *
     * @param point cut point.
     * @return TccTransaction
     */
    public TccTransaction begin(final ProceedingJoinPoint point) {
        LogUtil.debug(LOGGER, () -> "... Hmily transaction! start....");
        //build tccTransaction
        final TccTransaction tccTransaction = buildTccTransaction(point, TccRoleEnum.START.getCode(), null);
        //save tccTransaction in threadLocal
        CURRENT.set(tccTransaction);
        //publishEvent
        hmilyTransactionEventPublisher.publishEvent(tccTransaction, EventTypeEnum.SAVE.getCode());
        //set TccTransactionContext this context transfer remote
        TccTransactionContext context = new TccTransactionContext();
        //set action is try
        context.setAction(TccActionEnum.TRYING.getCode());
        context.setTransId(tccTransaction.getTransId());
        context.setRole(TccRoleEnum.START.getCode());
        TransactionContextLocal.getInstance().set(context);
        return tccTransaction;
    }
  • The first thing to understand is that the ThreadLocal holds the transaction information of the initiator's method. This is important, so don't get confused. RPC calls form a call chain, and each participant in the chain is registered on that transaction:
    /**
     * add participant.
     *
     * @param participant {@linkplain Participant}
     */
    public void enlistParticipant(final Participant participant) {
        if (Objects.isNull(participant)) {
            return;
        }
        Optional.ofNullable(getCurrentTransaction())
                .ifPresent(c -> {
                    c.registerParticipant(participant);
                    updateParticipant(c);
                });
    }
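
One consequence of keeping the transaction in a ThreadLocal is that only the thread that set it can read it. That is why the handler shown earlier calls `getCurrentTransaction()` on the calling thread and passes the result into the lambda, rather than reading it inside the pool thread. A minimal sketch of the effect, with `ThreadLocalSketch` and the string value as purely illustrative stand-ins:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: a ThreadLocal value is invisible to a pool thread unless captured. */
final class ThreadLocalSketch {

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Returns {value read via ThreadLocal on the pool thread, value captured by the caller}. */
    static String[] run() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CURRENT.set("tx-1");
            // read on the calling thread, then hand the plain reference to the task
            final String captured = CURRENT.get();
            final Callable<String> readOnPoolThread = CURRENT::get;
            final Callable<String> useCaptured = () -> captured;
            final String viaThreadLocal = pool.submit(readOnPoolThread).get();
            final String viaCapture = pool.submit(useCaptured).get();
            return new String[] {viaThreadLocal, viaCapture};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            pool.shutdown();
        }
    }

    public static void main(final String[] args) {
        final String[] seen = run();
        System.out.println("via ThreadLocal: " + seen[0] + ", via capture: " + seen[1]);
    }
}
```

The pool thread sees `null` through the ThreadLocal because each thread has its own slot, while the captured reference travels with the task.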

4. The use of GuavaCache

package com.hmily.tcc.core.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.helper.SpringBeanUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

/**
 * use google guava cache.
 * @author xiaoyu
 */
public final class TccTransactionCacheManager {

    private static final int MAX_COUNT = 10000;

    private static final LoadingCache<String, TccTransaction> LOADING_CACHE =
            CacheBuilder.newBuilder().maximumWeight(MAX_COUNT)
                    .weigher((Weigher<String, TccTransaction>) (string, tccTransaction) -> getSize())
                    .build(new CacheLoader<String, TccTransaction>() {
                        @Override
                        public TccTransaction load(final String key) {
                            return cacheTccTransaction(key);
                        }
                    });

    private static CoordinatorService coordinatorService = SpringBeanUtils.getInstance().getBean(CoordinatorService.class);

    private static final TccTransactionCacheManager TCC_TRANSACTION_CACHE_MANAGER = new TccTransactionCacheManager();

    private TccTransactionCacheManager() {
    }

    /**
     * TccTransactionCacheManager.
     *
     * @return TccTransactionCacheManager
     */
    public static TccTransactionCacheManager getInstance() {
        return TCC_TRANSACTION_CACHE_MANAGER;
    }

    private static int getSize() {
        return (int) LOADING_CACHE.size();
    }

    private static TccTransaction cacheTccTransaction(final String key) {
        return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
    }

    /**
     * cache tccTransaction.
     *
     * @param tccTransaction {@linkplain TccTransaction}
     */
    public void cacheTccTransaction(final TccTransaction tccTransaction) {
        LOADING_CACHE.put(tccTransaction.getTransId(), tccTransaction);
    }

    /**
     * acquire TccTransaction.
     *
     * @param key this guava key.
     * @return {@linkplain TccTransaction}
     */
    public TccTransaction getTccTransaction(final String key) {
        try {
            return LOADING_CACHE.get(key);
        } catch (ExecutionException e) {
            return new TccTransaction();
        }
    }

    /**
     * remove guava cache by key.
     * @param key guava cache key.
     */
    public void removeByKey(final String key) {
        if (StringUtils.isNotEmpty(key)) {
            LOADING_CACHE.invalidate(key);
        }
    }
}
  • The initiator uses ThreadLocal, so why not the participant? There are two reasons. First, try and confirm do not run on the same thread, so a ThreadLocal would be empty. Second, in an RPC cluster the confirm call may be load-balanced to a different machine. Here is a detail:

    private static TccTransaction cacheTccTransaction(final String key) {
        return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
    }

When the transaction is not in GuavaCache, it is looked up in the transaction log and returned, which ensures support for clustered environments.
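
The fallback can be pictured with a small JDK-only sketch. It uses `ConcurrentHashMap.computeIfAbsent` in place of Guava's `LoadingCache`, so it shows the lookup order and nothing about eviction; `FallbackCacheSketch`, the string-valued entries and the `EMPTY` placeholder (playing the role of `new TccTransaction()`) are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: local cache first, shared transaction log on a miss. */
final class FallbackCacheSketch {

    /** Placeholder cached when the log has no entry either. */
    static final String EMPTY = "";

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> logStoreLookup;
    private final AtomicInteger storeReads = new AtomicInteger();

    FallbackCacheSketch(final Function<String, String> logStoreLookup) {
        this.logStoreLookup = logStoreLookup;
    }

    /** Caches a transaction that was created on this node. */
    void put(final String transId, final String status) {
        cache.put(transId, status);
    }

    /** Returns the cached value, loading it from the shared log on a miss. */
    String get(final String transId) {
        return cache.computeIfAbsent(transId, key -> {
            storeReads.incrementAndGet();
            final String fromLog = logStoreLookup.apply(key);
            return fromLog != null ? fromLog : EMPTY;
        });
    }

    void remove(final String transId) {
        cache.remove(transId);
    }

    /** Number of times the shared log was actually queried. */
    int storeReads() {
        return storeReads.get();
    }
}
```

A node that never saw the try phase still finds the transaction on its first lookup, at the cost of one read from the shared log; later lookups on that node are served from memory.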

These four points make Hmily an asynchronous, high-performance TCC framework for distributed transactions.

How is Hmily used? (Github.com/yu199195/hm…)

The framework packages have not been uploaded to the Maven central repository because of a package naming problem, so users need to pull the code, compile it, and deploy it to their own private repository.

1. Dubbo users

  • Add the following dependency to your API project

    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-annotation</artifactId>
        <version>{your version}</version>
    </dependency>

  • Add the following dependency to your service provider project

    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-dubbo</artifactId>
        <version>{your version}</version>
    </dependency>
  • Configure the startup bean XML
  • There are many configuration properties; only a demo is given here. For the details, refer to this class:

package com.hmily.tcc.common.config;

import com.hmily.tcc.common.enums.RepositorySupportEnum;
import lombok.Data;

/**
 * hmily config.
 *
 * @author xiaoyu
 */
@Data
public class TccConfig {

    /**
     * Resource suffix. This parameter names the transaction store path.
     * If it's a table store this is a table suffix, it's stored the same way.
     * If this parameter is not filled in, the applicationName of the application is retrieved by default
     */
    private String repositorySuffix;

    /**
     * log serializer.
     * {@linkplain com.hmily.tcc.common.enums.SerializeEnum}
     */
    private String serializer = "kryo";

    /**
     * scheduledPool Thread size.
     */
    private int scheduledThreadMax = Runtime.getRuntime().availableProcessors() << 1;

    /**
     * scheduledPool scheduledDelay unit SECONDS.
     */
    private int scheduledDelay = 60;

    /**
     * retry max.
     */
    private int retryMax = 3;

    /**
     * recoverDelayTime Unit seconds
     * (note that this time represents how many seconds after the local transaction was created before execution).
     */
    private int recoverDelayTime = 60;

    /**
     * Parameters when participants perform their own recovery.
     * 1.such as RPC calls time out
     * 2.such as the starter down machine
     */
    private int loadFactor = 2;

    /**
     * repositorySupport.
     * {@linkplain RepositorySupportEnum}
     */
    private String repositorySupport = "db";

    /**
     * disruptor bufferSize.
     */
    private int bufferSize = 4096 * 2 * 2;

    /**
     * this is disruptor consumerThreads.
     */
    private int consumerThreads = Runtime.getRuntime().availableProcessors() << 1;

    /**
     * db config.
     */
    private TccDbConfig tccDbConfig;

    /**
     * mongo config.
     */
    private TccMongoConfig tccMongoConfig;

    /**
     * redis config.
     */
    private TccRedisConfig tccRedisConfig;

    /**
     * zookeeper config.
     */
    private TccZookeeperConfig tccZookeeperConfig;

    /**
     * file config.
     */
    private TccFileConfig tccFileConfig;

}
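
One detail is worth knowing before overriding `bufferSize`: the LMAX Disruptor only accepts a ring buffer size that is a power of two, which the default of 4096 * 2 * 2 = 16384 satisfies. The helper below is a hypothetical pre-check, not part of Hmily, that validates a value or rounds it up before it is handed to the framework.

```java
/** Hypothetical helper for choosing a Disruptor-compatible buffer size. */
final class BufferSizeSketch {

    private BufferSizeSketch() {
    }

    /** True when n is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(final int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    /** Smallest power of two that is greater than or equal to n, for 1 <= n <= 2^30. */
    static int roundUpToPowerOfTwo(final int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n must be between 1 and 2^30: " + n);
        }
        return n == 1 ? 1 : Integer.highestOneBit(n - 1) << 1;
    }

    public static void main(final String[] args) {
        System.out.println(isPowerOfTwo(4096 * 2 * 2));
        System.out.println(roundUpToPowerOfTwo(10_000));
    }
}
```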

2. SpringCloud users

    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-springcloud</artifactId>
        <version>{your version}</version>
    </dependency>

3. Motan users

    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-motan</artifactId>
        <version>{your version}</version>
    </dependency>

4. hmily-spring-boot-starter users. This is even easier: just import the JAR that matches your RPC framework.

  • If you are a Dubbo user, add:

    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-spring-boot-starter-dubbo</artifactId>
        <version>${your version}</version>
    </dependency>

  • If you are a SpringCloud user, add:

    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-spring-boot-starter-springcloud</artifactId>
        <version>${your version}</version>
    </dependency>

  • If you are a Motan user, add:

    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-spring-boot-starter-motan</artifactId>
        <version>${your version}</version>
    </dependency>
  • Then do the following configuration in your YML:

    hmily:
        tcc:
            serializer: kryo
            recoverDelayTime: 128
            retryMax: 3
            scheduledDelay: 128
            scheduledThreadMax: 10
            repositorySupport: db
            tccDbConfig:
                driverClassName: com.mysql.jdbc.Driver
                url: jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&characterEncoding=utf8
                username: root
                password: 123456

            #repositorySupport: redis
            #tccRedisConfig:
                #masterName: mymaster
                #sentinelUrl: 192.168.1.91:26379;192.168.1.92:26379;192.168.1.93:26379
                #password: foobaredbbexONE123

            #repositorySupport: zookeeper
                #host: 92.168.1.73:2181
                #sessionTimeOut: 100000
                #rootPath: /tcc

            #repositorySupport: mongodb
                #mongoDbUrl: 192.168.1.68:27017
                #mongoDbName: happylife

            #repositorySupport: file
                #path: /account
                #prefix: account
  • That's it. You can now add the @Tcc annotation to your interface methods and enjoy.
  • Of course, for reasons of space, many things are described only briefly, especially the logic.
  • If you are interested, you can star and fork the project on GitHub, or join the WeChat and QQ groups to discuss.
  • Here is the GitHub address: github.com/yu199195/hm…
  • Finally, thank you again. If you are interested, your excellent PRs are welcome.