Асинхронное программирование: концепция Deferred

Асинхронная концепция программирования заключается в том, что результат выполнения функции доступен не сразу же, а через некоторое время в виде

некоторого асинхронного (нарушающего обычный порядок выполнения) вызова. Зачем

такое может быть полезно? Рассмотрим несколько примеров.

Первый пример - сетевой сервер, веб-приложение. Чаще всего как таковых

вычислений на процессоре такие приложения не выполняют. Большая часть времени

(реального, не процессорного) тратится на ввод-вывод: чтение запроса от

клиента, обращение к диску за данными, сетевые обращение к другим подсистемам

(БД, кэширующие сервера, RPC и т.п.), запись ответа клиенту. Во время этих

операций ввода-вывода процессор простаивает, его можно загрузить обработкой

запросов других клиентов. Возможны различные способы решить эту задачу: отдельный

процесс на каждое соединение (Apache mpm_prefork, PostgreSQL, PHP FastCGI),

отдельный поток (нить) на каждое соединение или комбинированный вариант

процесс/нить (Apache mpm_worker, MySQL). Подход с использованием процессов или

нитей перекладывает мультиплексирование процессора между обрабатываемыми

соединениями на ОС, при этом расходуется относительно много ресурсов (память,

переключения контекста и т.п.), такой вариант не подходит для обработки

большого количества одновременных соединений, но идеален для ситуации, когда

объем вычислений достаточно высок (например, в СУБД). К плюсам модели нитей и

процессов можно добавить потенциальное использование всех доступных процессоров

в многопроцессорной архитектуре.

Альтернативой является использование однопоточной модели с использованием

примитивов асинхронного ввода-вывода, предоставляемых ОС (select, poll, и

т.п.). При этом объем ресурсов на каждое новое обслуживаемое соединение не

такой большой (новый сокет, какие-то структуры в памяти приложения). Однако

программирование существенно усложняется, т.к. данные из сетевых сокетов

поступают некоторыми "отрывками", причем за один цикл обработки данные поступают от разных

соединений, находящихся в разных состояниях, часть соединений могут быть

входящими от клиентов, часть - исходящими к внешним ресурсам (БД, другой сервер

и т.п.). Для упрощения разработки используются различные концепции: callback,

конечные автоматы и другие. Примеры сетевых серверов, использующих асинхронный

ввод-вывод: nginx, lighttpd,

HAProxy, pgBouncer, и т.д. Именно при такой

однопоточной модели возникает необходимость в асинхронном программировании.

Например, мы хотим выполнить запрос в БД. С точки зрения программы выполнение

запроса - это сетевой ввод-вывод: соединение с сервером, отправка запроса,

ожидание ответа, чтение ответа сервера БД. Поэтому если мы вызываем

функцию "выполнить запрос БД", то она сразу вернуть результат не сможет

(иначе она должна была бы заблокироваться), а вернет лишь нечто, что позволит

впоследствие получить результат запроса или, возможно, ошибку (нет соединения

с сервером, некорректный запрос и т.п.) Этим возвращаемым значением удобно

сделать именно Deferred.

Второй пример связан с разработкой обычных десктопных приложений. Предположим,

мы решили сделать аналог Miranda (QIP, MDC, ...), то есть свой мессенджер.

В интерфейсе программы есть контакт-лист, где можно удалить контакт. Когда

пользователь выбирает это действие, он ожидает что контакт исчезнет на экране

и что он действительно удалится из контакт-листа. На самом деле операция

удаления из серверного контакт-листа опирается на сетевое взаимодействие

с сервером, при этом пользовательский интерфейс не должен быть заблокирован

на время выполнения этой операции, поэтому в любом случае после выполнения

операции потребуется некоторое асинхронное взаимодействие с результатом операции.

Можно использовать механизм сигналов-слотов, callback'ов или что-то еще,

но лучше всего подойдет Deferred: операция удаления из контакт-листа возвращает

Deferred, в котором обратно придет либо положительный результат (всё хорошо),

либо исключение (точная ошибка, которую надо сообщить пользователю): в случае

ошибки контакт надо восстановить контакт в контакт-листе.

Примеры можно приводить долго и много, теперь о том, что же такое Deferred.

Deferred - это сердце framework'а асинхронного сетевого программирования

Twisted в Python. Это простая и стройная концепция, которая

позволяет перевести синхронное программирование в асинхронный код, не изобретая

велосипед для каждой ситуации и обеспечивая высокое качества кода.

Deferred - это просто возвращаемый результат функции, когда этот результат

неизвестен (не был получен, будет получен в другой нити и т.п.) Что мы можем

сделать с Deferred? Мы можем "подвеситься" в цепочку обработчиков, которые

будут вызваны, когда результат будет получен. При этом Deferred может нести не

только положительный результат выполнения, но и исключения, сгенерированные

функцией или обработчиками, есть возможность исключения обработать, перевыкинуть

и т.д. Фактически, для синхронного кода есть более-менее однозначная параллель в

терминах Deferred. Для эффективной разработки с Deferred оказываются полезными

такие возможности языка программирования, как замыкания, лямбда-функци.

Приведем пример синхронного кода и его альтернативу в терминах Deferred:

try: # Скачать по HTTP некоторую страницу page = downloadPage(url) # Распечатать содержимое print page except HTTPError, e: # Произошла ошибка print "An error occured: %s", e

В асинхронном варианте с Deferred он был бы записан следующим образом:


def printContents(contents):
    """
    Callback, при успешном получении текста страницы,
    распечатываем её содержимое.
    """
    print contents


