DAG в Apache AirFlow
Конвейер обработки данных (data pipeline) в Airflow – это просто Python-скрипт, который определяет объект DAG (Directed Acyclic Graph). DAG AirFlow – это цепочка задач для запланированного запуска по расписанию в виде направленного ациклического графа. Проектируя DAG, инженер данных как разработчик Data Flow, определяет набор операторов, с помощью которых будут выполняться входящие в граф задачи.
DAG КАК ФАЙЛ КОНФИГУРАЦИИ
Планировщик Airflow сканирует и компилирует файлы DAG при каждом такте. Это занимает довольно много ресурсов для тяжеловесных файлов с большим количеством верхнеуровневого кода. Поэтому целесообразно делать DAG максимально понятными с т.н. «чистым кодом», что они были похожи на файлы конфигурации. Для этого пригодится YAML/JSON-определение рабочего процесса (workflow), чтобы затем на его основе создавать DAG. Такой подход даст как минимум следующие преимущества:
- DAG, которые создаются автоматически (программным способом), будут согласованными и воспроизводимыми в любое время;
- доступность для пользователей, не работающих с Python.
Более того, блоки кода, не связанные с конфигурацией, можно отделить от определения DAG и использовать атрибут template_searchpath для их добавления. Например, если требуется выполнить некоторый SQL-запрос, подключившись к источнику данных, эта команда SQL должна быть загружена из файла. А расположение этого файла следует указать в template_searchpath. Аналогичное эмпирическое правило подходит и для запросов Hive (.hql).
ИСПОЛЬЗУЙТЕ В СИСТЕМУ ПЛАГИНОВ AIRFLOW
Организуйте качественный репозиторий плагинов и поддерживайте его для создания пользовательских плагинов. Создавайте плагин по единому (универсальному) образцу, чтобы его можно было многократно использовать в разных сценариях использования. Это позволит управлять версиями плагинов, а также поддерживать порядок в рабочих процессах с помощью параметров их конфигурации, а не логики реализации. Вставляйте операции внутри метода выполнения, а не при инициализации класса.
НЕ ВЫПОЛНЯЙТЕ ОБРАБОТКУ ДАННЫХ В ФАЙЛАХ DAG
Поскольку файлы Directed Acyclic Graph представляют собой Python-скрипты, может возникнуть соблазн использовать pandas или аналогичные библиотеки обработки данных. Однако, не стоит это делать: помните, что Airflow – это оркестратор рабочих процессов, а не среда их исполнения. Все вычисления должны выполняться в специализированной целевой системе.
ДЕЛЕГИРУЙТЕ ОПЕРАТОРАМ API-ВЫЗОВЫ И ПОДКЛЮЧЕНИЯ К БАЗАМ ДАННЫХ
Вызов API или соединение с БД, выполненное на верхнем уровне кода в файлах DAG, перегружает веб-сервер. Эти вызовы, определенные вне оператора, вызываются при каждом такте. Поэтому рекомендуется передать их оператору util/common.
СДЕЛАЙТЕ DAG-ФАЙЛЫ И ЗАДАЧИ ИДЕМПОТЕНТНЫМИ
DAG должен выдавать одни и те же данные при каждом запуске, например, чтение из раздела и запись в него, должны быть неизменными. Чтобы избежать внезапных ошибок, создавайте и удаляйте сами разделы, не трогая DAG.
ИСПОЛЬЗУЙТЕ ОДНУ ПЕРЕМЕННУЮ ДЛЯ КАЖДОГО DAG
Каждый раз при обращении к переменным Directed Acyclic Graph, создается соединение с базой данных, чтобы считать метаданные. Это может перегрузить СУБД, особенно при нескольких DAG, каждый из которых вызывает более одной переменной. Поэтому лучше использовать одну переменную для каждого графа с объектом JSON в рамках единого соединения. А, проанализировать этот JSON, можно получить требуемую пару ключ-значение.
МАРКИРУЙТЕ DAG
Наличие тегов помогает фильтровать и группировать Directed Acyclic Graph. Поэтому стоит маркировать цепочки задач в соответствии с системой тегов, характерной для вашей инфраструктуры. Например, теги могут базироваться на проекте, категории приложения и прочих особенностях экосистемы data pipeline’ов, принятых в компании. Также это может помочь в управлении множеством взаимозависимых DAG: об этой проблеме и способах ее решения читайте в нашей новой статье.
НЕ ЗЛОУПОТРЕБЛЯЙТЕ XCOM
Напомним, XCom (cross-communication) в Apache AirFlow используется как канал обмена данными между задачами в одном DAG, с помощью пары ключ-значение и названием задачи-отправителя. XCom создаётся в операторе Python на основании возвращаемого им значения или вручную с помощью функции xcom_push. После выполнения задачи значение сохраняется в контексте, чтобы его через функцию xcom_pull приняла следующая задача в другом Python-операторе или из шаблона jinja внутри любой предобработанной строки. При этом данные хранятся в серверной СУБД метаданных, хотя в Airflow 2.0 сама операция XCom скрыта внутри Python-оператора и полностью абстрагируется от разработчика DAG. Подробнее об этом и других новинках нового релиза Apache AirFlow мы писали здесь. Тем не менее, несмотря на отмеченное улучшение с инкапсуляцией XCom, при передаче большого количества данных между задачами или слишком частого выполнения этой процедуры, серверная СУБД с метаданными будет перегружена.
ИСПОЛЬЗУЙТЕ ПРОМЕЖУТОЧНОЕ ХРАНИЛИЩЕ МЕЖДУ ЗАДАЧАМИ
Чтобы избежать вышеуказанной проблемы с XCom и организовать быстрый обмен большим объемом данных между задачами, имеет смысл сохранить их в промежуточной хранилище. И передавать последующей задаче ссылку на эти данные, не пересылая их самих.
ИСПОЛЬЗУЙТЕ ВОЗМОЖНОСТИ ШАБЛОНОВ JINJA
Airflow использует возможности Jinja Templating и предоставляет разработчику Data Flow готовый набор встроенных параметров и макросов, позволяя также самостоятельно определять их и создавать новые шаблоны. Напомним, Jinja – это язык шаблонов для Python-разработчиков, похожий на шаблоны Django. Он быстрый, популярный и безопасный благодаря дополнительной изолированной среде выполнения шаблонов (песочнице) и автоматической системе экранирования HTML для предотвращения XSS [2].
КОНТРОЛИРУЙТЕ ДОСТУП НА УРОВНЕ DAG
Мы уже упоминали, что в Apache AirFlow 2.0 добавлены новые кластерные политики, которые предоставляют интерфейс для работы с каждой задачей или DAG во время его загрузки, а также непосредственно перед выполнением задачи. Поэтому следует использовать эти возможности по максимуму, определив в пользовательских настройках airflow_local_settings политики dag_policy, task_policy и task_instance_mutation_hook. Разумеется, предварительно следует создать настраиваемую роль – пользователя Linux, который будет выполнять разрешенные действия с графами и задачами.
ЕЩЕ 5 ПОЛЕЗНЫХ ПРАКТИК В РАБОТЕ С AIRFLOW
- Используйте статическую дату начала start_date, чтобы корректно определить расписание запуска DAG
- При глобальных структурных изменениях переименуйте файл с цепочкой задач, создав его новую версию, чтобы сохранить всю историю. При этом в частности и при создании любых файлов вообще придерживайтесь согласованной и понятной структуры их именования и хранения
- Придерживайтесь последовательного метода организации зависимостей между задачами
- Разработайте и внедрите стратегию уведомления о сбоях
- Установите повторные попытки выполнения отказавших задач на уровне DAG