Перейти к основному содержимому

Akka — основы

Разработчику Архитектору

Дальше: Apache Spark на Scala · Play Framework · Phoenix (BEAM)


Akka — основы

Akka реализует модель акторов на JVM: изолированные сущности обмениваются неизменяемыми сообщениями, состояние между акторами через общую память не передаётся. Это фундамент отказоустойчивых систем в Scala — от микросервисов до стриминга данных.

Практикум идёт по шагам — зависимости sbt, первый typed-актор, ActorSystem, supervision, счётчик с функциональным состоянием, worker pool и краткий обзор Akka Streams.

Примечание. Коммерческая ветка Akka с 2022 года изменила лицензию; open-source форк Apache Pekko сохраняет совместимый API. В учебных примерах ниже — классические концепции Akka/Pekko.

ШагТемаЗачем
1Зависимости sbtПодключить akka-actor-typed
2Greeter-акторОтправить и получить сообщение
3ActorSystemКорень иерархии и shutdown
4SupervisionПерезапуск при ошибке
5CounterСостояние без var в heap
6Worker poolРаспределение задач
7StreamsPipeline с backpressure
МатериалЗачем
Типы и pattern matchingsealed trait, case class
Архитектура JVM-приложенийПотоки, heap, GC
Play FrameworkHTTP-слой поверх JVM
Elixir — о разделеOTP и BEAM — родственная идея
PhoenixВеб на процессах BEAM

Навигация по экосистеме Scala

Akka и Elixir OTP

И Akka, и BEAM решают concurrency через сообщения. На JVM процессы тяжелее, чем на BEAM, зато Scala даёт статическую типизацию протокола сообщений. Сравните с Phoenix после этого практикума.


Термины перед стартом

ТерминКратко
Актор (actor)Изолированная сущность с очередью сообщений (mailbox) и поведением
ActorSystemКорень дерева акторов, пул потоков, конфигурация, lifecycle
BehaviorФункция (Context, Message) => Behavior — как актор реагирует
SupervisionРодитель решает, перезапустить ли упавшего потомка
MailboxОчередь входящих сообщений; обработка по одному за раз
Ask (?)Запрос с ожиданием ответа (Future); внутри актора использовать осторожно

Шаг 1 — проект и зависимости

Создайте каталог akka-hello и файл build.sbt:

name := "akka-hello"
version := "0.1"
scalaVersion := "2.13.14"

val akkaVersion = "2.8.5"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"org.scalatest" %% "scalatest" % "3.2.19" % Test
)

Для Apache Pekko замените groupId на org.apache.pekko и artifact pekko-*.

sbt run

sbt — тот же сборщик, что и в Play: run ищет объект с App или метод main.


Шаг 2 — первый актор (Akka Typed)

Typed API фиксирует протокол сообщений на уровне типов — опечатка в ! ловится компилятором.

// src/main/scala/Greeter.scala
import akka.actor.typed._
import akka.actor.typed.scaladsl._

object Greeter {
sealed trait Command
case class Greet(name: String, replyTo: ActorRef[Greeted]) extends Command
case class Greeted(from: String, message: String)

def apply(): Behavior[Command] =
Behaviors.receive { (context, message) =>
message match {
case Greet(name, replyTo) =>
context.log.info(s"Hello, $name")
replyTo ! Greeted("greeter", s"Hello, $name")
Behaviors.same
}
}
}

Разбор:

  • sealed trait Command — закрытый набор сообщений для этого актора.
  • replyTo ! Greeted(...) — оператор ! отправляет сообщение в mailbox другого актора.
  • Behaviors.same — поведение без смены состояния.

Шаг 3 — ActorSystem и запуск

// src/main/scala/Main.scala
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object Main extends App {
implicit val system: ActorSystem[Greeter.Command] =
ActorSystem(Greeter(), "hello-system")
implicit val ec = system.executionContext
implicit val timeout: Timeout = 3.seconds

import Greeter._

val greeter: ActorRef[Command] = system.ref

val result = greeter.ask(Greet("Scala", _))
println(Await.result(result, timeout.duration))

system.terminate()
}
  • ActorSystem(Greeter(), "hello-system") — поднимает корневой актор и потоки.
  • ask — pattern request-response; возвращает Future[Greeted].
  • system.terminate() — корректное завершение (важно в тестах и prod).
Блокировка внутри актора

Не вызывайте Await.result и Thread.sleep внутри Behaviors.receive — mailbox других сообщений будет голодать. ask снаружи актора допустим для учебного main.


