Использование открытого исходного кода для интеграции данных и автоматической синхронизации данных
Представляем Вашему вниманию пошаговые инструкции по настройке и запуску Airbyte с открытым исходным кодом для перемещения данных между несколькими источниками и пунктами назначения данных с помощью Apache Airflow.
Apache Airflow и Airbyte - взаимодополняющие инструменты, которые могут быть использованы совместно для эффективной интеграции данных. Airbyte можно использовать для извлечения данных из сотен источников, а также их загрузки в любой из поддерживаемых пунктов назначения. Airflow может быть использован для планирования и оркестрации задач, в том числе, для запуска синхронизаций Airbyte. Сочетание Airflow и Airbyte обеспечивает гибкое и масштабируемое решение для управления интеграцией данных и их обработки.
Благодаря данному руководству Вы сможете с легкостью установить Airbyte Open Source и Apache Airflow, работающие в локальной среде Docker Desktop. После установки Вы настроите простое соединение с Airbyte, а затем создадите ациклический граф (DAG), ориентированный на Airflow, который запускает синхронизацию данных по только что созданному соединению Airbyte, а после этого оркеструет некоторые дополнительные задачи, зависящие от завершения синхронизации данных.
Что такое Apache Airflow?
Apache Airflow - это open-source инструмент-оркестрор, который используется для программного планирования и мониторинга рабочих процессов. Очень часто он используется для управления последовательностью задач, выполняемых конвейером интеграции данных.
С помощью Airflow пользователи могут определять рабочие процессы в виде направленных ациклических графов (DAG), где каждая задача представляет собой отдельную операцию. Задачи могут выполняться параллельно или последовательно, а также могут быть запланированы для выполнения в строго определенное время или в ответ на определенные события.
Союз Airbyte и Apache Airflow
Airflow может выполнять задачи, которые необходимы для запуска синхронизации Airbyte, и/или может использоваться для планирования задач, зависящих от завершения синхронизации Airbyte.
С помощью данного урока Вы создадите простую группу Airflow DAG, которая будет выполнять задачи в Вашей локальной среде и сделает следующее:
- Запустит конвейер Airbyte ELT , который загружает данные из Faker и записывает их в локальную файловую систему;
- Дождется завершения синхронизации ELT-конвейера;
- Проверит, существует ли ожидаемый локальный файл;
- Переименует локальный файл, который был создан в результате синхронизации.
Примечание: цель данного руководства состоит в том, чтобы показать, насколько на самом деле легко настроить Airflow DAG для взаимодействия с Airbyte, а также дать общее представление о возможностях Airflow DAG. Мы воспользуемся достаточно простым примером, который при желании может стать основой для реализации более сложного процесса.
Версии
В будущем возможны определенные изменения в API и/или Airflow, которые смогут сделать некоторые из инструкций, приведенных в этом руководстве, неактуальными. Данное руководство было составлено в феврале 2023 года, при этом использовались следующие инструменты:
- Airbyte OSS 0.40.32
- Docker Desktop v4.10.1
- macOS Monterey 12.5.1
- MacBook Pro
- Airflow v2.5.1 Git Version: .release:2.5.1+49867b660b6231c1319969217bc61917f7cf9829
Установка Airbyte
Если у Вас уже запущена локальная копия Airbyte, тогда смело пропускайте этот раздел.
[Необязательно] Измените BASIC_AUTH_USERNAME и BASIC_AUTH_PASSWORD в (скрытом) файле .env. В этом руководстве я использую следующие значения:
После запуска Airbyte введите в браузере localhost:8000, который предложит Вам ввести имя пользователя и пароль следующим образом:
Создание подключения
Создайте соединение, которое будет отправлять данные из источника Sample Data (Faker) в локальный выход JSON. Нажмите на кнопку "Создать первое соединение", как показано ниже:
Затем появится опция настройки подключения к источнику. Из выпадающего списка выберите источник Faker, как показано ниже.
Выбрав Sample Data в качестве источника данных, Вы увидите экран, который должен выглядеть следующим образом (нажмите на кнопку Set up source (Установить источник), как показано ниже).
Затем подождите несколько секунд, пока источник Sample Data будет проходить проверку, после чего Вам будет предложено настроить пункт назначения, который будет использоваться для подключения. Выберите Local JSON, как показано ниже:
После выбора Local JSON в качестве выходных данных Вам нужно будет указать, куда именно необходимо записывать файлы JSON. По умолчанию «пункт назначения» расположен в каталоге /tmp/airbyte_local. В этом руководстве я задал путь назначения /json_from_faker, что означает, что данные будут скопированы в /tmp/airbyte_local/json_from_faker на localhost, где запущен Airbyte. После указания пути назначения нажмите на кнопку Set up Destination.
Это приведет Вас на страницу настройки соединения. Установите частоту репликации на Manual (так как для запуска синхронизации Airbyte мы будем использовать Airflow, а не планировщик Airbyte), затем нажмите на кнопку Set up connection, как показано на рисунке ниже.
Запустите синхронизацию источника Sample Data (faker) с локальным JSON-выходом, нажав на Sync now, как показано на рисунке ниже.
Синхронизация должна занять всего лишь несколько секунд, после чего Вы увидите, что синхронизация прошла успешно, как показано ниже.
Теперь Вы можете убедиться в том, что некоторые данные были скопированы в нужное место. Как уже говорилось ранее, в этом примере JSON-данные можно увидеть в каталоге /tmp/airbyte_local_json_from_faker. Поскольку было сгенерировано три потока, должны быть доступны следующие три JSON-файла:
Теперь Вы создали простой пример соединения в Airbyte, которое может быть запущено вручную. Ручное соединение идеально подходит для ситуаций, когда Вы хотите использовать внешний оркестратор.
В следующем разделе Вы увидите, как запустить ручную синхронизацию на этом соединении, обратившись непосредственно к конечной точке REST. После этого Вы увидите, как можно использовать Airflow для запуска синхронизации через ту же конечную точку.
Тестирование конечных точек API с помощью cURL
Прежде чем использовать конечную точку REST из Airflow, нужно убедиться в том, что она работает так, как ожидалось. Получите connectionId из URL, отображаемого в браузере, как показано на следующем изображении:
Для проверки работы конечной точки API Airbyte Вы можете использовать. Обязательно обновите connectionID в следующей команде, чтобы отразить значение, полученное из URL выше. Выполните команду REST API следующим образом:
В ответ на команду, приведенную выше, должно появиться следующее сообщение, которое указывает на то, что синхронизация успешно началась:
Синхронизация выполняется каждый раз, когда Вы выполняете команду cURL. В моем случае я выполнил команду дважды с интервалом в минуту, поэтому мой пользовательский интерфейс выглядит следующим образом:
Установка и запуск Airflow
Теперь, когда Вы убедились в том, что конечная точка REST работает как надо, можно начинать работу с Airflow, который для выполнения синхронизации должен вызывать ту же конечную точку Airbyte API. Информация, содержащаяся в данном разделе, основана на запуске Airflow в Docker, и дополнена ЦУ касательного того, как установить провайдер Airbyte.
Создайте новую папку, которая будет использоваться для установки Airflow, и перейдите в нее по cd:
Для запуска Airflow загрузите файл Docker-compose следующим образом:
Затем создайте дополнительные подкаталоги и присвойте им UID Airflow следующим образом:
Для поддержки Airbyte образ Airflow Docker требует наличия функциональности провайдера Airbyte. Первым шагом будет создание файла Dockerfile в папке airflow со следующим содержимым:
Для того, чтобы разрешить сборку нового образа Airflow с помощью только что созданного Dockerfile, Вам следует убрать следующую строку в docker-compose.yaml:
Кроме того, добавьте следующую строку в docker-compose.yaml, чтобы Airflow мог видеть и работать с файлами в локальных папках, в которые будет записываться Airbyte:
Соответствующие изменения в docker-compose.yaml выделены на следующем изображении:
Создайте образ Docker следующим образом:
В результате Вы увидите, что провайдеры Airbyte, указанные в Dockerfile, были установлены, как показано на следующем изображении:
Далее Вы можете инициализировать Airflow следующим образом:
Поздравляю, теперь Вы готовы к запуску Airflow! Для запуска Airflow и связанных с ним контейнеров выполните следующую команду:
Когда контейнеры запущены, Вы можете просмотреть их список с помощью следующей команды:
Рис.26
В моем случае есть несколько контейнеров, работающих для Airbyte, и несколько контейнеров, работающих для Airflow:
Убедитесь, что папки Airbyte видны из Airflow - войдите в контейнер планировщика Airflow с помощью следующей команды:
Внутри этого контейнера Вы должны увидеть папку airbyte_local в каталоге /tmp, как показано ниже.:
Далее Вам необходимо войти в Airflow, для этого установите браузер на localhost:8080:
Как указано в инструкциях к REST API Airflows, имя пользователя по умолчанию – airflow и пароль по умолчанию также airflow. После входа в систему Вы увидите экран, который выглядит следующим образом:
Создание соединений в Airflow
У Airflow есть своя концепция соединений, и мы будем использовать соединение Airflow для запуска синхронизации с помощью соединения Airbyte.
Чтобы продемонстрировать то, как Airflow может выполнять дополнительные задачи, зависящие от завершения синхронизации Airbyte, мы также определим соединение Airflow, которое будет использоваться для доступа и изменения файлов в локальной файловой системе.
Чтобы определить соединение, которое Airflow будет использовать для связи с Airbyte, перейдите в меню admin→connections, как показано ниже:
Затем нажмите на символ +, как показано на рисунке ниже:
Заполните информацию о соединении, которое Airflow будет использовать для подключения к Airbyte, следующим образом и нажмите на кнопку Test.
Параметры подключения:
- Идентификатор подключения: определите идентификатор, который группы DAG Airflow могут использовать для связи с Airbyte. В этом примере идентификатору присвоено имя airflow-call-to-airbyte-example, которое будет использоваться в определении группы DAG (показано позже);
- Тип соединения: указывает на то, что это соединение с Airbyte. Внимание, если Вы не видите Airbyte в выпадающем меню, значит, образ Docker был создан неправильно. Добавление провайдера Airbyte в образ Docker было выполнено ранее в этом руководстве;
- Хост: хост, на котором запущен Airbyte. Обратите внимание на использование host.docker.internal, который «превращается» во внутренний IP-адрес, используемый хостом;
- Логин: по умолчанию для подключения к Airbyte используется пользователь airbyte. Если Вы изменили имя пользователя, тогда используйте то имя пользователя, которое Вы указали;
- Пароль: Вы можете использовать пароль по умолчанию или любой другой пароль, заданный Вами;
- Порт: по умолчанию Airbyte использует порт 8000.
Для того, чтобы вернуться на экран Connections (Соединения), нажмите на Save (Сохранить).
Поскольку группа DAG, которую мы определим, будет работать с файлами, необходимо создать соединение File. Заново нажмите на символ +, как показано ниже:
В результате Вы попадете на экран, который выглядит следующим образом:
Параметры подключения следующие:
- Идентификатор подключения: как упоминалось выше, этот идентификатор будет использоваться в группе DAG для подключения к файловой системе. В нашем примере задано значение airflow-file-connector;
- Тип подключения: выберите Файл (путь). Этот коннектор будет использоваться в группе DAG для взаимодействия с файлами в локальной файловой системе
После сохранения вышеуказанного соединения экран "Подключения" должен выглядеть следующим образом:
Теперь, когда соответствующие соединения Airflow определены, их можно использовать в группе Airflow DAG.
Создание DAG в Airflow
В этом разделе для простой группы DAG я даю код на языке Python, который выполняет следующие задачи:
1. trigger_airbyte: использует AirbyteTriggerSyncOperator для асинхронного запуска Airbyte для выполнения синхронизации от входа Sample Data (Faker) к выходу Local JSON (файл) с помощью соединения Airbyte, которое мы определили выше. Поскольку эта операция выполняется асинхронно, она немедленно возвращается вместе с идентификатором задания, который используется для определения завершения синхронизации;
2. wait_for_sync_completion: использует AirbyteJobSensor для ожидания, пока Airbyte завершит синхронизацию;
3. raw_products_file_sensor: использует FileSensor для подтверждения существования файла, созданного Airbyte. Один из файлов, созданных источником Sample Data (Faker), называется _airbyte_raw_products.jsonl, и эта задача ожидает, что этот файл существует;
4. mv_raw_producs_filet: использует BashOperator для переименования файла сырых продуктов.
Ниже приведен код, демонстрирующий все эти шаги.
Скопируйте этот код в файл example_dag.py в каталоге airflow/dags, который Вы создали ранее. Затем установите AIRBYTE_CONNECTION_ID в значение, которое Вы извлекли из URL-адреса соединения Airbyte ранее.
Предполагается, что в качестве пути к файлам в коннекторе Airbyte, который мы определили ранее, Вы указали /json_from_faker - если это не так, то обновите RAW_PRODUCTS_FILE и COPY_OF_RAW_PRODUCTS в коде для того, чтобы отразить правильный путь.
Для того, чтобы увидеть новую группу DAG, нажмите на DAGs в верхней части экрана, а затем нажмите на кнопку обновления, показанную ниже:
Через некоторое время появится группа DAG, которую Вы только что добавили в папку DAGs. Имя, которое появится, соответствует dag_id, который Вы указали в коде.
В списке групп DAG новая группа будет выглядеть следующим образом:
Просмотр новой группы DAG
Группу DAG, заданную кодом, приведенным выше, можно просмотреть в Airflow, нажав на кнопку Graph, которая выглядит следующим образом:
Выполнение Airflow DAG
Щелкните на только что созданную группу DAG под названием airbyte_example_airflow_dag, выделенную на изображении выше. Это приведет Вас к экрану, содержащему дополнительную информацию о группе DAG. Запустите группу DAG, нажав на кнопку в правом верхнем углу, как показано на следующем изображении:
После запуска группы DAG Вы увидите примерно такой экран:
Каждый раз при выполнении вышеуказанной группы DAG Вы должны видеть связанную с ней синхронизацию в Airbyte:
Наконец, после завершения работы группы DAG Вы можете посмотреть в своей локальной файловой системе файлы, созданные Airbyte, а также файл, который Airflow переименовал из _airbyte_raw_products.jsonl в moved_raw_products.jsonl. Ваша папка /tmp/airbyte_local/json_from_faker должна выглядеть следующим образом:
Заключение
В этой статье мы показали Вам, как можно настроить достаточно простое соединение Airbyte, для которого синхронизации запускается группой Airflow DAG. После завершения каждой синхронизации Airflow проверяет, создан ли необходимый файл, а затем переименовывает его. Хотя задачи, отображенные в данной статье, довольно просты, концепции, которые они отображают, являются по-настоящему мощными! Вы всегда можете расширить функционал, описанный в данном руководстве, для того, чтобы построить сложнейшие конвейеры Airbyte ELT, которые будут оркестрованы Airflow! Удачи!