Reactor

reactor,rxjava 都是实现了 reactive-streams 里的四个核心 API
在这里插入图片描述

webflux 依赖 reactor-netty,进而依赖了 reactor
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
什么是响应式,就是基于响应的反应。

背压,电影的缓冲就是背压
=> 在线播放电影的时候,观看可能5s就缓冲完了,你不可以5s内看完,这个电影就要保存下来,慢慢看,就是背压。
=> 生产端进行电影下发,客户端就是容器,保存电影,你只有看的时候它才会缓冲,也就是你订阅了。

=> 下雨的水库,下雨的时候,水会存起来,有大坝、水库。
=> 如果没有大坝就会直接把下游冲垮,有大坝则可以按需排放。

=> 有一个水龙头,每隔一段时间放10升水,你可能突然渴了会用1升水,那么剩下9分钟就浪费了。
=> 背压策略:有时候你临时请求没有了,来一个元素放一个元素,按照我们的需求,可以增加一个蓄水池,按照下游的需求来排放。
=> 所以说,背压就是一个暂存元素的地方。

=> 上游生产的太多了,下游不能及时消费了,需要一个暂存的地方。
=> 有时候听课,听不懂,可以暂时录下来,后面慢慢听。
=> 背压:暂存元素的地方,放在上游。


上游订阅
org.reactivestreams.Publisher#subscribe

下游消费
org.reactivestreams.Subscriber#onNext
org.reactivestreams.Subscriber#onSubscribe

承上启下
org.reactivestreams.Subscription

# 四个API
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}

public interface Subscription {
public void request(long n);
public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

reactor 改进 rxjava create

1
2
3
4
5
6
7
8
9
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
}

public static <T> Flowable<T> create(@NonNull FlowableOnSubscribe<T> source, @NonNull BackpressureStrategy mode) {
Objects.requireNonNull(source, "source is null");
Objects.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<>(source, mode));
}

并发更新的安全性

1
2
3
有两种:
1、乐观锁 cas原子更新 例如:AtomicReferenceFieldUpdater。。。(抢占低)
2、悲观锁 synchronized、lock(抢占高)

reactor Flux.create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
public abstract class Flux<T> implements CorePublisher<T> {
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
}
}

final class FluxCreate<T> extends Flux<T> implements SourceProducer<T> {
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
BaseSink<T> sink = createSink(actual, backpressure);

actual.onSubscribe(sink);
try {
// create情况下,返回 SerializedFluxSink(下面会讲)
source.accept(
createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
sink);
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
sink.error(Operators.onOperatorError(ex, actual.currentContext()));
}
}

static <T> BaseSink<T> createSink(CoreSubscriber<? super T> t, OverflowStrategy backpressure) {
switch (backpressure) {
case IGNORE: {
return new IgnoreSink<>(t);
}
case ERROR: {
return new ErrorAsyncSink<>(t);
}
case DROP: {
return new DropAsyncSink<>(t);
}
case LATEST: {
return new LatestAsyncSink<>(t);
}
default: {
// 默认 256 ===> public static final int SMALL_BUFFER_SIZE = Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
return new BufferAsyncSink<>(t, Queues.SMALL_BUFFER_SIZE);
}
}
}

}


// 使用队列缓冲
static final class BufferAsyncSink<T> extends BaseSink<T> {
final Queue<T> queue;// 水库,背压容器,

volatile int wip;// 接收数量

BufferAsyncSink(CoreSubscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = Queues.<T>unbounded(capacityHint).get();// 具有固定链接大小的无边界、阵列支持的单个生产者、单个消费者队列
}

@Override
public FluxSink<T> next(T t) {
queue.offer(t);
drain();
return this;
}

// 省略部分代码
void drain() {// drain=排出
if (WIP.getAndIncrement(this) != 0) {
return;
}

final Subscriber<? super T> a = actual;
final Queue<T> q = queue;

for (; ; ) {
T o = q.poll();
a.onNext(o);


if (WIP.decrementAndGet(this) == 0) {
break;
}
}

}
}

