A Short Interlude on KDB Load Balancing

As you might know, KDB is inherently single-threaded. In other words, it can do at most one thing at a time, in sequence.

This is a good thing. The benefits include:

  • Ordered and inherently sequential transactions that see a consistent view of the world.
  • Fewer cache misses, since data is processed to completion instead of being rescheduled.
  • Programs are easier to reason about.

This post isn’t really about how awesome being single-threaded is. There are plenty of articles about that featuring Node.js and JavaScript, plenty of Stack Overflow answers on the subject, and an old paper about KDB.

OED

Synchronous: ADJECTIVE

  • Existing or occurring at the same time.

Asynchronous: ADJECTIVE

  • Not existing or occurring at the same time.
  • (Computing, Telecommunications)
    Controlling the timing of operations by the use of pulses sent when the previous operation is completed rather than at regular intervals.

In general, in this single-threaded world, you design components that each perform some function and reach out to other services that perform other functions. If you do everything over async IPC (inter-process communication), then you are golden, because all of your q processes respond to events as soon as they happen. As a simple example, take two q processes: one is responsible for holding realtime data and its derived/calculated values, and the other is responsible for calculating those derived values. The updater can keep updating its table and send async events to the model calculator to recalculate. When the model recalculates, it can send an async request for the updater to include some values in its derived model.
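Here is a minimal sketch of that pattern, assuming the updater listens on port 6001 and the calculator on 6002; the table, function, and port names are all hypothetical (mergeModel stands in for whatever the updater does with the result), and each hopen assumes the other process is already up:

/ updater.q  (q updater.q -p 6001)
calcH:hopen `::6002;                        / async channel to the calculator
rt:([] sym:`$(); px:`float$());             / realtime table
upd:{[rows] `rt upsert rows; (neg calcH)(`recalc; rows)};  / append, then notify

/ calculator.q  (q calculator.q -p 6002)
updH:hopen `::6001;
recalc:{[rows] m:select avg px by sym from rows;  / stand-in for the real model
    (neg updH)(`mergeModel; m)};            / hand derived values back, async again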

However, perfect systems like this don’t get built. Instead, we have many q processes using synchronous IPC. There is a reason for this, though not a good one: it is significantly easier to reason about a synchronous request. In your mind you simply model all the requests happening one after another and then do something with all of the requested values.
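For example, the synchronous style a client writes is just a straight line of blocking calls; the port, table, and column names here are hypothetical:

h:hopen `::5000;
bid:h "exec last bid from quote where sym=`ABC";  / blocks until the service answers
ask:h "exec last ask from quote where sym=`ABC";  / then this one runs
mid:0.5*bid+ask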

Unfortunately, if a request takes a long time to process, you can force a service to spend all its time honoring your request. Continuing the example above: suppose someone didn’t understand the initial plan and decides to calculate all the values on the realtime service. That would cause the realtime service to spend more of its time processing queries and less of its time updating. We can of course turn off the ability to query a realtime service like this, but that is a bit like throwing the baby out with the bathwater. Synchronous requests, especially for a client, are much easier to understand.

So let’s start with a quick analogy that will help explain what we want to do. Synchronous messages are like phone calls.

You call me, I answer the phone, you ask a question, I answer.

Asynchronous messages are like text messages.

You text me. Sometime later I read the message. I might respond.

Ideally, what we would like is to hire a secretary who intercepts incoming calls: the secretary answers the phone, takes a message, sends it as a text, and keeps the caller on the line until the text is answered, at which point the secretary can relay the answer to the caller. Since this job is pretty easy, we expect that one secretary can serve many callers, keeping them all on the line until their queries have been resolved. Unfortunately, as of q version 3.5 we still cannot do this (though it has been promised in version 3.6).

So instead we will simulate this with another mode in q called multithreaded input mode. Multithreaded input mode allows one q process to answer queries concurrently (up to 1020 connections). However, it places many limitations on what kinds of queries it can execute, and you don’t want heavy calculations actually executing concurrently, for the reasons described earlier. So we settle for the next best thing: a single master forwards each query to a slave, the slave executes the query and returns the result to the master, and the master returns the result to the client. This allows us to scale a particular service without meddling with the client code. It turns out a bit of engineering is required to make this work, and it effectively turns multithreaded input mode into a router, something the documentation says it is not built to do, so YMMV (I have tested it and it seems to work).

DISCLAIMER

This works on Linux for versions 3.x, and on macOS only for versions 3.0-3.4, because it relies on UNIX domain sockets and macOS does not support abstract sockets. I have implemented something similar using the fifo system as well, but that has its own drawbacks and won’t be described here. I’m sure something similar can be done with files on Windows systems, but I haven’t spent the time working it out.

Architecture

You have one master process. The master is run with a negative port number, which puts it into multithreaded input mode. It also runs with the timer turned on, so that every N milliseconds it checks that its slaves are alive. I have set N to 1000, i.e. once a second (this is probably too often, but it makes testing faster, since you can see the result of taking a slave offline within a second).

q master.q -p -5000 -t 1000

This runs the master script, master.q.

You also need to run some slave nodes, each listening on its own port. For our example we will use 4001, 4002, 4003, and 4004.

q -p 4001
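and likewise for the rest:

q -p 4002
q -p 4003
q -p 4004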

Now I will explain what is inside master.q.

/ the unix domain socket addresses of the slave processes
handles:`:unix://4001`:unix://4002`:unix://4003`:unix://4004;
/ open each address, recording 0Ni for any slave that cannot be reached
h:{@[hopen;x;0Ni]} each handles;
han:handles!h;            / address -> handle
han:(where han=0Ni)_han;  / drop the slaves that failed to open
/
 We define a timer function that refreshes the set of open handles,
 so that none of them are stale. It executes in the main thread,
 which means that all of the slaves should have returned by this
 point. So if a slave doesn't respond, it is dead.
\
.z.ts:{
    alive:{@[{x (=;1b;1b)};x;0b]} each han;            / ping each slave: 1b if it answers
    if[all[alive] and count[han]=count handles; :()];  / all present and alive: nothing to do
    dead:where not alive;
    @[hclose;;0N] each han dead;                       / close the dead handles, ignoring errors
    han::dead _ han;
    inactive:handles where not handles in key han;     / addresses we currently have no handle for
    h:{@[hopen;x;0Ni]} each inactive;                  / try to (re)open them
    hd:(where hd=0Ni)_hd:inactive!h;                   / keep only the opens that succeeded
    han::han,hd;};
/
Because each concurrent connection's handle (.z.w) is unique, we can
assign a slave by binning the request handles over the number of
active slaves. In other words, we are round-robinning over the slaves.
\
.z.pg:{[m] (value[han] .z.w mod count han) m};  / pick a slave by client handle, forward synchronously
.z.ps:.z.pg;
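Once the master and slaves are up, you can watch the round-robin from a client: open two connections to the gateway (port 5000 here) and ask each for the port of the slave that serves it. The exact ports returned depend on how the handles happen to be numbered:

h1:hopen `::5000;
h2:hopen `::5000;
h1 "system \"p\""   / e.g. 4001: the slave serving the first connection
h2 "system \"p\""   / e.g. 4002: a different connection, a different slave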

The reason we have to do all this work is that in multithreaded input mode any command that changes global state is illegal inside .z.pg, and therefore can’t be used there. And though hopen isn’t illegal, it pauses all of the concurrency for all the threads, because it effectively changes global state.
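For instance, a handler that tries to maintain a hypothetical hit counter works in a normal q process but fails under a negative port, where writing to a global from a handler thread signals noupdate:

cnt:0;                            / hypothetical global hit counter
.z.pg:{[m] cnt::cnt+1; value m}   / the global write is rejected in
                                  / multithreaded input mode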

As a consequence, a serious limitation of this approach is that a client who opens a connection to this multithreaded gateway while querying can lock up all the other concurrent requests. In practice this happens when a client uses a one-shot connection-string query instead of hopen.

For example:

`::5000 "2+2"

is equivalent to

h:hopen `::5000; h "2+2"

but the first case will lock up the multithreaded gateway and the second won’t.

Secondly, this gateway does not support asynchronous requests with callbacks. That is, you can send an async request, but you won’t be able to make sure the host calls you back: from the slave’s point of view the client is the gateway, not you, so the usual .z.w callback is useless. There are probably some ugly workarounds, like including your host and port in the function.
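A sketch of one such ugly workaround, assuming the client itself listens on port 6000 and defines a callback function (all names here are hypothetical):

/ on the client (started with -p 6000 so it can be called back)
.client.callback:{[res] show res};      / whatever should happen with the result

/ the request carries the client's own address, so whichever slave
/ evaluates it can open a fresh connection back (this leaks a handle
/ per call; a real version would cache or hclose it)
req:({[addr;q] (neg hopen addr)(`.client.callback; value q)}; `::6000; "2+2");
(neg hopen `::5000) req                 / fire it at the gateway asynchronously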

This is a fully functional load balancer, and it allows clients to keep writing synchronous code without locking up your servers. Technically each of these slaves can itself forward the requests on to other machines, since the slaves are not actually limited in what they can do.
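For instance, to make a slave relay its work to another box, something like this sketch (with a hypothetical remote host and port) could run on the slave itself:

rh:hopen `:otherhost:7000;   / hypothetical remote worker
.z.pg:{[m] rh m};            / the slave now just forwards each query synchronously
.z.ps:.z.pg;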
