【基础】Reactor 响应式编程

news/2023/6/7 23:55:32

Reactor 响应式编程

  • Reactor 基本概述
  • Flow 核心概念
    • Publisher 数据发布者
    • Subscriber 数据消费者
    • Subscription 订阅关系
    • Processor 中间处理器
    • 消息发布订阅示例代码
    • Backpressure 回压
    • Backpressure 示例代码
  • Reactor -- Mono/Flux API
    • 创建 Mono/Flux
    • Exception 异常处理
    • 常用的数据处理 API
  • 参考文章

Reactor 基本概述

Reactor 是 Java 反应式编程的框架,Webflux 底层使用的也是该框架,其通过流的方式实现了异步相应。反应式编程通过异步的方式提高了系统的吞吐量,有效利用了系统资源,但其并不能提高响应的速度。其在根本上是一种 pub/sub 模式,通过在发布者与消费者之间的预留通道实现异步响应,类似于 Future 的概念。

发布者和订阅者是 Reactor 中有两个最基本的概念,可以简单理解为消息队列中的生产者和消费者的概念。在Reactor中发布者有两个,一个是 Flux,一个是 Mono。Flux 代表的是 0-N 个元素的响应式序列,而 Mono 代表的是 0-1个的元素的结果。

因为流式处理一般将处理多个元素,因此本文主要学习 Flux 相关的内容来理解 Reactor 框架的原理和使用。

Flow 核心概念

JDK9 中推出了 Flow API,用以支持 Reactive Programming,即响应式编程。

在响应式编程中,会有一个数据发布者 Publisher 和数据订阅者 Subscriber:

  • Publisher 发布数据;

  • Subscriber 接收 Publisher 发布的数据并进行消费;

  • 在 Subscriber 和 Publisher 之间还存在一个 Processor,类似于一个过滤器,可以对数据进行中间处理;

  • Publisher 与 Subscriber 之间的订阅关系由 Subscription 控制;

Publisher 数据发布者

Publisher 是数据的发布者,它是一个函数式接口,其中只有一个方法,该方法可以配置其对应的消费者,源码如下:

    @FunctionalInterfacepublic static interface Publisher<T> {public void subscribe(Subscriber<? super T> subscriber);}

Subscriber 数据消费者

Subscriber 负责消费数据,其内部定义了 4 个方法,源码如下:

public static interface Subscriber<T> {public void onSubscribe(Subscription subscription);public void onNext(T item);public void onError(Throwable throwable);public void onComplete();}

onSubscribe():当与消费者绑定成功时调用该方法;

onNext():当接收到一条发布数据时调用该方法;

onError():当发布者或消费者发生异常时调用该方法;

onComplete():当发布者关闭且所有数据已经被全部消费后调用该方法;

Subscription 订阅关系

Subscription 定义了发布者和消费者的订阅关系,可以理解为两者之间的通道,定义源码如下:

    public static interface Subscription {public void request(long n);public void cancel();}

request():该方法用于向通道中请求 n 个数据进行处理;

cancel():该方法用于取消发布者和消费者的绑定关系;

Processor 中间处理器

Processor 是数据发布者与消费者中间的处理器,可以对发布者发布的数据进行预处理后再发送给消费者进行消费。

实际上其既是发布者,又是消费者,即在两者中间进行了一次数据的处理转发,其源码如下:

    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

消息发布订阅示例代码

上述几个核心概念的基本使用代码如下,通过代码可以更好的理解 Flow 的原理

