Process-safe connection pool for psycopg2 (PostgreSQL)

published Nov 04, 2014 10:30   by admin ( last modified Nov 04, 2014 10:37 )

Summary: Gunicorn forks its workers after the psycopg2 connection pool has already opened connections to PostgreSQL. When Gunicorn forks a new process, that process inherits copies of those connections, so several web server processes end up clobbering each other's connections to PostgreSQL. Use a proxy class that re-initialises the pool when it finds itself running in a new process, as shown in the code below.

One solution is to make the connection pool detect when it is running under a new process id and reset itself. The code below (which is still in testing, use at your own risk) does this (Gist version here: Process safe pool manager for psycopg2):

import os
from psycopg2.pool import ThreadedConnectionPool


class ProcessSafePoolManager:
    """A proxy around ThreadedConnectionPool that rebuilds the pool
    whenever it finds itself running under a new process id."""

    def __init__(self, *args, **kwargs):
        self.last_seen_process_id = os.getpid()
        self.args = args
        self.kwargs = kwargs
        self._init()

    def _init(self):
        self._pool = ThreadedConnectionPool(*self.args, **self.kwargs)

    def getconn(self):
        current_pid = os.getpid()
        if current_pid != self.last_seen_process_id:
            # We have been forked into a new process; the inherited pool
            # shares its sockets with the parent, so build a fresh one.
            self._init()
            print("New id is %s, old id was %s" % (current_pid, self.last_seen_process_id))
            self.last_seen_process_id = current_pid
        return self._pool.getconn()

    def putconn(self, conn):
        return self._pool.putconn(conn)

pool = ProcessSafePoolManager(1, 10, "host='127.0.0.1' port=12099")
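
For completeness, here is a minimal sketch of how the pool manager might be used from a bottle route (the route, the query and the JSON return value are only illustrative, not part of my actual server):

from bottle import route

@route('/ping')
def ping():
    # Borrow a connection; the manager transparently rebuilds the
    # pool if this is the first call after a fork.
    conn = pool.getconn()
    try:
        cur = conn.cursor()
        cur.execute("SELECT 1")
        return {"db": cur.fetchone()[0]}
    finally:
        # Always hand the connection back, even if the query raises.
        pool.putconn(conn)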

Background

I'm working on a server (using the bottle micro framework, though that choice is incidental) that needs to both serve lots of big files and do some CPU-intensive work. It uses PostgreSQL as a storage back end. An asynchronous server such as Tornado or bjoern is good for serving big files, but the CPU-heavy work needs several processes, since Python threads cannot spread themselves across processor cores (the GIL keeps only one thread running Python code at a time).

Gunicorn is a Python server framework that, with one line of code, can be told to use an asynchronous server together with a number of workers, each of which is a separate process. In bottle.py you can write something like this:

    run(host='localhost',  server='gunicorn', workers=15, worker_class="tornado", port=8080)

...and just like that you will have 15 parallel threaded Tornado servers at your disposal! I may make an egg of the above code if it seems useful in the long run.

The problem is that as soon as I set workers above 1, strange errors started to happen. After a lot of testing I began to suspect that the different workers were clobbering each other's TCP/IP connections, that is, they were all reading and writing on the same sockets to PostgreSQL. Finally I found a web page that validated my suspicions: Celery jobs throw exceptions · Issue #3 · kennethreitz/django-postgrespool

One solution, purportedly used by uWSGI, is to fork before the connection pool is created. I thought of diving into the Gunicorn code, or catching the resulting exceptions, but the quickest and cleanest solution (although maybe not the most performant) seemed to be to proxy psycopg2's connection pool object. The proxy object checks whether it is still running under the same process id as the previous time it was used, and if not, it reinitialises the psycopg2 connection pool.
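
To illustrate the ordering that matters in the fork-first approach, here is a toy sketch without Gunicorn (the connection string is the same assumed one as above; real code would of course branch on the returned pid):

import os
from psycopg2.pool import ThreadedConnectionPool

# Fork FIRST, then create the pool: each process then owns sockets
# that were opened after the fork, so parent and child cannot
# clobber each other's connections.
pid = os.fork()
pool = ThreadedConnectionPool(1, 10, "host='127.0.0.1' port=12099")
conn = pool.getconn()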

Optimisations

It seems that Gunicorn always forks from a mother process, so once the process id has changed from the pool manager's perspective, it will not change again. One could then monkey-patch the instance at runtime to stop testing for a new process id after it has changed once, which might improve performance. Or simply set an attribute such as self._changed_once = True.
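
A minimal sketch of that attribute-based shortcut, with the rest of the class unchanged (_changed_once is just an illustrative name):

class ProcessSafePoolManager:
    # __init__, _init and putconn as before ...

    def getconn(self):
        # Gunicorn forks only once per worker, so after the first pid
        # change there is no need to keep comparing process ids.
        if not getattr(self, '_changed_once', False):
            current_pid = os.getpid()
            if current_pid != self.last_seen_process_id:
                self._init()
                self.last_seen_process_id = current_pid
                self._changed_once = True
        return self._pool.getconn()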

Caveats and bugs

There probably are. What happens to the old pool object once it is replaced? Are there dangling references to it? Note, too, that the old pool's connections are inherited from the parent process, so explicitly closing them from the child could interfere with the parent's use of the same sockets; the code above simply abandons them.