Перейти к основному содержимому

Apache Spark на Scala — обзор

Разработчику Архитектору

Дальше: Play Framework · Akka — основы · Scala — о разделе


Apache Spark на Scala — обзор

Apache Spark — распределённый движок для batch- и stream-обработки больших данных. Scala — язык, на котором Spark изначально писался: API DataFrame/Dataset наиболее идиоматичны именно в Scala, хотя доступны Python (PySpark) и Java.

Практикум идёт по шагам — подготовка данных, SparkSession, чтение CSV, трансформации, SQL, запись Parquet, кратко RDD и Structured Streaming.

ШагТемаЗачем
1Когда нужен SparkНе перегружать малые задачи
2SparkSessionТочка входа программы
3DataFrame + CSVЗагрузить таблицу
4Transform / ActionLazy evaluation
5SQLЗнакомый синтаксис для аналитиков
6Запись ParquetПромежуточный слой data lake
7spark-submitЗапуск на кластере
МатериалЗачем
Основы Scalacase class, implicits
Коллекции и типыmap, filter, fold
SQL — первый проектGROUP BY, JOIN
Akka — основыСтримы событий рядом со Spark
Play FrameworkСервисный слой поверх аналитики

Навигация по экосистеме Scala


Термины перед стартом

ТерминКратко
RDDResilient Distributed Dataset — низкоуровневый распределённый набор с lineage для recovery
DataFrameТабличный API с именованными столбцами; оптимизатор Catalyst
DatasetТипизированный DataFrame (Scala/Java)
DriverJVM-процесс вашей main: планирует DAG
ExecutorWorker-JVM: выполняет tasks, хранит кэш
ShuffleПерераспределение данных между узлами; дорогая операция
PartitionКусок данных на одном executor

Шаг 1 — когда нужен Spark

СценарийSparkАльтернатива на одной машине
Терабайты логов, join агрегатовДа
CSV 50 MB, отчёт в ExcelИзбыточенpandas, R, SQL
Stream Kafka → агрегатыStructured StreamingFlink, ksqlDB
ML на кластереMLlibscikit-learn локально

Spark масштабируется горизонтально — добавление executor-узлов ускоряет job.

Spark и веб-фреймворки

Play обслуживает HTTP-запросы; Akka — события и actors. Spark — офлайн/batch и stream-аналитика. В одной компании они часто сосуществуют в разных пайплайнах.


Шаг 2 — архитектура

  • Driver планирует DAG операций.
  • Executor выполняет tasks и хранит кэшированные партиции.
  • Shuffle — пересылка данных между узлами при groupBy / join.

Шаг 3 — проект sbt и SparkSession

build.sbt

name := "sales-report"
version := "0.1"
scalaVersion := "2.13.14"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.5.0" % "provided",
"org.apache.spark" %% "spark-sql" % "3.5.0" % "provided"
)

На кластере JAR Spark уже есть на workers — scope provided. Локально для учёбы замените на % "compile" или запускайте через spark-submit --packages.

Точка входа

// src/main/scala/SalesReport.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SalesReport extends App {

val spark = SparkSession.builder()
.appName("SalesReport")
.master("local[*]")
.getOrCreate()

import spark.implicits._

// ... трансформации ...

spark.stop()
}
  • local[*] — все ядра CPU на одной машине (учёба).
  • getOrCreate() — переиспользует сессию в REPL/notebook.
  • spark.stop() — освобождает ресурсы.

Шаг 4 — sample-данные и чтение CSV

Создайте sales.csv:

id,region,amount,date
1,EMEA,120.50,2024-01-10
2,APAC,80.00,2024-01-11
3,EMEA,-5.00,2024-01-12
4,AMER,200.00,2024-01-13
5,APAC,150.25,2024-01-14
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("sales.csv")

df.printSchema()
df.show()

DataFrame — распределённая таблица с именованными столбцами; физически данные разбиты на partitions по executors.


Шаг 5 — трансформации и действия

ТипПримерыКогда выполняется
Transformation (lazy)filter, select, groupBy, joinПри следующем action
Actioncount, show, collect, writeСразу, запускает job
val byRegion = df
.filter($"amount" > 0)
.groupBy($"region")
.agg(
sum($"amount").as("total"),
count("*").as("rows")
)

byRegion.orderBy($"total".desc).show()

Lazy evaluation позволяет Catalyst оптимизировать план до физического выполнения.


Шаг 6 — SQL поверх DataFrame

df.createOrReplaceTempView("sales")

spark.sql("""
SELECT region, SUM(amount) AS total, COUNT(*) AS rows
FROM sales
WHERE amount > 0
GROUP BY region
ORDER BY total DESC
""").show()

Знакомый SQL-синтаксис для аналитиков; под капотом — тот же оптимизатор. Основы SQL — первый проект.


Шаг 7 — запись результата

byRegion.write
.mode("overwrite")
.option("header", "true")
.parquet("output/by_region")

Форматы:

  • Parquet — колоночный, предпочтителен для data lake;
  • ORC, JSON, Delta Lake.

Качество данных и схемы — конфигурации и данные.


RDD — исторический слой

