炀餮氢 发表于 2025-6-2 00:33:25

响应式编程之Project Reactor

Project Reactor作为响应式编程范式的核心实现框架,严格遵循Reactive Streams规范体系,其架构设计完整包含了规范定义的四个核心组件:Publisher(数据源)、Subscriber(订阅者)、Subscription(订阅关系)和Processor(处理节点)。在该框架中,Flux和Mono不仅实现了Publisher接口的标准语义,更构建了完整的响应式数据流处理范式:通过订阅关系建立生产-消费通道,基于事件驱动机制实现非阻塞式数据推送,同时通过背压(backpressure)协议保障系统的弹性通信。
基本流程

从整体上理解 Project Reactor 的工作原理,能够帮助我们更清晰地掌握其中的各种概念和操作,避免迷失方向。实际上,从大局来看,整个 Reactor 就是基于订阅-发布模式的。Flux 和 Mono 作为系统中默认的 Publisher,简化了我们自定义 Publisher 的工作。Flux 和 Mono 集成了大量的操作符,这些操作符的存在减少了我们自定义 Subscriber 和 Processor 的需求。通过这些操作符的组合,我们可以直接对数据源和元素进行操作,而无需自己编写额外的 Processor 和 Subscriber。除非在特殊情况下,否则不建议主动去自定义 Subscriber 和 Processor。
创建数据源(Flux,Mono)->转换和处理数据(map,filter...)->subscribe订阅数据源一、响应式数据源:

1.1 Flux与Mono

作为Project Reactor的核心发布者,Flux和Mono的主要区别如下:

[*]Flux代表0-N个元素的异步序列
[*]Mono表示0-1个结果的异步操作
// 创建Flux
Flux.just("1", "2", "3").subscribe(System.out::println);

// 创建Mono
Mono.just("a").subscribe(System.out::println);1.2 数据源类型

了解了Flux和Mono之后,我们知道了如何简单的创建数据源,其中Flux和Mono也给我们提供了非常多的创建数据源的方式,大概分为以下几类。

[*]‌空数据源: 用于表示无数据的完成信号(如删除操作的结果)。
[*]‌动态生成:Mono.create 和 Flux.generate/Flux.create 允许手动控制元素发射(同步或异步)。
[*]‌异步数据源: 从 Future、Callable 或 Supplier 中获取数据,支持非阻塞操作。
[*]‌时间驱动: Mono.delay 延迟发射,Flux.interval 周期性发射递增数值。
[*]‌合并/组合: zip 严格对齐元素,merge 无序合并,concat 顺序连接。
[*]‌背压适配: 通过 FluxSink 或 MonoSink 手动控制背压和元素发射。
Mono 和 Flux 数据源创建方式分类总结‌
‌类别‌‌描述‌‌Mono 方法示例‌‌Flux 方法示例‌‌空数据源‌创建不发射任何元素的数据流。Mono.empty()Flux.empty()‌单个元素‌发射单个静态值或对象。Mono.just(T)Flux.just(T...)‌多个元素‌发射多个静态值或对象(仅 Flux 支持)。N/AFlux.just(T1, T2...)‌集合/数组‌从集合或数组生成元素。N/AFlux.fromIterable(List) Flux.fromArray(T[])‌流(Stream)‌从 Java Stream 生成元素。N/AFlux.fromStream(Stream)‌动态生成‌通过生成器函数动态生成元素。Mono.create(sink -> {...})Flux.generate(sink -> {...}) Flux.create(sink -> {...})‌异步数据源‌从异步操作(如 Future、Callable)获取数据。Mono.fromFuture(Future) Mono.fromCallable(Callable)Flux.from(Publisher) Flux.fromStream(Supplier)‌错误信号‌直接发射错误信号。Mono.error(Throwable)Flux.error(Throwable)‌延迟初始化‌惰性生成数据(订阅时才执行逻辑)。Mono.defer(() -> ...) Mono.fromSupplier(Supplier)Flux.defer(() -> ...) Flux.fromStream(Supplier)‌时间驱动‌基于时间生成数据(如定时、延迟)。Mono.delay(Duration)Flux.interval(Duration)‌合并/组合‌合并多个数据源。Mono.zip(Mono1, Mono2...)Flux.merge(Flux1, Flux2...) Flux.concat(Flux1, Flux2...) Flux.zip(Flux1, Flux2...)‌背压适配‌适配外部背压机制(如 Sink 手动控制)。Mono.create(MonoSink)Flux.create(FluxSink)‌条件触发‌根据条件生成数据(如 first、takeUntil)。Mono.firstWithValue(Mono1, Mono2)Flux.firstWithValue(Publisher...) Flux.takeUntil(Predicate)1.3 数据源发布模型

