返回

告别 Possibly blocking call: WebFlux/Gateway 阻塞调用优雅解决

java

告别 'Possibly blocking call': 在 Spring WebFlux/Gateway 中优雅处理阻塞调用

问题来了:为什么我的网关日志里全是 'Possibly blocking call'?

不少人在用 Spring Cloud Gateway 或者 Spring WebFlux 这类响应式框架时,会遇到一个头疼的警告信息:"Possibly blocking call in non-blocking context could lead to thread starvation"。特别是当你尝试在一个响应式的流程里,去调用一个传统的、同步阻塞的服务时,这个问题就更容易冒出来。

举个常见的场景:你有一个 API 网关(基于 Spring Cloud Gateway),它需要验证请求过来的 Token 是否有效。负责验证 Token 的是一个独立的 Auth 微服务。网关通过 OpenFeign 调用 Auth 服务,代码可能长这样:

// 在 Gateway 的某个 Filter 或者 Controller 里
// 注入了一个标准的 Feign Client
@Autowired
private AuthClient authClient;

// ... 省略其他代码 ...

public Mono<Void> handleRequest(ServerWebExchange exchange, GatewayFilterChain chain) {
    String token = extractToken(exchange.getRequest());

    // 问题就出在这里:一个同步阻塞的调用
    boolean isValidToken = authClient.validateToken(token); // IDE 或 Reactor 会告警

    if (isValidToken) {
        // ... token 有效,继续处理请求 ...
        return chain.filter(exchange);
    } else {
        // ... token 无效,返回错误 ...
        exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
        return exchange.getResponse().setComplete();
    }
}

// Feign Client 接口定义 (标准 OpenFeign)
@FeignClient("auth-service")
public interface AuthClient {
    @GetMapping("/validate")
    boolean validateToken(@RequestParam("token") String token);
}

代码看上去挺直观,但你的 IDE(比如 IntelliJ IDEA 装了 Reactor 插件)或者运行时,可能会提示 authClient.validateToken(token) 这一行是潜在的阻塞操作。

用户提到了尝试使用 Reactive Feign (com.playtika.reactivefeign:feign-reactor-spring-cloud-starter) 但遇到了 Maven 依赖下载问题。确实,Reactive Feign 是解决这个问题的一种方式,但既然它暂时用不了,或者你想探索其他方法,那还有没有别的路子可走呢?(顺便提一句,这种场景用消息队列确实不太合适,毕竟验证 Token 需要即时得到结果)。

根源剖析:阻塞调用为何在非阻塞世界里“水土不服”?

要弄明白为什么会有这个警告,得先理解 Spring WebFlux/Gateway 这类响应式框架的工作模式。它们通常基于像 Netty 这样的事件驱动、非阻塞 I/O 模型。

简单来说,它们内部维护着一个(或多个)数量固定的 事件循环线程(Event Loop Thread) 。这些线程个个都是“时间管理大师”,专门负责快速处理网络事件(接收请求、发送响应、读写数据等)。一个线程能在很短的时间内处理多个连接上的事件。

关键点在于:事件循环线程绝对不能被长时间阻塞

想象一下,一个事件循环线程正在处理你的请求。当代码执行到 authClient.validateToken(token) 时,由于这是个标准的同步 HTTP 调用,这个线程就得停下来,眼巴巴地等着网络另一端的 Auth 服务给回响应。这个等待过程可能耗时几十毫秒甚至几秒。在这段时间里,这个可怜的线程就啥也干不了了,完全被“阻塞”了。

如果同一时间有很多请求涌入,并且都需要进行这种阻塞调用,那么有限的几个事件循环线程可能很快就全都被占满了,都在等待。结果就是,新的请求根本没有线程来处理,系统看起来就像卡死了一样,响应极其缓慢甚至完全没响应——这就是所谓的 “线程饥饿(Thread Starvation)”

这就是为什么在非阻塞的上下文中(比如 WebFlux/Gateway 的处理流里),执行阻塞操作是大忌。

