# ETL-системы ## Что такое ETL? ETL (Extract, Transform, Load) – это технология обработки данных, которая включает три ключевых этапа: 1. **Extract (извлечение)** – получение данных из различных источников: баз данных, файлов, облачных сервисов и т. д. 2. **Transform (трансформация)** – очистка, нормализация, агрегация, изменение формата данных для дальнейшего анализа. 3. **Load (загрузка)** – перенос обработанных данных в целевую систему: хранилище данных (Data Warehouse), BI-инструменты. ![](HowETLworks.jpg) ETL-процессы автоматизируют обработку больших объемов данных, обеспечивая их целостность. ## Зачем были созданы ETL? ETL критически важны для аналитики и бизнеса. Ранее компании сталкивались с проблемами из-за хранения данных в разрозненных системах (базы данных, файлы и т. д.), и их обработку требовалось производить вручную. Разные форматы данных (JSON, CSV, XML, SQL) усложняли анализ. Все это приводило к тому, что подготовка бизнес-аналитики занимала недели. ### При использовании ETL-системы: - Автоматизируется обработка данных, что исключает ручной труд и ошибки; - Объединяются данные из разных источников в едином хранилище; - Они приводятся к единому формату, что упрощает анализ и позволяет оптимизировать бизнес-решения на основе свежих данных. При настройке ETL-процессов задаются правила трансформации, а дальше система выполняет их автоматически при каждом запуске по расписанию или при появлении новых данных. ## Пример ETL-системы – Apache Airflow Apache Airflow – инструмент для создания и управления ETL-пайплайнами. Он написан на Python и имеет открытый исходный код. Позволяет писать процессы обработки данных с помощью Python-скриптов, используя концепцию DAG (направленные ациклические графы), где каждая задача связана с другими и выполняется в нужном порядке. ![](AirflowLogo.png) ### Плюсы: - Хорошая поддержка распределенных вычислений и обработки больших данных. - Глубокая интеграция с облачными сервисами (AWS, GCP, Azure). - Графический интерфейс для мониторинга выполнения задач. - Открытый исходный код и большое сообщество. ### Минусы: - Может требовать значительных ресурсов для крупных процессов. - Требует понимания DevOps и работы с контейнерами (Docker, Kubernetes). ## Базовый пример ETL-пайплайна в Apache Airflow Допустим, у нас есть сырые данные о продажах, хранящиеся в CSV-файле. Нам нужно: 1. Очистить данные (убрать дубликаты, заполнить пропущенные значения). 2. Добавить новый столбец с налогом 20%. 3. Загрузить обработанные данные в базу PostgreSQL. ```python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime import pandas as pd import psycopg2 # Функция Extract – загружаем данные из CSV def extract(): df = pd.read_csv('/opt/airflow/data/sales_raw.csv') df.to_csv('/opt/airflow/data/sales_extracted.csv', index=False) # Функция Transform – очищаем и обрабатываем данные def transform(): df = pd.read_csv('/opt/airflow/data/sales_extracted.csv') # Удаляем дубликаты df = df.drop_duplicates() # Заполняем пропущенные значения df.fillna({'price': df['price'].mean()}, inplace=True) # Добавляем столбец с налогом (20%) df['price_with_tax'] = df['price'] * 1.2 df.to_csv('/opt/airflow/data/sales_transformed.csv', index=False) # Функция Load – загружаем данные в PostgreSQL def load(): df = pd.read_csv('/opt/airflow/data/sales_transformed.csv') conn = psycopg2.connect( host="db", database="airflow", user="airflow", password="airflow" ) cur = conn.cursor() for _, row in df.iterrows(): cur.execute(""" INSERT INTO sales (product, price, price_with_tax) VALUES (%s, %s, %s) """, (row['product'], row['price'], row['price_with_tax'])) conn.commit() cur.close() conn.close() # Создаём DAG в Airflow default_args = { 'start_date': datetime(2024, 2, 20), # Указывает, с какого момента DAG начинает своё исполнение 'catchup': False # Eсли DAG запускается позже этой даты, пропущенные задачи не будут выполняться задним числом } dag = DAG( 'etl_sales_pipeline', # Имя DAG schedule_interval='@daily', # Указывает, что DAG будет запускаться раз в день default_args=default_args, catchup=False ) extract_task = PythonOperator( task_id='extract', # Уникальный идентификатор задачи python_callable=extract, # Вызов функции extract() dag=dag # Задача привязывается к нашему DAG ) transform_task = PythonOperator( task_id='transform', python_callable=transform, dag=dag ) load_task = PythonOperator( task_id='load', python_callable=load, dag=dag ) extract_task >> transform_task >> load_task # Определяем порядок выполнения ``` ## Другие ETL-инструменты ## Luigi (Python, open-source) Luigi похож на Airflow, но с ориентацией на простоту и поддержание чистоты кода. Создан для обработки сложных процессов в виде цепочек задач и поддерживает параллельную обработку. Он использует Python-классы для создания задач, каждая из которых может быть связана с другими через зависимости. Он обрабатывает задачи последовательно или параллельно в зависимости от их зависимостей. Luigi подходит для построения процессов, где задачи могут быть разделены на более мелкие части, что позволяет эффективно работать с большими объемами данных. ### Плюсы: - Простота в освоении и использовании. - Возможность мониторинга и отладки с помощью встроенных инструментов. - Хорошо подходит для сложных, многоступенчатых рабочих процессов. ### Минусы: - Меньше возможностей для интеграции с BI-инструментами. - Не такая мощная система планирования задач, как у Airflow. ## Pentaho Data Integration (PDI, aka Kettle) (Java, open-source) PDI подходит для бизнеса и BI-аналитиков, так как позволяет собирать процессы без программирования. Визуальный редактор позволяет создавать ETL-процессы. Может работать как самостоятельное приложение или интегрироваться с BI-инструментами. ### Плюсы: - Предоставляет возможности для работы с потоками данных в реальном времени. - Поддерживает множество плагинов и позволяет создавать свои собственные для расширения функционала. ### Минусы: - Сложность освоения для новичков. - Требует мощных серверов при обработке больших данных.