ffmpeg, ошибка вида lame: output buffer too small

Для истории, проблема один-в-один как описана в http://drupal.org/node/376012.

FFmpeg, собранный с libmp3lame (для кодирования mp3) версии 3.98, сообщает следующее:

lame: output buffer too small (buffer index: 9404, free bytes: 388)

Он считает это фатальной ошибкой, завершается:

Audio encoding failed

Лечится downgrade libmp3lame до версии 3.97.

Проверено на ArchLinux.

Отдых в ВКС-Кантри (Владимирская область)

[vks]: http://www.95km.ru/

Мы провели 5 дней в доме отдыха [ВКС-Кантри][vks]. В качестве варианта для отдыха рассматривали окрестности Костромы, Ярославля, Липецка, Воронежа, Курска, Орла. В район Костромы-Ярославля не поехали из-за Рыбинского водохранилища, которое поздно освобождается ото льда и сильно понижает температуру в регионе, а температура этой весной и так низкая. В Липецке, Воронеже не нашлось дома отдыха, который бы принял нас с 10-месячной дочкой. Вывод после поисков: очень сложно найти координаты домов отдыха, чаще натыкаешься на сайты-"ловушки" туристических компаний, настоящие телефоны найти сложно. Даже если телефон найден, надеяться, что кто-то ответит не стоит. Такое ощущение, что им клиенты не нужны. Практически случайно наткнулись на [ВКС-Кантри][vks], подкупило наличие достаточно разумного собственного сайта, где есть всё, начиная от цен на номера, заканчивая фотографиями практически каждого номера. Без проблем принимают с маленькими детьми, есть детские кроватки, стульчики в столовой. По телефону администратор отвечала на все вопросы около одиннадцати часов вечера. Забронировали номер с понедельника на неделю.

Из плюсов: приветливый персонал, желание помочь и создать все необходимые условия. Отличная, вкусная еда - шведский стол три раза в день. Разумные цены на номера, дополнительные услуги - прокат, шашлыки. Чистые, приятные номера - полное ощущение, что мы были практически первыми, кто жил в этом номере.

Территории как таковой почти нет: дорожки почти отсутствуют, леса рядом нет, дом отдых расположен рядом с рекой Киржач, петляющей в равнинной местности. На берегу Киржача устроили шашлыки:

Для желающих поехать в помощь трек пути от M7 к дому отдыха:

Скачать трек

В общем и целом нам очень понравилось, немного подвела погода, уехали на два дня пораньше, потому что в снег и дождь делать особо нечего. Опишу пару поездок по окрестностям, которые можно осуществить, стартуя из [ВКС-Кантри][vks].

Суздаль

Доехать до Суздаля очень просто: прямо по M7 до Владимир, заезжаем во Владимир (не на объездную), далее минуя центр налево по северной границе Владимира идет некое подобие владимирской кольцевой автодороги, с которой поворот на Суздаль, до Суздаля ведет замечательная двухполосная дорога, прыгающая вниз-вверх по холмам.

Сам Суздаль производит впечатление странное. Полуразрушенный, загаженный, живущий как будто бы производством медовухи и приемом туристов. Город замер и заморозился лет на сто. Но почему же столько мусора? Почему все разваливается, заборы заваливаются? Убрать мусор, всё подкрасить, помыть, положить новый асфальт. Было бы фантастическое для туристов место. А так - стыдно.

Село Воскресение, место гибели Гагарина, Киржач, Андреевское

Еще один день, галопом по окрестностям. Трек с точками ниже. Село Воскресение - сегодня скорее дачный поселок. От дороги Покров-Киржач к селу ведет замечательная лесная грунтовая дорога, не поехали по ней дальше, но, судя по всему, она достаточно длинная. В самом селе восстанавливают церковь, была она очень красивая:

Дальше по дороге Покров-Киржач село Новоселово, перед началом села необозначенный в этом направлении движения поворот к монументу на месте гибели Гагарина и Серегина. Сам памятник ухоженный, как-то охраняется. Грустное место, самолет разбился в лесу.

Дорога Покров-Киржач неплохого качества, малозагруженная, петляет по лесам и холмам, сплошное удовольствие! Сам Киржач - небольшой городок. В нем есть действующий женский монастырь, несколько церквей. Хороший ресторан "Шерна". В нем чувствуется атмосфера маленького провинциального города, нет ощущения "мертвого" города, как в Суздале.

Последний пункт нашего путешествия - усадьба "Андреевское" Воронцовых и Воронцовых-Дашковых, XVIII век. Для этого возвращаемся на M7, едем до поворота на "детский санаторий", далее по дороге еще около 7 км, на дороге незаметные четыре лежачих полицейских, лучше ехать за местной машиной. В усадьбе сегодня располагается детский санаторий "Болдино". Сама усадьба сохранилась, видимо, практически полностью, церковь, приусадебный парк. Здание обветшало, требуется ремонт или реставрация, еще остался дух дворянской усадьбы, хотя и совсем немного его осталось.

Скачать трек

СпамоБорец: релиз 0.2.0

[СпамоБорец]: http://spam-fighter.ru/ "СпамоБорец"

[СпамоБорец][] - веб-сервис, предоставляющий функции по классификации произвольных текстовых сообщений, и, в частности, выделения спама из общего потока сообщений.

СпамоБорец

В качестве сообщений могут рассматриваться, например, следующие виды общения, которые сегодня есть в социальных сетях (и веб-сайтах, имеющих элементы социальной сети):

  • личные сообщения;
  • чаты;
  • комментарии к произвольным объектам;
  • девизы, сообщения "о себе" и т.п. на страницах профиля пользователя;
  • письма в службу поддержки.

Фильтрация и классификация сообщений основывается на нескольких независимых алгоритмах; результатом классификации может являться классификация как самого сообщения (причём, возможно, по нескольким категориям: спам, флуд, проституция и т.п.), так и классификация отправителей сообщений (как авторизованных, так и неавторизованных, по тем же самым категориям: спамер, флудер и т.п.). Применение классификации к отправителям сообщений позволяет на раннем этапе пресекать попытки спам-рассылок и тому подобных массовых действий на сайте.

Данный релиз является первым публичным релизом проекта [СпамоБорец][]. Качество и полнота функционала соответствует на сегодняшний день альфа-версии.

На сегодня доступен следующий функционал:

  • веб-сервис с JSON-RPC и XML-RPC API (описание API);
  • создание и запуск из отдельного окружения;
  • firewall из правил анализа сообщений;
  • гибкая настройка содержимого сообщений;
  • анализ текста сообщений на флуд;
  • анализ частоты сообщений по тексту и атрибутам автора: логин, IP и т.п.;
  • простая Байесовская модель анализа текста (тестовая);
  • валидация сообщений по длине, наличию атрибутов, регулярному выражению;
  • логирование сообщений, проходящих через сервер;
  • система администрирования и настройки (qooxdoo, см. скриншоты);
  • реализация клиентов API на PHP и тестовый модуль для Wordpress.

Как установить

Нужен python и setuptools:

easy_install spamfighter

После этого выбираем каталог для окружения (например, ~/spamfighter) и запускаем:

spamfighter-create ~/spamfighter

Далее следуем инструкциям на экране. Более подробную информация об установке можно найти в документации по СпамоБорцу.

Что дальше?

Этот пост - уведомление о выходе новой версии, за ним последует более подробный рассказ "для чего это нужно" и "как это использовать". Надеемся на позитивный feedback и участие в жизни [проекта][СпамоБорец].

Deferred: все подробности

В предыдущей статье были описаны основные принципы работы Deferred и его применение в асинхронном программировании. Сегодня мы постараемся рассмотреть в деталях функционирование Deferred и примеры его использования.

Итак, Deferred - это отложенный результат, результат выполнения, который станет известен через некоторое время.

Результатом, хранящимся в Deferred, может быть произвольное значение (успешное выполнение) или ошибка (исключение),

которое произошло в процессе выполнения асинхронной операции. Раз нас интересует результат операции и мы получили

от некоторой асинхронной функции Deferred, мы хотим выполнить действия в тот момент, когда результат выполнения будет

известен. Поэтому Deferred кроме результата хранит еще цепочку обработчиков: обработчиков результатов (callback) и

обработчиков ошибок (errback).

Рассмотрим поподробнее цепочку обработчиков:

Deferred

Обработчики располагаются по "слоям" или уровням, выполнение происходит четко по уровням сверху вниз. При этом на каждом

уровне расположены обработчики callback и errback, один из элементов может отсутствовать. На каждом уровне может

быть выполнен либо callback, либо errback, но не оба. Выполнение обработчиков происходит только один раз, повторного

входа быть не может.

Функции-обработчики callback являются функциями с одним аргументом - результатом выполнения:

def callback(result): ...

Функции-обработчики errback принимают в качестве параметра исключение, "завернутое" в класс [Failure][failure]:


def errback(failure):
    ...

Выполнение Deferred начинается с того, что в Deferred появляется результат: успешное выполнение или исключение.

В зависимости от результата выбирается соответствующая ветвь обработчиков: callback или errback. После этого происходит поиск

ближайшего уровня обработчиков, в котором существует соответствующий обработчик. В нашем примере на рисунке

был получен успешный результат выполнения и результат был передан обработчику callback1.