Шаг 4 — иерархия и supervision

import akka.actor.typed.SupervisorStrategy

val supervisedGreeter = Behaviors.supervise(Greeter())
.onFailure[Exception](SupervisorStrategy.restart)

Стратегии:

СтратегияПоведение
restartНовый актор, mailbox может быть сброшен
resumeПродолжить после ошибки
stopОстановить актор
escalateПередать ошибку родителю

Это реализация let it crash — ошибки локализуются, система восстанавливается на уровне дерева акторов. На BEAM ту же философию несёт OTP — см. Elixir intro.


Шаг 5 — счётчик с функциональным состоянием

Состояние хранится в параметре Behavior, без var в shared memory:

object Counter {
sealed trait Command
case object Increment extends Command
case class Get(replyTo: ActorRef[Int]) extends Command

def apply(): Behavior[Command] = counter(0)

def counter(n: Int): Behavior[Command] =
Behaviors.receiveMessage {
case Increment =>
counter(n + 1)
case Get(replyTo) =>
replyTo ! n
Behaviors.same
}
}

Проверка из Main:

val counter = system.systemActorOf(Counter(), "counter")
counter ! Counter.Increment
counter ! Counter.Increment
val value = counter.ask(Counter.Get(_))
println(Await.result(value, timeout.duration)) // 2

Каждый Increment возвращает новое поведение counter(n + 1) — immutability на уровне модели.


Шаг 6 — worker pool (полный пример)

object WorkerPool {
sealed trait Command
case class Task(id: Int, payload: String)
case class Result(id: Int, output: String)
case class Submit(task: Task, replyTo: ActorRef[Result]) extends Command

def router(workers: Vector[ActorRef[Task]]): Behavior[Command] =
Behaviors.setup { context =>
var idx = 0
Behaviors.receiveMessage {
case Submit(task, replyTo) =>
val worker = workers(idx)
idx = (idx + 1) % workers.size
worker ! task
// упрощение: worker шлёт Result напрямую replyTo
Behaviors.same
}
}

object Worker {
def apply(): Behavior[Task] =
Behaviors.receiveMessage { task =>
// имитация работы
Behaviors.same
}
}
}

Схема:

Router
/ | \
W1 W2 W3 ← workers обрабатывают Task
  • Router принимает Submit и распределяет round-robin.
  • Worker выполняет задачу, шлёт Result клиенту.
  • При падении worker — supervisor перезапускает только его ветку.

Шаг 7 — Akka Streams

Для pipeline данных между акторами и внешними системами:

import akka.stream.scaladsl._
import akka.NotUsed

object StreamDemo extends App {
implicit val system = ActorSystem(Behaviors.empty, "streams")
implicit val ec = system.executionContext

val source: Source[Int, NotUsed] = Source(1 to 100)
val flow = Flow[Int].map(_ * 2).filter(_ % 3 == 0)
val sink = Sink.foreach(println)

source.via(flow).runWith(sink)
system.terminate()
}
  • Source — производитель данных.
  • Flow — трансформация с backpressure.
  • Sink — потребитель.

Streams интегрируются с Kafka, HTTP и Spark через коннекторы (Alpakka).


Конфигурация application.conf

Положите файл в src/main/resources/application.conf:

