DolphinDB supports distributed data storage and distributed computing. With the general distributed computing framework provided by the system, users can script efficient distributed algorithms without concerning themselves with the underlying implementation. This article explains the important concepts and related functions of DolphinDB's general computing framework in detail, along with a rich set of usage scenarios and examples.
1. Data Sources
A data source is a basic concept in DolphinDB's general computing framework. It is a special type of data object, a meta-description of data: by executing a data source, users can obtain a data entity such as a table, matrix, or vector. In DolphinDB's distributed computing framework, lightweight data source objects, rather than large data entities, are transferred to remote nodes for subsequent calculations, which greatly reduces network traffic.
In DolphinDB, users typically use the sqlDS function to generate data sources from an SQL expression. The function does not query the table directly; instead, it returns the meta-code of one or more SQL subqueries, known as data sources. After that, users can use the map-reduce framework to pass in the data sources and a calculation function, distribute the tasks to the nodes where each data source resides, complete the calculations in parallel, and then summarize the results.
Several common methods of obtaining data sources are described in detail in sections 3.1, 3.2, 3.3, and 3.4 of this article.
2. The Map-Reduce Framework
Map-reduce is the core of DolphinDB's general distributed computing framework.
2.1 The mr Function
DolphinDB's map-reduce function mr has the syntax mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true]). It accepts a set of data sources and a mapFunc function as arguments, distributes the computing tasks to the nodes where each data source resides, and processes the data in each data source with mapFunc. The optional parameter reduceFunc combines the return values of mapFunc pairwise: it first combines the results of the first two map calls, then combines that result with the result of the third map call, and so on, accumulating and summarizing the results of mapFunc. If there are m map calls, the reduce function is called m-1 times. The optional finalFunc argument further processes the return value of reduceFunc.
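To make the roles of mapFunc, reduceFunc, and finalFunc concrete, here is a minimal sketch that computes the average of a column by combining per-data-source sums and row counts. It assumes a table t with a numeric column val is available (the table and column names are illustrative only); with a partitioned table, each partition produces one map call, following the same sum/count pattern as the sample variance example in Section 3.4.

def avgMap(t) {
    return [sum(t.val), count(t.val)]    // partial sum and non-null row count for this data source
}
def avgFinal(res) {
    s, n = res
    return s \ n                         // turn the accumulated sum and count into the average
}
ds = sqlDS(<select val from t>)          // one data source per partition
avgVal = mr(ds, avgMap, +, avgFinal)     // the + reduce adds the [sum, count] pairs elementwise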
The official documentation includes an example of performing distributed least squares linear regression with mr. This article uses the following example to show how a single mr call can randomly sample one-tenth of the data in each partition of a distributed table:
// create the database and the DFS table
db = database("dfs://sampleDB", VALUE, `a`b`c`d)
t = db.createPartitionedTable(table(100000:0, `sym`val, [SYMBOL,DOUBLE]), `tb, `sym)
n = 3000000
t.append!(table(rand(`a`b`c`d, n) as sym, rand(100.0, n) as val))

// define the map function
def sampleMap(t) {
    sampleRate = 0.1
    rowNum = t.rows()
    sampleIndex = (0..(rowNum - 1)).shuffle()[0:int(rowNum * sampleRate)]
    return t[sampleIndex]
}

ds = sqlDS(<select * from t>)           // create the data sources
res = mr(ds, sampleMap, , unionAll)     // perform the calculation
In the example above, the user-defined sampleMap function takes a table (the data entity of a data source) as an argument and returns a random one-tenth of its rows. The mr call in this example has no reduceFunc argument, so the return values of the map calls are put in a tuple and passed to finalFunc, i.e. unionAll. unionAll merges the multiple tables returned by the map calls into a single table with sequential partitions.
2.2 The imr Function
DolphinDB provides the iterative computing function imr, based on map-reduce. Compared with mr, it supports iterative computation: each iteration uses the result of the previous iteration together with the input data sets, so it can support the implementation of more complex algorithms. An iterative computation requires initial values for the model parameters and a termination criterion. The syntax is imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false]), where initValue is the initial value for the first iteration. The mapFunc argument is a function whose arguments are the data entity of a data source and the output of the final function from the previous iteration (for the first iteration, it is the initial value given by the user). The reduceFunc argument works as in mr. The finalFunc function takes two arguments: the first is the output of the final function in the previous iteration (for the first iteration, it is the initial value given by the user), and the second is the output of the reduce function in the current iteration. The terminateFunc parameter determines whether the iteration stops; it takes two arguments, the output of the reduce function in the previous iteration and the output of the reduce function in the current iteration, and the iteration stops when it returns true. The carryover argument indicates whether a map call produces an object to be passed to the next map call on the same data source. If carryover is true, the map function takes three arguments, the last being the carried object, and its output is a tuple whose last element is the carried object. In the first iteration, the carried object is NULL.
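Before the full machine learning example below, here is a toy sketch purely to make the parameter roles concrete. It estimates the average of a column by gradient descent on the squared error; the table name t and column val are assumptions for illustration, and a real use of imr would target a computation that genuinely needs iteration.

def gdMap(t, lastTheta) {
    // partial gradient of 0.5 * sum((theta - val)^2) on this data source, plus its row count
    return [sum(lastTheta - t.val), count(t.val)]
}
def gdFinal(lastTheta, reduceRes) {
    grad, n = reduceRes
    return lastTheta - 0.5 * grad \ n        // one gradient step from the previous estimate
}
def gdTerm(prevRes, currRes) {
    grad, n = currRes
    return abs(grad \ n) <= 0.0001           // stop when the average gradient is near zero
}
ds = sqlDS(<select val from t>)
thetaHat = imr(ds, 0.0, gdMap, +, gdFinal, gdTerm)   // converges to avg(val)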
The official documentation includes an example of computing the median of distributed data with imr. This article provides a more complex example, a logistic regression calculation using Newton's method, to demonstrate the application of imr in machine learning algorithms.
def myLrMap(t, lastFinal, yColName, xColNames, intercept) {
    placeholder, placeholder, theta = lastFinal
    if (intercept)
        x = matrix(t[xColNames], take(1.0, t.rows()))
    else
        x = matrix(t[xColNames])
    xt = x.transpose()
    y = t[yColName]
    scores = dot(x, theta)
    p = 1.0 \ (1.0 + exp(-scores))
    err = y - p
    w = p * (1.0 - p)
    logLik = (y * log(p) + (1.0 - y) * log(1.0 - p)).flatten().sum()
    grad = xt.dot(err)               // gradient vector
    wx = each(mul{w}, x)
    hessian = xt.dot(wx)             // Hessian matrix
    return [logLik, grad, hessian]
}

def myLrFinal(lastFinal, reduceRes) {
    placeholder, placeholder, theta = lastFinal
    logLik, grad, hessian = reduceRes
    deltaTheta = solve(hessian, grad)    // deltaTheta = hessian^-1 * grad, i.e. solve hessian * deltaTheta = grad
    return [logLik, grad, theta + deltaTheta]
}

def myLrTerm(prev, curr, tol) {
    placeholder, grad, placeholder = curr
    return grad.flatten().abs().max() <= tol
}

def myLr(ds, yColName, xColNames, intercept, initTheta, tol) {
    logLik, grad, theta = imr(ds, [0, 0, initTheta], myLrMap{, , yColName, xColNames, intercept}, +, myLrFinal, myLrTerm{, , tol})
    return theta
}
In the example above, the map function computes the gradient vector and the Hessian matrix on the data of its data source under the current coefficients; the reduce function adds the map results, which is equivalent to computing the gradient vector and Hessian matrix of the whole data set; the final function updates the coefficients with the aggregated gradient vector and Hessian matrix, completing one iteration; and the terminate function checks whether the absolute value of the largest component of the gradient vector in this iteration is no greater than the parameter tol, in which case the iteration stops.
This example can be further optimized to improve performance through data source transformation operations, as described in Section 3.6.
As a commonly used analysis tool, distributed logistic regression is already provided in DolphinDB as the built-in function logisticRegression, which offers more functionality than this script.
3. Data Source Related Functions
DolphinDB provides the following common methods for retrieving data sources:
3.1 The sqlDS Function
The sqlDS function creates a list of data sources based on the entered SQL meta-code. If the data table in an SQL query has N partitions, sqlDS generates N data sources. If the SQL query does not contain any partitioned tables, sqlDS will return a tuple containing only one data source.
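A quick way to observe this, assuming the dfs://sampleDB table created in Section 2.1 exists:

t = loadTable("dfs://sampleDB", "tb")
ds = sqlDS(<select * from t>)
size(ds)     // 4: one data source per VALUE partition (`a, `b, `c, `d)
ds[0]        // the first data source: meta-code of a subquery on one partition; no data has been loaded yet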
sqlDS is an efficient and convenient way to convert an SQL expression into data sources. Users only need to provide the SQL expression to run distributed algorithms on the returned data sources, without paying attention to how the data is actually distributed. The example below shows how to use sqlDS to perform a distributed least squares regression with olsEx on data in a DFS table.
// create the database and the DFS table
db = database("dfs://olsDB", VALUE, `a`b`c`d)
t = db.createPartitionedTable(table(100000:0, `sym`x`y, [SYMBOL,DOUBLE,DOUBLE]), `tb, `sym)
n = 3000000
t.append!(table(rand(`a`b`c`d, n) as sym, 1..n + norm(0.0, 1.0, n) as x, 1..n + norm(0.0, 1.0, n) as y))

ds = sqlDS(<select x, y from t>)    // create the data sources
rs = olsEx(ds, `y, `x)              // perform the calculation
3.2 The repartitionDS Function
The data sources generated by sqlDS are determined automatically by the system according to how the data is partitioned. Sometimes users need to impose constraints on the data sources, for example, repartitioning the data to reduce the amount of computation, or using only part of the partitions. The repartitionDS function provides the ability to repartition data sources.
Based on the input SQL meta-code and a column name, partition type, partition scheme, and so on, repartitionDS generates new, repartitioned data sources for the meta-code.
The following code provides an example of repartitionDS. In this example, the DFS table t has the fields deviceId, time, and temperature, of types SYMBOL, DATETIME, and DOUBLE respectively. The database uses a two-level (COMPO) partition: the first level partitions time by VALUE, one partition per day, and the second level partitions deviceId into 20 HASH partitions.
Suppose we need to compute the 95th percentile of temperature grouped by the deviceId field. If the query select percentile(temperature, 95) from t group by deviceId is written directly, it cannot be completed because the percentile function has no map-reduce implementation.
One solution is to load all the required fields locally and compute the 95th percentile there, but when the data volume is too large, computing resources may be insufficient. repartitionDS provides another solution: repartition the table by deviceId according to its original HASH partition scheme, so that each new partition corresponds to all the data of one HASH partition in the original table. The 95th percentile of temperature is then computed in each new partition through the mr function, and the results are merged.
DeviceId = "device" + string(1.. 100000) db1 = database("", VALUE, 2019.06.01.. 2019.06.30) db2 = database("", HASH, INT:20) db = database(" DFS ://repartitionExample", COMPO, [db1, Db2]) / / create the DFS table t = db. CreatePartitionedTable (table (100000:0, ` deviceId ` time ` temperature, [SYMBOL, DATETIME, DOUBLE]), `tb, `deviceId`time) n = 3000000 t.append! (table(rand(deviceId, n) as deviceId, 2019.06.01T00:00:00 + rand(86400 * 10, n) as time, 60 + norm(0.0, 5.0, N) as temperature)) // repartitionDS = repartitionDS(<select deviceId, temperature from t>, 'deviceId) def(t) { return select percentile(temperature, 95) from t group by deviceId}, , unionAll{, false})Copy the code
The correctness of this result is guaranteed because the new partitions generated by repartitionDS are based on the original deviceId partitions, which ensures that the deviceIds of different data sources do not overlap. The correct result is therefore obtained simply by merging the per-partition results.
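When memory allows, the result can be cross-checked against the in-memory approach mentioned above, i.e. loading just the two needed columns locally and grouping there (the rows may come back in a different order than res):

tmp = select deviceId, temperature from loadTable("dfs://repartitionExample", "tb")
localRes = select percentile(temperature, 95) from tmp group by deviceId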
3.3 The textChunkDS Function
The textChunkDS function can divide a text file into several data sources to perform distributed calculations on the data represented by a text file. Its syntax is textChunkDS(filename, chunkSize, [delimiter=’,’], [schema]). The parameters filename, delimiter, and schema are the same as those of the loadText function. The chunkSize parameter represents the size of the data in each data source, in MB, and can be an integer from 1 to 2047.
The following example is another implementation of the olsEx example in the official documentation. It uses textChunkDS to generate several data sources from a text file, each 100MB in size, transforms the generated data sources, and then runs olsEx to compute the least squares parameters:
ds = textChunkDS("c:/DolphinDB/Data/USPrices.csv", 100) ds.transDS! (USPrices -> select VOL\SHROUT as VS, abs(RET) as ABS_RET, RET, log(SHROUT*(BID+ASK)\2) as SBA from USPrices where VOL>0) rs=olsEx(ds, `VS, `ABS_RET`SBA, true, 2)Copy the code
For the data source transformation operation transDS!, please refer to Section 3.6.
3.4 Data Source Interfaces Provided by Third-Party Plug-ins
Some plug-ins that load third-party data, such as HDF5, provide interfaces to generate data sources. Users can execute distributed algorithms directly on the data sources they return without first importing third-party data into memory or saving it to disk or distributed tables.
The DolphinDB HDF5 plug-in provides the hdf5DS function, which lets users specify the number of data sources to generate through the dsNum parameter. The following example generates 10 data sources from an HDF5 file and calculates the sample variance of the first column of the result using the map-reduce framework:
ds = hdf5::hdf5DS("large_file.h5", "large_table", , 10)
def varMap(t) {
column = t.col(0)
return [column.sum(), column.sum2(), column.count()]
}
def varFinal(result) {
sum, sum2, count = result
mu = sum \ count
populationVar = sum2 \ count - mu * mu
sampleVar = populationVar * count \ (count - 1)
return sampleVar
}
sampleVar = mr(ds, varMap, +, varFinal)
3.5 Data Source Caching
A data source can have 0, 1, or multiple locations; a data source with 0 locations is a local data source. When there are multiple locations, the locations back each other up, and the system randomly selects one of them to perform the distributed computation. When a data source is instructed to cache its data object, the system prefers the location where the data was last successfully retrieved.
The user can instruct the system to cache or clear the cache for the data source. For iterative algorithms, such as machine learning algorithms, data caching can greatly improve performance. When the system runs out of memory, cached data is cleared. If this happens, the system can recover the data because the data source contains all meta-descriptions and data transformation functions.
The functions related to data source caching are listed below; a short usage sketch follows the list:

- cacheDS!: instructs the system to cache the data source
- clearCacheDS!: instructs the system to clear the cache after the next execution of the data source
- cacheDSNow: immediately executes and caches the data source, and returns the total number of cached rows
- clearCacheDSNow: immediately clears the data source and its cache
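A minimal sketch of how these functions might be used, assuming the dfs://olsDB table from Section 3.1 exists and that each function operates on a single data source (each is used here to apply them across the tuple returned by sqlDS):

t = loadTable("dfs://olsDB", "tb")
ds = sqlDS(<select x, y from t>)
each(cacheDS!, ds)                  // mark every data source for caching
rowCounts = each(cacheDSNow, ds)    // execute and cache immediately; returns the cached row count of each data source
// ... run iterative computations on ds, e.g. with imr ...
each(clearCacheDSNow, ds)           // drop the data sources' caches when done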
3.6 Data Source Conversion
A data source object can also contain multiple data conversion functions to further process the retrieved data. The system performs these data conversion functions in turn, with the output of one function acting as the input (and only input) to the next.
It is usually more efficient to include the transformation functions in the data source than to transform the data inside the core computing operation (the map function). There is no performance difference if the data only needs to be computed once, but it makes a huge difference for iterative computations on data sources with cached data objects: if the transformation is in the core computing operation, it has to be performed in every iteration; if it is in the data source, it is performed only once. The transDS! function provides the ability to transform data sources.
For example, before running the iterative machine learning function randomForestRegressor, a user may need to fill in the missing values in the data manually (DolphinDB's random forest algorithm has built-in handling of missing values, so this is only for illustration). In this case, transDS! can be used to process the data source as follows: for each feature column, fill its missing values with the column's average. Assuming that columns x0, x1, x2, and x3 of the table are the independent variables and column y is the dependent variable, the implementation is as follows:
ds = sqlDS(<select x0, x1, x2, x3, y from t>)
ds.transDS!(def (mutable t) {
    update t set x0 = nullFill(x0, avg(x0)), x1 = nullFill(x1, avg(x1)), x2 = nullFill(x2, avg(x2)), x3 = nullFill(x3, avg(x3))
    return t
})
randomForestRegressor(ds, `y, `x0`x1`x2`x3)
Another example of transforming data sources is the scripted logistic regression in Section 2.2. In that implementation, each map call takes the relevant columns from the data source's table and converts them into matrices, which means these operations happen in every iteration. In reality, each iteration uses the same input matrices, so the conversion only needs to be done once. Therefore, transDS! can be used to convert each data source into a triple containing the x, xt, and y matrices:
def myLrTrans(t, yColName, xColNames, intercept) {
    if (intercept)
        x = matrix(t[xColNames], take(1.0, t.rows()))
    else
        x = matrix(t[xColNames])
    xt = x.transpose()
    y = t[yColName]
    return [x, xt, y]
}

def myLrMap(input, lastFinal) {
    x, xt, y = input
    placeholder, placeholder, theta = lastFinal
    // the rest is the same as the myLrMap implementation in Section 2.2, computing [logLik, grad, hessian] from x, xt, y, and theta
}

// the myLrFinal and myLrTerm functions are the same as in Section 2.2

def myLr(mutable ds, yColName, xColNames, intercept, initTheta, tol) {
    ds.transDS!(myLrTrans{, yColName, xColNames, intercept})
    logLik, grad, theta = imr(ds, [0, 0, initTheta], myLrMap, +, myLrFinal, myLrTerm{, , tol})
    return theta
}