// 关于上面:create 默认 CreateMode.PUSH_PULL,使用 new SerializedFluxSink<>(sink)
// 序列化对onNext、onError和onComplete的调用。
static final class SerializedFluxSink<T> implements FluxSink<T>, Scannable {
SerializedFluxSink(BaseSink<T> sink) {
this.sink = sink;
this.mpscQueue = Queues.<T>unboundedMultiproducer().get();// 包装为:多生产者-单消费者无限队列
}

@Override
public FluxSink<T> next(T t) {// ...省略部分代码
// 情况1、sink.next
if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
sink.next(t);
}
// 情况2、mpscQueue.offer
else {
this.mpscQueue.offer(t);
}
drainLoop();
return this;
}
}

// 只保存一个值,后面的会覆盖前面的,
static final class LatestAsyncSink<T> extends BaseSink<T> {

final AtomicReference<T> queue;

@Override
public FluxSink<T> next(T t) {
T old = queue.getAndSet(t);
Operators.onDiscard(old, ctx);
drain();
return this;
}
}

// 接收第一个,后面全忽略
static final class IgnoreSink<T> extends BaseSink<T> {
@Override
public FluxSink<T> next(T t) {// ...省略部分代码
actual.onNext(t);

for (; ; ) {
long r = requested;
if (r == 0L || REQUESTED.compareAndSet(this, r, r - 1)) {
return this;
}
}
}
}

2022-12-20 16:53:21 我是真的看不下去了,
百度了一下,看过他书的人评论也很差劲,
但是找到了下面一篇文章,真的好。


—————

2022-12-20…18.53.05 知秋:响应式.txt

2018.9.10 响应式一些内容的分享总结

响应式到底是什么?

现实生活中,当我们听到有人喊我们的时候,我们会对其进行响应,也就是说,我们是基于事件驱动模式来进行的编程。
所以这个过程其实就是对于所产生事件的下发,我们的消费者对其进行的一系列的消费。
从这个角度,我们可以思考,整个代码的设计我们应该是针对于消费者来讲的,
=> 比如看电影,有些画面我们不想看,那就闭上眼睛,有些声音不想听,那就捂上耳朵,
=> 说白了,就是对于消费者的增强包装,我们将这些复杂的逻辑给其拆分,然后分割成一个个的小任务进行封装,于是就有了诸如filter、map、skip、limit等操作。而对于其中源码的设计逻辑,见下文。

并发与并行的关系

可以这么说,并发很好的利用了CPU时间片的特性,也就是操作系统选择并运行一个任务,接着在下一个时间片会运行另一个任务,并把前一个任务设置成等待状态。
其实这里想表达的是并发并不意味着并行
具体来举几个情况:

  • 有时候多线程执行会提高应用程序的性能,而有时候反而会降低程序的性能。这在关于JDK中其Stream API的使用上体现的很明显,如果任务量很小,而我们又使用了并行流,反而降低了性能。

  • 我们在多线程编程中可能会同时开启或者关闭多个线程,这会产生的很多性能开销,这也降低了程序性能。

  • 当我们的线程同时都在等待IO过程,此时并发也就可能会阻塞CPU资源,其造成的后果不仅仅是用户在等待结果,同时会浪费CPU的计算资源。

  • 如果几个线程共享了一个数据,情况就变得有些复杂了,我们需要考虑数据在各个线程中状态的一致性。为了达到这个目的,我们很可能会使用Synchronized或者是lock来解决。

