SpringBoot响应式编程

BV1sC4y1K7ET
笔记地址-语雀
代码地址
.
Reactive Streams
响应式宣言
Reactor-快速开始

第一部分:响应式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
命令式编程 => 响应式编程   stream流编排


无限元素
有序
组件之间异步传递元素
强制性非阻塞背压模式


迭代器模式
1、迭代速度取决于数量
2、容器缓存


背压
浏览器 1w请求 => Tomcat(队列缓存请求,慢慢处理)
正压:正向压力:数据生产者给消费者压力
浏览器 1w请求 => Tomcat


多线程:线程多好?少好?
100个线程 4c 25线程/1c 切换
结论:线程多,只会激烈竞争。
理论:让少量线程一直忙,而不是让大量线程一直切换等待。

浏览器 => 1w请求 tomcat 200线程 4c 1c50线程 来回切换

浏览器 => 1w请求 boss线程-缓冲区[] => worker线程(同核心数)-远程调用-[数据缓冲区]
响应式编程===> 全链路无阻塞
===> 只要占了一个CPU核心的线程一直不闲,不等任何数据返回。
数据到达后自动放到缓存区,worker闲了去缓存区拿数据继续处理

公司:
1、100个活, 20个员工,活越多招的员工越多:阻塞式
2、100个活,5个员工,无限压榨资源利用率。

目的:通过全异步的方式、加缓存区构建一个实时的数据流系统。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
全阻塞模式
a 调用 b
响应式(加个中间层 buffer)
a 放入 buffer,b 从 buffer 拿。


数据是自流动的,而不是靠迭代被动流动;
推拉模型:
推:流模式;上游有数据,自动推给下游;
拉:迭代器;自己遍历,自己拉取;

一切皆为流处理
在这里插入图片描述

发布、消费、处理、订阅关系
在这里插入图片描述

1
2
3
4
响应式编程(线程池+缓冲区):
1、底层:基于数据缓冲队列 + 消息驱动模型 + 异步回调机制
2、编码:流式编程 + 链式调用 + 声明式API
3、效果:优雅全异步 + 消息实时处理 + 高吞吐量 + 占用少量资源

jdk9 发布订阅 - 代码
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package java.util.concurrent;
public final class Flow {

// 发布者
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}

// 订阅者
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}

// 订阅关系
public static interface Subscription {
public void request(long n);
public void cancel();
}

// 处理器
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

}
1
2
3
4
5
6
7
8
9
高并发:缓存、异步、队排好
高可用:分片、复制、选领导

万物皆为数据:两种:单个、多个
Mono、Flux

非阻塞原理:缓冲 + 回调

少量线程一直运行 大于 大量线程切换等待

在这里插入图片描述

第二部分:Reactor

1
2
3
4
5
6
7
哲学:
1、道理天然就懂
2、无限碰壁,总结的哲学


一个数据流:
元素(0~N) + 信号(1 正常/异常)

Reactor-快速开始
Flux
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
* 响应式编程核心:看懂文档弹珠图;
* 信号: 正常/异常(取消)
* SignalType:
* SUBSCRIBE: 被订阅
* REQUEST: 请求了N个元素
* CANCEL: 流被取消
* ON_SUBSCRIBE:在订阅时候
* ON_NEXT: 在元素到达
* ON_ERROR: 在流错误
* ON_COMPLETE:在流正常完成时
* AFTER_TERMINATE:中断以后
* CURRENT_CONTEXT:当前上下文
* ON_CONTEXT:感知上下文
*
* doOnXxx API触发时机
* 1、doOnNext:每个数据(流的数据)到达的时候触发
* 2、doOnEach:每个元素(流的数据和信号)到达的时候触发
* 3、doOnRequest: 消费者请求流元素的时候
* 4、doOnError:流发生错误
* 5、doOnSubscribe: 流被订阅的时候
* 6、doOnTerminate: 发送取消/异常信号中断了流
* 7、doOnCancle: 流被取消
* 8、doOnDiscard:流中元素被忽略的时候
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
并发流 parallel
// 8线程 处理 100w数据,一批 100个数据
Flux.range(1,100_0000)
.buffer(100)
.parallel(8)
.runOn(Schedulers.newParallel("yy"))
.log()
.subscribe();


上下文 Context
Flux.just(1,2,3)
.transformDeferredContextual((flux,context)->{
System.out.println("flux = " + flux);
System.out.println("context = " + context);
return flux.map(i->i+"==>"+context.get("prefix"));
})
//上游能拿到下游的最近一次数据
.contextWrite(Context.of("prefix","哈哈"))
//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游
.subscribe(v-> System.out.println("v = " + v));

第三部分:WebFlux

组件对比

1
2
Mono: 返回0|1 数据流
Flux:返回N数据流

在这里插入图片描述

“Reactive”定义

1
2
3
4
5
6
7
8
9
Repository:Publisher
HTTP Server:Subscriber



<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

main方法,创建服务器

HttpHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) throws IOException {
//快速自己编写一个能处理请求的服务器

//1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号
HttpHandler handler = (ServerHttpRequest request,
ServerHttpResponse response)->{
URI uri = request.getURI();
System.out.println(Thread.currentThread()+"请求进来:"+uri);

//创建 响应数据的 DataBuffer
DataBufferFactory factory = response.bufferFactory();

//数据Buffer
DataBuffer buffer = factory.wrap(new String(uri + " ==> Hello!").getBytes());

// 需要一个 DataBuffer 的发布者
return response.writeWith(Mono.just(buffer));
};

//2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);


//3、启动Netty服务器
HttpServer.create()
.host("localhost")
.port(8080)
.handle(adapter) //用指定的处理器处理请求
.bindNow(); //现在就绑定

System.out.println("服务器启动完成....监听8080,接受请求");
System.in.read();
System.out.println("服务器停止....");


}

WebFlux Controller:Mono、Flux、sse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    //现在推荐的方式
//1、返回单个数据Mono: Mono<Order>、User、String、Map
//2、返回多个数据Flux: Flux<Order>
//3、配合Flux,完成SSE: Server Send Event; 服务端事件推送

@ResponseBody
@Controller
public class HelloController {

//现在推荐的方式
//1、返回单个数据Mono: Mono<Order>、User、String、Map
//2、返回多个数据Flux: Flux<Order>
//3、配合Flux,完成SSE: Server Send Event; 服务端事件推送

@GetMapping("/haha")
public Mono<String> haha(){
return Mono.just("哈哈");
}

@GetMapping("/hehe")
public Flux<String> hehe(){
return Flux.just("hehe1","hehe2");
}

//text/event-stream
//SSE测试; chatgpt都在用; 服务端推送
@GetMapping(value = "/sse",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sse(){
return Flux.range(1,10)
.map(i-> {

//构建一个SSE对象
return ServerSentEvent.builder("ha-" + i)
.id(i + "")
.comment("hei-" + i)
.event("haha")
.build();
})
.delayElements(Duration.ofMillis(500));
}

}
1
2
3
Mybatis ResultMap

R2dbc Converter