Project Reactor 的发布模型是其响应式编程的核心机制,主要分为 ‌冷发布者(Cold Publisher)‌ 和 ‌热发布者(Hot Publisher)‌。它们的区别在于数据流的生成、共享方式以及订阅者的消费行为。以下是详细解释:
1.3.1、冷发布者(Cold Publisher)

‌定义‌:冷发布者为每个订阅者生成‌独立的数据流‌。每个订阅者都会触发数据源的完整生成过程,即使其他订阅者已订阅过。
‌特点‌:

[*]‌数据流独立‌:每个订阅者从头开始消费数据。
[*]‌延迟生成‌:数据在订阅时才开始生成(惰性计算)。
[*]‌资源隔离‌:不同订阅者的数据生成逻辑互不影响。
‌适用场景‌:

[*]需要每个订阅者获取完整数据(如 HTTP 请求、数据库查询)。
[*]数据源的生成成本较高,但需确保订阅者的独立性。
代码示例
// 创建冷发布者
Flux<Integer> coldFlux = Flux.range(1, 3).doOnNext(i -> System.out.println("冷发布者发出: " + i));
// 第一个订阅者
coldFlux.subscribe(i -> System.out.println("订阅者1: " + i));
// 第二个订阅者
coldFlux.subscribe(i -> System.out.println("订阅者2: " + i));输出
冷发布者发出: 1
订阅者1: 1
冷发布者发出: 2
订阅者1: 2
冷发布者发出: 1
订阅者2: 1
冷发布者发出: 2
订阅者2: 2
1.3.2、热发布者(Hot Publisher)


[*]‌定义‌:热发布者共享一个‌统一的数据流‌,所有订阅者消费同一份数据。数据源的生成与订阅者的订阅时间无关,后订阅的订阅者可能错过早期数据。
‌特点‌:

[*]‌数据流共享‌:所有订阅者接收同一数据源。
[*]‌实时性‌:数据源的生成独立于订阅行为。
[*]‌资源复用‌:多个订阅者共享同一数据生成逻辑。
‌适用场景‌:

[*]实时事件推送(如传感器数据、股票报价)。
[*]需要广播数据,避免重复生成高成本操作(如 WebSocket 消息)。

热发布者的实现方式有如下几种:

[*]‌ConnectableFlux(手动控制)‌
通过 publish() 方法将 Flux 转换为 ConnectableFlux,需手动调用 connect() 启动数据流。
代码示例
      // 创建 ConnectableFlux 并转换为热发布者
      ConnectableFlux<Integer> hotFlux = Flux.range(1, 3)
                .doOnNext(i -> System.out.println("热发布者发出: " + i))
                .publish(); // 转换为 ConnectableFlux
      // 订阅者A
      hotFlux.subscribe(i -> System.out.println("订阅者A: " + i));
      // 订阅者B
      hotFlux.subscribe(i -> System.out.println("订阅者B: " + i));
      // 手动触发数据流开始
      hotFlux.connect();输出