def handleError(failure):
    """
    Errback (обработчик ошибок), просто распечатываем текст ошибки.
    """

    # Мы готовы обработать только HTTPError, остальные исключения
    # "проваливаются" ниже.
    failure.trap(HTTPError)
    # Распечатываем само исключение
    print "An error occured: %s", failure


# Теперь функция выполняется асинхронно и вместо непосредственного

# результата мы получаем Deferred

deferred = downloadPage(url)

# Навешиваем на Deferred-объект обработчики успешных результатов

# и ошибок (callback, errback).

deferred.addCallback(printContents)

deferred.addErrback(handleError)

На практике обычно мы возвращаем Deferred из функций, которые получают

Deferred в процессе своей работы, навешиваем большое количество обработчиков,

обрабатываем исключения, некоторые исключения возвращаем через Deferred (выбрасываем

наверх). В качестве более сложного примера приведем код в асинхронном

варианте для примера атомарного счетчика из статьи про структуры данных в memcached,

здесь мы предполагаем, что доступ к memcached как сетевому сервису идет через Deferred, т.е.

методы класса Memcache возвращают Deferred (который вернет либо результат операции, либо ошибку):


class MCCounter(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        """
        super(MCCounter, self).__init__(mc)
        self.key = 'counter' + name

    def increment(self, value=1):
        """
        Увеличить значение счетчика на указанное значение.

        @param value: инкремент
        @type value: C{int}
        @return: Deferred, результат операции
        """
        def tryAdd(failure):
            # Обрабатываем только KeyError, всё остальное "вывалится"
            # ниже
            failure.trap(KeyError)

            # Пытаемся создать ключ, если раз его еще нет
            d = self.mc.add(self.key, value, 0)
            # Если вдруг кто-то еще создаст ключ раньше нас,
            # мы это обработаем
            d.addErrback(tryIncr)
            # Возвращаем Deferred, он "вклеивается" в цепочку
            # Deferred, в контексте которого мы выполняемся
            return d

        def tryIncr(failure):
            # Всё аналогично функции tryAdd
            failure.trap(KeyError)

            d = self.mc.incr(self.key, value)
            d.addErrback(tryAdd)
            return d

        # Пытаемся выполнить инкремент, получаем Deferred
        d = self.mc.incr(self.key, value)
        # Обрабатываем ошибку
        d.addErrback(tryAdd)
        # Возвращаем Deferred вызывающему коду, он может тем самым:
        #  а) узнать, когда операция действительно завершится
        #  б) обработать необработанные нами ошибки (например, разрыв соединения)
        return d

    def value(self):
        """
        Получить значение счетчика.

        @return: текущее значение счетчика
        @rtype: C{int}
        @return: Deferred, значение счетчика
        """
        def handleKeyError(failure):
            # Обрабатываем только KeyError
            failure.trap(KeyError)

            # Ключа нет - возвращаем 0, он станет результатом
            # вышележащего Deferred
            return 0

        # Пытаемся получить значение ключа
        d = self.mc.get(self.key)
        # Будем обрабатывать ошибку отсутствия ключа
        d.addErrback(handleKeyError)
        # Возвращаем Deferred, наверх там можно будет повеситься
        # на его callback и получить искомое значение счетчика
        return d

На практике приведенный выше код можно написать "короче", объединяя часто используемые операции, например:

    return self.mc.get(self.key).addErrback(handleKeyError)

Практически для каждой конструкции синхронного кода можно найти аналог в асинхронной концепции с Deferred:

  • последовательности синхронных операторов соответствует цепочка callback с асинхронными вызовами;
  • вызову одной подпрграммы с вводом-выводом из другой соответствует возврат Deferred из Deferred (ветвление Deferred);
  • глубокой цепочки вложенности, распространению исключений по стеку соответствует цепочка функций, возвращающие друг другу Deferred;
  • блокам try..except соответствуют обработчики ошибок (errback), которые могут "пробрасывать" исключения дальше, любое исключение в callback переводит выполнение в errback;
  • для "параллельного" выполнения асинхронных операций есть DeferredList.

Нити часто применяются в асинхронных программах для осуществления вычислительных процедур, осуществления блокирующегося

ввода-вывода (когда не существует асинхронного аналога). Всё это легко моделируется в простой модели 'worker', тогда

нет необходимости при грамотной архитектуре в явной синхронизации, при этом всё элегантно включается в общий поток

вычислений с помощью Deferred:


def doCalculation(a, b):
    """
    В этой функции осуществляются вычисления, синхронные операции ввода-вывода,
    не затрагивающие основной поток.
    """

    return a/b


def printResult(result):
    print result


def handleDivisionByZero(failure):
    failure.trap(ZeroDivisionError)

    print "Ooops! Division by zero!"


deferToThread(doCalculation, 3, 2).addCallback(printResult).addCallback(
    lambda _: deferToThread(doCalculation, 3, 0).addErrback(handleDivisionByZero))

В приведенном выше примере функция deferToThread переносит выполнение указанной функции в отдельную нить и возвращает

Deferred, через который будет асинхронно получен результат выполнения функции или исключение, если они будет выброшено.

Первое деление (3/2) выполняется в отдельной нити, затем распечатывается его результат на экран, а затем запускается

еще одно вычисление (3/0), которое генерирует исключение, обрабатываемое функцией handleDivisionByZero.

В одной статье не описать и части того, что хотелось бы сказать о Deferred, мне удалось не написать ни слова о том,

как же они работают. Если успел заинтересовать - читайте материалы ниже, а я обещаю написать еще.

Дополнительные материалы

Comments

Comments powered by Disqus
Contents © 2015 Andrey - Powered by Nikola