<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
呼叫多個平級服務,按照服務優先順序返回第一個有效資料。
具體case:一個頁面可能有很多的彈窗,彈窗之間又有優先順序。每次只需要返回第一個有資料的彈窗。但是又希望所有彈窗之間的資料獲取是非同步的。這種場景使用 Reactor 怎麼實現呢?
public interface TestServiceI { Mono request(); }
提供一個 request 方法,返回一個 Mono 物件。
@Data @ToString @AllArgsConstructor @NoArgsConstructor public class TestUser { private String name; }
@Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) .map(name -> { return new TestUser(name); }); } }
第一個 service 執行耗時 500ms。返回空物件;
建立第二個 service 執行耗時 1000ms。返回空物件;程式碼如上,改一下sleep時間即可。
繼續建立第三個 service 執行耗時 1000ms。返回 name3。程式碼如上,改一下 sleep 時間,以及返回為 name3。
public static void main(String[] args) { long startTime = System.currentTimeMillis(); TestServiceI testServiceImpl4 = new TestServiceImpl4(); TestServiceI testServiceImpl5 = new TestServiceImpl5(); TestServiceI testServiceImpl6 = new TestServiceImpl6(); List<TestServiceI> serviceIList = new ArrayList<>(); serviceIList.add(testServiceImpl4); serviceIList.add(testServiceImpl5); serviceIList.add(testServiceImpl6); // 執行 service 列表,這樣有多少個 service 都可以 Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList) .map(service -> { return service.request(); }); // flatMap(或者flatMapSequential) + map 實現異常繼續下一個執行 Flux flux = monoFlux.flatMapSequential(mono -> { return mono.map(user -> { TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class); if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) { return testUser; } // null 在 reactor 中是異常資料。 return null; }) .onErrorContinue((err, i) -> { log.info("onErrorContinue={}", i); }); }); Mono mono = flux.elementAt(0, Mono.just("")); Object block = mono.block(); System.out.println(block + "blockFirst 執行耗時ms:" + (System.currentTimeMillis() - startTime)); }
執行輸出:
20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=main
20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service5.threadName=main
20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service6.threadName=main
TestUser(name=name3)blockFirst 執行耗時ms:2895
總結:這樣實現按照順序返回第一個正常資料。但是執行並沒有非同步。下一步:如何實現非同步呢?
修改 service 實現。增加 .subscribeOn(Schedulers.boundedElastic())
如下:
@Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) //增加subscribeOn .subscribeOn(Schedulers.boundedElastic()) .map(name -> { return new TestUser(name); }); } }
再次執行輸出如下:
21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service4.threadName=boundedElastic-1
21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service2.threadName=boundedElastic-2
service3.threadName=boundedElastic-3
21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
TestUser(name=name6)blockFirst 執行耗時ms:1242
boundedElastic
;修改 service 實現,使用 CompletableFuture 執行耗時操作(這裡是sleep,具體到專案中可能是外部介面呼叫,DB 操作等);然後使用 Mono.fromFuture 返回 Mono 物件。
@Slf4j public class TestServiceImpl1 implements TestServiceI{ @Override public Mono request() { log.info("execute.test.service1"); CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return "testname1"; }); return Mono.fromFuture(uCompletableFuture).map(name -> { return new TestUser(name); }); } }
執行返回如下:
21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service2.threadName=ForkJoinPool.commonPool-worker-1
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service3.threadName=ForkJoinPool.commonPool-worker-2
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=ForkJoinPool.commonPool-worker-3
21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
TestUser(name=testname1)blockFirst 執行耗時ms:1238
到此這篇關於Reactor 多工並行執行且結果按順序返回第一個的文章就介紹到這了,更多相關Reactor 多工執行內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45