Spring Boot 集成 Akka 并是实现异步请求

摘要:本文主要介绍两个问题,一个是 SpringAKKA 的集成,另一个是如何使用 Spring 异步请求。最后结合两点,设计一个简单的场景。

Spring 集成 AKKA

首先我们需要在 Spring 中集成 AKKA。目前有若干篇博文介绍该方法,基本思路都是一致的,就是通过 AKKAExtension 机制将 AKKAActor 注入到 SpringIoC 容器中。然后通过 依赖注入 来使用 Actor

参考博文:

Introduction to Spring with Akka:该文章比较详细的介绍了配置过程
Spring-boot-akka-part1 & part2:该文章分两部分,除了介绍配置过程,另外还介绍了使用其进行异步调用。

以上两篇文章都是在 Spring 的配置中,配置系统级别的 Actor,然后在 Client 代码中创建新的 Actor

另外一篇文章 AKKA Actor Dependency Injection Using Spring,虽然文章比较老,但是该文章完全自己实现了注入的过程,没有依赖于 Extension,同时其配置也将业务层的 Actor 一同配置。

本文集合上面三篇文章进行说明,使用 Extension 机制,同时配置业务层 Actor,然后在其他地方全部使用注入方式来访问 Actore

依赖注入简介

[Akka](http://akka.io/) 是一个基于 Actor 并发模型的功能强大的应用程序框架。这个框架是用 Scala 编写的,当然它也可以在基于 java 的应用程序中完全使用。因此,我们常常希望将Akka 集成到现有的基于 Spring 的应用程序中,或者简单地使用 Springbean 连接到 actor 中。

Spring/Akka 集成的问题在于 Spring 中的 bean 管理与 Akka 中的 actor 管理之间的差异: actor 具有与典型 Spring bean 生命周期不同的特定生命周期。
此外,参与者被分割为参与者本身(这是一个内部实现细节,Spring 无法管理)和参与者引用(可由客户端代码访问),以及可序列化和可移植的不同 Akka 运行时。

幸运的是,Akka 提供了一个机制,即 Akka 扩展 ,这使得使用外部依赖注入框架成为一项相当简单的任务。

maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<properties>
<spring.version>4.3.1.RELEASE</spring.version>
<akka.version>2.5.14</akka.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>

</dependencies>

如果使用 Spring Boot 则无需 spring-context 依赖。akka 的版本可以自行更新,如果有 breaking change 需要注意,可能本文不兼容。

ActorSystem 集成到 Spring ApplicationContext

Spring Been 注入到 Akka Actors

构造 SpringActorProducer

SpringActorProducer 的功能就是从 ApplicationContext 中以 Spring Bean 的形式通过名称来创建 actors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SpringActorProducer implements IndirectActorProducer {

private ApplicationContext applicationContext;

private String beanActorName;

public SpringActorProducer(ApplicationContext applicationContext,
String beanActorName) {
this.applicationContext = applicationContext;
this.beanActorName = beanActorName;
}

@Override
public Actor produce() {
return (Actor) applicationContext.getBean(beanActorName);
}

@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(beanActorName);
}
}

IndirectActorProducerakka 提供的一个接口:

This interface defines a class of actor creation strategies deviating from the usual default of just reflectively instantiating the Actor subclass. It can be used to allow a dependency injection framework to determine the actual actor class and how it shall be instantiated.

这个接口定义了一个actor创建策略类,它偏离了通常的默认值,只是反射地实例化了 actor 子类。它可以用于允许依赖注入框架来确定实际的 actor 类以及如何实例化它。

其中 produce 方法用途:

This factory method must produce a fresh actor instance upon each invocation. It is not permitted to return the same instance more than once.

这个工厂方法必须在每次调用时生成一个新的actor实例。不允许多次返回同一个实例。

actorClass 方法用途:

This method is used by Props to determine the type of actor which will be created. This means that an instance of this IndirectActorProducer will be created in order to call this method during any call to Props.actorClass; it should be noted that such calls may performed during actor set-up before the actual actor’s instantiation, and that the instance created for calling actorClass is not necessarily reused later to produce the actor.

