Mechanics of the As-Of-Join Primitive

Recently, I needed to perform an operation that doesn’t exist in KDB but that reminded me of a primitive KDB does provide: As Of Join (aj). I will sketch the problem that motivated me, then describe what the As Of Join operation does and how it is implemented in KDB. Finally, I will describe the new verb I created to solve the original problem.

Primal Scenario:

We have two tables produced by two systems in the same company: a table of user interactions and a table from an operational system that records transactions. To make this a bit more concrete, our operational system manages inventory in a particular warehouse and has the following columns: Date, ItemId, Qty, timestamp. Here are 10 sample rows and the code to generate them:

/
For the sake of making things a bit narrower, 
I am truncating a GUID with the following helper functions
\
q)trunID:{`$8#string x}
q)genID:{trunID each x?0Ng}
q)sysTable:([] Date:20170805+til 10;
      ItemId:genID 10;
      Qty:10?10;
     timestamp:00:00:00+10?100000)
q)show sysTable
Date     ItemId   Qty timestamp
-------------------------------
20170805 72de67b9 2   14:27:23
20170806 02e583e3 6   23:43:36
20170807 cffa7aa8 4   07:42:14
20170808 327b47a8 1   26:28:04
20170809 d1b9ccd1 0   11:48:17
20170810 6cb019b3 2   04:59:38
20170811 bb466f80 9   17:54:00
20170812 70112cc0 6   00:54:33
20170813 c9d56ab5 0   22:19:31
20170814 96b75d97 8   10:20:35

The user interactions table keeps track of user related events. It has the following columns: UserId, Date, EventType, ItemId, Qty, timestamp. Here are 10 sample rows and code to generate it:

q)userTable:([]UserId:genID 10;
           Date:20170805+til 10;
           EventType:10#`buy;
           ItemId:genID 10;
           Qty:10?10;
           timestamp:00:00:00+10?100000)
q)show userTable

UserId   Date     EventType ItemId   Qty timestamp
--------------------------------------------------
88c5bbf5 20170805 buy       b19c6770 6   25:51:08 
42eeccad 20170806 buy       f12c9963 6   01:31:20 
c5510eba 20170807 buy       9ee93847 1   22:36:58 
5f344195 20170808 buy       7bfa7efa 8   20:50:09 
9a4f68c6 20170809 buy       24e2f268 5   16:27:03
26ba3a3e 20170810 buy       94922173 4   17:13:45 
c046e464 20170811 buy       4dec3017 9   13:16:27 
0e09baa4 20170812 buy       f5f7b90e 2   24:29:14 
a9f44e57 20170813 buy       0cb8391a 7   07:38:13 
1e1f2a49 20170814 buy       2df03fb0 0   23:10:12 

We will remove all rows that don’t have EventType=`buy, so we can ignore this column from now on; I left it in as an example of the kind of data cleaning you might perform. Also, for the rest of this tutorial, I create a universe of item IDs and user IDs to sample from. This makes for a more realistic simulation in which ItemIds overlap between the two tables:

itemids:genID 1000; userids:genID 100000

We want to perform some analytics to figure out which warehouses we should use for different products based on user geography. To do this we need to join the inventory data to the user data. Unfortunately, there is nothing that links these two tables together: the systems are independent even though they record the same real-world events. You work with what you’ve got. Since these events are connected, if we see a transaction in the user table that has the same Date, the same ItemId and the same Qty, we can be pretty sure that it is the same event. This can be performed with a simple left join like so:

/This join takes less than half a second on 1 million rows 
/in both tables
q)\t userTable lj `ItemId`Date`Qty xkey sysTable
453

However, our company actually has many, many users, so using just Date, ItemId and Qty is going to produce many possible matches, of which KDB will always choose the first. This is exacerbated by the fact that most of the time Qty is 1. So we need to incorporate the timestamp column to get better matching results. We would like to match two rows if Date, ItemId and Qty match and the timestamps are sufficiently close. In particular, we would like to take the first match from the userTable where, everything else being equal, the timestamps are within 5 minutes of each other (assume that if they are outside this window, some other system must be responsible for that row).
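To make the "always chooses the first" behaviour concrete, here is a minimal sketch on two toy tables. The names sysToy, usrToy and sysTime are hypothetical and exist only for this illustration; they are not part of the simulation above.

```q
/ two system rows share the same Date, ItemId and Qty
sysToy:([]Date:2#20170805;ItemId:`a`a;Qty:1 1;sysTime:10:00:00 10:30:00)
/ two user rows that could each belong to either system row
usrToy:([]UserId:`u1`u2;Date:2#20170805;ItemId:`a`a;Qty:1 1)
/ the key lookup finds the first matching key for every user row
res:usrToy lj `Date`ItemId`Qty xkey sysToy
exec sysTime from res   / 10:00:00 10:00:00
```

Both user rows pick up the 10:00:00 system row, and the 10:30:00 row is never matched, which is why the timestamp has to take part in the join.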

The As Of Join Primitive for TimeSeries analysis

The idea behind As Of Join is that you can look at the state of the universe in another table as of a certain time. The most common example involves joining a trades table to a quotes table. I won’t belabour that example. Instead, I’ll translate it to our fictional internet store company. I want to know what the inventory was as of the particular time when a product was ordered. I have an inventory table:

q)invTable:([]ItemId:i#itemids;
    Qty:i?1000;
    DateTimeStamp: 
       (i#til 10)+(2017.08.05D00:00:00.000+i?1000000000000000))
/let's sort it in asc order of the DateTimeStamp
q)invTable:`DateTimeStamp xasc invTable
q)show invTable
ItemId   Qty DateTimeStamp                
------------------------------------------
30c86bca 375 2017.08.05D00:00:00.075460419
73d8673e 772 2017.08.05D00:00:00.114482831
2e6d75b2 166 2017.08.05D00:00:00.330689365
6c55cdcf 860 2017.08.05D00:00:00.336812816
a3715f66 364 2017.08.05D00:00:00.457884749
9abd4b9f 396 2017.08.05D00:00:00.726431612
89e2e90e 37  2017.08.05D00:00:00.795442611
c7891977 999 2017.08.05D00:00:00.817328697

Given an ItemId and a DateTimeStamp, we would like to know how much of that item we had as of that time.

In other words, we want the Qty for that ItemId where the time is as close as possible to the given time but no greater.
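Before turning to the primitive itself, that definition can be written down directly in q-sql. This is only a sketch on a hypothetical three-row table (invToy and asOfQty are names I made up for the illustration), and it is far slower than aj on real data.

```q
/ toy inventory, sorted by time within each ItemId
invToy:([]ItemId:`a`a`b;
  DateTimeStamp:2017.08.05D09:00:00 2017.08.05D11:00:00 2017.08.05D10:00:00;
  Qty:5 3 7)
/ last recorded Qty for the item at or before time t
asOfQty:{[item;t] exec last Qty from invToy where ItemId=item,DateTimeStamp<=t}
asOfQty[`a;2017.08.05D10:30:00]   / 5, the 09:00 record is the latest one not after 10:30
asOfQty[`a;2017.08.05D08:00:00]   / 0N, nothing was recorded yet
```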

To make the problem a bit more interesting, we can imagine that we have a table of all the products and we want all the inventory as of 2017.08.07D00:00:00. We can do this pretty easily:

/First generate a table with all of the products and that timestamp;
q)queryTable:([]ItemId:itemids;DateTimeStamp:2017.08.07D00:00:00)
q)show queryTable
ItemId   DateTimeStamp                
--------------------------------------
d44f9458 2017.08.07D00:00:00.000000000
aa25ba3a 2017.08.07D00:00:00.000000000
beec200c 2017.08.07D00:00:00.000000000
6e9b637f 2017.08.07D00:00:00.000000000
70c3756c 2017.08.07D00:00:00.000000000
a8f4636b 2017.08.07D00:00:00.000000000
befc6ca7 2017.08.07D00:00:00.000000000

/
 Then we put the columns we want to join 
 in order from least frequent to most frequent
 with the time column last. 
 Then the two tables, query table first
\
q)aj[`ItemId`DateTimeStamp;queryTable;invTable]
ItemId   DateTimeStamp                 Qty
------------------------------------------
d44f9458 2017.08.07D00:00:00.000000000 143
aa25ba3a 2017.08.07D00:00:00.000000000 774
beec200c 2017.08.07D00:00:00.000000000 925
6e9b637f 2017.08.07D00:00:00.000000000 711
70c3756c 2017.08.07D00:00:00.000000000 447
a8f4636b 2017.08.07D00:00:00.000000000 190
/
If we want to see the time that was actually matched,
we can use its sister command
aj0
\
q)aj0[`ItemId`DateTimeStamp;queryTable;invTable]
ItemId   DateTimeStamp                 Qty
------------------------------------------
d44f9458 2017.08.06D03:43:09.477791637 143
aa25ba3a 2017.08.06D03:45:38.244558871 774
beec200c 2017.08.06D08:46:36.299622582 925
6e9b637f 2017.08.06D19:46:02.531682106 711
70c3756c 2017.08.06D07:43:15.845127668 447
a8f4636b 2017.08.06D03:44:03.353382316 190

/
We can verify that all the times are before 
 midnight on the 7th of August
\

Let’s now look at the internal mechanics of how this actually works.

First, KDB finds all the candidate matches based on all the columns except the last one. Then the aj verb makes use of the binary search function (bin) in KDB. Bin takes two arguments of the same type: the first is a list sorted in ascending order, the second is either a list or an item. For each item in the second argument, it returns the index of the last element in the first argument that is less than or equal to it. A simple example will illustrate this:

q)0 1 2 3 4 5 6 7 bin 4
4
q)0 1 2 3.5 4 5 6 7 bin 3.0
2
q)0 1 2 3.5 4 5 6 7 bin 0.1 0.2 1.9 3.6
0 0 1 3

So assuming the candidate matches are sorted, picking out the latest as of the query time becomes as simple as performing binary search on each row we are trying to match against the candidate matches, which is exactly how KDB does it.
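As a rough sketch of that idea, assume the candidates for a single ItemId have already been isolated into two parallel lists (the names times and qty are hypothetical). The as-of lookup is then one bin followed by an index:

```q
times:09:00:00 10:00:00 11:00:00   / candidate times, ascending
qty:5 3 7                          / quantity recorded at each time
i:times bin 10:30:00 08:00:00      / 1 -1
qty i                              / 3 0N
```

The query at 08:00:00 is earlier than every candidate, so bin returns -1, and indexing with -1 yields a null. That is how a row with no match ends up with a null Qty.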

Another way of thinking about “bin” is that it returns the last element from a set of candidate matches, where match means being before the current element.

With this view in mind, it should be clear that we can’t use As Of Join to solve the Primal Scenario: as-of-join essentially gives you the last row that is before the time you are asking about, whereas we are looking for the first row. Even if we shifted the timestamps so that all the candidates within the tolerance fall before the matching row’s timestamp, we would still get the last match instead of the first one.

Luckily, there is another function called binr that returns the first index whose element is greater than or equal to the query. Which leads us to:

q)0 1 2 3 4 5 6 7 binr 4
4
q)0 1 2 3.5 4 5 6 7 binr 3.0
3
q)0 1 2 3.5 4 5 6 7 binr 0.1 0.2 1.9 3.6
1 1 2 4

Since binr returns the first index whose element is greater than or equal to the query, we can think of it as returning the first match from a set of candidates, where a match is any candidate that is greater than or equal to the element being matched. This bin right function, so named because it approaches the binary search from the right, is now enough to go and solve the primal scenario.
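The same hypothetical single-item lists as before (times and qty are illustrative names only) show the mirror-image lookup with binr:

```q
times:09:00:00 10:00:00 11:00:00   / candidate times, ascending
qty:5 3 7                          / quantity recorded at each time
i:times binr 10:30:00 11:30:00     / 2 3
qty i                              / 7 0N
```

Here the query at 11:30:00 is later than every candidate, so binr returns the count of the list, and indexing past the end yields a null.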

A New Verb: ajr (As Of Join Right)

Let’s take a quick look at how aj is implemented:

k){.Q.ft[{d:x_z;$[&/j:-1<i:(x#z)bin x#y;y,'d i;+.[+.Q.ff[y]d;(!+d;j);:;.+d i j:&j]]}[x,();;0!z]]y}

Sure enough there is the bin verb right in the middle of the verb.

So we can create another verb called ajr and define it like this:

k){.Q.ft[{d:x_z;$[&/j:-1<i:(x#z)binr x#y;y,'d i;+.[+.Q.ff[y]d;(!+d;j);:;.+d i j:&j]]}[x,();;0!z]]y}

A quick note: the easiest way is to simply go into k mode by typing one backslash. But in a script it’s easier to just write:

k)ajr:{.Q.ft[{d:x_z;$[&/j:-1<i:(x#z)binr x#y;
     y,'d i;+.[+.Q.ff[y]d;(!+d;j);:;.+d i j:&j]]}[x,();;0!z]]y}

or

"k" "ajr:{.Q.ft[{d:x_z;$[&/j:-1<i:(x#z)binr x#y;
     y,'d i;+.[+.Q.ff[y]d;(!+d;j);:;.+d i j:&j]]}[x,();;0!z]]y}"

However you define this verb, you can now answer the question: what was the inventory at the first recorded point at or after 2017.08.07D00:00:00.000000000?

q)ajr[`ItemId`DateTimeStamp;queryTable;invTable]
ItemId   DateTimeStamp                 Qty
------------------------------------------
d44f9458 2017.08.07D00:00:00.000000000 38 
aa25ba3a 2017.08.07D00:00:00.000000000 5 
beec200c 2017.08.07D00:00:00.000000000 723
6e9b637f 2017.08.07D00:00:00.000000000 783
70c3756c 2017.08.07D00:00:00.000000000 56 
a8f4636b 2017.08.07D00:00:00.000000000 112
befc6ca7 2017.08.07D00:00:00.000000000 871
c8bc95e4 2017.08.07D00:00:00.000000000 586

/Similarly we can define the ajr0 verb as follows:
/ and that way we can see what time it matched
q)k)ajr0:{.Q.ft[{d: z;$[&/j:-1<i:(x#z)binr x#y;
    y,'d i;+.[+.Q.ff[y]d;(!+d;j);:;.+d i j:&j]]}[x,();;0!z]]y}
q)ajr0[`ItemId`DateTimeStamp;queryTable;invTable]
ItemId   DateTimeStamp                 Qty
------------------------------------------
d44f9458 2017.08.12D15:17:13.710407838 38 
aa25ba3a 2017.08.11D22:32:05.314057246 5 
beec200c 2017.08.13D03:55:10.402670877 723
6e9b637f 2017.08.11D19:32:36.058830770 783
70c3756c 2017.08.12D18:53:29.659736235 56 
a8f4636b 2017.08.15D10:47:32.423256268 112
befc6ca7 2017.08.09D03:48:48.974271199 871
c8bc95e4 2017.08.10D12:42:14.533837072 586

So, returning to our primal scenario, all we need to do is add the tolerance to the userTable timestamp. Then we can use this verb to join on all the previous columns and take the first match within the tolerance.
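To see why shifting the timestamps works, here is a sketch of the idea on plain lists, for a single Date, ItemId and Qty combination. All the names (tol, userTimes, shifted, sysTimes) are hypothetical.

```q
tol:00:05:00
userTimes:10:00:00 10:03:00 10:20:00   / user events, ascending
shifted:userTimes+tol                  / 10:05:00 10:08:00 10:25:00
sysTimes:10:04:00 10:10:00             / two system rows to match
i:shifted binr sysTimes                / 0 2
userTimes i                            / 10:00:00 10:20:00
tol>userTimes[i]-sysTimes              / 10b
```

For the 10:04:00 system row, the first shifted time at or after it belongs to the 10:00:00 user event, which is the earliest event inside the 5-minute window. For the 10:10:00 row nothing falls inside the window, so binr lands on the 10:20:00 event, which is too far on the other side and has to be filtered out afterwards.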

/
first I will create another column 
that is 5 minutes offset into the future to match on
\
q)update timestamp:timestamp+00:05,
  timestampOriginal:timestamp from `userTable
q)show userTable
UserId   Date     EventType ItemId   Qty timestamp timestampOriginal
--------------------------------------------------------------------
1faaa20e 20170805 buy       992098f0 7   19:58:02  19:53:02
a419e88a 20170806 buy       327d41d0 8   15:12:55  15:07:55
79cdd3d7 20170807 buy       bc8ff04f 0   00:38:48  00:33:48
4155154c 20170808 buy       b7318a04 3   04:21:50  04:16:50
a22652c8 20170809 buy       9e35c5a0 7   09:01:45  08:56:45
277f1b93 20170810 buy       8cc964a4 5   07:56:52  07:51:52
a7966916 20170811 buy       c2c5bfaf 9   22:59:05  22:54:05

/
Then we will perform the new AS OF JOIN RIGHT
\
q)ajr[`Date`ItemId`Qty`timestamp;sysTable;userTable]
Date     ItemId   Qty timestamp UserId   EventType timestampOriginal
--------------------------------------------------------------------
20170805 2fc6d170 5   00:50:21  a5732847 buy       07:45:49
20170805 78bc9724 5   00:54:57  3f039eb6 buy       04:12:17
20170805 3135e6a8 8   00:57:40  705b75d5 buy       05:03:23
20170805 4a4b9e36 9   01:42:09  6987be4e buy       02:00:52
20170805 a6b5e1f3 5   01:59:56  3850deb3 buy       05:53:19
20170805 d9d0a16c 9   02:12:31  620bda64 buy       10:39:09
20170805 7c2652a7 3   02:30:49  7af80798 buy       04:19:24
20170805 c90c6086 5   02:42:07  d4256817 buy       04:55:00
20170805 be19e79f 6   02:43:57  5c0d3d58 buy       09:38:14
20170805 264e5489 9   03:36:12  82b8912f buy       04:20:57
20170805 0ba0bfd2 0   04:01:53  4f996c1d buy       16:11:22
20170805 cb3e189d 0   04:07:03  675cd3b7 buy       08:43:38
20170805 f2c41bca 6   04:36:03  855525d7 buy       09:33:50
20170805 8eec99e2 2   04:36:26  15cab5af buy       05:07:27
20170805 34894f41 9   04:42:40  ba2ca22d buy       06:11:16

/
It will match the first row at or after the timestamp.
Since we pushed the timestamp up 5 minutes, the match can be anywhere
from within the tolerance to past the last index if nothing matches.
The issue is that we can now be outside the tolerance
on the other side; to prevent this we can simply filter.
\
q)joined:ajr[`Date`ItemId`Qty`timestamp;sysTable;userTable]
/
We filter where the matches exist and 
timestamps are within the tolerance
\
q)select from joined where not null UserId,00:05>timestampOriginal-timestamp
Date     ItemId   Qty timestamp UserId   EventType timestampOriginal
--------------------------------------------------------------------
20170805 ac2dbea6 1   07:21:21  99303d09 buy       07:20:27
20170805 b842007c 2   11:05:40  5171565d buy       11:04:20
20170805 2d948578 7   19:10:14  75344162 buy       19:13:26
20170805 c061f673 5   23:20:43  40cf305a buy       23:20:55
20170805 9396fa95 2   24:25:48  d7482537 buy       24:22:43
20170806 a73e1906 0   04:45:55  5c7ebb13 buy       04:43:15
20170806 49ad69e9 6   08:16:19  c6df7f37 buy       08:18:55
20170806 931226c9 3   20:39:59  30789ba9 buy       20:34:59
20170807 39e53bb2 9   03:43:39  ccc71cae buy       03:47:14
20170807 40f3f6de 8   05:56:22  fc967f51 buy       05:59:30

That’s it.

Here is all the code to simulate this:

trunID:{`$8#string x};
genID:{trunID each x?0Ng};
i:10000;
itemids:genID 1000;
userids:genID 100000;
sysTable:([] Date:i#20170805+til 100;
 ItemId:i?itemids;
 Qty:i?10;
 timestamp:00:00:00+i?100000);
sysTable:`Date`timestamp xasc sysTable;
i:10000000
userTable:([]UserId:i?userids;
 Date:i#20170805+til 100;
 EventType:i#`buy;
 ItemId:i?itemids;
 Qty:i?10;
 timestamp:00:00:00+i?100000);

k)ajr0:{.Q.ft[{d: z;$[&/j:-1<i:(x#z)binr x#y;y,'d i;+.[+.Q.ff[y]d;(!+d;j);:;.+d i j:&j]]}[x,();;0!z]]y};
k)ajr:{.Q.ft[{d:x_z;$[&/j:-1<i:(x#z)binr x#y;y,'d i;+.[+.Q.ff[y]d;(!+d;j);:;.+d i j:&j]]}[x,();;0!z]]y};
update timestamp:timestamp+00:05,
 timestampOriginal:timestamp from `userTable;
userTable:`Date`timestamp xasc userTable;

joined:ajr[`Date`ItemId`Qty`timestamp;sysTable;userTable]
show select from joined where not null UserId,00:05>timestampOriginal-timestamp;