RDD — низкоуровневый API до DataFrame:

val rdd = spark.sparkContext.textFile("logs.txt")
val errors = rdd.filter(line => line.contains("ERROR"))
println(errors.count())

Сегодня DataFrame/Dataset предпочтительны: оптимизатор и меньше boilerplate. RDD нужен для нестандартных типов и legacy-кода.


Structured Streaming (кратко)

val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()

// агрегации → writeStream в sink
val query = stream
.writeStream
.format("console")
.outputMode("append")
.start()

query.awaitTermination()

Модель micro-batch — единый API для stream и batch. Для online-веба смотрите Phoenix Channels; для actor-pipeline — Akka Streams.


Полный учебный job

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SalesReport extends App {
val spark = SparkSession.builder()
.appName("SalesReport")
.master("local[*]")
.getOrCreate()

import spark.implicits._

val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("sales.csv")

val cleaned = df.filter($"amount" > 0)

val byRegion = cleaned
.groupBy($"region")
.agg(
sum($"amount").as("total"),
count("*").as("rows"),
avg($"amount").as("avg_check")
)

byRegion.orderBy($"total".desc).show()

byRegion.write.mode("overwrite").parquet("output/by_region")

println(s"Regions: ${byRegion.count()}")
spark.stop()
}

Запуск локально:

sbt run
# или после assembly:
spark-submit --class SalesReport --master local[*] target/scala-2.13/sales-report.jar

Сравнение Scala Spark и PySpark

КритерийScalaPython
Типизация DatasetСильнаяDataFrame only
Производительность UDFJVM nativePython overhead
Data science командаРежеpandas/sklearn привычнее
Spark internalsReference APIШирокое adoption

Для data engineering pipeline на кластере Scala остаётся сильным выбором; для ML research чаще PySpark.


Частые ошибки

ОшибкаСимптомРешение
collect() на большом DFOOM на driverПисать в файл или take(n)
Много мелких файловМедленный readcoalesce / repartition перед записью
Shuffle на каждом joinДолгие stagesBroadcast маленькой таблицы
inferSchema на продеНеверные типыЯвная схема в read.schema
Забыли spark.stop()Зависшие процессыtry/finally или Using

Production и эксплуатация

ПрактикаЗачем
Явная схемаСтабильность типов
Партиционирование по датеМеньше shuffle при фильтрах
spark.sql.adaptive.enabledAQE на Spark 3.x
Мониторинг Spark UIStages, skew, GC
Отдельный кластерИзоляция от Play/Akka сервисов
Production

Не смешивайте driver и executor heap с web-приложением. Секреты S3/HDFS — через vault/env. Тестируйте job на sample 1% данных перед полным прогоном.


Чек-лист первого проекта

  1. Sample CSV 100k+ строк локально (local[*]).
  2. groupBy + агрегаты + запись Parquet.
  3. Unit-тест трансформаций (Spark local mode + SharedSparkSession).
  4. Деплой через spark-submit на тестовый кластер.
  5. Документируйте входные/выходные контракты как для REST API.

Упражнения

  1. Добавьте столбец month через date_format и агрегируйте по региону и месяцу.
  2. Запишите результат в CSV и сравните размер с Parquet.
  3. Перепишите фильтр amount > 0 на Spark SQL.
  4. Посчитайте строки с ERROR через RDD и через DataFrame — сравните код.
  5. Нарисуйте, где в вашей компании могли бы стоять Play, Akka и Spark.

FAQ

Spark заменяет базу данных? Нет. Spark читает из S3/HDFS/JDBC/Kafka, вычисляет, пишет результат. OLTP остаётся в PostgreSQL и т.п.

Нужен ли Hadoop? Spark 3 часто работает с S3 + K8s без HDFS; YARN всё ещё встречается в enterprise.

Dataset vs DataFrame? В Scala 2.13 Dataset[T] типизирован; DataFrame = Dataset[Row].

Как связать со Spark Connect? Spark Connect — удалённый клиент без fat driver; актуально для BI и thin clients.


Связь с экосистемой энциклопедии

ТемаСтатья
CSV, качество данныхконфигурации и данные
SQL после агрегацииSQL — первый проект
Пакетная обработкаанализ данных — пакеты
Scala ETL до Sparkпростые приложения
HTTP-сервисыPlay Framework
АкторыAkka — основы

Broadcast join — оптимизация

Когда одна таблица мала (справочник регионов), используйте broadcast — копия на каждый executor без shuffle:

import org.apache.spark.sql.functions.broadcast

val regions = spark.read.option("header", true).csv("regions.csv")
val sales = spark.read.option("header", true).csv("sales.csv")

val enriched = sales.join(broadcast(regions), Seq("region_id"), "left")
enriched.show()

Без broadcast Spark может shuffle обе стороны — на больших sales это дорого.


Явная схема вместо inferSchema

import org.apache.spark.sql.types._

val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("region", StringType, nullable = true),
StructField("amount", DoubleType, nullable = true),
StructField("date", DateType, nullable = true)
))

val df = spark.read
.schema(schema)
.option("header", true)
.csv("sales.csv")