热发布者发出: 1
订阅者A: 1
订阅者B: 1
热发布者发出: 2
订阅者A: 2
订阅者B: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 3
[*]‌autoConnect()‌(自动连接)
当达到指定订阅者数量时,自动启动数据流。
      Flux<Integer> autoFlux = Flux.range(1, 3)
                .doOnNext(i -> System.out.println("热发布者发出: " + i))
                .publish()
                .autoConnect(2);// 当有 2 个订阅者时自动启动
      autoFlux.subscribe(i -> System.out.println("订阅者A: " + i));
      autoFlux.subscribe(i -> System.out.println("订阅者B: " + i));输出
热发布者发出: 1
订阅者A: 1
订阅者B: 1
热发布者发出: 2
订阅者A: 2
订阅者B: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 3
[*]‌share()‌(简化热发布者)
等价于 publish().refCount(1):当第一个订阅者到来时启动,最后一个取消订阅时终止。
      Flux<Long> sharedFlux = Flux.interval(Duration.ofSeconds(1))
                .doOnNext(i -> System.out.println("热发布者发出: " + i))
                .take(5)
                .share();
      sharedFlux.subscribe(i -> System.out.println("订阅者A: " + i));
      Thread.sleep(2500);
      sharedFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B错过前2个数据输出
热发布者发出: 0
订阅者A: 0
热发布者发出: 1
订阅者A: 1
热发布者发出: 2
订阅者A: 2
订阅者B: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 3
热发布者发出: 4
订阅者A: 4
订阅者B: 4
[*]‌replay()‌(历史数据缓存)
允许新订阅者消费订阅前的历史数据(缓存策略可配置)。
      ConnectableFlux<Integer> replayFlux = Flux.range(1, 3)
                .doOnNext(i -> System.out.println("热发布者发出: " + i))
                .replay(2);// 缓存最近2个数据

      replayFlux.subscribe(i -> System.out.println("订阅者A: " + i));
      replayFlux.connect();
      Thread.sleep(1000);
      replayFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B收到最后2个数据‌输出‌:
热发布者发出: 1
订阅者A: 1
热发布者发出: 2
订阅者A: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 2
订阅者B: 3冷发布者和热发布者对比表格
‌特性‌‌冷发布者‌‌热发布者‌数据生成时机订阅时生成提前生成(或由 connect() 触发)订阅者独立性每个订阅者独立消费完整数据共享同一数据流资源消耗高(每个订阅者独立生成)低(共享生成逻辑)典型场景数据库查询、静态数据实时事件、广播二、强大的操作符生态系统

2.1 核心操作符分类

‌类别‌‌操作符示例‌‌功能描述‌‌转换操作符‌buffer, map, flatMap, window修改流中元素结构或内容(如分组、映射、扁平化)‌过滤操作符‌filter, take, skip按条件筛选元素(如保留满足条件的元素、跳过前N项)‌组合操作符‌merge, concat, zip合并多个流(如按顺序连接、并行合并、元素一一配对)‌条件操作符‌any, all, hasElement判断流中元素是否满足条件(如是否存在满足条件的元素)‌数学操作符‌count, sum, reduce对元素进行聚合计算(如统计总数、求和、累加)‌错误处理操作符‌onErrorReturn, onErrorResume异常时提供备选值或切换至备用流(如返回静态值、动态恢复逻辑)‌工具操作符‌delay, timeout, log, subscribe控制流生命周期(如延迟发送、超时中断、记录日志、触发订阅)整个数据源操作doOnNext,,,doOnRequest,doOnSubscribe,doOnComplete等其中以doOn开头的可以对整个数据链的不同状态进行操作2.2 常见操作(类似Java的Stream)

//转换操作符、过滤操、条件及数学操作符类似Java的Stream这里不做过多赘述
//map
Flux.just(1, 2, 3).map(i -> i + 1).subscribe(System.out::println);
//filter
Flux.just("a", "b", "c").filter(s -> s.equals("a")).subscribe(System.out::println);
//flatMap
Flux.just("a", "b", "c").flatMap(s -> Flux.just(s.toUpperCase())).subscribe(System.out::println);
//reduce
Flux.just(1, 2, 3).reduce(0, (a, b) -> a + b).subscribe(System.out::println);
//window窗口使用
Flux.just(1, 2, 3, 4, 5, 6).window(3, 1).flatMap(e -> e.reduce(0, Integer::sum)).subscribe(System.out::println);
//buffer背压或者批处理使用,会缓存数据
Flux.just(1, 2, 3, 4, 5, 6).buffer(3, 1).subscribe(System.out::println);2.3 组合操作符


