Celery: Pickle от которого невозможно отказаться
Опубликовано 10 August 2015 в Python
Если вы решили поменять сериализатор задач для Celery с Pickle на какой-нибудь другой, ничего не выйдет. Вернее задачи действительно будут сериализоваться и десериализоваться с помощью указанного сериализатора. Но внутри Celery все равно используется Pickle и это никак не изменить. Вернее не в Celery, а в библиотеке billiard, которая используется мастер-процессом воркера для раздачи задач из очереди подпроцессам.
В нашем проекте мы столкнулись с тем, что мы иногда передаем в качестве параметра задач Celery NoneType. Причем не значение None, а именно NoneType.
from celery import Celery
app = Celery('app', broker='mongodb://localhost:27017/celery_dumps')
@app.task
def print_xy(x):
print(type(x))
if __name__ == '__main__':
print_xy.delay(type(None))
Не знаю как в Python 3, но в Python 2.7 за подобные вольности наказывают:
> python app.py
PicklingError: Can\'t pickle <type \'NoneType\'>
Я решил обойти несправедливость с NoneType за счет смены сериализатора на JSON. Это делается достаточно легко. Научил его сериализовать/десериализовать нужные нам типы данных (datetime и тому подобное). Код начал исполняться, вот только задачи пропадали, а воркеры "залипали". Оказалось, что проблема переехала в мастер процесс воркера. Все так же:
Task Handler ERROR: PicklingError("Can't pickle <type 'NoneType'>: attribute lookup __builtin__.NoneType failed",)
Подмена стандартного pickle на dill в kombu, предложенная maximilianr, не поможет. Библиотека billiard выбирает между pypickle и cPickle и ей плевать, что прописано в kombu.serialization.pickle.
from celery import Celery
import dill as dill
from io import BytesIO
import kombu
def add_dill():
registry = kombu.serialization.registry
kombu.serialization.pickle = dill
registry.unregister('pickle')
def pickle_loads(s, load=dill.load):
return load(BytesIO(s))
def pickle_dumps(obj, dumper=dill.dumps):
return dumper(obj, protocol=kombu.serialization.pickle_protocol)
registry.register('pickle', pickle_dumps, pickle_loads,
content_type='application/x-python-serialize',
content_encoding='binary')
add_dill() # Celery плевать на это
app = Celery('app', broker='mongodb://localhost:27017/celery_dumps')
@app.task
def print_xy(x):
print(type(x))
if __name__ == '__main__':
print_xy.delay(type(None))
Остается только метать лучи гнева, переписывать код так, что бы он был pickle-friendly, либо грязными хаками менять поведение стандартного pickle.
Если знаете элегантное решение этой проблемы, пишите в комментариях.
Возник вопрос? Мне всегда можно написать в Twitter: avkorablev