《Java编程方法论:响应式Spring Reactor 3设计与实现》学习笔记目录
笔记来源:《Java编程方法论:响应式Spring Reactor 3设计与实现》ISBN:9787121394768 作者:知秋 出版时间:2020年09月
Flux
- 是可以发出 0 到 N 个元素的生产者
- 核心方法是
public final void subscribe(Subscriber<? super T> actual)
2.2 用 Flux.create 创建源
2.2.2 Flux 的快速包装方法
支持背压的方法
- just: 可以指定序列中包含的全部元素。创建的 Flux 源序列会在发布元素之后自动结束。
- fromArray、fromIterable、fromStream: 从这些对象中创建 Flux 对象。
- empty: 创建一个不包含任何元素,只发布结束消息的源序列
- error (Throwable error): 创建一个只包含错误消息的源序列
- never: 创建一个不包含任何消息通知的序列
- range: 创建从 start 起始的 count 个 Integer 对象的源序列
不支持背压的方法(想要支持背压,可手动添加 onBackpressureXXX 方法)
- interval (Duration period) 和 interval (Duration delay, Duration period): 创建一个包含了从 0 开始递增的 Long 对象的源序列
1
2
3
4
5
6
7
8
9
| @Test
void fluxCreate2_2_2() {
Flux.just("Hello", "aoeai.com").subscribe(System.out::println);
Flux.fromArray(new Integer[]{1,2,3,4}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(10, 3).subscribe(System.out::println);
Flux.interval(Duration.ofMillis(100)).subscribe(System.out::println);
sleep(500);
}
|
2.3 蛇形走位的 QueueSubscription
2.3.1 无界队列 SpscLinkedArrayQueue
2.4 Mono 的二三事
只有 0 到 1 个元素的序列
2.5 通过 BaseSubscriber 自定义订阅者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
public class DemoSubscriber <T> extends BaseSubscriber<T> {
@Override
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
@Override
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
}
|
- 在参与订阅的时候,会先调用 onSubscribe 方法,通过这个回调方法可以定义:推 或 拉
- 拉
- 在 hookOnSubscribe 回调方法内进行 request 方法调用
- 然后执行中央的方法 onNext,其中包含最重要的消费逻辑,所以必须重写 hookOnNext 回调方法
- hookOnXXX 方法都是空实现,需要根据自己的实际情况重写
- 推:调用 request(Long.MAX_VALUE)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| @Test
public void DemoSubscriber_test() {
Flux.just("Hello", "DockerX").subscribe(new DemoSubscriber<>());
}
@Test
public void DemoSubscriber_test2() {
Flux.just("Hello", "DockerX").subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnNext(String value) {
System.out.println(value);
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
requestUnbounded();
}
});
}
|
2.6 将常见的监听器改造成响应式结构
2.7 Flux.push 的特殊使用场景及细节探索
- Flux.push 是 Flux.create 的一个变种,值适用处理单线程下生产者产生的事件
- 当确定生产端只有一个线程在生产元素,使用 Flux.push 可以因减少一层包装而提高性能
2.8 对 Flux.handle 的解读