Akka HTTP WebSocket with Stream and Actor: Best Practices

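Summary: this article shows how to handle Akka HTTP WebSocket messages with Actors and Streams. The implementation has the following properties:

  1. A single Actor can handle multiple WebSocket connections
  2. When a client closes its WebSocket, the server is notified
  3. It is easier to cluster, and load can be balanced with an Actor Router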

Introduction

We mainly follow the two articles below. I personally find the second one better; the approach in the first can run out of memory.

  1. Akka Http, handle Websockets with Actors
  2. Answer on StackOverflow

Theory

The basic idea is shown in the diagram below:

  1. The Source handles messages sent from the Actor to the Client
  2. The Sink handles messages sent from the Client to the Actor
          --> Sink   -->
Client  |                | Actor
          <-- Source <--

Implementation

The implementation takes four steps.

Sink Source Flow

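import java.util.UUID
import akka.actor.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}

// chatRef is the ActorRef of the controlling chat actor, defined elsewhere.
def chatActorFlow(connectionId: UUID): Flow[String, String, Any] = {

  // Client -> Actor: tag each message with the connection ID and forward it to chatRef.
  // CloseConnection is sent to chatRef when the stream completes.
  val sink = Flow[String]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  // Actor -> Client: messages sent to the materialized ActorRef are emitted to the stream.
  val source = Source.actorRef(16, OverflowStrategy.fail)
    .mapMaterializedValue { actor: ActorRef =>
      // Register the new connection with chatRef.
      chatRef ! Protocol.OpenConnection(actor, connectionId)
    }

  Flow.fromSinkAndSource(sink, source)
}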

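This code is the core of the whole implementation: by defining a Flow, it connects the client and the server and carries the basic message passing between them.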

Source.actorRef

Let's first look at Source.actorRef(16, OverflowStrategy.fail). The official API documentation for Source.actorRef (listed under the companion object Source) says:

Creates a Source that is materialized as an akka.actor.ActorRef.

So this method creates a Source whose distinguishing feature is that it is materialized as an akka.actor.ActorRef.

Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.

A Source created this way emits into the stream any message that is sent to its Actor. This naturally ties the Actor to the Stream, and the implementation confirms it:

Source
  .fromGraph(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher))
  .withAttributes(DefaultAttributes.actorRefSource)
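As a minimal sketch of this behaviour, the snippet below materializes a Source.actorRef on its own and feeds it through the resulting ActorRef. It assumes Akka 2.6, where an implicit ActorSystem also provides the materializer; the object name SourceActorRefDemo and the method run are made up for this illustration.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original article.
object SourceActorRefDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("source-actor-ref-demo")

    // Running the graph materializes the Source as an ActorRef (left value)
    // and the Sink as a Future holding all collected elements (right value).
    val (ref, collected) =
      Source.actorRef[String](16, OverflowStrategy.fail)
        .toMat(Sink.seq[String])(Keep.both)
        .run()

    // Every message sent to the ActorRef is emitted into the stream.
    ref ! "hello"
    ref ! "world"
    // Status.Success completes the stream after buffered elements are emitted.
    ref ! Status.Success("done")

    try Await.result(collected, 3.seconds)
    finally system.terminate()
  }
}
```

Calling SourceActorRefDemo.run() should return the two strings in the order they were sent.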

mapMaterializedValue

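The next important operation is mapMaterializedValue. It does not run the Stream by itself; it registers a function that is applied to the materialized value when the Stream is materialized. For the Source created above, the materialized value is an ActorRef, so this is where we can apply our business logic to that ActorRef.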

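Note: what is materialization? Put simply, it is turning the blueprint into a running system. If we think of designing a Flow as designing a system of pipes, materialization is opening all the valves and the water source, so that water flows from the source through the whole pipe system.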

It is important to remember that even after constructing the RunnableGraph by connecting all the source, sink and different operators, no data will flow through it until it is materialized. Materialization is the process of allocating all resources needed to run the computation described by a Graph (in Akka Streams this will often involve starting up Actors).
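The difference between a blueprint and a materialized stream can be sketched as follows. This is an illustrative example under the assumption of Akka 2.6 (implicit ActorSystem as materializer); MaterializationDemo and the counter are hypothetical names. It counts how often the function passed to mapMaterializedValue is invoked.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo object, not part of the original article.
object MaterializationDemo {
  // Returns the call count before any run, after the first run, after the second run.
  def run(): (Int, Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("materialization-demo")
    val calls = new AtomicInteger(0)

    // Only a blueprint so far: nothing runs, and the function below is not called.
    val blueprint = Source.single("x")
      .mapMaterializedValue { mat => calls.incrementAndGet(); mat }
      .to(Sink.ignore)

    val before = calls.get()
    blueprint.run()               // first materialization
    val afterFirst = calls.get()
    blueprint.run()               // the same blueprint can be materialized again
    val afterSecond = calls.get()

    system.terminate()
    (before, afterFirst, afterSecond)
  }
}
```

The function runs once per materialization, which is why each WebSocket connection in this article triggers its own OpenConnection message.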

ActorRef

Now that we have the ActorRef that bridges to the client's Stream, how do we use it?

actor: ActorRef => {
  chatRef ! Protocol.OpenConnection(actor, connectionId)
}

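Only one thing happens here: a Protocol.OpenConnection message is built from this ActorRef and the connection ID, and it is sent to chatRef. chatRef is the Actor that acts as the controller of all WebSocket connections. The sections below explain what this message is used for.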

Sink from the message

The Source created above solves how to reach the WebSocket client and send it messages through an Actor. How do we receive messages from the client? For that we create a Sink:

val sink = Flow[String]
  .map(msg => Protocol.SignedMessage(connectionId, msg))
  .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

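This Flow carries String elements, so it can only handle String messages. Each message string is repackaged as a Protocol.SignedMessage, which also contains the connectionId, so the receiver can tell which WebSocket connection sent the message.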

Sink.actorRef

Sends the elements of the stream to the given ActorRef.

This method is the counterpart of Source.actorRef: messages received by the Stream are forwarded to the ActorRef.

The diagram below summarizes the roles of the two actorRef methods.

Client <-- Stream <-- Source.actorRef.mapMaterializedValue{ar=>???} <-- Actor:ar
Client --> Stream --> Sink.actorRef(ar) --> Actor:ar

Flow map and to

def map[T](f: (Out) ⇒ T): Repr[T]

Transform this stream by applying the given function to each of the elements as they pass through this processing step.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancels

def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat]

Connect this Flow to a Sink, concatenating the processing steps of both.

    +------------------------------+
    | Resulting Sink[In, Mat]      |
    |                              |
    |  +------+          +------+  |
    |  |      |          |      |  |
In ~~> | flow | ~~Out~~> | sink |  |
    |  |   Mat|          |     M|  |
    |  +------+          +------+  |
    +------------------------------+

The materialized value of the combined Sink will be the materialized value of the current flow (ignoring the given Sink’s value), use toMat if a different strategy is needed.

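In summary, this definition takes a Flow of String and connects it to a Sink. The job of that Sink is to pass the String messages flowing through the Stream to an Actor, namely the controller of all WebSocket connections mentioned earlier.

The second parameter of def actorRef[T](ref: ActorRef, onCompleteMessage: Any) means: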

If the target actor terminates the stream will be canceled. When the stream is completed successfully the given onCompleteMessage will be sent to the destination actor. When the stream is completed with failure a akka.actor.Status.Failure message will be sent to the destination actor.

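So once the current WebSocket is closed, its Stream completes and this message is delivered to the Actor. The controller can then clean up after the closed WebSocket, for example by removing the related data.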
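The completion notification can be observed in isolation with a small sketch. It assumes Akka 2.6 and uses the same two-argument Sink.actorRef as the article; the Collector actor, the StreamClosed message and the object name are hypothetical and exist only for this illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original article.
object SinkActorRefDemo {
  // Stand-in for Protocol.CloseConnection.
  case object StreamClosed

  // Records everything it receives and reports once the stream has completed.
  class Collector(result: Promise[Vector[Any]]) extends Actor {
    private var seen = Vector.empty[Any]
    def receive: Receive = {
      case StreamClosed => result.success(seen :+ StreamClosed)
      case other        => seen = seen :+ other
    }
  }

  def run(): Vector[Any] = {
    implicit val system: ActorSystem = ActorSystem("sink-actor-ref-demo")
    val result = Promise[Vector[Any]]()
    val collector = system.actorOf(Props(new Collector(result)))

    // Elements are forwarded to the actor; StreamClosed follows on successful completion.
    Source(List("a", "b")).runWith(Sink.actorRef[String](collector, StreamClosed))

    try Await.result(result.future, 3.seconds)
    finally system.terminate()
  }
}
```

The actor sees the stream elements first and the completion message last, which is the hook the controller uses for cleanup.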

Flow.fromSinkAndSource(sink, source)

Combine the Sink and the Source to create a Flow.

def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed]

Creates a Flow from a Sink and a Source where the Flow’s input will be sent to the Sink and the Flow’s output will come from the Source.
The resulting flow can be visualized as:

    +----------------------------------------------+
    | Resulting Flow[I, O, NotUsed]                |
    |                                              |
    |  +---------+                  +-----------+  |
    |  |         |                  |           |  |
I  ~~> | Sink[I] | [no-connection!] | Source[O] | ~~> O
    |  |         |                  |           |  |
    |  +---------+                  +-----------+  |
    +----------------------------------------------+

The completion of the Sink and Source sides of a Flow constructed using this method are independent. So if the Sink receives a completion signal, the Source side will remain unaware of that. If you are looking to couple the termination signals of the two sides use Flow.fromSinkAndSourceCoupled instead.

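When reading this documentation, note carefully that I and O in the diagram, that is In and Out, both correspond to the WebSocket client in our scenario, because a WebSocket is bidirectional. As for why this Flow can serve a bidirectional WebSocket, the principle will be explained in detail in the introduction to Akka HTTP WebSocket.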
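The "no connection" between the two halves can be made visible with a tiny sketch, again assuming Akka 2.6; FromSinkAndSourceDemo is a hypothetical name. The input side is simply discarded, and the output comes entirely from the embedded Source.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original article.
object FromSinkAndSourceDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("from-sink-and-source-demo")

    // Input elements go to Sink.ignore; output elements come from the inner Source.
    val flow = Flow.fromSinkAndSource[Int, String](
      Sink.ignore,
      Source(List("from", "source"))
    )

    // The numbers fed in never influence what comes out.
    val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq[String])

    try Await.result(result, 3.seconds)
    finally system.terminate()
  }
}
```

In the WebSocket case the two halves are tied together only indirectly, through the controlling Actor.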

Message Handle Flow

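import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

// Requires an implicit Materializer in scope for runFold.
def websocketFlow: Flow[Message, Message, Any] =
  Flow[Message]
    .mapAsync(1) {
      case TextMessage.Strict(s)   => Future.successful(s)
      // A streamed message arrives in chunks; concatenate them into one String.
      case TextMessage.Streamed(s) => s.runFold("")(_ + _)
      case b: BinaryMessage        => throw new Exception("Binary message cannot be handled")
    }
    .via(chatActorFlow(UUID.randomUUID()))
    .map(TextMessage(_))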

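Above we built a Flow from a Sink and a Source. This Flow already has the following capabilities:

  1. Sink: takes String data from the Stream and passes it to the controller Actor (chatRef)
  2. Source: a message sent to a specific Actor (ar) is passed into the Stream
  3. When the Stream completes, for example because the client closed the WebSocket, a special message (Protocol.CloseConnection) is sent to the controller Actor (chatRef)
  4. When the specific Actor (ar) is created, a special message (Protocol.OpenConnection) is sent to the controller Actor (chatRef)

Here we use the via method to attach that Flow to the stage that decodes WebSocket messages, which gives us the final Flow used to handle the WebSocket.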

def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T]

Transform this Flow by appending the given processing steps.

    +---------------------------------+
    | Resulting Flow[In, T, Mat]      |
    |                                 |
    |  +------+            +------+   |
    |  |      |            |      |   |
In ~~> | this |  ~~Out~~>  | flow | ~~> T
    |  |   Mat|            |     M|   |
    |  +------+            +------+   |
    +---------------------------------+

The materialized value of the combined Flow will be the materialized value of the current flow (ignoring the other Flow’s value), use viaMat if a different strategy is needed.

Akka Http WebSocket Message type

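From the definition of this Flow we can see that both its In and Out types are Message. This is the abstract model that the Akka HTTP Scala DSL uses for WebSocket messages, and it has two subtypes: TextMessage and BinaryMessage. Here we handle only TextMessage: each incoming message is processed with mapAsync, which simply extracts the text and passes it downstream.

def mapAsync[T](parallelism: Int)(f: (Out) ⇒ Future[T]): Repr[T]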

Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a Future and the value of that future will be emitted downstream. The number of Futures that shall run in parallel is given as the first argument to mapAsync. These Futures may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.
If the function f throws an exception or if the Future is completed with failure and the supervision decision is akka.stream.Supervision.Stop the stream will be completed with failure.
If the function f throws an exception or if the Future is completed with failure and the supervision decision is akka.stream.Supervision.Resume or akka.stream.Supervision.Restart the element is dropped and the stream continues.
The function f is always invoked on the elements in the order they arrive.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the Future returned by the provided function finishes for the next element in sequence
Backpressures when the number of futures reaches the configured parallelism and the downstream backpressures or the first future is not completed

Completes when upstream completes and all futures have been completed and all elements have been emitted

Cancels when downstream cancels
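The ordering guarantee described above can be checked with a short sketch, assuming Akka 2.6; MapAsyncDemo is a hypothetical name, and Thread.sleep is used only to make the Futures finish out of order.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original article.
object MapAsyncDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    import system.dispatcher // ExecutionContext for the Futures below

    val result = Source(List(3, 1, 2))
      .mapAsync(parallelism = 3) { n =>
        // Larger inputs take longer, so the Futures complete in the order 1, 2, 3.
        Future { Thread.sleep(n * 50L); n * 10 }
      }
      .runWith(Sink.seq[Int])

    try Await.result(result, 3.seconds)
    finally system.terminate()
  }
}
```

Although the Futures complete in a different order, the results are emitted in upstream order. With mapAsync(1), as in websocketFlow, only one Future runs at a time, so a streamed message is fully assembled before the next message is processed.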

Message Handle Actor

import java.util.UUID
import akka.actor.{Actor, ActorRef, PoisonPill}

object Protocol {
  case class SignedMessage(uuid: UUID, msg: String)
  case class OpenConnection(actor: ActorRef, uuid: UUID)
  case class CloseConnection(uuid: UUID)
}

class ChatRef extends Actor {
  import Protocol._

  override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

  // maximumClients is the connection limit; it is assumed to be defined elsewhere.
  def withClients(clients: Map[UUID, ActorRef]): Receive = {
    // Forward the message to every connection except the sender.
    case SignedMessage(uuid, msg) => clients.collect {
      case (id, ar) if id != uuid => ar ! msg
    }
    // Reject the connection when the limit is reached.
    case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill
    case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
    case CloseConnection(uuid) => context.become(withClients(clients - uuid))
  }
}

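The design of the Actor that centrally handles the WebSockets is very simple. First define a set of protocol messages, that is, the messages the Actor can handle, then implement the handling code for each of them. Note the following points.

Maintain connection ID and ActorRef

Because one Actor handles multiple connections, we need to record which Actor belongs to which connection ID. A Map is used to store this relationship.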

def withClients(clients: Map[UUID, ActorRef])

The Map is updated every time an OpenConnection or CloseConnection message is received.

Handle receive message

case SignedMessage(uuid, msg) => clients.collect {
  case (id, ar) if id != uuid => ar ! msg
}

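When a message arrives from some uuid, we use the Map we maintain to find the Actors of the relevant connections and send them a response. In the code above the message is forwarded to every connection except the sender.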
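The bookkeeping done by withClients can also be looked at without any actors. The sketch below mirrors the three message handlers as pure functions on an immutable Map. Everything in it is hypothetical and for illustration only: connection IDs are plain Int values instead of UUIDs, a String stands in for the ActorRef, and limit plays the role of maximumClients.

```scala
// Hypothetical, actor-free model of the client registry; not part of the original article.
object ClientRegistrySketch {
  type ConnectionId = Int
  type Handle = String // stand-in for ActorRef

  // OpenConnection: register the client unless the limit has been reached.
  def open(clients: Map[ConnectionId, Handle], id: ConnectionId,
           handle: Handle, limit: Int): Map[ConnectionId, Handle] =
    if (clients.size == limit) clients else clients.updated(id, handle)

  // CloseConnection: forget the client.
  def close(clients: Map[ConnectionId, Handle], id: ConnectionId): Map[ConnectionId, Handle] =
    clients - id

  // SignedMessage: everyone except the sender receives the message.
  def recipients(clients: Map[ConnectionId, Handle], sender: ConnectionId): Set[Handle] =
    clients.collect { case (id, handle) if id != sender => handle }.toSet
}
```

Because each handler returns a new Map, the actor can switch state with context.become instead of keeping a mutable field.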

Akka Http Route with Flow

val route = get {
  pathEndOrSingleSlash {
    handleWebSocketMessages(websocketFlow)
  }
}

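Everything is now in place. We only need to take the Flow created above and use the tools Akka HTTP provides to set up the WebSocket.

handleWebSocketMessages documentation

A later article will go through the Akka HTTP WebSocket support documentation in detail.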

Ref

Source Code