Python can handle threads, but they only help with non-blocking (I/O-bound) tasks. Doomed by its big global lock, the GIL (Global Interpreter Lock), a single Python process can't run Python code on more than one core at a time.
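To see the lock at work, here is a minimal sketch, assuming a standard (GIL-enabled) CPython build. It runs a CPU-bound pure-Python loop in one thread, then in four threads. The function names `burn` and `timed` are illustrative, not from any library.

```python
import threading
import time


def burn(n):
    """CPU-bound busy loop: pure Python arithmetic, which holds the GIL."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed(n_threads, n=2_000_000):
    """Run `burn(n)` in n_threads threads and return the elapsed seconds."""
    threads = [threading.Thread(target=burn, args=(n,)) for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    one = timed(1)
    four = timed(4)
    # With the GIL, 4 threads doing 4x the work typically take roughly 4x
    # as long, instead of about 1x as a truly parallel run on 4 cores would.
    print(f"1 thread: {one:.2f}s, 4 threads: {four:.2f}s")
```

Exact timings depend on the machine, so the sketch only prints them; the point is the ratio between the two runs.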

ZeroMQ is a library for abstracting and simplifying communication between applications. The name stands for Zero Message Queue. It's a low-level tool with bindings for many languages, and the Python binding, pyzmq, is pretty complete. Python can handle sockets on its own, but zmq does the dirty work, and keeps it simple.
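As a first taste of pyzmq, here is a minimal request/reply round trip. It is a sketch, assuming a recent pyzmq; it uses the `inproc://` transport (in-process, no files or ports) so that both ends fit in one script, and the endpoint name `inproc://demo` is arbitrary.

```python
import zmq

ctx = zmq.Context()

# The REP side answers requests; bind it first.
rep = ctx.socket(zmq.REP)
rep.bind("inproc://demo")

# The REQ side asks questions.
req = ctx.socket(zmq.REQ)
req.connect("inproc://demo")

req.send(b"ping")   # queued by zmq, returns immediately
msg = rep.recv()
print(msg)          # b'ping'

rep.send(b"pong")
reply = req.recv()
print(reply)        # b'pong'

req.close()
rep.close()
ctx.term()
```

zmq takes care of framing, buffering and connection handling; the code only deals with whole messages.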

brew install zeromq
pip install pyzmq

My example is a common one: I want to parse logs and store them in a NoSQL database. My first tests are done with MongoDB, because its website is cuter than Riak's. Reading log files is fast, writing to MongoDB is fast too, but parsing lines is CPU-intensive. With ZeroMQ you have to find the right pattern, the right already-invented wheel. For my task, the work queue is the right pattern: there is a task list, and workers that pick a task, chew on it, and store the result. The multiple-server pattern seems like a good idea, but using the work queue as the server and the workers as clients is the simplest way.

The queue reads the log files, handling them as an iterator. Each client asks for a task; the queue iterates once and answers with the next line. When the log files are exhausted, the queue answers every request with an empty string, and the workers stop asking. You can start one worker per core, on each computer (workers on other machines need a tcp endpoint, since ipc is local only). For this example, I use an ipc connection, Inter-Process Communication, also known as a UNIX socket.
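The whole protocol (ask, receive a line, stop on an empty reply) can be sketched in a single script. This is an illustration under several assumptions: it uses `inproc://` instead of `ipc://`, worker threads instead of separate worker processes (so, because of the GIL, it shows the protocol, not the speedup), upper-casing as a placeholder for real parsing, and a server that stops after sending one empty reply per worker instead of looping forever. The names `serve`, `worker` and `ENDPOINT` are hypothetical.

```python
import threading

import zmq

ENDPOINT = "inproc://logator-demo"  # hypothetical endpoint; the post uses ipc://


def worker(ctx, results, lock):
    """REQ client: ask for a line, process it, stop on an empty reply."""
    sock = ctx.socket(zmq.REQ)
    sock.connect(ENDPOINT)
    while True:
        sock.send(b"")          # "give me work"; the content is ignored
        line = sock.recv()
        if not line:            # empty reply means "no more work"
            break
        with lock:
            results.append(line.strip().upper())  # placeholder for real parsing
    sock.close()


def serve(lines, n_workers=2):
    """REP server: hand out one line per request, then one stop per worker."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind(ENDPOINT)          # bind before the workers connect
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(ctx, results, lock))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for line in lines:          # lines must be non-empty bytes (keep the "\n")
        rep.recv()
        rep.send(line)
    for _ in range(n_workers):  # each worker has exactly one request left
        rep.recv()
        rep.send(b"")
    for t in threads:
        t.join()
    rep.close()
    ctx.term()
    return results


if __name__ == "__main__":
    print(sorted(serve([b"get /a\n", b"get /b\n", b"get /c\n"], n_workers=2)))
```

Since the empty string is the stop signal, a genuinely empty line would stop a worker early; lines read from a file keep their trailing newline, so this does not happen in practice.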

Server

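import sys

import zmq

c = zmq.Context()
s = c.socket(zmq.REP)
s.bind('ipc:///tmp/logator')

print("waiting for workers")
# sys.stdin.buffer yields bytes, which is what zmq sends
for line in sys.stdin.buffer:
    s.recv()        # a worker's request; its content is ignored
    s.send(line)    # hand it one log line

# input exhausted: answer "no more work" to every further request
while True:
    s.recv()
    s.send(b'')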

For each log line (coming from zcat piped into the script), we wait for a worker's request. We don't care what it says; we just give it its work. When the input is exhausted, we answer "no more work" (an empty string) to everyone.
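The server has to wait for a request before each reply because a REP socket enforces a strict recv-then-send alternation. This small sketch, assuming pyzmq and a throwaway `inproc://fsm-demo` endpoint, shows what happens when a REP socket tries to speak first.

```python
import zmq

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("inproc://fsm-demo")

error = None
try:
    rep.send(b"unsolicited")  # not allowed: no request has been received yet
except zmq.ZMQError as e:
    error = e
    print("refused:", e)      # errno is zmq.EFSM (wrong socket state)

rep.close()
ctx.term()
```

The REQ side has the mirror rule (send, then recv), which is why the client below always sends before reading.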

Client

import zmq

c = zmq.Context()
s = c.socket(zmq.REQ)
#s.connect('tcp://127.0.0.1:10001')
s.connect('ipc:///tmp/logator')

class Logs(object):
    def __iter__(self):
        while True:
            s.send(b'')       # ask the queue for a task
            line = s.recv()   # bytes (copy=False would return a zmq.Frame)
            if not line:      # empty reply: no more work
                break
            yield line.decode('utf-8', 'replace')

The log analyzer needs an iterator. The worker asks for a job, gets it, and yields it.
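On the worker side, whatever consumes the iterator does the CPU-heavy part. The sketch below is hypothetical: logator's real parser isn't shown here, so `parse_line` assumes the Apache "common log format", and `consume` stores into a plain list where the real worker would write to MongoDB.

```python
import re

# Hypothetical parser for the Apache common log format (not logator's own).
COMMON_LOG = re.compile(
    r'(?P<host>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<request>[^"]*)" (?P<status>\d{3}) (?P<size>\d+|-)'
)


def parse_line(line):
    """Turn one log line into a dict, or return None if it doesn't match."""
    m = COMMON_LOG.match(line)
    if m is None:
        return None
    doc = m.groupdict()
    doc["status"] = int(doc["status"])
    doc["size"] = 0 if doc["size"] == "-" else int(doc["size"])
    return doc


def consume(lines, store):
    """Parse every line of an iterable (e.g. Logs()) and keep the results."""
    for line in lines:
        doc = parse_line(line)
        if doc is not None:
            store.append(doc)  # with pymongo: collection.insert_one(doc)
```

Usage in a worker would be `consume(Logs(), store)`; any iterable of strings works, which makes the parsing easy to test without a running queue.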

The real code is in the logator project.

With this strategy, each worker can consume up to 75% of a CPU core. It's a hammer strategy: my CPU is kept busy, at around 300% usage for 4 cores.

  • 1 worker: 1600 lines/second
  • 2 workers: 2700 lines/second (1350/worker)
  • 3 workers: 3400 lines/second (1133/worker)
  • 4 workers: 3800 lines/second (950/worker)
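The numbers above can be turned into speedup and scaling efficiency with a few lines of arithmetic. This sketch only reuses the measured totals from the list; "efficiency" here means speedup divided by the number of workers, so 1.0 would be perfect linear scaling.

```python
# Measured totals from the list above, in lines/second.
measured = {1: 1600, 2: 2700, 3: 3400, 4: 3800}

base = measured[1]
stats = {}
for workers, total in measured.items():
    per_worker = total / workers
    speedup = total / base            # relative to a single worker
    efficiency = speedup / workers    # 1.0 = perfect linear scaling
    stats[workers] = (per_worker, speedup, efficiency)
    print(f"{workers} workers: {per_worker:.0f}/worker, "
          f"speedup x{speedup:.2f}, efficiency {efficiency:.0%}")
```

Four workers give a 2.375x speedup over one, and efficiency falls from about 84% with 2 workers to about 59% with 4, which matches the shrinking per-worker rate.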

This is a ZeroMQ proof of concept to make my processor hot. If a worker crashes, the log line it was handling is lost. If you want a real work queue, have a look at beanstalkd or Amazon's SQS; boto, the Python wrapper, provides a nice API for it.