Distributed Computing

This chapter describes parallel function calls, remote function calls, parallel remote calls, data sources, map-reduce, iterative computing, and the pipeline function.

Parallel Function Call

DolphinDB can divide a large task into multiple subtasks for simultaneous execution.

Parallel function calls usually utilize one of two higher-order functions: peach or ploop, the parallel computing versions of each and loop, respectively. For the difference between each and loop, please refer to the section about loop.

There are 3 scenarios of parallel function calls.

(1) same function but different parameters

$ peach(log, (1..3, 4..6));

#0        #1
0         1.386294
0.693147  1.609438
1.098612  1.791759

$ ploop(log, (1..3, 4..6));
([0,0.693147,1.098612],[1.386294,1.609438,1.791759])

(2) different functions but same parameters

$ peach(call{, 3 4 5}, (log, sum));

log       sum
1.098612  12
1.386294  12
1.609438  12

$ ploop(call{, 3 4 5}, (log, sum));
([1.098612,1.386294,1.609438],12)

Note that in the examples above, we cannot write peach((log, sum), 3 4 5), because the first parameter of a template must be a function, not an array of functions. To apply multiple functions with peach or ploop, we need to use the template call.

(3) different functions and different parameters

$ x=[log, exp];
$ y=[1 2 3, 4 5 6];
$ peach(call, x, y);

#0        #1
0         54.59815
0.693147  148.413159
1.098612  403.428793

$ ploop(call, x, y);
([0,0.693147,1.098612],[54.59815,148.413159,403.428793])

How Parallel Computing Works in DolphinDB

DolphinDB supports parallel computing through multi-threading. Suppose there are n subtasks and m local executors (for local executors, please refer to the section about distributed computing concepts). The calling thread (worker) generates the n subtasks, pushes n*m/(1+m) of them to the task queues of the local executors, and executes the remaining n/(1+m) subtasks itself. For example, with n=8 subtasks and m=3 local executors, 6 subtasks go to the executor queues and 2 are executed by the calling thread. After all n subtasks are finished, the calling thread combines the individual results to produce the final result.

To use parallel function calls, we need to make sure the number of local executors is set to a positive integer in the configuration file, as sketched below.
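
For example, a single-node instance reads this setting from dolphindb.cfg, and a cluster reads it from the node configuration file; a minimal sketch, assuming the configuration parameter is localExecutors and using an illustrative value:

localExecutors=3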

The system throws an exception if parallel function calls are initiated within a subtask, as this may cause a deadlock. If a subtask initiated new parallel calls, the system would allocate the new subtasks to local executors that may already be occupied by the first round of subtasks (when n > 1 + m). Since a local executor can only process one task at a time, the subtasks could end up waiting for each other and never be executed.

Some built-in functions, such as peach, ploop, pnodeRun, ploadText and loadText, enable parallel execution when the number of local executors is set to a positive integer in the configuration file.

Remote Function Call

In DolphinDB, we can send a local function that calls other local functions (all of which can be either built-in functions or user-defined functions) to run on a remote node on the fly without compilation or deployment. The system automatically serializes the function definition and the definitions of all dependent functions together with necessary local data to remote nodes.

We must open a connection before making a remote call. To open a connection, run conn=xdb(host, port), where host is the host name (an IP address or domain name) of the remote node and port is its port number.

There are 3 ways to close a connection:

(1) Call the close command.

(2) Assign NULL to the connection variable, e.g. conn=NULL.

(3) Do nothing; the connection is closed automatically when the current session closes.
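
For example (host and port are illustrative):

$ conn = xdb("localhost", 8081)   // open a connection to the remote node
$ close(conn)                     // close it explicitly; alternatively, conn = NULL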

You can use remoteRun, remoteRunWithCompression and rpc for remote calls. Their differences are:

  • rpc utilizes existing asynchronous connections among data nodes in the cluster; remoteRun uses connections explicitly created with the xdb function.

  • The calling node and the remote node of the rpc function must be located in the same cluster; there is no such limitation for remoteRun.

There are 3 ways to make a remote call:

(1) Execute a script on a remote node.

Syntax: remoteRun(conn, script) or conn(script), where script must be a quoted string.

$ conn =  xdb("localhost",81);
$ remoteRun(conn, "x=rand(1.0,10000000); y=x pow 2; avg y");
0.333254

(2) Execute a remote function on a remote node. The function is defined on the remote node, while the parameters are located on the local node.

Syntax: remoteRun(conn, "functionName", param1, param2, …) or conn("functionName", param1, param2, …)

functionName must be quoted. The function could be either a built-in or user-defined function.
$ conn =  xdb("localhost",81);
$ remoteRun(conn, "avg", rand(1.0,10000000) pow 2);
0.333446

(3) Execute a local function on a remote node. The function is defined on the local node. It can be a built-in function or a user-defined function, named or anonymous. The parameters of the function are also located on the local node.

This is the most powerful feature of remote calls in DolphinDB. As described at the beginning of this section, the system automatically serializes the function definition and the definitions of all dependent functions, together with the necessary local data, to the remote node; no compilation or deployment is needed. Some other systems can only remote call functions that have no user-defined dependent functions.

Syntax of remoteRun: remoteRun(conn, functionName, param1, param2, …) or conn(functionName, param1, param2, …)

functionName must not be quoted. param1, param2, … are function arguments. The function could be either a built-in function or a user-defined function on the calling node.

Syntax of rpc: rpc(nodeAlias, function, param1, param2, …)

function must not be quoted. param1, param2, … are function arguments. The function could be either a built-in function or a user-defined function on the calling node.
Both the calling node and the remote node must be located in the same cluster; otherwise, we need to use the remoteRun function.
  • Example 1: remote call a user-defined function with a local dataset

Assume at the local node we have a table EarningsDates with 2 columns: stock ticker and date. For each of the 3 stocks in the table, we have the date when it announced earnings for the 3rd quarter of 2006. There is a table USPrices at a remote node with machine name “localhost” and port number 8081. It contains daily stock prices for all US stocks. We would like to get the stock prices from the remote node for all stocks in EarningsDates for the week after they announced earnings.

At the remote node, we import the data file to create the table USPrices, and then share it across all nodes as sharedUSPrices.

$ USPrices = loadText("c:/DolphinDB/Data/USPrices.csv");
$ share USPrices as sharedUSPrices;

When we create a connection to a remote node, the remote node creates a new session for this connection. This new session is completely isolated from other sessions on the remote node. This is convenient for development as developers don’t have to worry about name conflicts. In this case, however, we do want to share data among multiple sessions on the same node. We can use the statement share to share the objects. Currently only tables can be shared in DolphinDB.

We create a table EarningsDates at the local node, and send the table with the script over to a remote node. After the execution, the result is sent back to the local node.

$ EarningsDates=table(`XOM`AAPL`IBM as TICKER, 2006.10.26 2006.10.19 2006.10.17 as date)

$ def loadDailyPrice(data){
$     dateDict = dict(data.TICKER, data.date)
$     return select date, TICKER, PRC from objByName("sharedUSPrices") where dateDict[TICKER]<date<=dateDict[TICKER]+7
$ }
$ conn = xdb("localhost",8081)
$ prices = conn(loadDailyPrice, EarningsDates);

$ prices;

date        TICKER  PRC
2006.10.27  XOM     71.46
2006.10.30  XOM     70.84
2006.10.31  XOM     71.42
2006.11.01  XOM     71.06
2006.11.02  XOM     71.19
2006.10.18  IBM     89.82
2006.10.19  IBM     89.86
2006.10.20  IBM     90.48
2006.10.23  IBM     91.56
2006.10.24  IBM     91.49
2006.10.20  AAPL    79.95
2006.10.23  AAPL    81.46
2006.10.24  AAPL    81.05
2006.10.25  AAPL    81.68
2006.10.26  AAPL    82.19

  • Example 2: remote call a built-in function that takes a user-defined function as an argument

$ def jobDemo(n){
$     s = 0
$     for (x in 1 : n) {
$         s += sum(sin rand(1.0, 100000000)-0.5)
$         print("iteration " + x + " " + s)
$     }
$     return s
$ };

Remote call with function remoteRun:

$ conn = xdb("DFS_NODE2")
$ conn.remoteRun(submitJob, "jobDemo", "job demo", jobDemo, 10);
Output: jobDemo4

$ conn.remoteRun(getJobReturn, "jobDemo")
Output: 4238.832005

Remote call with function rpc:
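
A sketch mirroring the remoteRun calls above, assuming the calling node and DFS_NODE2 are located in the same cluster (output omitted):

$ rpc("DFS_NODE2", submitJob, "jobDemo", "job demo", jobDemo, 10);
$ rpc("DFS_NODE2", getJobReturn, "jobDemo");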

  • Please avoid cyclical calls with remoteRun, as they may cause deadlocks. For example, if we run the following script on localhost:8080:

$ def testRemoteCall() {
$     h=xdb("localhost", 8080)
$     return h.remoteRun("1+2")
$ }
$ h = xdb("localhost", 8081)
$ h.remoteRun(testRemoteCall)

Node 8080 sends the locally defined function testRemoteCall to node 8081, which sends the script "1+2" back to be executed on node 8080. When a node receives a job, it assigns a worker thread to execute it. The remote call from 8080 to 8081 and the script "1+2" are both executed on node 8080 and may be assigned to the same worker. If these two jobs share the same worker, a deadlock occurs.

Parallel Remote Call

A remote call is blocking: it does not return until the remote node completes the function call. To execute remote calls in parallel with remoteRun, use it together with ploop or peach. In the following example, the template each executes the user-defined function experimentPi on node 8081 and then on node 8082, while the template peach executes experimentPi on nodes 8081 and 8082 simultaneously. We can see that peach saves a significant amount of time compared with each.

$ def simuPi(n){
$     x=rand(1.0, n)
$     y=rand(1.0, n)
$     return 4.0 * sum(x*x + y*y<=1.0) / n
$ }
$ def experimentPi(repeat, n): avg each(simuPi, take(n, repeat));

$ // create 2 connections
$ conns = each(xdb, "localhost", 8081 8082);
$ conns;
$ ("Conn[localhost:8081:1166953221]","Conn[localhost:8082:1166953221]")

$ timer result = each(remoteRun{, experimentPi, 10, 10000000}, conns);
Time elapsed: 6579.82 ms

$ timer result = peach(remoteRun{, experimentPi, 10, 10000000}, conns);
Time elapsed: 4122.29 ms
// parallel computing saves running time

$ print avg(result)
3.141691

// close two connections
$ each(close, conns);

To use remoteRun for parallel remote calls, we need to establish a connection to each remote node with the function xdb. To remote call nodes within the same cluster as the calling node, we can use pnodeRun instead.

Syntax: pnodeRun(function, [nodes], [addNodeToResult])

function: the local function to call. It must not be quoted and must take no parameters: it can be a function defined without parameters, or a partial application that wraps the original function and its arguments into a function with no parameters. It can be a built-in function or a user-defined function.

nodes: aliases of nodes. It is an optional parameter. If it is not specified, the system will call the function on all live data nodes in the cluster.

addNodeToResult: whether to add aliases of nodes to results. It is an optional parameter. The default value is true. If the returned result from each node already contains the node alias, we can set it to false.

pnodeRun calls a local function on multiple remote nodes in parallel and then merges the results. Both the calling node and the remote nodes must be located in the same cluster.

In the following simple example, we wrap the function sum and the argument 1..10 into the partial application sum{1..10}.

$ pnodeRun(sum{1..10}, `DFS_NODE2`DFS_NODE3);
Output:
Node          Value
DFS_NODE2        55
DFS_NODE3        55

pnodeRun is a very convenient tool for cluster management. For example, in a cluster with 4 nodes: "DFS_NODE1", "DFS_NODE2", "DFS_NODE3", and "DFS_NODE4", run the following script on each of the nodes:

$ def jobDemo(n){
$     s = 0
$     for (x in 1 : n) {
$         s += sum(sin rand(1.0, 100000000)-0.5)
$         print("iteration " + x + " " + s)
$     }
$     return s
$ };

$ submitJob("jobDemo1","job demo", jobDemo, 10);
$ submitJob("jobDemo2","job demo", jobDemo, 10);
$ submitJob("jobDemo3","job demo", jobDemo, 10);

To check the status of the most recent 2 completed batch jobs on each of the 4 nodes in the cluster:

$ pnodeRun(getRecentJobs{2});

Node       UserID  JobID     JobDesc   ReceivedTime             StartTime                EndTime                  ErrorMsg
DFS_NODE4  root    jobDemo2  job demo  2017.11.21T15:40:22.026  2017.11.21T15:40:22.027  2017.11.21T15:40:43.103
DFS_NODE4  root    jobDemo3  job demo  2017.11.21T15:40:22.027  2017.11.21T15:40:22.037  2017.11.21T15:40:43.115
DFS_NODE1  root    jobDemo2  job demo  2017.11.21T15:39:48.087  2017.11.21T15:39:48.088  2017.11.21T15:40:03.714
DFS_NODE1  root    jobDemo3  job demo  2017.11.21T15:39:48.088  2017.11.21T15:39:48.089  2017.11.21T15:40:03.767
DFS_NODE2  root    jobDemo2  job demo  2017.11.21T15:39:58.788  2017.11.21T15:39:58.788  2017.11.21T15:40:14.114
DFS_NODE2  root    jobDemo3  job demo  2017.11.21T15:39:58.788  2017.11.21T15:39:58.791  2017.11.21T15:40:14.178
DFS_NODE3  root    jobDemo2  job demo  2017.11.21T15:40:16.945  2017.11.21T15:40:16.945  2017.11.21T15:40:38.466
DFS_NODE3  root    jobDemo3  job demo  2017.11.21T15:40:16.945  2017.11.21T15:40:16.947  2017.11.21T15:40:38.789

$ pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);

Node       UserID  JobID     JobDesc   ReceivedTime             StartTime                EndTime                  ErrorMsg
DFS_NODE3  root    jobDemo2  job demo  2017.11.21T15:40:16.945  2017.11.21T15:40:16.945  2017.11.21T15:40:38.466
DFS_NODE3  root    jobDemo3  job demo  2017.11.21T15:40:16.945  2017.11.21T15:40:16.947  2017.11.21T15:40:38.789
DFS_NODE4  root    jobDemo2  job demo  2017.11.21T15:40:22.026  2017.11.21T15:40:22.027  2017.11.21T15:40:43.103
DFS_NODE4  root    jobDemo3  job demo  2017.11.21T15:40:22.027  2017.11.21T15:40:22.037  2017.11.21T15:40:43.115

pnodeRun follows these rules to merge the results from multiple nodes:

(1) If the function returns a scalar:

Return a table with 2 columns: node alias and function results.

Continuing with the example above:

$ pnodeRun(getJobReturn{`jobDemo1})
Output:
Node          Value
DFS_NODE3      2123.5508
DFS_NODE2    -42883.5404
DFS_NODE1      3337.4107
DFS_NODE4     -2267.3681

(2) If the function returns a vector:

Return a matrix. Each column of the matrix holds the result from one node, and the column labels are the node aliases.

(3) If the function returns a key-value dictionary:

Return a table with each row representing the function return from one node.

(4) If the function returns a table:

Return a table which is the union of individual tables from multiple nodes.

Please see the aforementioned example of pnodeRun(getRecentJobs{2}).

(5) If the function is a command (a command returns nothing):

Return nothing

(6) For all other cases:

Return a dictionary. The key would be node alias and the value would be the function return.
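
As a quick illustration of rule (2), the following hypothetical call makes every node return the same vector, so the merged result is a matrix with one column per node and the node aliases as column labels:

$ pnodeRun(til{3});
$ // each node returns the vector 0 1 2; pnodeRun merges the vectors into
$ // a matrix whose column labels are the node aliases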

Data Source

A data source is a special type of data object that contains the following information about a data entity:

1. Meta descriptions of a data entity. By executing a data source, we can obtain a materialized data entity such as table, matrix, vector, etc. In DolphinDB’s distributed computing framework, lightweight data source objects instead of large data entities are passed to remote sites for computing, which dramatically reduces the network traffic.

2. The locations where the data source can be executed. A data source can have 0, 1 or multiple locations. A data source with 0 locations is a local data source. When there are multiple locations, they back each other up: the system randomly picks one location for distributed computing, and when the data source is instructed to cache the materialized data object, the system picks the location where data were successfully retrieved last time.

3. An attribute to instruct the system to cache the data or clear the cache. For iterative computing algorithms (e.g. machine learning algorithms), data caching could significantly boost computing performance. Cached data will be cleared when the system runs out of memory. If this happens, the system can recover the data since the data source contains all meta descriptions and data transforming functions.

4. A data source object may also contain multiple data transforming functions to further process the retrieved data. The transforming functions are executed sequentially, with the output of one function as the sole input of the next. It is generally more efficient to include transforming functions in the data source rather than in the core computing operation. While there is no performance difference if the retrieved data is needed only once, it makes a huge difference for iterative computing on data sources with cached data objects: if the transforming operations are in the core computing unit, each iteration needs to execute the transformation; if they are in the data source, they are executed only once.

Related functions/commands:

1. The function sqlDS creates a list of data sources according to the input SQL meta code. If the data table in the SQL query has n partitions, sqlDS generates n data sources. If the SQL query doesn't contain any partitioned table, sqlDS returns a tuple containing one data source.

Syntax: sqlDS(metaCode)

The argument is SQL meta code. For more details about meta code, please refer to the section on Metaprogramming.

$ db = database("dfs://DBSeq",RANGE,`A`F`Z);
$ USPrices = loadTextEx(db, "USPrices",`TICKER ,"D:/DolphinDB/Data/USPrices.csv");
$ ds = sqlDS(<select log(SHROUT*(BID+ASK)/2) as SBA from USPrices where VOL>0>);

$ typestr ds;
ANY VECTOR

$ size ds;
2

$ ds[0];
DataSource< select [15] log(SHROUT * (BID + ASK) / 2) as SBA from USPrices where VOL > 0 [partition = /DBSeq/A_F/40r] >

$ ds[1];
DataSource< select [15] log(SHROUT * (BID + ASK) / 2) as SBA from USPrices where VOL > 0 [partition = /DBSeq/F_Z/40r] >

2. The function transDS! adds data transforming functions to a data source or a list of data sources (see the combined sketch after this list).

Syntax: transDS!(ds, func)

3. The function cacheDS! instructs the system to cache the data source. It returns true or false to indicate if this operation is successful.

Syntax: cacheDS!(ds)

4. The function clearDSCache! instructs the system to clear the cache after the next time the data source is executed.

Syntax: clearDSCache!(ds)

5. The function cacheDSNow immediately executes and caches the data source and returns the total number of cached rows.

Syntax: cacheDSNow(ds)

6. The function clearDSCacheNow immediately clears the cache of the data source.

Syntax: clearDSCacheNow(ds)
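
A minimal combined sketch of these functions, reusing the data sources ds created by sqlDS above; the transforming function keepPositive is illustrative:

$ def keepPositive(t){
$     return select SBA from t where SBA > 0   // illustrative filter on the retrieved table
$ }
$ transDS!(ds, keepPositive)    // attach the transform to each data source in the list
$ cacheDS!(ds[0])               // request caching for the first data source; returns true on success
$ cacheDSNow(ds[0])             // execute and cache it immediately; returns the number of cached rows
$ clearDSCache!(ds[0])          // clear its cache after the next execution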

Map-Reduce

The Map-Reduce function is the core function of DolphinDB’s generic distributed computing framework.

Syntax: mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true])

  • ds: the list of data sources. This required parameter must be a tuple and each element of the tuple is a data source object. Even if there is only one data source, we still need a tuple to wrap the data source.

  • mapFunc: the map function. It accepts one and only one argument, which is the materialized data entity from the corresponding data source. If we would like the map function to accept more parameters in addition to the materialized data source, we can use a Partial Application to convert a multiple-parameter function to a unary function. The number of map function calls is the number of data sources. The map function returns a regular object (scalar, pair, array, matrix, table, set, or dictionary) or a tuple (containing multiple regular objects).

  • reduceFunc: the binary reduce function that combines two map function call results. The reduce function in most cases is trivial. An example is the addition function. The reduce function is optional. If the reduce function is not specified, the system simply returns all individual map call results to the final function.

  • finalFunc: the final function accepts one and only one parameter. The output of the last reduce function call is the input of the final function. The final function is optional. If it is not specified, the system returns the individual map function call results.

  • parallel: an optional boolean flag indicating whether to execute the map function in parallel locally. The default value is true, i.e. enabling parallel computing. When there is very limited available memory and each map call needs a large amount of memory, we can disable parallel computing to prevent the out-of-memory problem. We may also want to disable the parallel option to ensure thread safety. For example, if multiple threads write to the same file simultaneously, errors may occur.
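
Before the regression example below, here is a minimal sketch that computes a distributed average with mr, reusing the partitioned table USPrices loaded in the sqlDS example; the column name PRC is an assumption for illustration:

$ def avgMap(t): (sum(t.PRC), t.rows())             // per-partition sum and row count
$ def avgReduce(a, b): (a[0] + b[0], a[1] + b[1])   // combine two partial results
$ def avgFinal(r): r[0] / r[1]                      // total sum divided by total count
$ ds = sqlDS(<select PRC from USPrices>)
$ avgPRC = mr(ds, avgMap, avgReduce, avgFinal);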

The following is an example of distributed least squares linear regression. X is the matrix of independent variables and y is the dependent variable. X and y are stored in multiple data sources. To estimate the least squares coefficients, we need to calculate XᵀX and Xᵀy. We can calculate the tuple (XᵀX, Xᵀy) from each data source, then add up the results from all data sources to get XᵀX and Xᵀy for the entire dataset.

$ def myOLSMap(table, yColName, xColNames, intercept){
$     if(intercept)
$         x = matrix(take(1.0, table.rows()), table[xColNames])
$     else
$         x = matrix(table[xColNames])
$     xt = x.transpose()
$     return xt.dot(x), xt.dot(table[yColName])
$ }

$ def myOLSFinal(result){
$     xtx = result[0]
$     xty = result[1]
$     return xtx.inv().dot(xty)[0]
$ }

$ def myOLSEx(ds, yColName, xColNames, intercept){
$     return mr(ds, myOLSMap{, yColName, xColNames, intercept}, +, myOLSFinal)
$ }
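
A hypothetical invocation of myOLSEx, assuming the partitioned table USPrices from the earlier examples and illustrative column names:

$ ds = sqlDS(<select * from USPrices>)
$ beta = myOLSEx(ds, `PRC, `BID`ASK, true);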

In the example above, we define the map function and final function. In practice, we may define transformation functions for data sources as well. These functions are defined in the local instance only and they don’t exist at remote instances. Users don’t need to compile them or deploy them to the remote instances before using them. DolphinDB’s distributed computing framework handles these complicated issues for end users on the fly. It is extremely easy to develop distributed analytical functions and applications in DolphinDB.

As a frequently used analytics tool, distributed least squares linear regression is already implemented in the DolphinDB core library. The built-in version (olsEx) provides more features.

Iterative Computing

Iterative computing is a commonly used computing methodology. Many machine learning methods and statistical models use iterative algorithms to estimate model parameters.

DolphinDB offers the function imr for iterative computing based on the map-reduce methodology. Each iteration uses the result from the previous iteration and the input dataset. The input dataset is unchanged across iterations, so it can be cached. Iterative computing requires initial values for the model parameters and a termination criterion.

Syntax: imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false])

  • ds: the list of data sources. It must be a tuple with each element as a data source object. Even if there is only one data source, we still need a tuple to wrap the data source. In iterative computing, data sources are automatically cached and the cache will be cleared after the last iteration.

  • initValue: the initial values of model parameter estimates. The format of the initial values must be the same as the output of the final function.

  • mapFunc: the map function. It has two arguments. The first argument is the data entity represented by the corresponding data source. The second argument is the output of the final function in the previous iteration, which is an updated estimate of the model parameter. For the first iteration, it is the initial values given by the user.

  • reduceFunc: the binary reduce function that combines two map function call results. If there are M map calls, the reduce function is called M-1 times. The reduce function in most cases is trivial; an example is the addition function. The reduce function is optional.

  • finalFunc: the final function in each iteration. It accepts two arguments. The first argument is the output of the final function in the previous iteration. For the first iteration, it is the initial values given by the user. The second argument is the output of the reduce function call. If the reduce function is not specified, a tuple representing the collection of individual map call results would be the second argument.

  • terminateFunc: either a function that determines whether the computation should terminate, or a specified number of iterations. The termination function accepts two parameters: the output of the reduce function in the previous iteration and the output of the reduce function in the current iteration. If the function returns a true value, the iterations end.

  • carryover: a Boolean value indicating whether a map function call produces a carryover object to be passed to the next iteration of the map function call. The default value is false. If it is set to true, the map function has 3 arguments and the last argument is the carryover object, and the map function output is a tuple whose last element is the carryover object. In the first iteration, the carryover object is the NULL object.

Now let's use the example of distributed median calculation to illustrate the function imr. Assume the data is scattered across multiple nodes and we would like to calculate the median of a variable across all the nodes. First, for each data source, put the data into buckets and use the map function to count the number of data points in each bucket. Then use the reduce function to merge the bucket counts from multiple data sources and locate the bucket that contains the median. In the next iteration, the chosen bucket is divided into smaller buckets. The iterations finish when the width of the chosen bucket is no more than the specified precision.

$ def medMap(data, range, colName){
$     return bucketCount(data[colName], double(range), 1024, true)
$ }

$ def medFinal(range, result){
$     x= result.cumsum()
$     index = x.asof(x[1025]/2.0)
$     ranges = range[1] - range[0]
$     if(index == -1)
$         return (range[0] - ranges*32):range[1]
$     else if(index == 1024)
$         return range[0]:(range[1] + ranges*32)
$     else{
$         interval = ranges / 1024.0
$         startValue = range[0] + (index - 1) * interval
$         return startValue : (startValue + interval)
$     }
$ }

$ def medEx(ds, colName, range, precision){
$     termFunc = def(prev, cur): cur[1] - cur[0] <= precision
$     return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()
$ }
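
A hypothetical invocation of medEx, again assuming the table USPrices; the initial range and precision are illustrative:

$ ds = sqlDS(<select PRC from USPrices>)
$ medPRC = medEx(ds, `PRC, 0.0:1000.0, 0.001);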

The pipeline function

The pipeline function uses multithreading to optimize tasks that meet the following conditions:

(1) The task can be decomposed into multiple subtasks.

(2) Each subtask contains multiple steps.

(3) The k-th step of the i-th subtask can only be executed after the (k-1)-th step of the i-th subtask and the k-th step of the (i-1)-th subtask are completed.

In the following example, we need to convert the partitioned table stockData into a csv file. The table contains data from 2008 to 2018 and exceeds the available memory of the system, so we cannot load the entire table into memory and then convert it into a csv file. Instead, the task is divided into multiple subtasks, each consisting of two steps: load one month of data into memory, and then save that data to the csv file. Before the data of a month is saved to the csv file, the data of that month must have been loaded into memory and the data of the previous month must already have been saved to the csv file.

$ v = 2000.01M..2018.12M
$ def queryData(m){
$     return select * from loadTable("dfs://stockDB", "stockData") where TradingTime between datetime(date(m)) : datetime(date(m+1))
$ }
$ def saveData(tb){
$     tb.saveText("/hdd/hdd0/data/stockData.csv",',', true)
$ }
$ pipeline(each(partial{queryData}, v), saveData);