DolphinDB provides a powerful in-memory computing engine with built-in time-series functions, distributed computing, and streaming data processing engines that can efficiently calculate K-lines (OHLC bars) in many scenarios. This tutorial shows how to calculate K-lines in DolphinDB with both batch processing and streaming.
- Batch calculation of K-lines from historical data

The start time of the K-line window can be specified. A day may contain multiple trading sessions, including overnight sessions; K-line windows may overlap; and trading volume can serve as the dimension for dividing windows. When a large amount of data needs to be read and the results written back to the database, DolphinDB's built-in map-reduce function can be used for parallel computation.

- Streaming calculation of K-lines

Real-time market data is received through an API, and K-line data is calculated with DolphinDB's built-in time-series aggregation engine.
1. Calculating K-lines from historical data
DolphinDB's built-in functions bar, dailyAlignedBar, and wj can be used to calculate K-lines from historical data.
1.1 K-line windows without a specified start time, generated automatically from the data
bar(X, Y) returns X minus the remainder of X divided by Y (i.e., X - X % Y), and is commonly used to group data into buckets of width Y.
```
date = 09:32m 09:33m 09:45m 09:49m 09:56m 09:56m
bar(date, 5)
```

The following result is returned:

```
[09:30m,09:30m,09:45m,09:45m,09:55m,09:55m]
```
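As a quick illustration, the bucketing arithmetic of bar can be reproduced in plain Python (a sketch of the formula only, not DolphinDB code): each value x maps to x - x % y, the start of its bucket.

```python
def bar(x, y):
    """Map x to the start of its bucket of width y (x - x mod y)."""
    return x - x % y

# minutes since midnight for 09:32, 09:33, 09:45, 09:49, 09:56, 09:56
times = [572, 573, 585, 589, 596, 596]
buckets = [bar(t, 5) for t in times]
# 570, 570, 585, 585, 595, 595 -> 09:30, 09:30, 09:45, 09:45, 09:55, 09:55
```

This matches the result above: each minute maps to the start of its 5-minute bucket.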
**Example 1:** Simulate the US stock market with the following data:
```
n = 1000
date = take(2019.11.07 2019.11.08, n)
time = (09:30:00.000 + rand(int(6.5*60*60*1000), n)).sort!()
timestamp = concatDateTime(date, time)
price = 100 + cumsum(rand(0.02, n) - 0.01)
volume = rand(1000, n)
symbol = rand(`AAPL`FB`AMZN`MSFT, n)
trade = table(symbol, date, time, timestamp, price, volume).sortBy!(`symbol`timestamp)
undef(`date`time`timestamp`price`volume`symbol)
```
Calculate the 5-minute K line:
```
barMinutes = 5
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, date, bar(time, barMinutes*60*1000) as barStart
```
Note that the time column in the above data has millisecond precision. If the time column has a different precision, the multiplier in barMinutes*60*1000 should be adjusted accordingly.
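The grouping performed by the query above can be sketched in plain Python (an illustrative sketch only; date grouping is omitted for brevity, and the tuple layout is an assumption of this sketch):

```python
def ohlc_bars(trades, bar_ms=5*60*1000):
    """trades: list of (symbol, time_ms, price, volume) rows, sorted by time.
    Returns {(symbol, bar_start_ms): (open, high, low, close, volume)}."""
    bars = {}
    for symbol, t, price, vol in trades:
        key = (symbol, t - t % bar_ms)   # same bucketing as bar(time, barMinutes*60*1000)
        if key not in bars:
            bars[key] = [price, price, price, price, vol]
        else:
            b = bars[key]
            b[1] = max(b[1], price)      # high
            b[2] = min(b[2], price)      # low
            b[3] = price                 # close: last price seen
            b[4] += vol                  # cumulative volume
    return {k: tuple(v) for k, v in bars.items()}
```

For example, two trades of AAPL inside the same 5-minute bucket collapse into one bar whose open is the first price, close is the last, and volume is the sum.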
1.2 Specifying the start time of the K-line window
To specify the start time of the K-line window, use the dailyAlignedBar function. This function can handle multiple trading sessions per day, including overnight sessions.

Note that with dailyAlignedBar, the time column must contain date information: it must be of type DATETIME, TIMESTAMP, or NANOTIMESTAMP. The parameter timeOffset, which specifies the start time of each trading session, must use the corresponding type with the date information removed: SECOND, TIME, or NANOTIME respectively.
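The alignment idea can be sketched in Python (a hypothetical simplification, not the DolphinDB implementation): windows are counted from the start of the trading session the timestamp falls in, rather than from midnight.

```python
def daily_aligned_bar(time_ms, session_starts_ms, bar_ms):
    """Return the bar start for time_ms, aligned to the latest session start <= time_ms.
    session_starts_ms must be sorted ascending; times are milliseconds since midnight."""
    start = max(s for s in session_starts_ms if s <= time_ms)
    return start + (time_ms - start) // bar_ms * bar_ms

# 7-minute bars, sessions starting at 09:30 and 13:00
sessions = [9*3600000 + 30*60000, 13*3600000]
# 09:38 falls in the 09:37-09:44 bar (counted from 09:30, not from 09:35)
bar_938 = daily_aligned_bar(9*3600000 + 38*60000, sessions, 7*60000)
```

With a plain bar over time, the 7-minute buckets would be anchored at midnight; anchoring them at each session start is what dailyAlignedBar adds.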
**Example 2 (one trading session per day):** Calculate the 7-minute K-line for the US stock market, using the trade table from Example 1.
```
barMinutes = 7
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, 09:30:00.000, barMinutes*60*1000) as barStart
```
**Example 3 (two trading sessions per day):** China's stock market has two trading sessions per day: 9:30 to 11:30 in the morning and 13:00 to 15:00 in the afternoon.
Simulation using the following data:
```
n = 1000000
date = take(2019.11.07 2019.11.08, n)
time = (09:30:00.000 + rand(2*60*60*1000, n/2)).sort!() join (13:00:00.000 + rand(2*60*60*1000, n/2)).sort!()
timestamp = concatDateTime(date, time)
price = 100 + cumsum(rand(0.02, n) - 0.01)
volume = rand(1000, n)
symbol = rand(`600519`000001`600000`601766, n)
trade = table(symbol, timestamp, price, volume).sortBy!(`symbol`timestamp)
undef(`date`time`timestamp`price`volume`symbol)
```
Calculate the 7-minute K line:
```
barMinutes = 7
sessionsStart = 09:30:00.000 13:00:00.000
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart
```
**Example 4 (two trading sessions per day, including an overnight session):** Some futures have multiple trading sessions per day, including overnight sessions. In this example, the first session runs from 8:45 to 13:45, and the other is an overnight session from 15:00 to 05:00 the next day.
Simulation using the following data:
```
daySession = 08:45:00.000 : 13:45:00.000
nightSession = 15:00:00.000 : 05:00:00.000
n = 1000000
timestamp = rand(concatDateTime(2019.11.06, daySession[0]) .. concatDateTime(2019.11.08, nightSession[1]), n).sort!()
price = 100 + cumsum(rand(0.02, n) - 0.01)
volume = rand(1000, n)
symbol = rand(`A120001`A120002`A120003`A120004, n)
trade = select * from table(symbol, timestamp, price, volume) where timestamp.time() between daySession or timestamp.time() >= nightSession[0] or timestamp.time() < nightSession[1] order by symbol, timestamp
undef(`timestamp`price`volume`symbol)
```
Calculate the 7-minute K line:
```
barMinutes = 7
sessionsStart = [daySession[0], nightSession[0]]
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart
```
1.3 Overlapping K-line windows: the wj function

In the examples above, K-line windows do not overlap. To calculate overlapping K-line windows, use the wj (window join) function. With wj, you specify a relative time range for each row of the left table and perform the aggregation over the matching rows of the right table.
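The effect of wj here can be sketched in Python for a single symbol: for each window start, aggregate all trades falling in [start, start + width). This is an illustrative sketch of the semantics, not the wj implementation.

```python
def overlapping_bars(trades, window_starts, width_ms):
    """trades: list of (time_ms, price, volume) for one symbol, sorted by time.
    For each start in window_starts, aggregate trades in [start, start + width_ms)."""
    out = {}
    for start in window_starts:
        rows = [(t, p, v) for t, p, v in trades if start <= t < start + width_ms]
        if rows:
            prices = [p for _, p, _ in rows]
            # open, high, low, close, volume for this (possibly overlapping) window
            out[start] = (rows[0][1], max(prices), min(prices), rows[-1][1],
                          sum(v for _, _, v in rows))
    return out
```

Because window starts can be closer together than the window width (every 5 minutes for a 30-minute window), one trade may contribute to several bars, which ordinary group by cannot express.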
**Example 5 (two trading sessions per day, overlapping K-line windows):** Simulate Chinese stock market data and calculate a 30-minute K-line every 5 minutes.
```
n = 1000000
sampleDate = 2019.11.07
symbols = `600519`000001`600000`601766
trade = table(take(sampleDate, n) as date, (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time, rand(symbols, n) as symbol, 100 + cumsum(rand(0.02, n) - 0.01) as price, rand(1000, n) as volume)
```
First generate the windows based on time, and use a cross join (cj) to produce every combination of stock symbol and window start time.
```
barWindows = table(symbols as symbol).cj(table((09:30:00.000 + 0..23 * 300000).join(13:00:00.000 + 0..23 * 300000) as time))
```
Then use the wj function to calculate the K-line data over the overlapping windows:

```
OHLC = wj(barWindows, trade, 0:(30*60*1000), <[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume]>, `symbol`time)
```
1.4 Dividing K-line windows by trading volume

In the examples above, time was the dimension used to divide K-line windows. In practice, other dimensions can serve as the basis as well; for example, K-lines can be calculated from cumulative trading volume.
**Example 6 (two trading sessions per day, K-lines divided by cumulative trading volume):** Simulate Chinese stock market data and calculate a K-line for every 10,000 increase in cumulative volume.
```
n = 1000000
sampleDate = 2019.11.07
symbols = `600519`000001`600000`601766
trade = table(take(sampleDate, n) as date, (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time, rand(symbols, n) as symbol, 100 + cumsum(rand(0.02, n) - 0.01) as price, rand(1000, n) as volume)
volThreshold = 10000
select first(time) as barStart, first(price) as open, max(price) as high, min(price) as low, last(price) as close from (select symbol, time, price, cumsum(volume) as cumvol from trade context by symbol) group by symbol, bar(cumvol, volThreshold) as volBar
```
The code uses a nested query. The subquery generates the cumulative volume cumvol for each stock, and the outer query then divides the windows with the bar function based on the cumulative volume.
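The same two-step logic (running cumulative volume per stock, then bucketing by bar) can be sketched in Python for one symbol:

```python
def volume_bars(trades, vol_threshold):
    """trades: list of (time, price, volume) for one symbol, in time order.
    Buckets rows by bar(cumvol, vol_threshold), i.e. cumvol - cumvol % vol_threshold."""
    bars = {}
    cumvol = 0
    for time, price, vol in trades:
        cumvol += vol                              # cumsum(volume) from the subquery
        key = cumvol - cumvol % vol_threshold      # bar(cumvol, volThreshold)
        if key not in bars:
            bars[key] = [time, price, price, price, price]  # barStart/open/high/low/close
        else:
            b = bars[key]
            b[2] = max(b[2], price)                # high
            b[3] = min(b[3], price)                # low
            b[4] = price                           # close
    return bars
```

Note that, unlike time bars, each bucket here closes when roughly vol_threshold shares have accumulated, so busy periods produce more bars than quiet ones.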
1.5 Accelerating with the map-reduce function

To read and process larger volumes of historical data from the database, calculate K-lines, and store the results back into the database, DolphinDB's built-in map-reduce function mr can be used. This approach can significantly improve speed.

This example uses US stock market trade data with nanosecond precision. The raw data is stored in the "trades" table of the database "dfs://TAQ". The database uses a composite partition: a value partition on trade date and a range partition on stock symbol.
(1) Load the metadata of the table stored on disk into memory:

```
login(`admin, `123456)
db = database("dfs://TAQ")
trades = db.loadTable("trades")
```
(2) Create an empty table on disk to store the results. The following code creates a template table model and, according to its schema, creates an empty OHLC table in the database "dfs://TAQ" to store the K-line results:

```
model = select top 1 Symbol, Date, Time.second() as bar, PRICE as open, PRICE as high, PRICE as low, PRICE as close, SIZE as volume from trades where Date=2007.08.01, Symbol=`EBAY
if(existsTable("dfs://TAQ", "OHLC"))
	db.dropTable("OHLC")
db.createPartitionedTable(model, `OHLC, `Date`Symbol)
```
(3) Use the mr function to calculate the K-line data and write the results into the OHLC table:

```
def calcOHLC(inputTable){
	tmp = select first(PRICE) as open, max(PRICE) as high, min(PRICE) as low, last(PRICE) as close, sum(SIZE) as volume from inputTable where Time.second() between 09:30:00 : 15:59:59 group by Symbol, Date, 09:30:00 + bar(Time.second() - 09:30:00, 5*60) as bar
	loadTable("dfs://TAQ", `OHLC).append!(tmp)
	return tmp.size()
}
ds = sqlDS(<select Symbol, Date, Time, PRICE, SIZE from trades where Date between 2007.08.01 : 2019.08.01>)
mr(ds, calcOHLC, +)
```
In the code above, ds is a series of data sources generated by the function sqlDS, each representing the data extracted from one partition. The user-defined function calcOHLC is the map function of the map-reduce algorithm: it calculates the K-line data for each data source, writes the results to the database, and returns the number of K-line rows written. "+" is the reduce function: it adds up the return values of all the map calls, yielding the total number of K-line rows written to the database.
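The division of labor between the map and reduce steps can be sketched in Python, with plain lists of rows standing in for the data sources produced by sqlDS (function and variable names here are illustrative, not part of the mr API):

```python
def calc_ohlc(partition):
    """Map step: compute bars for one partition, return the number of bars.
    partition: list of (symbol, time_s, price, size) rows."""
    bars = {}
    for symbol, t, price, size in partition:
        key = (symbol, t - t % 300)   # 5-minute buckets at second precision
        if key not in bars:
            bars[key] = [price, price, price, price, size]
        else:
            b = bars[key]
            b[1] = max(b[1], price); b[2] = min(b[2], price); b[3] = price; b[4] += size
    # in DolphinDB, the map function appends `bars` to the OHLC table here
    return len(bars)

partitions = [
    [("EBAY", 34200, 30.0, 100), ("EBAY", 34210, 30.1, 50)],  # one partition's rows
    [("MSFT", 34200, 140.0, 10)],
]
total = sum(calc_ohlc(p) for p in partitions)  # '+' plays the role of the reduce step
```

The reduce step does no heavy lifting: all the aggregation happens inside each map call, and "+" merely totals the per-partition row counts.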
2. Real-time K-line calculation
The procedure for calculating real-time K-lines in DolphinDB is shown below:

(Figure: DolphinDB real-time K-line calculation flowchart)
Real-time data vendors typically offer data subscription services through Python, Java, or other commonly used language APIs. In this example, Python is used to simulate receiving market data and writing it to a stream table through the DolphinDB Python API. DolphinDB's time-series aggregation engine then calculates K-lines from the real-time data at a specified frequency over a moving window.
The simulated real-time data source in this example is the text file trades.csv, which contains four columns: Symbol, Datetime, Price, and Volume.
The following three sections describe the three steps of real-time K-line calculation:
2.1 Receiving real-time data with Python and writing it to a DolphinDB stream table

- Create the DolphinDB stream table:

```
share streamTable(100:0, `Symbol`Datetime`Price`Volume, [SYMBOL, DATETIME, DOUBLE, INT]) as Trade
```

- A Python program reads the data from the source file trades.csv and writes it to DolphinDB.
The Datetime column in the real-time data has second precision. Since a pandas DataFrame only supports the nanosecond timestamp type datetime64[ns], the following code converts the column before writing. This step also applies to most scenarios where the data needs to be cleaned and transformed.
```python
import dolphindb as ddb
import pandas as pd
import numpy as np

csv_file = "trades.csv"
csv_data = pd.read_csv(csv_file, dtype={'Symbol': str})
csv_df = pd.DataFrame(csv_data)
s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")
# upload the DataFrame to DolphinDB, convert Datetime to second precision, then append
s.upload({"tmpData": csv_df})
s.run("data = select Symbol, datetime(Datetime) as Datetime, Price, Volume from tmpData")
s.run("tableInsert(Trade, data)")
```
2.2 Calculating K-lines in real time

In this example, the time-series aggregation engine created by the createTimeSeriesAggregator function calculates the K-line data in real time, and the results are output to the stream table OHLC.
Depending on the application scenario, real-time K-line calculation falls into two cases:

- The calculation is triggered only at the end of each time window:
  - the windows do not overlap at all, e.g., K-line data for the past 5 minutes is calculated every 5 minutes;
  - the windows partially overlap, e.g., K-line data for the past 5 minutes is calculated every 1 minute.
- The calculation is triggered at the end of each time window, and the result is also updated at a certain frequency within the window, e.g., K-line data for the past 1 minute is calculated every minute, but instead of waiting for the window to end, we expect an update every second.
The following sections show how to create a time-series aggregation engine with createTimeSeriesAggregator to calculate K-line data in real time for each of these cases.
2.2.1 Triggering the calculation only at the end of each time window

When the calculation is triggered only at the end of each window, there are two scenarios: completely non-overlapping windows and partially overlapping windows. Both can be implemented by setting the windowSize and step parameters of createTimeSeriesAggregator, as detailed below.
First define the output table:
```
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume, [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as OHLC
```
Then create a time-series aggregation engine for whichever of the following scenarios applies.
Scenario 1: calculate K-line data for the past 5 minutes every 5 minutes. Define the time-series aggregation engine with the following script, where windowSize equals step:
```
tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=300, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)
```
Scenario 2: calculate K-line data for the past 5 minutes every 1 minute. Define the time-series aggregation engine with the following script, where windowSize is a multiple of step:
```
tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)
```
Finally, define the stream data subscription. If real-time data has already been written to the stream table Trade at this point, it is immediately subscribed and injected into the aggregation engine:

```
subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)
```
The first 5 rows of the output table for scenario 1:
2.2.2 Triggering the calculation at the end of each time window and updating the result at a certain frequency

Take a 1-minute window calculating VWAP as an example. After the aggregation result is updated at 10:00, the next update has to wait until at least 10:01; by the calculation rules, no calculation is triggered within that minute no matter how many trades occur. This is unacceptable in many financial trading scenarios. To update at a higher frequency, the time-series aggregation engine provides the updateTime parameter.
The updateTime parameter specifies the interval between calculations. If updateTime is not specified, the time-series aggregation engine only triggers a calculation at the end of each time window. If updateTime is specified, a calculation is triggered in three cases:

- at the end of each time window;
- every updateTime units of time;
- if more than 2*updateTime units of time (or 2 seconds, if 2*updateTime is less than 2 seconds) have passed since data last arrived and the current window still contains uncalculated data.

This ensures that the engine triggers a calculation at the end of each time window and also at a certain frequency within each window.
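These three trigger conditions can be sketched as a single predicate in Python (an illustrative simplification with hypothetical parameter names, not the engine's actual code):

```python
def should_trigger(now_ms, window_end_ms, last_calc_ms, last_data_ms,
                   update_time_ms, has_uncalculated):
    """Return True if a calculation would be triggered at now_ms."""
    # 1. the current time window has ended
    if now_ms >= window_end_ms:
        return True
    # 2. one updateTime interval has elapsed since the last calculation
    if now_ms - last_calc_ms >= update_time_ms:
        return True
    # 3. no new data for 2*updateTime (at least 2 s) and uncalculated data remains
    idle_limit = max(2 * update_time_ms, 2000)
    if has_uncalculated and now_ms - last_data_ms > idle_limit:
        return True
    return False
```

Condition 1 reproduces the default behavior without updateTime; conditions 2 and 3 are the extra triggers that updateTime adds.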
Note that the time-series aggregation engine requires a keyedTable as the output table when the updateTime parameter is used, for the following reasons:

- If a regular in-memory table or a streamTable were used as the output table, neither restricts writing duplicate records. When the data meets the updateTime trigger condition but not yet the step condition, the engine would keep appending results with the same timestamp, leaving the output table full of records with identical times, which is meaningless.
- If a keyedStreamTable were used as the output table, historical records could not be updated, and records with an existing key could not be appended: when a new record arrives, the system checks its primary key, and if it matches an existing record the write is ignored. When the data meets the updateTime condition but not the step condition, the engine would try to write the most recent window's result, but the write would be rejected because the time is the same, so the updateTime parameter would again lose its meaning.
- A keyedTable allows updates. When a new record arrives, the system checks its primary key; if it matches an existing record, the corresponding row is updated. The effect is that the result for a given time may be updated in place: when the data meets the updateTime condition but not the step condition, the existing row is recalculated from the data in the most recent window instead of a new row being appended. A new row is appended only once the step condition is met. This is the desired behavior, which is why the engine requires a keyedTable as the output table when updateTime is used.
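The difference between the table types can be sketched with dict-based writes (illustrative only; the key stands for the primary key columns):

```python
def append_keyed(table, key, row):
    """keyedTable semantics: same key -> update in place, new key -> append."""
    table[key] = row
    return table

def append_keyed_stream(table, key, row):
    """keyedStreamTable semantics: a write with an existing key is silently dropped."""
    table.setdefault(key, row)
    return table

ohlc = {}
append_keyed(ohlc, ("AAPL", "10:00"), (100.0, 101.0, 99.5, 100.5))
append_keyed(ohlc, ("AAPL", "10:00"), (100.0, 101.5, 99.5, 101.0))  # row updated in place
```

With the keyedTable behavior, repeated intra-window writes at the same (time, symbol) key refresh the bar; with the keyedStreamTable behavior, they would be discarded; with a plain table or streamTable, they would pile up as duplicates.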
For example, suppose we want to calculate K-lines with a 1-minute window, but instead of waiting until the window ends, we want the K-line data for the last minute updated every second. This can be implemented with the following steps.

First, create a keyedTable as the output table, with the time column and the symbol column as the primary key. When a new record is injected into the output table and its time already exists in the table, the corresponding row is updated. This ensures that every query sees the latest data for each time.
```
share keyedTable(`datetime`Symbol, 100:0, `datetime`Symbol`open`high`low`close`volume, [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as OHLC
```
Note that when using a keyedTable as the output table of the time-series aggregation engine, if the keyColumn parameter is specified, the keyedTable's primary key must comprise both the time column and the keyColumn column.
The following script defines a time-series aggregation engine that calculates K-line data for the past minute every minute and updates the data for the last minute every second: windowSize equals step, and updateTime is set to 1 second, so the latest data is refreshed every second. The useWindowStartTime parameter specifies that the time in the output table is the start time of each window.
```
tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=60, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol, updateTime=1, useWindowStartTime=true)
```
Note that when using the time-series aggregation engine, windowSize must be an integer multiple of step, and step must be an integer multiple of updateTime.
Finally, define the stream data subscription. If real-time data has already been written to the stream table Trade at this point, it is immediately subscribed and injected into the aggregation engine:

```
subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)
```
Print the first 5 rows of the table:
2.3 Displaying K-line Data in Python
In this example, the aggregation engine's output table is also defined as a stream table, so a client can subscribe to it through the Python API and display the results on a Python terminal.

The following code subscribes through the Python API to OHLC, the output table of the real-time aggregation, and prints each result with the print function.
```python
import dolphindb as ddb
import pandas as pd
import numpy as np

s = ddb.session()
# enable streaming; the port receives the pushed subscription data
s.enableStreaming(10020)

# handler that processes each incoming record
def handler(lst):
    print(lst)

# subscribe to the OHLC stream table on the DolphinDB server
s.subscribe("127.0.0.1", 8848, handler, "OHLC")
```
DolphinDB can also be connected to visualization systems such as Grafana to query the output table and display the results as charts.