解决之道:告别阻塞,拥抱顺畅

既然知道了病根,那就有药方了。除了 Reactive Feign,我们至少还有两种主流且推荐的方式来处理这个问题。

方案一:把阻塞任务“扔”给专用线程池 (Schedulers.boundedElastic)

这是 Project Reactor(WebFlux 背后的响应式库)推荐的处理遗留阻塞代码的标准姿势。思路很简单:既然事件循环线程不能阻塞,那我就把这个阻塞的操作,扔给一个专门干“脏活累活”(阻塞任务)的独立线程池 去执行。执行完了,再把结果通过异步回调的方式交还给事件循环线程。

Project Reactor 提供了一个现成的、专门为此设计的线程池调度器:Schedulers.boundedElastic()。这个调度器维护一个有界的、弹性的线程池,专门用来执行那些耗时可能较长或者会阻塞的非 CPU 密集型任务。它和事件循环线程池是隔离的。

原理和作用:

  • 将阻塞调用包装成一个 Callable
  • 使用 Mono.fromCallable 将这个 Callable 转换成一个 Mono (响应式流中的一个表示 0 或 1 个元素的发布者)。
  • 使用 .subscribeOn(Schedulers.boundedElastic()) 指示这个 Mono 的执行(也就是那个阻塞调用)切换到 boundedElastic 线程池上进行。
  • 这样,实际的阻塞等待发生在 boundedElastic 的线程上,事件循环线程只是触发了这个任务,然后就可以去忙别的事情了,不会被卡住。当阻塞调用完成后,结果会通过 Mono 异步地传递回来,后续的响应式链(比如 flatMap, map 等)会在原来的事件循环线程(或者其他合适的线程)上继续执行。

代码示例:

修改上面的 handleRequest 方法:

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
// ... 其他 imports ...

@Autowired
private AuthClient authClient;

public Mono<Void> handleRequest(ServerWebExchange exchange, GatewayFilterChain chain) {
    String token = extractToken(exchange.getRequest());

    // 1. 把阻塞调用包起来
    Mono<Boolean> validationMono = Mono.fromCallable(() -> {
        // 这个 lambda 表达式里的代码会在 boundedElastic 线程池执行
        System.out.println("执行验证的线程: " + Thread.currentThread().getName()); // 观察线程名,会是 elastic-x
        return authClient.validateToken(token); // 这是阻塞调用
    }).subscribeOn(Schedulers.boundedElastic()); // 2. 指定在 Schedulers.boundedElastic() 上执行

    // 3. 继续响应式地处理结果
    return validationMono.flatMap(isValidToken -> {
        System.out.println("处理验证结果的线程: " + Thread.currentThread().getName()); // 观察线程名,通常会是 reactor-http-nio-x
        if (isValidToken) {
            return chain.filter(exchange);
        } else {
            exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return exchange.getResponse().setComplete();
        }
    });
}

进阶使用技巧:

  • boundedElastic 线程池配置: 这个线程池的大小和队列长度是有限制的,可以通过 Java 系统属性来调整,以应对高并发场景:
    • reactor.schedulers.defaultBoundedElasticSize: 最大线程数 (默认是 CPU 核心数 * 10)。
    • reactor.schedulers.defaultBoundedElasticQueueSize: 任务队列大小 (默认 100,000)。
      如果你的阻塞调用非常多或者耗时特别长,可能需要适当调大这些值,但要注意资源消耗。密切监控这个线程池的活跃线程数和队列堆积情况很重要。
  • 使用 publishOn: 有时你可能希望阻塞调用后的 后续处理 也发生在特定线程池(比如 boundedElastic 或另一个自定义线程池),可以用 .publishOn()。但通常,subscribeOn 用于指定 从源头开始 在哪个线程执行(特别是阻塞源),而后续操作默认会回到调用链上游的线程上下文(通常是事件循环线程)。对于 I/O 绑定的阻塞调用,subscribeOn(Schedulers.boundedElastic()) 是最常用的。