现在,应该对并发有一定的认知了吧。并发是一个很好的东西,但并不一定会实现并行。并行是在多个CPU核心上的同一时间运行多个任务或者一个任务分为多块执行(如ForkJoin)。单核CPU的话就不要考虑了。
补充一点,实际上多线程就意味着并发,但是并行只发生在当这些线程在同一时间调度分配在不同CPU上执行。也就是说,并行是并发的一种特定的形式。往往我们一个任务里会产生很多元素,然而这些个元素在不做操作的情况下大都只能在当前线程中操作,要么我们就要对其进行ForkJoin,但这些对于我们很多程序员来讲有时候很不好操作控制,上手难度有些高,响应式的话,我们可以简单的通过其调度API就可以轻松做到事件元素的下发分配,其内部将每个元素包装成一个任务提交到线程池中,我们可以根据是否是计算型任务还是IO类型的任务来选择相应的线程池。
这里,需要强调一下:线程只是一个对象而已,不要把其想象成cpu中的某一个执行核心,这是很多人都在犯的错,cpu时间片切换执行这些个线程。

响应式中的背压到底是一种怎样的理解

用一个不算很恰当的中国的成语来讲,就是承上启下。

为了更好的解释,我们来看一个场景,大坝,在洪水时期,下游没有办法一下子消耗那么多水,大坝在此的作用就是拦截洪水,并根据下游的消耗情况酌情排放。

再者,父亲的背,我们小时候,社会上很多的事情首先由父亲用自己的背来帮我们来扛起,然后根据我们自身的能力来适当的下发给我们压力,

也就是说,背压应该写在连接元素生产者和消费者的一个地方,即生产者和消费者的连线者。

然后,通过这里的描述,背压应该具有承载元素的能力,也就是其必须是一个容器的,而且元素的存储与下发应该具有先后的,那么使用队列则是最适合不过了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
背压,电影的缓冲就是背压
=> 在线播放电影的时候,观看可能5s就缓冲完了,你不可以5s内看完,这个电影就要保存下来,慢慢看,就是背压。
=> 生产端进行电影下发,客户端就是容器,保存电影,你只有看的时候它才会缓冲,也就是你订阅了。

=> 下雨的水库,下雨的时候,水会存起来,有大坝、水库。
=> 如果没有大坝就会直接把下游冲垮,有大坝则可以按需排放。

=> 有一个水龙头,每隔一段时间放10升水,你可能突然渴了会用1升水,那么剩下9分钟就浪费了。
=> 背压策略:有时候你临时请求没有了,来一个元素放一个元素,按照我们的需求,可以增加一个蓄水池,按照下游的需求来排放。
=> 所以说,背压就是一个暂存元素的地方。

=> 上游生产的太多了,下游不能及时消费了,需要一个暂存的地方。
=> 有时候听课,听不懂,可以暂时录下来,后面慢慢听。
=> 背压:暂存元素的地方,放在上游。

reactive-streams

  • reactor,rxjava 都是实现了 reactive-streams 里的四个核心 API
    在这里插入图片描述
  • webflux 依赖 reactor-netty,进而依赖了 reactor
    在这里插入图片描述

如何去看Rxjava或者Reactor的源码

如何去看Rxjava或者Reactor的源码,根据源码的接口的设计我们可以得到一些什么启示

关于响应式的Rx标准已经写入了JDK中:java.util.concurrent.Flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);// ---------------1
}

public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);// ------------------2

public void onNext(T item);// ------------------4

public void onError(Throwable throwable);

public void onComplete();
}


public static interface Subscription {
public void request(long n);// ---------------------3

public void cancel();
}


public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

可以看到,Flow这个类中包含了这4个接口定义,Publisher 通过subscribe方法来和Subscriber产生订阅关系,而Subscriber依靠onSubscribe来首先和上游产生联系,这里就是靠Subscription来做到的,所以说,Subscription往往会作为生产者的内部类定义其中,其用来接收生产者所生产的元素,支持背压的话,Subscription应该首先将其放入到一个队列中,然后根据请求数量来调用Subscriber的onNext等方法进行下发。

这个在Rx编程中都是统一的模式,我们通过Reactor中reactor.core.publisher.Flux#fromArray所涉及的FluxArray的源码来对此段内容进行理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
final class FluxArray<T> extends Flux<T> implements Fuseable, Scannable {

final T[] array;

@SafeVarargs
public FluxArray(T... array) {
this.array = Objects.requireNonNull(array, "array");
}

@SuppressWarnings("unchecked")
public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
if (array.length == 0) {
Operators.complete(s);
return;
}
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
}
else {
s.onSubscribe(new ArraySubscription<>(s, array));
}
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
subscribe(actual, array);
}