Production всегда задаёт схему — inferSchema читает sample и может ошибиться на новых данных.


Unit-тест трансформаций

// test/scala/SalesReportSpec.scala
import org.scalatest.funsuite.AnyFunSuite
import org.apache.spark.sql.SparkSession

class SalesReportSpec extends AnyFunSuite {

lazy val spark: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("test")
.getOrCreate()

test("filter positive amounts") {
import spark.implicits._
val df = Seq(
(1, "EMEA", 100.0),
(2, "APAC", -5.0)
).toDF("id", "region", "amount")

val positive = df.filter($"amount" > 0)
assert(positive.count() == 1)
}

override def afterAll(): Unit = spark.stop()
}

local[2] — два потока, достаточно для CI без кластера.


Партиции и файлы

ОперацияКогда
repartition(n)Увеличить параллелизм перед тяжёлым join
coalesce(n)Уменьшить число output-файлов без full shuffle
partitionBy("date")Hive-style каталоги по дате
byRegion.write
.partitionBy("region")
.mode("overwrite")
.parquet("output/by_region_partitioned")

Много мелких Parquet-файлов (тысячи) замедляет read — целевой размер файла 128–256 MB.


Spark UI и отладка

После запуска job откройте http://localhost:4040 (пока SparkContext жив):

  • Jobs — stages, duration;
  • Storage — cached RDD/DataFrame;
  • Executors — skew, GC time.

При straggler task — data skew; решение — salting ключа или rebalance.


Delta Lake (кратко)

Delta добавляет ACID поверх Parquet:

import io.delta.tables._

spark.range(5).write.format("delta").save("/tmp/delta-table")
val delta = DeltaTable.forPath(spark, "/tmp/delta-table")
delta.delete("id < 2")

Подходит для incremental ETL и time travel (VERSION AS OF).


Интеграция с оркестраторами

Airflow DAG
├── extract (Spark job 1)
├── transform (Spark job 2)
└── load to warehouse

Spark job — отдельный процесс spark-submit, не внутри Play controller. Результаты — в warehouse; API читает агрегаты через JDBC.


Расширенный FAQ

SparkSession thread-safe? Да, одна session на JVM driver; не создавайте session per request.

PySpark UDF медленный — почему? Сериализация Python ↔ JVM; предпочитайте built-in functions Spark SQL.

Как передать параметры job? spark-submit --conf или аргументы main(args).

Structured Streaming и batch — один API? Да; streaming — непрерывный micro-batch; для веб-realtime — Phoenix.


Связанные материалы

ТемаМатериал
Раздел ScalaScala — о разделе
Big data теорияанализ данных — о разделе
Realtime вебPhoenix
Следующий шаг

Изучите Delta Lake и оркестрацию (Airflow). Для сервисного слоя вернитесь к Play или Akka.



Практикум — шаг 8: window functions

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"region").orderBy($"date")
df.withColumn("running_total", sum($"amount").over(w))

Практикум — шаг 9: cache и persist

val cached = df.filter($"amount" > 0).cache()
cached.count() // materialize
cached.groupBy($"region").count().show()
cached.unpersist()
УровеньКогда
MEMORY_ONLYПовторные action на том же DF
DISK_ONLYНе влезает в RAM
MEMORY_AND_DISKГибрид

Практикум — шаг 10: join strategies

sales.join(broadcast(regions), "region_id")
JoinРиск
broadcastМала правая таблица
sort-mergeshuffle обе стороны
skewsalting ключа

Практикум — шаг 11: AQE

spark.sql.adaptive.enabled = true
spark.sql.adaptive.coalescePartitions.enabled = true

Воркшоп ETL (60 мин)

МинЗадача
0–10sales.csv + schema
10–25filter + groupBy
25–35SQL temp view
35–45parquet write
45–55unit test
55–60spark-submit

Troubleshooting Spark

СимптомРешение
OOM driverno collect, write file
Много файловcoalesce
Skewsalting, AQE
inferSchemaявная schema

FAQ Spark

Spark заменяет БД? — нет, вычисляет и пишет результат.

Hadoop обязателен? — S3 + K8s часто достаточно.

Dataset vs DataFrame? — Dataset типизирован в Scala.

Spark Connect? — thin client без fat driver.

Streaming? — Structured Streaming micro-batch.



Дополнительные сценарии (Spark)

Сценарий A: local job от CSV до Parquet

# sales.csv в корне
sbt run
ls output/by_region/

Сценарий B: Spark UI

Откройте http://localhost:4040 во время show() — найдите stage с shuffle.

Сценарий C: явная schema

Замените inferSchema на StructType — сравните printSchema до и после.

Сценарий D: broadcast join

Малый regions.csv + большой sales.csv — включите broadcast(regions).

Сценарий E: unit-тест filter

SalesReportSpecfilter(amount > 0) оставляет 4 строки из 5 в sample.


Упражнения — контрольная точка

  1. Агрегат по month + region.
  2. Запись CSV и Parquet — сравните размер файлов.
  3. SQL-версия того же отчёта.
  4. RDD count ERROR и DataFrame filter.
  5. Схема развёртывания Play + Akka + Spark в компании.
Содержание