该方法被道具用来确定将要创建的参与者的类型。这意味着将创建这个IndirectActorProducer的实例,以便在调用Props.actorClass时调用该方法;应该注意的是,这样的调用可以在实际的actor实例化之前的actor设置期间执行,并且为调用actorClass而创建的实例并不一定要在以后重用以生成actor。

通俗的讲,该类是让 AKKA 知道在你的应用环境中如何创建 actor,由于我们使用 Spring,所以所有的 bean 都应该在 SpringContext 通过注册的名字获取。所以该类的构造函数注入了两个参数,一个是 ApplicationContext,另一个就是 beanActorName。然后实现接口并实现两个方法,一个方法是如何生成 Actor,根据 Spring 思想,就是通过 bean 的名称直接在 Context 进行查找。当然这里可以进一步扩展,比如 bean 有构造函数,需要传入参数,那么我们在多注入一个 Object.. args,然后通过名字加参数获取 bean 实例。(可以参考上面的文章 part2 的实例,代码如下)。理论上我们可以将任意的 IoC 容器注入到这个 Producer 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SpringActorProducer implements IndirectActorProducer {

final private ApplicationContext applicationContext;
final private String actorBeanName;
final private Object[] args;

public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object... args) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
this.args = args;
}

@Override
public Actor produce() {
if (args == null) {
return (Actor) applicationContext.getBean(actorBeanName);
} else {
return (Actor) applicationContext.getBean(actorBeanName, args);
}
}

@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}

创建 SpringAKKA 扩展

什么是 akka 扩展? 一个扩展就是每个参与者系统创建的单例实例。

If you want to add features to Akka, there is a very elegant, but powerful mechanism for doing so. It’s called Akka Extensions and is comprised of 2 basic components: an Extension and an ExtensionId.

Extensions will only be loaded once per ActorSystem, which will be managed by Akka. You can choose to have your Extension loaded on-demand or at ActorSystem creation time through the Akka configuration. Details on how to make that happens are below, in the “Loading from Configuration” section.

如果想要为Akka添加特性,有一个非常优美而且强大的工具,称为 Akka 扩展。它由两部分组成: Extension 和 ExtensionId.

Extensions 在每个 ActorSystem 中只会加载一次, 并被Akka所管理。 你可以选择按需加载你的Extension或是在 ActorSystem 创建时通过Akka配置来加载。 关于这些细节,见下文 “从配置中加载” 的部分.

警告:由于扩展是hook到Akka自身的,所以扩展的实现者需要保证自己扩展的线程安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SpringExtension extends AbstractExtensionId<SpringExtension.SpringExt> {

public static final SpringExtension SPRING_EXTENSION_PROVIDER = new SpringExtension();

@Override
public SpringExt createExtension(ExtendedActorSystem system) {
return new SpringExt();
}

public static class SpringExt implements Extension {
private volatile ApplicationContext applicationContext;

public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

public Props props(String actorBeanName) {
return Props.create(
SpringActorProducer.class, applicationContext, actorBeanName);
}
}
}

理解这个扩展,我们首先要理解什么是 PropsProps 实例是 actor 的蓝图,也就是说 Props 用于获取 actore。具体用法就是如果我们希望获取一个 actor,我们可以使用该方法来获取 actorSystem.actorOf(props, actorName)。 这里有点绕,因为我们的 Props 是通过 SpringActorProducer 获取的,我们为什么不直接用 Producer 来获取 Actor 呢?应该是为了抽象,Props 的创造方法有多种,这里只是其中的一种。

该扩展中定义了一个静态的成员变量,类型就是类本身,也就是维护本身的实例的引用。而其中的 createExtension 方法,如果阅读源码的话,可以发现是在注册 extension 的时候被调用。这个类应该是增加一个私有的构造函数,从而表明该类是一个单例。