akka {
loglevel = "INFO"
actor {
default-dispatcher {
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
}
}

Пулы потоков и mailbox настраиваются без изменения кода акторов.


Сравнение Akka и Elixir OTP

АспектAkka (JVM)Elixir (BEAM)
ЕдиницаАктор на JVM heapЛёгкий процесс
ИзоляцияЛогическая, heap общийHeap per process
Hot code reloadОграниченоСильная сторона OTP
ТипизацияScala typesDynamic + @spec
Веб-слойPlay, Akka HTTPPhoenix

Оба стека решают concurrency через сообщения.


Частые ошибки

ОшибкаПоследствиеЧто сделать
Await внутри актораStarvation mailboxВынести в Future, ответ — сообщением
Shared mutable stateRace conditionsТолько сообщения между акторами
Слишком мелкие акторыOverhead сообщенийГруппировать связанную логику
Игнор supervisionПадение всего процессаBehaviors.supervise на границах
ask без timeoutЗависший FutureВсегда задавать Timeout

Правило: внутри актора — только быстрая логика; тяжёлую работу выносите в Future с возвратом результата сообщением.


Production-сценарии

СценарийКомпонент Akka
REST поверх акторовAkka HTTP / Pekko HTTP
Шардирование stateCluster Sharding
Очереди событийAkka Streams + Kafka
ПersistenceEvent Sourcing, Akka Persistence

HTTP для пользователей часто остаётся в Play; внутренние сервисы — на акторах. Batch-аналитика — Spark.

Production

Мониторинг: Kamon, Micrometer. Cluster — минимум три узла для quorum. Логируйте correlation id в сообщениях. Тестируйте supervision через TestKit / ActorTestKit.


Упражнения

  1. Добавьте актор Logger, которому Greeter шлёт копию каждого приветствия.
  2. Реализуйте Decrement и Set(n) в Counter.
  3. Напишите тест: после трёх Increment Get возвращает 3.
  4. Постройте Stream из файла построчно (FileIO.fromPath) и посчитайте строки с ERROR.
  5. Прочитайте Phoenix Endpoint и выпишите три параллели с ActorSystem.

FAQ

Актор — это поток ОС? Нет. Актор — сущность приложения с mailbox; на пул потоков dispatcher мапит множество акторов.

Typed или Classic API? Для новых проектов — Typed (akka-actor-typed). Classic (akka-actor) встречается в legacy.

Pekko или Akka? Pekko — Apache fork с открытой лицензией; API почти совпадает. Проверьте политику компании.

Когда нужен Spark, а когда Akka? Spark — распределённые SQL/ETL на кластере. Akka — long-running сервисы, стримы событий, stateful actors.


Полный worker pool — код

// Worker.scala
object Worker {
sealed trait Command
case class Work(taskId: Int, data: String, replyTo: ActorRef[WorkResult]) extends Command
case class WorkResult(taskId: Int, workerName: String, output: String)

def apply(name: String): Behavior[Command] =
Behaviors.receive { (context, msg) =>
msg match {
case Work(taskId, data, replyTo) =>
context.log.info(s"$name processing task $taskId")
replyTo ! WorkResult(taskId, name, data.toUpperCase)
Behaviors.same
}
}
}

// Router.scala
object Router {
sealed trait Command
case class Submit(taskId: Int, data: String, replyTo: ActorRef[Worker.WorkResult]) extends Command

def apply(workerCount: Int): Behavior[Command] =
Behaviors.setup { context =>
val workers = (1 to workerCount).map { i =>
context.spawn(Worker(s"worker-$i"), s"worker-$i")
}.toVector

var idx = 0
Behaviors.receiveMessage {
case Submit(taskId, data, replyTo) =>
val worker = workers(idx)
idx = (idx + 1) % workers.size
worker ! Worker.Work(taskId, data, replyTo)
Behaviors.same
}
}
}

Запуск из Main:

val router = system.systemActorOf(Router(3), "router")
val f1 = router.ask(Router.Submit(1, "hello", _))
val f2 = router.ask(Router.Submit(2, "akka", _))
println(Await.result(f1, timeout.duration))
println(Await.result(f2, timeout.duration))

Каждый worker — отдельный актор с mailbox; router не хранит shared var state кроме индекса round-robin внутри одного актора (безопасно).


Тестирование с ActorTestKit

// test/scala/GreeterSpec.scala
import org.scalatest.wordspec.AnyWordSpec
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import Greeter._

class GreeterSpec extends AnyWordSpec {
val testKit = ActorTestKit()

"Greeter" should {
"reply with Greeted" in {
val probe = testKit.createTestProbe[Greeted]()
val greeter = testKit.spawn(Greeter())
greeter ! Greet("Test", probe.ref)
probe.expectMessage(Greeted("greeter", "Hello, Test"))
}
}

testKit.shutdownTestKit()
}
  • TestProbe — тестовый актор с очередью ожидания сообщений.
  • expectMessage — таймаут и падение теста при несовпадении.

Ask pattern и back-pressure

Когда клиенту нужен ответ, используют ask:

import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

implicit val timeout: Timeout = 3.seconds
val future: Future[Int] = counter.ask(Counter.Get(_))

Внутри production-актора избегайте ask на себя — deadlock. Предпочитайте fire-and-forget ! и отдельный reply channel.

Для потоков высокого объёма — Akka Streams с explicit backpressure вместо unbounded mailbox.


Cluster и remoting (обзор)

akka {
actor.provider = cluster
remote.artery {
canonical.hostname = "127.0.0.1"
canonical.port = 2551
}
cluster.seed-nodes = ["akka://HelloSystem@127.0.0.1:2551"]
}
ClusterSharding(system).init(
Entity(typeKey = TypeKey) { entityContext =>
Counter(entityContext.entityId)
}
)

Cluster Sharding распределяет акторы по узлам по ключу entity — паттерн для миллионов session/stateful entities. HTTP-фасад — Akka HTTP или Play.


Связь с Spark и Play

СлойТехнологияПример
HTTP APIPlayREST для пользователей
Event processingAkka StreamsKafka → transform → sink
Batch analyticsSpark213.md nightly reports
Realtime UIPhoenix на BEAM104

Akka связывает online-сервисы; Spark — offline. Не запускайте SparkSession внутри актора.


Расширенный FAQ

Сколько акторов на одном JVM? Миллионы лёгких акторов возможны; ограничение — память mailbox и GC. Профилируйте.

Чем Typed лучше Classic? Компилятор проверяет протокол; меньше ActorRef[Any] и runtime ClassCast.

Pekko migration? Замена пакетов akka.*org.apache.pekko.* и зависимостей; скрипты есть в документации Pekko.

Akka Persistence? Event sourcing + snapshots для durable state; сложнее Counter, но нужен для banking-grade.


Связанные материалы

ТемаМатериал
HTTP на ScalaPlay Framework — первая программа
Big dataApache Spark на Scala
BEAM и PhoenixPhoenix, Elixir intro
Архитектура JVMАрхитектура JVM-приложений
Раздел ScalaScala — о разделе
Следующий шаг

Для batch-обработки больших таблиц переходите к Apache Spark. Для сравнения с BEAM — Phoenix.



Практикум — шаг 8: Greeter + TestProbe

class GreeterSpec extends AnyWordSpec {
val kit = ActorTestKit()
"Greeter" should {
"reply" in {
val probe = kit.createTestProbe[Greeter.Greeted]()
kit.spawn(Greeter()) ! Greeter.Greet("T", probe.ref)
probe.expectMessage(Greeter.Greeted("greeter", "Hello, T"))
}
}
kit.shutdownTestKit()
}

Практикум — шаг 9: TimerScheduler

Behaviors.withTimers { timers =>
timers.startSingleTimer(Tick, 5.seconds)
Behaviors.receiveMessage { case Tick => Behaviors.same }
}

Практикум — шаг 10: FileIO Stream

FileIO.fromPath(path)
.via(Framing.delimiter(ByteString("\n"), 4096))
.map(_.utf8String)
.filter(_.contains("ERROR"))
.runWith(Sink.fold(0)((n, _) => n + 1))

Воркшоп worker pool (45 мин)

МинЗадача
0–10Worker + Router
10–20ask Submit
20–30Supervisor
30–40Тест
40–45Stream ERROR count

Troubleshooting (расширенный)

СимптомРешение
Dead letterlifecycle / stop
ask timeoutTimeout, replyTo
OOM mailboxStreams backpressure
SerializationCBOR / protobuf

FAQ дополнительный

spawn vs systemActorOf? — дочерний vs system level.

Cluster на одной JVM? — не нужен.

Akka HTTP vs Play? — лёгкий REST vs полный стек.

Pipe pattern? — Future → сообщение себе.

Pekko? — замена groupId на org.apache.pekko.



Дополнительные практические сценарии (Akka)

Сценарий A: минимальный typed проект

mkdir akka-lab && cd akka-lab
# создайте build.sbt из шага 1 статьи
sbt run

Сценарий B: счётчик с ask

val c = system.systemActorOf(Counter(), "c")
c ! Counter.Increment
c ! Counter.Increment
val v = c.ask(Counter.Get(_))

Ожидание: 2 после двух Increment.

Сценарий C: supervision при падении

Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart)

Отправьте Task, вызывающий exception — worker перезапустится.

Сценарий D: stream подсчёта ERROR

Создайте app.log с 3 строками ERROR, запустите FileIO pipeline — вывод 3.

Сценарий E: сравнение с BEAM

ИдеяAkkaElixir
Изоляцияmailboxprocess heap
RestartsupervisorSupervisor OTP
ВебPlay / Akka HTTPPhoenix

FAQ — эксплуатация Akka

Как graceful shutdown? CoordinatedShutdown и system.terminate().

Как тестировать без sleep? TestProbe, expectMessage, не Thread.sleep.

Когда Cluster Sharding? Миллионы session/state entity по ключу.


Упражнения — контрольная точка

  1. Logger-актор получает копию каждого Greet.
  2. Stream: слова длиннее 5 символов из файла.
  3. Timer: Tick каждые 2 секунды, 5 раз, затем stop.
  4. Тест supervision restart.
  5. Таблица параллелей с OTP из Elixir intro.
Содержание