[*]zip
zip操作符可以将多个(最多8个)流合并成一个流,合并的方式是将两个流中的元素按照顺序一一对应,然后将两个元素组合成一个元素。 如果两个流的长度不一致,那么最终合并成的流的长度就是两个流中长度较短的那个流的长度。
Flux<String> flux1 = Flux.just("a", "b", "c");
Flux<String> flux2 = Flux.just("d", "e", "f");
Flux<String> flux3 = Flux.just("1", "2", "3");
Flux.zip(flux1, flux2, flux3).subscribe(System.out::println);

//输出




[*]merge
merge 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素交替地放入到合并后的流中。同时运行,根据时间先后运行。
Flux<Integer> flux3 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
Flux<Integer> flux4 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
flux3.mergeWith(flux4).subscribe(System.out::println);

//输出由于是根据时间先后处理,所以结果大概率是这样,也有可能会稍有不同
4
1
5
2
6
3

[*]concat
concat 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照顺序放入到合并后的流中。按照顺序分别运行,flux1运行完成以后再运行flux2
Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
flux1.concatWith(flux2).subscribe(System.out::println);
//输出
1
2
3
4
5
62.4 整个数据源操作

Project Reactor 提供了大量的以doOn开头的方法,这些方法用于在数据流的生命周期中插入副作用逻辑(如日志、监控或资源管理),‌不修改数据流本身,仅用于观察或触发行为‌。
每个方法的使用方法大致相同,下面以doOnRequest和doOnNext做一下简单的示例。
Flux.just(1, 2, 3, 4, 5, 6).doOnNext(s -> System.out.println("doOnNext: " + s)).subscribe();
System.out.println("----------------");
Flux.just(1, 2, 3).doOnRequest(s -> System.out.println("doOnRequest: " + s)).subscribe(System.out::println);
//输出
doOnNext: 1
doOnNext: 2
doOnNext: 3
doOnNext: 4
doOnNext: 5
doOnNext: 6
----------------
doOnRequest: 9223372036854775807
1
2
3下面是每个方法的使用场景和触发时机。
方法触发时机参数类型典型场景doOnSubscribe订阅时Consumer资源初始化doOnNext元素推送时Consumer日志记录、状态更新doOnError发生错误时Consumer错误监控、报警doOnComplete流正常结束时Runnable完成通知doOnRequest下游请求数据时Consumer背压调试、请求量监控doOnCancel取消订阅时Runnable资源释放doOnEach所有事件发生时Consumer统一事件处理doOnTerminate流终止前(完成/错误前)Runnable终止前清理逻辑doAfterTerminate流终止后(完成/错误后)Runnable终止后统计doOnDiscard元素被丢弃时Consumer资源回收、数据一致性检查三、执行控制:订阅与调度

3.1 订阅机制

subscribe 操作符用来订阅流中的元素。 当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。 通过上面内容的阅读,相信你已经对Project Reactor的发布订阅模型已经了解了个大概,上面的订阅的例子也有很多,这里不做过多的赘述。
3.2 调度器策略

Schedulers‌ 是管理线程和并发任务的核心工具,用于控制响应式流的执行上下文。通过合理选择调度器,可以优化资源利用、避免阻塞,并提升应用性能
调度器线程模型适用场景注意事项immediate当前线程轻量级同步操作避免阻塞single单线程严格顺序执行避免长时间阻塞boundedElastic动态线程池阻塞 I/O 操作控制最大线程数和队列容量parallel固定大小线程池计算密集型并行任务线程数默认等于 CPU 核心数fromExecutorService自定义线程池集成现有线程池需自行管理生命周期3.3 默认调度器

