preface

In quantitative financial research and live trading, more and more institutions need to compute price factors from high-frequency market data (L1/L2 snapshots and order-by-order data). These factors are usually stateful: they depend not only on the current values of multiple indicators, but also on their historical states. Take the Chinese stock market as an example: a snapshot is received every 3 seconds, so each stock produces 4,800 snapshots per day, and computing a factor may require data from previous snapshots or even previous days. If the development environment (such as Python) differs from the production environment (such as C++), maintaining two sets of code is a heavy burden.

DolphinDB's Reactive State Engine is designed to efficiently develop and compute stateful high-frequency factors. The engine accepts as input the same expressions or functions written for batch processing of historical data during research, avoiding the high cost of rewriting code for production and the burden of maintaining separate research and production codebases. The engine guarantees that streaming results are exactly the same as batch results: once a factor is verified against historical data in batch mode, its real-time computation on streaming data is guaranteed correct, which greatly reduces the cost of debugging real-time calculations.

1. Calculating high-frequency financial factors

We introduce the problem of high-frequency factor calculation with a concrete example. The following factor expression is written in the DolphinDB scripting language. It uses a custom function sum_diff and a built-in function ema (exponential moving average). sum_diff is stateless, while ema is stateful and relies on historical data. To make things trickier, as the computational breakdown below shows, the factor nests ema calls multiple levels deep.

def sum_diff(x, y){
    return (x-y)/(x+y)
}

ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)
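For readers coming from Python, a rough pandas equivalent of this factor is sketched below. This is our own illustration, not DolphinDB code: pandas' ewm(span=n).mean() only approximates DolphinDB's ema(x, n), whose exact smoothing definition may differ, so treat the numbers as indicative rather than identical.

```python
import numpy as np
import pandas as pd

def sum_diff(x, y):
    # stateless helper, same formula as the DolphinDB version
    return (x - y) / (x + y)

def factor1(price: pd.Series) -> pd.Series:
    # nested exponential moving averages, mirroring the DolphinDB expression
    inner = 1000 * sum_diff(price.ewm(span=20).mean(), price.ewm(span=40).mean())
    return inner.ewm(span=10).mean() - inner.ewm(span=20).mean()

# one trading day of 3-second snapshots for a single stock
price = pd.Series(np.random.default_rng(0).uniform(1.0, 10.0, 4800))
f = factor1(price)
print(len(f))  # 4800
```

Note that this batch implementation recomputes over the full series; it has no natural incremental form, which is exactly the gap the Reactive State Engine fills.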

Faced with such scenarios, we need to solve the following problems:

  • In the research stage, can 100 to 1,000 such factors be computed quickly for each stock from historical data?
  • In live trading, can 100 to 1,000 such factors be computed for each stock as tick data arrives?
  • Are the batch and streaming implementations efficient? Can batch and streaming share the same code? Is it convenient to verify that their results agree?

2. Advantages and disadvantages of existing solutions

Python with pandas/NumPy is currently the most common solution for high-frequency factors in the research phase. pandas handles panel data well and has built-in operators for most high-frequency factor calculations, so factors can be implemented quickly. However, performance is a weakness of pandas, especially when operators must be implemented as custom functions; a common workaround is to run multiple Python processes in parallel. More fundamentally, pandas is designed for historical data. Without rewriting the code for streaming, the only option in production is to cache incoming data and recompute over sliding windows, similar to Apache Spark, so the performance problems become even more pronounced.

To address performance issues in production, many organizations re-implement the research (historical) code in C++. This means maintaining two codebases, which drives up development cost (time and staffing) dramatically, and considerable effort is needed to ensure that the two systems produce exactly the same results.

Solutions aiming at batch-stream unification, such as Flink, emerged to address this. Flink supports SQL and window functions, and the common operators used in high-frequency factors are built in, so simple factors are easy to implement in Flink and perform well. But Flink's biggest limitation is that it cannot express complex high-frequency factor calculations: as shown in the previous section, such factors require multiple levels of nested window functions, which cannot be implemented directly in Flink. This is why DolphinDB developed the Reactive State Engine.

3. Reactive State Engine

The Reactive State Engine is essentially a computation black box: it takes DolphinDB factor code (expressions or functions) that has been verified on historical data, plus real-time tick data, and outputs real-time factor values. Because developing and verifying high-frequency factors on static historical data sets is much easier than doing so on streaming data, the engine significantly reduces the cost and difficulty of streaming factor development.

def sum_diff(x, y){
 return (x-y)/(x+y)
}
factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>

share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
result = table(1000:0, `sym`factor1, [STRING,DOUBLE])
rse = createReactiveStateEngine(name="reactiveDemo", metrics =factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym")
subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})

The code above implements streaming computation of the factor in DolphinDB. factor1 is the implementation of the factor on historical data; without any changes, it is passed directly to the Reactive State Engine rse for streaming computation. The subscribeTable function connects the stream table tickStream to the engine: any real-time data appended to the table triggers the engine, which writes factor values to the result table. The following code appends 100 random records to the stream table; the streaming results are exactly the same as those computed by the equivalent SQL statement.

data = table(take("000001.SH", 100) as sym, rand(10.0, 100) as price)
tickStream.append!(data)

factor1Hist = select sym, ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 10) - ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20) as factor1 from data context by sym
assert each(eqObj, result.values(), factor1Hist.values())

3.1 Working Principle

As shown in Figure 1, a stateful calculation of high frequency factors can actually be decomposed into a directed acyclic graph (DAG). There are three types of nodes in the figure:

1. Data sources, such as price.

2. Stateful operators, such as a, b, d, and e.

3. Stateless operators, such as c and result.

Starting from the data source nodes, the computation advances layer by layer along the established paths to produce the final factor output. This is very similar to chained cell calculation in Excel: when the data in one cell changes, the dependent cells are recalculated in turn. This is where the name of the Reactive State Engine comes from.
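The cell-style propagation can be sketched in Python. This is a toy illustration of the idea, not DolphinDB internals: each node caches its value and notifies its dependents when an input arrives.

```python
class Node:
    """A node in a tiny reactive DAG: recomputes when any input updates."""
    def __init__(self, func, *inputs):
        self.func = func
        self.inputs = inputs        # upstream nodes (empty for a data source)
        self.dependents = []        # downstream nodes to notify
        self.value = None
        for node in inputs:
            node.dependents.append(self)

    def update(self, value=None):
        if self.inputs:
            vals = [n.value for n in self.inputs]
            if any(v is None for v in vals):   # wait until all inputs are ready
                return
            self.value = self.func(*vals)
        else:
            self.value = value                 # a source receives a raw value
        for d in self.dependents:              # push the change downstream, like Excel cells
            d.update()

# result = (price * 2) + (price - 1): one source, two operators, one combiner
price = Node(None)
double = Node(lambda p: p * 2, price)
minus1 = Node(lambda p: p - 1, price)
result = Node(lambda a, b: a + b, double, minus1)

price.update(10)
print(result.value)  # 29
```

A production engine would evaluate the DAG in topological order to avoid transient intermediate values; the naive push shown here is only meant to convey the reactive flavor.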

Stateless operators are simple: they can be represented and evaluated with the existing DolphinDB script engine. So the problem reduces to two points:

1. How to parse to get an optimized DAG.

2. How to optimize the calculation of each stateful operator.

3.2 Parsing and optimization

DolphinDB is a multi-paradigm programming language that supports vectorized and functional programming, so it is not difficult to derive the DAG of the computation from the function call relationships. Because the schema of the input message is known at parse time, the input and output data types of each node can be inferred quickly. With the input parameter types and the function name determined, a concrete instance of each stateful operator can be created.

Every operator (stateful and stateless) in DolphinDB can be converted to a unique string serialization. Based on this, repeated operators can be deduplicated so that each distinct sub-expression is computed only once, improving computational efficiency.
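The deduplication idea can be illustrated in Python: serialize each operator call to a canonical string and reuse an existing instance when the string repeats. This is a sketch of the concept, not DolphinDB's implementation. In the factor from section 1, ema(price, 20) and ema(price, 40) each appear twice in the expression but need only one operator instance each.

```python
def canonical(op, *args):
    """Serialize an operator call to a unique string key."""
    return f"{op}({','.join(map(str, args))})"

class OperatorCache:
    """Maps canonical strings to operator instances, deduplicating repeats."""
    def __init__(self):
        self.instances = {}

    def get(self, op, *args):
        key = canonical(op, *args)
        if key not in self.instances:
            # stand-in for constructing a real stateful operator instance
            self.instances[key] = object()
        return self.instances[key]

cache = OperatorCache()
a = cache.get("ema", "price", 20)
b = cache.get("ema", "price", 40)
c = cache.get("ema", "price", 20)   # repeated sub-expression
print(a is c, a is b)  # True False
```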

3.3 Built-in state functions

Stateful operators involve historical state. If the full history had to be prepared and recomputed every time a calculation is triggered, both memory and CPU time would be wasted. Optimizing the state functions, that is, implementing them incrementally for streaming, is therefore critical. The following state functions have been optimized for DolphinDB's Reactive State Engine. Currently, the engine does not allow state functions that have not been optimized.

  • Cumulative window functions: cumavg, cumsum, cumprod, cumcount, cummin, cummax, cumvar, cumvarp, cumstd, cumstdp, cumcorr, cumcovar, cumbeta, cumwsum, cumwavg
  • Sliding window functions: ema, mavg, msum, mcount, mprod, mvar, mvarp, mstd, mstdp, mskew, mkurtosis, mmin, mmax, mimin, mimax, mmed, mpercentile, mrank, mcorr, mcovar, mbeta, mwsum, mwavg, mslr
  • Sequence-related functions: deltas, ratios, ffill, move, prev, iterate, ewmMean, ewmVar, ewmStd, ewmCovar, ewmCorr
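The difference between full and incremental computation can be sketched in Python with a moving average. A naive implementation re-scans the window on every tick; the streaming version maintains a running sum and updates it in O(1) per message. This is illustrative code, not DolphinDB's C++ implementation, and unlike DolphinDB's mavg it returns partial-window averages before the window fills.

```python
from collections import deque

class IncrementalMavg:
    """Streaming mavg(x, window): O(1) per tick instead of re-summing the window."""
    def __init__(self, window):
        self.window = window
        self.buf = deque()
        self.total = 0.0

    def append(self, x):
        self.buf.append(x)
        self.total += x
        if len(self.buf) > self.window:
            self.total -= self.buf.popleft()   # evict the oldest value
        return self.total / len(self.buf)

m = IncrementalMavg(3)
print([m.append(x) for x in [1, 2, 3, 4]])  # [1.0, 1.5, 2.0, 3.0]
```

The same running-state trick generalizes to the cumulative and sliding-window functions listed above (running sums of x and x² for mvar/mstd, monotonic deques for mmin/mmax, and so on).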

All of the above functions return a single value, except mslr, which returns two. In a later release, DolphinDB will allow users to develop their own stateful functions with plug-ins and register them for use in the state engine.

3.4 Customizing state functions

Custom state functions can be used in reactive state engines. Note the following points:

Declare the function with @state before its definition to mark it as a custom state function. Only assignment statements and a return statement may appear in a custom state function; the return statement must be the last statement and may return multiple values. Use the iif function to express if...else logic.

Allowing only a single expression to represent a factor has several limitations. First, in some cases a complete factor cannot be implemented with expressions alone. The following example returns the alpha, beta, and residual of a linear regression.

@state
def slr(y, x){
    a, b = mslr(y, x, 12)
    residual = mavg(y, 12) - b * mavg(x, 12) - a
    return a, b, residual
}

Second, multiple factors often share a common intermediate result, and defining them together keeps the code cleaner. A custom function can return multiple results at once. The following function multiFactors defines five factors.

@state
def multiFactors(lowPrice, highPrice, volumeTrade, closePrice, buy_active, sell_active, tradePrice, askPrice1, bidPrice1, askPrice10, agg_vol, agg_amt){
    a = ema(askPrice10, 30)
    term0 = ema((lowPrice - a) / (ema(highPrice, 30) - a), 50)
    term1 = mrank((highPrice - a) / (ema(highPrice, 5) - a), true,  15)
    term2 = mcorr(askPrice10, volumeTrade, 10) * mrank(mstd(closePrice, 20, 20), true, 10)
    buy_vol_ma = mavg(buy_active, 6)
    sell_vol_ma = mavg(sell_active, 6)
    zero_free_vol = iif(agg_vol==0, 1, agg_vol)
    stl_prc = ffill(agg_amt \ zero_free_vol \ 20).nullFill(tradePrice)
    buy_prop = stl_prc
 
    spd = askPrice1 - bidPrice1
    spd_ma = round(mavg(iif(spd < 0, 0, spd), 6), 5)
    term3 = buy_prop * spd_ma
    term4 = iif(spd_ma == 0, 0, buy_prop / spd_ma)
    return term0, term1, term2, term3, term4
}

Finally, some expressions are verbose and hard to read. Rewriting the factor expression from section 1 as the custom state function factor1 below makes the calculation logic concise.

@state
def factor1(price) {
    a = ema(price, 20)
    b = ema(price, 40)
    c = 1000 * sum_diff(a, b)
    return ema(c, 10) - ema(c, 20)
}

3.5 Output Result Filtering

The state engine computes a response to every input message and produces one result record, which by default is written to the output table: n input messages yield n output records. To output only a subset of the results, specify a filter condition; only results that satisfy the condition are written to the output table.

The following example checks whether a stock's price has changed, and only the records where the price changed are output.

share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
result = table(1000:0, `sym`price, [STRING,DOUBLE])
rse = createReactiveStateEngine(name="reactiveFilter", metrics=[<price>], dummyTable=tickStream, outputTable=result, keyColumn="sym", filter=<prev(price) != price>)
subscribeTable(tableName=`tickStream, actionName="filter", handler=tableInsert{rse})

3.6 Snapshot Mechanism

DolphinDB's built-in streaming computing engines, including the Reactive State Engine, support snapshot output to meet business continuity requirements in production environments.

A snapshot of the Reactive State Engine includes the ID of the last processed message and the current state of the engine. When the engine is reinitialized after a system failure, it can restore the state of the last snapshot and resume the subscription from the message after the last one processed.

def sum_diff(x, y){
 return (x-y)/(x+y)
}
factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>

share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
result = table(1000:0, `sym`factor1, [STRING,DOUBLE])
rse = createReactiveStateEngine(name="reactiveDemo", metrics =factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym", snapshotDir= "/home/data/snapshot", snapshotIntervalInMsgCount=400000)
msgId = getSnapshotMsgId(rse)
if(msgId >= 0) msgId += 1
subscribeTable(tableName=`tickStream, actionName="factors", offset=msgId, handler=appendMsg{rse}, handlerNeedMsgId=true)

To enable the snapshot mechanism, two additional parameters must be specified when the engine is created: snapshotDir and snapshotIntervalInMsgCount. snapshotDir specifies the directory in which snapshots are stored; snapshotIntervalInMsgCount specifies how many messages to process between snapshots. During engine initialization, the system checks whether a snapshot file named after the engine exists in the snapshot directory. In the code above, for example, if the file /home/data/snapshot/reactiveDemo.snapshot exists, it is loaded. The getSnapshotMsgId function returns the msgId of the latest snapshot, or -1 if no snapshot exists.

With the snapshot mechanism enabled, the call to subscribeTable must be modified accordingly:

  • First, the message offset must be specified.
  • Second, the handler must be the appendMsg function, which takes two arguments, msgBody and msgId.
  • Third, the handlerNeedMsgId parameter must be set to true.
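The recovery logic can be sketched in Python. This is a conceptual illustration with an invented file format and class names: the engine periodically persists (msgId, state), and after a restart it reloads the snapshot and resumes from the message after the last one processed.

```python
import json, os, tempfile

class SnapshotEngine:
    """Toy engine (cumulative sum) with periodic snapshots of (msgId, state)."""
    def __init__(self, path, interval):
        self.path, self.interval = path, interval
        self.msg_id, self.state = -1, 0.0
        if os.path.exists(path):                     # revert to the last snapshot
            with open(path) as fp:
                snap = json.load(fp)
            self.msg_id, self.state = snap["msgId"], snap["state"]

    def append(self, msg_id, value):
        if msg_id <= self.msg_id:                    # already processed before the crash
            return
        self.state += value
        self.msg_id = msg_id
        if (msg_id + 1) % self.interval == 0:        # snapshot every `interval` messages
            with open(self.path, "w") as fp:
                json.dump({"msgId": self.msg_id, "state": self.state}, fp)

path = os.path.join(tempfile.mkdtemp(), "demo.snapshot")
e1 = SnapshotEngine(path, interval=4)
for i in range(10):                                  # snapshots written at msgId 3 and 7
    e1.append(i, 1.0)

e2 = SnapshotEngine(path, interval=4)                # simulate a restart
print(e2.msg_id)  # 7 -> resume the subscription from message 8
```

The msgId + 1 resume offset mirrors the getSnapshotMsgId pattern in the DolphinDB code above.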

3.7 Parallel Processing

When a large volume of messages needs to be processed, the optional parameters filter and hash of DolphinDB's subscription function subscribeTable can be specified so that multiple subscription threads process messages in parallel.

  • The filter parameter specifies the message filtering logic. Three filtering modes are currently supported: value filtering, range filtering, and hash filtering.
  • The hash parameter specifies a hash value that determines which subscription executor thread handles the subscription. For example, if the configuration parameter subExecutors is set to 4 and the specified hash value is 5, the second executor thread (5 mod 4 = 1) performs the computation for this subscription.

The following example shows the Reactive State Engine computing factors in parallel. With the configuration parameter subExecutors set to 4, four state engines are created. Each engine subscribes to a disjoint subset of stocks according to the hash value of the stock symbol in the stream table, and each subscription is handled by a different executor thread. All engines write their results to the same output table.

def sum_diff(x, y){
    return (x-y)/(x+y)
}
factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 10) - ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>

share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
setStreamTableFilterColumn(tickStream, `sym)
share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStream

for(i in 0..3){
    rse = createReactiveStateEngine(name="reactiveDemo"+string(i), metrics=factor1, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym")
    subscribeTable(tableName=`tickStream, actionName="sub"+string(i), handler=tableInsert{rse}, msgAsTable=true, hash=i, filter=(4,i))
}

n = 2000000
tmp = table(take("A"+string(1..4000), n) as sym, rand(10.0, n) as price)
tickStream.append!(tmp)

Note that if multiple state engines write to the same output table, that table must be a shared table. Tables that are not shared are not thread-safe, and parallel writes may crash the system.
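The hash-based routing can be illustrated in Python: each symbol is assigned to one of four partitions by a stable hash, so every engine sees a disjoint subset of symbols and the per-symbol message order is preserved. This is a sketch of the partitioning idea, not the DolphinDB scheduler.

```python
NUM_PARTITIONS = 4

def partition(sym: str) -> int:
    # stable hash so a given symbol always lands on the same engine/thread
    return sum(sym.encode()) % NUM_PARTITIONS

queues = {i: [] for i in range(NUM_PARTITIONS)}
for sym, price in [("A1", 9.5), ("A2", 9.6), ("A1", 9.7), ("A3", 9.4)]:
    queues[partition(sym)].append((sym, price))

# each partition's queue would be consumed by its own engine instance
for i, q in queues.items():
    print(i, q)
```

Because all stateful operators for a symbol live in one engine, no cross-thread state sharing is needed; only the final output table must be shared.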

4. Unified batch and stream processing

DolphinDB offers two ways to unify batch and stream processing of high-frequency financial factors.

The first method: express the factor as a function or expression and plug it into different computing engines. In a SQL statement it computes over historical data; passed to the Reactive State Engine, it computes over streaming data. This was illustrated at the beginning of section 3. In this mode, the DolphinDB expression or function describes the semantics of the factor rather than a concrete implementation; the concrete computation is carried out by the corresponding engine, achieving the best performance in each scenario.

The second method: replay historical data into a stream, and let the streaming engine perform the calculation. The code is identical to the streaming case; the only difference is that the stream table tickStream is fed by replaying the historical database. Computing historical factor values this way is less efficient than SQL-based batch computation.

def sum_diff(x, y){
    return (x-y)/(x+y)
}
factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 10) - ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>

share streamTable(1:0, `sym`date`time`price, [STRING,DATE,TIME,DOUBLE]) as tickStream
result = table(1000:0, `sym`factor1, [STRING,DOUBLE])
rse = createReactiveStateEngine(name="reactiveDemo", metrics=factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym")
subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})

// replay one day's data from the trades table of the historical database dfs://TAQ
inputDS = replayDS(<select sym, date, time, price from loadTable("dfs://TAQ", "trades") where date=2021.03.08>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay(inputDS, tickStream, `date, `time, 1000, true, 2)

5. Performance testing

We tested the performance of factor computation in the Reactive State Engine. The test uses mock data, and the warmupStreamEngine function simulates an engine that has already processed some data. The test covers 20 factors of varying complexity, including two custom state functions that return 3 and 5 factors respectively. For testing purposes, all calculations are done on a single thread.

@state
def slr(y, x){
    a, b = mslr(y, x, 12)
    residual = mavg(y, 12) - b * mavg(x, 12) - a
    return a, b, residual
}

@state
def multiFactors(lowPrice, highPrice, volumeTrade, closePrice, buy_active, sell_active, tradePrice, askPrice1, bidPrice1, askPrice10, agg_vol, agg_amt){
    a = ema(askPrice10, 30)
    term0 = ema((lowPrice - a) / (ema(highPrice, 30) - a), 50)
    term1 = mrank((highPrice - a) / (ema(highPrice, 5) - a), true, 15)
    term2 = mcorr(askPrice10, volumeTrade, 10) * mrank(mstd(closePrice, 20, 20), true, 10)
    buy_vol_ma = mavg(buy_active, 6)
    sell_vol_ma = mavg(sell_active, 6)
    zero_free_vol = iif(agg_vol==0, 1, agg_vol)
    stl_prc = ffill(agg_amt \ zero_free_vol \ 20).nullFill(tradePrice)
    buy_prop = stl_prc

    spd = askPrice1 - bidPrice1
    spd_ma = round(mavg(iif(spd < 0, 0, spd), 6), 5)
    term3 = buy_prop * spd_ma
    term4 = iif(spd_ma == 0, 0, buy_prop / spd_ma)
    return term0, term1, term2, term3, term4
}

metrics = array(ANY, 14)
metrics[0] = <ema(1000 * sum_diff(ema(close, 20), ema(close, 40)), 10) - ema(1000 * sum_diff(ema(close, 20), ema(close, 40)), 20)>
metrics[1] = <mslr(high, volume, 8)[1]>
metrics[2] = <mcorr(low, high, 11)>
metrics[3] = <mstdp(low, 15)>
metrics[4] = <mbeta(high, value, 63)>
metrics[5] = <mcovar(low, value, 71)>
metrics[6] = <(close/mavg(close, 1..6)-1)*100>
metrics[7] = <mmin(high, 15)>
metrics[8] = <mavg(((high+low)/2+(mavg(high, 2)+mavg(low, 2))/2)*(high-low)/volume, 7, 2)>
metrics[9] = <mslr(mavg(close, 14), volume, 63)[1]>
metrics[10] = <mcorr(mavg(open, 25), volume, 71)>
metrics[11] = <mbeta(high, mstdp(close, 8), 77)>
metrics[12] = <slr(close, volume)>
metrics[13] = <multiFactors(low, high, volume, close, numTrade, numTrade, close, value, close, open, volume, numTrade)>

dummy = streamTable(10000:0, `symbol`market`date`time`quote_type`preclose`open`high`low`close`numTrade`volume`value`position`recvtime, [SYMBOL,SHORT,DATE,TIME,SHORT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG,TIMESTAMP])

def prepareData(tickNum, batch){
    total = tickNum*batch
    data = table(total:total, `symbol`market`date`time`quote_type`preclose`open`high`low`close`numTrade`volume`value`position`recvtime, [SYMBOL,SHORT,DATE,TIME,SHORT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG,TIMESTAMP])
    data[`market] = rand(10, total)
    data[`date] = take(date(now()), total)
    data[`time] = take(time(now()), total)
    data[`symbol] = take("A"+string(1..tickNum), total)
    data[`open] = rand(100.0, total)
    data[`high] = rand(100.0, total)
    data[`low] = rand(100.0, total)
    data[`close] = rand(100.0, total)
    data[`numTrade] = rand(100, total)
    data[`volume] = rand(100, total)
    data[`value] = rand(100.0, total)
    data[`recvtime] = take(now(), total)
    return data
}

dropStreamEngine("demo1")
dropStreamEngine("demo2")
dropStreamEngine("demo3")
dropStreamEngine("demo4")

// 4000 stocks, 20 factors
hisData = prepareData(4000, 100)
realData = prepareData(4000, 1)
colNames = ["symbol"].append!("factor"+string(0..19))
colTypes = [SYMBOL].append!(take(DOUBLE, 20))
resultTable = streamTable(10000:0, colNames, colTypes)
engine1 = createReactiveStateEngine(name="demo1", metrics=metrics, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")
warmupStreamEngine(engine1, hisData)
timer(10) engine1.append!(realData)
dropAggregator("demo1")

// 1 stock, 20 factors
hisData = prepareData(1, 100)
realData = prepareData(1, 1)
colNames = ["symbol"].append!("factor"+string(0..19))
colTypes = [SYMBOL].append!(take(DOUBLE, 20))
resultTable = streamTable(10000:0, colNames, colTypes)
engine2 = createReactiveStateEngine(name="demo2", metrics=metrics, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")
warmupStreamEngine(engine2, hisData)
timer(10) engine2.append!(realData)
dropAggregator("demo2")

// 4000 stocks, 1 factor
hisData = prepareData(4000, 100)
realData = prepareData(4000, 1)
metrics3 = metrics[0]
colNames = ["symbol", "factor0"]
colTypes = [SYMBOL, DOUBLE]
resultTable = streamTable(10000:0, colNames, colTypes)
engine3 = createReactiveStateEngine(name="demo3", metrics=metrics3, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")
warmupStreamEngine(engine3, hisData)
timer(10) engine3.append!(realData)
dropAggregator("demo3")

// 200 stocks, 20 factors
hisData = prepareData(200, 100)
realData = prepareData(200, 1)
colNames = ["symbol"].append!("factor"+string(0..19))
colTypes = [SYMBOL].append!(take(DOUBLE, 20))
resultTable = streamTable(10000:0, colNames, colTypes)
engine4 = createReactiveStateEngine(name="demo4", metrics=metrics, dummyTable=dummy, outputTable=resultTable, keyColumn="symbol")
warmupStreamEngine(engine4, hisData)
timer(10) engine4.append!(realData)

Each measurement was repeated 10 times and the average taken as the single-run time. The test server's CPU is an Intel(R) Xeon(R) Silver 4216 @ 2.10GHz. The single-threaded test results are as follows:

6. Pipeline processing of multiple engines

DolphinDB's built-in streaming engines include the Reactive State Engine, a time-series aggregation engine, a cross-sectional engine, and an anomaly detection engine. Each engine implements the table interface, which makes pipelining multiple engines extremely simple: one engine simply serves as the output table of another. Pipelining can solve more complex factor calculation problems. For example, factor calculation often requires panel data combining time-series and cross-sectional computation; chaining the Reactive State Engine with the cross-sectional engine completes such a task.

The following example is a streaming implementation of the formula for Alpha #001 from WorldQuant's 101 Alphas. The rank function is a cross-sectional operation; its argument is computed by the Reactive State Engine, while the rank itself is computed by the cross-sectional engine, which serves as the output table of the state engine.

// Alpha #001: rank(Ts_ArgMax(SignedPower((returns < 0 ? stddev(returns, 20) : close), 2), 5)) - 0.5
dummy = table(1:0, `sym`time`maxIndex, [SYMBOL, TIMESTAMP, INDEX])
resultTable = streamTable(10000:0, `sym`time`factor1, [SYMBOL, TIMESTAMP, DOUBLE])
ccsRank = createCrossSectionalAggregator(name="alpha1CCS", metrics=[<time>, <rank(maxIndex)\count(maxIndex) - 0.5>], dummyTable=dummy, outputTable=resultTable, keyColumn=`sym, triggeringPattern="keyCount", triggeringInterval=3000, timeColumn=`time)

@state
def wqAlpha1TS(close){
    ret = ratios(close) - 1
    v = iif(ret < 0, mstd(ret, 20), close)
    return mimax(signum(v)*v*v, 5)
}

// the cross-sectional engine ccsRank serves as the output table of the state engine
input = table(1:0, `sym`time`close, [SYMBOL, TIMESTAMP, DOUBLE])
rse = createReactiveStateEngine(name="alpha1", metrics=<wqAlpha1TS(close)>, dummyTable=input, outputTable=ccsRank, keyColumn="sym")

In the example above, we still have to manually separate the cross-sectional part from the time-series part. In later versions, DolphinDB will use row functions (rowRank, rowSum, etc.) to express cross-sectional semantics and the other vector functions for time-series semantics, so the system can automatically distinguish cross-sectional from time-series operations within a factor and further automate the engine pipeline.

Pipelined processing is very different from cascading multiple stream tables. The two can accomplish the same task, but their efficiency differs greatly: cascading involves multiple stream tables and multiple subscriptions, whereas a pipeline has only one subscription, with all calculations done sequentially in a single thread, yielding better performance.
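The table-interface composition can be sketched in Python as a loose analogy: each stage exposes the same append method, so stages compose simply by passing one stage as the output of another, and a single subscription drives the whole chain in one thread.

```python
class Stage:
    """A pipeline stage exposing a table-like append() interface."""
    def __init__(self, transform, output):
        self.transform = transform
        self.output = output    # next stage, or a plain list acting as the result table

    def append(self, row):
        # compute and push the result to whatever implements append()
        self.output.append(self.transform(row))

result = []                                             # final "result table"
cross_sectional = Stage(lambda x: x - 0.5, result)      # downstream engine
time_series = Stage(lambda x: x * x, cross_sectional)   # upstream engine outputs into it

for x in [1.0, 2.0, 3.0]:
    time_series.append(x)   # one entry point drives the whole chain
print(result)  # [0.5, 3.5, 8.5]
```

Because every stage only sees an append() interface, swapping the final list for another engine (or another pipeline) requires no changes upstream; this is the property the DolphinDB table interface provides.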

7. Outlook

The Reactive State Engine has a large number of commonly used stateful operators built in, supports custom state functions, and can be combined with other streaming engines in a pipeline, making it convenient for developers to quickly implement complex high-frequency financial factors.

The built-in stateful operators are implemented in C++ with extensive algorithmic optimization, and their streaming computation is incremental, so single-threaded performance is already very good. For large-scale tasks, subscription filtering can split the work into multiple sub-subscriptions, computed in parallel by multiple nodes and multiple CPUs per node. Subsequent releases will move the creation, management, and monitoring of computing sub-jobs from manual to automatic.