@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.BUFFERED) return array.length;
return null;
}

static final class ArraySubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {

final CoreSubscriber<? super T> actual;

final T[] array;

int index;

volatile boolean cancelled;

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ArraySubscription> REQUESTED =
AtomicLongFieldUpdater.newUpdater(ArraySubscription.class, "requested");

ArraySubscription(CoreSubscriber<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}

@Override
public void request(long n) {
if (Operators.validate(n)) {
if (Operators.addCap(REQUESTED, this, n) == 0) {
if (n == Long.MAX_VALUE) {
fastPath();
}
else {
slowPath(n);
}
}
}
}

void slowPath(long n) {
final T[] a = array;
final int len = a.length;
final Subscriber<? super T> s = actual;

int i = index;
int e = 0;

for (; ; ) {
if (cancelled) {
return;
}

while (i != len && e != n) {
T t = a[i];

if (t == null) {
s.onError(new NullPointerException("The " + i + "th array element was null"));
return;
}

s.onNext(t);

if (cancelled) {
return;
}

i++;
e++;
}

if (i == len) {
s.onComplete();
return;
}

n = requested;

if (n == e) {
index = i;
n = REQUESTED.addAndGet(this, -e);
if (n == 0) {
return;
}
e = 0;
}
}
}

void fastPath() {...}

}

static final class ArrayConditionalSubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {
....
}

}

各种中间操作的包装我们该如何去做,依据之前的接口定义,我们应该更注重功能的设定,而无论是filter,flatmap,map等这些常用的操作,其实都是消费动作,理应定义在消费者层面,想到这里,我们该如何去做?
这里,我们就要结合我们的设计模式,装饰模式,对subscribe(Subscriber<? super T> subscriber)所传入的Subscriber进行功能增强,即从Subscriber这个角度来讲,使用的是装饰增强模式,但从外面来看,其整体定义的依然是一个Flux或者Mono,这里FluxArray的话就是例子,这样,从这个角度来讲,其属于向上适配,也就是适配模式,这里的适配玩的比较有意思,完全就是靠对内部类的包装然后通过subscribe(Subscriber<? super T> subscriber)衔接来完成的。

所以,我们应该想到中国古代苏轼的题西林壁里有一句话:横看成岭侧成峰 远近高低各不同 讲的就是从不同的角度去看待一个事物,就会得到不同的结果。同样,一百个人心中有一百个哈姆雷特,也是对于同一个事物的看法,从这里,我们应该能学到设计模式千万不要特别刻意的去绝对化!

我们可以结合reactor.core.publisher.Flux#filter涉及的FluxFilter来观察理解上述涉及的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
final class FluxFilter<T> extends FluxOperator<T, T> {

final Predicate<? super T> predicate;

FluxFilter(Flux<? extends T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = Objects.requireNonNull(predicate, "predicate");
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T> actual) {
if (actual instanceof ConditionalSubscriber) {
source.subscribe(new FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual,
predicate));
return;
}
source.subscribe(new FilterSubscriber<>(actual, predicate));
}

static final class FilterSubscriber<T>
implements InnerOperator<T, T>,
Fuseable.ConditionalSubscriber<T> {

final CoreSubscriber<? super T> actual;

final Predicate<? super T> predicate;

Subscription s;

boolean done;

FilterSubscriber(CoreSubscriber<? super T> actual, Predicate<? super T> predicate) {
this.actual = actual;
this.predicate = predicate;
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return;
}

boolean b;

try {
b = predicate.test(t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return;
}
if (b) {
actual.onNext(t);
}
else {
s.request(1);
}
}

@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return false;
}

boolean b;

try {
b = predicate.test(t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return false;
}
if (b) {
actual.onNext(t);
}
return b;
}

@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.TERMINATED) return done;

return InnerOperator.super.scanUnsafe(key);
}