方案二:换个“交通工具” - 使用 WebClient 进行异步调用

既然你在一个响应式的环境里,最“地道”的方式当然是使用响应式的 HTTP 客户端。Spring 5 引入了 WebClient,这是一个现代的、完全非阻塞的、响应式的 HTTP 客户端,是 RestTemplate 的继任者,并且与 Project Reactor/WebFlux 无缝集成。

与其用标准的、默认同步阻塞的 OpenFeign,不如直接为这个特定的调用改用 WebClient

原理和作用:

  • WebClient 底层基于非阻塞 I/O(如 Netty),从发起请求到接收响应的整个过程都是异步的。
  • 它本身就返回 MonoFlux,天然融入响应式编程模型。
  • 发起请求后,它不会阻塞当前线程(比如事件循环线程),而是注册一个回调。当网络响应到达时,底层的 I/O 线程(通常是事件循环线程)会收到通知并触发这个回调,继续处理响应。

代码示例:

首先,你需要配置一个 WebClient Bean (通常在配置类里):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced; // 如果用了服务发现

@Configuration
public class WebClientConfig {

    @Bean
    @LoadBalanced // 如果你的 Auth 服务注册到了服务发现中心 (如 Nacos, Eureka),需要加这个注解来启用客户端负载均衡
    public WebClient.Builder loadBalancedWebClientBuilder() {
        return WebClient.builder();
    }

    // 如果不通过服务发现,而是直接指定 Auth 服务的地址
    // @Bean
    // public WebClient authWebClient(WebClient.Builder builder) {
    //    return builder.baseUrl("http://auth-service-address:port").build();
    // }
}

然后,在你的网关代码里注入并使用 WebClient:

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
// ... 其他 imports ...

@Component // 或者 @Service 等
public class AuthServiceValidator {

    private final WebClient webClient;

    // 注入通过 @LoadBalanced 配置的 WebClient.Builder
    // 或者直接注入特定服务的 WebClient Bean
    public AuthServiceValidator(@LoadBalanced WebClient.Builder webClientBuilder) {
        // 假设 Auth 服务在服务发现中的名字是 "auth-service"
        this.webClient = webClientBuilder.baseUrl("http://auth-service").build();
    }

    public Mono<Boolean> validateTokenAsync(String token) {
        return this.webClient.get() // 使用 GET 请求
                .uri(uriBuilder -> uriBuilder
                        .path("/validate") // Auth 服务的验证端点
                        .queryParam("token", token) // 请求参数
                        .build())
                .retrieve() // 发起请求并获取响应体
                .bodyToMono(Boolean.class) // 将响应体转换成 Mono<Boolean>
                .onErrorResume(ex -> {
                    // 处理调用异常,比如网络错误、服务不可用等
                    // 这里简单返回 false,实际应用中可能需要更详细的日志和处理
                    System.err.println("调用 Auth 服务验证 Token 出错: " + ex.getMessage());
                    return Mono.just(false);
                });
                // .timeout(Duration.ofSeconds(3)); // 可以设置超时
    }
}

// 在 Gateway Filter 中使用这个 Validator
@Autowired
private AuthServiceValidator authServiceValidator;

public Mono<Void> handleRequest(ServerWebExchange exchange, GatewayFilterChain chain) {
    String token = extractToken(exchange.getRequest());

    // 直接调用返回 Mono 的异步方法
    return authServiceValidator.validateTokenAsync(token)
            .flatMap(isValidToken -> {
                System.out.println("处理验证结果的线程: " + Thread.currentThread().getName()); // 观察线程名,仍然是 reactor-http-nio-x
                if (isValidToken) {
                    return chain.filter(exchange);
                } else {
                    exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
                    return exchange.getResponse().setComplete();
                }
            });
}

