告别 Possibly blocking call: WebFlux/Gateway 阻塞调用优雅解决
2025-04-17 05:42:10
告别 'Possibly blocking call': 在 Spring WebFlux/Gateway 中优雅处理阻塞调用
问题来了:为什么我的网关日志里全是 'Possibly blocking call'?
不少人在用 Spring Cloud Gateway 或者 Spring WebFlux 这类响应式框架时,会遇到一个头疼的警告信息:"Possibly blocking call in non-blocking context could lead to thread starvation"。特别是当你尝试在一个响应式的流程里,去调用一个传统的、同步阻塞的服务时,这个问题就更容易冒出来。
举个常见的场景:你有一个 API 网关(基于 Spring Cloud Gateway),它需要验证请求过来的 Token 是否有效。负责验证 Token 的是一个独立的 Auth 微服务。网关通过 OpenFeign 调用 Auth 服务,代码可能长这样:
// 在 Gateway 的某个 Filter 或者 Controller 里
// 注入了一个标准的 Feign Client
@Autowired
private AuthClient authClient;
// ... 省略其他代码 ...
public Mono<Void> handleRequest(ServerWebExchange exchange, GatewayFilterChain chain) {
String token = extractToken(exchange.getRequest());
// 问题就出在这里:一个同步阻塞的调用
boolean isValidToken = authClient.validateToken(token); // IDE 或 Reactor 会告警
if (isValidToken) {
// ... token 有效,继续处理请求 ...
return chain.filter(exchange);
} else {
// ... token 无效,返回错误 ...
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
}
// Feign Client 接口定义 (标准 OpenFeign)
@FeignClient("auth-service")
public interface AuthClient {
@GetMapping("/validate")
boolean validateToken(@RequestParam("token") String token);
}
代码看上去挺直观,但你的 IDE(比如 IntelliJ IDEA 装了 Reactor 插件)或者运行时,可能会提示 authClient.validateToken(token)
这一行是潜在的阻塞操作。
用户提到了尝试使用 Reactive Feign (com.playtika.reactivefeign:feign-reactor-spring-cloud-starter
) 但遇到了 Maven 依赖下载问题。确实,Reactive Feign 是解决这个问题的一种方式,但既然它暂时用不了,或者你想探索其他方法,那还有没有别的路子可走呢?(顺便提一句,这种场景用消息队列确实不太合适,毕竟验证 Token 需要即时得到结果)。
根源剖析:阻塞调用为何在非阻塞世界里“水土不服”?
要弄明白为什么会有这个警告,得先理解 Spring WebFlux/Gateway 这类响应式框架的工作模式。它们通常基于像 Netty 这样的事件驱动、非阻塞 I/O 模型。
简单来说,它们内部维护着一个(或多个)数量固定的 事件循环线程(Event Loop Thread) 。这些线程个个都是“时间管理大师”,专门负责快速处理网络事件(接收请求、发送响应、读写数据等)。一个线程能在很短的时间内处理多个连接上的事件。
关键点在于:事件循环线程绝对不能被长时间阻塞 。
想象一下,一个事件循环线程正在处理你的请求。当代码执行到 authClient.validateToken(token)
时,由于这是个标准的同步 HTTP 调用,这个线程就得停下来,眼巴巴地等着网络另一端的 Auth 服务给回响应。这个等待过程可能耗时几十毫秒甚至几秒。在这段时间里,这个可怜的线程就啥也干不了了,完全被“阻塞”了。
如果同一时间有很多请求涌入,并且都需要进行这种阻塞调用,那么有限的几个事件循环线程可能很快就全都被占满了,都在等待。结果就是,新的请求根本没有线程来处理,系统看起来就像卡死了一样,响应极其缓慢甚至完全没响应——这就是所谓的 “线程饥饿(Thread Starvation)” 。
这就是为什么在非阻塞的上下文中(比如 WebFlux/Gateway 的处理流里),执行阻塞操作是大忌。
解决之道:告别阻塞,拥抱顺畅
既然知道了病根,那就有药方了。除了 Reactive Feign,我们至少还有两种主流且推荐的方式来处理这个问题。
方案一:把阻塞任务“扔”给专用线程池 (Schedulers.boundedElastic
)
这是 Project Reactor(WebFlux 背后的响应式库)推荐的处理遗留阻塞代码的标准姿势。思路很简单:既然事件循环线程不能阻塞,那我就把这个阻塞的操作,扔给一个专门干“脏活累活”(阻塞任务)的独立线程池 去执行。执行完了,再把结果通过异步回调的方式交还给事件循环线程。
Project Reactor 提供了一个现成的、专门为此设计的线程池调度器:Schedulers.boundedElastic()
。这个调度器维护一个有界的、弹性的线程池,专门用来执行那些耗时可能较长或者会阻塞的非 CPU 密集型任务。它和事件循环线程池是隔离的。
原理和作用:
- 将阻塞调用包装成一个
Callable
。 - 使用
Mono.fromCallable
将这个Callable
转换成一个Mono
(响应式流中的一个表示 0 或 1 个元素的发布者)。 - 使用
.subscribeOn(Schedulers.boundedElastic())
指示这个Mono
的执行(也就是那个阻塞调用)切换到boundedElastic
线程池上进行。 - 这样,实际的阻塞等待发生在
boundedElastic
的线程上,事件循环线程只是触发了这个任务,然后就可以去忙别的事情了,不会被卡住。当阻塞调用完成后,结果会通过Mono
异步地传递回来,后续的响应式链(比如flatMap
,map
等)会在原来的事件循环线程(或者其他合适的线程)上继续执行。
代码示例:
修改上面的 handleRequest
方法:
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
// ... 其他 imports ...
@Autowired
private AuthClient authClient;
public Mono<Void> handleRequest(ServerWebExchange exchange, GatewayFilterChain chain) {
String token = extractToken(exchange.getRequest());
// 1. 把阻塞调用包起来
Mono<Boolean> validationMono = Mono.fromCallable(() -> {
// 这个 lambda 表达式里的代码会在 boundedElastic 线程池执行
System.out.println("执行验证的线程: " + Thread.currentThread().getName()); // 观察线程名,会是 elastic-x
return authClient.validateToken(token); // 这是阻塞调用
}).subscribeOn(Schedulers.boundedElastic()); // 2. 指定在 Schedulers.boundedElastic() 上执行
// 3. 继续响应式地处理结果
return validationMono.flatMap(isValidToken -> {
System.out.println("处理验证结果的线程: " + Thread.currentThread().getName()); // 观察线程名,通常会是 reactor-http-nio-x
if (isValidToken) {
return chain.filter(exchange);
} else {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
});
}
进阶使用技巧:
boundedElastic
线程池配置: 这个线程池的大小和队列长度是有限制的,可以通过 Java 系统属性来调整,以应对高并发场景:reactor.schedulers.defaultBoundedElasticSize
: 最大线程数 (默认是 CPU 核心数 * 10)。reactor.schedulers.defaultBoundedElasticQueueSize
: 任务队列大小 (默认 100,000)。
如果你的阻塞调用非常多或者耗时特别长,可能需要适当调大这些值,但要注意资源消耗。密切监控这个线程池的活跃线程数和队列堆积情况很重要。
- 使用
publishOn
: 有时你可能希望阻塞调用后的 后续处理 也发生在特定线程池(比如boundedElastic
或另一个自定义线程池),可以用.publishOn()
。但通常,subscribeOn
用于指定 从源头开始 在哪个线程执行(特别是阻塞源),而后续操作默认会回到调用链上游的线程上下文(通常是事件循环线程)。对于 I/O 绑定的阻塞调用,subscribeOn(Schedulers.boundedElastic())
是最常用的。
方案二:换个“交通工具” - 使用 WebClient
进行异步调用
既然你在一个响应式的环境里,最“地道”的方式当然是使用响应式的 HTTP 客户端。Spring 5 引入了 WebClient
,这是一个现代的、完全非阻塞的、响应式的 HTTP 客户端,是 RestTemplate
的继任者,并且与 Project Reactor/WebFlux 无缝集成。
与其用标准的、默认同步阻塞的 OpenFeign,不如直接为这个特定的调用改用 WebClient
。
原理和作用:
WebClient
底层基于非阻塞 I/O(如 Netty),从发起请求到接收响应的整个过程都是异步的。- 它本身就返回
Mono
或Flux
,天然融入响应式编程模型。 - 发起请求后,它不会阻塞当前线程(比如事件循环线程),而是注册一个回调。当网络响应到达时,底层的 I/O 线程(通常是事件循环线程)会收到通知并触发这个回调,继续处理响应。
代码示例:
首先,你需要配置一个 WebClient
Bean (通常在配置类里):
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced; // 如果用了服务发现
@Configuration
public class WebClientConfig {
@Bean
@LoadBalanced // 如果你的 Auth 服务注册到了服务发现中心 (如 Nacos, Eureka),需要加这个注解来启用客户端负载均衡
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
// 如果不通过服务发现,而是直接指定 Auth 服务的地址
// @Bean
// public WebClient authWebClient(WebClient.Builder builder) {
// return builder.baseUrl("http://auth-service-address:port").build();
// }
}
然后,在你的网关代码里注入并使用 WebClient
:
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
// ... 其他 imports ...
@Component // 或者 @Service 等
public class AuthServiceValidator {
private final WebClient webClient;
// 注入通过 @LoadBalanced 配置的 WebClient.Builder
// 或者直接注入特定服务的 WebClient Bean
public AuthServiceValidator(@LoadBalanced WebClient.Builder webClientBuilder) {
// 假设 Auth 服务在服务发现中的名字是 "auth-service"
this.webClient = webClientBuilder.baseUrl("http://auth-service").build();
}
public Mono<Boolean> validateTokenAsync(String token) {
return this.webClient.get() // 使用 GET 请求
.uri(uriBuilder -> uriBuilder
.path("/validate") // Auth 服务的验证端点
.queryParam("token", token) // 请求参数
.build())
.retrieve() // 发起请求并获取响应体
.bodyToMono(Boolean.class) // 将响应体转换成 Mono<Boolean>
.onErrorResume(ex -> {
// 处理调用异常,比如网络错误、服务不可用等
// 这里简单返回 false,实际应用中可能需要更详细的日志和处理
System.err.println("调用 Auth 服务验证 Token 出错: " + ex.getMessage());
return Mono.just(false);
});
// .timeout(Duration.ofSeconds(3)); // 可以设置超时
}
}
// 在 Gateway Filter 中使用这个 Validator
@Autowired
private AuthServiceValidator authServiceValidator;
public Mono<Void> handleRequest(ServerWebExchange exchange, GatewayFilterChain chain) {
String token = extractToken(exchange.getRequest());
// 直接调用返回 Mono 的异步方法
return authServiceValidator.validateTokenAsync(token)
.flatMap(isValidToken -> {
System.out.println("处理验证结果的线程: " + Thread.currentThread().getName()); // 观察线程名,仍然是 reactor-http-nio-x
if (isValidToken) {
return chain.filter(exchange);
} else {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
});
}
进阶使用技巧:
WebClient
配置: 你可以对WebClient
进行很多精细化配置,例如:- 连接超时、读取/写入超时。
- 配置底层的 HttpClient(比如 Reactor Netty 的连接池大小、keep-alive 等)。
- 添加默认 Header、Filter Function(用于日志、认证、修改请求/响应等)。
- 错误处理:
WebClient
的retrieve()
方法在遇到 HTTP 状态码为 4xx 或 5xx 时默认会抛出WebClientResponseException
。你可以使用.onStatus()
来定制错误处理逻辑,或者像上面例子中那样用.onErrorResume()
来捕获所有类型的错误并提供降级逻辑。 - 断路器与重试: 集成 Resilience4J 或 Spring Retry (
reactor-extra
模块) 来实现断路器、重试等服务容错机制,增强系统稳定性。 - 请求/响应日志: 利用
WebClient
的 Filter Function 或者配合日志框架(如 Logbook)记录详细的 HTTP 交互日志,方便调试。
安全建议:
- 如果 Auth 服务需要 HTTPS,确保
WebClient
配置了正确的 SSL/TLS 上下文。 - 处理 Token 这类敏感信息时,注意日志脱敏,避免在日志中意外暴露 Token 内容。
(警惕) 方案三:增加工作线程 - 治标不治本
有人可能会想:“既然是线程不够用了导致饿死,那我多加点线程不就行了?” 比如,尝试去调整底层 Netty 服务器的工作线程数。
原理和作用:
理论上,增加事件循环线程(Worker Thread)的数量,确实能让系统同时处理更多阻塞任务(因为有更多线程可以被阻塞)。
操作步骤:
通常涉及到修改 WebFlux/Gateway 的服务器配置。具体属性可能依赖于你使用的 Spring Boot 版本和具体的服务器实现(默认是 Reactor Netty)。可能需要通过 application.properties
/yaml
或者配置自定义的 ReactorResourceFactory
Bean 来设置类似 server.netty.worker-count
的属性(请查阅你使用的 Spring Boot 版本文档确认准确的配置方式)。
为什么不推荐:
- 治标不治本: 这并没有解决阻塞调用本身的问题。阻塞依然存在,只是你用了更多宝贵的线程资源去“硬抗”。
- 违反响应式设计初衷: 响应式模型的核心优势就是用少量线程处理高并发。大量增加线程会增加线程上下文切换的开销,消耗更多内存,反而可能降低系统整体吞吐量和效率。
- 难以确定合适的线程数: 需要多少线程才够?这很难估计,并且会随着负载变化而变化。设置少了还是会饿死,设置多了浪费资源。
- 阻塞时间不可控: 如果下游服务响应变慢,再多的线程也可能被耗尽。
强烈建议:把这作为最后的手段,并且只在阻塞调用极其短暂且次数严格受控的情况下,作为临时缓解措施考虑。 首选还是方案一或方案二。
如何选择?
那么,方案一 (Schedulers.boundedElastic
) 和方案二 (WebClient
) 该用哪个呢?
-
如果你想尽可能少地改动现有代码 ,并且那个阻塞调用只是个别现象,那么使用
Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic())
(方案一) 是个不错的选择。它能快速解决问题,对原有业务逻辑的侵入性相对较小。 -
如果你追求更彻底的响应式化 ,希望整个调用链都是非阻塞的,或者你需要对 HTTP 调用进行更精细的控制(超时、重试、连接池等),那么改用
WebClient
(方案二) 是更优、更符合响应式编程思想的选择。虽然可能需要稍微多写点代码(替换 Feign 调用为WebClient
调用),但长期来看,它能带来更好的性能和资源利用率,与 WebFlux/Gateway 的契合度也最高。 -
至于方案三(增加线程) ,尽量避免。
选择哪种方案取决于你的具体场景、团队对响应式编程的熟悉程度以及你愿意投入的改造时间。但无论如何,直接在事件循环线程上进行阻塞调用是应该极力避免的。