Reactor APIs, Part 4
interval
@Test
public void testInterval() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux.range(1, 10)
        .zipWith(Flux.interval(Duration.ofSeconds(1)))
        // interval ticks on a background scheduler, so release the latch on completion
        .subscribe(System.out::println, null, countDownLatch::countDown);
    try {
        // keep the test thread alive until all ten pairs have been printed
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
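interval creates a Flux that emits an incrementing Long (starting at 0) at a fixed period. Here each value from range is zipped with one tick, so one pair is printed per second: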
[1,0]
[2,1]
[3,2]
[4,3]
[5,4]
[6,5]
[7,6]
[8,7]
[9,8]
[10,9]
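As a rough sketch of what interval emits on its own: the ticks are Long values counting up from 0, and the sequence never completes unless something bounds it (zipWith with a finite range does that in the test above; take does it here). The class name IntervalTicks and the 50 ms period are illustrative choices, not part of the original tests.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; its name and the 50 ms period are illustrative only.
final class IntervalTicks {

    // Collects the first n ticks of an interval into a list.
    static List<Long> firstTicks(int n) {
        return Flux.interval(Duration.ofMillis(50)) // ticks 0, 1, 2, ... on a background scheduler
                .take(n)                            // interval is infinite, so bound it
                .collectList()
                .block();                           // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the ticks arrive on a background thread, a test that only calls subscribe needs something like the CountDownLatch above (or a blocking call as in this sketch) to wait for them.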
@Test
public void testInterval1() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux.interval(Duration.ofSeconds(1))
        .zipWith(Flux.range(1, 10))
        .subscribe(System.out::println, null, countDownLatch::countDown);
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
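One pair is emitted every second. Because interval is now the source, its tick is the first element of each tuple: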
[0,1]
[1,2]
[2,3]
[3,4]
[4,5]
[5,6]
[6,7]
[7,8]
[8,9]
[9,10]
@Test
public void testInterval2() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux.interval(Duration.ofSeconds(1), Duration.ofMillis(200))
        .zipWith(Flux.range(1, 10))
        .subscribe(System.out::println, null, countDownLatch::countDown);
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
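This behaves like the previous example and prints the same pairs. The first argument to Flux.interval is the delay before the first emission; the second is the period between subsequent emissions.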
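With a 1 second delay and a 200 ms period, the ten pairs in testInterval2 should take roughly 1 + 9 × 0.2 = 2.8 seconds in total. The sketch below, with a hypothetical class name and shorter illustrative durations, measures only the initial delay by waiting for the first tick.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

// Hypothetical demo class; the durations used in main are illustrative only.
final class IntervalDelay {

    // Returns how many milliseconds pass before the first tick arrives.
    static long millisUntilFirstTick(Duration delay, Duration period) {
        long start = System.nanoTime();
        Flux.interval(delay, period).blockFirst(); // waits for tick 0, then cancels the interval
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long waited = millisUntilFirstTick(Duration.ofMillis(300), Duration.ofMillis(50));
        // The first tick is governed by the delay (300 ms), not by the period (50 ms).
        System.out.println("first tick after about " + waited + " ms");
    }
}
```

The measured value depends on the machine and scheduler, so treat it as an approximation rather than an exact figure.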
sort
@Test
public void testSort() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // `random` is not declared in this snippet; it is assumed to be a java.util.Random field of the test class
    Flux.<Integer>create(t -> {
            for (int i = 0; i < 10; i++) {
                t.next(random.nextInt(100));
            }
            t.complete();
        })
        .sort()
        .subscribe(System.out::println, null, countDownLatch::countDown);
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
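sort orders the elements by their natural ordering. It has to collect every element first, so the sorted sequence is emitted only after the source completes.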
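sort also has an overload that takes a Comparator. A minimal sketch with fixed input, so the result is predictable (the class name SortDemo and the sample values are illustrative):

```java
import java.util.Comparator;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; the input values are illustrative only.
final class SortDemo {

    // Sorts a fixed sequence in descending order using sort(Comparator).
    static List<Integer> sortedDescending() {
        return Flux.just(5, 3, 8, 1)
                .sort(Comparator.reverseOrder())
                .collectList()
                .block(); // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(sortedDescending()); // prints [8, 5, 3, 1]
    }
}
```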
all
@Test
public void testAll() {
    // `random` is assumed to be a java.util.Random field of the test class
    Flux.<Integer>create(t -> {
            for (int i = 0; i < 10; i++) {
                t.next(random.nextInt(100));
            }
            t.complete();
        })
        .all(k -> k > 50)
        .subscribe(System.out::println);
}
all returns a Mono<Boolean> that emits true if every element satisfies the predicate, and false otherwise.
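Since the test above uses random input, its result varies between runs. The following sketch uses fixed values instead, and also shows the empty case, where all yields true because no element violates the predicate. The class and method names are illustrative.

```java
import reactor.core.publisher.Flux;

// Hypothetical demo class; names and sample values are illustrative only.
final class AllDemo {

    // True if every given value is greater than 50.
    static boolean allAbove50(Integer... values) {
        Boolean result = Flux.fromArray(values)
                .all(k -> k > 50)
                .block(); // blocking is for demonstration only
        return Boolean.TRUE.equals(result);
    }

    public static void main(String[] args) {
        System.out.println(allAbove50(60, 70, 80)); // prints true
        System.out.println(allAbove50(60, 40, 80)); // prints false
        System.out.println(allAbove50());           // prints true (empty sequence)
    }
}
```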
any
@Test
public void testAny() {
    // `random` is assumed to be a java.util.Random field of the test class
    Flux.<Integer>create(t -> {
            for (int i = 0; i < 10; i++) {
                t.next(random.nextInt(100));
            }
            t.complete();
        })
        .any(k -> k > 50)
        .subscribe(System.out::println);
}
any returns a Mono<Boolean> that emits true if at least one element satisfies the predicate, and false otherwise.
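any can stop as soon as one element matches. The sketch below counts how many elements of a 100-element range are actually inspected; the class name, the counter, and the threshold are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Hypothetical demo class; names and values are illustrative only.
final class AnyDemo {

    // Returns how many elements were inspected before any(...) found a match, or -1 if none matched.
    static int inspectedBeforeMatch() {
        AtomicInteger seen = new AtomicInteger();
        Boolean found = Flux.range(1, 100)
                .doOnNext(k -> seen.incrementAndGet()) // count every element that flows through
                .any(k -> k > 3)                       // first match is 4
                .block();                              // blocking is for demonstration only
        return Boolean.TRUE.equals(found) ? seen.get() : -1;
    }

    public static void main(String[] args) {
        // Only 1..4 are inspected; the source is cancelled after the first match.
        System.out.println(inspectedBeforeMatch()); // prints 4
    }
}
```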
buffer
@Test
public void testBuffer() {
    Flux.range(1, 10)
        .buffer()
        .subscribe(System.out::println);
}
buffer() with no arguments collects all incoming values into a single List, which the returned Flux emits once the source Flux completes.
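buffer also accepts a maximum size, in which case the values are split into several lists instead of one. A small sketch (class and method names are illustrative):

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; names are illustrative only.
final class BufferDemo {

    // Splits 1..10 into lists of at most maxSize elements.
    static List<List<Integer>> chunks(int maxSize) {
        return Flux.range(1, 10)
                .buffer(maxSize)
                .collectList()
                .block(); // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(chunks(3)); // prints [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

The last list holds whatever remains when the source completes, so it may be shorter than maxSize.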
cache
@Test
public void testCache() {
    Flux.range(1, 10)
        .cache()
        .subscribe(System.out::println);
}
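cache turns this Flux into a hot source and caches the emitted signals so they can be replayed to later Subscribers. An unbounded number of onNext signals is retained. Completion and error signals are replayed as well.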
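The test above subscribes only once, so the effect of caching is not visible in its output. A sketch that makes it observable, assuming a counter (hypothetical, not in the original) that records how often the source is actually subscribed to:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Hypothetical demo class; the counter exists only to make caching visible.
final class CacheDemo {

    // Subscribes twice to a cached Flux and returns how often the source was subscribed to.
    static int sourceSubscriptions() {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Integer> cached = Flux.<Integer>defer(() -> {
                    subscriptions.incrementAndGet(); // runs once per subscription to the source
                    return Flux.range(1, 3);
                })
                .cache();

        List<Integer> first = cached.collectList().block();  // triggers the source
        List<Integer> second = cached.collectList().block(); // served from the cache
        if (!first.equals(second)) {
            throw new IllegalStateException("replayed values differ");
        }
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(sourceSubscriptions()); // prints 1
    }
}
```

Without cache(), the same two subscriptions would each run the defer supplier, and the counter would end at 2.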
collectSortedList
@Test
public void testCollectSortedList() {
    // `random` is assumed to be a java.util.Random field of the test class
    Flux.<Integer>create(t -> {
            for (int i = 0; i < 10; i++) {
                t.next(random.nextInt(100));
            }
            t.complete();
        })
        .collectSortedList()
        .subscribe(System.out::println);
}
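collectSortedList sorts the elements and collects them into a single List. Unlike sort, which returns a Flux of the individual elements, it returns a Mono<List> that emits the whole sorted list at once.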
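Like sort, collectSortedList has an overload that takes a Comparator. A minimal sketch that orders strings by length (the class name and sample values are illustrative):

```java
import java.util.Comparator;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; the sample values are illustrative only.
final class CollectSortedDemo {

    // Collects the strings into one list, ordered by length.
    static List<String> byLength() {
        return Flux.just("banana", "fig", "pear")
                .collectSortedList(Comparator.comparingInt(String::length))
                .block(); // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(byLength()); // prints [fig, pear, banana]
    }
}
```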