进阶使用技巧:

  • WebClient 配置: 你可以对 WebClient 进行很多精细化配置,例如:
    • 连接超时、读取/写入超时。
    • 配置底层的 HttpClient(比如 Reactor Netty 的连接池大小、keep-alive 等)。
    • 添加默认 Header、Filter Function(用于日志、认证、修改请求/响应等)。
  • 错误处理: WebClientretrieve() 方法在遇到 HTTP 状态码为 4xx 或 5xx 时默认会抛出 WebClientResponseException。你可以使用 .onStatus() 来定制错误处理逻辑,或者像上面例子中那样用 .onErrorResume() 来捕获所有类型的错误并提供降级逻辑。
  • 断路器与重试: 集成 Resilience4J 或 Spring Retry (reactor-extra 模块) 来实现断路器、重试等服务容错机制,增强系统稳定性。
  • 请求/响应日志: 利用 WebClient 的 Filter Function 或者配合日志框架(如 Logbook)记录详细的 HTTP 交互日志,方便调试。

安全建议:

  • 如果 Auth 服务需要 HTTPS,确保 WebClient 配置了正确的 SSL/TLS 上下文。
  • 处理 Token 这类敏感信息时,注意日志脱敏,避免在日志中意外暴露 Token 内容。

(警惕) 方案三:增加工作线程 - 治标不治本

有人可能会想:“既然是线程不够用了导致饿死,那我多加点线程不就行了?” 比如,尝试去调整底层 Netty 服务器的工作线程数。

原理和作用:

理论上,增加事件循环线程(Worker Thread)的数量,确实能让系统同时处理更多阻塞任务(因为有更多线程可以被阻塞)。

操作步骤:

通常涉及到修改 WebFlux/Gateway 的服务器配置。具体属性可能依赖于你使用的 Spring Boot 版本和具体的服务器实现(默认是 Reactor Netty)。可能需要通过 application.properties/yaml 或者配置自定义的 ReactorResourceFactory Bean 来设置类似 server.netty.worker-count 的属性(请查阅你使用的 Spring Boot 版本文档确认准确的配置方式)。

为什么不推荐:

  • 治标不治本: 这并没有解决阻塞调用本身的问题。阻塞依然存在,只是你用了更多宝贵的线程资源去“硬抗”。
  • 违反响应式设计初衷: 响应式模型的核心优势就是用少量线程处理高并发。大量增加线程会增加线程上下文切换的开销,消耗更多内存,反而可能降低系统整体吞吐量和效率。
  • 难以确定合适的线程数: 需要多少线程才够?这很难估计,并且会随着负载变化而变化。设置少了还是会饿死,设置多了浪费资源。
  • 阻塞时间不可控: 如果下游服务响应变慢,再多的线程也可能被耗尽。

强烈建议:把这作为最后的手段,并且只在阻塞调用极其短暂且次数严格受控的情况下,作为临时缓解措施考虑。 首选还是方案一或方案二。

如何选择?

那么,方案一 (Schedulers.boundedElastic) 和方案二 (WebClient) 该用哪个呢?

  • 如果你想尽可能少地改动现有代码 ,并且那个阻塞调用只是个别现象,那么使用 Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()) (方案一) 是个不错的选择。它能快速解决问题,对原有业务逻辑的侵入性相对较小。

  • 如果你追求更彻底的响应式化 ,希望整个调用链都是非阻塞的,或者你需要对 HTTP 调用进行更精细的控制(超时、重试、连接池等),那么改用 WebClient (方案二) 是更优、更符合响应式编程思想的选择。虽然可能需要稍微多写点代码(替换 Feign 调用为 WebClient 调用),但长期来看,它能带来更好的性能和资源利用率,与 WebFlux/Gateway 的契合度也最高。

  • 至于方案三(增加线程) ,尽量避免。

选择哪种方案取决于你的具体场景、团队对响应式编程的熟悉程度以及你愿意投入的改造时间。但无论如何,直接在事件循环线程上进行阻塞调用是应该极力避免的。