Дальнейшее выполнение приводит к вызову обработчиков на нижележащих уровнях. Если callback или errback завершается

возвратом значения, которое не является Failure, выполнение считается успешным и полученный результат отправляется

на вход callback-обработчику на следующем уровне. Если же в процессе выполнения обработчика было выкинуто исключение

или возвращено значение типа Failure, управление будет передано errback на следующем уровне, который получит

исключение в качестве параметра.

В нашем примере обработчик callback1 выполнился успешно, его результат был передан обработчику callback2, в котором

было выкинуто исключение, которое привело к переходу к цепочке errback-обработчиков, на третьем уровне обработчик

errback отсутствовал и исключение было передано в errback4, который обработал исключение, вернул успешный результат

выполнения, который теперь является результатом Deferred, однако больше обработчиков нет. Если к Deferred будет

добавлен еще один уровень обработчиков, они смогут получить доступ к этому результату.

Как и все другие объекты Python, объект Deferred живет до тех пор, пока на него есть ссылки из других объектов. Обычно

объект, вернувший Deferred, сохраняет его, т.к. ему надо по завершении асинхронной операции передать в Deferred полученный

результат. Чаще всего другие участники (добавляющие обработчики событий) не сохраняют ссылки на Deferred, таким образом

объект Deferred будет уничтожен по окончании цепочки обработчиков. Если происходит уничтожение Deferred, в котором

осталось необработанное исключение (выполнение завершилось исключением и больше обработчиков нет), на экран печатается

отладочное сообщение с traceback исключения. Эта ситуация аналогична "выскакиванию" необработанного исключения на верхний

уровень в обычной синхронной программе.

Deferred в квадрате

Возвращаемым значением callback и errback может быть также другой Deferred, тогда выполнение цепочки обработчиков

текущего Deferred приостанавливается до окончания цепочки обработчиков вложенного Deferred.

deferred-in-deferred

В приведенном на рисунке примере обработчик callback2 возвращает не обычный результат, а другой Deferred - Deferred2. При

этом выполнение текущего Deferred приостанавливается до получения результата выполнения Deferred2. Результат Deferred2 -

успешный или исключение - становится результатом, передаваемым на следующий уровень обработчиков первого Deferred. В нашем примере

Deferred2 завершился с исключением, которое будет передано на вход обработчику errback2 первого Deferred.

Обработка исключений в errback

Каждый обработчик исключений errback является аналогом блока try..except, а блок except обычно реагирует на некоторый тип

исключений, такое поведение очень просто воспроизвести с помощью [Failure][failure]:


def errback(failure):
    """
    @param failure: ошибка (исключение), завернутое в failure
    @type failure: C{Failure}
    """
    failure.trap(KeyError)

    print "Got key error: %r" % failure.value

    return 0


Метод trap класса Failure проверяет, является ли завернутое в него исключение наследником или непосредственно классом

KeyError. Если это не так, оригинальное исключение снова выбрасывается, прерывая выполнение текущего errback, что приведет

к передаче управления следующему errback в цепочке обработчиков, что имитирует поведение блока except при несоответствии

типа исключения (управление передается следующему блоку). В свойстве value хранится оригинальное исключение, которое

можно использовать для получения дополнительной информации об ошибке.

Необходимо обратить внимание, что обработчик errback должен завершиться одним из двух способов:

  1. Вернуть некоторое значение, которое станет входным значением следующего callback, что по смыслу означает, что исключение было обработано.
  2. Выкинуть оригинальное или новое исключение - исключение не было обработано или было перевыброшено новое исключение, цепочка обработчиков errback продолжается.

Существует и третий вариант - вернуть Deferred, тогда дальнейшее выполнение обработчиков будет зависеть от результата

Deferred.

В нашем примере мы исключение обработали и передали в качестве результата 0 (например, отсутствие некоторого ключа эквивалентно

его нулевому значению).

Готовимся к асинхронности заранее

Как только появляется асинхронность, то есть некоторая функция вместо непосредственного значения возвращает Deferred, асинхронность

начинает распространяться по дереву функций выше, заставляя возвращать Deferred из функций, которые раньше были синхронными (возвращали

результат непосредственно). Рассмотрим условный пример такого превращения:


def f():
    return 33


def g():
    return f()*2

Если по каким-то причинам функция f не сможет вернуть результат сразу, она начнет возвращать Deferred:


def f():
    return Deferred().callback(33)

Но теперь и функция g вынуждена возращать Deferred, зацепляясь за цепочку обработчиков:


def g():
    return f().addCallback(lambda result: result*2)

Аналогичная схема "превращения" происходит и с реальными функциями: мы получаем результаты в виде Deferred от нижележащих в дереве

вызовов функции, навешиваем на их Deferred свои обработчики callback, которые соответствуют старому, синхронному коду нашей функции, если у

нас были обработчики исключений, добавляются и обработчики errback.

На практике лучше сначала выявить те места кода, которые будут асинхронными и будут использовать Deferred, чем переделывать синхронный

код в асинхронный. Асинхронный код начинается с тех вызовов, которые не могут построить результат непосредственно:

  • сетевой ввод-вывод;
  • обращение к сетевым сервисам СУБД, memcached;
  • удаленные вызовы RPC;
  • операции, выполнение которых будет выделено в нить в модели Worker и т.п.

В процессе написания приложения часто ясно, что в данной точке будет асинхронное обращение, но его еще нет (не реализован интерфейс с СУБД,

например). В такой ситуации можно использовать функции defer.success или defer.fail для создания Deferred, в котором уже содержится

результат. Вот как можно короче переписать функцию f:


from twisted.internet import defer



def f():
    return defer.success(33)

Если мы не знаем, будет ли вызываемая функция возвращать результат синхронно или вернет Deferred, и не хотим зависеть от её

поведения, мы можем завернуть обращение к ней в defer.maybeDeferred, которое любой вариант сделает эквивалентным вызову

Deferred:


from twisted.internet import defer



def g():
    return defer.maybeDeferred(f).addCallback(lambda result: result*2)

Такой вариант функции g будет работать как с синхронной, так и с асинхронной f.

Вместо заключения

Рассказывать о Deferred можно еще очень долго, в качестве дополнительного чтения могу опять порекомендовать

список материалов в конце предыдущей статьи.

[failure]: http://twistedmatrix.com/documents/current/api/twisted.python.failure.Failure.html "API reference Failure"

Асинхронное программирование: концепция Deferred

Асинхронная концепция программирования заключается в том, что результат выполнения функции доступен не сразу же, а через некоторое время в виде

некоторого асинхронного (нарушающего обычный порядок выполнения) вызова. Зачем

такое может быть полезно? Рассмотрим несколько примеров.

Первый пример - сетевой сервер, веб-приложение. Чаще всего как таковых

вычислений на процессоре такие приложения не выполняют. Большая часть времени

(реального, не процессорного) тратится на ввод-вывод: чтение запроса от

клиента, обращение к диску за данными, сетевые обращение к другим подсистемам

(БД, кэширующие сервера, RPC и т.п.), запись ответа клиенту. Во время этих

операций ввода-вывода процессор простаивает, его можно загрузить обработкой

запросов других клиентов. Возможны различные способы решить эту задачу: отдельный

процесс на каждое соединение (Apache mpm_prefork, PostgreSQL, PHP FastCGI),

отдельный поток (нить) на каждое соединение или комбинированный вариант

процесс/нить (Apache mpm_worker, MySQL). Подход с использованием процессов или

нитей перекладывает мультиплексирование процессора между обрабатываемыми

соединениями на ОС, при этом расходуется относительно много ресурсов (память,

переключения контекста и т.п.), такой вариант не подходит для обработки

большого количества одновременных соединений, но идеален для ситуации, когда

объем вычислений достаточно высок (например, в СУБД). К плюсам модели нитей и

процессов можно добавить потенциальное использование всех доступных процессоров

в многопроцессорной архитектуре.

Альтернативой является использование однопоточной модели с использованием

примитивов асинхронного ввода-вывода, предоставляемых ОС (select, poll, и

т.п.). При этом объем ресурсов на каждое новое обслуживаемое соединение не

такой большой (новый сокет, какие-то структуры в памяти приложения). Однако

программирование существенно усложняется, т.к. данные из сетевых сокетов

поступают некоторыми "отрывками", причем за один цикл обработки данные поступают от разных

соединений, находящихся в разных состояниях, часть соединений могут быть

входящими от клиентов, часть - исходящими к внешним ресурсам (БД, другой сервер

и т.п.). Для упрощения разработки используются различные концепции: callback,

конечные автоматы и другие. Примеры сетевых серверов, использующих асинхронный

ввод-вывод: nginx, lighttpd,

HAProxy, pgBouncer, и т.д. Именно при такой

однопоточной модели возникает необходимость в асинхронном программировании.

Например, мы хотим выполнить запрос в БД. С точки зрения программы выполнение

запроса - это сетевой ввод-вывод: соединение с сервером, отправка запроса,

ожидание ответа, чтение ответа сервера БД. Поэтому если мы вызываем

функцию "выполнить запрос БД", то она сразу вернуть результат не сможет

(иначе она должна была бы заблокироваться), а вернет лишь нечто, что позволит

