createPartitionedTable

Syntax

createPartitionedTable(dbHandle, table, tableName, [partitionColumns], [compressMethods], [sortColumns], [keepDuplicates=ALL], [sortKeyMappingFunction])

Arguments

The parameters sortColumns, keepDuplicates and sortKeyMappingFunction take effect only for databases that use the TSDB storage engine (i.e., databases created with engine="TSDB").

dbHandle the handle of the database where the partitioned table will be saved. The database can be in either the local file system or the distributed file system. If the first parameter (directory) of the database function that created dbHandle is left empty, the result is an in-memory partitioned table.

table a table or a list of tables. The schema of table will be used to construct the new partitioned table.

tableName a string indicating the name of the distributed table to be saved on disk, or the name of the in-memory partitioned table.

partitionColumns a string or a string vector indicating the partitioning column(s). For a composite partition, partitionColumns is a string vector.

compressMethods a dictionary indicating which compression methods are used for specified columns. The keys are column names and the values are compression methods (“lz4” or “delta”). If unspecified, the LZ4 compression method is used. Please note:

  • The delta compression method can be used for DECIMAL, SHORT, INT, LONG or temporal data types.

  • Save strings as SYMBOL type to enable compression of strings.
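The delta method pays off on columns whose consecutive values change by a regular step, such as the timestamp column in Example 1.1. The Python sketch below is a generic delta-of-delta encoder written only to illustrate that idea. It is an assumption about the kind of redundancy such a method exploits, not DolphinDB's actual on-disk format, and the function names are hypothetical.

```python
# Hypothetical illustration of delta-of-delta encoding.
# This is NOT DolphinDB's storage format; it only shows why regularly
# spaced temporal values shrink to long runs of zeros.
from typing import List


def delta_of_delta_encode(values: List[int]) -> List[int]:
    """Keep the first value and first delta, then only second-order differences."""
    if len(values) < 2:
        return list(values)
    encoded = [values[0], values[1] - values[0]]
    for i in range(2, len(values)):
        prev_delta = values[i - 1] - values[i - 2]
        curr_delta = values[i] - values[i - 1]
        encoded.append(curr_delta - prev_delta)
    return encoded


def delta_of_delta_decode(encoded: List[int]) -> List[int]:
    """Invert delta_of_delta_encode."""
    if len(encoded) < 2:
        return list(encoded)
    values = [encoded[0], encoded[0] + encoded[1]]
    delta = encoded[1]
    for dd in encoded[2:]:
        delta += dd
        values.append(values[-1] + delta)
    return values


# Epoch seconds for 2020.01.01T00:00:00 + 0..5, one value per second.
ts = [1577836800 + i for i in range(6)]
enc = delta_of_delta_encode(ts)
print(enc)  # [1577836800, 1, 0, 0, 0, 0]
assert delta_of_delta_decode(enc) == ts
```

The zeros are what a general-purpose compressor can then store very compactly; irregular data still round-trips, it just compresses less well.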

sortColumns is a STRING scalar/vector that specifies the columns used to sort the ingested data within each partition. The sort columns must be of integral, temporal, string, or symbol type. Note that sortColumns does not have to be the same as the partitioning column(s).

  • If multiple columns are specified for sortColumns, the last column must be a time column. The preceding columns are used as the sort keys and they cannot be of TIME, TIMESTAMP, NANOTIME, or NANOTIMESTAMP type.

  • If only one column is specified for sortColumns, the column is used as the sort key, and it can be a time column or not. If the sort column is a time column and sortKeyMappingFunction is specified, the sort column specified in a SQL where condition can only be compared with temporal values of the same data type.

  • It is recommended to specify frequently-queried columns for sortColumns and sort them in the descending order of query frequency, which ensures that frequently-used data is readily available during query processing.

  • For optimal performance, the number of sort key entries (i.e., unique combinations of the sort key values) within each partition should not exceed 1000. This limit prevents excessive memory usage and keeps query processing efficient.
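The rules above can be expressed as a small validation routine. The Python sketch below is hypothetical and not part of DolphinDB: type names are plain strings, and the sets of integral and temporal types are assumptions chosen for illustration. It also counts the sort key entries in one partition, which is the quantity the 1000-entry guideline refers to.

```python
from typing import Dict, List, Sequence

# Assumed type groups (illustrative, not an exhaustive list of DolphinDB types).
INTEGRAL = {"CHAR", "SHORT", "INT", "LONG"}
TEMPORAL = {"DATE", "MONTH", "TIME", "MINUTE", "SECOND", "DATETIME",
            "TIMESTAMP", "NANOTIME", "NANOTIMESTAMP", "DATEHOUR"}
ALLOWED = INTEGRAL | TEMPORAL | {"STRING", "SYMBOL"}
# Types that may not be sort keys when several sort columns are given.
FORBIDDEN_KEYS = {"TIME", "TIMESTAMP", "NANOTIME", "NANOTIMESTAMP"}


def check_sort_columns(sort_columns: Sequence[str], schema: Dict[str, str]) -> List[str]:
    """Return the rule violations found (an empty list means the choice looks valid)."""
    errors = []
    for col in sort_columns:
        if schema[col] not in ALLOWED:
            errors.append(f"{col}: {schema[col]} is not a sortable type")
    if len(sort_columns) > 1:
        *keys, last = sort_columns
        if schema[last] not in TEMPORAL:
            errors.append(f"{last}: the last sort column must be temporal")
        for col in keys:
            if schema[col] in FORBIDDEN_KEYS:
                errors.append(f"{col}: {schema[col]} cannot be a sort key")
    return errors


def count_sort_key_entries(rows: List[Dict[str, object]], sort_columns: Sequence[str]) -> int:
    """Count distinct sort key combinations in one partition.

    With several sort columns the last (temporal) one is not part of the key;
    with a single sort column, that column is the key.
    """
    keys = list(sort_columns[:-1]) if len(sort_columns) > 1 else list(sort_columns)
    return len({tuple(row[k] for k in keys) for row in rows})


# Schema of schemaTable_snap from Example 1.2.
schema = {"SecurityID": "SYMBOL", "TradeDate": "DATE",
          "TotalVolumeTrade": "INT", "TotalValueTrade": "DOUBLE"}
print(check_sort_columns(["SecurityID", "TradeDate"], schema))  # []
print(check_sort_columns(["TradeDate", "SecurityID"], schema))  # one violation

partition = [{"SecurityID": s, "TradeDate": "2022.01.01"}
             for s in ["st0001", "st0002", "st0001"]]
print(count_sort_key_entries(partition, ["SecurityID", "TradeDate"]))  # 2
```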

keepDuplicates specifies how to deal with records with duplicate sortColumns values. It can have the following values:

  • ALL: keep all records;

  • LAST: only keep the last record;

  • FIRST: only keep the first record.
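The three policies can be modeled as follows. This Python sketch is hypothetical: it treats rows whose sortColumns values are all equal as duplicates, assumes that list order is the order in which records arrive, and returns the surviving rows sorted by sortColumns.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def apply_keep_duplicates(rows: List[Row], sort_columns: Sequence[str],
                          policy: str = "ALL") -> List[Row]:
    """Model keepDuplicates=ALL/FIRST/LAST on one batch of rows (hypothetical helper)."""
    def key(row: Row) -> tuple:
        return tuple(row[c] for c in sort_columns)

    if policy == "ALL":
        survivors = list(rows)
    elif policy in ("FIRST", "LAST"):
        kept: Dict[tuple, Row] = {}
        for row in rows:  # list order stands in for arrival order
            k = key(row)
            if policy == "LAST" or k not in kept:
                kept[k] = row
        survivors = list(kept.values())
    else:
        raise ValueError(f"unknown keepDuplicates policy: {policy}")
    # sorted() is stable, so duplicates kept under ALL stay in arrival order.
    return sorted(survivors, key=key)


rows = [
    {"SecurityID": "st0002", "TradeDate": "2022.01.01", "vol": 300},
    {"SecurityID": "st0001", "TradeDate": "2022.01.01", "vol": 100},
    {"SecurityID": "st0001", "TradeDate": "2022.01.01", "vol": 200},
]
for policy in ("ALL", "FIRST", "LAST"):
    kept_vols = [r["vol"] for r in apply_keep_duplicates(rows, ["SecurityID", "TradeDate"], policy)]
    print(policy, kept_vols)
# ALL [100, 200, 300]
# FIRST [100, 300]
# LAST [200, 300]
```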

It is recommended to specify the sortKeyMappingFunction parameter if a partition of a TSDB database contains many sort key entries and each entry has only a few records. After dimensionality reduction, each block in a TSDB level file can store more data, which reduces how often data blocks are read and the disk I/O during queries, and also improves the data compression ratio.

sortKeyMappingFunction is a vector of unary functions. It has the same length as the number of sort keys. The specified mapping functions are applied to each sort key (i.e., the sort columns except for the temporal column) for dimensionality reduction.

After dimensionality reduction, records that share the same new (mapped) sort key entry are sorted by the last column of sortColumns (the temporal column).
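The effect of a mapping function such as hashBucket{,5} in Example 1.2 can be sketched in Python. The hash used here (zlib.crc32 modulo the bucket count) is an assumption standing in for DolphinDB's hashBucket, so the actual bucket numbers will differ. What carries over is that 20 distinct SecurityID values collapse into at most 5 sort key entries, and rows within each entry are ordered by the temporal column.

```python
import zlib
from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple

Row = Dict[str, object]
Mapper = Optional[Callable[[object], object]]


def hash_bucket(value: str, buckets: int) -> int:
    # Hypothetical stand-in for hashBucket{, buckets}; DolphinDB's real hash differs.
    return zlib.crc32(value.encode("utf-8")) % buckets


def reduce_and_sort(rows: List[Row], sort_columns: Sequence[str],
                    mappers: Sequence[Mapper]) -> Tuple[List[Row], Set[tuple]]:
    """Apply one mapper per sort key (None = no reduction), then order rows by
    (mapped sort key, temporal column). Assumes at least two sort columns.
    Returns the sorted rows and the set of distinct mapped sort key entries."""
    if len(sort_columns) < 2:
        raise ValueError("this sketch expects sort keys plus a temporal column")
    *keys, time_col = sort_columns
    if len(mappers) != len(keys):
        raise ValueError("need exactly one mapping function per sort key")

    def mapped(row: Row) -> tuple:
        return tuple(fn(row[k]) if fn is not None else row[k]
                     for k, fn in zip(keys, mappers))

    ordered = sorted(rows, key=lambda r: (mapped(r), r[time_col]))
    return ordered, {mapped(r) for r in rows}


rows = [{"SecurityID": f"st{i:04d}", "TradeDate": f"2022.01.{(i % 5) + 1:02d}"}
        for i in range(1, 21)]
cols = ["SecurityID", "TradeDate"]
_, raw_entries = reduce_and_sort(rows, cols, [None])
ordered, reduced_entries = reduce_and_sort(rows, cols, [lambda s: hash_bucket(s, 5)])
print(len(raw_entries), len(reduced_entries))  # 20, then a number no greater than 5
```

Passing None for a key mirrors leaving the corresponding element of sortKeyMappingFunction empty.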

Note:

  • Dimensionality reduction is performed when data is written to disk, so specifying this parameter may affect write performance.

  • The functions specified in sortKeyMappingFunction correspond one-to-one to the sort keys. If a sort key does not require dimensionality reduction, leave the corresponding element of the sortKeyMappingFunction vector empty.

  • If a mapping function is hashBucket and the sort key it applies to is also a HASH partitioning column, make sure that the number of HASH partitions is not divisible by the number of buckets of hashBucket (or vice versa). Otherwise, column values from the same HASH partition would be mapped to the same hash bucket after dimensionality reduction.
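The arithmetic behind this caveat can be checked with a small Python model. It assumes, hypothetically, that HASH partitioning and hashBucket both reduce the same integer hash h modulo their counts (partition = h % P, bucket = h % B); DolphinDB's actual hash functions are not described here. Under this model each partition reaches only B / gcd(P, B) of the B buckets, so with P = 10 partitions and B = 5 buckets every partition collapses onto a single bucket.

```python
from math import gcd
from typing import Dict, Set


def buckets_per_partition(P: int, B: int, n_hashes: int = 10_000) -> Dict[int, Set[int]]:
    """For each HASH partition, collect the hash buckets its values map to
    under the assumed model partition = h % P, bucket = h % B."""
    reached: Dict[int, Set[int]] = {p: set() for p in range(P)}
    for h in range(n_hashes):
        reached[h % P].add(h % B)
    return reached


for P, B in [(10, 5), (10, 4), (10, 3)]:
    sizes = {len(s) for s in buckets_per_partition(P, B).values()}
    print(f"P={P}, B={B}: buckets reached per partition = {sizes}, B/gcd = {B // gcd(P, B)}")
# P=10, B=5: buckets reached per partition = {1}, B/gcd = 1
# P=10, B=4: buckets reached per partition = {2}, B/gcd = 2
# P=10, B=3: buckets reached per partition = {3}, B/gcd = 3
```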

Details

Create an empty partitioned table on disk or in memory with the same schema as the specified table. To create a table on disk, parameter table must be a table. To create an in-memory partitioned table, parameter table can be a table or a tuple of tables.

  • If the parameter table is a table: generate an empty partitioned table with the schema of the model table (on disk, or in memory if dbHandle is an in-memory database). Use append! or tableInsert to write data to the new partitioned table. This function cannot be used to create a partitioned table with a sequential (SEQ) domain.

  • If the parameter table is a list of tables: create an in-memory partitioned table. The number of tables given by the parameter table must be the same as the number of partitions in the database.

Note:

  • Only the schema of table is used. None of the rows in table is imported to the newly created partitioned table.

  • In a distributed OLAP database, the maximum number of handles (including temporary handles*) to partitioned tables is 8,192 per node. For the TSDB storage engine, there is no limit.

*temporary handles: If no handle is specified when you create a partitioned table in a distributed database with createPartitionedTable, each database creates a temporary handle to hold the return value. If you create multiple tables under the same database, the temporary handle for the database is overwritten each time.

Examples

Example 1. Create a DFS table

Example 1.1. Create a DFS table in OLAP database

$ n=1000000;
$ t=table(2020.01.01T00:00:00 + 0..(n-1) as timestamp, rand(`IBM`MS`APPL`AMZN,n) as symbol, rand(10.0, n) as value)
$ db = database("dfs://rangedb_tradedata", RANGE, `A`F`M`S`ZZZZ)
$ Trades = db.createPartitionedTable(table=t, tableName="Trades", partitionColumns="symbol", compressMethods={timestamp:"delta"});

At this point, the table Trades is empty. The schema of Trades is the same as the schema of table t. Next, we append table t to table Trades.

$ Trades.append!(t);

Now the contents of table Trades have been updated on disk. For a table in the local file system, the system does not dynamically refresh the table contents, so the table must be loaded into memory with loadTable before we can work with it interactively.

$ Trades=loadTable(db,`Trades);
$ select min(value) from Trades;
0

The in-memory table t is saved as a DFS table Trades on disk.

After appending data to a DFS table, we don’t need to use function loadTable to load the table before querying it, as the distributed file system automatically refreshes the table after append operations. After the system restarts, however, we need to use loadTable to load a DFS table before querying it.

Example 1.2. Create a DFS table in TSDB database

$ n = 10000
$ SecurityID = rand(`st0001`st0002`st0003`st0004`st0005, n)
$ sym = rand(`A`B, n)
$ TradeDate = 2022.01.01 + rand(100,n)
$ TotalVolumeTrade = rand(1000..3000, n)
$ TotalValueTrade = rand(100.0, n)
$ schemaTable_snap = table(SecurityID, TradeDate, TotalVolumeTrade, TotalValueTrade).sortBy!(`SecurityID`TradeDate)

$ dbPath = "dfs://TSDB_STOCK"
$ if(existsDatabase(dbPath)){dropDatabase(dbPath)}
$ db_snap = database(dbPath, VALUE, 2022.01.01..2022.01.05, engine='TSDB')
$ snap=createPartitionedTable(dbHandle=db_snap, table=schemaTable_snap, tableName="snap", partitionColumns=`TradeDate, sortColumns=`SecurityID`TradeDate, keepDuplicates=ALL, sortKeyMappingFunction=[hashBucket{,5}])
$ snap.append!(schemaTable_snap)
$ flushTSDBCache()
$ snap = loadTable(dbPath, `snap)
$ select * from snap

Example 2. Create in-memory partitioned tables

Example 2.1. Create a partitioned in-memory table

$ n = 200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ t = table(n:0, colNames, colTypes)
$ db = database(, RANGE, `A`D`F)
$ pt = db.createPartitionedTable(t, `pt, `sym)

$ insert into pt values(09:30:00.001,`AAPL,100,56.5)
$ insert into pt values(09:30:01.001,`DELL,100,15.5)

Example 2.2. Create a partitioned keyed table

$ n = 200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ t = keyedTable(`time`sym, n:0, colNames, colTypes)
$ db = database(, RANGE, `A`D`F)
$ pt = db.createPartitionedTable(t, `pt, `sym)

$ insert into pt values(09:30:00.001,`AAPL,100,56.5)
$ insert into pt values(09:30:01.001,`DELL,100,15.5)

Example 2.3. Create a partitioned stream table

Please note that when creating a partitioned stream table, the second parameter of createPartitionedTable must be a tuple of tables, and its length must be equal to the number of partitions. Each table in the tuple represents a partition. In the following example, trades_stream1 and trades_stream2 form a partitioned stream table trades. We can’t directly write data to trades. Instead, we need to write to trades_stream1 and trades_stream2.

$ n=200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ trades_stream1 = streamTable(n:0, colNames, colTypes)
$ trades_stream2 = streamTable(n:0, colNames, colTypes)
$ db=database(, RANGE, `A`D`F)
$ trades = createPartitionedTable(db,[trades_stream1, trades_stream2], "", `sym)

$ insert into trades_stream1 values(09:30:00.001,`AAPL,100,56.5)
$ insert into trades_stream2 values(09:30:01.001,`DELL,100,15.5)

$ select * from trades;

time         sym  qty price
------------ ---- --- -----
09:30:00.001 AAPL 100 56.5
09:30:01.001 DELL 100 15.5
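In the example above, the RANGE scheme `A`D`F defines two partitions, which is why exactly two stream tables are passed, and the rows written to trades_stream1 (AAPL) and trades_stream2 (DELL) fall in the first and second ranges respectively. The Python sketch below is a model only, not DolphinDB code; it assumes each RANGE partition is the half-open interval [lower, upper) and uses plain string comparison for symbols. The helper name range_partition is hypothetical.

```python
import bisect
from typing import Optional, Sequence


def range_partition(value: str, boundaries: Sequence[str]) -> Optional[int]:
    """Index of the RANGE partition [boundaries[i], boundaries[i+1]) holding value,
    or None when value lies outside [boundaries[0], boundaries[-1])."""
    if value < boundaries[0] or value >= boundaries[-1]:
        return None
    return bisect.bisect_right(boundaries, value) - 1


bounds = ["A", "D", "F"]  # as in database(, RANGE, `A`D`F)
print(len(bounds) - 1)    # 2 partitions, hence 2 tables in the tuple
print(range_partition("AAPL", bounds), range_partition("DELL", bounds))  # 0 1
```

The same helper applies to Example 1.1, where `A`F`M`S`ZZZZ yields four partitions.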

Example 2.4. Create a partitioned MVCC table

Similar to creating a partitioned stream table, to create a partitioned MVCC table, the second parameter of createPartitionedTable must be a tuple of tables, and its length must be equal to the number of partitions. Each table in the tuple represents a partition. In the following example, trades_mvcc1 and trades_mvcc2 form a partitioned MVCC table trades. We can’t directly write data to trades. Instead, we need to write to trades_mvcc1 and trades_mvcc2.

$ n=200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ trades_mvcc1 = mvccTable(n:0, colNames, colTypes)
$ trades_mvcc2 = mvccTable(n:0, colNames, colTypes)
$ db=database(, RANGE, `A`D`F)
$ trades = createPartitionedTable(db,[trades_mvcc1, trades_mvcc2], "", `sym)

$ insert into trades_mvcc1 values(09:30:00.001,`AAPL,100,56.5)
$ insert into trades_mvcc2 values(09:30:01.001,`DELL,100,15.5)

$ select * from trades;

time         sym  qty price
------------ ---- --- -----
09:30:00.001 AAPL 100 56.5
09:30:01.001 DELL 100 15.5