Spring WebFlux 中的并发
1、简介本文将带你了解 Spring WebFlux 响应式应用中的并发。2、响应式编程的动机一个典型的 Web 应用由多个复杂的交互部分组成。其中许多交互在本质上是阻塞性的例如那些涉及数据库调用以获取或更新数据的交互。而其他几个部分则是独立的可以并发执行也可能是并行执行。并发是多个任务在同一时间段内交替执行而并行是多个任务同时执行。并发关注的是任务的调度和切换以提高系统的效率和响应性而并行关注的是任务的同时执行以提高计算速度和性能。例如用户对 Web 服务器的两个请求可以由不同的线程处理。在多核平台上这对整体响应时间有明显的好处。因此这种并发模型被称thread-per-request每个请求一个线程模型如上图每个线程一次处理一个请求。虽然基于线程的并发为我们解决了部分问题但却无法解决单个线程内的大部分交互仍然是阻塞的这一事实。此外在 Java 中使用原生线程来实现并发时还需要付出上下文切换的巨大代价。与此同时随着 Web 应用面临的请求越来越多thread-per-request模式开始无法满足人们的期望。因此我们需要一种并发模型它可以帮助我们用相对较少的线程数处理越来越多的请求。这也是采用响应式 编程的主要动机之一。3、响应式编程中的并发响应式编程可以帮助我们根据数据流和变化的传播来构建程序。在完全无阻塞的环境中这可以让我们实现更高的并发性和更好的资源利用率。然而响应式编程是否完全摒弃了基于线程的并发虽然这种说法有些激烈但响应式编程肯定与使用线程实现并发的方法截然不同。响应式编程带来的根本区别在于异步。换句话说程序流程从一连串同步操作转变为异步事件流。例如在响应式模型下对数据库的读取调用不会在获取数据时阻塞调用线程。调用会立即返回一个发布者Publisher其他人可以订阅该发布者。订阅者Subscriber可以在事件发生后对其进行处理甚至可以自己进一步生成事件最重要的是响应式编程并不强调应该生成和消耗哪个线程事件。相反它强调的是将程序构造成异步事件流。这里的发布者和订阅者不需要属于同一个线程。这有助于我们更好地利用可用线程从而提高整体并发性。4、Event LoopEvent Loop事件循环模型是一种用于服务器的响应式异步编程模型上图是一个事件循环的抽象设计展示了响应式异步编程的思想Event Loop 在单个线程中连续运行我们可以根据可用内核的数量设置多个 Event Loop。Event Loop 按顺序处理来自事件队列的事件并在向平台注册回调后立即返回。Platform平台可以触发操作的完成如数据库调用或外部服务调用。Event Loop 可在operation操作完成通知时触发callback回调并将结果发回给原始调用者。包括 Node.js、Netty 和 Ngnix 在内的许多平台都实现了 Event Loop 模型。与 Apache HTTP 服务器、Tomcat 或 JBoss 等传统平台相比它们具有更好的可扩展性。5、用 Spring WebFlux 进行响应式编程对响应式 编程及其并发模型有了足够的了解后来看看 Spring WebFlux。它是 Spring 在 5.0 版本中添加的响应式 Web 框架。你可以看到Spring WebFlux 与 Spring 中的传统 Web 框架并行并不一定会取代它。这里有几个要点需要注意Spring WebFlux 通过函数路由扩展了传统的基于注解的编程模型。此外它还将底层 HTTP 运行时与 Reactive Streams API 相适配使得运行时之间可以互操作。它能够支持各种响应式运行时包括Tomcat、Reactor、Netty或Undertow等 Servlet 3.1 容器。最后它还包括 WebClient这是一个用于 HTTP 请求的响应式非阻塞客户端提供函数式和 Fluent 风格的 API。6、支持的运行时的线程模型如前所述响应式程序倾向于只使用几个线程并充分利用它们。不过线程的数量和性质取决于选择的实际 Reactive Stream API 运行时。要说明的是Spring WebFlux 可以通过HttpHandler提供的通用 API 适应不同的运行时。该 API 是一个简单的接口只有一个方法提供了对不同服务器 API如 Reactor Netty、Servlet 3.1 API 或 Undertow API的抽象。来看看其中几种 软件所采用的线程模型。WebFlux 默认的服务器是 Netty你也可以使用其他任何受支持的服务器如 Tomcat。只需要在pom.xml中正确配置依赖即可dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-webflux/artifactId exclusions exclusion groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-reactor-netty/artifactId /exclusion /exclusions /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-tomcat/artifactId /dependency有多种方式观察在 Java 虚拟机中创建的线程也可以直接从线程类本身提取线程这更简单Thread.getAllStackTraces() .keySet() .stream() .collect(Collectors.toList());6.1、Reactor Netty如上所述Reactor Netty 是 Spring Boot WebFlux Starter 中的默认嵌入式服务器。来看看 Netty 默认创建的线程。首先不添加任何其他依赖也不会使用WebClient。因此如果启动使用 Spring Boot Starter 创建的 Spring WebFlux 应用就会看到它创建的一些默认线程注意除了用于服务器的普通线程外Netty 还会生成大量用于处理请求的工作线程。这些线程数量通常是可用的 CPU 内核数量。这是在四核机器上的输出结果。还有一些典型的 JVM 环境下的管理线程但在这里它们并不重要。Netty 使用 Event Loop 模型以异步响应式的方式提供高度可扩展的并发性。来看看 Netty 如何利用 Java NIO 实现 Event Loop从而提供这种可扩展性如上EventLoopGroup管理一个或多个必须持续运行的EventLoop。因此不建议创建超过可用内核数量的EventLoop。EventLoopGroup还为每个新创建的Channel分配一个EventLoop。因此在Channel的整个生命周期内所有操作都由同一个线程执行。6.2、Apache TomcatSpring WebFlux 也支持传统的 Servlet 容器如 Apache Tomcat。WebFlux 依赖于 Servlet 3.1 API 和非阻塞 I/O。虽然它在底层适配器后面使用了 Servlet API但 Servlet API 并不能直接使用。来看看在 Tomcat 上运行的 WebFlux 应用会有哪些线程你可以看到这里的线程数量和类型与之前观察到的截然不同。首先Tomcat 启动时会有更多的工作线程默认为 10 个。当然还会看到一些 JVM 和Catalina容器特有的内部线程但在本文中可以忽略这些线程。我们需要先了解 Tomcat 与 Java NIO 的架构才能将其与上面看到的线程联系起来。Tomcat 5 及以后版本在其 Connector连接器组件中支持 NIO该组件主要负责接收请求。另一个 Tomcat 组件是 Container容器组件它负责容器管理功能。在这里重点关注的是Connector组件为支持 NIO 而实现的线程模型。它由Acceptor、Poller和Worker组成是NioEndpoint模块的一部分Tomcat 会为Acceptor、Poller和Worker生成一个或多个线程通常还会有一个专用于Worker的线程池。对 Tomcat 架构的详细讨论超出了本文的范围但我们现在应该有足够的了解来理解我们之前看到的线程。7、WebClient 的线程模型WebClient是 Spring WebFlux 中的响应式 HTTP 客户端。可以在需要基于 REST 的通信时使用它从而创建端到端的响应式应用。正如之前所看到的响应式应用只需几个线程即可运行因此应用的任何部分都不会阻塞线程。因此WebClient在帮助我们实现 WebFlux 的潜力方面发挥着至关重要的作用。7.1、使用 WebClient使用WebClient也很简单。不需要包含任何特定的依赖因为它是 Spring WebFlux 的一部分。创建一个简单的 REST 端点它返回一个MonoGetMapping(/index) public MonoString getIndex() { return Mono.just(Hello World!); }然后使用WebClient来调用这个 REST 端点并被动地消费数据WebClient.create(http://localhost:8080/index).get() .retrieve() .bodyToMono(String.class) .doOnNext(s - printThreads());在这里还打印了使用前面介绍的方法创建的线程。7.2、理解线程模型那么线程模型在WebClient中是如何工作的呢毫不疑问WebClient也使用 Event Loop 模型实现并发。当然它依赖于底层运行时提供必要的基础架构。如果我们在 Reactor Netty 上运行WebClient它将共享 Netty 用于服务器的 Event Loop。因此在这种情况下我们可能不会注意到创建的线程有什么不同。不过WebClient也支持 Servlet 3.1 容器如 Jetty但其工作方式有所不同。如果我们比较一下运行 Jetty 的 WebFlux 应用上创建的线程有WebClient和没有WebClient就会发现多了几个线程。在这里WebClient必须创建其 Event Loop。因此可以看到该 Event Loop 创建的固定数量的处理线程在某些情况下为客户端和服务器设置独立的线程池可以提供更好的性能。虽然这不是 Netty 的默认行为但如果需要可以为WebClient声明一个专用的线程池。8、数据访问库的线程模型正如之前所说即使是一个简单的应用通常也由几个需要 “连接” 的组件组成。这些组件的典型例子包括数据库和 Message Broker。其中许多现有的客户端库仍然是阻塞的但这种情况正在迅速改变。现在有几种数据库提供用于连接的响应式客户端库。其中许多库在 Spring Data 中可用也可以直接使用其他库。来看看这些库使用的线程模型。8.1、Spring Data MongoDBSpring Data MongoDB 基于 MongoDB Reactive Streams 驱动 为 MongoDB 提供响应式 Repository 支持。最值得注意的是该驱动完全实现了 Reactive Streams API可提供具有非阻塞背压back-pressure的异步流处理。添加pring-boot-starter-data-mongodb-reactive依赖即可在 Spring Boot 中使用 MongoDB 响应式 Repositorydependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-data-mongodb-reactive/artifactId /dependency创建一个 Repository并使用它以非阻塞的方式在 MongoDB 上执行一些基本操作public interface PersonRepository extends ReactiveMongoRepositoryPerson, ObjectId { } ..... personRepository.findAll().doOnComplete(this::printThreads);那么当我们在 Netty 服务器上运行此应用时会看到什么样的线程呢毫无疑问不会看到太大的区别因为 Spring Data Reactive Repository 使用的是与服务器相同的 Event Loop。8.2、Reactor KafkaSpring 目前仍在建设全面支持响应式 Kafka 的过程中。然而在 Spring 之外确实有其他可用的选择。Reactor Kafka 是基于 Reactor 的 Kafka 响应式 API。Reactor Kafka 可以使用函数式 API 发布和消费消息也可以使用非阻塞背压。首先需要在应用中添加所需的依赖以使用 Reactor Kafkadependency groupIdio.projectreactor.kafka/groupId artifactIdreactor-kafka/artifactId version1.3.10/version /dependency以非阻塞的方式向 Kafka 发送消息// producerProps标准 Kafka 生产者配置 Map SenderOptionsInteger, String senderOptions SenderOptions.create(producerProps); KafkaSenderInteger, String sender KafkaSender.create(senderOptions); FluxSenderRecordInteger, String, Integer outboundFlux Flux .range(1, 10) .map(i - SenderRecord.create(new ProducerRecord(reactive-test, i, Message_ i), i)); sender.send(outboundFlux).subscribe();同样也可以以非阻塞的方式从 Kafka 中获取消息// consumerProps标准 Kafka 消费者配置 Map ReceiverOptionsInteger, String receiverOptions ReceiverOptions.create(consumerProps); receiverOptions.subscription(Collections.singleton(reactive-test)); KafkaReceiverInteger, String receiver KafkaReceiver.create(receiverOptions); FluxReceiverRecordInteger, String inboundFlux receiver.receive(); inboundFlux.doOnComplete(this::printThreads)上述代码非常简单订阅了 Kafka 中的reactive-test主题并收到了Flux消息。其创建的线程如下可以看到一些非典型的 Netty 服务器线程。这表明 Reactor Kafka 管理着自己的线程池其中有几个工作worker线程专门参与 Kafka 消息的处理。当然还会看到其他一些与 Netty 和 JVM 相关的线程这些我们都可以忽略不计。Kafka 生产者使用单独的网络线程向 Broker 发送请求。此外它们还通过单线程池调度器single-threaded pooled scheduler向应用发送响应。而 Kafka 消费者则是每个消费者组有一个线程它会阻塞以监听传入的消息。然后传入的消息会被调度在不同的线程池中处理。9、WebFlux 中的调度选项到目前为止我们已经看到在完全非阻塞的环境中只需使用少量线程响应式 编程就发挥了出色的作用。但这也意味着如果确实存在阻塞的部分它将导致性能严重下降。这是因为阻塞操作可能会完全冻结 Event Loop。那么响反应式编程中我们该如何处理长期运行的进程或阻塞操作呢老实说最好的办法就是避开它们。然而这并不总是可行的我们可能需要为应用的这些部分制定专门的调度策略。Spring WebFlux 提供了一种在 Data Flow Chain 之间将处理切换到不同线程池的机制。这可以让我们精确控制某些任务的调度策略。当然WebFlux 是基于线程池抽象即底层 Reactive 库中的调度器来提供这种功能的。9.1、Reactor在 Reactor 中Scheduler 类定义了执行模型以及执行的位置。Schedulers 类提供了许多执行上下文Execution Context如immediate、single、elastic和parallel。它们提供了不同类型的线程池可用于不同的工作。此外还可以使用已有的 ExecutorService 创建自己的 Scheduler。Schedulers提供了多种执行上下文而 Reactor 也为切换执行上下文的不同方法。这些方法就是publishOn和subscribeOn。我们可以在 Chain调用链中的任何位置将publishOn与Scheduler一起使用Scheduler会影响所有后续操作符。虽然可以在 Chain 中的任何位置使用subscribeOn和Scheduler但它只会影响发送源的上下文。回想一下WebClient 在 Netty 上默认共享与服务器相同的 Event Loop。然而我们可能有合理的理由为WebClient创建一个专用的线程池。来看看如何在 ReactorWebFlux 的默认 Reactive 库中实现这一点Scheduler scheduler Schedulers.newBoundedElastic(5, 10, MyThreadGroup); WebClient.create(http://localhost:8080/index).get() .retrieve() .bodyToMono(String.class) .publishOn(scheduler) .doOnNext(s - printThreads());之前我们没有观察到在有WebClient和没有WebClient的 Netty 上创建的线程有什么不同。然而如果现在运行上面的代码我们可以观察到一些新的线程被创建在这里我们可以看到作为 “bounded elastic thread pool”有界弹性线程池 一部分创建的线程。一旦订阅WebClient的响应就会在这里发布。这样主线程池就可以处理服务器请求了。9.2、RxJavaRxJava 的默认行为与 Reactor 并无太大区别。Observable以及在其上应用的操作符链Chain会在订阅被调用的同一线程上执行工作并通知观察者。此外RxJava 和 Reactor 一样提供了将预定的或自定义的调度策略引入到链中的方法。RxJava 还有一个 Schedulers 类它为 Observable 链提供了多种执行模型。其中包括new thread、immediate、trampoline、io、computation和test。当然它还允许从 Java Executor 中定义一个 Scheduler。此外RxJava 还提供了两种扩展方法来实现这一目标即subscribeOn和observeOn。subscribeOn方法通过指定一个不同的Scheduler来改变默认行为Observable应在该Scheduler上运行。另一方面observeOn方法指定了一个不同的SchedulerObservable可以使用该Scheduler向观察者发送通知。如前所述Spring WebFlux 默认使用 Reactor 作为其 Reactive 库。但由于它与 Reactive Streams API 完全兼容因此可以切换到另一种 Reactive Streams 实现如 RxJava适用于 RxJava 1.x 及其 Reactive Streams 适配器。添加依赖dependency groupIdio.reactivex.rxjava2/groupId artifactIdrxjava/artifactId version2.2.21/version /dependency然后在应用中使用 RxJava 类型如Observable以及 RxJava 特定的Schedulerio.reactivex.Observable .fromIterable(Arrays.asList(Tom, Sawyer)) .map(s - s.toUpperCase()) .observeOn(io.reactivex.schedulers.Schedulers.trampoline()) .doOnComplete(this::printThreads);运行这个应用除了常规的 Netty 和 JVM 相关线程外还应该看到一些与 RxJavaScheduler相关的线程10、总结本文介绍了响应式和和传统 编程模型之间的差异还结合不同的 HTTP 运行时和 Reactive 库介绍了 WebFlux 中的线程模型。最后还介绍了在使用WebClient和数据访问库时线程模型有何不同。