在 Project Reactor 中,可以很方便的通过publishOn和subscribeOn来切换使用的线程调度器。
Flux.range(1, 10)
      .publishOn(Schedulers.boundedElastic()) //切换调度器
      .log("publish thread:")
      .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel()))//切换调度器
      .log("subscribe thread:")
      .subscribe();3.4 自定义虚拟线程调度器

当然在JDK17及更改的版本中也可以结合虚拟线程进一步提高并发量。
Scheduler customSchedule = Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor());
Flux.range(1, 10)
      .publishOn(customSchedule)
      .log("publish thread:")
      .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel()))
      .log("subscribe thread:")
      .subscribe();四、高级控制组件

4.1 Processor与Sink的关系

在 Project Reactor 中,‌Processor‌ 曾是一个关键组件,但随着 Reactor 3.4+ 版本的演进,官方逐渐将其标记为‌弃用(Deprecated)‌,并推荐使用更现代的 ‌Sink API‌ 替代。以下是弃用原因、两者核心区别。
1. ‌线程安全


[*]processor:大多数 Processor 实现(如 DirectProcessor、UnicastProcessor)‌非线程安全‌,直接调用 onNext、onComplete 等方法需手动同步。
[*]Sink: 原子性操作‌:Sink 提供 tryEmitNext、tryEmitError 等方法,确保多线程推送数据时的安全性。
2. ‌角色定位‌


[*]‌Processor:同时作为 Publisher 和 Subscriber,这种设计虽然灵活,但导致职责不清晰,容易误用。
[*]Sink:仅作为纯生产者(仅生成数据流)
3. ‌背压处理


[*]‌processor
对背压的支持差异大:

[*]DirectProcessor 完全忽略背压(无界队列)。
[*]UnicastProcessor 支持单订阅者的背压,但需手动配置缓冲区。

[*]Sink‌:内置配置,通过 onBackpressureBuffer、onBackpressureError 等链式方法直接定义背压行为。
4. ‌生命周期管理复杂‌


[*]‌processor‌:需显式调用 onComplete 或 onError 结束流,若遗漏可能导致资源泄漏或订阅者挂起。
[*]Sink:通过 tryEmitComplete 和 tryEmitError 明确结束流,避免资源泄漏。
5. ‌API 设计


[*]‌processor‌:Processor 的 API 未针对现代响应式编程模式优化(如缺少对重试、重播的内置支持)。
[*]Sink:灵活简单,通过 Sinks.Many 的 multicast()、unicast() 或 replay() 快速配置多订阅者行为。
4.2 API使用示例

​    由于processor已经被弃用,不推荐使用,这里不做过多介绍。

[*]1:发送单个数据
Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();
mono.subscribe(
      value -> System.out.println("Received: " + value),
      error -> System.err.println("Error: " + error),
      () -> System.out.println("Completed")
);
sink.tryEmitValue("Hello");// 等效于 tryEmitNext + tryEmitComplete
[*]2:发送多个数据
// 创建多播 Sink, 并设计缓冲被压策略
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
//转换为flux
Flux<String> hotFlux = sink.asFlux().map(String::toUpperCase);

// 订阅者A
hotFlux.subscribe(i -> System.out.println("订阅者A: " + i));

// 订阅者B
hotFlux.subscribe(i -> System.out.println("订阅者B: " + i));

// 发送数据
sink.tryEmitNext("hello");
sink.tryEmitNext("world");
sink.tryEmitComplete();
[*]3:支持历史数据
// 创建重播 Sink,保留最近 2 个元素
Sinks.Many<String> sink = Sinks.many().replay().limit(2);

sink.tryEmitNext("A");
sink.tryEmitNext("B");

// 订阅者1 (接收历史数据 A, B)
sink.asFlux().subscribe(s -> System.out.println("Sub1: " + s));

// 推送新数据
sink.tryEmitNext("C");