впоследствие получить результат запроса или, возможно, ошибку (нет соединения

с сервером, некорректный запрос и т.п.) Этим возвращаемым значением удобно

сделать именно Deferred.

Второй пример связан с разработкой обычных десктопных приложений. Предположим,

мы решили сделать аналог Miranda (QIP, MDC, ...), то есть свой мессенджер.

В интерфейсе программы есть контакт-лист, где можно удалить контакт. Когда

пользователь выбирает это действие, он ожидает что контакт исчезнет на экране

и что он действительно удалится из контакт-листа. На самом деле операция

удаления из серверного контакт-листа опирается на сетевое взаимодействие

с сервером, при этом пользовательский интерфейс не должен быть заблокирован

на время выполнения этой операции, поэтому в любом случае после выполнения

операции потребуется некоторое асинхронное взаимодействие с результатом операции.

Можно использовать механизм сигналов-слотов, callback'ов или что-то еще,

но лучше всего подойдет Deferred: операция удаления из контакт-листа возвращает

Deferred, в котором обратно придет либо положительный результат (всё хорошо),

либо исключение (точная ошибка, которую надо сообщить пользователю): в случае

ошибки контакт надо восстановить контакт в контакт-листе.

Примеры можно приводить долго и много, теперь о том, что же такое Deferred.

Deferred - это сердце framework'а асинхронного сетевого программирования

Twisted в Python. Это простая и стройная концепция, которая

позволяет перевести синхронное программирование в асинхронный код, не изобретая

велосипед для каждой ситуации и обеспечивая высокое качества кода.

Deferred - это просто возвращаемый результат функции, когда этот результат

неизвестен (не был получен, будет получен в другой нити и т.п.) Что мы можем

сделать с Deferred? Мы можем "подвеситься" в цепочку обработчиков, которые

будут вызваны, когда результат будет получен. При этом Deferred может нести не

только положительный результат выполнения, но и исключения, сгенерированные

функцией или обработчиками, есть возможность исключения обработать, перевыкинуть

и т.д. Фактически, для синхронного кода есть более-менее однозначная параллель в

терминах Deferred. Для эффективной разработки с Deferred оказываются полезными

такие возможности языка программирования, как замыкания, лямбда-функци.

Приведем пример синхронного кода и его альтернативу в терминах Deferred:

try: # Скачать по HTTP некоторую страницу page = downloadPage(url) # Распечатать содержимое print page except HTTPError, e: # Произошла ошибка print "An error occured: %s", e

В асинхронном варианте с Deferred он был бы записан следующим образом:


def printContents(contents):
    """
    Callback, при успешном получении текста страницы,
    распечатываем её содержимое.
    """
    print contents


def handleError(failure):
    """
    Errback (обработчик ошибок), просто распечатываем текст ошибки.
    """

    # Мы готовы обработать только HTTPError, остальные исключения
    # "проваливаются" ниже.
    failure.trap(HTTPError)
    # Распечатываем само исключение
    print "An error occured: %s", failure


# Теперь функция выполняется асинхронно и вместо непосредственного

# результата мы получаем Deferred

deferred = downloadPage(url)

# Навешиваем на Deferred-объект обработчики успешных результатов

# и ошибок (callback, errback).

deferred.addCallback(printContents)

deferred.addErrback(handleError)

На практике обычно мы возвращаем Deferred из функций, которые получают

Deferred в процессе своей работы, навешиваем большое количество обработчиков,

обрабатываем исключения, некоторые исключения возвращаем через Deferred (выбрасываем

наверх). В качестве более сложного примера приведем код в асинхронном

варианте для примера атомарного счетчика из статьи про структуры данных в memcached,

здесь мы предполагаем, что доступ к memcached как сетевому сервису идет через Deferred, т.е.

методы класса Memcache возвращают Deferred (который вернет либо результат операции, либо ошибку):