@Override
public CoreSubscriber<? super T> actual() {
return actual;
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
}
}

static final class FilterConditionalSubscriber<T>
implements InnerOperator<T, T>,
Fuseable.ConditionalSubscriber<T> {

...
}

}

根据这些设计,我们自己也是完全可以作为参考来通过一套api接口设计,可以衍生出很多规范逻辑的开发,比如我们看到的众多的Rx衍生操作API的设计实现,其都是按照一套模板来进行的,我们可以称之为代码层面的微服务设计。

如何去看待众多函数表达式

人类最擅长描述场景,比如一套动作,假如是舞蹈的话,可以讲是什么什么编舞,但是这个编舞又要在一定的框架之下,即有一定的规范,同样,我们施展一套拳法,也需要一个规范,不能踢一脚也叫拳法。而对于这个规范的实现,那就是一套动作,对于拳法来讲,可能就是一个很简单的左勾拳或者右勾拳,也可以是比较复杂的咏春拳,太极拳等,而且一套拳法可能有很多小套路组成,这些小套路也是遵循着这个规范进行的,那么依据这个思路,我们来看下面的函数式接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@FunctionalInterface
public interface Predicate<T> {
boolean test(T t);
default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}
default Predicate<T> negate() {
return (t) -> !test(t);
}
default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}
static <T> Predicate<T> isEqual(Object targetRef) {
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
}

@FunctionalInterface
public interface BiConsumer<T, U> {

void accept(T t, U u);


default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
Objects.requireNonNull(after);

return (l, r) -> {
accept(l, r);
after.accept(l, r);
};
}
}

可以看到无论是条件判断表达式Predicate还是无返回值动作处理函数BiConsumer都遵循一个标准动作的设计定义思路,并通过default方法来对同类动作进行编排,以达到更加丰富的效果。所以,函数式的应用更加倾向于干净利落,凸显自己要做的事情就好,未来,我会在自己的Java编程方法论- JDK篇中花大量篇幅来解读函数式编程的各种奇特而实用的使用方法,来降低我们复杂接口的设计逻辑难度,做到知名见义,了然于胸的效果。这个在我的Java编程方法论- Reactor与Spring webflux篇中也是有涉及的。

关于响应式的使用性能的考究

响应式编程知识一种模式,用的好与坏全看自己对于api的理解程度,不要想着会多么的降低性能,这个并没有进行什么过度包装这一说的,当讲到jdbc这里如何表现不行的时候,当前并没有一个开源的Reactor-jdbc的框架,也就造成的测试的不合理性,何况新的知识是需要大家一起共同来学习推动的,不好的地方我们推动就好,不需要上来就对其进行否定,mongodb有提供相应的响应式api,但其内部还是之前的方式,同样,关系型数据库也是一个道理,响应式编程注重的是中间过程的处理,关于生产元素的获取它没太多关系,更多的还是看元素生产者的性能,一家之言,可能有偏颇,希望理解,有问题提出就好。


———–

1
2
3
4
5
反应式系统的特质:
+ 即时响应性:只要有可能, 系统就会及时地做出响应。
+ 回弹性:系统在出现失败时依然保持即时响应性。
+ 弹性: 系统在不断变化的工作负载之下依然保持即时响应性。
+ 消息驱动:反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。

反应式宣言 (Simplified Chinese)
https://www.reactivemanifesto.org/zh-CN

1
2
3
4
同步阻塞,喝酒吹瓶,一口喝不下会喷出来。

异步解耦
背压,一个开关,来控制频率。我今天心情不好想要10条,明天心情好了想要100条。
  • 同步阻塞调用模型、反应式系统模型
    在这里插入图片描述

Spring reactive
https://spring.io/reactive

JDK9 Flow

1
2
3
4
5
6
7
8
9
10
11
12
发布者,订阅者,建立关系

oSub 第一次订阅会触发此方法,
next 处理数据
error 错误
complete 完成

request long n 背压实现,控制数据量

处理器 既是发布,也是订阅。

256 默认缓冲大小