Airflow + NiFi
Что такое Airflow? Это библиотека (ну или набор библиотек) для разработки, планирования и мониторинга рабочих процессов. Основная особенность Airflow: для описания (разработки) процессов используется код на языке Python. Отсюда вытекает масса преимуществ для организации вашего проекта и разработки: по сути, ваш (например) ETL-проект — это просто Python-проект, и вы можете его организовывать как вам удобно, учитывая особенности инфраструктуры, размер команды и другие требования. Инструментально всё просто. Используйте, например, PyCharm + Git.
Принципы Apache AirFlow
Масштабируемый
Airflow имеет модульную архитектуру и использует очередь сообщений для управления произвольным количеством рабочих процессов. Apache AirFlow готов масштабироваться до бесконечности.
Динамический
Airflow пайплайны определены в Python, что позволяет создавать динамические конвейеры. Это позволяет писать код, который динамически создает конвейеры.
Расширяемый
Легко определяйте свои собственные операторы и расширяйте библиотеки, чтобы соответствовать уровню абстракции, подходящему для вашей среды.
Элегантный
Параметризация встроена в его ядро с использованием мощного механизма шаблонов Jinja.
Преимущества Apache AirFlow
Чистый питон
Нет больше командной строки или черной магии XML! Используйте стандартные функции Python для создания рабочих процессов, включая форматы даты и времени для планирования и циклы для динамического создания задач. Это позволяет сохранять полную гибкость при построении рабочих процессов.
Удобный интерфейс
Контролируйте, планируйте и управляйте рабочими процессами с помощью надежного и современного веб-приложения. Нет необходимости изучать старые cron-подобные интерфейсы. Вы всегда имеете полное представление о статусе и журналах выполненных и текущих задач.
Надежные интеграции
Airflow предоставляет множество операторов plug-and-play, готовых выполнять ваши задачи на Google Cloud Platform, Amazon Web Services, Microsoft Azure и многих других сторонних сервисах. Это позволяет легко применять Airflow к существующей инфраструктуре и расширять возможности технологий следующего поколения.
Прост в использовании
Любой, кто знает Python, может развернуть рабочий процесс. Apache Airflow не ограничивает объем ваших конвейеров; вы можете использовать его для создания моделей машинного обучения, передачи данных, управления инфраструктурой и многого другого.
Open Source
Где бы вы ни захотели поделиться своим улучшением, вы можете сделать это, открыв PR. Вот так просто, никаких барьеров, никаких длительных процедур. У Airflow много активных пользователей, которые охотно делятся своим опытом.
Теперь рассмотрим основные сущности Airflow. Поняв их суть и назначение, вы оптимально организуете архитектуру процессов. Пожалуй, основная сущность — это Directed Acyclic Graph (далее DAG).
DAG
DAG — это некоторое смысловое объединение ваших задач, которые вы хотите выполнить в строго определенной последовательности по определенному расписанию. Airflow представляет удобный web-интерфейс для работы с DAG’ами и другими сущностями:
DAG может выглядеть таким образом:
Разработчик, проектируя DAG, закладывает набор операторов, на которых будут построены задачи внутри DAG’а. Тут мы приходим еще к одной важной сущности: Airflow Operator.
Операторы
Оператор — это сущность, на основании которой создаются экземпляры заданий, где описывается, что будет происходить во время исполнения экземпляра задания. Релизы Airflow с GitHub уже содержат набор операторов, готовых к использованию. Примеры:
- BashOperator — оператор для выполнения bash-команды.
- PythonOperator — оператор для вызова Python-кода.
- EmailOperator — оператор для отправки email’а.
- HTTPOperator — оператор для работы с http-запросами.
- SqlOperator — оператор для выполнения SQL-кода.
- Sensor — оператор ожидания события (наступления нужного времени, появления требуемого файла, строки в базе БД, ответа из API — и т. д., и т. п.).
Есть более специфические операторы: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Вы также можете разрабатывать операторы, ориентируясь на свои особенности, и использовать их в проекте. Например, мы создали MongoDBToHiveViaHdfsTransfer, оператор экспорта документов из MongoDB в Hive, и несколько операторов для работы с ClickHouse: CHLoadFromHiveOperator и CHTableLoaderOperator. По сути, как только в проекте возникает часто используемый код, построенный на базовых операторах, можно задуматься о том, чтобы собрать его в новый оператор. Это упростит дальнейшую разработку, и вы пополните свою библиотеку операторов в проекте.
Далее все эти экземпляры задачек нужно выполнять, и теперь речь пойдет о планировщике.
Планировщик
Планировщик задач в Airflow построен на Celery. Celery — это Python-библиотека, позволяющая организовать очередь плюс асинхронное и распределенное исполнение задач. Со стороны Airflow все задачи делятся на пулы. Пулы создаются вручную. Как правило, их цель — ограничить нагрузку на работу с источником или типизировать задачи внутри DWH. Пулами можно управлять через web-интерфейс:
Пул, заданный на уровне DAG’а, можно переопределить на уровне задачи.
За планировку всех задач в Airflow отвечает отдельный процесс — Scheduler. Собственно, Scheduler занимается всей механикой постановки задачек на исполнение. Задача, прежде чем попасть на исполнение, проходит несколько этапов:
- В DAG’е выполнены предыдущие задачи, новую можно поставить в очередь.
- Очередь сортируется в зависимости от приоритета задач (приоритетами тоже можно управлять), и, если в пуле есть свободный слот, задачу можно взять в работу.
- Если есть свободный worker celery, задача направляется в него; начинается работа, которую вы запрограммировали в задачке, используя тот или иной оператор.
Scheduler работает на множестве всех DAG’ов и всех задач внутри DAG’ов.
Чтобы разобраться в том, как работает Airflow, важно понимать, что такое Execution Date для DAG’а. В Airflow DAG имеет измерение Execution Date, т. е. в зависимости от расписания работы DAG’а создаются экземпляры задачек на каждую Execution Date. И за каждую Execution Date задачи можно выполнить повторно — или, например, DAG может работать одновременно в нескольких Execution Date. Это наглядно отображено здесь:
К сожалению (а может быть, и к счастью: зависит от ситуации), если правится реализация задачки в DAG’е, то выполнение в предыдущих Execution Date пойдет уже с учетом корректировок. Это хорошо, если нужно пересчитать данные в прошлых периодах новым алгоритмом, но плохо, потому что теряется воспроизводимость результата (конечно, никто не мешает вернуть из Git’а нужную версию исходника и разово посчитать то, что нужно, так, как нужно).
Генерация задач
Реализация DAG’а — код на Python, поэтому у нас есть очень удобный способ сократить объем кода при работе, например, с шардированными источниками. Пускай у вас в качестве источника три шарда MySQL, вам нужно слазить в каждый и забрать какие-то данные. Причем независимо и параллельно.
Можно использовать и более сложную генерацию кода, например работать с источниками в виде БД или описывать табличную структуру, алгоритм работы с таблицей и с учетом особенностей инфраструктуры DWH генерировать процесс загрузки N таблиц к вам в хранилище. Или же, например, работу с API, которое не поддерживает работу с параметром в виде списка, вы можете сгенерировать по этому списку N задач в DAG’е, ограничить параллельность запросов в API пулом и выгрести из API необходимые данные. Гибко!
Репозиторий
В Airflow есть свой бекенд-репозиторий, БД (может быть MySQL или Postgres, у нас Postgres), в которой хранятся состояния задач, DAG’ов, настройки соединений, глобальные переменные и т. д., и т. п. Здесь хотелось бы сказать, что репозиторий в Airflow очень простой (около 20 таблиц) и удобный, если вы хотите построить какой-либо свой процесс над ним. Вспоминается 100500 таблиц в репозитории Informatica, которые нужно было долго вкуривать, прежде чем понять, как построить запрос.
Мониторинг
Учитывая простоту репозитория, вы можете сами построить удобный для вас процесс мониторинга задачек. Мы используем блокнот в Zeppelin, где смотрим состояние задач:
Это может быть и web-интерфейс самого Airflow:
Код Airflow открыт, поэтому мы у себя добавили алертинг в Telegram. Каждый работающий инстанс задачи, если происходит ошибка, спамит в группу в Telegram, где состоит вся команда разработки и поддержки.
Получаем через Telegram оперативное реагирование (если такое требуется), через Zeppelin — общую картину по задачам в Airflow.
Итого
Airflow в первую очередь open source, и не нужно ждать от него чудес. Будьте готовы потратить время и силы на то, чтобы выстроить работающее решение. Цель из разряда достижимых, поверьте, оно того стоит. Скорость разработки, гибкость, простота добавления новых процессов — вам понравится. Конечно, нужно уделять много внимания организации проекта, стабильности работы самого Airflow: чудес не бывает.
Сейчас у нас Airflow ежедневно отрабатывает около 6,5 тысячи задач. По характеру они достаточно разные. Есть задачи загрузки данных в основное DWH из множества разных и очень специфических источников, есть задачи расчета витрин внутри основного DWH, есть задачи публикации данных в быстрое DWH, есть много-много разных задач — и Airflow все их пережевывает день за днем. Если же говорить цифрами, то это 2,3 тысячи ELT задач различной сложности внутри DWH (Hadoop), около 2,5 сотен баз данных источников, это команда из 4-ёх ETL разработчиков, которые делятся на ETL процессинг данных в DWH и на ELT процессинг данных внутри DWH и конечно ещё одного админа, который занимается инфраструктурой сервиса.
Планы на будущее
Количество процессов неизбежно растет, и основное, чем мы будем заниматься в части инфраструктуры Airflow, — это масштабирование. Мы хотим построить кластер Airflow, выделить пару ног для worker’ов Celery и сделать дублирующую себя голову с процессами планировки заданий и репозиторием.
Что такое Apache NiFi?
NiFi была создана для обработки потока данных между системами. Проблема грамотного распределения информации возникла с тех пор, как у предприятий появилось более одной системы, где некоторые системы создавали данные, а некоторые - потребляли их.
Некоторые из высокоуровневых проблем потока данных включают:
Отказ систем
Сети выходят из строя, жесткие диски отказывают, программное обеспечение дает сбои, люди совершают ошибки.
Источник данных превышает возможности их обработки
Иногда источник данных может опережать некоторые звенья цепочки обработки или доставки - достаточно одного слабого звена, чтобы возникли проблемы.
Ограничения набора данных
Вы будете неизбежно получать слишком большие, слишком маленькие, слишком быстрые, слишком медленные, поврежденные, неправильные или в неправильном формате данные.
То, что в один день является шумом, на следующий становится сигналом
Приоритеты организации меняются - быстро. Включение новых и изменение существующих потоков должно происходить быстро.
Системы развиваются с разной скоростью
Протоколы и форматы, используемые системой, могут меняться в любое время и часто независимо от окружающих их систем. Поток данных существует для того, чтобы соединить то, что по сути является массово распределенной системой компонентов, которые слабо или совсем не предназначены для совместной работы.
Соответствие нормативным требованиям и безопасность
Законы, правила и политика меняются. Меняются соглашения между бизнесом. Взаимодействие систем и системы с пользователем должно быть безопасным и отказоустойчивым.
Непрерывное совершенствование производства
В лабораторных условиях часто невозможно даже приблизиться к воспроизведению производственных сред.
На протяжении многих лет поток данных был одним из самых сложных компонентов архитектуры. Однако сейчас существует ряд активных и быстро развивающихся направлений, делающих поток данных гораздо более удобным. К ним относятся: сервис-ориентированная архитектура, рост API, Big Data. Кроме того, растет качество данных, необходимых для обеспечения соответствия нормативным требованиям, конфиденциальности и безопасности. Несмотря на появление всех этих новых концепций, модели и потребности потока данных остаются в основном теми же. Основные различия заключаются в масштабах сложности, скорости изменений, необходимых для адаптации, и в том, что при масштабировании проблемные случаи становятся обычным явлением. NiFi создан для того, чтобы помочь решить эти современные проблемы, связанные с потоком данных.
Основные концепции NiFi
Ключевые концепции Apache NiFi
Ключевые концепции Apache NiFi заключаются в следующем:
- Поток: Поток создается для подключения различных процессоров для совместного использования и изменения данных, которые требуются от одного источника данных к другому месту назначения.
- Соединение: Соединение используется для соединения процессоров, которые действуют как очередь для хранения данных в очереди, когда это необходимо. Он также известен как ограниченный буфер в терминах программирования на основе потоков (FBP). Это позволяет нескольким процессам взаимодействовать с разной скоростью.
- Процессоры. Процессор — это модуль Java, который используется либо для извлечения данных из исходной системы, либо для их сохранения в целевой системе. Для добавления атрибута или изменения содержимого в FlowFile можно использовать несколько процессоров. Он отвечает за отправку, слияние, маршрутизацию, преобразование, обработку, создание, разделение и получение потоковых файлов.
- FlowFile: FlowFile — это базовая концепция NiFi, которая представляет собой единый объект данных, выбранных из исходной системы в NiFi. Это позволяет пользователям вносить изменения в Flowfile, когда он перемещается из исходного процессора в место назначения. Различные события, такие как создание, получение, клонирование и т. д., которые выполняются в Flowfile с использованием разных процессоров в потоке.
- Событие: событие представляет собой изменение в Flowfile при обходе потоком NiFi. Такие события отслеживаются в источнике данных.
- Происхождение данных: Происхождение данных — это репозиторий, который позволяет пользователям проверять данные, касающиеся файла Flow, и помогает в устранении неполадок, если возникают какие-либо проблемы при обработке файла Flow.
- Группа процессов: группа процессов представляет собой набор процессов и их соответствующих соединений, которые могут получать данные от входного порта и отправлять их через выходные порты.
Apache NiFi поддерживает широкий спектр протоколов, таких как SFTP, KAFKA, HDFS и т. д., что делает эту платформу более популярной в ИТ-индустрии. Есть так много причин, чтобы выбрать Apache NiFi. Они следующие:
- Apache NiFi помогает организациям интегрировать NiFi в существующую инфраструктуру.
- Он позволяет пользователям использовать функции экосистемы Java и существующие библиотеки.
- Он обеспечивает управление в режиме реального времени, что позволяет пользователю управлять потоком данных между любым источником, процессором и пунктом назначения.
- Он помогает визуализировать DataFlow на уровне предприятия.
- Он помогает агрегировать, преобразовывать, маршрутизировать, извлекать, прослушивать, разделять и перетаскивать поток данных.
- Он позволяет пользователям запускать и останавливать компоненты на индивидуальном и групповом уровнях.
- NiFi позволяет пользователям извлекать данные из различных источников в NiFi и позволяет им создавать потоковые файлы.
- Он предназначен для масштабирования в кластерах, которые обеспечивают гарантированную доставку данных.
- Визуализируйте и отслеживайте производительность и поведение в бюллетене потока, который предлагает встроенную и информативную документацию.
Архитектура Apache NiFi
Архитектура Apache NiFi включает веб-сервер, контроллер потока и процессор, работающий на виртуальной машине Java (JVM). Он имеет три репозитория, такие как репозиторий FlowFile, репозиторий контента и репозиторий происхождения.
- Веб сервер
Веб-сервер используется для размещения API управления и контроля на основе HTTP.
- Контроллер потока
Контроллер потока — это мозг операции. Он предлагает потоки для запуска расширений и управляет расписанием, когда расширения получают ресурсы для запуска.
- Расширения
Несколько типов расширений NiFi определены в других документах. Расширения используются для работы и выполнения в JVM.
- Репозиторий FlowFile
Репозиторий FlowFile включает текущее состояние и атрибут каждого FlowFile, который проходит через поток данных NiFi.
Он отслеживает состояние, которое активно в потоке в данный момент. Стандартным подходом является непрерывный журнал упреждающей записи, который находится в описанном разделе диска.
- Репозиторий контента
Репозиторий контента используется для хранения всех данных, присутствующих в файлах потока. Подход по умолчанию — довольно простой механизм, который хранит блоки данных в файловой системе.
Чтобы уменьшить конкуренцию за любой отдельный том, укажите более одного места хранения файловой системы, чтобы получить разные разделы.
- Репозиторий происхождения
В репозитории происхождения хранятся все данные о событиях происхождения. Конструкцию репозитория можно подключить к реализации по умолчанию, использующей один или несколько томов физических дисков.
Данные о событиях индексируются и доступны для поиска в каждом месте.
Начиная с версии NiFi 1.0, включен шаблон кластеризации с нулевым лидером. Каждый узел в кластере выполняет аналогичные задачи с данными, но работает с другим набором данных.
- Apache Kafka: Введение
- Apache Kafka: основные операции
- Apache Kafka: архитектура кластера
- Apache Kafka: алгоритм установки
- Интеграция Apache Kafka и Spark
- Введение в Apache Kafka
- Что такое Apache Kafka
Истории успеха