Если вы решили поменять сериализатор задач для Celery с Pickle на какой-нибудь другой, ничего не выйдет. Вернее задачи действительно будут сериализоваться и десериализоваться с помощью указанного сериализатора. Но внутри Celery все равно используется Pickle и это никак не изменить. Вернее не в Celery, а в библиотеке billiard, которая используется мастер-процессом воркера для раздачи задач из очереди подпроцессам.

В нашем проекте мы столкнулись с тем, что мы иногда передаем в качестве параметра задач Celery NoneType. Причем не значение None, а именно NoneType.

from celery import Celery


app = Celery('app', broker='mongodb://localhost:27017/celery_dumps')


@app.task
def print_xy(x):
    print(type(x))


if __name__ == '__main__':
    print_xy.delay(type(None))

Не знаю как в Python 3, но в Python 2.7 за подобные вольности наказывают:

> python app.py

PicklingError: Can\'t pickle <type \'NoneType\'>

Я решил обойти несправедливость с NoneType за счет смены сериализатора на JSON. Это делается достаточно легко. Научил его сериализовать/десериализовать нужные нам типы данных (datetime и тому подобное). Код начал исполняться, вот только задачи пропадали, а воркеры "залипали". Оказалось, что проблема переехала в мастер процесс воркера. Все так же:

Task Handler ERROR: PicklingError("Can't pickle <type 'NoneType'>: attribute lookup __builtin__.NoneType failed",)

Подмена стандартного pickle на dill в kombu, предложенная maximilianr, не поможет. Библиотека billiard выбирает между pypickle и cPickle и ей плевать, что прописано в kombu.serialization.pickle.

from celery import Celery
import dill as dill
from io import BytesIO
import kombu


def add_dill():
    registry = kombu.serialization.registry
    kombu.serialization.pickle = dill

    registry.unregister('pickle')

    def pickle_loads(s, load=dill.load):
        return load(BytesIO(s))

    def pickle_dumps(obj, dumper=dill.dumps):
        return dumper(obj, protocol=kombu.serialization.pickle_protocol)

    registry.register('pickle', pickle_dumps, pickle_loads,
                      content_type='application/x-python-serialize',
                      content_encoding='binary')


add_dill()  # Celery плевать на это
app = Celery('app', broker='mongodb://localhost:27017/celery_dumps')


@app.task
def print_xy(x):
    print(type(x))


if __name__ == '__main__':
    print_xy.delay(type(None))

Остается только метать лучи гнева, переписывать код так, что бы он был pickle-friendly, либо грязными хаками менять поведение стандартного pickle.

Если знаете элегантное решение этой проблемы, пишите в комментариях.



c