Проект «Инженерия данных» для начинающих — Пакетная обработка данных
1. Введение
Настоящий проект по инженерии данных обычно включает в себя несколько компонентов. Настройка проекта обработки данных с соблюдением лучших практик может занять очень много времени. Если:
- Вы – аналитик данных, студент, ученый или инженер, который хочет получить опыт работы с данными, но не может найти хороший стартовый проект.
- У вас есть желание поработать над проектом по обработке данных, который имитирует реальный проект.
- Вам нужен сквозной проект по обработке данных.
- Ищете хороший проект, чтобы получить опыт работы с данными для дальнейших собеседований.
Тогда этот урок для вас. На этом уроке вы
- Настроете Apache Airflow, AWS EMR, AWS Redshift, AWS Spectrum и AWS S3.
- Изучите лучшие практики конвейера данных.
- Узнаете, как выявлять точки сбоя в конвейерах данных и создавать системы, устойчивые к сбоям.
- Узнаете, как спроектировать и построить конвейер данных на основе бизнес-требований.
Если вас интересует проект потоковой обработки, ознакомьтесь с Проектом инженерии данных для начинающих — Stream Edition
2. Цель
Предположим, вы работаете в компании, которая занимается анализом поведения пользователей, собирает пользовательские данные и создает профиль пользователя. Вам поручено построить конвейер данных для заполнения таблицы user_behavior_metric. Таблица user_behavior_metric — это таблица OLAP, предназначенная для использования аналитиками, программными панелями и т. д. Она построена из
- user_purchase: OLTP-таблицы с информацией о покупках пользователей.
- movie_review.csv: данных, отправляемых каждый день внешним поставщиком данных.
3. Проект
Мы будем использовать Airflow для оркестрации следующих задач:
- Классификация обзоров фильмов с помощью Apache Spark.
- Загрузка классифицированных отзывов о фильмах в хранилище данных.
- Извлечение данных о покупках пользователей из базы данных OLTP и загрузка их в хранилище данных.
- Объединение данных о классификации обзоров фильмов и данных о покупках пользователей для получения параметров пользовательского поведения.
4. Настройка
4.1 Предварительное условие
- Docker с не менее 4 ГБ ОЗУ и Docker Compose v1.27.0 или более поздней версии.
- psql
- AWS account
- AWS CLI installed или configured
Для настройки инфраструктуры и базовых таблиц у нас есть скрипт setup_infra.sh. Его можно запустить в таком виде, как показано ниже.
git clone https://github.com/josephmachado/beginner_de_project.git cd beginner_de_project ./setup_infra.sh {your-bucket-name}
Замените местозаполнитель именем вашего сегмента, например) ./setup_infra.sh sde-sample-bkt. Выбранное вами имя корзины должно быть уникальным. Если какая-либо из команд не прошла, откройте файл setup_infra.sh и вручную запустите неудачные команды.
Сценарий установки займет у вас около 10 минут, чтобы настроить необходимую инфраструктуру. Журналы установки хранятся локально в файле с именем setup.log. Войдите на www.localhost:8080, чтобы увидеть пользовательский интерфейс Airflow. И имя пользователя и пароль – airflow.
Когда вы закончите проект, не забудьте удалить инфраструктуру с помощью скрипта tear_down_infra.sh .
./tear_down_infra.sh {your-bucket-name}
4.2 Инфраструктура
Для нашего проекта у нас будет следующее
Локальные компоненты:
- Веб-сервер и планировщик Apache Airflow в док-контейнерах.
- База данных метаданных Apache Airflow (Postgres) в док-контейнере.
Компоненты AWS:
- AWS S3 как наше озеро данных.
- AWS Redshift Spectrum как наше хранилище данных.
- AWS IAM, чтобы разрешить Spectrum доступ к данным в S3 во время запроса.
- AWS EMR для запуска заданий Apache Spark для классификации текста.
4.3 Структура озера данных
Мы будем использовать AWS S3 в качестве озера данных. Здесь будут храниться данные из внешних систем для дальнейшей обработки. AWS S3 будет использоваться в качестве хранилища для использования с AWS Redshift Spectrum.
В нашем проекте мы будем использовать один участок памяти с несколькими папками.
- raw: для хранения необработанных данных. Обозначается как Raw Area в разделе design.
- stage: Обозначается как Stage Area a в разделе design.
- raw: используется для хранения искровых скриптов для использования AWS EMR.
Этот шаблон позволит нам создавать разные корзины для разных сред.
4.4 Создание таблиц и конфигураций Airflow
Сценарий установки также создает таблицы, необходимые для нашего конвейера данных. Создаем следующие таблицы:
- таблица retail.user_purchase, определенная в pgsetup/create_user_purchase.sql в репозитории. Данные загружаются в файловую систему контейнера Postgres. Эти данные загружаются в таблицу с помощью команды COPY .
- Таблица spectrum.user_purchase_staging, определяемая как имеющая данные, хранящиеся в том же месте, где находится озеро данных. Обратите внимание, что в таблице также есть раздел, определенный в файле insert_date.
- Таблица spectrum.classified_movie_review, определенная как имеющая данные, хранящиеся в том же месте, где находится озеро данных.
- Таблица public.user_behavior_metric — это таблица, в которую мы хотим загрузить данные.
Кроме того, скрипт также создает соединения и переменные Airflow.
- redshift connection: для подключения к кластеру AWS Redshift.
- postgres_default connection: для подключения к локальной базе данных Postgres.
- BUCKET variable: для указания сегмента, который будет использоваться в качестве озера данных для этого конвейера.
- EMR_ID variable: для отправки команд в кластер AWS EMR.
Вы можете увидеть их в пользовательском интерфейсе Airflow, как показано ниже.
4.5 Затраты на инфраструктуру AWS
Мы используем
- 3 узла m4.xlarge типа для нашего кластера AWS EMR.
- 1 dc2.large для нашего кластера AWS Redshift.
- 1 iam role, чтобы разрешить Redshift доступ к S3.
- 1 S3 bucket размером около 150 МБ.
По очень грубой оценке наша установка обойдется нам в 313.91 USD в месяц. Это означает, что наша инфраструктура будет стоить примерно около 0.43 USD в час. Используйте этот калькулятор стоимости AWS для проверки своих затрат.
5. Пошаговый разбор кода
Данные для user_behavior_metric генерируются из двух основных наборов данных. Мы рассмотрим, как загружается каждый из них, преобразуется и используется для получения данных для финальной таблицы.
5.1 Загрузка данных о покупках пользователй в хранилищах данных
Чтобы загрузить данные о покупках пользователей из Postgres в AWS Redshift, мы выполняем следующие задачи.
- extract_user_purchase_data: выгружает данные из Postgres в локальную файловую систему в контейнере Postgres. Эта файловая система синхронизируется между нашими локальными контейнерами Postgres и Airflow, что позволяет Airflow получить доступ к этим данным.
- user_purchase_to_stage_data_lake: перемещает извлеченные данные в промежуточную область озера данных по адресу stage/user_purchase/{{ ds }}/user_purchase.csv, где ds заменяется датой запуска в формате YYYY-MM-DD . Этот ds будет служить разделом insert_date, определенным при создании таблицы.
- user_purchase_stage_data_lake_to_stage_tbl: запускает запрос Redshift, чтобы сообщить таблице spectrum.user_purchase_staging о новой дате в разделе.
extract_user_purchase_data = PostgresOperator( dag=dag, task_id="extract_user_purchase_data", sql="./scripts/sql/unload_user_purchase.sql", postgres_conn_id="postgres_default", params={"user_purchase": "/temp/user_purchase.csv"}, depends_on_past=True, wait_for_downstream=True, ) user_purchase_to_stage_data_lake = PythonOperator( dag=dag, task_id="user_purchase_to_stage_data_lake", python_callable=_local_to_s3, op_kwargs={ "file_name": "/temp/user_purchase.csv", "key": "stage/user_purchase/{{ ds }}/user_purchase.csv", "bucket_name": BUCKET_NAME, "remove_local": "true", }, ) user_purchase_stage_data_lake_to_stage_tbl = PythonOperator( dag=dag, task_id="user_purchase_stage_data_lake_to_stage_tbl", python_callable=run_redshift_external_query, op_kwargs={ "qry": "alter table spectrum.user_purchase_staging add if not exists partition(insert_date='{{ ds }}') \ location 's3://" + BUCKET_NAME + "/stage/user_purchase/{{ ds }}'", }, ) extract_user_purchase_data >> user_purchase_to_stage_data_lake >> user_purchase_stage_data_lake_to_stage_tbl ./scripts/sql/unload_user_purchase.sql COPY ( select invoice_number, stock_code, detail, quantity, invoice_date, unit_price, customer_id, country from retail.user_purchase -- we should have a date filter here to pull only required date's data ) TO '{{ params.user_purchase }}' WITH (FORMAT CSV, HEADER); -- user_purchase will be replaced with /temp/user_purchase.csv from the params in extract_user_purchase_data task
Хранить весь набор данных в нашей локальной файловой системе не всегда хорошо. Мы можем получить ошибку нехватки памяти, если наш набор данных будет слишком велик. Таким образом, альтернативой может быть потоковый процесс, который записывает пакеты в наше озеро данных.
5.2 Загрузка классификации обзоров фильмов в хранилище данных
Чтобы получить классифицированные данные обзоров фильмов в AWS Redshift, мы выполняем следующие задачи:
- movie_review_to_raw_data_lake: : копирует локальный файл data/movie_review.csv в необработанную область озера данных.
- spark_script_to_s3: копирует наш скрипт pyspark в область сценариев озера данных. Это позволяет AWS EMR ссылаться на него.
-
start_emr_movie_classification_script: добавляет шаги EMR, определенные в dags/scripts/emr/clean_movie_review.json, в наш кластер EMR. Эта задача добавляет в кластер 3 шага EMR, они делают следующее
- Перемещает необработанные данные из S3 в HDFS: данные копируются из необработанной области озера данных в HDFS EMR.
- Классифицирует обзоры фильмов: запускает сценарий pyspark классификации обзоров.
- Перемещает секретные данные из HDFS в S3: данные копируются из HDFS EMR в промежуточную область озера данных.
- wait_for_movie_classification_transformation: это задача датчика, которая ожидает завершения последнего шага (Move classified data from HDFS to S3) для завершения.
movie_review_to_raw_data_lake = PythonOperator( dag=dag, task_id="movie_review_to_raw_data_lake", python_callable=_local_to_s3, op_kwargs={ "file_name": "/data/movie_review.csv", "key": "raw/movie_review/{{ ds }}/movie.csv", "bucket_name": BUCKET_NAME, }, ) spark_script_to_s3 = PythonOperator( dag=dag, task_id="spark_script_to_s3", python_callable=_local_to_s3, op_kwargs={ "file_name": "./dags/scripts/spark/random_text_classification.py", "key": "scripts/random_text_classification.py", "bucket_name": BUCKET_NAME, }, ) start_emr_movie_classification_script = EmrAddStepsOperator( dag=dag, task_id="start_emr_movie_classification_script", job_flow_id=EMR_ID, aws_conn_id="aws_default", steps=EMR_STEPS, params={ "BUCKET_NAME": BUCKET_NAME, "raw_movie_review": "raw/movie_review", "text_classifier_script": "scripts/random_text_classifier.py", "stage_movie_review": "stage/movie_review", }, depends_on_past=True, ) last_step = len(EMR_STEPS) - 1 wait_for_movie_classification_transformation = EmrStepSensor( dag=dag, task_id="wait_for_movie_classification_transformation", job_flow_id=EMR_ID, step_id='{{ task_instance.xcom_pull("start_emr_movie_classification_script", key="return_value")[' + str(last_step) + "] }}", depends_on_past=True, ) [ movie_review_to_raw_data_lake, spark_script_to_s3, ] >> start_emr_movie_classification_script >> wait_for_movie_classification_transformation
5.3 Генерация метрики поведения пользователя
С данными пользовательских покупок, и классификаций фильмов в хранилище данных, мы можем получить данные для таблицы user_behavior_metric. Это делается с помощью задачи generate_user_behavior_metric. Эта задача запускает скрипт Redshift SQL для заполнения таблицы public.user_behavior_metric.
generate_user_behavior_metric = PostgresOperator( dag=dag, task_id="generate_user_behavior_metric", sql="scripts/sql/generate_user_behavior_metric.sql", postgres_conn_id="redshift", ) end_of_data_pipeline = DummyOperator(task_id="end_of_data_pipeline", dag=dag) # dummy operator to indicate DAG complete [ user_purchase_stage_data_lake_to_stage_tbl, wait_for_movie_classification_transformation, ] >> generate_user_behavior_metric >> end_of_data_pipeline
SQL-запрос генерирует совокупные показатели на уровне клиента, используя spectrum.user_purchase_staging и spectrum.classified_movie_review.
-- scripts/sql/generate_user_behavior_metric.sql DELETE FROM public.user_behavior_metric WHERE insert_date = '{{ ds }}'; INSERT INTO public.user_behavior_metric ( customerid, amount_spent, review_score, review_count, insert_date ) SELECT ups.customerid, CAST( SUM(ups.Quantity * ups.UnitPrice) AS DECIMAL(18, 5) ) AS amount_spent, SUM(mrcs.positive_review) AS review_score, count(mrcs.cid) AS review_count, '{{ ds }}' FROM spectrum.user_purchase_staging ups JOIN ( SELECT cid, CASE WHEN positive_review IS True THEN 1 ELSE 0 END AS positive_review FROM spectrum.classified_movie_review WHERE insert_date = '{{ ds }}' ) mrcs ON ups.customerid = mrcs.cid WHERE ups.insert_date = '{{ ds }}' GROUP BY ups.customerid;
Войдите на www.localhost:8080, чтобы увидеть пользовательский интерфейс Airflow. Имя пользователя и пароль – airflow. Включите DAG. На выполнение одного прогона может уйти около 10 минут.
5.4. Проверка результатов
Вы можете проверить Redshift таблицу public.user_behavior_metric из своего терминала, как показано ниже.
export REDSHIFT_HOST=$(aws redshift describe-clusters --cluster-identifier sde-batch-de-project --query 'Clusters[0].Endpoint.Address' --output text) psql postgres://sde_user:sdeP0ssword0987@$REDSHIFT_HOST:5439/dev
В командной строке SQL используйте следующие запросы, чтобы просмотреть сгенерированные данные.
select insert_date, count(*) as cnt from spectrum.classified_movie_review group by insert_date order by cnt desc; -- 100,000 per day select insert_date, count(*) as cnt from spectrum.user_purchase_staging group by insert_date order by cnt desc; -- 541,908 per day select insert_date, count(*) as cnt from public.user_behavior_metric group by insert_date order by cnt desc; -- 908 per day
Подсчеты должны совпадать. Когда вы закончите проект, не забудьте удалить инфраструктуру с помощью скрипта tear_down_infra.sh
./tear_down_infra.sh {your-bucket-name}
6. Размышления к проекту
Теперь, когда у вас успешно работает конвейер данных, пришло время рассмотреть некоторые варианты проекта.
- Идемпотентный конвейер данных
эту статью, чтобы подробно разобраться в идемпотентности.. Подсказка: существует по крайней мере одна неидемпотентная задача.
- Мониторинг и оповещение
Конвейер данных можно контролировать из пользовательского интерфейса Airflow. Шаги EMR можно отслеживать через пользовательский интерфейс AWS. У нас нет никаких сигналов тревоги в случае сбоя, проблем с качеством данных, зависаний задач и т.д. В реальных проектах обычно есть система мониторинга и оповещения. Некоторыми распространенными системами, используемыми для мониторинга и оповещения, являются cloud watch, datadog, или newrelic.
- Контроль качества
Мы не проверяем качество данных в этом конвейере данных. Мы можем настроить базовый подсчет, стандартное отклонение и т. д., прежде чем загружать данные в итоговую таблицу. Для расширенных требований к тестированию рассмотрите возможность использования структуры качества данных, например, great_expectations .
- Параллельные запуски
показано здесь. Даже при соответствующей настройке параллелизма в нашем конвейере данных есть одна блокирующая задача. Выяснение сути блокирующей задачи мы оставим читателю в качестве упражнения.
- Изменение частоты DAG
Какие изменения необходимы для запуска этого DAG каждый час? Как должна измениться схема таблицы, чтобы поддерживать этот процесс? Вам нужно переименовать DAG?
- Обратное заполнение
эту команду для запуска обратного заполнения. Добавьте к команде префикс docker exec -d beginner_de_project_airflow-webserver_1, чтобы запустить ее в нашем док-контейнере Airflow.
- Размер данных
Будет ли конвейер данных работать успешно, если размер ваших данных увеличится в 10, 100, 1000 раз? Почему? почему нет?
7. Дальнейшее развитие проекта
Если вы заинтересованы в дальнейшей работе с этим конвейером данных, рассмотрите возможность внести свой вклад в следующее:
- Модульные тесты, прогонные DAG тесты и интеграционные тесты.
- Используйте API потока задач для DAG.