// 订阅者2(接收历史数据 B, C)
sink.asFlux().subscribe(s -> System.out.println("Sub2: " + s));

//输出
Sub1: A
Sub1: B
Sub1: C
Sub2: B
Sub2: C4.2背压

4.2.1 背压策略

1.onBackpressureBuffer(缓冲策略)


[*]‌行为‌:将未消费的数据存储在缓冲区中,等待下游请求时发送。
[*]‌配置选项‌

[*]‌缓冲区大小‌:可指定有界或无界(默认无界,需谨慎使用)。
[*]‌溢出策略‌

[*]ERROR:缓冲区满时抛出 IllegalStateException。
[*]DROP_LATEST:丢弃新数据,保留旧数据。
[*]DROP_OLDEST:丢弃最旧数据,保留新数据。


[*]‌适用场景‌:允许短暂的速度不匹配,但需控制内存占用。
‌2. onBackpressureError(错误策略)


[*]‌行为‌:当缓冲区满或下游未请求时,‌立即抛出错误‌(IllegalStateException)。
[*]‌适用场景‌:严格要求实时性,容忍数据丢失但需快速失败。
3. directBestEffort(尽力而为策略)


[*]‌行为‌:无缓冲区,直接推送数据到下游。如果下游未请求,‌静默丢弃新数据‌。
[*]‌特点‌:避免内存占用,但可能导致数据丢失。
[*]‌适用场景‌:实时事件处理(如日志、指标采集),允许偶尔丢失。
‌4. replay(重播策略)


[*]‌行为
‌新订阅者重播历史数据‌

[*]同时支持实时数据推送。
[*]可配置重播的缓冲区大小(如保留最近的 N 个元素)。

[*]‌背压处理‌

[*]对新订阅者:重播历史数据时遵循背压请求。
[*]对实时数据:使用 onBackpressureBuffer 或 directBestEffort 策略。

[*]‌适用场景‌:需要新订阅者获取历史数据的场景。
‌4.2.2. 默认策略‌


[*]‌multicast()‌:默认使用 directBestEffort(无缓冲区)。
[*]‌unicast()‌:默认使用 onBackpressureBuffer(无界缓冲区)。
[*]‌replay()‌:默认保留所有历史数据(无界缓冲区)。
五、 Hooks与Context

5.1 Hooks

在 Project Reactor 中,‌Hooks‌ 是一组全局回调机制,允许对 Reactor 库的默认行为进行‌定制化扩展‌,用于调试、监控或修改响应式流的执行逻辑。
‌1、Hooks 的核心用途‌


[*]‌全局错误处理‌:捕获未被下游处理的异常。
[*]‌操作符生命周期监控‌:在操作符执行前后插入自定义逻辑。
[*]‌调试与追踪‌:增强堆栈跟踪信息,定位异步流问题。
[*]‌行为修改‌:动态替换或包装操作符的实现。
‌2、常用 Hooks 及功能‌

1. onOperatorError‌

[*]‌作用‌:捕获操作符执行过程中抛出的‌未处理异常‌。
[*]‌典型场景‌:统一日志记录、转换错误类型。
Hooks.onOperatorError((error, context) -> {
    System.err.println("全局捕获异常: " + error);
    return error;
});
‌2. onNextDropped‌

[*]‌作用‌:处理因下游取消订阅、背压溢出等原因被‌丢弃的 onNext 元素‌。
[*]‌典型场景‌:记录丢失的数据,用于审计或补偿。
Hooks.onNextDropped(item ->
    System.out.println("元素被丢弃: " + item)
);
‌3. onErrorDropped‌

[*]‌作用‌:处理因下游已终止(如已调用 onComplete)而被‌丢弃的 onError 信号‌。
[*]‌典型场景‌:避免静默忽略错误。
Hooks.onErrorDropped(error ->
    System.err.println("错误被丢弃: " + error)
);
4. onOperatorDebug‌