class MCCounter(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        """
        super(MCCounter, self).__init__(mc)
        self.key = 'counter' + name

    def increment(self, value=1):
        """
        Увеличить значение счетчика на указанное значение.

        @param value: инкремент
        @type value: C{int}
        @return: Deferred, результат операции
        """
        def tryAdd(failure):
            # Обрабатываем только KeyError, всё остальное "вывалится"
            # ниже
            failure.trap(KeyError)

            # Пытаемся создать ключ, если раз его еще нет
            d = self.mc.add(self.key, value, 0)
            # Если вдруг кто-то еще создаст ключ раньше нас,
            # мы это обработаем
            d.addErrback(tryIncr)
            # Возвращаем Deferred, он "вклеивается" в цепочку
            # Deferred, в контексте которого мы выполняемся
            return d

        def tryIncr(failure):
            # Всё аналогично функции tryAdd
            failure.trap(KeyError)

            d = self.mc.incr(self.key, value)
            d.addErrback(tryAdd)
            return d

        # Пытаемся выполнить инкремент, получаем Deferred
        d = self.mc.incr(self.key, value)
        # Обрабатываем ошибку
        d.addErrback(tryAdd)
        # Возвращаем Deferred вызывающему коду, он может тем самым:
        #  а) узнать, когда операция действительно завершится
        #  б) обработать необработанные нами ошибки (например, разрыв соединения)
        return d

    def value(self):
        """
        Получить значение счетчика.

        @return: текущее значение счетчика
        @rtype: C{int}
        @return: Deferred, значение счетчика
        """
        def handleKeyError(failure):
            # Обрабатываем только KeyError
            failure.trap(KeyError)

            # Ключа нет - возвращаем 0, он станет результатом
            # вышележащего Deferred
            return 0

        # Пытаемся получить значение ключа
        d = self.mc.get(self.key)
        # Будем обрабатывать ошибку отсутствия ключа
        d.addErrback(handleKeyError)
        # Возвращаем Deferred, наверх там можно будет повеситься
        # на его callback и получить искомое значение счетчика
        return d

На практике приведенный выше код можно написать "короче", объединяя часто используемые операции, например:

    return self.mc.get(self.key).addErrback(handleKeyError)

Практически для каждой конструкции синхронного кода можно найти аналог в асинхронной концепции с Deferred:

  • последовательности синхронных операторов соответствует цепочка callback с асинхронными вызовами;
  • вызову одной подпрграммы с вводом-выводом из другой соответствует возврат Deferred из Deferred (ветвление Deferred);
  • глубокой цепочки вложенности, распространению исключений по стеку соответствует цепочка функций, возвращающие друг другу Deferred;
  • блокам try..except соответствуют обработчики ошибок (errback), которые могут "пробрасывать" исключения дальше, любое исключение в callback переводит выполнение в errback;
  • для "параллельного" выполнения асинхронных операций есть DeferredList.

Нити часто применяются в асинхронных программах для осуществления вычислительных процедур, осуществления блокирующегося

ввода-вывода (когда не существует асинхронного аналога). Всё это легко моделируется в простой модели 'worker', тогда

нет необходимости при грамотной архитектуре в явной синхронизации, при этом всё элегантно включается в общий поток

вычислений с помощью Deferred:


def doCalculation(a, b):
    """
    В этой функции осуществляются вычисления, синхронные операции ввода-вывода,
    не затрагивающие основной поток.
    """

    return a/b


def printResult(result):
    print result


def handleDivisionByZero(failure):
    failure.trap(ZeroDivisionError)

    print "Ooops! Division by zero!"


deferToThread(doCalculation, 3, 2).addCallback(printResult).addCallback(
    lambda _: deferToThread(doCalculation, 3, 0).addErrback(handleDivisionByZero))

В приведенном выше примере функция deferToThread переносит выполнение указанной функции в отдельную нить и возвращает

Deferred, через который будет асинхронно получен результат выполнения функции или исключение, если они будет выброшено.

Первое деление (3/2) выполняется в отдельной нити, затем распечатывается его результат на экран, а затем запускается

еще одно вычисление (3/0), которое генерирует исключение, обрабатываемое функцией handleDivisionByZero.

В одной статье не описать и части того, что хотелось бы сказать о Deferred, мне удалось не написать ни слова о том,

как же они работают. Если успел заинтересовать - читайте материалы ниже, а я обещаю написать еще.

Дополнительные материалы

Структуры данных в memcached/MemcacheDB

Достаточно часто нам приходится хранить данные в memcached или

MemcacheDB. Это могут быть относительно

простые данные, например, закэшированные выборки из базы данных,

а иногда необходимо хранить и обрабатывать более сложные структуры данных,

которые обновляются одновременно из нескольких процессов, обеспечивать

быстрое чтение данных и т.п. Реализация таких структур данных уже не укладывается

в комбинацию команд memcached get/set. В данной статье будут описаны

способы хранения некоторых структур данных в memcached с примерами кода и описанием

основных идей.

Memcached и MemcacheDB в данной статье рассматриваются

вместе, потому что имеют общий интерфейс доступа и логика работы большей части

структур данных будет одинаковой, далее будем называть их просто "memcached".

Зачем нам нужно хранить структуры данных в memcached? Чаще всего для

распределенного доступа к данным из разных процессов, с разных серверов и т.п.

А иногда для решения задачи хранения данных достаточно интерфейса,

предоставляемого MemcacheDB, и необходимость в использовании СУБД отпадает.

Иногда проект разрабатывается изначально для нераспределенного случая

(работа в рамках одного сервера), однако предполагая будущую необходимость

масштабирования, лучше использовать сразу такие алгоритмы и структуры

данных, которые могут обеспечить легкое масштабирование. Например, даже

если данные будут храниться просто в памяти процесса, но интерфейс к доступа

к ним повторяет семантику memcached, то при переходе к распределенной и

масштабируемой архитектуре достаточно будет заменить обращения к внутреннему

хранилищу на обращения к серверу (или кластеру серверов) memcached.

Примеры кода будут написаны на Python, в них будет использоваться следующий

интерфейс доступа к memcached:

class Memcache(object): def get(self, key): """ Получить значение ключа.

    @param key: ключ
    @type key: C{str}
    @return: значение ключа
    @raises KeyError: ключ отсутствует
    """

def add(self, key, value, expire=0):
    """
    Установить значение ключа, если он еще не существует.

    @param key: ключ
    @type key: C{str}
    @param value: значение ключа
    @param expire: "срок годности" ключа в секундах (0 - бесконечно)
    @type expire: C{int}
    @raises KeyError: ключ уже существует
    """

def incr(self, key, value=1):
    """
    Увеличить на единицу значение ключа.

    @param key: ключ
    @type key: C{str}
    @param value: инкремент
    @type value: C{int}
    @return: новое значение ключа (после увеличения)
    @rtype: C{int}
    @raises KeyError: ключ не существует
    """

def append(self, key, value):
    """
    Добавить суффикс к значению существующего ключа.

    @param key: ключ
    @type key: C{str}
    @param value: суффикс
    @raises KeyError: ключ не существует
    """

def delete(self, key):
    """
    Удалить ключ.

    @param key: ключ
    @type key: C{str}
    """

Все наши структуры данных будут наследниками базового класса следующего вида:


class MemcacheObject(object):
    def __init__(self, mc):
        """"
        Конструктор.

        @param mc: драйвер memcached
        @type mc: L{Memcache}
        """
        self.mc = mc


Блокировка

Задача

Блокировка (mutex, или в данной ситуации скорее spinlock) - это не совсем структура данных,

но она может быть использована как "кирпичик" при построении сложных структур данных в memcached.

Блокировка используется для исключения одновременного доступа к некоторым ключам, возможности

блокировки отсутствуют в API memcached, поэтому она эмулируется с помощью других операций.

Итак, блокировка определяется своим именем, работает распределённо (то есть из любого процесса,

имеющего доступ к memcached), имеет автоматическое "умирание" (на случай, если процесс, поставивший

блокировку, не сможет её снять, например, по причине аварийного завершения).

Операции над блокировкой:

  • попробовать заблокироваться (try-lock);
  • разблокировать (unlock).

Решение


class MCLock(MemcacheObject):
    def __init__(self, mc, name, timeout=5):
        """
        Конструктор.

        @param name: имя блокировки
        @type name: C{str}
        @param timeout: таймаут аварийного снятия блокировки (секунды)
        @type timeout: C{int}
        """
        super(MCLock, self).__init__(mc)
        self.key = 'lock_' + name
        self.locked = False
        self.timeout = timeout

    def try_lock(self):
        """
        Попробовать заблокироваться.

        @return: удалось ли заблокироваться?
        @rtype: C{bool}
        """
        assert not self.locked
        try:
            self.mc.add(self.key, 1, self.timeout)
        except KeyError:
            return False

        self.locked = True
        return True

    def unlock(self):
        """
        Снять блокировку.
        """
        assert self.locked
        self.mc.delete(self.key)
        self.locked = False


Обсуждение

Корректность работы блокировки основывается на операции add, которая гарантирует что ровно один процесс

сможет "первым" установить значение ключа, все остальные процессы получат ошибку. Приведенный выше класс может

быть обернут более удобно, например, для with-обработчика Python.

Более подробно вопрос блокировок обсуждался в посте про

одновременное перестроение кэшей.

Атомарный счетчик

Задача

Необходимо вычислять количество событий, которые происходят распределённо на разных

серверах, также получать текущее значение счетчика. При этом счетчик не должен "терять"

события и начислять "лишние" значения.

Тип данных: целое число.

Начальное значение: ноль.

Операции:

  • увеличить значение счетчика на указанное значение;
  • получить текущее значение счетчика.

Решение


class MCCounter(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        """
        super(MCCounter, self).__init__(mc)
        self.key = 'counter' + name

    def increment(self, value=1):
        """
        Увеличить значение счетчика на указанное значение.

        @param value: инкремент
        @type value: C{int}
        """
        while True:
            try:
                self.mc.incr(self.key, value)
                return
            except KeyError:
                pass

            try:
                self.mc.add(self.key, value, 0)
                return
            except KeyError:
                pass

    def value(self):
        """
        Получить значение счетчика.

        @return: текущее значение счетчика
        @rtype: C{int}
        """
        try:
            return self.mc.get(self.key)
        except KeyError:
            return 0


Обсуждение

Реализация счетчика достаточно очевидная, самый сложный момент - это "вечный" цикл в методе increment.

Он необходим для корректной инициализации значения счетчика в условиях конкурентного доступа к нему.

Если операция incr завершилась ошибкой отсутствия ключа, нам необходимо создать ключ счетчика, но этот

код могут одновременно выполнять несколько процессов, тогда одному из них удастся add, а второй получит ошибку

и инкрементирует уже созданный счетчик с помощью incr. Зацикливание невозможно, т.к. одна из операций incr или

add должна быть успешной: после создания ключа в memcached операция incr будет успешной всё время существования ключа.

Как быть с ситуацией, когда ключ со счетчиком из memcached пропадет? Возможны две ситуации, когда ключ исчезнет:

  1. memcached не хватает памяти для размещения других ключей;
  2. процесс memcached аварийно завершился (сервер "упал").

(В случае MemcacheDB, настроенной репликации и т.п. падение сервера не должно приводить к потере ключей.)

В первом случае надо лишь правильно обслуживать memcached - памяти должно хватать для счетчиков, иначе мы начинаем

терять значения, а это неприемлемо в данной ситуации (но совершенно нормально, например, для хранения кэшей в memcached).

Второй случай всегда возможен, если такая ситуация часто возникает, можно дублировать счетчики в нескольких

memcached-серверах.

Счетчик посетителей

Задача

Необходимо вычислить количество уникальных обращений к сайту в течение некоторого периода T. Например, T может быть

равно 5 минутам. Пусть мы можем сказать, когда было последнее обращение данного посетителя к сайту - более T минут

назад или менее (то есть является ли оно уникальным в течение периода T).

Операции над счетчиком:

  • увеличить значение счетчика на единицу (обращение уникального в течение периода T посетителя);
  • получить текущее число посетителей.

Решение


def time():
    """
    Текущее время в секундах с любого момента времени (например, UNIX Epoch).

    @return: текущее время в секундах
    @rtype: C{int}
    """


class MCVisitorCounter(MemcacheObject):
    def __init__(self, mc, name, T):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        @param T: период счетчика, секунды
        @type T: C{int}
        """
        super(MCVisitorCounter, self).__init__(mc)
        self.keys = ('counter' + name + '_0', 'counter' + name + '_1')

    def _active(self):
        """
        Получить индекс текущего счетчика.

        @return: 0 или 1
        """
        return 0 if (time() % (2*T)) 



### Обсуждение



Работа счетчика посетителей



Основная структура работы со счетчиком в memcached в данной задаче повторяет решение задачи атомарного счетчика.

Базовая идея - использование теневого и текущего счетчика: теневой счетчик  получает обновления (увеличивается), а 

текущий используется для получения значения числа посетителей (его значение было накоплено, когда она был теневым).

Каждый период T текущий и теневой счетчик меняются местами. Ключ теневого счетчика создается со 

сроком годности 2*T секунд, то есть его время жизни - T секунд, пока он был теневым (и увеличивался), и Т секунд, пока

его значение использовалось для чтения. К моменту того, как этот ключ снова станет теневым счетчиком, он должен

быть уничтожен memcached, т.к. срок его годности истек. Идея теневого и текущего счетчика напоминает, например,

двойную буферизацию при выводе на экран в играх.



Необходимо обратить внимание на функцию `_active()`: она реализована именно через вычисление остатка от деления

текущего времени в секундах, а не, например, через периодическое (раз в Т секунд) смену значения `active`, т.к.

если время на всех серверах синхронизировано с разумной точностью, то и результат функции `_active()` будет на

всех серверах одинаковым, что важно для корректного распределенного функционирования данной структуры данных.



Описанный подход будет работать корректно только при достаточно большом количестве посетителей, когда временной интервал между

очередной сменой значения функции `_active()` и обращением к функции `increment()` будет как можно меньше. То есть

когда `increment()` вызывается часто, то есть когда посетителей достаточно много, например, 1000 человек при периоде Т=3 минуты. Иначе

создание и уничтожение ключей не будет синхронизировано с моментом смены значения `_active()` и начнут пропадать 

посетители, накопленные в теневом счетчике, который будет уничтожен в процессе накопления значений и 

создан заново. В этой ситуации его время жизни 2*T будет "сдвинуто" относительно моментов 0, T функции

time() % (2*T). В силу того, что memcached рассматривает срок годности ключей дискретно относительно секунд (с точностью не более одной секунды), 

любой сдвиг времени жизни ключа может составлять 1,2,...,T-1 секунду (отсутствие сдвига - 0 секунд). Дискретность

срока годности означает, что если ключ был создан в 25 секунд 31 миллисекунду со сроком годности 10 секунд,

то он будет уничтожен на 35-й (или 36-й) секунде 0 миллисекунд (а не 31-й миллисекунде). Для компенсация сдвига

в целое число секунд необходимо исправить строчку 43 примера следующим образом:



                self.mc.add(self.keys[1-active], 1, 2*T - time() % T)
Значение счетчика посетителей не изменяется в течение периода времени T, для более приятного отображения можно к значению счетчика при выводе добавить нормально распределенную величину с небольшой дисперсией и математическим ожиданием около 5% от значения счетчика. Немного более сложный вариант такого счетчика (с интерполяцией во времени), но с точно такой же идеей был описан в посте про [атомарность операции и счетчики в memcached](http://www.smira.ru/2008/10/24/web-caching-memcached-3). ## Лог событий ### Задача Задача этой структуры данных - хранение событий, произошедших в распределенной системе за последние T секунд. Каждое событие имеет момент времени, когда оно произошло, остальное содержимое события определяется логикой приложения. Операции над логом событий: * добавить сообщение в лог событий (должна быть максимально быстрой); * получить события, произошедшие в период времени от Tmin до Tmax (должна быть эффективной, но вызывается реже, чем добавление); ### Решение

def time():
    """
    Текущее время в секундах с любого момента времени (например, UNIX Epoch).

    @return: текущее время в секундах
    @rtype: C{int}
    """


class Event:
    """
    Событие, помещаемое в лог событий.
    """

    def when(self):
        """
        Момент времени, когда произошло событие (секунды).
        """

    def serialize(self):
        """
        Сериализовать событие.

        @return: сериализованное представление
        @rtype: C{str}
        """

    @static
    def deserialize(serialized):
        """
        Десериализовать набор событий.

        @param serialized: сериализованное представление одного 
                           или нескольких событий
        @type serialized: C{str}
        @return: массив десериализованных событий
        @rtype: C{list(Event)}
        """


class MCEventLog(MemcacheObject):
    def __init__(self, mc, name, timeChunk=10, numChunks=10):
        """
        Конструктор.

        @param name: имя лога событий
        @type name: C{str}
        @param timeChunk: емкость одного ключа лога в секундах
        @type timeChunk: C{int}
        @param numChunks: число выделяемых ключей под лог
        @type numChunks: C{int}
        """
        super(MCEventLog, self).__init__(mc)
        self.keyTemplate = 'messagelog' + name + '_%d';
        self.timeChunk = timeChunk
        self.numChunks = numChunks

    def put(self, event):
        """
        Поместить событие в лог.

        @param event: событие
        @type event: L{Event}
        """
        serialized = event.serialize()
        key = self.keyTemplate % (event.when() // self.timeChunk % self.numChunks)

        while True:
            try:
                self.mc.append(key, serialized)
                return
            except KeyError:
                pass

            try:
                self.mc.add(key, serialized, self.timeChunk * (self.numChunks-1))
                return
            except KeyError:
                pass

    def fetch(self, first=None, last=None):
        """
        Получить события из лога за указанный период (или все события).

        @param first: минимальное время возвращаемого сообщения
        @type first: C{int}
        @param last: максимальное время возвращаемого сообщения
        @type last: C{int}
        @return: массив событий
        @rtype: C{list(Event)}
        """
        if last is None or last > time():
            last = time()

        if first is None or last  self.timeChunk * (self.numChunks-1):
            first = time() - self.timeChunk * (self.numChunks-1)

        firstKey = first / self.timeChunk % self.numChunks
        lastKey = last / self.timeChunk % self.numChunks

        if firstKey = first and 
                                                e.when() 



### Обсуждение



Основная идея лога событий - кольцевой буфер, состоящий из `numChunks` ключей в memcached.

Каждый ключ активен (то есть дополняется значениями) в течение `timeChunk` секунд, после

чего активным становится следующий ключ (если активным был последний ключ, эта роль

переходит к первому ключу). Полный цикл буфера, т.е. период времени между двумя использованиями

одного ключа составляет `numChunks * timeChunk` секунд, а время жизни каждого ключа - 

`(numChunks - 1) * timeChunk` секунд, таким образом при любом сдвиге времени создания

ключа по модулю `timeChunk` к моменту времени следующего использования ключ гарантированно

будет уничтожен. Таким образом, ёмкость лога событий (или период времени, за который

сохраняются события) составляет `(numChunks - 1) * timeChunk` секунд. Такое разбиение

лога на ключи позволяет при получении событий из лога вынимать лишь

те ключи, которые соответствуют интересному нам временному отрезку.



Выбор параметров

`timeChunk` и `numChunks` зависит от применения лога событий: сначала определяется желаемый срок

хранения событий, затем по частоте событий выбирается такое значение `timeChunk`, чтобы размер

каждого ключа лога событий был относительно небольшим (например, 10-20Кб). Из этих соображений

можно найти значение и второго параметра, `numChunks`.



В примере используется некоторый класс `Event`, который обладает единственным интересным

для нас свойством - временем, когда произошло событие. В методе `put` лога событий предполагается,

что событие `event`, переданное в качестве параметра, произошло "недавно", то есть с момента

`event.when()` прошло не более чем `(numChunks - 1) * timeChunk` секунд (емкость лога). При

работе `put` вычисляется ключ, в который должна быть помещена информация о событии, в соответствие

с его временной меткой. После этого с помощью уже знакомой по предыдущим примерам техники ключ либо создается, либо

к значению уже существующего ключа дописывается сериализованное представление события.



Метод `fetch` вычисляет потенциальный набор ключей лога, в которых могут находиться события,

произошедшие во временной интервал от `first` до `last`. Если временные рамки не заданы,

`last` считается равным текущему моменту времени, а `first` - моменту времени, отстоящему

от текущего на емкость лога. Набор ключей вычисляется с учетом кольцевой структуры метода,

после чего выбираются соответствующие ключи, десериализуются последовательно записанные

в них события и проводится дополнительная фильтрация на попадание в отрезок `[first, last]`.



Приведенная выше сигнатура метода позволяет последовательными обращениями выводить новые

события из лога:


 1. Первый раз вызывается `events = fetch()`. Вычисляется `lastSeen` как `max(events.when())`.
 2. Все последующие обращения выглядят следующим образом: `events = fetch(first=lastSeen)`, при этом
    `lastSeen` каждый раз перевычисляется.


## Массив



### Задача 1



В массиве хранится список значений произвольного типа, относительно редко происходит

обновление списка, гораздо чаще происходит получение списка целиком.



Операции над массивом:

 * изменить массив (редкая операция);
 * получить массив целиком (частая операция).


### Решение 1




def serializeArray(array):
    """
    Сериализовать массив в бинарное представление.
    """


def deserializeArray(str):
    """
    Десериализовать массив из бинарного представления.
    """


class MCArray1(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя массива
        @type name: C{str}
        """
        super(MCArray1, self).__init__(mc)
        self.lock = MCLock(name)
        self.key = 'array' + name

    def fetch(self):
        """
        Получить текущее значение массива.

        @return: массив
        @rtype: C{list}
        """
        try:
            return deserializeArray(self.mc.get(self.key))
        except KeyError:
            return []

    def change(self, add_elems=[], delete_elems=[]):
        """
        Изменить значение массива, добавив или удалив из него
        элементы.

        @param add_elems: элементы, которые надо добавить
        @type add_elems: C{list}
        @param delete_elems: элементы, которые надо удалить
        @type delete_elems: C{list}
        """
        while not self.lock.try_lock():
            pass

        try:
            try:
                array = deserializeArray(self.mc.get(self.key))
            except KeyError:
                array = []
            array = filter(lambda e: e not in delete_elems, array) + add_elems
            self.mc.set(self.key, serializeArray(array), 0)
        finally:
            self.lock.unlock()


Обсуждение 1

Приведенный выше способ решения на самом деле не имеет никакого отношения к массивам, а может

быть применен для любой структуры данных. Он основан на модели reader-writer, когда есть много

читателей и относительно мало писателей. Читатели в любой момент с помощью метода fetch получают

содержимое массива, при этом важно, что "писатель" сhange записывает содержимое одной командой

memcached, то есть в силу внутренней атомарности операций get и set в memcached и несмотря на

отсутствие синхронизации между методами fetch и сhange, результат fetch всегда будет консистентным:

это будет значение до или после очередного изменения. Писатели блокируются от одновременного

изменения массива с помощью блокировки MCLock, описанной выше.

В данной ситуации можно было бы избежать использования блокировки и воспользоваться командами

gets, cas и add из протокола memcached для того, чтобы гарантировать атомарность изменений

с помощью функции change.

Задача 2

Массив хранит список значений некоторого типа, часто происходит операция вида "добавить значение

в массив". Относительно редко массив запрашивается целиком. Для простоты реализации

в дальнейшем будет рассматриваться массив целых чисел, хотя для решения задачи тип данных не имеет

существенного значения.

Операции над массивом:

  • добавить значение в массив (частая операция);
  • получить массив целиком.

Решение 2


def serializeInt(int):
    """
    Сериализовать целое число в бинарное представление (str).
    """


def deserializeIntArray(str):
    """
    Десериализовать массив целых чисел из бинарного представления.
    """


class MCArray2(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя массива
        @type name: C{str}
        """
        super(MCArray2, self).__init__(mc)
        self.key = 'array' + name

    def fetch(self):
        """
        Получить текущее значение массива.

        @return: массив
        @rtype: C{list}
        """
        try:
            return deserializeIntArray(self.mc.get(self.key))
        except KeyError:
            return []

    def add(self, element):
        """
        Добавить элемент в массив.

        @param element: элемент, который необходимо добавить в массив
        @type element: C{int}
        """
        element = serializeInt(element)
        while True:
            try:
                self.mc.append(self.key, element)
            except KeyError:
                return

            try:
                self.mc.add(self.key, element, 0)
            except KeyError:
                return


Обсуждение 2

Эта реализация практически повторяет аналогичный код для лога событий, только упрощенный в силу

наличия всего одного ключа. По сравнению с первым вариантом реализации типа данных "массив" уменьшилось число операций

memcached, все изменяющие массив процессы могут выполняться без задержек (отсутствие блокировок).

Как и в первом варианте, не проверяется наличие дубликатов при добавлении элемента в массив

(может быть и хорошо, и плохо, в зависимости от применения).

Возможны следующие улучшения (или расширения) описанного примера:

  • использование нескольких ключей для хранения массива вместо одного, распределение элементов по ключам с использованием хэширования; такой вариант позволит ограничить размер каждого ключа, при условии что массив большой (содержит много элементов);
  • реализация в том же стиле операции удаления элемента из массива, тогда массив можно представить как последовательность операций "удалить" и "добавить", например сериализованное представление +1 +3 +4 -3 +5 будет после десериализации образовывать массив [1, 4, 5]; при этом как операция добавления элемента, так и удаления, будет приводить к дописыванию байт в конец сериализованного представления (атомарная операция append).

Таблица

Задача

Необходимо хранить множество строк. Операции над множеством:

  • проверка принадлежности строки множеству (самая частая операция);
  • получение множества целиком, добавление элемента, удаление элемента - редкие операции.

Можно рассматривать данную структуру данных как таблицу, в которой осуществляется

быстрый поиск нужной строки. Или как хэш, хранящийся в распределенной памяти.

Решение


def serializeArray(array):
    """
    Сериализовать массив в бинарное представление.
    """


def deserializeArray(str):
    """
    Десериализовать массив из бинарного представления.
    """


class MCTable(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя таблицы
        @type name: C{str}
        """
        super(MCTable, self).__init__(mc)
        self.lock = MCLock(name)
        self.key = 'table' + name

    def has(self, key):
        """
        Проверка наличия ключа в таблице.

        @param key: ключ
        @type key: C{str}
        @rtype: C{bool}
        """
        try:
            self.mc.get(self.key + '_v_' + key)
            return True
        except KeyError:
            return False

    def fetch(self):
        """
        Получить целиком значение элементов таблицы.

        @return: значение таблицы
        @rtype: C{list(str)}
        """
        try:
            return deserializeArray(self.mc.get(self.key + '_keys'))
        except KeyError:
            pass

    def add(self, key):
        """
        Добавить ключ в таблицу.

        @param key: ключ
        @type key: C{str}
        """
        while not self.lock.try_lock():
            pass

        try:
            try:
                array = deserializeArray(self.mc.get(self.key + '_keys'))
            except KeyError:
                array = []
            if key not in array:
                array.append(key)
            self.mc.set(self.key + '_v_' + key, 1, 0)
            self.mc.set(self.key + '_keys', serializeArray(array), 0)
        finally:
            self.lock.unlock()

    def delete(self, key):
        """
        Удалить ключ из таблицы.

        Реализация аналогична методу add().
        """

Обсуждение

Вообще говоря memcached представляет собой огромную хэш-таблицу, правда в ней отсутствует

одна операция, которая необходима для нашей структуры данных: получение списка ключей.

Поэтому реализация таблицы использует отдельные ключи для хранения каждого элемента таблицы,

и отдельно еще один ключ для хранения списка всех её элементов. Реализация хранения списка

всех элементов фактически совпадает с реализацией "массива 1". Для сериализации доступа

к списку всех элементов используется блокировка, при этом методы fetch и add

не синхронизированы друг с другом, т.к. список всех элементов меняется атомарно и при чтении

ключа мы всегда получим некоторое консистентное состояние.

Проверка наличия ключа в таблице выполняется максимально быстро: проверяется наличие соответствующего

ключа в memcached. Любое изменение списка элементов всегда происходит одновременно и в ключе, хранящем

весь список, и в отдельных ключах для каждого элемента (которые используются только для проверки).

На основе приведенной схемы можно реализовать полноценный хэш, когда для каждого элемента

таблицы будет храниться связанное значение, это значение необходимо будет записывать только в

отдельные ключи, соответствующие элементам, а список элементов не будет содержать значений.

Заключение

Итак, приведем список "приемов" или "трюков", описанных в данной статье:

  • атомарность операций с помощью memcached (пара add/set и т.п.);
  • блокировки;
  • теневые ключи;
  • кольцевой буфер с автоматическим "отмиранием" ключей;
  • блокировки и модель reader-writer.

В статье не рассматривались вопросы оптимизации, специфичной для memcached, например,

использование multi-get запросов. Это делалось сознательно, чтобы не перегружать исходный код и рассказ.

Во многих ситуациях приведенные выше примеры следует рассматривать скорее как псевдокод, чем как пример

идеальной реализации на Python.

Если Вы нашли ошибку, хотите предложить более ясное, более оптимальное решение поставленным задачам,

хотите предложить реализацию для какой-то еще структуры данных - буду рад комментариям и критике.

PostgreSQL vs. MySQL

Тема этого поста была навеяна небольшим спором, разразившимся в ЖЖ. Эта битва будет вечной, как и война добра со злом, светлого и темного, FreeBSD и Linux, и т.п. Можно ломать копья, кричать "кто круче", но спор ни к чему не приведет. Можно делать синтетические тесты, пытаясь определить, кто же быстрее, но один тест скажет, что MySQL быстрее на вставках, а другой - что PostgreSQL лучше масштабируется на многоядерных архитектурах. Найдутся рассказы о том, как всё стало классно, когда мы с MySQL перешли на PostgreSQL, найдутся и обратные истории.

Возвращаясь к теме "спора" с Горным (спора в кавычках, т.к. как такого спора не хотелось, да и не получилось): я не хочу никому сказать, что PostgreSQL - круче или что MySQL - полный отстой. Согласно опыту gornal, проект на PostgreSQL сделать нельзя и поэтому он ужасен, а аргументов вобщем-то других нет, я привел некоторые аргументы в чем PostgreSQL может быть лучше. Но не в этом дело. Совсем не в этом.

Господа, на PostgreSQL можно делать успешные проекты. Это классная, современная СУБД, она не "тормозит", не бойтесь её. Она действительно работает, легко администрируется, устанавливается и т.п. К ней есть куча интереснейших расширений. Если Вы изучали SQL по мануалу MySQL, она Вам ничего не даст, но если Вы читали, например, Дейта или слушали курс по реляционной алгебре и о реляционных СУБД - в PostgreSQL Вы сможете воплотить БД своей мечты и оптимизировать её столько, сколько Вам захочется. Не бойтесь! Попробуйте, может, Вам понравится. Не понравится - есть MySQL, Informix, Oracle, DB2, Firebird и т.п. Но бояться не надо, она не тормозит, честное слово ;)

Всегда будут люди, которым нравятся какие-то продукты, например, ораклоид с пеной у рта будет доказывать, что кроме Oracle нет нормальных СУБД. Gornal скажет, что только на MySQL можно сделать успешный проект. Я люблю PostgreSQL и буду пользоваться им, покуда он будет подходить для тех задач, которые я решаю.

Не верьте тем, кто говорит, что PostgreSQL - отстой, попробуйте сами. Найдите для себя идеальную СУБД, попробуйте её возможности. Я использую PostgreSQL уже около четырех лет, но до сих пор открываю для себя что-то новое, есть специалисты по PostgreSQL, они знают огромное количество еще более интересного.

Напоследок: есть успешные проекты на PostgreSQL в мире веба и около него. Это Skype, Мой Круг и другие. И в сердце Smotri.Com как пламенный мотор трудится PostgreSQL, вся БД обслуживается по сути пятью серверами, один мастер и четыре слейва (Slony). Это самая простая на свете конструкция, её практически не оптимизировали с точки зрения специфики PostgreSQL. Я оцениваю возможности оптимизации в 2-3 раза без изменения логики приложения на сегодня. Дальнейшее возможно с изменением логики (например, шардинг или отгрузка части данных в MemcacheDB или что-то подобное).

Напоследок, российская специфика PostgreSQL:

МТС и "долг" в тысячу рублей

Эта история грустная, но со счастливым концом. И для начала хочется сказать спасибо компании МТС, что ситуация разрешилась в лучшую сторону. Итак, всё по порядку...

Обычным письмом приходит извещение от компании МТС, что на лицевом счету XXXXYYYY имеется непогашенная задолженность, что компания МТС скоро подаст в суд а данные мои передаст в бюро кредитных историй в качестве злостного неплательщика. Я удивился, пошел в Интернет-помощника, выяснил какой телефонный номер привязан к этому лицевому счету, на нем нет задолженности (судя по балансу), да и не было - регулярные платежи каждый месяц или иногда даже чаще.

Звонок в службу поддержки, переводят меня на "финансовый департамент", там милая девушка объясняет, что у каждого абонента имеется два баланса: один, который видит он, и, второй, "бухгалтерский", который он не видит. Так вот, данные балансы разошлись, и, как легко догадаться, на внутреннем балансе числится долг в 1000 рублей, а на внешнем что-то типа +100 рублей (т.е. разница больше чем 1000 рублей). Она сообщила, что такое "бывает", они будут разбираться и через неделю сообщат результаты.

Прошла неделя... Баланс счета (тот, который вижу я) стал -1000 рублей -- надо думать, разобрались! Звоню, девушка думает и сообщает, что три года назад в 2005 году что-то не было засчитано правильно и с тех пор (sic!) за мной висит долг в 1000 рублей. Я спросил девушку, а как я могу это проверить или увидеть, потому что мне вобщем-то тяжело понять, как такое могло быть? Мне кажется, что она хотела сделать так, чтобы я больше не приставал с вопросами, и сказала, что я могу это увидеть в счетах, которые мне выставлялись за тот период (2005 год). Самым замечательным оказалось то, что как раз в то время я стал пользоваться Gmail, и одно из первых писем, сохраненных в моем ящике - это ежемесячные счета от МТС. Как раз в тот период на этом лицевом счете случился какой-то глюк биллинга МТС, который они исправляли около недели, разделяли счета, и т.п., и я даже был готов легко поверить, что именно тогда и могла образоваться такая задолженность. Я взглянул на счета за три месяца того периода, в них всё сходилось - баланс счета, платежи, расходы. Я снова позвонил в финансовую службу и спросил, как мне передать им эти счета, чтобы они мне показали, где же именно я оказался должен? Девушка мне предложила отправить письмо на адрес info@mts.ru, что я и сделал:

Две недели назад мне пришло письмо о имеющейся задолженности по
лицевому счету 2XXXXXXXXXX.
При этом баланс счета в интернет-помощнике, например, был положительным.
Я обратился в контактный центр компании МТС, в ситуации пообещали разобраться.

Сегодня я обратился вновь, т.к. не так и не получил ответа. Мне
сообщили, что с моим счетом разобрались и выявлена задолженность в
сумме около 40 у.е., которая возникла за период октября-ноября 2005
года (даты не помню точно, но у вас есть эта информация). И что
начиная с декабря 2005 года между балансом в автоинформаторе и
балансом в счетах существовал разрыв в 40 у.е., что я могу увидеть на
счетах, присланных мне за этот период. Я нашел счета за указанный
период, которые были мне присланы в то время.

К письму прилагаю счета за октябрь-ноябрь-декабрь 2005 года. В них все
суммы сходятся и задолженности нет, как и не было в последующих
периодах. Прошу разъяснить, откуда возникла данная задолженность, и
каким образом я мог быть о ней извещен через счета.

Также в письме, которое пришло 2 недели назад, компания МТС сообщала,
что мои данные могут быть переданы в бюро кредитных историй как
злостного неплательщика. Я бы хотел получить информацию о том,
произошло ли это действие, и гарантии того, что не произойдет в данной
ситуации, т.к. и по счетам, которые я получал, и по информации
автоинформатора мой счет не имел задолженности.

Если информация о задолженности не подтвердится, прошу вернуть сумму
на мой лицевой счет.

Никакого ответа я не получил (даже ответа вида "ваше сообщение в службу поддержки принято, его номер "2222", к которому я привык даже в службах поддержки небольших компаний). Здесь же письмо ухнуло как в черную дыру. Через день я снова позвонил в контактный центр МТС и спросил девушку, когда мне стоит ждать ответа на обращение, на что она мне сообщила, что если это просто вопрос, то две недели, а если претензия - 45 дней. Я спросил девушку, а как определить, у меня претензия или вопрос? Ответа не получил. Я спросил, а 45 дней - этот срок чем установлен? Она засомневалась, пошла советоваться со старшей. Вернулась, и сообщила, что "установлен законом". Я спросил: "Каким?" На что я получил ответ, потрясший меня до глубины души: "Конституцией". Я на всякий случай уточнил: "Конституцией РФ?" На что получил утвердительный ответ. Вот такие люди работают в службе поддержки :) В конце концов я получил совет отправить еще одно письмо на info@mts.ru, чтобы узнать срок рассмотрения, ок, я отправил.

Через два дня я получил ответ, что не позднее недели, а еще через два и ответ на моё первое письмо:

Благодарим Вас за пользование услугами компании МТС и в ответ на
Ваше обращение сообщаем, что на Вашем абонентском номере 916XXXXXXX
наблюдалось несовпадение реального баланса (ежемесячные счета на
услуги связи) и баланса, запрашиваемого  Вами по данному номеру.
Данная ситуация находилась под контролем у сотрудников технической
службы МТС и была устранена  05.11.2008г., после чего баланс Вашего
лицевого счета стал отрицательным и составил 1001,72 руб.
Мы сожалеем о возникшем недоразумении. Вследствие доставленных Вам
неудобств Компанией МТС принято решение о зачислении на Ваш лицевой
счет денежных средств в сумме 1200 руб., что соответствует размеру
уменьшения баланса. Указанная сумма зачислена на Ваш лицевой счет 10.11.2008г.
Заказав ежемесячные счета за период с 01.09.2008г. по 31.10.2008г.,
а в последующем и за ноябрь месяц, Вы сможете убедиться в том, что
на Вашем лицевом счете не совпадали реальный и запрашиваемый балансы.
В счетах, полученных Вами за 2005г., Вам представлена информация о
реальном балансе, но Вы, к сожалению, за давностью времени, не
можете ее сравнить с запрашиваемым Вами балансом на тот период
времени. Исходя из этого, Вы не можете увидеть несовпадение
реального баланса и баланса, запрашиваемого Вами.
Надеемся на Ваше понимание и дальнейшее сотрудничество.

Результат - я ничего не понял, где баланс не сходился - в счетах он сходился, и в последних тоже, но не суть, может я правда чего-то не увидел, я не специалист. Деньги вернулись на счет, всё хорошо.

Дело не в деньгах, 1000 рублей - это еще не такая большая сумма, я готов заплатить, даже если я ничего не был должен. Обидно попасть в "бюро кредитных историй", на этот вопрос, кстати, я ответа не получил до сих пор. Я понимаю, что письма неплательщикам рассылаются шаблонные, но вряд ли меня можно в полной мере считать таковым. Что меня несколько пугает - это то, что по сути любой сотовый оператор (и не только) может в какой-то момент сообщить, что у "вас имеется задолженность в сумме YYYYY рублей". При этом количество нулей в этой сумме может быть практически произвольным. Расчеты с компанией у меня авансовые, то есть если образовалась задолженнность в 2005 году, то она не могла на самом деле тянуться два года (как не могла и в случае кредитных расчетов). А, самое главное, что если не эти копии счетов, сохранившиеся у меня почти случайно - ничего ведь доказать я не могу. Сумму поступлений на счет можно доказать чеками и т.п., но я никогда их не хранил, сумма расходов - только в биллинге оператора, и, по-хорошему, оператор в любой момент может увеличить сумму моих расходов за произвольный период или уменьшить сумму платежей - и доказать что-либо мне как клиенту будет очень сложно, а для суда они смогут сделать сколько угодно счетов с печатями, подтверждающими мой долг. А мне предоставить нечего, кроме электронных писем со счетами.

Вывод - хранить счета от оператора вечно. Иначе потом ничего никому не докажешь.

Еще раз спасибо МТСу, что деньги вернули, я так и не понял, был я должен их или нет, но если и был, то я этого не знал по вине их же биллинговой системы.

D. J. Bernstein

Кто такой D. J. Bernstein или просто djb? С моей точки зрения, в первую очередь, - это интереснейший человек. Людям из мира web он больше всего известен как автор интересных, безопасных и быстрых программ: qmail и djbdns. Эти программы обладают архитектурой, непохожей на все привычные серверные решения: Apache, BIND, postfix и т.п. Они пропитаны духом и философией UNIX, не демоны, пожирающие мегабайты памяти и решающие тысячи задач, а простые, маленькие программы, простые форматы файлов, четко определенные цели и взаимодействия, всё написано одновременно просто и в тоже время надежно.

А кто в курсе, что D. J. Bernstein еще и математик? Что он является профессором и преподает в University of Illinois at Chicago, опубликовал большое количество статей, каждая из которых производит впечатление выверенной, педантичной, качественной работы, какой и должна быть математическая статья. В основном статьи так или иначе посвящены теме криптографии, и, особенно, вопросам "быстрой" криптографии, когда криптографический алгоритм может быть использован на маломощных устройствах или для обработки больших потоков данных.

DJB не является профессиональным юристом, однако он "засудил" правительство США (дело Bernstein v. United States) по поводу экспортных ограничений на системы безопасности (и, в частности, криптографии). Bernstein выступает против патентов на алгоритмы и идеи, пытаясь доказать несостоятельность существующих патентов (например, по причине того, что до подачи патентной заявки идеи были опубликованы). DJB и против попыток издательств получить экслюзивные права на научные публикации, чтобы потом по сути органичить доступ к ним, сделав его платным.

DJB дотошен, я бы сказал, что он "зануда". Он может потратить не один день на рассказ о том, как распределяются и крадутся деньги грантов в его университете UIC (ничего не напоминает?). При этом он пытается бороться с этой системой. Вряд ли его "любят", потому что он говорит многие вещи прямо. Он может свободно осуждать Paul Vixie, ISC, Nominum и других за реализацию BIND, по сути говоря о том, что они не могут написать хорошее программное обеспечение, "трясут" деньги за поддержку. При этом в BIND постоянно обнаруживаются проблемы безопасности, а его авторы предлагают заплатить деньги, чтобы узнать о них и получть патч, сознательно задерживая выход исправленной версии. DJB расскажет историю своей поездки на конференцию в Санкт-Петербург в стиле "и по улице у них бродят медведи". Он раскритикует IPv6 и планы его внедрения (с моей точки зрения, совершенно справедливо). DJB расскажет и о том, как надо и как не надо писать статьи.

Иногда хочется спросить, как он всё это успевает? Как удается человеку оставаться таким принципиальным? Как в нём одновременно уживается талант математика, профессора, программиста. Как одновременно он зануден в проведении очередного математического доказательства, в оптимизации на уровне еще одного такта ассемблерного кода для разных арихитектур, в выработке концепции безопасности программного обеспечения. Не знаю как, но, с моей точки зрения, он - выдающийся человек, у которого можно и нужно учиться.

CDN своими руками или раздача видеоконтента

В продолжение темы про доставку видеоконтента: мы обеспечили хранение и обработку контента, как теперь отдать контент таким образом, чтобы он оказался как можно "ближе" к потребителю. Большая часть статьи будет посвящена обобщенному подходу географически распределенной раздачи контента, а в конце в качестве примера он будет применен к доставке видеофайлов и вещаний конечным пользователям.

Необходимость географической распределенности

Кроме самого факта, что контент был доставлен пользователю, мы должны обеспечить качество доставки контента. Для FLV-файла видео это означает, что скорость, с которой он доставляется пользователю, должна быть выше либо равна битрейта потока, иначе видео у пользователя при просмотре будет «затыкаться».

Кроме того, имеет смысл «приблизить» контент к пользователю географически. Это связано с пропускной способностью каналов (отсутствием иногда хороших магистральных каналов), а также с разницей в стоимости локального и внешнего трафика для конечного пользователя (например, в регионах РФ).

Такой шаг необходимо сделать при желании выйти на международный рынок, а также при региональном развитии внутри РФ. Сегодня в регионах очень часто самыми популярными сайтами являются региональные порталы, которые предоставляют различные сервисы, в том числе и сервис видеохостинга, а их популярность обусловлена как стоимостью трафика, так и скоростью доступа/временем отклика. Можно представить, что пользователь готов подождать открытия страницы, загрузки плеера, но тяжело предположить, что пользователь согласится смотреть видео, которые прерывается из-за постоянной буферизации, или смотреть вещание, которое доходит до пользователя в виде слайдшоу (после пропуска пакетов остались только опорные кадры видео).

Таким образом, осознав необходимость географической распределенности для контента, мы покупаем/арендуем сервера в непосредственной близости от потребителя: в Европе, США, Украине, Екатеринбурге и т.д. Что же делать дальше?

Механизм работы

В абстрактном варианте в процессе доставки контента участвуют две сущности: ресурс, наш контент (например, вещание или видео), и посетитель. Необходимо для начала узнать, где находится наш потребитель ресурса – посетитель нашего сайта. Надеяться, что он сам расскажет эту информацию о себе, не приходится, поэтому мы берём IP-адрес посетителя, выполняем поиск в базе данных GeoIP (коих сегодня довольно много, как платных, так и бесплатных), и получаем на выходе информацию о местоположении пользователя: его страну, регион, город, название его провайдера.

Посетитель IP

Ресурс (контент) расположен всегда на конкретном сервере, а сервер имеет своё физическое местоположение, заранее нам известное. Поэтому ресурс через сервер тоже обладает своим географическим местоположением: те же страна, регион, город и т.п. Кроме того, мы можем создавать копии ресурса на специальных зеркалирующих серверах, таким образом один и тот же ресурс будет доступен на нескольких серверах, а значит, в нескольких географических точках.

Ресурс и расположение на серверах

Выбор ближайшего к посетителю ресурса

Теперь у нас есть посетитель, его местоположение, ресурс, который он хочет получить вместе со всеми его копиями и местами их расположения. Теперь необходимо выбрать именно тот ресурс, который ближе всего к пользователю. Как это можно сделать?

Логично было бы посчитать расстояние от посетителя до ресурса и всех его копий и выбрать самый близкую к посетителю копию ресурса. Каким образом задать такое расстояние?

Граф расстояний между городами

Это расстояние не всегда совпадает с расстоянием на карте между двумя точками, и является скорее мерой качества и пропускной способности каналов между регионами, странами или городами (или даже между отдельными провайдерами). В первом приближении достаточно задать расстояние между отдельными странами и городами. Пример такой расстановки расстояний приведен на рисунке выше.

Таким образом, мы получаем взвешенный ориентированный граф, на котором необходимо решить задачу поиска кратчайшего пути (минимизация суммы весов ребер графа, входящих в этот путь). Это классическая задача, которая может быть легко решена за полиномиальное время. Граф хоть и является слабосвязанным, но всё-таки его размеры достаточно велики, поэтому в реальном времени для каждого посетителя выполнять такой расчет не является самым эффективным решением. Но мы можем немного «схитрить»: мы знаем, что при всех поисках кратчайшего пути конечным пунктом пути будут места, где расположены сервера с ресурсами, таким образом, число различных конечный точек искомого пути фиксировано и относительно невелико. Теперь нам достаточно рассчитать и закэшировать, например, в memcached, длины кратчайших путей от всех вершин графа (где могут располагаться посетители нашего сайта) до мест расположения ресурсов.

Уже эта обработанная информация будет использована в реальном времени, когда за ресурсом обратится посетитель. Если мы найдем несколько копий ресурса, на которых будет достигаться кратчайшее расстояние от посетителя, мы выберем любую из копий случайным образом (возможно, в соответствие с весом того сервера, где расположен ресурс).

Копирование ресурса

Схема выбора работает замечательно, когда мы имеем копии ресурса на серверах, разбросанных по всему миру. Однако как эти копии создаются? Было бы неразумно копировать все ресурсы на все сервера, лучше выбирать именно те ресурсы, которые будут востребованы конкретной аудиторией.

Для этого мы предлагаем следующее решение: когда посетитель обращается за ресурсом, всем географическим местам, где мог бы находиться ресурс, но в данный момент не находится, начисляется бонус:

Формула вычисления бонуса

где расстояние – это расстояние от посетителя до данного географического места, а k – некоторый коэффициент, который задает то, насколько быстро ресурс будет перемещен на указанный сервер. Если нет пути от посетителя до данного места (т.е. посетитель далеко), то расстояние будет равно бесконечности, а бонус будет равен нулю. Если же путь существует, то ресурс будет скопирован на сервер в данной точке тем быстрее, чем ближе он к посетителю и чем больше таких посетителей попросят доступ к данному ресурсу.

Как только бонус превышает некоторый порог, осуществляется копирование ресурса на сервер, расположенный в данном географическом месте, и в дело уже вступает алгоритм выбора копии ресурса, описанный выше.

Географически распределенные видеофайлы и вещания

Теперь мы можем применить описанный выше подход к контенту видеохостинга: вещаниям и видео.

Видеофайлы. В этом случае ресурс – это сам файл видео (причём любого типа, как FLV, так, например, и оригинал видео). Копии ресурса – копии файла, находящиеся на зеркалирующих файловых серверов. Обращение к ресурсу – скачивание файла, проигрывание FLV-видео посетителем в Flash Player’е. Копирование ресурса – это просто копирование файла с основного файлового сервера на один из зеркалирующих файловых серверах, расположенных в различных точках.

Вещания. Для вещаний ситуация очень похожа, здесь ресурсом является, конечно же само вещание, расположенное на основном для него сервере вещания (том сервере, куда подключен автор вещания). Копии ресурса – это все ретрансляции данного вещания на других серверах вещания. Обращение к ресурсу – это «вход» в вещание нового посетителя. Копирование ресурса – открытие ретрансляции на сервере вещаний, расположенном в той или иной точке мира.

В случае вещаний интересным является то, что ретрансляция является одновременно способом борьбы с высокой нагрузкой и средством приближения контента к потребителю, то есть ретрансляция осуществляется одновременно в двух плоскостях: с одной стороны, удовлетворить запросы всех пользователей наиболее близко расположенным контентом. А с другой стороны необходимо обеспечить в каждой географической точке такое количество ретрансляций, чтобы нагрузка на сервера вещаний оставалась в норме.

Contents © 2015 Andrey - Powered by Nikola