WebFlux 之 Mono API 教程

前面写了一些 WebFlux 方面的教程,发现很多人搜索到了我的文章,希望我能继续写一些教程。于是便有了本文。

关于 WebFlux 前面我已经反复强调过,它是一种新的编程趋势,未来会越来越流行。所以,很有必要认真的学习学习!

Fulx 和 Mono

Reactive Streams 是一个标准或者说是规范,由 Netflix、TypeSafe、Pivotal 等公司发起的。Reactor 是 Spring 中的一个子项目,一个基于 java 的响应式编程框架,该框架实现了 Reactive Programming(反应式编程即响应式编程) 思想,符合 Reactive Streams 规范。Mono 和 Flux 是 Reactor 中非常重要的两个类。

Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

Mono和Flux都是Publisher(发布者)。

其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 Mono 和 Flux。对于 Subscriber 和 Subcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。

比如,在 Web Flux,你的方法只需返回 Mono 或 Flux 即可。你的代码基本也只和 Mono 或 Flux 打交道。而 Web Flux 则会实现 Subscriber ,onNext 时将业务开发人员编写的 Mono 或 Flux 转换为 HTTP Response 返回给客户端。

Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者(Publisher)。

根据上面的介绍,可以得出 Mono  相当于 Optional,或和 Optional 的作用类似。

@RestController
public class MonoController {

    @RequestMapping("/hello")
    public Optional<String> hello(@RequestParam(required = false) String str){
        if(StringUtils.isEmpty(str)){
            return Optional.empty();
        }
        return Optional.of(":www.xttblog.com");
    }

    @RequestMapping("/mono")
    public Mono<String> mono(@RequestParam(required = false) String str){
        if(StringUtils.isEmpty(str)){
            return Mono.empty();
        }
        return Mono.just(":www.xttblog.com");
    }
}

反应式编程框架主要采用了观察者模式,而 Spring Reactor的核心则是对观察者模式的一种衍伸。关于观察者模式的架构中被观察者(Observable)和观察者(Subscriber)处在不同的线程环境中时,由于者各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这时就出现了两种情况:

  • 被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件好比观察者在等米下锅,程序等待)。
  • 被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃。(好比被观察者生产的大米没人吃,堆积最后就会烂掉)。为了方便下面理解Mono和Flux,也可以理解为Publisher(发布者也可以理解为被观察者)主动推送数据给Subscriber(订阅者也可以叫观察者),如果Publisher发布消息太快,超过了Subscriber的处理速度,如何处理。这时就出现了Backpressure(背压—–指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略)

Mono 实现了 Publisher 接口,但是通过查看源码,发现它是一个抽象类。Mono 里面有很多 API,关于这些 API 的解释如下:

  • empty():创建一个不包含任何元素,只发布结束消息的序列。
  • just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
  • justOrEmpty():从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
  • error(Throwable error):创建一个只包含错误消息的序列。
  • never():创建一个不包含任何消息通知的序列。
  • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
  • delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
  • create():通过 create()方法来使用 MonoSink 来创建 Mono。

API 使用案例如下所示。

//empty():创建一个不包含任何元素,只发布结束消息的序列
Mono.empty().subscribe(System.out::println);
//just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
Mono.just("www.xttblog.com").subscribe(System.out::println);
//ustOrEmpty():从一个 Optional 对象或可能为 null 的对象中创建 Mono。
//只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
Mono.justOrEmpty(null).subscribe(System.out::println);
Mono.justOrEmpty("").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("")).subscribe(System.out::println);
//error(Throwable error):创建一个只包含错误消息的序列。
Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println);
//never():创建一个不包含任何消息通知的序列。
Mono.never().subscribe(System.out::println);
//通过 create()方法来使用 MonoSink 来创建 Mono。
Mono.create(sink -> sink.success("")).subscribe(System.out::println);

//通过fromRunnable创建,并实现异常处理
Mono.fromRunnable(() -> {
    System.out.println("thread run");
    throw new RuntimeException("thread run error");
}).subscribe(System.out::println, System.err::println);
//通过fromCallable创建
Mono.fromCallable(() -> "callable run ").subscribe(System.out::println);
//通过fromSupplier创建
Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println);

//delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
long start = System.currentTimeMillis();
Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> {
    System.out.println("生产数据源:"+ n);
    System.out.println("当前线程ID:"+ Thread.currentThread().getId() + ",生产到消费耗时:"+ (System.currentTimeMillis() - start));
});
System.out.println("主线程"+ Thread.currentThread().getId() + "耗时:"+ (System.currentTimeMillis() - start));
while(!disposable.isDisposed()) { }

跟我学 WebFlux

本文主要是介绍了一下 Mono 类中相关 API 的用法,关于 Flux 类,我们下章继续。

WebFlux 之 Mono API 教程

: » WebFlux 之 Mono API 教程

原创文章,作者:6024010,如若转载,请注明出处:https://blog.ytso.com/252031.html

(2)
上一篇 2022年5月3日
下一篇 2022年5月3日

相关推荐

发表回复

登录后才能评论