|
@@ -0,0 +1,152 @@
|
|
|
+# ETL-системы
|
|
|
+
|
|
|
+## Что такое ETL?
|
|
|
+ETL (Extract, Transform, Load) – это технология обработки данных, которая включает три ключевых этапа:
|
|
|
+1. **Extract (извлечение)** – получение данных из различных источников: баз данных, файлов, облачных сервисов и т. д.
|
|
|
+2. **Transform (трансформация)** – очистка, нормализация, агрегация, изменение формата данных для дальнейшего анализа.
|
|
|
+3. **Load (загрузка)** – перенос обработанных данных в целевую систему: хранилище данных (Data Warehouse), BI-инструменты.
|
|
|
+
|
|
|
+
|
|
|
+ETL-процессы автоматизируют обработку больших объемов данных, обеспечивая их целостность.
|
|
|
+
|
|
|
+
|
|
|
+## Зачем были созданы ETL?
|
|
|
+ETL критически важны для аналитики и бизнеса. Ранее компании сталкивались с проблемами из-за хранения данных в разрозненных системах (базы данных, файлы и т. д.), и их обработку требовалось производить вручную. Разные форматы данных (JSON, CSV, XML, SQL) усложняли анализ. Все это приводило к тому, что подготовка бизнес-аналитики занимала недели.
|
|
|
+
|
|
|
+### При использовании ETL-системы:
|
|
|
+- Автоматизируется обработка данных, что исключает ручной труд и ошибки;
|
|
|
+- Объединяются данные из разных источников в едином хранилище;
|
|
|
+- Они приводятся к единому формату, что упрощает анализ и позволяет оптимизировать бизнес-решения на основе свежих данных.
|
|
|
+
|
|
|
+При настройке ETL-процессов задаются правила трансформации, а дальше система выполняет их автоматически при каждом запуске по расписанию или при появлении новых данных.
|
|
|
+
|
|
|
+## Пример ETL-системы – Apache Airflow
|
|
|
+Apache Airflow – инструмент для создания и управления ETL-пайплайнами. Он написан на Python и имеет открытый исходный код. Позволяет писать процессы обработки данных с помощью Python-скриптов, используя концепцию DAG (направленные ациклические графы), где каждая задача связана с другими и выполняется в нужном порядке.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+### Плюсы:
|
|
|
+- Хорошая поддержка распределенных вычислений и обработки больших данных.
|
|
|
+- Глубокая интеграция с облачными сервисами (AWS, GCP, Azure).
|
|
|
+- Графический интерфейс для мониторинга выполнения задач.
|
|
|
+- Открытый исходный код и большое сообщество.
|
|
|
+
|
|
|
+### Минусы:
|
|
|
+- Может требовать значительных ресурсов для крупных процессов.
|
|
|
+- Требует понимания DevOps и работы с контейнерами (Docker, Kubernetes).
|
|
|
+
|
|
|
+## Базовый пример ETL-пайплайна в Apache Airflow
|
|
|
+Допустим, у нас есть сырые данные о продажах, хранящиеся в CSV-файле. Нам нужно:
|
|
|
+1. Очистить данные (убрать дубликаты, заполнить пропущенные значения).
|
|
|
+2. Добавить новый столбец с налогом 20%.
|
|
|
+3. Загрузить обработанные данные в базу PostgreSQL.
|
|
|
+
|
|
|
+```python
|
|
|
+from airflow import DAG
|
|
|
+from airflow.operators.python import PythonOperator
|
|
|
+from datetime import datetime
|
|
|
+import pandas as pd
|
|
|
+import psycopg2
|
|
|
+
|
|
|
+# Функция Extract – загружаем данные из CSV
|
|
|
+def extract():
|
|
|
+ df = pd.read_csv('/opt/airflow/data/sales_raw.csv')
|
|
|
+ df.to_csv('/opt/airflow/data/sales_extracted.csv', index=False)
|
|
|
+
|
|
|
+# Функция Transform – очищаем и обрабатываем данные
|
|
|
+def transform():
|
|
|
+ df = pd.read_csv('/opt/airflow/data/sales_extracted.csv')
|
|
|
+
|
|
|
+ # Удаляем дубликаты
|
|
|
+ df = df.drop_duplicates()
|
|
|
+
|
|
|
+ # Заполняем пропущенные значения
|
|
|
+ df.fillna({'price': df['price'].mean()}, inplace=True)
|
|
|
+
|
|
|
+ # Добавляем столбец с налогом (20%)
|
|
|
+ df['price_with_tax'] = df['price'] * 1.2
|
|
|
+
|
|
|
+ df.to_csv('/opt/airflow/data/sales_transformed.csv', index=False)
|
|
|
+
|
|
|
+# Функция Load – загружаем данные в PostgreSQL
|
|
|
+def load():
|
|
|
+ df = pd.read_csv('/opt/airflow/data/sales_transformed.csv')
|
|
|
+
|
|
|
+ conn = psycopg2.connect(
|
|
|
+ host="db",
|
|
|
+ database="airflow",
|
|
|
+ user="airflow",
|
|
|
+ password="airflow"
|
|
|
+ )
|
|
|
+ cur = conn.cursor()
|
|
|
+
|
|
|
+ for _, row in df.iterrows():
|
|
|
+ cur.execute("""
|
|
|
+ INSERT INTO sales (product, price, price_with_tax)
|
|
|
+ VALUES (%s, %s, %s)
|
|
|
+ """, (row['product'], row['price'], row['price_with_tax']))
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ cur.close()
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+# Создаём DAG в Airflow
|
|
|
+default_args = {
|
|
|
+ 'start_date': datetime(2024, 2, 20), # Указывает, с какого момента DAG начинает своё исполнение
|
|
|
+ 'catchup': False # Eсли DAG запускается позже этой даты, пропущенные задачи не будут выполняться задним числом
|
|
|
+}
|
|
|
+
|
|
|
+dag = DAG(
|
|
|
+ 'etl_sales_pipeline', # Имя DAG
|
|
|
+ schedule_interval='@daily', # Указывает, что DAG будет запускаться раз в день
|
|
|
+ default_args=default_args,
|
|
|
+ catchup=False
|
|
|
+)
|
|
|
+
|
|
|
+extract_task = PythonOperator(
|
|
|
+ task_id='extract', # Уникальный идентификатор задачи
|
|
|
+ python_callable=extract, # Вызов функции extract()
|
|
|
+ dag=dag # Задача привязывается к нашему DAG
|
|
|
+)
|
|
|
+
|
|
|
+transform_task = PythonOperator(
|
|
|
+ task_id='transform',
|
|
|
+ python_callable=transform,
|
|
|
+ dag=dag
|
|
|
+)
|
|
|
+
|
|
|
+load_task = PythonOperator(
|
|
|
+ task_id='load',
|
|
|
+ python_callable=load,
|
|
|
+ dag=dag
|
|
|
+)
|
|
|
+
|
|
|
+extract_task >> transform_task >> load_task # Определяем порядок выполнения
|
|
|
+
|
|
|
+```
|
|
|
+
|
|
|
+## Другие ETL-инструменты
|
|
|
+
|
|
|
+## Luigi (Python, open-source)
|
|
|
+Luigi похож на Airflow, но с ориентацией на простоту и поддержание чистоты кода. Создан для обработки сложных процессов в виде цепочек задач и поддерживает параллельную обработку. Он использует Python-классы для создания задач, каждая из которых может быть связана с другими через зависимости. Он обрабатывает задачи последовательно или параллельно в зависимости от их зависимостей. Luigi подходит для построения процессов, где задачи могут быть разделены на более мелкие части, что позволяет эффективно работать с большими объемами данных.
|
|
|
+
|
|
|
+### Плюсы:
|
|
|
+- Простота в освоении и использовании.
|
|
|
+- Возможность мониторинга и отладки с помощью встроенных инструментов.
|
|
|
+- Хорошо подходит для сложных, многоступенчатых рабочих процессов.
|
|
|
+
|
|
|
+### Минусы:
|
|
|
+- Меньше возможностей для интеграции с BI-инструментами.
|
|
|
+- Не такая мощная система планирования задач, как у Airflow.
|
|
|
+
|
|
|
+## Pentaho Data Integration (PDI, aka Kettle) (Java, open-source)
|
|
|
+PDI подходит для бизнеса и BI-аналитиков, так как позволяет собирать процессы без программирования. Визуальный редактор позволяет создавать ETL-процессы. Может работать как самостоятельное приложение или интегрироваться с BI-инструментами.
|
|
|
+
|
|
|
+### Плюсы:
|
|
|
+- Предоставляет возможности для работы с потоками данных в реальном времени.
|
|
|
+- Поддерживает множество плагинов и позволяет создавать свои собственные для расширения функционала.
|
|
|
+
|
|
|
+### Минусы:
|
|
|
+- Сложность освоения для новичков.
|
|
|
+- Требует мощных серверов при обработке больших данных.
|
|
|
+
|