A Better Way To Load Data into Microsoft SQL Server from Pandas

This post is not going to have much theory.

Here was my problem. Python and Pandas are excellent tools for munging data, but if you want to store it long term a DataFrame is not the solution, especially if you need to do reporting. That’s why Edgar Codd discovered, and Michael Stonebraker implemented, relational databases.

Obviously, if I had the choice I wouldn’t be using Microsoft SQL Server [MSS]. If I could, I would do this whole exercise in KDB, but KDB is expensive and not that good at storing long strings. Other relational databases might have better integration with Python, but in an enterprise MSS is the standard, and it supports all sorts of reporting.

So my task was to load a bunch of data, about twenty thousand rows, into MSS. In the long term we were going to load one hundred thousand rows an hour.

Pandas is an amazing library built on top of numpy, a pretty fast C implementation of arrays.

Pandas has a built-in to_sql method which allows anyone with a SQLAlchemy engine (mine was backed by pyodbc) to send their DataFrame into SQL. Unfortunately, this method is really slow. It creates a transaction for every row, which means that every insert locks the table. This leads to poor performance (I got about 25 records a second).

So I thought I would just use the pyodbc driver directly. After all, it has a special method for inserting many values called executemany. But the MSS implementation of pyodbc’s executemany also creates a transaction per row. So does pymssql.
I looked on Stack Overflow, but the answers pretty much all recommended using bulk insert, which is still the fastest way to copy data into MSS. But it has some serious drawbacks.

For one, bulk insert needs a way to access the created flat file. It works best if that access path is actually a local disk and not a network drive. It is also a very primitive tool, so there is very little that you can do in terms of rollback, and it’s not easily monitored. Lastly, transferring flat files means that you are doing data munging, writing to disk, then copying to another remote disk, then putting the data back in memory. It might be the fastest method, but all those operations have overhead and it creates a fragile pipeline.

So if I can’t do bulk insert, and I can’t use a library, I have to roll my own.

It’s actually pretty simple.

You create your connection. I did this with sqlalchemy, but you can use whatever:


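import sqlalchemy
from urllib.parse import quote_plus  # Python 3; in Python 2 this was urllib.quote_plus

# ODBC connection string; server, port, database and credentials are placeholders
cxn_str = "DRIVER={SQL Server Native Client 11.0};SERVER=server,port;DATABASE=mydb;UID=user;PWD=pwd"
params = quote_plus(cxn_str)
engine = sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
# Grab the raw DBAPI (pyodbc) connection and a cursor from it
conn = engine.connect().connection
cursor = conn.cursor()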

You take your df.
I’m going to assume that all of the values in the df are strings. If they are not, and you have longs, you are going to need to do some munging to make sure that when we stringify the values SQL knows what’s up.
(In Python 2, a long inside a stringified tuple keeps its L suffix, so you need to chop the L off because MSS doesn’t know that 12345678910111234L is a number. Here is the regex to get rid of it: re.sub(r"(?<=\d)L", "", s). Credit to my coworker who is a regex wiz.)
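Here is a minimal sketch of that regex in action. The function name strip_long_suffix and the sample row are made up for illustration. One caveat worth knowing: the pattern removes any L that directly follows a digit, so it would also eat the L in a string value like 'A1L'.

```python
import re

def strip_long_suffix(s):
    """Drop a Python 2 style 'L' that directly follows a digit."""
    return re.sub(r"(?<=\d)L", "", s)

# A stringified row as Python 2 would have produced it (made-up values)
row = "(12345678910111234L, 'abc', 42L)"
print(strip_long_suffix(row))  # (12345678910111234, 'abc', 42)
```

Python 3 ints never carry the suffix, so this step only matters on Python 2.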

We need to tuplify our records. Wes McKinney says:

records = [tuple(x) for x in df.values]

Which is good enough for me. But we also need to stringify them.

So that becomes:

records = [str(tuple(x)) for x in df.values]
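One thing to watch: str(tuple(x)) only produces valid SQL when Python's repr happens to look like a SQL literal. A value with an apostrophe gets wrapped in double quotes, None comes out as None rather than NULL, and a one-column row picks up a trailing comma. Below is a rough sketch of a stricter formatter. The helpers sql_literal and to_sql_row are hypothetical names of mine, not part of pandas or pyodbc, and this is still string building, so parameterized queries remain the safer route for untrusted input.

```python
def sql_literal(value):
    """Render one Python value as a SQL literal (hypothetical helper)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, (int, float)):
        return repr(value)
    # Treat everything else as text and double any single quote
    return "'" + str(value).replace("'", "''") + "'"

def to_sql_row(row):
    """Render a sequence of values as one parenthesised VALUES row."""
    return "(" + ", ".join(sql_literal(v) for v in row) + ")"

print(to_sql_row(("O'Brien", 42, None)))  # ('O''Brien', 42, NULL)
```

With this in place the records line would become records = [to_sql_row(x) for x in df.values], assuming the values are plain strings and numbers.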

MSS can insert up to 1000 rows in a single INSERT ... VALUES statement, which means 1000 rows per transaction.

We need the sql script that does batch insertion: we’ll call that insert_

insert_ = """
INSERT INTO mytable
(col1
,col2
,col3
...)
VALUES
"""

Since we can only insert 1000 rows at a time, we need to break up the rows into 1000-row batches. So here is a way of batching the records.
My favorite solution comes from here.

def chunker(seq, size):
    # Yield successive slices of seq, each at most size items long
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
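To see what chunker hands back, here is a tiny usage sketch with a made-up list of seven items and a batch size of three. The last batch is simply shorter.

```python
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

batches = list(chunker(list(range(7)), 3))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```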

Now all we need to do is have a way of creating the rows and batching.
Which becomes:

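for batch in chunker(records, 1000):
    # Join the stringified tuples into one multi-row VALUES list
    rows = ','.join(batch)
    insert_rows = insert_ + rows
    cursor.execute(insert_rows)
    # One commit per batch, i.e. one transaction per 1000 rows
    conn.commit()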
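If you want to try the whole loop without an MSS instance handy, here is a self-contained sketch. It swaps in an in-memory SQLite database as a stand-in (my assumption, purely so the example runs anywhere), uses 250 made-up rows, and a batch size of 100 instead of 1000.

```python
import sqlite3

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

# Stand-in database: in-memory SQLite instead of MSS
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE mytable (col1 TEXT, col2 TEXT, col3 TEXT)")

insert_ = "INSERT INTO mytable (col1, col2, col3) VALUES "

# 250 made-up rows, stringified the same way as in the post
data = [("a%d" % i, "b%d" % i, "c%d" % i) for i in range(250)]
records = [str(tuple(x)) for x in data]

batch_count = 0
for batch in chunker(records, 100):
    cursor.execute(insert_ + ",".join(batch))
    conn.commit()  # one transaction per batch
    batch_count += 1

row_count = cursor.execute("SELECT COUNT(*) FROM mytable").fetchone()[0]
print(batch_count, row_count)  # 3 250
```

The shape of the loop is the same against MSS; only the connection and the 1000-row ceiling differ.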

That’s it.
This solution got me inserting around 1300 rows a second, roughly 50 times faster than the 25 rows a second I started with (about 1.7 orders of magnitude).

There are further improvements for sure, but this will easily get me past a hundred thousand rows an hour.
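For what it's worth, here is the back-of-the-envelope arithmetic behind that claim, using only the rates quoted above. Note that the to_sql rate works out to just under the hundred thousand rows an hour target.

```python
import math

SECONDS_PER_HOUR = 3600
to_sql_rate = 25      # rows per second, from the to_sql attempt
batched_rate = 1300   # rows per second, from the batched inserts

to_sql_per_hour = to_sql_rate * SECONDS_PER_HOUR    # 90000
batched_per_hour = batched_rate * SECONDS_PER_HOUR  # 4680000
speedup = batched_rate / to_sql_rate                # 52.0
orders = math.log10(speedup)                        # about 1.72

print(to_sql_per_hour, batched_per_hour, speedup, round(orders, 2))
# 90000 4680000 52.0 1.72
```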


Kafka On the Shore: My Experiences Benchmarking Apache Kafka Part II

This is part II of a series on benchmarking Kafka. Part I can be found here:

In the first part we used Spark to blast a 2gb csv file of 10 million rows into a three machine Kafka cluster. We got the time down to about 30 seconds, which means it would take about 4 hours to blast a terabyte. That is fast, but not blazing fast.

Any number I put here will become obsolete within a year, perhaps sooner. Nevertheless, I’ll put myself out there. If on modest hardware we could achieve 1 terabyte in 40 minutes, which is about 400mb/s, that would be enough, I think, to impress some people.
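Those two figures are easy to check. The sketch below assumes decimal units (1 terabyte = 10^12 bytes) and treats the Part I result as exactly 2 gigabytes in 30 seconds.

```python
TB = 10**12  # decimal units assumed
GB = 10**9
MB = 10**6

# Part I: a 2 GB file in about 30 seconds
part1_rate = 2 * GB / 30               # bytes per second
hours_per_tb = TB / part1_rate / 3600  # about 4.17

# Goal: 1 TB in 40 minutes
goal_rate_mb = TB / (40 * 60) / MB     # about 416.7 MB per second

print(round(hours_per_tb, 2), round(goal_rate_mb, 1))  # 4.17 416.7
```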

Now again, because of Kafka’s memory flush cycle, we can only get the speed we want up to 8gb per machine. Really less, because there is some RAM usage by the OS itself and any other applications running on those machines, including in my case Spark. So conservatively we can try to get 4gb per machine, at 400mb/s for two minutes straight.

Using some tricks, this kind of throughput can be accomplished on pretty modest hardware.

  1. no replication
  2. partitioning

Now the hard part is finding a machine gun that can fire those messages that fast. A distributed solution seems like the best move, and it replicates the real world type of messaging: many sources, each blasting away messages.

So I fire up a Spark instance and load in a large csv file of 10 million rows, ~1.8gb. I re-partition the data set to take advantage of the number of cores available to me. And then I run the mapPartitions function, which allows each partition, independently of all the others, to blast Kafka with all of its messages, eliminating much of the overhead.

I then get a sustained message blast of about 200mb/s without the machine falling over.

[My interest in benchmarking Kafka has been temporarily put to rest. I no longer have a Kafka cluster at my disposal and am looking at more local messaging solutions; both are zero broker:

Zero MQ

Aeron]

Kafka On the Shore: My Experiences Benchmarking Apache Kafka Part I

Recently, I have become a fan of Kappa Architecture, after seeing a bunch of videos and slides by Martin Kleppmann.

The premise is pretty simple: instead of having a batch process and a stream process, build a stream process that is so fast it can handle batches.
Sounds simple. Well, how fast does it need to go before the difference between batch and stream is erased?

Take a simple example. Suppose I was going to start building a data processing application. In theory, I can build the following structure: a batch framework that only needs to work once, and then a streaming framework that takes over once the initial load is complete. Once I’m up and running, I only need to maintain the streaming framework.

Except, wait! The relevant quote is:
In theory, everything works in practice.

or my other favorite

Theory is when you know everything and nothing works. Practice is when everything works but no one knows why.  In our lab theory and practice are combined: nothing works and no one knows why.

Anyway, here are three simple reasons why this strategy fails.

  1. It’s possible that some producers can only produce in batches.
  2. Perhaps you need to reprocess the data after you change your black box.
  3. You acquire new producers that you need to onboard.

Alright, so clearly you need to maintain some kind of working batch processing framework. But what you don’t want is to maintain two sets of procedures that both need to be updated. That’s effectively de-normalization, and it is a bad idea in databases and it is a bad idea in code (though hardly anyone talks about it).

This leads to Kappa: what if you could process everything as a stream, even batches?

I imagine that this application is an island. The only way to reach the island is a stream, so if you have a truck with a ton of stuff, i.e. a batch, we are going to unload the truck as fast as possible and send all of the stuff on tiny canoes.

So to test if this is going to be scalable, I decided to try some of the simplest batch files I knew: CSVs. Everyone hates them, but almost every system can produce them, especially if by comma you mean any string delimiter. Even log files can be thought of as a special case of a CSV. So can JSON.

CSVs are annoying for several reasons. Firstly, they aren’t fixed width, so there isn’t a really nice way to seek through the file without reading it. Secondly, they provide virtually no schema information.

Alright, no matter. Let’s see how fast we can load some pretty hefty CSVs into Kafka.

I try reading the csv with Python and find that on a pretty lightweight machine (4 cores, 8gb, 100gb) I can read a csv file at 100,000 lines per second. That’s pretty fast.

A 10 million row file with 30 columns, ~2gb, can be read in 100 seconds.
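The reading test itself isn't shown here, so the following is only a sketch of how a lines-per-second measurement like that could be taken with the standard csv module. The helper name count_rows and the small generated file are made up for illustration, and the rate you see will depend entirely on your machine.

```python
import csv
import os
import tempfile
import time

def count_rows(path):
    """Read every row of a CSV file and return (row_count, seconds_taken)."""
    start = time.perf_counter()
    rows = 0
    with open(path, newline="") as f:
        for _ in csv.reader(f):
            rows += 1
    return rows, time.perf_counter() - start

# Build a small made-up file: 1 header line plus 10,000 rows of 30 columns
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    writer = csv.writer(tmp)
    writer.writerow(["col%d" % i for i in range(30)])
    for r in range(10000):
        writer.writerow([r] * 30)
    path = tmp.name

rows, seconds = count_rows(path)
os.remove(path)
print(rows, "lines read;", "%.0f lines per second" % (rows / max(seconds, 1e-9)))
```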

That gives us a bit more than a gigabyte per minute. However, after adding Kafka messaging overhead and playing with the batching, I can only produce about 8000 messages per second. That is more than an order of magnitude slowdown.

# imports
import json
from kafka import SimpleProducer, KafkaClient
from datetime import datetime as dt

# Legacy kafka-python API. `async` is a reserved word from Python 3.7 on,
# so this call only parses on older interpreters.
kafka = KafkaClient(["host:9092"])
producer = SimpleProducer(kafka, async=True, batch_send_every_n=2000, batch_send_every_t=20)

def kafkasend(message):
    # header is the list of column names from the first line of the csv
    packing = {"timestamp": str(dt.now()), "json": dict(zip(header, message))}
    producer.send_messages('topic', json.dumps(packing).encode('utf-8'))

I consider adding a pool of Kafka producers that will each read from an in-memory buffer off the csv to produce the 100,000 lines per second. But a quick calculation tells me that this is still going to be too slow. A batch process of a 2gb file should take less than 1 minute, not close to 2 minutes.
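The quick calculation goes roughly like this, using only the numbers already quoted: 10 million rows, 8000 messages per second from the single producer, and the 100,000 lines per second read ceiling.

```python
rows = 10_000_000

single_producer_rate = 8_000   # messages per second, measured above
read_rate = 100_000            # lines per second, the CSV read ceiling

minutes_at_producer_rate = rows / single_producer_rate / 60   # about 20.8
seconds_at_read_rate = rows / read_rate                       # 100.0
rate_needed_for_one_minute = rows / 60                        # about 166,667

print(round(minutes_at_producer_rate, 1), seconds_at_read_rate, round(rate_needed_for_one_minute))
# 20.8 100.0 166667
```

So even a producer pool that keeps up with the reader tops out at 100 seconds, and a one minute target needs around 167,000 messages per second.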

So out comes Spark. I have a small cluster of 10 machines, each exactly as lightweight as the previous machine: 4 cores, 8gb, 100gb, 1Gb Ethernet.

I fire up 15 cores and proceed to break up this 10 million row csv and send it across the wire. Here something magical happens. I can send the file in under 30 seconds, though it takes some time for Spark to warm up. In fact, Kafka shows that it’s receiving messages at 220mb per second. That’s roughly 1 million messages per second during peak.

import json
import time
from kafka import SimpleProducer, KafkaClient

def sendkafkamap(messages):
    # Runs once per partition, so each partition gets its own producer
    kafka = KafkaClient(["host:9092"])
    producer = SimpleProducer(kafka, async=False, batch_send_every_n=2000, batch_send_every_t=10)
    for message in messages:
        yield producer.send_messages('citi.test.topic.new', message)

# sc is the SparkContext provided by the pyspark shell
text = sc.textFile("/user/sqoop2/LP_COLLAT_HISTORY_10M.csv")
header = text.first().split(',')

def remove_header(itr_index, itr):
    # The header line sits at the start of partition 0 only
    return iter(list(itr)[1:]) if itr_index == 0 else itr

noHeader = text.mapPartitionsWithIndex(remove_header)

messageRDD = noHeader.map(lambda x: json.dumps({"timestamp": int(time.time()), "json": dict(zip(header, x.split(",")))}).encode('utf-8'))

# mapPartitions is lazy: nothing is sent until an action runs on sentRDD
sentRDD = messageRDD.mapPartitions(sendkafkamap)
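If you don't have a Spark cluster lying around, the header trick can still be checked locally. The sketch below fakes mapPartitionsWithIndex with a tiny hypothetical helper, map_partitions_with_index, over a made-up file that has already been split into two partitions. It is not Spark, just the same per-partition logic in plain Python.

```python
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr

def map_partitions_with_index(partitions, func):
    """Tiny local stand-in for Spark's mapPartitionsWithIndex (hypothetical)."""
    return [list(func(i, iter(part))) for i, part in enumerate(partitions)]

# A made-up file already split into two partitions
partitions = [["id,name", "1,ann", "2,bob"], ["3,cat", "4,dan"]]
header = partitions[0][0].split(",")
no_header = map_partitions_with_index(partitions, remove_header)

# Same packing step as the map above, minus the timestamp and JSON encoding
records = [dict(zip(header, line.split(","))) for part in no_header for line in part]
print(no_header)   # [['1,ann', '2,bob'], ['3,cat', '4,dan']]
print(records[0])  # {'id': '1', 'name': 'ann'}
```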

This was surprising, because Jay Kreps could only get 193mb/s at the peak. And he was using machines significantly more powerful than mine on both the producer side and the kafka side. His machines had:

  • Intel Xeon 2.5 GHz processor with six cores
  • Six 7200 RPM SATA drives
  • 32GB of RAM
  • 1Gb Ethernet


I also ran this test over and over for 14 hours, and I could maintain a write speed close to 200mb/s the entire time, though I did get more spikes than he did. My machines are roughly half as powerful, which leads me to the following conclusion: the network card is the real bottleneck here. So as long as you have many producers, you can scale your production until the network card is saturated, long before your in-memory buffer or processor cause any problems. Because there is a lot of overhead for replication and sending acknowledgments, the 220mb is probably close to the maximum. Since I was replicating it 2 more times on the two other machines, while the machine was receiving 220 it was also sending 440 out. That puts 660 on the Ethernet link. Then you have to account for all of the packing the messages receive, plus the acknowledgments. Perhaps, with some more tuning, I could have got another 100mb in. But there is another factor at play: the spikes.
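The replication arithmetic is simple enough to write down. The figures are in whatever unit the 220 was reported in; the sketch only adds up traffic on the leader's network card and ignores message packing and acknowledgments.

```python
inbound = 220          # producer traffic arriving at the leader, units as reported
extra_replicas = 2     # copies sent on to the two other brokers

outbound = inbound * extra_replicas   # 440
total = inbound + outbound            # 660

print(outbound, total)  # 440 660
```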

Kafka uses a very clever piece of engineering. Basically, Kafka pushes all of the messages out of the JVM and into the write-ahead log in memory, and the OS decides when to flush to disk. If the messages fill the memory too fast and the OS can’t keep up, you get a spike down. So the extra memory really just gives the OS a longer time to catch up and flush to disk. Of course, if this is happening nonstop for 4 hours, you are streaming roughly a terabyte (at 4gb per minute you get .24 terabytes an hour). Unless you have a terabyte of main memory, you will get spikes, because you will need to flush main memory to disk, and while that happens the OS basically prevents Kafka from writing to main memory, so its consumption speed plummets.
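As a rough sanity check on those numbers, here is the arithmetic in decimal units. The last line is my own simplification: it assumes all 8gb of a machine's memory is available as buffer and that nothing gets flushed, so it is only an upper bound on how long the memory could absorb the stream.

```python
gb_per_minute = 4
ram_gb = 8   # memory per machine, from the specs above

tb_per_hour = gb_per_minute * 60 / 1000   # 0.24, decimal units assumed
tb_in_four_hours = tb_per_hour * 4        # 0.96, roughly a terabyte

# Upper bound: minutes until memory fills if the OS flushed nothing (assumption)
minutes_to_fill = ram_gb / gb_per_minute  # 2.0

print(tb_per_hour, tb_in_four_hours, minutes_to_fill)
```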

For more, read Jay Kreps’ answer to "Kafka writes every message to broker disk. Still, performance wise it is better than some of the in-memory message storing message queues. Why is that?" on Quora.


Alright, so we want to process large CSVs as if they were a stream. I don’t know what is fast enough for your use case. But for me, if we can build a system that allows us to onboard a terabyte in 4 hours then we are not far behind the batch system.

Can we do better? The answer is yes.

Until now, though we had three Kafka servers, only one was doing the consuming. Since I was not partitioning the messages, I was simply producing to the leader at partition 0. At this time I don’t have results from round robining to the Kafka servers. I will continue this train of thought in part II.
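I don't have results for it yet, but the idea of round robining is easy to sketch without any Kafka at all. The helper round_robin below is a made-up illustration of rotating messages across three partition ids, not the kafka-python partitioner.

```python
from collections import Counter
from itertools import cycle

def round_robin(messages, partitions):
    """Pair each message with the next partition id in rotation."""
    return list(zip(cycle(partitions), messages))

messages = ["msg%d" % i for i in range(7)]
assigned = round_robin(messages, [0, 1, 2])
load = Counter(partition for partition, _ in assigned)

print(assigned[:4])  # [(0, 'msg0'), (1, 'msg1'), (2, 'msg2'), (0, 'msg3')]
print(dict(load))    # {0: 3, 1: 2, 2: 2}
```

The point is only that the load spreads almost evenly, so all three servers would share the consuming instead of one.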