public class FlowDemo {public static void main(String[] args) {SubmissionPublisher<String> publisher = new SubmissionPublisher<>();Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 向数据发布者请求数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("接收到消息>>>" + item);// 接收数据后可以继续接收或取消订阅this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}@Overridepublic void onComplete() {// 发布者所有数据全部被接收,且发布者已经关闭System.out.println("数据接收完毕~");}};// 将订阅者注册到发布者publisher.subscribe(subscriber);// 发布消息for (int i = 0; i < 10; i++) {// 发送数据publisher.submit(String.valueOf(i));}// 关闭发布者publisher.close();// 维持程序保持开启while (true) {}}

Processor 处理器的示例代码:

public class ProcessorDemo {public static void main(String[] args) {class DataFilter extends SubmissionPublisher<String> implements Flow.Processor<String,String>{private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String item) {this.submit("【处理后数据】" + item);this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {this.subscription.cancel();}@Overridepublic void onComplete() {this.close();}}SubmissionPublisher<String> publisher = new SubmissionPublisher<>();DataFilter dataFilter = new DataFilter();publisher.subscribe(dataFilter);Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;//向数据发布者请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("接收到消息>>>" + item);//接收完成后,可以继续接收或者不接收//this.subscription.cancel();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {//出现异常,就会来到这个方法,此时直接取消订阅即可this.subscription.cancel();}@Overridepublic void onComplete() {//发布者的所有数据都被接收,并且发布者已经关闭System.out.println("数据接收完毕~");}};dataFilter.subscribe(subscriber);for (int i = 0; i < 1000; i++) {// 发送数据System.out.println("产生数据" + i);publisher.submit(String.valueOf(i));}//关闭发布者publisher.close();// 维持程序保持开启while (true) {}}}

Backpressure 回压

Backpressure 回压是指消费能力低于生产能力时,Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,源码如下:

    static final int DEFAULT_BUFFER_SIZE = 256;public static int defaultBufferSize() {return DEFAULT_BUFFER_SIZE;}

当 Subscription 存满时,生产者将根据消费者的消费能力动态的调整数据发布的速度,以实现消费者对生产者的反向控制。

Backpressure 示例代码

运行代码,可以观察到,生产者在生产数据达到 Subscription 上限时便停止生产,然后根据消费者的消费速度动态调节生产速度。

public class FlowDemo0 {public static void main(String[] args) {SubmissionPublisher<String> publisher = new SubmissionPublisher<>();Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 向数据发布者请求数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("接收到消息>>>" + item);// 等待1秒再接收下一条数据try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 接收数据后可以继续接收或取消订阅this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {// 产生异常后直接取消订阅this.subscription.cancel();}@Overridepublic void onComplete() {// 发布者所有数据全部被接收,且发布者已经关闭System.out.println("数据接收完毕~");}};// 将订阅者注册到发布者publisher.subscribe(subscriber);// 发布消息for (int i = 0; i < 500; i++) {// 发送数据System.out.println("产生数据" + i);publisher.submit(String.valueOf(i));}// 关闭发布者publisher.close();// 维持程序保持开启while (true) {}}
}

Reactor – Mono/Flux API

注:下述代码在测试时,需要添加while (true) {}以维持主线程开启,避免数据还没有处理完主线程就关闭的情况。

创建 Mono/Flux

  • just():使用已知内容创建;

  • fromIterable():通过可迭代对象创建;

  • fromStream():从集合流中创建;

  • range():通过范围迭代创建;

        // 1. 创建 Flux/Mono// 1.1 使用已知内容创建 FluxFlux.just(1, 2, 3, 4, "hello", "world").subscribe(System.out::println);// 1.2 通过可迭代对象创建 FluxFlux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);// 1.3 从集合流中创建 FluxFlux.fromStream(Stream.of(1,2,3,4)).subscribe(System.out::println);// 1.4 通过范围迭代创建 FluxFlux.range(0,10).subscribe(System.out::println);
  • interval():按照从 0 递增的方式自动创建;

  • delayElements():数据流延时发送方法;

        // 2. 创建时常用的方法// 2.1 interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列Flux.interval(Duration.ofMillis(100))// 限制执行10次.take(10).subscribe(System.out::println);// 2.2 delayElements() 方法延时发送Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5)).delayElements(Duration.ofMillis(1000L)).subscribe(System.out::println);

Exception 异常处理

  • doOnError():异常监听,监听到异常的处理逻辑;

  • onErrorReturn():产生异常时返回消息给订阅者;

        Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).doOnError(Throwable::printStackTrace).onErrorReturn("产生异常,返回 500...").subscribe(System.out::println);
  • subscribe():可以通过传入参数指定异常处理

    • 参数1:定义正常消费逻辑;

    • 参数2:定义异常处理逻辑;

