# ETL-системы
ETL (Extract, Transform, Load) – это технология обработки данных, которая включает три ключевых этапа:
ETL-процессы автоматизируют обработку больших объемов данных, обеспечивая их целостность.
ETL критически важны для аналитики и бизнеса. Ранее компании сталкивались с проблемами из-за хранения данных в разрозненных системах (базы данных, файлы и т. д.), и их обработку требовалось производить вручную. Разные форматы данных (JSON, CSV, XML, SQL) усложняли анализ. Все это приводило к тому, что подготовка бизнес-аналитики занимала недели.
При настройке ETL-процессов задаются правила трансформации, а дальше система выполняет их автоматически при каждом запуске по расписанию или при появлении новых данных.
Apache Airflow – инструмент для создания и управления ETL-пайплайнами. Он написан на Python и имеет открытый исходный код. Позволяет писать процессы обработки данных с помощью Python-скриптов, используя концепцию DAG (направленные ациклические графы), где каждая задача связана с другими и выполняется в нужном порядке.
Допустим, у нас есть сырые данные о продажах, хранящиеся в CSV-файле. Нам нужно:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
import psycopg2
# Функция Extract – загружаем данные из CSV
def extract():
df = pd.read_csv('/opt/airflow/data/sales_raw.csv')
df.to_csv('/opt/airflow/data/sales_extracted.csv', index=False)
# Функция Transform – очищаем и обрабатываем данные
def transform():
df = pd.read_csv('/opt/airflow/data/sales_extracted.csv')
# Удаляем дубликаты
df = df.drop_duplicates()
# Заполняем пропущенные значения
df.fillna({'price': df['price'].mean()}, inplace=True)
# Добавляем столбец с налогом (20%)
df['price_with_tax'] = df['price'] * 1.2
df.to_csv('/opt/airflow/data/sales_transformed.csv', index=False)
# Функция Load – загружаем данные в PostgreSQL
def load():
df = pd.read_csv('/opt/airflow/data/sales_transformed.csv')
conn = psycopg2.connect(
host="db",
database="airflow",
user="airflow",
password="airflow"
)
cur = conn.cursor()
for _, row in df.iterrows():
cur.execute("""
INSERT INTO sales (product, price, price_with_tax)
VALUES (%s, %s, %s)
""", (row['product'], row['price'], row['price_with_tax']))
conn.commit()
cur.close()
conn.close()
# Создаём DAG в Airflow
default_args = {
'start_date': datetime(2024, 2, 20), # Указывает, с какого момента DAG начинает своё исполнение
'catchup': False # Eсли DAG запускается позже этой даты, пропущенные задачи не будут выполняться задним числом
}
dag = DAG(
'etl_sales_pipeline', # Имя DAG
schedule_interval='@daily', # Указывает, что DAG будет запускаться раз в день
default_args=default_args,
catchup=False
)
extract_task = PythonOperator(
task_id='extract', # Уникальный идентификатор задачи
python_callable=extract, # Вызов функции extract()
dag=dag # Задача привязывается к нашему DAG
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform,
dag=dag
)
load_task = PythonOperator(
task_id='load',
python_callable=load,
dag=dag
)
extract_task >> transform_task >> load_task # Определяем порядок выполнения
Luigi похож на Airflow, но с ориентацией на простоту и поддержание чистоты кода. Создан для обработки сложных процессов в виде цепочек задач и поддерживает параллельную обработку. Он использует Python-классы для создания задач, каждая из которых может быть связана с другими через зависимости. Luigi обрабатывает задачи последовательно или параллельно в зависимости от их зависимостей. Он подходит для построения процессов, где задачи могут быть разделены на более мелкие части, что позволяет эффективно работать с большими объемами данных.
Кроме того, Luigi предоставляет встроенный веб-интерфейс для мониторинга выполнения задач, что упрощает отслеживание состояния пайплайнов. Он поддерживает работу с различными системами хранения данных, включая локальные файлы, базы данных и облачные сервисы. Luigi также позволяет легко переиспользовать код, создавая модульные и масштабируемые ETL-пайплайны.
Prefect — более современный конкурент Airflow, созданный с упором на простоту, отказоустойчивость и гибкость. Основная концепция Prefect строится вокруг потоков и задач, где каждая задача представляет собой отдельный шаг обработки данных.
Одним из главных преимуществ Prefect является его способность автоматически обрабатывать ошибки, управлять зависимостями между задачами и повторять выполнение в случае сбоев. Это делает его надежным инструментом для работы с большими объемами данных.
В отличие от Airflow, Prefect не требует отдельного сервера для управления задачами, а его облачная версия (Prefect Cloud) предлагает удобные инструменты для мониторинга и управления потоками данных. Благодаря этому можно контролировать выполнение задач в реальном времени и оперативно реагировать на возможные сбои.
dbt (Data Build Tool) — это инструмент для трансформации данных внутри хранилищ. В отличие от классических ETL-инструментов, dbt выполняет только трансформацию, оставляя процесс извлечения и загрузки другим системам.
Главная особенность dbt заключается в том, что он позволяет аналитикам и инженерам данных работать с хранилищем данных так же, как разработчики работают с кодом. Он поддерживает контроль версий с помощью Git, обеспечивает модульность, что позволяет переиспользовать код, а также предоставляет инструменты для документирования и тестирования моделей. Это делает его удобным инструментом для построения надежных аналитических решений. Автоматизация выполнения моделей и управление зависимостями между ними позволяют пользователям сосредоточиться на аналитике, а не на технических аспектах трансформации данных.
Легковесный ETL-фреймворк на Python, предназначенный для потоковой обработки данных. В отличие от более тяжелых инструментов, таких как Apache Airflow или Prefect, Bonobo предлагает простую и интуитивно понятную архитектуру, в которой ETL-конвейеры строятся на основе обычных Python-функций. Это делает его удобным для быстрого прототипирования и выполнения небольших ETL-задач.
Одним из ключевых преимуществ Bonobo является минимальное количество шаблонного кода. Разработчик может описывать шаги обработки данных, используя простые функции, а сам фреймворк берет на себя управление потоком выполнения. Также он легко интегрируется с Pandas, что делает его привлекательным для аналитиков, которым нужно быстро преобразовывать данные без сложных инструментов.
Благодаря поддержке потоковой обработки данных Bonobo позволяет работать с данными в реальном времени, а его архитектура не требует развертывания отдельного сервера или сложной инфраструктуры. Это делает его удобным для задач, где важна легкость и скорость развертывания, а не сложные зависимости и оркестрация.
Однако он не подходит для масштабных и сложных ETL-процессов с множеством зависимостей, так как его возможности по управлению задачами ограничены. Масштабируемость также является его слабым местом, поскольку он ориентирован в первую очередь на небольшие и средние нагрузки.
Этот ETL-фреймворк по своему назначению похож на bonobo. Разработанный для удобной работы с SQL-запросами, он ориентирован на простоту и минимализм, предоставляя разработчикам удобные инструменты для организации ETL-скриптов без необходимости использования сложных оркестраторов. Основной упор сделан на работу с реляционными базами данных, особенно с PostgreSQL.
Одним из ключевых преимуществ Mara является встроенный веб-интерфейс, который позволяет мониторить выполнение ETL-пайплайнов, просматривать логи и анализировать возможные ошибки. Благодаря нативной поддержке SQL и возможности интеграции с различными источниками данных, Mara упрощает процесс написания и исполнения SQL-скриптов, снижая необходимость в сложных настройках.
Но в отличие от более масштабируемых решений, Mara не предназначена для сложных ETL-процессов с множеством зависимостей. Её архитектура больше подходит для небольших и средних проектов, где основная работа сосредоточена на SQL-преобразованиях. Кроме того, у Mara ограниченные возможности для работы с потоковыми данными, так как она ориентирована на пакетную обработку.