SQLAlchemy and MySQL Encoding

I recently ran into an issue with the encoding of data coming back from MySQL through SQLAlchemy. This is the first time that I’ve encountered such an issue since this project first came online, months ago.

I am using utf8 encoding on my database, tables, and columns. I just added a new column, and suddenly my pages and/or AJAX calls started failing with one of the following two messages, respectively:

  • UnicodeDecodeError: ‘ascii’ codec can’t decode byte 0x96 in position 5: ordinal not in range(128)
  • UnicodeDecodeError: ‘utf8’ codec can’t decode byte 0x96 in position 5: invalid start byte

When I tell the stored procedure to return an empty string for the new column instead of its data, it works. The other text columns have an identical encoding.

It turns out that the MySQL connection made through SQLAlchemy defaults to the latin1 encoding. If you need something different, then you’re in for a surprise. The official solution is to pass the “encoding” parameter to create_engine. This is the example from the documentation:

engine = create_engine("mysql://scott:tiger@hostname/dbname", encoding='latin1', echo=True)

In my case, I tried utf8. However, it still didn’t work. I don’t know if that ever works. It wasn’t until I uncovered a StackOverflow entry that I found the answer. I had to append “?charset=utf8” to the DSN string:

mysql+mysqldb://username:password@hostname:port/database_name?charset=utf8

The following are potential explanations for why this only surfaced now:

  • Since I copied and pasted the values that were set into these columns, I accidentally introduced a character that was out of range. The offending byte, 0x96, is an en dash in Windows-1252, which is typical of pasted text.
  • The two encodings have an overlapping set of codes, and I finally introduced a character that was supported by one but not the other.
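
The byte from the error messages can be examined directly. The sketch below is not from the original project. It only shows how a single 0x96 byte behaves under the codecs involved, using a made-up sample value:

```python
# Illustrative only: how the byte from the tracebacks decodes under
# different codecs. The sample value is made up, not data from the project.
raw = b"2010\x962014"

# Windows-1252 maps 0x96 to an en dash (U+2013).
as_cp1252 = raw.decode("cp1252")

# latin1 maps every byte to the code point of the same number, so it never
# fails, but 0x96 becomes an invisible C1 control character.
as_latin1 = raw.decode("latin1")


def try_decode(data, codec):
    """Return the decoded text, or the failure reason if decoding fails."""
    try:
        return data.decode(codec)
    except UnicodeDecodeError as e:
        return "failed: %s" % e.reason


print(repr(as_cp1252))
print(repr(as_latin1))
print(try_decode(raw, "ascii"))
print(try_decode(raw, "utf8"))
```

The last two calls fail for the same reasons quoted in the two error messages above, which is consistent with a Windows-1252 character having been pasted into a column that is later read as ASCII or UTF-8.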

Whatever the case, it’s fixed and I’m a few hours older.

Using ctypes to Read Binary Data from a Double-Pointer

This is a sticky and exotic use-case of ctypes. In the example below, we call some library function (“library” is a placeholder for a loaded shared library) that treats ptr as a double-pointer: it sets ptr to point to a buffer and sets count to the number of bytes that are available there. The data at the pointer may contain one or more NULL bytes that should not be interpreted as terminators, which is why it is read with string_at and an explicit length.

import ctypes

ptr = ctypes.c_char_p()
count = ctypes.c_size_t()

r = library.some_call(
        ctypes.cast(ctypes.byref(ptr), 
                    ctypes.POINTER(ctypes.c_void_p)), 
        ctypes.byref(count))

if r != 0:
    raise ValueError("Library call failed.")

data = ctypes.string_at(ptr, count.value)
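
The difference between reading the pointer as a C string and reading it with an explicit length can be seen without any external library. This is a minimal sketch in which a local buffer stands in for the memory that the library call would return. The payload is made up:

```python
import ctypes

# A stand-in for the buffer a library would hand back: five payload bytes
# with a NUL in the middle. (create_string_buffer appends one more NUL.)
payload = b"ab\x00cd"
buf = ctypes.create_string_buffer(payload)

# View the same memory through a c_char_p, as in the example above.
ptr = ctypes.cast(buf, ctypes.c_char_p)

# .value treats the data as a C string and stops at the first NUL.
truncated = ptr.value

# string_at() with an explicit size copies exactly that many bytes.
data = ctypes.string_at(ptr, len(payload))

print(truncated)
print(data)
```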

Method Overloads in Python 3.4

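Python 3.4 added a “singledispatch” decorator to functools, which provides function overloads (single-dispatch generic functions). This enables you to perform different operations based on the type of the first argument.

Because it dispatches on the first argument, it works out of the box only with functions that do not take “self”, which in a class effectively means static methods. This mostly comes from the link above: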

import functools


class TestClass(object):
    @functools.singledispatch
    def test_method(arg):
        print("Let me just say,", end=" ")
        print(arg)

    @test_method.register(int)
    def _(arg):
        print("Strength in numbers, eh?", end=" ")
        print(arg)

    @test_method.register(list)
    def _(arg):
        print("Enumerate this:")

        for i, elem in enumerate(arg):
            print(i, elem)

if __name__ == '__main__':
    TestClass.test_method(55555)
    TestClass.test_method([33, 22, 11])

However, there is a low-impact way to get overloading on instance-methods, too. We’ll just place our own wrapper around the standard singledispatch wrapper, and hijack the bulk of the functionality:

import functools

def instancemethod_dispatch(func):
    dispatcher = functools.singledispatch(func)
    def wrapper(*args, **kw):
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)
    wrapper.register = dispatcher.register
    functools.update_wrapper(wrapper, func)
    return wrapper


class TestClass2(object):
    @instancemethod_dispatch
    def test_method(self, arg):
        print("2: Let me just say,", end=" ")
        print(arg)

    @test_method.register(int)
    def _(self, arg):
        print("2: Strength in numbers, eh?", end=" ")
        print(arg)

    @test_method.register(list)
    def _(self, arg):
        print("2: Enumerate this:")

        for i, elem in enumerate(arg):
            print(i, elem)

if __name__ == '__main__':
    t = TestClass2()
    t.test_method(55555)
    t.test_method([33, 22, 11])

Aside from superficial changes to the original example, we just added the instancemethod_dispatch function and updated the methods to take a “self” argument.

A special thanks to Zero Piraeus for penning the instancemethod_dispatch method (under the original name of “methdispatch”).
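
Since Python 3.8, functools also ships singledispatchmethod, which dispatches on the first argument after self, so the custom wrapper is not needed on newer interpreters. A minimal sketch, assuming Python 3.8 or later. The class and method names are made up for illustration:

```python
import functools


class Describer(object):
    @functools.singledispatchmethod
    def describe(self, arg):
        # Fallback for any type without a registered implementation.
        return "Let me just say, {!r}".format(arg)

    @describe.register(int)
    def _(self, arg):
        return "Strength in numbers, eh? {}".format(arg)

    @describe.register(list)
    def _(self, arg):
        return "Enumerate this: {}".format(list(enumerate(arg)))


d = Describer()
print(d.describe("hello"))
print(d.describe(55555))
print(d.describe([33, 22, 11]))
```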

Hidden in Plain Sight: The Python print() Function

The use of print() is so commonplace and automatic that it’s easy to forget that it’s a function, with parameters that are often neglected. You may even find yourself using sys.stdout.write() just to avoid the automatic newline, which is unnecessary.

This is the signature, as of 3.4:

print(*objects, sep=' ', end='\n', file=sys.stdout, flush=False)

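The parameters are largely self-explanatory: sep is written between the objects, end is written after the last one, file is the destination stream, and flush forces the stream to be flushed. The flush parameter was added in 3.3.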
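
A short sketch of those parameters in use. An in-memory stream (io.StringIO) stands in for a real file so that the result can be inspected:

```python
import io

buf = io.StringIO()

# sep goes between the objects; end replaces the trailing newline.
print("a", "b", "c", sep="-", end="!", file=buf)

# With the default end, a newline is appended. flush=True asks the stream
# to flush immediately, which matters for buffered files and pipes.
print("done", file=buf, flush=True)

output = buf.getvalue()
print(repr(output))
```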

Python Function Annotations

Python 3.0 introduced a relatively unknown feature called “function annotations” (PEP 3107). This provides a way to tag parameters and your return value with arbitrary information at the function-definition level.

You can annotate using strings or any other type that you’d like:

>>> def some_function(parm1: "Example parameter"):
...   pass
... 
>>> some_function.__annotations__
{'parm1': 'Example parameter'}
>>> x = 5
>>> def some_function_2(parm1: x * 20):
...   pass
... 
>>> some_function_2.__annotations__
{'parm1': 100}

You can also annotate the return:

>>> def some_function_3() -> 'return-value tag':
...   pass
... 
>>> some_function_3.__annotations__
{'return': 'return-value tag'}

It’s important to note that there are already strong conventions in how to document your parameters, thanks to Sphinx. Therefore, the utility of annotations will most likely be entirely in terms of functionality. For example, you can annotate closures on-the-fly:

import random

c_list = []
for i in range(10):
    def closure_() -> random.random():
        pass

    c_list.append(closure_)

for closure in c_list:
    print(closure.__annotations__)

This is the output:

{'return': 0.9644971188983055}
{'return': 0.8639746158842893}
{'return': 0.18610468531065305}
{'return': 0.8528801446167985}
{'return': 0.3022338513329076}
{'return': 0.6455491244718428}
{'return': 0.09106740460937834}
{'return': 0.16987808849543917}
{'return': 0.9136478506241527}
{'return': 0.41691681086623544}

As of Python 3.4, absolutely nothing in the Python language or standard library is dependent on annotations, so they’re yours to play with or implement as you see fit.
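
As one example of putting annotations to functional use, the sketch below treats any annotation that is a class as a type check and ignores every other kind of tag. The decorator name enforce_annotations and the sample function are hypothetical, not part of any library:

```python
import functools
import inspect


def enforce_annotations(func):
    """Hypothetical decorator: treat class-valued annotations as type checks."""
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = func.__annotations__.get(name)
            # Only annotations that are classes are enforced; strings and
            # other arbitrary tags are ignored.
            if isinstance(expected, type) and not isinstance(value, expected):
                raise TypeError("%s must be %s, not %s" % (
                    name, expected.__name__, type(value).__name__))
        return func(*args, **kwargs)

    return wrapper


@enforce_annotations
def repeat(text: str, times: int, note: "free-form tag" = None) -> str:
    return text * times


print(repeat("ab", 2))
```

Calling repeat("ab", "2") raises a TypeError, while the string annotation on note is left alone.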

Doing Fast Multipart Uploads to S3 Using Greenlets

S3 allows you to upload pieces of large files in parallel. Unfortunately, most, if not all, of the examples that I’ve seen online are inefficient or inconvenient. For example:

  • Physical file splits of the original file: If you couldn’t guess that S3 would have a way to work off a single copy of the source file, then you probably shouldn’t be using this functionality.
  • Threading: Because of the GIL, threads in CPython don’t truly run Python code in parallel.
  • Function-based designs (as opposed to class-based): I’ve never been a fan of this in Python. Too much context info has to be curried.
  • Using multiprocessing: For every upload, you’ll have a number of processes, and all will still be in competition for the network device.

None of these strategies holds a candle to Greenlets (running off different file-pointers to the same physical copy of the file).

This example is located at RandomUtility: s3_parallel.

This is the principal class (written for Python 2 and boto). Go to the original source for the imports and the couple of module-level constants.

class ParallelUpload(object):
    def __init__(self, ak, sk, bucket_name, filepath, 
                 chunk_size_b=_DEFAULT_CHUNK_SIZE_B,
                 monitor_interval_s=_DEFAULT_MONITOR_INTERVAL_S):
        self.__ak = ak
        self.__sk = sk
        self.__bucket_name = bucket_name
        self.__filepath = filepath
        self.__s3_key_name = os.path.basename(filepath)
        self.__chunk_size_b = chunk_size_b
        self.__coverage = 0.0
        self.__monitor_interval_s = monitor_interval_s

        self.__filesize_b = os.path.getsize(self.__filepath)
        self.__chunks = int(math.ceil(float(self.__filesize_b) / 
                                      float(self.__chunk_size_b)))

        self.__progress = [0.0] * self.__chunks

    def __get_bucket(self, bucket_name):
        conn = boto.s3.connection.S3Connection(self.__ak, self.__sk)
        return conn.lookup(bucket_name)

    def __standard_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)
        new_s3_item = bucket.new_key(self.__s3_key_name)
        new_s3_item.set_contents_from_filename(
            self.__filepath, 
            cb=self.__standard_cb, 
            num_cb=20)

    def __standard_cb(self, current, total):
        _logger.debug("Status: %.2f%%", float(current) / float(total) * 100.0)

    def __multipart_cb(self, i, current, total):
        self.__progress[i] = float(current) / float(total) * 100.0

    def __transfer_part(self, (mp_info, i, offset)):
        (mp_id, mp_key_name, mp_bucket_name) = mp_info

        bucket = self.__get_bucket(mp_bucket_name)
        mp = boto.s3.multipart.MultiPartUpload(bucket)
        mp.key_name = mp_key_name
        mp.id = mp_id

        # At any given time, this will describe the farthest percentage into the
        # file that we're actively working on.
        self.__coverage = max(
                            (float(offset) / float(self.__filesize_b) * 100.0), 
                            self.__coverage)

        # The last chunk might be shorter than the rest.
        eff_chunk_size = min(offset + self.__chunk_size_b, 
                             self.__filesize_b) - \
                         offset

        with open(self.__filepath, 'rb') as f:
            f.seek(offset)
            mp.upload_part_from_file(
                f, 
                i + 1, 
                size=eff_chunk_size, 
                cb=functools.partial(self.__multipart_cb, i), 
                num_cb=100)

    def __mp_show_progress(self):
        while 1:
            columns = [("%3d%% " % self.__progress[i]) 
                       for i 
                       in range(self.__chunks)]

            pline = ' '.join(columns)
            _logger.debug(pline)

            gevent.sleep(self.__monitor_interval_s)

    def __multipart_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)

        mp = bucket.initiate_multipart_upload(self.__s3_key_name)
        mp_info = (mp.id, mp.key_name, mp.bucket_name)
        chunk_list = range(0, self.__filesize_b, self.__chunk_size_b)

        try:
            gen = ((mp_info, i, offset) 
                   for (i, offset) 
                   in enumerate(chunk_list))

            f = functools.partial(gevent.spawn, self.__transfer_part)

            if self.__monitor_interval_s > 0:
                p = gevent.spawn(self.__mp_show_progress)

            g_list = map(f, gen)

            gevent.joinall(g_list)

            if self.__monitor_interval_s > 0:
                p.kill()
                p.join()
        except:
            mp.cancel_upload()
            raise
        else:
            mp.complete_upload()

    def start(self):
        if self.__filesize_b < _MIN_MULTIPART_SIZE_B:
            self.__standard_upload()
        else:
            self.__multipart_upload()
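
The chunk arithmetic in the class is easy to check in isolation. The sketch below pulls the offset and size calculations out into two standalone helpers. The names chunk_ranges and chunk_count are made up for illustration and do not appear in the original source:

```python
import math


def chunk_count(filesize_b, chunk_size_b):
    """Number of parts, rounding up so that a short final part is counted."""
    return int(math.ceil(float(filesize_b) / float(chunk_size_b)))


def chunk_ranges(filesize_b, chunk_size_b):
    """Yield (index, offset, size) for each part of the file."""
    for i, offset in enumerate(range(0, filesize_b, chunk_size_b)):
        # The last chunk might be shorter than the rest.
        size = min(offset + chunk_size_b, filesize_b) - offset
        yield (i, offset, size)


# A 25-byte file in 10-byte chunks: two full parts and one short one.
parts = list(chunk_ranges(25, 10))
print(chunk_count(25, 10))
print(parts)
```

Each greenlet then seeks to its own offset in its own file object and uploads that many bytes. The class passes i + 1 as the part number, so part numbers start at 1 rather than 0.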

The output when called as a command will look like this:

$ python s3_parallel.py (access key) (secret key) (bucket name) (file-path)
2014-06-17 10:16:48,458 - __main__ - DEBUG -   0%    0%    0%    0%    0%    0%    0% 
2014-06-17 10:16:58,459 - __main__ - DEBUG -   3%    3%    2%    2%    2%    1%    7% 
2014-06-17 10:17:08,460 - __main__ - DEBUG -   6%    5%    5%    4%    5%    4%   14% 
2014-06-17 10:17:18,461 - __main__ - DEBUG -  10%    7%    8%    8%    7%    6%   18% 
2014-06-17 10:17:28,461 - __main__ - DEBUG -  16%   10%   13%   11%   10%    8%   26% 
2014-06-17 10:17:38,462 - __main__ - DEBUG -  21%   14%   20%   15%   14%   12%   35% 
2014-06-17 10:17:48,462 - __main__ - DEBUG -  26%   17%   27%   19%   19%   15%   48% 
2014-06-17 10:17:58,463 - __main__ - DEBUG -  32%   20%   33%   24%   24%   18%   59% 
2014-06-17 10:18:08,463 - __main__ - DEBUG -  37%   24%   39%   29%   28%   22%   70% 
2014-06-17 10:18:18,464 - __main__ - DEBUG -  43%   28%   44%   34%   32%   26%   82% 
2014-06-17 10:18:28,464 - __main__ - DEBUG -  48%   31%   50%   39%   36%   31%   91% 
2014-06-17 10:18:38,465 - __main__ - DEBUG -  52%   35%   55%   44%   43%   36%  100% 
2014-06-17 10:18:48,465 - __main__ - DEBUG -  60%   39%   63%   47%   47%   40%  100% 
2014-06-17 10:18:58,466 - __main__ - DEBUG -  68%   44%   69%   53%   53%   45%  100% 
2014-06-17 10:19:08,466 - __main__ - DEBUG -  77%   49%   75%   58%   57%   49%  100% 
2014-06-17 10:19:18,467 - __main__ - DEBUG -  83%   54%   84%   65%   62%   52%  100% 
2014-06-17 10:19:28,467 - __main__ - DEBUG -  88%   58%   90%   71%   69%   58%  100% 
2014-06-17 10:19:38,468 - __main__ - DEBUG -  96%   61%   96%   77%   74%   63%  100% 
2014-06-17 10:19:48,468 - __main__ - DEBUG - 100%   67%  100%   83%   83%   70%  100% 
2014-06-17 10:19:58,469 - __main__ - DEBUG - 100%   73%  100%   93%   93%   76%  100% 
2014-06-17 10:20:08,469 - __main__ - DEBUG - 100%   83%  100%  100%  100%   86%  100% 
2014-06-17 10:20:18,470 - __main__ - DEBUG - 100%   95%  100%  100%  100%  100%  100% 

Python 3: Opening for Write, but Failing if it Already Exists

Python 3.3 added a new file mode that allows you to create a new file and open it for write only if it does not already exist.

>>> with open('new_file', 'x') as f:
...   pass
... 
>>> with open('new_file', 'x') as f:
...   pass
... 
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
FileExistsError: [Errno 17] File exists: 'new_file'
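
In a program, you would normally catch the exception rather than let it propagate. A minimal sketch, where create_exclusive is a hypothetical helper and a temporary directory keeps the example self-contained:

```python
import os
import tempfile


def create_exclusive(path, text):
    """Write text to a new file. Return False if the file already exists."""
    try:
        with open(path, 'x') as f:
            f.write(text)
    except FileExistsError:
        return False

    return True


with tempfile.TemporaryDirectory() as temp_path:
    filepath = os.path.join(temp_path, 'new_file')

    first = create_exclusive(filepath, "first")
    second = create_exclusive(filepath, "second")

    # The second attempt must not have clobbered the first write.
    with open(filepath) as f:
        content = f.read()

print(first, second, content)
```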

Spawn an SSL Webserver in Your Python Unit-Tests

You might eventually have to unit-test against a website that needs to be served over SSL. For example, you might need to test a client that must connect using SSL authentication.

You can accomplish this by combining Python’s built-in webserver with ssl.SSLSocket.

This code is a distant relative of another example, but is lighter, simpler, and more Pythonic.

It runs out of the current directory (you’ll have to chdir() from the code if you want something different, since the webserver doesn’t take a path), and expects server.private_key.pem and server.crt.pem to exist.

import os.path
import socket
import SocketServer
import BaseHTTPServer
import SimpleHTTPServer
import ssl

class _SecureHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    def setup(self):
        self.connection = self.request
        self.rfile = socket._fileobject(self.request, 'rb', self.rbufsize)
        self.wfile = socket._fileobject(self.request, 'wb', self.wbufsize)

class _SecureHTTPServer(BaseHTTPServer.HTTPServer):
    def __init__(self, private_key_pem_filepath, cert_pem_filepath,
                 binding=None, handler_cls=_SecureHTTPRequestHandler):
        if binding is None:
            # The default port is 1443 so that we don't have to be root.
            binding = ('', 1443)

        # We can't use super() because it's not a new-style class.
        SocketServer.BaseServer.__init__(self, binding, handler_cls)

        s = socket.socket(self.address_family, self.socket_type)
        self.socket = ssl.SSLSocket(
                        s,
                        keyfile=private_key_pem_filepath,
                        certfile=cert_pem_filepath)

        self.server_bind()
        self.server_activate()

app_path = os.path.abspath(os.path.dirname(__file__))

private_key_pem_filepath = os.path.join(app_path, 'server.private_key.pem')
certificate_pem_filepath = os.path.join(app_path, 'server.crt.pem')

httpd = _SecureHTTPServer(
            private_key_pem_filepath,
            certificate_pem_filepath)

print("Running.")
httpd.serve_forever()

This code may also be found in the RandomUtility repository.
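
The listing above targets Python 2. Under Python 3 the same idea is shorter, because the standard library’s ssl.SSLContext can wrap the server’s listening socket directly. This is a rough sketch, assuming Python 3.6 or later. The function name make_https_server is made up, and it expects the same two PEM files:

```python
import http.server
import ssl


def make_https_server(private_key_pem_filepath, cert_pem_filepath,
                      binding=('', 1443)):
    # Load the key pair first, so that a bad path fails before we bind a
    # port. The default port is 1443 so that we don't have to be root.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(
        certfile=cert_pem_filepath,
        keyfile=private_key_pem_filepath)

    httpd = http.server.HTTPServer(
                binding,
                http.server.SimpleHTTPRequestHandler)

    # Replace the plain listening socket with a TLS-wrapped one.
    httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
    return httpd

# Usage (serves the current directory until interrupted):
#
#   httpd = make_https_server('server.private_key.pem', 'server.crt.pem')
#   httpd.serve_forever()
```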

Easy and Loveable Cartesian Products in Python

Use more than one for in the same list comprehension:

[(i, j, k) 
 for i in (11, 22, 33) 
 for j in (44, 55, 66) 
 for k in (77, 88, 99)]

This results in every possible combination of one value from each sequence (a Cartesian product), where the rightmost for varies fastest:

[(11, 44, 77), (11, 44, 88), (11, 44, 99), 
 (11, 55, 77), (11, 55, 88), (11, 55, 99), 
 (11, 66, 77), (11, 66, 88), (11, 66, 99), 

 (22, 44, 77), (22, 44, 88), (22, 44, 99), 
 (22, 55, 77), (22, 55, 88), (22, 55, 99), 
 (22, 66, 77), (22, 66, 88), (22, 66, 99), 

 (33, 44, 77), (33, 44, 88), (33, 44, 99), 
 (33, 55, 77), (33, 55, 88), (33, 55, 99), 
 (33, 66, 77), (33, 66, 88), (33, 66, 99)]
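
The standard library offers the same thing as itertools.product, which produces the tuples in the same order and scales to any number of sequences without adding more for clauses. A short comparison using the values from above:

```python
import itertools

a = (11, 22, 33)
b = (44, 55, 66)
c = (77, 88, 99)

from_comprehension = [(i, j, k) for i in a for j in b for k in c]

# product() is lazy, so wrap it in list() to compare the results.
from_product = list(itertools.product(a, b, c))

print(from_product == from_comprehension)
print(len(from_product))
```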

Reading Keypresses Under Python

An elegant solution for reading individual keypresses under Python (Unix-only, since it relies on termios).

import termios, sys, os

def read_keys():
    fd = sys.stdin.fileno()
    old = termios.tcgetattr(fd)
    new = termios.tcgetattr(fd)
    new[3] = new[3] & ~termios.ICANON & ~termios.ECHO
    new[6][termios.VMIN] = 1
    new[6][termios.VTIME] = 0
    termios.tcsetattr(fd, termios.TCSANOW, new)
    try:
        while 1:
            yield os.read(fd, 1)
    finally:
        termios.tcsetattr(fd, termios.TCSAFLUSH, old)

Example:

>>> for key in read_keys():
...   print("KEY: %s" % (key))
... 
KEY: g
KEY: i
KEY: f
KEY: d
KEY: s
KEY: w
KEY: e

Inspired by this.