Асинхронное программирование: концепция Deferred
Асинхронная концепция программирования заключается в том, что результат выполнения функции доступен не сразу же, а через некоторое время в виде
некоторого асинхронного (нарушающего обычный порядок выполнения) вызова. Зачем
такое может быть полезно? Рассмотрим несколько примеров.
Первый пример - сетевой сервер, веб-приложение. Чаще всего как таковых
вычислений на процессоре такие приложения не выполняют. Большая часть времени
(реального, не процессорного) тратится на ввод-вывод: чтение запроса от
клиента, обращение к диску за данными, сетевые обращение к другим подсистемам
(БД, кэширующие сервера, RPC и т.п.), запись ответа клиенту. Во время этих
операций ввода-вывода процессор простаивает, его можно загрузить обработкой
запросов других клиентов. Возможны различные способы решить эту задачу: отдельный
процесс на каждое соединение (Apache mpm_prefork, PostgreSQL, PHP FastCGI),
отдельный поток (нить) на каждое соединение или комбинированный вариант
процесс/нить (Apache mpm_worker, MySQL). Подход с использованием процессов или
нитей перекладывает мультиплексирование процессора между обрабатываемыми
соединениями на ОС, при этом расходуется относительно много ресурсов (память,
переключения контекста и т.п.), такой вариант не подходит для обработки
большого количества одновременных соединений, но идеален для ситуации, когда
объем вычислений достаточно высок (например, в СУБД). К плюсам модели нитей и
процессов можно добавить потенциальное использование всех доступных процессоров
в многопроцессорной архитектуре.
Альтернативой является использование однопоточной модели с использованием
примитивов асинхронного ввода-вывода, предоставляемых ОС (select, poll, и
т.п.). При этом объем ресурсов на каждое новое обслуживаемое соединение не
такой большой (новый сокет, какие-то структуры в памяти приложения). Однако
программирование существенно усложняется, т.к. данные из сетевых сокетов
поступают некоторыми "отрывками", причем за один цикл обработки данные поступают от разных
соединений, находящихся в разных состояниях, часть соединений могут быть
входящими от клиентов, часть - исходящими к внешним ресурсам (БД, другой сервер
и т.п.). Для упрощения разработки используются различные концепции: callback,
конечные автоматы и другие. Примеры сетевых серверов, использующих асинхронный
HAProxy, pgBouncer, и т.д. Именно при такой
однопоточной модели возникает необходимость в асинхронном программировании.
Например, мы хотим выполнить запрос в БД. С точки зрения программы выполнение
запроса - это сетевой ввод-вывод: соединение с сервером, отправка запроса,
ожидание ответа, чтение ответа сервера БД. Поэтому если мы вызываем
функцию "выполнить запрос БД", то она сразу вернуть результат не сможет
(иначе она должна была бы заблокироваться), а вернет лишь нечто, что позволит
впоследствие получить результат запроса или, возможно, ошибку (нет соединения
с сервером, некорректный запрос и т.п.) Этим возвращаемым значением удобно
сделать именно Deferred.
Второй пример связан с разработкой обычных десктопных приложений. Предположим,
мы решили сделать аналог Miranda (QIP, MDC, ...), то есть свой мессенджер.
В интерфейсе программы есть контакт-лист, где можно удалить контакт. Когда
пользователь выбирает это действие, он ожидает что контакт исчезнет на экране
и что он действительно удалится из контакт-листа. На самом деле операция
удаления из серверного контакт-листа опирается на сетевое взаимодействие
с сервером, при этом пользовательский интерфейс не должен быть заблокирован
на время выполнения этой операции, поэтому в любом случае после выполнения
операции потребуется некоторое асинхронное взаимодействие с результатом операции.
Можно использовать механизм сигналов-слотов, callback'ов или что-то еще,
но лучше всего подойдет Deferred: операция удаления из контакт-листа возвращает
Deferred, в котором обратно придет либо положительный результат (всё хорошо),
либо исключение (точная ошибка, которую надо сообщить пользователю): в случае
ошибки контакт надо восстановить контакт в контакт-листе.
Примеры можно приводить долго и много, теперь о том, что же такое Deferred.
Deferred - это сердце framework'а асинхронного сетевого программирования
Twisted в Python. Это простая и стройная концепция, которая
позволяет перевести синхронное программирование в асинхронный код, не изобретая
велосипед для каждой ситуации и обеспечивая высокое качества кода.
Deferred - это просто возвращаемый результат функции, когда этот результат
неизвестен (не был получен, будет получен в другой нити и т.п.) Что мы можем
сделать с Deferred? Мы можем "подвеситься" в цепочку обработчиков, которые
будут вызваны, когда результат будет получен. При этом Deferred может нести не
только положительный результат выполнения, но и исключения, сгенерированные
функцией или обработчиками, есть возможность исключения обработать, перевыкинуть
и т.д. Фактически, для синхронного кода есть более-менее однозначная параллель в
терминах Deferred. Для эффективной разработки с Deferred оказываются полезными
такие возможности языка программирования, как замыкания, лямбда-функци.
Приведем пример синхронного кода и его альтернативу в терминах Deferred:
try:
# Скачать по HTTP некоторую страницу
page = downloadPage(url)
# Распечатать содержимое
print page
except HTTPError, e:
# Произошла ошибка
print "An error occured: %s", e
В асинхронном варианте с Deferred он был бы записан следующим образом:
def printContents(contents):
"""
Callback, при успешном получении текста страницы,
распечатываем её содержимое.
"""
print contents
def handleError(failure):
"""
Errback (обработчик ошибок), просто распечатываем текст ошибки.
"""
# Мы готовы обработать только HTTPError, остальные исключения
# "проваливаются" ниже.
failure.trap(HTTPError)
# Распечатываем само исключение
print "An error occured: %s", failure
# Теперь функция выполняется асинхронно и вместо непосредственного
# результата мы получаем Deferred
deferred = downloadPage(url)
# Навешиваем на Deferred-объект обработчики успешных результатов
# и ошибок (callback, errback).
deferred.addCallback(printContents)
deferred.addErrback(handleError)
На практике обычно мы возвращаем Deferred из функций, которые получают
Deferred в процессе своей работы, навешиваем большое количество обработчиков,
обрабатываем исключения, некоторые исключения возвращаем через Deferred (выбрасываем
наверх). В качестве более сложного примера приведем код в асинхронном
варианте для примера атомарного счетчика из статьи про структуры данных в memcached,
здесь мы предполагаем, что доступ к memcached как сетевому сервису идет через Deferred, т.е.
методы класса Memcache возвращают Deferred (который вернет либо результат операции, либо ошибку):
class MCCounter(MemcacheObject):
def __init__(self, mc, name):
"""
Конструктор.
@param name: имя счетчика
@type name: C{str}
"""
super(MCCounter, self).__init__(mc)
self.key = 'counter' + name
def increment(self, value=1):
"""
Увеличить значение счетчика на указанное значение.
@param value: инкремент
@type value: C{int}
@return: Deferred, результат операции
"""
def tryAdd(failure):
# Обрабатываем только KeyError, всё остальное "вывалится"
# ниже
failure.trap(KeyError)
# Пытаемся создать ключ, если раз его еще нет
d = self.mc.add(self.key, value, 0)
# Если вдруг кто-то еще создаст ключ раньше нас,
# мы это обработаем
d.addErrback(tryIncr)
# Возвращаем Deferred, он "вклеивается" в цепочку
# Deferred, в контексте которого мы выполняемся
return d
def tryIncr(failure):
# Всё аналогично функции tryAdd
failure.trap(KeyError)
d = self.mc.incr(self.key, value)
d.addErrback(tryAdd)
return d
# Пытаемся выполнить инкремент, получаем Deferred
d = self.mc.incr(self.key, value)
# Обрабатываем ошибку
d.addErrback(tryAdd)
# Возвращаем Deferred вызывающему коду, он может тем самым:
# а) узнать, когда операция действительно завершится
# б) обработать необработанные нами ошибки (например, разрыв соединения)
return d
def value(self):
"""
Получить значение счетчика.
@return: текущее значение счетчика
@rtype: C{int}
@return: Deferred, значение счетчика
"""
def handleKeyError(failure):
# Обрабатываем только KeyError
failure.trap(KeyError)
# Ключа нет - возвращаем 0, он станет результатом
# вышележащего Deferred
return 0
# Пытаемся получить значение ключа
d = self.mc.get(self.key)
# Будем обрабатывать ошибку отсутствия ключа
d.addErrback(handleKeyError)
# Возвращаем Deferred, наверх там можно будет повеситься
# на его callback и получить искомое значение счетчика
return d
На практике приведенный выше код можно написать "короче", объединяя часто используемые операции, например:
return self.mc.get(self.key).addErrback(handleKeyError)
Практически для каждой конструкции синхронного кода можно найти аналог в асинхронной концепции с Deferred:
- последовательности синхронных операторов соответствует цепочка callback с асинхронными вызовами;
- вызову одной подпрграммы с вводом-выводом из другой соответствует возврат Deferred из Deferred (ветвление Deferred);
- глубокой цепочки вложенности, распространению исключений по стеку соответствует цепочка функций, возвращающие друг другу Deferred;
- блокам try..except соответствуют обработчики ошибок (errback), которые могут "пробрасывать" исключения дальше, любое исключение в callback переводит выполнение в errback;
- для "параллельного" выполнения асинхронных операций есть
DeferredList
.
Нити часто применяются в асинхронных программах для осуществления вычислительных процедур, осуществления блокирующегося
ввода-вывода (когда не существует асинхронного аналога). Всё это легко моделируется в простой модели 'worker', тогда
нет необходимости при грамотной архитектуре в явной синхронизации, при этом всё элегантно включается в общий поток
вычислений с помощью Deferred:
def doCalculation(a, b):
"""
В этой функции осуществляются вычисления, синхронные операции ввода-вывода,
не затрагивающие основной поток.
"""
return a/b
def printResult(result):
print result
def handleDivisionByZero(failure):
failure.trap(ZeroDivisionError)
print "Ooops! Division by zero!"
deferToThread(doCalculation, 3, 2).addCallback(printResult).addCallback(
lambda _: deferToThread(doCalculation, 3, 0).addErrback(handleDivisionByZero))
В приведенном выше примере функция deferToThread
переносит выполнение указанной функции в отдельную нить и возвращает
Deferred, через который будет асинхронно получен результат выполнения функции или исключение, если они будет выброшено.
Первое деление (3/2) выполняется в отдельной нити, затем распечатывается его результат на экран, а затем запускается
еще одно вычисление (3/0), которое генерирует исключение, обрабатываемое функцией handleDivisionByZero
.
В одной статье не описать и части того, что хотелось бы сказать о Deferred, мне удалось не написать ни слова о том,
как же они работают. Если успел заинтересовать - читайте материалы ниже, а я обещаю написать еще.
Дополнительные материалы
- Документация Twisted Framework:
- Основы асинхронного программирования
- Использование Deferred, Откуда берутся Deferred?, Детальное описание Deferred
- Deferred в других языках программирования:
- JavaScript: Использование Deferred в JavaScript, Deferred в qooxdoo, Deferred в Dojo, Deferred в MochiKit
- C++: 1, 2, 3
- ...
- Александр Бурцев о Twisted
Комментарии
Comments powered by Disqus