    • 参数3:定义消费完成的逻辑;

        Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).subscribe(System.out::println,System.err::println,() -> System.out.println("完成..."));
  • onErrorResume():产生异常后重新产生新的流
        Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).onErrorResume(throwable -> {System.out.println(throwable.getMessage());;return Flux.just("1", "1", "1");}).subscribe(System.out::println);
  • retry():产生异常后进行重试,参数为重试次数
        Flux.just("1", "2", "3")// concatWith() 可以连接一个新的流.concatWith(Flux.error(new Exception("手动模拟异常..."))).retry(1).subscribe(System.out::println);

常用的数据处理 API

  • merge():按照所有流的实际产生顺序进行合并;

  • mergeSequential():按照流合并的次序进行合并,先消费第一个,在消费第二个;

        Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5),Flux.interval(Duration.ofMillis(10)).take(3)).subscribe(System.out::println);Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5),Flux.interval(Duration.ofMillis(10)).take(3)).subscribe(System.out::println);
  • buffer():将流中的元素收集为集合;

    • 可以传入两个参数,分别为 maxSize 和 skip。其中,maxSize 代表数据切割后每个集合的最大长度;skip 代表每一次切割后切割器切割起点跳跃的元素个数
  • bufferTimeout():按照时间间隔切割对流中的数据进行收集;

    • 可以传入两个参数,分别为 maxSize 和 maxTime。其中,maxSize 代表数据切割后每个集合的最大长度;maxTime 代表切割的最大时间间隔
  • bufferWhile():当 Predicate 为 true 时才收集当前元素;

  • bufferUntil():直到 Predicate 为 true 时才收集一次所有元素;

    • 当第二个参数为 true 时,满足条件的元素将作为集合的首个元素;若为 false,则满足条件的元素为最后一个元素(默认)
        Flux.range(1, 10).buffer(3, 3).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100L)).bufferTimeout(9, Duration.ofMillis(1000L)).subscribe(System.out::println);Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);Flux.range(1, 10).bufferUntil(i -> i % 2 == 0, false).subscribe(System.out::println);
  • filter():按条件对流中的数据进行过滤;
        Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
  • zipWith():把流中的元素与另一个流中对应元素进行合并,多余元素将被抛弃;

    • 第二个参数可以指定合并的规则
        Flux.just(1, 2).zipWith(Flux.just(3, 4), (s1, s2) -> s1 + "," + s2).subscribe(System.out::println);
  • take():提取指定数量的元素或按时间间隔提取元素;

  • takeLast():提取最后 n 个元素;

  • takeWhile():当 Predicate 返回 true 时才进行提取;

  • takeUntil():提取元素直到 Predicate 返回 true;

        Flux.range(1, 1000).take(10).subscribe(System.out::println);Flux.interval(Duration.ofMillis(10)).take(Duration.ofMillis(100)).subscribe(System.out::println);Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);Flux.range(1, 1000).takeWhile(i -> i < 20).subscribe(System.out::println);Flux.range(1, 1000).takeUntil(i -> i > 100).subscribe(System.out::println);
  • reduce():对流中数据进行规约聚合;

  • reduceWith():对流中数据进行规约聚合,第一个参数可以通过 Supplier 设置初始值;

        Flux.range(1, 100).reduce(Integer::sum).subscribe(System.out::println);Flux.range(1, 100).reduceWith(() -> 100, Integer::sum).subscribe(System.out::println);
  • map():将流中的元素依次进行映射处理;

  • flatMap():将流中的每一个元素看作一个新的流进行处理并按实际生产顺序进行合并;

  • flatMapSequential():按订阅顺序进行合并;

        Flux.range(1, 100).map(x -> {return x / 2;}).subscribe(System.out::println);Flux.just(5, 10).flatMap(x ->Flux.interval(Duration.ofMillis(x * 10)).take(x)).subscribe(System.out::println);Flux.just(5, 10).flatMapSequential(x ->Flux.interval(Duration.ofMillis(x * 10)).take(x)).subscribe(System.out::println);
  • skip():跳过指定条数或跳过指定时间间隔;
        Flux.just(1, 2, 3, 4, 5, 6, 7).skip(2).subscribe(System.out::println);Flux.interval(Duration.ofMillis(100)).skip(Duration.ofMillis(300)).subscribe(System.out::println);
  • distinct():去重处理;
        Flux.just(1, 1, 2, 2, 5, 6, 7).distinct().subscribe(System.out::println);
  • last():取流中的最后一个元素;

  • next():取流中的第一个元素;

        Flux.just(1, 2, 3, 4, 5).last().subscribe(System.out::println);Flux.just(1, 2, 3, 4, 5).next().subscribe(System.out::println););
  • doOnError():产生异常时执行;

  • doOnComplete():数据接收完成时执行;

  • doFinally():最后执行;

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9).concatWith(Flux.error(new Exception()))//错误时执行.doOnError(e -> System.out.println("产生异常>>>" + e))//完成时执行.doOnComplete(() -> System.out.println("数据接收完成~"))//最后执行.doFinally(t -> System.out.println("最后执行信息>>>" + t)).subscribe(System.out::println);