该扩展的使用就是通过静态成员 SPRING_EXTENSION_PROVIDER 根据 actor 名字获取其 Propsprops 方法每次需要使用 Spring 来管理 actor 的引用的时候,就需要调用方法。

创建 Spring 配置类

我们使用 Java 配置方法来配置 Akka。不同于前面两篇参考文章,本文配置了两个 Actor,一个全局的 system actor, 还配置了一个业务的 actor,这样我们在使用业务的 actor 的时候更加方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@ComponentScan
public class AkkaConfiguration {

public static final String ACTOR_SYSTEM = "ACTOR_SYSTEM";
public static final String LOGIN_ACTOR = "LOGIN_ACTOR";
@Autowired
private ApplicationContext applicationContext;

private ActorSystem actorSystem;

@Bean(name = ACTOR_SYSTEM)
public ActorSystem actorSystem() {
actorSystem = ActorSystem.create(ACTOR_SYSTEM);
SPRING_EXTENSION_PROVIDER.get(actorSystem).initialize(applicationContext);
return actorSystem;
}

@Bean(name = LOGIN_ACTOR)
@DependsOn({ACTOR_SYSTEM})
public ActorRef loginActor() {
return actorSystem
.actorOf(SPRING_EXTENSION_PROVIDER.get(actorSystem).props("loginActor"), LOGIN_ACTOR);
}
}

配置方法很简洁,首先我们需要注入 ApplicationContext,这个 context 需要传入 akka 扩展方法。我们首先配置系统级别的 actor,先使用 ActorSystem 的静态方法来使用名字创建一个 actor,然后将该 actor 放入到当前的 ApplicationContext 中,最后返回 ActorSystem 的引用。

然后在 ActorSystem 基础上我们在配置一个业务的 Actor,我们前面提到过,如何通过 Props 获取 Actor 的引用。这里就是其使用场景。我们通过 actorSystem.actorOf 方法传入蓝图(Props)和名称来获取其实例。

至此就完成了 AKKASpring 的配置。

Spring 使用 AKKA

使用 AKKA 重点在于需要理解 actor 的思想,很多文章介绍这方面的内容,比如这篇比较详细的介绍其设计思想。简单的将,Actor 就是一个消息的处理者,最常见的使用就是重载 onReceive 方法,然后根据消息的不同进行不同的处理,然后返回结果。在业务逻辑中,向相应功能的 actor 发送不同的消息。消息的发送和结果可以是异步的,比如我们执行一个比较费时的操作,这个操作可以放到actor,当执行该动作的消息发送到 actor 后,无需等待 actor 完成这个动作,而是先返回一个 Future,如果业务逻辑是 一个请求,请求就可以结束。知道 actor 完成,Future 会再次被触发从而获取处理结果。

Actor 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
public class GreetingService {

public String greet(String name) {
return "Hello, " + name;
}
}

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class GreetingActor extends UntypedActor {

private GreetingService greetingService;

// constructor

@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof Greet) {
String name = ((Greet) message).getName();
getSender().tell(greetingService.greet(name), getSelf());
} else {
unhandled(message);
}
}

public static class Greet {

private String name;

// standard constructors/getters

}
}

使用 Actor

1
2
3
4
5
6
7
8
9
ActorRef greeter = system.actorOf(SPRING_EXTENSION_PROVIDER.get(system)
.props("greetingActor"), "greeter");

FiniteDuration duration = FiniteDuration.create(1, TimeUnit.SECONDS);
Timeout timeout = Timeout.durationToTimeout(duration);

Future<Object> result = ask(greeter, new Greet("John"), timeout);

Assert.assertEquals("Hello, John", Await.result(result, duration));

扩展场景设计

该场景十分简单,我们假定需要设计一个扫码登录的功能,我们抛去细节,只关注核心,实际上这个核心主要有三个 API

  1. 初始登陆过程,比如我们打开微信网页版
  2. polling 的过程,这个过程是不断判断是否用户完成了扫码并点击确认授权
  3. 授权过程,这个 API 模拟了用户完成扫码,这时候 polling api 结束,并返回扫码后的结果。