Модуль V·Статья II·~3 мин чтения

Распределённые вычисления и Apache Spark

Алгоритмы для Big Data

Превратить статью в подкаст

Выберите голоса, формат и длину — AI запишет аудио

Системы для обработки больших данных

Один компьютер с 512 ГБ RAM и быстрым SSD мощен, но данные современных компаний — петабайты. Распределённые вычислительные системы позволяют обрабатывать эти данные на кластерах из тысяч машин. Понимание этих систем критично для ML-инженера, работающего с реальными данными.

MapReduce: парадигма параллельных вычислений

Идея (Dean & Ghemawat, Google, 2004): разбить вычисление на два этапа, каждый из которых параллелизуется по данным.

Map(k, v) → список (k', v'): применяется независимо к каждой записи. Пример: подсчёт слов — Map("hello world", 1) → [("hello",1), ("world",1)].

Shuffle: система автоматически группирует все пары (k',v') по ключу k'. Самый дорогой шаг — data movement по сети.

Reduce(k', список v') → список (k'',v''): агрегирует все значения для одного ключа. Пример: Reduce("hello", [1,1,1]) → ("hello", 3).

PageRank через MapReduce: итеративное умножение матрицы на вектор. Каждая итерация — один MapReduce job. Дискроблема: I/O между итерациями (запись и чтение с HDFS каждый раз).

Hadoop HDFS: распределённая файловая система. Блоки 128 MB, каждый реплицируется 3×. Master (NameNode) хранит метаданные. DataNode — хранение данных. Fault tolerance: при отказе DataNode NameNode заказывает новые реплики.

Apache Spark: in-memory вычисления

Ключевое отличие от MapReduce: данные в памяти между операциями (RDD.cache()). При итеративных алгоритмах (ML, PageRank) ускорение 10–100×.

RDD (Resilient Distributed Dataset): иммутабельная, распределённая коллекция элементов. Партиционирована по узлам кластера. Lineage graph (граф зависимостей) обеспечивает fault tolerance: при сбое пересчитываем только потерянные партиции.

Трансформации (ленивые): map, filter, flatMap, join, groupByKey, reduceByKey. Не выполняются немедленно — строят граф вычислений (DAG).

Actions (жадные): collect, count, reduce, saveAsTextFile — запускают исполнение DAG. Catalyst optimizer анализирует DAG, применяет оптимизации (pushdown filters, column pruning, join reordering).

DataFrame API: колонковое хранение, SQL-совместимость, оптимизация через Catalyst. Пример: spark.read.parquet("data").filter("age > 25").groupBy("city").agg(avg("salary")). Внутри: Catalyst находит оптимальный план выполнения.

Spark MLlib: масштабируемые ML-алгоритмы. LinearRegression, LogisticRegression, RandomForest, GBT, KMeans, ALS (Alternating Least Squares для коллаборативной фильтрации). ALS для рекомендаций: Netflix-стиль на миллиардах рейтингов.

GPU-вычисления: параллелизм на уровне ядра

GPU vs CPU: CPU: 8–64 ядра, большой кэш, оптимизирован для последовательных вычислений. GPU: 5000–10000+ ядер, малый кэш, оптимизирован для массово-параллельных вычислений.

CUDA-модель: N блоков × M потоков = N×M параллельных «нитей». Каждый поток работает с маленькой частью данных. Warps (32 нити) — основная единица исполнения.

Tensor Cores (NVIDIA Volta+): матричные умножения FP16/BF16 → 8× выше throughput vs FP32 cores. A100: 312 TFLOPS (TF32), 1248 TOPS (INT8). H100: 2000 TFLOPS (BF16).

torch.compile (PyTorch 2.0): компилирует Python/PyTorch graph в оптимизированные CUDA kernels через TorchInductor + Triton. 2× ускорение «бесплатно» на типичных трансформерах.

Эффективный инференс

Проблема: обученная LLaMA-70B модель требует 140 ГБ GPU RAM в FP16. Большинство задач не требуют полной точности.

Quantization (квантизация): FP16/BF16 вместо FP32 (2×), INT8 (4×), INT4 (8×). GPTQ: post-training quantization весов до INT4 с минимальной потерей качества. AWQ: количественная активация-aware.

KV-cache: при авторегрессивной генерации ключи и значения предыдущих токенов кэшируются. Без кэша: O(n²) операций для генерации n токенов. С кэшем: O(n). Ботлнек для длинного контекста: кэш 32K токенов × 70B = 40 ГБ.

vLLM: PagedAttention — управление KV-кэшем как виртуальной памятью ОС. Непрерывный batching: обрабатываем разные запросы разной длины эффективно. 24× выше пропускная способность vs naive serving.

Численный пример

Apache Spark WordCount на кластере 10 узлов, файл 1 ТБ текста:

  • MapReduce (Hadoop): ~45 минут (disk I/O доминирует)
  • Spark (RDD, in-memory): ~8 минут (6× ускорение)
  • Spark (DataFrame + Parquet): ~3 минуты (колонковое хранение + Catalyst)

Вычисление PageRank для графа 1B узлов:

  • MapReduce: 60 итераций × 5 мин = 5 часов
  • Spark с cacheRDD: 60 итераций × 30 сек = 30 минут

Задание: (1) Реализуйте Word2Vec на PySpark MLlib на текстовом корпусе (Wikidump 1GB). Измерьте время и качество (аналогии типа king−man+woman). (2) Реализуйте ALS для коллаборативной фильтрации MovieLens 20M. Гиперпараметры: rank=50, maxIter=10, regParam=0.1. Вычислите RMSE. (3) Профилируйте исполнение: Spark UI → какой этап занимает больше всего времени? Как улучшить?

§ Акт · что дальше