参考文章

  1. Java反应式框架Reactor中的Mono和Flux
  2. 探究WebFlux之Reactor
  3. WebFlux 前置知识(四)
  4. JAVA Reactor API ( Flux和Mono)的简单使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-4554182.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

SQL错题集

1 》》176. 第二高的薪水 - 力扣&#xff08;LeetCode&#xff09; (leetcode-cn.com) 思路&#xff1a; 先使用 order by salary(属性名) desc 对 salary 进行降序排序&#xff0c; 然后再使用 limit 1 offset 1 取出排序后表中第二行&#xff08;即第二个元组&#xff09; 以…

c语言 错题集

文章目录1宏定义 #define PI 3.14159 中&#xff0c;用宏名PI代替一个2.字符串"\x54BNHGb13,2m"的长度为3&#xff0c;程序输出4,字符串的错误赋值5.浮点数参与运算问题6.指针指向数组元素7,错误语句8,&#xff0c;-联系运算 n n - n * n9, -- 运算符10&#xff0c;…

软考刷题错题集

希赛网软件设计师每日一练&#xff08;2022.10.11&#xff09; 1.结构化分析方法中&#xff0c;数据流图中的元素在&#xff08;&#xff09;中定义 A.加工逻辑 B.实体联系图 C.流程图 D.数据字典 知识点类型&#xff1a;软件工程 正确&#xff1a;D 我&#xff1a;B2.下…

Java错题集

&#x1f4af;错题集 JavaSE 执行顺序 题目1、 静态代码块的执行顺序&#xff0c;先初始化子类&#xff0c;要调用父类的实例化初始化块Base,再new一个父类对象&#xff0c;执行代码块>Base&#xff0c;最终输出BaseBase 题目2、 考察Switch的基本用法&#xff0c;注意每…

javascript错题集

正确答案: A 解析 首先需要明白addEventListener的回调函数的函数上下文为触发事件的元素的引用&#xff0c;如无意外则change方法改变的flag是节点上的flag&#xff0c;和button无关&#xff0c;但是由于change是箭头函数&#xff0c;this与构造函数绑定在一起了&#xf…

【笔记】错题集

文章目录错题集 - java - 在线测试基础 jvm- 格式 常识- - 1&#xff1a;可以用作标识符的是 - 【易误】- - 2 &#xff1a; 下列数组的声明有哪些是对的&#xff1f; - 【易误】- String- - 1 &#xff1a; 编译 String String”String” - 【好题】- - 2&#xff1a;String…

Vivado2018.3安装及注册指南-安装包获取

一、vivado 介绍 vivado设计套件 是FPGA 厂商赛灵思&#xff08;Xilinx&#xff09;公司最新的为其产品定制的集成开发环境&#xff0c;支持Block Design、Verilog、VHDL等多种设计输入方式&#xff0c;内嵌综合器以及仿真器&#xff0c;可以完成从设计输入、综合适配、仿真到…

微信小程序九宫格抽奖

效果图比较卡顿&#xff0c;真实运行效果是旋转的 用到的素材&#xff1a; 实现步骤&#xff1a; 实现原理 改变每一项的透明度实现选中效果。利用setTimeOut时间使旋转速度越来越慢。达到慢慢停止的效果。实际应用中可以将9张奖品图片和中奖项均通过接口返回。以方便奖品的调…