Preface

This article looks at how state works in Storm Trident.

StateType

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/StateType.java

public enum StateType {
    NON_TRANSACTIONAL,
    TRANSACTIONAL,
    OPAQUE
}
  • StateType has three values: NON_TRANSACTIONAL (non-transactional), TRANSACTIONAL (transactional), and OPAQUE (opaque transactional)
  • Spouts come in the same three types: non-transactional, transactional, and opaque transactional

State

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/State.java

/**
 * There's 3 different kinds of state:
 *
 * 1. non-transactional: ignores commits, updates are permanent. no rollback. a cassandra incrementing state would be like this
 * 2. repeat-transactional: idempotent as long as all batches for a txid are identical
 * 3. opaque-transactional: the most general kind of state. updates are always done based on the previous version of the value if the
 *    current commit = latest stored commit. Idempotent even if the batch for a txid can change.
 *
 * repeat transactional is idempotent for transactional spouts
 * opaque transactional is idempotent for opaque or transactional spouts
 *
 * Trident should log warnings when state is idempotent but updates will not be idempotent because of spout
 */
// retrieving is encapsulated in Retrieval interface
public interface State {
    void beginCommit(Long txid); // can be null for things like partitionPersist occuring off a DRPC stream

    void commit(Long txid);
}
  • Non-transactional: commits are ignored, updates are permanent, and there is no rollback. A Cassandra incrementing state is of this kind. The semantics are at-most-once or at-least-once
  • Repeat-transactional (usually just called transactional): whether or not a batch is replayed, a given txid always carries the same batch with the same tuples; a tuple belongs to exactly one batch, and batches do not overlap. For state updates, a replay that finds the same txid already stored can be skipped. This needs less state in the database but is less fault tolerant; it ensures exactly-once semantics
  • Opaque transactional (opaque for short) is widely used. It is more fault tolerant than transactional: a tuple does not have to stay in the same batch/txid across replays, but each tuple is still guaranteed to be processed successfully in exactly one batch. OpaqueTridentKafkaSpout is an implementation of this type and can tolerate the loss of Kafka nodes. For state updates, a replay that finds the same txid already stored recomputes from prevValue and overwrites the current value. This needs more space in the database but is fault tolerant; it ensures exactly-once semantics
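
The pairing rule from the Javadoc above (transactional state is idempotent for transactional spouts; opaque state is idempotent for opaque or transactional spouts) can be sketched as a small lookup. The `ExactlyOnceCheck` class and the `SpoutKind` enum are hypothetical names used only for illustration and are not part of Storm; `StateType` is redeclared here to mirror the enum shown earlier.

```java
// Hypothetical illustration; none of this is Storm API.
final class ExactlyOnceCheck {
    // Redeclared copy of the StateType enum shown earlier.
    enum StateType { NON_TRANSACTIONAL, TRANSACTIONAL, OPAQUE }

    // Hypothetical enum naming the three spout kinds described in the text.
    enum SpoutKind { NON_TRANSACTIONAL, TRANSACTIONAL, OPAQUE }

    /** Returns true when the spout/state pair gives idempotent (exactly-once) state updates. */
    static boolean isExactlyOnce(SpoutKind spout, StateType state) {
        switch (state) {
            case TRANSACTIONAL:
                // Needs identical batches per txid, which only a transactional spout provides.
                return spout == SpoutKind.TRANSACTIONAL;
            case OPAQUE:
                // Tolerates batches whose contents change on replay.
                return spout == SpoutKind.TRANSACTIONAL || spout == SpoutKind.OPAQUE;
            default:
                // Non-transactional state ignores commits entirely.
                return false;
        }
    }

    public static void main(String[] args) {
        for (SpoutKind spout : SpoutKind.values()) {
            for (StateType state : StateType.values()) {
                System.out.println(spout + " spout + " + state + " state -> exactly-once: "
                        + isExactlyOnce(spout, state));
            }
        }
    }
}
```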

MapState

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/MapState.java

public interface MapState<T> extends ReadOnlyMapState<T> {
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);

    void multiPut(List<List<Object>> keys, List<T> vals);
}
  • MapState extends the ReadOnlyMapState interface, which in turn extends the State interface
  • The sections below walk through several implementations of MapState

NonTransactionalMap

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/NonTransactionalMap.java

public class NonTransactionalMap<T> implements MapState<T> {
    IBackingMap<T> _backing;

    protected NonTransactionalMap(IBackingMap<T> backing) {
        _backing = backing;
    }

    public static <T> MapState<T> build(IBackingMap<T> backing) {
        return new NonTransactionalMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        return _backing.multiGet(keys);
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<T> curr = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(curr.size());
        for (int i = 0; i < curr.size(); i++) {
            T currVal = curr.get(i);
            ValueUpdater<T> updater = updaters.get(i);
            ret.add(updater.update(currVal));
        }
        _backing.multiPut(keys, ret);
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        _backing.multiPut(keys, vals);
    }

    @Override
    public void beginCommit(Long txid) {
    }

    @Override
    public void commit(Long txid) {
    }
}
  • NonTransactionalMap wraps an IBackingMap; its beginCommit and commit methods do nothing
  • The multiUpdate method reads the current values with multiGet, applies each updater to build the list ret, and writes ret back with the IBackingMap's multiPut
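
Because no txid is recorded, a replayed batch is simply applied again. The following is a minimal sketch of that effect, assuming an in-memory map as the backing store and `UnaryOperator<Long>` as a stand-in for `ValueUpdater`; the class `NonTransactionalSketch` and its helper `add` are hypothetical and not Storm code. It updates key by key rather than in one batched multiGet/multiPut.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for NonTransactionalMap over an in-memory map; not Storm code.
final class NonTransactionalSketch {
    private final Map<String, Long> backing = new HashMap<>();

    // Same idea as multiUpdate above: read the current value, apply the updater, write back.
    List<Long> multiUpdate(List<String> keys, List<UnaryOperator<Long>> updaters) {
        List<Long> ret = new ArrayList<>(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            Long updated = updaters.get(i).apply(backing.get(keys.get(i)));
            backing.put(keys.get(i), updated);
            ret.add(updated);
        }
        return ret;
    }

    Long get(String key) {
        return backing.get(key);
    }

    // Updater that adds n to the stored count, treating a missing value as 0.
    static UnaryOperator<Long> add(long n) {
        return stored -> (stored == null ? 0L : stored) + n;
    }

    public static void main(String[] args) {
        NonTransactionalSketch state = new NonTransactionalSketch();
        List<String> keys = Collections.singletonList("apple");
        state.multiUpdate(keys, Collections.singletonList(add(2))); // batch applied
        state.multiUpdate(keys, Collections.singletonList(add(2))); // same batch replayed after a failure
        System.out.println(state.get("apple")); // prints 4: the replay was counted twice
    }
}
```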

TransactionalMap

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/TransactionalMap.java

public class TransactionalMap<T> implements MapState<T> {
    CachedBatchReadsMap<TransactionalValue> _backing;
    Long _currTx;

    protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
        _backing = new CachedBatchReadsMap(backing);
    }

    public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {
        return new TransactionalMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(vals.size());
        for (CachedBatchReadsMap.RetVal<TransactionalValue> retval : vals) {
            TransactionalValue v = retval.val;
            if (v != null) {
                ret.add((T) v.getVal());
            } else {
                ret.add(null);
            }
        }
        return ret;
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> curr = _backing.multiGet(keys);
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(curr.size());
        List<List<Object>> newKeys = new ArrayList();
        List<T> ret = new ArrayList<T>();
        for (int i = 0; i < curr.size(); i++) {
            CachedBatchReadsMap.RetVal<TransactionalValue> retval = curr.get(i);
            TransactionalValue<T> val = retval.val;
            ValueUpdater<T> updater = updaters.get(i);
            TransactionalValue<T> newVal;
            boolean changed = false;
            if (val == null) {
                newVal = new TransactionalValue<T>(_currTx, updater.update(null));
                changed = true;
            } else {
                if (_currTx != null && _currTx.equals(val.getTxid()) && !retval.cached) {
                    newVal = val;
                } else {
                    newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
                    changed = true;
                }
            }
            ret.add(newVal.getVal());
            if (changed) {
                newVals.add(newVal);
                newKeys.add(keys.get(i));
            }
        }
        if (!newKeys.isEmpty()) {
            _backing.multiPut(newKeys, newVals);
        }
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());
        for (T val : vals) {
            newVals.add(new TransactionalValue<T>(_currTx, val));
        }
        _backing.multiPut(keys, newVals);
    }

    @Override
    public void beginCommit(Long txid) {
        _currTx = txid;
        _backing.reset();
    }

    @Override
    public void commit(Long txid) {
        _currTx = null;
        _backing.reset();
    }
}
  • beginCommit sets the current txid and resets _backing; commit clears the txid and then resets _backing
  • The multiUpdate method checks whether _currTx is non-null, equals the txid stored with the value, and !retval.cached holds (that is, the value was not written by a multiPut in this transaction). If so, the update is skipped and newVal = val
  • The multiPut method wraps each value in a TransactionalValue tagged with _currTx, then calls CachedBatchReadsMap.multiPut(List<List<Object>> keys, List<T> vals), which also records the written values in its cache
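
The skip rule can be sketched for a single key as follows. This is a simplification under stated assumptions, not the Storm implementation: `TransactionalSketch`, `TxValue`, and `add` are hypothetical names, an in-memory map stands in for the backing store, and a set of keys written in the current batch plays the role of the read cache.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical single-key simplification of TransactionalMap.multiUpdate; not Storm code.
final class TransactionalSketch {
    // Stand-in for TransactionalValue: the value plus the txid that wrote it.
    static final class TxValue {
        final Long txid;
        final long val;

        TxValue(Long txid, long val) {
            this.txid = txid;
            this.val = val;
        }
    }

    private final Map<String, TxValue> backing = new HashMap<>();
    private final Set<String> writtenThisBatch = new HashSet<>(); // plays the role of the cache
    private Long currTx;

    void beginCommit(Long txid) {
        currTx = txid;
        writtenThisBatch.clear();
    }

    void commit(Long txid) {
        currTx = null;
        writtenThisBatch.clear();
    }

    long update(String key, UnaryOperator<Long> updater) {
        TxValue stored = backing.get(key);
        boolean cached = writtenThisBatch.contains(key);
        if (stored != null && currTx != null && currTx.equals(stored.txid) && !cached) {
            return stored.val; // an earlier attempt of this txid already applied the update: skip
        }
        long next = updater.apply(stored == null ? null : stored.val);
        backing.put(key, new TxValue(currTx, next));
        writtenThisBatch.add(key);
        return next;
    }

    // Updater that adds n to the stored count, treating a missing value as 0.
    static UnaryOperator<Long> add(long n) {
        return v -> (v == null ? 0L : v) + n;
    }

    public static void main(String[] args) {
        TransactionalSketch state = new TransactionalSketch();
        state.beginCommit(1L);
        state.update("apple", add(2));
        state.commit(1L);

        state.beginCommit(1L); // txid 1 replayed with the same tuples
        System.out.println(state.update("apple", add(2))); // prints 2: the replay is skipped
        state.commit(1L);

        state.beginCommit(2L);
        System.out.println(state.update("apple", add(3))); // prints 5
        state.commit(2L);
    }
}
```

The skip is only safe because a transactional spout replays a txid with exactly the same tuples; if the replayed batch could differ, the stored value would silently keep the first attempt's result.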

    OpaqueMap

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/OpaqueMap.java

    public class OpaqueMap<T> implements MapState<T> {
        CachedBatchReadsMap<OpaqueValue> _backing;
        Long _currTx;
    
        protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
            _backing = new CachedBatchReadsMap(backing);
        }
    
        public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {
            return new OpaqueMap<T>(backing);
        }
    
        @Override
        public List<T> multiGet(List<List<Object>> keys) {
            List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
            List<T> ret = new ArrayList<T>(curr.size());
            for (CachedBatchReadsMap.RetVal<OpaqueValue> retval : curr) {
                OpaqueValue val = retval.val;
                if (val != null) {
                    if (retval.cached) {
                        ret.add((T) val.getCurr());
                    } else {
                        ret.add((T) val.get(_currTx));
                    }
                } else {
                    ret.add(null);
                }
            }
            return ret;
        }
    
        @Override
        public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
            List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
            List<OpaqueValue> newVals = new ArrayList<OpaqueValue>(curr.size());
            List<T> ret = new ArrayList<T>();
            for (int i = 0; i < curr.size(); i++) {
                CachedBatchReadsMap.RetVal<OpaqueValue> retval = curr.get(i);
                OpaqueValue<T> val = retval.val;
                ValueUpdater<T> updater = updaters.get(i);
                T prev;
                if (val == null) {
                    prev = null;
                } else {
                    if (retval.cached) {
                        prev = val.getCurr();
                    } else {
                        prev = val.get(_currTx);
                    }
                }
                T newVal = updater.update(prev);
                ret.add(newVal);
                OpaqueValue<T> newOpaqueVal;
                if (val == null) {
                    newOpaqueVal = new OpaqueValue<T>(_currTx, newVal);
                } else {
                    newOpaqueVal = val.update(_currTx, newVal);
                }
                newVals.add(newOpaqueVal);
            }
            _backing.multiPut(keys, newVals);
            return ret;
        }
    
        @Override
        public void multiPut(List<List<Object>> keys, List<T> vals) {
            List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(vals.size());
            for (T val : vals) {
                updaters.add(new ReplaceUpdater<T>(val));
            }
            multiUpdate(keys, updaters);
        }
    
        @Override
        public void beginCommit(Long txid) {
            _currTx = txid;
            _backing.reset();
        }
    
        @Override
        public void commit(Long txid) {
            _currTx = null;
            _backing.reset();
        }
    
        static class ReplaceUpdater<T> implements ValueUpdater<T> {
            T _t;
    
            public ReplaceUpdater(T t) {
                _t = t;
            }
    
            @Override
            public T update(Object stored) {
                return _t;
            }
        }
    }
    • As in TransactionalMap, beginCommit sets the current txid and resets _backing; commit clears the txid and then resets _backing
    • Unlike TransactionalMap, multiPut wraps each value in a ReplaceUpdater and calls multiUpdate, so the overwrite goes through the same update path
    • The multiUpdate method also differs from TransactionalMap's: it never skips, and always computes newVal from a prev value, taken from val.getCurr() when the value was cached in this batch and from val.get(_currTx) otherwise
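
The opaque update rule can be sketched for a single key as follows. This is a simplification, not the Storm implementation: `OpaqueSketch`, `OpValue`, `baseFor`, and `add` are hypothetical names, and the sketch assumes that a stored value keeps the txid of its last write together with the values before and after that write, and that each key is updated at most once per batch attempt (so the read cache is left out).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical single-key simplification of OpaqueMap.multiUpdate; not Storm code.
final class OpaqueSketch {
    // Stand-in for OpaqueValue: txid of the last write, plus the values before and after it.
    static final class OpValue {
        final Long currTxid;
        final Long prev;
        final Long curr;

        OpValue(Long currTxid, Long prev, Long curr) {
            this.currTxid = currTxid;
            this.prev = prev;
            this.curr = curr;
        }

        // Base value for a batch: prev if this txid already wrote the value, otherwise curr.
        Long baseFor(Long batchTxid) {
            return batchTxid != null && batchTxid.equals(currTxid) ? prev : curr;
        }
    }

    private final Map<String, OpValue> backing = new HashMap<>();
    private Long currTx;

    void beginCommit(Long txid) {
        currTx = txid;
    }

    void commit(Long txid) {
        currTx = null;
    }

    // Always recomputes from the base value; a replayed txid overwrites its earlier result.
    Long update(String key, UnaryOperator<Long> updater) {
        OpValue stored = backing.get(key);
        Long base = stored == null ? null : stored.baseFor(currTx);
        Long next = updater.apply(base);
        backing.put(key, new OpValue(currTx, base, next));
        return next;
    }

    // Updater that adds n to the stored count, treating a missing value as 0.
    static UnaryOperator<Long> add(long n) {
        return v -> (v == null ? 0L : v) + n;
    }

    public static void main(String[] args) {
        OpaqueSketch state = new OpaqueSketch();
        state.beginCommit(1L);
        state.update("apple", add(2)); // stored: txid 1, prev null, curr 2
        state.commit(1L);

        state.beginCommit(2L);
        state.update("apple", add(3)); // stored: txid 2, prev 2, curr 5; assume the batch then fails

        state.beginCommit(2L); // txid 2 replayed, this time with different tuples
        System.out.println(state.update("apple", add(4))); // prints 6: recomputed from prev = 2
        state.commit(2L);
    }
}
```

Keeping prev alongside curr is what lets the replay discard the failed attempt's contribution, which is why opaque state needs more storage than transactional state.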

    Summary

    • Trident applies state updates in strict txid order. For example, the batch with txid 3 can update state only after the batch with txid 2 has done so
    • There are three types of state: non-transactional, transactional, and opaque transactional. Spouts come in the same three types
      • Non-transactional state does not guarantee exactly-once; the semantics are at-least-once or at-most-once. Its update logic is in NonTransactionalMap, whose beginCommit and commit methods do nothing
      • Transactional state guarantees exactly-once, but its requirements are strict: when a batch is replayed, its txid and its tuples must be unchanged. This makes it less fault tolerant, but the update logic is simple: see TransactionalMap, which skips an update when the stored value already carries the current txid
      • Opaque transactional state also guarantees exactly-once. It allows a tuple from a failed batch to be processed in a different batch, so it is more fault tolerant, at the cost of storing an extra prev value. If the stored value already carries the current txid, the update is recomputed from prev and overwrites the current value
    • Trident wraps up the exactly-once state logic; to use it, pass the appropriate StateFactory to persistentAggregate. A factory that supports several StateTypes can take the StateType as a parameter and build the corresponding kind of state. You can also write your own factory by implementing StateFactory, define custom queries for stateQuery by extending BaseQueryFunction, and define custom updates by extending BaseStateUpdater, which is passed in through partitionPersist

    Docs

    • Trident Tutorial
    • Trident State