Developing big data applications requires not only a distributed database that can hold huge amounts of data and a distributed computing framework that makes efficient use of multiple cores and nodes, but also a programming language that integrates organically with the distributed database and distributed computing, offers high performance and easy extensibility, is expressive, and supports rapid development and modeling. DolphinDB took inspiration from the popular SQL and Python languages to design its scripting language for big data processing. This tutorial explains how to quickly develop big data analytics applications through hybrid paradigm programming. You can also see how the DolphinDB programming language is integrated with the database and with distributed computing.

1. Vectorization programming

Vectorization is the basic programming paradigm of DolphinDB. Most DolphinDB functions accept vectors as input arguments. Based on their return value, functions fall into two types: aggregate functions, which return a scalar, and vector functions, which return a vector of the same length as the input vector.
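The difference is easy to see with a minimal example (sum and log are built-in functions; the values are arbitrary):

x = 1 2 3 4 5
sum(x)    // aggregate function: returns the scalar 15
log(x)    // vector function: returns a vector of the same length as x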

Vectorization has three main advantages:

  • The code is simple
  • Significantly reduce the cost of scripting language interpretation
  • Many algorithms can be optimized

Time series data can usually be represented by a vector, as can every column in a columnar database used for data analysis. DolphinDB, as an in-memory computing engine or analytical data warehouse, is particularly well suited to vectorization when analyzing time series data.

Let's take the simple example of adding two vectors of length 10 million. Imperative programming with a for statement is not only lengthy, but also takes about a hundred times longer than vectorized programming.

n = 10000000
a = rand(1.0, n)
b = rand(1.0, n)

// imperative programming with a for statement:
c = array(DOUBLE, n)
for(i in 0 : n)
	c[i] = a[i] + b[i]

// vectorized programming:
c = a + b

Vectorized programming is essentially batch processing of a set of homogeneous data. Not only can the instructions be optimized at compile time, but many algorithms can also be optimized. Take the moving average, one of the most commonly used sliding-window indicators on time series data. Assume the total amount of data is n and the window size is k. Without batch computation, the time complexity is O(nk). However, after computing the average of one window, only one data point changes when moving to the next window, so the new window's average can be obtained by adjusting for that point, bringing the time complexity of the batch computation down to O(n). In DolphinDB, most of the functions used to calculate sliding-window indicators are optimized to approximately O(n) performance. These functions include mmax, mmin, mimax, mimin, mavg, msum, mcount, mstd, mvar, mrank, mcorr, mcovar, mbeta, and mmed. In the following example, the optimized mavg function performs about 300 times better than applying the avg function to each window; a sketch of the incremental idea follows the timing example.

n = 10000000
a = rand(1.0, n)
window = 60

// moving average by applying avg to each window:
timer moving(avg, a, window);
Time elapsed: 4039.23 ms

// moving average with the optimized mavg function:
timer mavg(a, window);
Time elapsed: 12.968 ms
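The incremental idea behind such optimizations can be sketched in a few lines of script. This is only an illustration under simplified assumptions, not DolphinDB's internal implementation: after the first window is summed, each step adds one new point and drops the oldest one, so the cost per step is constant.

def incrementalMavg(x, k){
	n = x.size()
	result = array(DOUBLE, n)    // positions before the first complete window stay 0 in this sketch (mavg returns NULL there)
	s = sum(x[0:k])
	result[k-1] = s/k
	for(i in k:n){
		s += x[i] - x[i-k]       // add the newest point, drop the oldest one
		result[i] = s/k
	}
	return result
}

From the k-th element onward, incrementalMavg(a, 60) should agree with mavg(a, 60) up to floating-point error, while doing a constant amount of work per element.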

Vectorized programming also has its limitations. First, not all operations can be vectorized. In machine learning and statistical analysis, some scenarios can only be handled by iterating over the data row by row and cannot be expressed as vectorized calculations. DolphinDB plans to introduce just-in-time compilation (JIT) in future versions to improve performance by dynamically compiling row-by-row code written with for statements into machine code at runtime.

Second, vectorization usually loads an entire vector into contiguous memory, as both Matlab and R require. Sometimes a large contiguous block of memory cannot be found because of fragmentation. To handle this, DolphinDB introduces big arrays, which combine physically discontiguous blocks of memory into a logically contiguous vector. Whether the system uses a big array is determined dynamically and is transparent to the user. Typically, sequential scans of a big array lose between 1% and 5% of performance compared with contiguous memory, and random access loses about 20% to 30%. DolphinDB accepts this small performance penalty in exchange for higher system availability.

2. SQL programming

SQL is a problem-oriented language: the user only describes the problem, and the SQL engine produces the result. Typically the SQL engine is part of the database, and other systems communicate with the database through JDBC, ODBC, or native APIs. DolphinDB SQL statements not only support standard SQL but also offer many extensions for analyzing big data, especially time series data, which greatly simplifies the code.

2.1 Integration of SQL and programming languages

In DolphinDB, the scripting language is seamlessly integrated with SQL. This integration is mainly reflected in the following aspects:

  • SQL statements are a subset of DolphinDB expressions. An SQL statement can be assigned directly to a variable or passed as an argument to a function.
  • Variables and functions created in the surrounding context can be used in SQL statements. If the SQL statement involves a distributed table, these variables and functions are automatically serialized to the corresponding nodes.
  • An SQL statement is no longer a simple string, but code that can be generated dynamically.
  • SQL statements can operate not only on tables, but also on other data structures such as scalars, vectors, matrices, sets, and dictionaries. Tables can be converted to and from these other data structures.

Note that the DolphinDB programming language is case sensitive, and all SQL keywords in DolphinDB must be lowercase.

In the following example, first generate an employee payroll table:

empWages = table(take(1..10, 100) as id, take(2017.10M + 1..10, 100).sort() as month, take(5000 5500 6000 6500, 100) as wage);

Then calculate the average salary for a given group of employees. The list of employees is stored in a local variable, empIds.

empIds = 3 4 6 7 9
select avg(wage) from empWages where id in empIds group by id;
id avg_wage
-- --------
3  5500
4  6000
6  6000
7  5500
9  5500

In addition to calculating the average salary, also display the employee's name. The names are obtained from a dictionary empNames.

empNames = dict(1..10, `Alice`Bob`Jerry`Jessica`Mike`Tim`Henry`Anna`Kevin`Jones)
select empNames[first(id)] as name, avg(wage) from empWages where id in empIds group by id;

id name    avg_wage
-- ------- --------
3  Jerry   5500
4  Jessica 6000
6  Tim     6000
7  Henry   5500
9  Kevin   5500

In the two examples above, the where and select clauses use an array and a dictionary defined in the surrounding context, letting a simple hash lookup solve problems that would otherwise require subqueries and multi-table joins. If the SQL involves a distributed database, these context variables are automatically serialized to the corresponding nodes. This not only makes the code cleaner and more readable, but also improves performance.

An SQL select statement returns a table, which can be assigned directly to a local variable for further analysis. DolphinDB also introduces the exec keyword, which, unlike select, can return a matrix, vector, or scalar, making further data analysis easier. In the following example, exec is used together with pivot by to return a matrix directly.

exec first(wage) from empWages pivot by month, id;

         1    2    3    4    5    6    7    8    9    10
         ---- ---- ---- ---- ---- ---- ---- ---- ---- ----
2017.11M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500
2017.12M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500
2018.01M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500
2018.02M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500
2018.03M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500
2018.04M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500
2018.05M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500
2018.06M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500
2018.07M|5000 5500 6000 6500 5000 5500 6000 6500 5000 5500
2018.08M|6000 6500 5000 5500 6000 6500 5000 5500 6000 6500

2.2 Friendly support for Panel Data

The SQL group by clause divides data into groups, and each group produces one value, that is, one row. Using group by therefore generally reduces the number of rows significantly.

After grouping, panel data within each group is usually a time series; for example, grouping by stock leaves each group holding one stock's price series. When working with panel data, you sometimes want to keep the number of rows per group, that is, produce one value for each row within the group. For example, generate a return series from a stock's price series, or a moving-average series from a price series. Other database systems (e.g. SQL Server, PostgreSQL) use window functions to solve this problem. DolphinDB introduces the context by clause to handle panel data. Compared with window functions, context by is more concise in syntax and more systematic in design (together with group by and pivot by it forms a set of three clauses for processing grouped data). It is also more expressive, as reflected in the following aspects:

  • It can be used not only with select in queries, but also with update to modify data.
  • Most database systems can only group by existing fields in window functions; the context by clause can use any existing field as well as calculated fields.
  • Window functions are limited to a small set of functions; context by places no limit on the functions used and accepts any expression, for example a combination of functions.
  • context by can be used with the having clause to filter rows within each group.

Given a trades table that records the daily closing price of each stock, we can use context by to conveniently calculate each stock's daily return and daily rank. First group the records by ticker symbol and calculate the daily return of each stock, assuming the data is in chronological order:

update trades set ret = ratios(price) - 1.0 context by sym;

Group by date and calculate the descending rank of returns for each stock per day:

select date, symbol,  ret, rank(ret, false) + 1 as rank from trades where isValid(ret) context by date;

Select the top 10 stocks by return on each day:

select date, symbol, ret from trades where isValid(ret) context by date having rank(ret, false) < 10;

Let's use a more complex real-world example to show how the context by clause solves panel data problems. The paper 101 Formulaic Alphas introduces 101 quantitative alpha factors used by WorldQuant, a top Wall Street quantitative hedge fund. A fund company implemented these factors in C#; the representative factor 98, which nests several indicators over longitudinal time series and also uses ranking information across the cross section, took hundreds of lines of code and about 30 minutes to compute on 10 years of history for more than 3,000 Chinese stocks, nearly 9 million rows. The DolphinDB implementation shown below needs only four lines of core code and about two seconds, a performance improvement of nearly three orders of magnitude.

def alpha98(stock){
	t = select code, valueDate, adv5, adv15, open, vwap from stock order by valueDate
	update t set rank_open = rank(open), rank_adv15 = rank(adv15) context by valueDate
	update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by code
	return select code, valueDate, rank(decay7) - rank(decay8) as A98 from t context by valueDate
}

2.3 Friendly support for time series data

DolphinDB's database uses columnar storage and performs calculations with vectorized programming, which makes it naturally friendly to time series data.

  • DolphinDB supports time types of different precision. SQL statements can convert high-frequency data into lower-frequency data at precisions such as second, minute, and hour, and the bar function combined with the group by clause can aggregate data over any time interval (see the sketch after this list).
  • DolphinDB supports modeling of time series relationships, including lead, lag, sliding windows, and cumulative windows. More importantly, the indicators and functions commonly used in such modeling are optimized and perform one to two orders of magnitude better than other systems.
  • DolphinDB provides efficient, general-purpose table joins designed for time series: asof join and window join.
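As a small sketch of the first two points, assume a hypothetical table trade with columns sym (SYMBOL), time (SECOND), price (DOUBLE), and qty (INT). bar(time, 300) maps every timestamp to the start of its 5-minute interval, while prev and cumsum express a lag and a cumulative aggregation within each symbol:

// downsample tick data into 5-minute OHLC bars
select first(price) as open, max(price) as high, min(price) as low, last(price) as close from trade group by sym, bar(time, 300) as barStart;

// per-symbol lag and cumulative volume, keeping one row per tick
select sym, time, price, prev(price) as prevPrice, cumsum(qty) as cumVolume from trade context by sym;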

Let's use a simple example to explain window join: calculate, for each person, the average salary over the previous three months as of a certain point in time. This can be done simply with the window join function wj. Refer to the user manual for a full explanation of window join.

p = table(1 2 3 as id, 2018.06M 2018.07M 2018.07M as month)
s = table(1 2 1 2 1 2 1 2 as id, 2018.04M + 0 0 1 1 2 2 3 3 as month, 4500 5000 6000 5000 6000 4500 5000 6000 as wage)
select * from wj(p, s, -3:-1, <avg(wage)>, `id`month);

id month    avg_wage
-- -------- -----------
1  2018.06M 5250
2  2018.07M 4833.333333
3  2018.07M

In other database systems, the problem above can be solved with an equal join (on the id field), a non-equal join (on the month field), and a group by clause. However, besides requiring more complex code, that approach performs more than two orders of magnitude worse than DolphinDB's window join.

Window join is widely used in financial analysis. A classic use is to join the trades table with the quotes table to calculate transaction costs.

The following is the trades table, either unpartitioned or partitioned by date and ticker symbol:

sym  date       time         price  qty
---- ---------- ------------ ------ ---
IBM  2018.06.01 10:01:01.005 143.19 100
MSFT 2018.06.01 10:01:04.006 107.94 200

The following is the quotes table, either unpartitioned or partitioned by date and ticker symbol:

sym  date       time         bid    ask    bidSize askSize
---- ---------- ------------ ------ ------ ------- -------
IBM  2018.06.01 10:01:01.006 143.18 143.21 400     200
MSFT 2018.06.01 10:01:04.010 107.92 107.97 800     100

Use asof join to find the most recent quote for each trade, and use the midpoint of that quote as the benchmark for transaction cost:

dateRange = 2018.05.01 : 2018.08.01
select sum(abs(price - (bid+ask)/2.0)*qty)/sum(price*qty) as cost from aj(trades, quotes, `date`sym`time) where date between dateRange group by sym;

Use window join to find the quotes in the 10 milliseconds before each trade, and use the average mid price as the benchmark for transaction cost:

select sum(abs(price - mid)*qty)/sum(price*qty) as cost from pwj(trades, quotes, -10:0, <avg((bid + ask)/2.0) as mid>, `date`sym`time) where date between dateRange group by sym;

2.4 Other extensions to SQL

DolphinDB has many other SQL extensions for big data analysis. Here are some common ones.

  • User-defined functions can be used in SQL on the local node or in a distributed environment without compilation, packaging, or deployment.
  • As shown in Section 5.4, DolphinDB SQL is tightly integrated with the distributed computing framework, making in-database analytics easier and more efficient.
  • DolphinDB supports composite columns, which can output the multiple return values of a complex analysis function into one row of a table.

To use composite columns in an SQL statement, the output of the function must be a simple key-value pair or an array. If it is not, a custom function can be used to convert it. Refer to the user manual for detailed usage of composite columns.

factor1 = 3.2 1.2 5.9 6.9 11.1 9.6 1.4 7.3 2.0 0.1 6.1 2.9 6.3 8.4 5.6
factor2 = 1.7 1.3 4.2 6.8 9.2 1.3 1.4 7.8 7.9 9.9 9.3
t = table(take(1 2 3, 15).sort() as id, 1..15 as y, factor1, factor2);

Run ols for each id, y = alpha + beta1 * factor1 + beta2 * factor2, and output the parameters alpha, beta1, and beta2.

select ols(y, [factor1,factor2], true, 0) as `alpha`beta1`beta2 from t group by id;

id alpha     beta1     beta2
-- --------- --------- ---------
1  1.063991  0.258685  0.732795
2  6.886877  -0.148325 0.303584
3  11.833867 0.272352  -0.065526

To output R2 along with the parameters, wrap the output with a custom function:

def myols(y,x){
	r = ols(y,x,true,2)
	return r.Coefficient.beta join r.RegressionStat.statistics[0]
}
select myols(y,[factor1,factor2]) as `alpha`beta1`beta2`R2 from t group by id;

id alpha     beta1     beta2     R2
-- --------- --------- --------- --------
1  1.063991  0.258685  0.732795  0.946056
2  6.886877  -0.148325 0.303584  0.992413
3  11.833867 0.272352  -0.065526 0.144837

3. Imperative programming

Like mainstream scripting languages such as Python and JavaScript and compiled, strongly typed languages such as C++, C, and Java, DolphinDB supports imperative programming, that is, telling the computer what to do step by step. DolphinDB currently supports 18 kinds of statements (see Chapter 5 of the user manual for details), including the most common assignment statement, the branch statement if..else, and the loop statements for and do..while.

DolphinDB supports assignment to both single and multiple variables.

x = 1 2 3
y = 4 5
y += 2
x, y = y, x //swap the value of x and y
x, y =1 2 3, 4 5

DolphinDB currently supports for statements and do..while statements. The object iterated over by a for loop can be a pair, vector, matrix, or table.

Sum from 1 to 100:

s = 0
for(x in 1:101) s += x
print s

Summation of elements in an array:

s = 0;
for(x in 1 3 5 9 15) s += x
print s

Print the mean value of each column of the matrix:

m = matrix(1 2 3, 4 5 6, 7 8 9)
for(c in m) print c.avg()

Compute the product of two columns for each row in the data table:

t= table(["TV set", "Phone", "PC"] as productId, 1200 600 800 as price, 10 20 7 as qty)
for(row in t) print row.productId + ": " + row.price * row.qty

DolphinDB's branch statement if..else is consistent with other languages:

if(condition){
    <true statements>
}
else{
     <false statements>
}

It is not recommended to use control statements (for statements, if..else statements) to process data row by row. These control statements are better suited to the processing and scheduling of upper-level modules; for the low-level data processing modules, vectorized programming, functional programming, SQL programming, and similar approaches are recommended.
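A minimal sketch of this division of labor, using a hypothetical partitioned table dfs://demo/trades with columns sym, date, price, and qty: the for statement only schedules the work day by day, while the actual processing of each day is a vectorized SQL query.

trades = loadTable("dfs://demo", "trades")    // hypothetical table, for illustration only
for(d in 2018.01.01..2018.01.05){
	dailyStat = select avg(price) as avgPrice, sum(qty) as volume from trades where date = d group by sym
	print dailyStat
}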

4. Functional programming

DolphinDB supports most of the features of functional programming, including:

  • Pure functions
  • User-defined functions (UDFs)
  • Lambda functions
  • Higher-order functions
  • Partial application
  • Closures

Please refer to chapter 7 of the user manual for details.

4.1 Custom functions and Lambda functions

DolphinDB allows you to create named or unnamed (lambda) user-defined functions. Created functions must be pure functions, that is, only a function's input parameters can affect its output. Unlike Python, DolphinDB allows a function body to refer only to the function's arguments and local variables, not to variables defined outside the function. From a software engineering perspective, this sacrifices some syntactic-sugar flexibility, but goes a long way toward improving software quality.

def getWeekDays(dates){
	return dates[def(x): weekday(x) between 1:5]
}

getWeekDays(2018.07.01 2018.08.01 2018.10.01);
[2018.08.01, 2018.10.01]

In the example above, we define a function getWeekDays, which takes a set of dates and returns the dates that fall between Monday and Friday. The implementation uses vector filtering, that is, the subscript accepts a Boolean unary function, and we supply a lambda function to do the filtering.

4.2 Higher-order functions

A higher-order function is a function that can take another function as an argument. In DolphinDB, higher-order functions are mainly used as template functions for processing data, with the first parameter usually being the function that does the specific processing. For example, if object A has m elements and object B has n elements, a common pattern is to pair every element of A with every element of B and produce an m×n matrix. DolphinDB abstracts this data-processing pattern into the higher-order function cross. DolphinDB provides many similar template functions, including all, any, each, loop, eachLeft, eachRight, eachPre, eachPost, accumulate, reduce, groupby, contextby, pivot, cross, moving, rolling, and so on.

The following example uses three higher-order functions and only three lines of code to calculate the pairwise correlation between stocks based on their intraday tick-level trading data.

Simulate the generation of 10,000,000 data points (ticker symbol, trading time, and price):

n = 10000000
syms = rand(`FB`GOOG`MSFT`AMZN`IBM, n)
time = 09:30:00.000 + rand(21600000, n)
price = 500.0 + rand(500.0, n)

Use the pivot function to generate a stock price matrix, with one stock per column and one minute per row:

priceMatrix = pivot(avg, price, time.minute(), syms)

The each and ratios functions work on each column of the stock price matrix to convert prices into returns:

retMatrix = each(ratios, priceMatrix) - 1

The cross and corr functions are used together to calculate the pairwise correlation of returns between stocks:

corrMatrix = cross(corr, retMatrix, retMatrix)

The result is:

     AMZN      FB        GOOG      IBM       MSFT
     --------- --------- --------- --------- ---------
AMZN|1         0.015181  -0.056245 0.005822  0.084104
FB  |0.015181  1         -0.028113 0.034159  -0.117279
GOOG|-0.056245 -0.028113 1         -0.039278 -0.025165
IBM |0.005822  0.034159  -0.039278 1         -0.049922
MSFT|0.084104  -0.117279 -0.025165 -0.049922 1

4.3 Partial Application

Partial application refers to producing a new function by fixing some or all of the parameters of an existing function. In DolphinDB, parentheses () are used for function calls and curly braces {} for partial application. The ratios function used in the example in Section 4.2 is itself a partial application of the higher-order function eachPre, namely eachPre{ratio}.

The following two lines of code are equivalent:

retMatrix = each(ratios, priceMatrix) - 1
retMatrix = each(eachPre{ratio}, priceMatrix) - 1

Partial application is often used with higher-order functions, which usually have specific requirements on their parameters; partial application ensures that all parameters meet those requirements. For example, to compute the correlation between a vector a and each column of a matrix m, the corr function can be combined with the higher-order function each. However, if the vector and the matrix are listed directly as corr's parameters inside each, the system will try to correlate one element of the vector with one column of the matrix and report an error. Instead, partially applying corr to the vector a produces a new function corr{a}, which each then applies to every column of the matrix, as shown below. We could also solve this with a for statement, but the code would be longer and slower.

a = 12 14 18
m = matrix(5 6 7, 1 3 2, 8 7 11)

Use each and partial application to calculate the correlation between vector a and each column of the matrix:

each(corr{a}, m)

Use the for statement to solve the above problem:

cols = m.columns()
c = array(DOUBLE, cols)
for(i in 0:cols)
    c[i] = corr(a, m[i])

Another great use of partial application is to give functions state. Generally we expect functions to be stateless: the output is entirely determined by the input parameters. But sometimes a stateful function is desirable. For example, in stream computing the user typically provides a message handler that accepts a new message and returns a result. If we want the message handler to return the running average of all data received so far, partial application solves the problem.

def cumavg(mutable stat, newNum){
	stat[0] = (stat[0] * stat[1] + newNum)/(stat[1] + 1)
	stat[1] += 1
	return stat[0]
}

msgHandler = cumavg{0.0 0.0}
each(msgHandler, 1 2 3 4 5);
[1, 1.5, 2, 2.5, 3]

5. Remote procedure call programming

Remote procedure call (RPC) is one of the most commonly used pieces of infrastructure in distributed systems. DolphinDB's distributed file system, distributed database, and distributed computing framework all use its own RPC system, and DolphinDB's scripting language can execute code on remote machines through RPC. DolphinDB's RPC has the following characteristics:

  • Not only can functions registered on the remote machine be executed, but locally defined functions can also be serialized to the remote node and executed there. Code running on the remote machine has the same permissions as the user currently logged in locally.

  • The parameters of a function can be regular data such as scalars, vectors, matrices, sets, dictionaries, and tables, or functions, including custom functions.

  • You can use either an exclusive connection between two nodes or a shared connection between clustered data nodes.

5.1 Using remoteRun to perform remote functions

DolphinDB uses the xdb function to create a connection to a remote node. The remote node can be any running DolphinDB node and does not have to be part of the current cluster. Once the connection is created, functions registered on the remote node or locally defined functions can be executed on it.

h = xdb("localhost", 8081);

Execute a script on a remote node:

remoteRun(h, "sum(1 3 5 7)");
16

The above remote calls can also be abbreviated as:

h("sum(1 3 5 7)");
16

Execute a function registered on the remote node on the remote node:

h("sum", 1 3 5 7);
16

Execute a locally defined function on the remote node:

def mysum(x) : reduce(+, x)
h(mysum, 1 3 5 7);
16

Create a shared table sales on the remote node (localhost:8081):

h("share table(2018.07.02 2018.07.02 2018.07.03 as date, 1 2 3 as qty, 10 15 7 as price) as sales");

If a local custom function has a dependency, the dependent custom function is automatically serialized to the remote node:

defg salesSum(tableName, d): select mysum(price*qty) from objByName(tableName) where date=d
h(salesSum, "sales", 2018.07.02);
40

5.2 Executing Remote Functions using RPC

Another way DolphinDB makes remote procedure calls is through the rpc function. The rpc function accepts the remote node name, the definition of the function to be executed, and the required parameters. rpc can only be used between the controller and data nodes within the same cluster, and instead of creating a new connection it reuses existing network connections, which saves network resources and avoids the delay of establishing new connections; this matters when a node serves many users. The rpc function can only execute one function on the remote node, so to run a script, wrap it in a custom function. The following example must be run in a DolphinDB cluster, where nodeB is the alias of the remote node and the shared table sales already exists on nodeB.

rpc("nodeB", salesSum, "sales", 2018.07.02);
40

When using rpc, it is recommended to use partial application to bundle the function definition and its parameters into a new zero-argument function, which makes the code more readable:

rpc("nodeB", salesSum{"sales", 2018.07.02});
40

master is the alias of the controller node. DolphinDB can create users only on the controller node:

rpc("master", createUser{"jerry", "123456"});

The arguments required by RPC functions can also be another function, including built-in functions and custom functions:

rpc("nodeB", reduce{+, 1 2 3 4 5});
15

5.3 Using other functions to perform remote functions indirectly

Both remoteRun and rpc can execute locally defined functions on a remote node. This is one of the biggest differences between DolphinDB's RPC subsystem and other RPC systems, in which a client can typically only call registered functions that the remote node has explicitly exposed. In big data analytics, data scientists often raise new interface requirements as research projects evolve, and waiting for an IT department to release new APIs can take a long time, seriously affecting the efficiency and timeliness of research. Executing custom functions on remote nodes currently requires that those functions be developed in the DolphinDB scripting language. It also places higher demands on data security, so user access permissions must be carefully planned and configured. If users are restricted to registered functions only, access management can be as simple as denying external users access to all data and granting them access only to registered view functions.

Besides the remoteRun and rpc functions, DolphinDB provides many functions that use remote procedure calls indirectly. For example, linear regression on a distributed database uses the olsEx function, which relies on rpc. pnodeRun runs the same function in parallel on multiple nodes of the cluster and merges the returned results, which is very useful in cluster management.
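For example, assuming the distributed table sample that will be created in Section 5.4 (columns y, x1, x2), a distributed regression through olsEx could look like the sketch below; the call is shown only to illustrate an indirect use of RPC, with the trailing intercept and mode arguments following the ols convention used elsewhere in this tutorial.

sample = loadTable("dfs://testdb", "sample")
olsEx(sqlDS(<select * from sample>), `y, `x1`x2, true, 0);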

Each data node returns the last 10 running or completed batch jobs:

pnodeRun(getRecentJobs{10});

Return the last 10 SQL queries for nodeA and nodeB:

pnodeRun(getCompletedQueries{10}, `nodeA`nodeB);

Clear the cache on all data nodes:

pnodeRun(clearAllCache);

5.4 Distributed Computing

The mr function is used to develop distributed computing based on MapReduce; imr is used for distributed computing based on iterative MapReduce. The user only needs to specify a distributed data source and the core functions, such as the map function, reduce function, and final function. Below we demonstrate how to calculate a linear regression and a median on distributed data.

n = 10000000
x1 = pow(rand(1.0, n), 2)
x2 = norm(3.0, 1.0, n)
y = 0.5 + 3 * x1 - 0.5 * x2 + norm(0.0, 1.0, n)
t = table(rand(10, n) as id, y, x1, x2)

login(`admin, "123456")
db = database("dfs://testdb", VALUE, 0..9)
db.createPartitionedTable(t, "sample", "id").append!(t)

Using the custom map function myOLSMap, the built-in reduce function (+), the custom final function myOLSFinal, and the built-in MapReduce framework function mr, build a function myOLSEx that runs linear regression on a distributed data source.

def myOLSMap(table, yColName, xColNames){
	x = matrix(take(1.0, table.rows()), table[xColNames])
	xt = x.transpose()
	return xt.dot(x), xt.dot(table[yColName])
}

def myOLSFinal(result){
	xtx = result[0]
	xty = result[1]
	return xtx.inv().dot(xty)[0]
}

def myOLSEx(ds, yColName, xColNames){
	return mr(ds, myOLSMap{, yColName, xColNames}, +, myOLSFinal)
}

Calculate linear regression coefficients using user-defined distributed algorithms and distributed data sources:

sample = loadTable("dfs://testdb", "sample")
myOLSEx(sqlDS(<select * from sample>), `y, `x1`x2);
[0.4991, 3.0001, -0.4996]

Using the built-in ols function on the unpartitioned data gives the same coefficients:

ols(y, [x1,x2], true);
[0.4991, 3.0001, -0.4996]

In the following example, we construct an algorithm that computes the approximate median of a data set on a distributed data source. The basic idea is to use the bucketCount function to count, on each node, how many data points fall into each of a set of buckets, and then add up the counts from all nodes to determine which bucket the median falls in. If that bucket is not small enough, it is subdivided further until it is smaller than the given precision. The median algorithm requires multiple iterations, so we use the iterative computing framework imr.

def medMap(data, range, colName): bucketCount(data[colName], double(range), 1024, true)

def medFinal(range, result){
	x = result.cumsum()
	index = x.asof(x[1025]/2.0)
	ranges = range[1] - range[0]
	if(index == -1)
		return (range[0] - ranges*32) : range[1]
	else if(index == 1024)
		return range[0] : (range[1] + ranges*32)
	else{
		interval = ranges / 1024.0
		startValue = range[0] + (index - 1) * interval
		return startValue : (startValue + interval)
	}
}

def medEx(ds, colName, range, precision){
	termFunc = def(prev, cur): cur[1] - cur[0] <= precision
	return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()
}

Calculate the median of distributed data using the approximate median algorithm above:

sample = loadTable("dfs://testdb", "sample")
medEx(sqlDS(<select y from sample>), `y, 0.0 : 1.0, 0.001);
0.052973

Calculate the median of the unpartitioned data using the built-in med function:

med(y);
0.052947

6. Metaprogramming

Metaprogramming means using program code to create program code that can be executed dynamically. The purpose of metaprogramming is generally to delay the execution of code or to create code at run time.

DolphinDB supports using metaprogramming to dynamically create expressions, such as function-call expressions and SQL query expressions. Many business details cannot be pinned down at coding time. For example, a customized report can only determine the complete SQL query expression at run time, once the customer has chosen the table, the fields, and the field formats.

Delayed code execution generally falls into the following categories:

  • Provide a callback function
  • Delayed execution sets the stage for overall optimization
  • The problem description is completed in the program coding phase, but the problem implementation is completed in the program running phase

DolphinDB implements metaprogramming in two ways. One is to use angle brackets <> to mark dynamic code whose execution is deferred; the other is to use functions to create various expressions. Commonly used metaprogramming functions include objByName, sqlCol, sqlColAlias, sql, expr, eval, partial, and makeCall.

Use <> to generate deferred dynamic expressions:

a = <1 + 2 * 3>
a.typestr();
CODE

a.eval();
7

Use the function to generate a deferred dynamic expression:

a = expr(1, +, 2, *, 3)
a.typestr();
CODE

a.eval();
7

Reports can be customized using metaprogramming. The user's input includes the data table, the field names, and the corresponding format string for each field. In the following example, an SQL expression is dynamically generated and executed based on the given table, field names, formats, and filter condition.

def generateReport(tbl, colNames, colFormat, filter){
	colCount = colNames.size()
	colDefs = array(ANY, colCount)
	for(i in 0:colCount){
		if(colFormat[i] == "") 
			colDefs[i] = sqlCol(colNames[i])
		else
			colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]})
	}
	return sql(colDefs, tbl, filter).eval()
}

Simulate the generation of a 100-row data table:

t = table(1..100 as id, (1..100 + 2018.01.01) as date, rand(100.0, 100) as price, rand(10000, 100) as qty);

Specify the filter condition, fields, and formats to produce a customized report. The filter condition itself uses metaprogramming:

generateReport(t, ["id","date","price","qty"], ["000","MM/dd/yyyy","00.00","#,###"], <id<5 or id>95>);

id  date       price qty
--- ---------- ----- -----
001 01/02/2018 50.27 2,886
002 01/03/2018 30.85 1,331
003 01/04/2018 17.89 18
004 01/05/2018 51.00 6,439
096 04/07/2018 57.73 8,339
097 04/08/2018 47.16 2,425
098 04/09/2018 27.90 4,621
099 04/10/2018 31.55 7,644
100 04/11/2018 46.63 8,383

Some of DolphinDB's built-in features also use metaprogramming. In window join, one or more aggregate functions, together with the parameters they run with, are specified for the window data set of the right table. Because the problem is described and executed at two different stages, metaprogramming is used to achieve deferred execution.

t = table(take(`ibm, 3) as sym, 10:01:01 10:01:04 10:01:07 as time, 100 101 105 as price)
q = table(take(`ibm, 8) as sym, 10:01:01 + 0..7 as time, 101 103 103 104 104 107 108 107 as ask, 98 99 102 103 103 104 106 106 as bid)
wj(t, q, -2 : 1, <[max(ask), min(bid), avg((bid+ask)*0.5) as avg_mid]>, `time);

sym time     price max_ask min_bid avg_mid
--- -------- ----- ------- ------- -------
ibm 10:01:01 100   103     98      100.25
ibm 10:01:04 101   104     99      102.625
ibm 10:01:07 105   108     103     105.625

Another built-in feature that uses metaprogramming is updating in-memory partitioned tables. Of course, updating, deleting, and sorting in-memory partitioned tables can also be done through SQL statements.

Create an in-memory database partitioned by date, and simulate generating the trades table:

db = database("", VALUE, 2018.01.02 2018.01.03)
date = 2018.01.02 2018.01.02 2018.01.02 2018.01.03 2018.01.03 2018.01.03
t = table(`IBM`MSFT`GOOG`FB`IBM`MSFT as sym, date, 101 103 103 104 104 107 as price, 0 99 102 103 103 104 as qty)
trades = db.createPartitionedTable(t, "trades", "date").append!(t);

Delete the records whose qty is 0, and sort each partition in ascending order of trade value (price*qty):

trades.erase!(<qty=0>).sortBy!(<price*qty>);

Add a new field logPrice:

trades[`logPrice]=<log(price)>;

Update the traded quantity of the IBM records:

trades[`qty, <sym=`IBM>]=<qty+100>;
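When the condition and the new value are known at coding time, the same change can also be written as an ordinary SQL statement, as mentioned at the start of this subsection; a sketch equivalent to the metaprogramming update above:

update trades set qty = qty + 100 where sym=`IBM;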

7. Summary

DolphinDB is a programming language designed for data analysis. Unlike other data analysis languages such as Matlab, SAS, and pandas, DolphinDB is tightly integrated with distributed databases and distributed computing, making it naturally capable of handling massive amounts of data. DolphinDB supports SQL programming, functional programming, and metaprogramming. The language is simple, flexible, and expressive, which greatly improves the development efficiency of data scientists. DolphinDB supports vectorized and distributed computing, and is extremely fast.