Модуль V·Статья II·~3 мин чтения
Распределённые вычисления и Apache Spark
Алгоритмы для Big Data
Превратить статью в подкаст
Выберите голоса, формат и длину — AI запишет аудио
Системы для обработки больших данных
Один компьютер с 512 ГБ RAM и быстрым SSD мощен, но данные современных компаний — петабайты. Распределённые вычислительные системы позволяют обрабатывать эти данные на кластерах из тысяч машин. Понимание этих систем критично для ML-инженера, работающего с реальными данными.
MapReduce: парадигма параллельных вычислений
Идея (Dean & Ghemawat, Google, 2004): разбить вычисление на два этапа, каждый из которых параллелизуется по данным.
Map(k, v) → список (k', v'): применяется независимо к каждой записи. Пример: подсчёт слов — Map("hello world", 1) → [("hello",1), ("world",1)].
Shuffle: система автоматически группирует все пары (k',v') по ключу k'. Самый дорогой шаг — data movement по сети.
Reduce(k', список v') → список (k'',v''): агрегирует все значения для одного ключа. Пример: Reduce("hello", [1,1,1]) → ("hello", 3).
PageRank через MapReduce: итеративное умножение матрицы на вектор. Каждая итерация — один MapReduce job. Дискроблема: I/O между итерациями (запись и чтение с HDFS каждый раз).
Hadoop HDFS: распределённая файловая система. Блоки 128 MB, каждый реплицируется 3×. Master (NameNode) хранит метаданные. DataNode — хранение данных. Fault tolerance: при отказе DataNode NameNode заказывает новые реплики.
Apache Spark: in-memory вычисления
Ключевое отличие от MapReduce: данные в памяти между операциями (RDD.cache()). При итеративных алгоритмах (ML, PageRank) ускорение 10–100×.
RDD (Resilient Distributed Dataset): иммутабельная, распределённая коллекция элементов. Партиционирована по узлам кластера. Lineage graph (граф зависимостей) обеспечивает fault tolerance: при сбое пересчитываем только потерянные партиции.
Трансформации (ленивые): map, filter, flatMap, join, groupByKey, reduceByKey. Не выполняются немедленно — строят граф вычислений (DAG).
Actions (жадные): collect, count, reduce, saveAsTextFile — запускают исполнение DAG. Catalyst optimizer анализирует DAG, применяет оптимизации (pushdown filters, column pruning, join reordering).
DataFrame API: колонковое хранение, SQL-совместимость, оптимизация через Catalyst. Пример: spark.read.parquet("data").filter("age > 25").groupBy("city").agg(avg("salary")). Внутри: Catalyst находит оптимальный план выполнения.
Spark MLlib: масштабируемые ML-алгоритмы. LinearRegression, LogisticRegression, RandomForest, GBT, KMeans, ALS (Alternating Least Squares для коллаборативной фильтрации). ALS для рекомендаций: Netflix-стиль на миллиардах рейтингов.
GPU-вычисления: параллелизм на уровне ядра
GPU vs CPU: CPU: 8–64 ядра, большой кэш, оптимизирован для последовательных вычислений. GPU: 5000–10000+ ядер, малый кэш, оптимизирован для массово-параллельных вычислений.
CUDA-модель: N блоков × M потоков = N×M параллельных «нитей». Каждый поток работает с маленькой частью данных. Warps (32 нити) — основная единица исполнения.
Tensor Cores (NVIDIA Volta+): матричные умножения FP16/BF16 → 8× выше throughput vs FP32 cores. A100: 312 TFLOPS (TF32), 1248 TOPS (INT8). H100: 2000 TFLOPS (BF16).
torch.compile (PyTorch 2.0): компилирует Python/PyTorch graph в оптимизированные CUDA kernels через TorchInductor + Triton. 2× ускорение «бесплатно» на типичных трансформерах.
Эффективный инференс
Проблема: обученная LLaMA-70B модель требует 140 ГБ GPU RAM в FP16. Большинство задач не требуют полной точности.
Quantization (квантизация): FP16/BF16 вместо FP32 (2×), INT8 (4×), INT4 (8×). GPTQ: post-training quantization весов до INT4 с минимальной потерей качества. AWQ: количественная активация-aware.
KV-cache: при авторегрессивной генерации ключи и значения предыдущих токенов кэшируются. Без кэша: O(n²) операций для генерации n токенов. С кэшем: O(n). Ботлнек для длинного контекста: кэш 32K токенов × 70B = 40 ГБ.
vLLM: PagedAttention — управление KV-кэшем как виртуальной памятью ОС. Непрерывный batching: обрабатываем разные запросы разной длины эффективно. 24× выше пропускная способность vs naive serving.
Численный пример
Apache Spark WordCount на кластере 10 узлов, файл 1 ТБ текста:
- MapReduce (Hadoop): ~45 минут (disk I/O доминирует)
- Spark (RDD, in-memory): ~8 минут (6× ускорение)
- Spark (DataFrame + Parquet): ~3 минуты (колонковое хранение + Catalyst)
Вычисление PageRank для графа 1B узлов:
- MapReduce: 60 итераций × 5 мин = 5 часов
- Spark с cacheRDD: 60 итераций × 30 сек = 30 минут
Задание: (1) Реализуйте Word2Vec на PySpark MLlib на текстовом корпусе (Wikidump 1GB). Измерьте время и качество (аналогии типа king−man+woman). (2) Реализуйте ALS для коллаборативной фильтрации MovieLens 20M. Гиперпараметры: rank=50, maxIter=10, regParam=0.1. Вычислите RMSE. (3) Профилируйте исполнение: Spark UI → какой этап занимает больше всего времени? Как улучшить?
§ Акт · что дальше