[*]‌作用‌:启用‌调试模式‌,为异步操作符生成增强的堆栈跟踪信息(含订阅点位置)。
[*]‌代价‌:增加性能开销,‌仅限开发环境使用‌。
Hooks.onOperatorDebug(); // 启用调试模式
‌5. onEachOperator / onLastOperator‌

[*]‌作用‌:在‌每个操作符执行前后‌插入自定义逻辑(如日志、指标采集)。
[*]‌典型场景‌:性能监控、动态修改数据流。
Hooks.onEachOperator(operator -> {
    long start = System.currentTimeMillis();
    return original -> original.doFinally(signal ->
      System.out.println("操作符耗时: " + (System.currentTimeMillis() - start) + "ms")
    );
});
6. 重置 Hooks‌

[*]恢复默认行为
javaCopy CodeHooks.resetOnOperatorError();
Hooks.resetOnNextDropped();
Hooks.resetOnOperatorDebug();
4.4 Context

在 Project Reactor 中,‌Context‌ 是用于在响应式流的各个阶段之间传递‌上下文数据‌的核心机制。它解决了传统 ThreadLocal 在异步、多线程环境中的局限性,允许数据在操作符链中安全传递。以下是 Context 的详细解析,涵盖其设计思想、API 使用及典型场景。
‌1. 为什么需要 Context?‌


[*]‌问题‌:在异步响应式流中,数据可能由不同线程处理,ThreadLocal 无法跨线程传递。
[*]‌解决方案‌:Context 提供一种与订阅链绑定的、不可变的键值存储,确保上下文数据在流的生命周期内可被安全访问。
‌2. Context 的特点‌


[*]‌不可变性‌:每次修改会生成新实例,确保线程安全。
[*]‌订阅链绑定‌:数据跟随订阅链传递,而非依赖线程(需要注意的是Context的传递是从底部往上传递的)。
[*]‌键值存储‌:类似 Map 结构,支持类型安全的键(ContextKey)。
[*]自底向上(Downstream → Upstream)‌

[*]‌写入顺序‌:后调用的 contextWrite 会覆盖先调用的。
[*]‌读取顺序‌:下游(靠近订阅点)的 Context 优先被访问。

通过 contextWrite 操作符将 Context 写入响应式流,通过deferContextual在流中读取 Context
//注意由于ontext的传递是从底部往上传递的,所以必须在下面(A点)先写入才能在(B点读取到)
Flux.just("A", "B", "C", "D")
      //记为B点拼接 Context 中的值
      .flatMap(s -> {
                  System.out.println("ssss:" + s);
                  return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));
                }
      )
      //记为A点写入 Context(关键:必须在读取操作之前调用)
      .contextWrite(Context.of("suffix", "-ctx"))
      // 订阅输出结果
      .subscribe(System.out::println);Context自底向上(Downstream → Upstream)传播示例
由于Context自底向上的传播特性,所以Context中B点的值会覆盖A点的值
Flux.just("A", "B", "C", "D")
      // 拼接 Context 中的值
      .flatMap(s -> {
                  //由于ctx222会覆盖ctx111,所以此处拼接的是ctx222
                  System.out.println("ssss:" + s);
                  return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));
                }
      )
      //记为B点,    写入 Context ctx222会覆盖ctx111
      .contextWrite(Context.of("suffix", "-ctx222"))
      //记为A点,    写入 Context
      .contextWrite(Context.of("suffix", "-ctx111"))
      // 订阅输出结果
      .subscribe(System.out::println);



//输出
ssss:A
A-ctx222
ssss:B
B-ctx222
ssss:C
C-ctx222
ssss:D
D-ctx222结语

通过深入理解Project Reactor这些核心概念,可以更好地驾驭响应式编程范式,构建出更高效、更弹性的分布式系统。与现代虚拟线程的结合,为构建新一代高并发应用提供了更优解。通过合理选择调度策略、优化线程模型,可以在保持代码简洁的同时,充分发挥硬件性能。
路漫漫其修远兮,吾将上下而求索

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 响应式编程之Project Reactor