摘要:本文介绍如何使用Actor 和 Stream 来处理 Akka Http WebSocket消息。该实现具有如下特性:
- 一个 Actor 可以处理多个 WebSocket
- 客户端终止 WebSocket,服务端可以收到通知
- 更加容易集群化,并且使用 Actor Router 进行负载平衡
Introduction
我们主要参考如下两篇文章,其中第二篇个人感觉更好一点,前一个会有内存溢出问题。
Theory
基本思路如下图:
- Source 处理从 Actor 向 Client 发送的消息
- Sink 处理从 Client 向 Actor 发送的消息
1 | --> Sink --> |
Implementation 实现
分为4步
Sink Source Flow
1 | def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = { |
这段代码是整个实现的核心,通过定义 Flow
,连接了客户端和服务端完成基本的消息传递。
Source.actorRef
首先我们理解一下: Source.actorRef(16, OverflowStrategy.fail)
,我们参考 Source.actorRef
的官方 API 文档 (该文档位于 Companion object Source
):
Creates a Source that is materialized as an akka.actor.ActorRef.
我们可以发现,这个方法实际上是创建了一个 Source
,这个 Source
的特点就是将会被 Materialized 为 akka.actor.ActorRef。
Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
我们通过这种方法创建的 Source
的功能就是,如果向其中的 Actor 发送消息,这个消息将会被释放到 stream 中。所以这样一来,Actor 就自然的和 Stream 关联在了一起,我们也可以从其实现源码探知真相:
1 | Source |
mapMaterializedValue
下面一个重要操作是 mapMaterializedValue
, 这个方法目标很简单就是把一个 Stream
立即 Materialize
,对于我们上面创建的Source
,物质化后就是一个 ActorRef
,我们就可以针对这个 ActorRef
进行业务逻辑的操控了。
注:什么是 Materialization
?简单的说就是把蓝图实现了,如果我们把Flow 的设计看做是一个水管管道的设计,那么物质化的过程就是打开所有的阀门和开源,水将会从源头流经整个管道系统。
It is important to remember that even after constructing the
RunnableGraph
by connecting all the source, sink and different operators, no data will flow through it until it is materialized. Materialization is the process of allocating all resources needed to run the computation described by a Graph (in Akka Streams this will often involve starting up Actors).
ActorRef
拿到了和客户端 Stream
建立桥梁的 ActorRef
,我们如何使用呢?
1 | actor : ActorRef => { |
这里只做了一件事,就是基于该 ActorRef
和连接的ID
构造 Protocol.OpenConnection
消息,把该消息发送个 chatRef
。后面这个 Actor
可以看做是所有的 WebSocket
的控制者。下面的章节我们会介绍这个动作的用途。
Sink from the message
前面创建的 Source
已经解决了如何连接 WebSocket
客户端,并通过 Actor
向其发送消息,那么如何接受来自客户端的消息呢?这里创建了 Sink
。
1 | val sink = Flow[String] |
该 Flow 是传递 String 类型的,表示我们只能用它处理 String 消息,然后消息字符串会被重新打包为一个 Protocol.SignedMessage
,这个结构包含了 connectionId
,从而能够确定该消息是哪个 WebSocket
连接发送。
Sink.actorRef
Sends the elements of the stream to the given ActorRef.
这个方法对和 Source.actorRef
对应起来的:Stream
收到的消息将会转发给 ActorRef
。
我们可以用下面这张图概括一下这两个 actorRef
的作用。
1 | Client <-- Stream <-- Source.actorRef.mapMaterializedValue{ar=>???} <-- Actor:ar |
Flow map and to
def mapT ⇒ T): Repr[T]
Transform this stream by applying the given function to each of the elements as they pass through this processing step.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the mapping function returns an element
Backpressures when downstream backpressures
Completes when upstream completes
Cancels when downstream cancelsdef toMat2: Sink[In, Mat]
Connect this Flow to a Sink, concatenating the processing steps of both.
1 | +------------------------------+ |
The materialized value of the combined Sink will be the materialized value of the current flow (ignoring the given Sink’s value), use toMat if a different strategy is needed.
综上可以,这个定义就是将 String
类型的 Flow
,将其连接到一个 Sink
,而这个 Sink
的功能就是将 Stream
上流动的 String
消息传递给 Actor
,而这个 Actor
正是我们前面提到的所有 WebSocket
连接的控制者。
def actorRef[T](ref: ActorRef, onCompleteMessage: Any)
的第二个参数表示:
If the target actor terminates the stream will be canceled. When the stream is completed successfully the given onCompleteMessage will be sent to the destination actor. When the stream is completed with failure a akka.actor.Status.Failure message will be sent to the destination actor.
所以一旦当前的 WebSocket
被终止,其 Stream
就会被 Complete
,这时候这个消息就会传递给 Actor
,这样控制者就有能力来处理 WebSocket
被终止后的善后工作,比如清理相关的数据。
Flow.fromSinkAndSource(sink, source)
组合 Sink
和 Source
,创建 Flow
。
def fromSinkAndSourceI, O: Flow[I, O, NotUsed]
Creates a Flow from a Sink and a Source where the Flow’s input will be sent to the Sink and the Flow’s output will come from the Source.
The resulting flow can be visualized as:
1 | +----------------------------------------------+ |
The completion of the Sink and Source sides of a Flow constructed using this method are independent. So if the Sink receives a completion signal, the Source side will remain unaware of that. If you are looking to couple the termination signals of the two sides use Flow.fromSinkAndSourceCoupled instead.
我们在看这个文档的时候一定要注意,图中的 I
和 O
,也就是 In
和 Out
在我们这个场景中都对应这 WebSocket
的客户端,因为 WebSocket
是双向的,所以万万要注意这一点。至于为什么这个 Flow
能够为双向的 WebSocket
服务,我们在 Akka Http WebSocket
的介绍中会详细介绍其原理。
Message Handle Flow
1 | def websocketFlow: Flow[Message, Message, Any] = |
前面介绍了通过 Sink
和 Source
构造了一个 Flow
, 这个 Flow
已经具有以下能力:
- Sink:从 Stream 获取 String 类型的数据,传递给总操控者 Actor (chatActerRef)
- Source:向某个特定的 Actor(ar) 发送消息,消息可以传递给 Stream
- Stream 被终止或取消时,将会有特殊消息(Protocol.CloseConnection)发送给总操控者 Actor (chatActerRef)
- 特定的 Actor(ar) 创建的时候,将会有特殊消息(Protocol.OpenConnection)发送给总操控者 Actor (chatActerRef)
这里我们利用 via
关键字,将处理 WebSocket
消息的 Flow
进行连接,从而创建出我们最终用于处理 WebSocket
的 Flow
def viaT, Mat2: Repr[T]
Transform this Flow by appending the given processing steps.
1 | +---------------------------------+ |
The materialized value of the combined Flow will be the materialized value of the current flow (ignoring the other Flow’s value), use viaMat if a different strategy is needed.
Akka Http WebSocket Message type
通过这个 Flow 的定义,我们可以看到,其 In 和 Out 的类型都是 Message
,这是 Akka HTTP Scala DSL
描述 WebSocket
消息的抽象模型,并且包含两种类型: TextMessage
和 BinaryMessage
。这里我们处理简单的 TextMessage
,收到每个消息后使用 mapAsync
进行处理,处理方式很简单,直接把数据传入 downstream。
def mapAsyncT(f: (Out) ⇒ Future[T]): Repr[T]
Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a Future and the value of that future will be emitted downstream. The number of Futures that shall run in parallel is given as the first argument to mapAsync. These Futures may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.
If the function f throws an exception or if the Future is completed with failure and the supervision decision is akka.stream.Supervision.Stop the stream will be completed with failure.
If the function f throws an exception or if the Future is completed with failure and the supervision decision is akka.stream.Supervision.Resume or akka.stream.Supervision.Restart the element is dropped and the stream continues.
The function f is always invoked on the elements in the order they arrive.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
Emits when the Future returned by the provided function finishes for the next element in sequence
Backpressures when the number of futures reaches the configured parallelism and the downstream backpressures or the first future is not completedCompletes when upstream completes and all futures have been completed and all elements have been emitted
Cancels when downstream cancels
Message Handle Actor
1 | object Protocol { |
集中处理 WebSocket
的 Actor 设计十分简单,首先定义一组协议消息,也就是 Actor 可以处理的消息,然后在分别实现处理代码,这里注意以下几点。
Maintain connection ID and ctorRef
由于我们在一个 Actor
中处理多个 Connection
, 所以我们需要记录每个 Connection
的 ID 和对应的 Actor
的关系,这里使用一个 Map
来存储这个关系。
1 | def withClients(clients: Map[UUID, ActorRef]) |
并且在每次收到 OpenConnection
和 CloseConnection
的时候更新该 Map
数据结构。
Handle receive message
1 | case SignedMessage(uuid, msg) => clients.collect{ |
当我们收到某个 uuid 发送的消息后,我们可以通过维护的 Map 找到对应的 Actor
,从而向该 Connection
进行反馈。
Akka Http Route with Flow
1 | val route = get { |
外事具备只欠通风,我们只需要使用前面创建好的 Flow,然后利用 Akka HTTP
提供的工具就可以实现 WebSocket
的创建。
后面我们会专门解读 Akka HTTP WebSocket
支持的文档。