<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在響應式程式設計中,多執行緒非同步性成為天然的內在,多執行緒之間的切換也成為原生的,在處理一個資料流Flux/Mono時,基本無法知道是執行在哪個執行緒上或哪個執行緒池裡,可以說,每一個操作符operator以及內部的函數都可能執行在不同的執行緒上。這就意味著,以前用ThreadLocal來作為方法間透明傳遞共用變數的方式不再行得通。為此,Reactor提供了Context來替代ThreadLocal實現一個跨執行緒的共用變數的透明方式。
本文會從以下幾個方面來介紹Context的相關知識:
static String KEY = "TEST_CONTEXT_KEY"; static String KEY2 = "TEST_CONTEXT_KEY2"; public static void main(String[] args) { Flux<String> flux = convert("hello", Flux.just(1, 2, 3)); flux .subscriberContext(Context.of(KEY, "Outside")) .subscribe(v -> System.out.println(v)); } public static Flux<String> convert(String prefix, Flux<Integer> publisher) { return publisher.map(v -> prefix + " " + v) .subscriberContext(Context.of(KEY, "NotUsed")) .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)) .subscriberContext(context -> context.put(KEY2, "Inside")) .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + v)); }
上面是context的使用方案介紹,其輸出如下:
Outside Outside Inside hello 1
Outside Outside Inside hello 2
Outside Outside Inside hello 3
上面的使用案例展示了一個使用context的常見例子。通過在外部方法裡傳入context,如flux.subscriberContext(Context.of(KEY, "Outside"))
,使得內部方法convert能夠獲取外界環境的context,同時內部方法還可以增加自己的context資料,如subscriberContext(context -> context.put(KEY2, "Inside"))
,結合之後,在讓內部的方法(flatMap裡的方法)感知到整個上下文context的資料內容。
對於context的使用,主要分為幾個部分: 1. context的建立 2. context的寫入(傳入)與讀取 3. 執行順序
1. context —— 不可變物件
由於reactor天然是跨執行緒的,所以context設計為了不可變的物件,即每次的更新都是建立一個新的物件。每次的put/putAll操作,都是先把舊物件的值複製到新物件,然後再進行put/putAll等更新操作。
2. context的寫入與讀取
context寫入是使用subscriberContext方法,其入參有兩種形式:傳值方式subscriberContext(ctx)與lambda函數方式 —— subscriberContext(ctx -> ctx.put(key,value))。
context的讀取是利用Mono的靜態方法subscriberContext()來獲取,由於其返回的是一個Mono, 所以通常與flatMap結合使用。
3. 執行順序
context的傳入是發生在subscribe()訂閱階段的,所以其寫入的順序是從下往上的,即在範例中,先執行subscriberContext(Context.of(KEY, "Outside"))
,再執行subscriberContext(context -> context.put(KEY2, "Inside"))
, 最後執行subscriberContext(Context.of(KEY, "NotUsed"))
。在訂閱階段執行完後,進入執行階段,資料流從上往下執行,每次讀取context的時候Mono.subscriberContext()
都是讀取下一個的context。所以"NotUsed"的context並沒有生效。
此外,context.put()操作是複製舊的再update新的物件,所以Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)
這個階段仍能讀取前一個context關於KEY的內容。
總結
FluxContextStart
的物件注意
subscriberContext(Context.of("Outside")
與subscriberContext(context -> Context.of("Outside"))
是有區別,前者是會結合複用前面的context,而後者是直接返回一個新的context並不會複用前面的context。 其原因是,subscriberContext(Context.of("Outside"))
其實內部呼叫的是subscriberContext(context -> context.putAll(Context.of("Outside"))
,其入參的context就是前面的context,putAll方法會複用前面的context。而 subscriberContext(context -> Context.of("Outside"))不復用的原因就是因為放棄了入參的context。所以,可以利用這種方式來放棄之前的context,當然不鼓勵這麼做,因為你不清楚之前context會不會影響後續的程式。
本文章的程式碼用的事reactor 3.3的版本,自3.5之後,subscriberContext方法改為contextWrite
,讀取的方法改為deferContextual
。
現在我們從原始碼上看看,context寫入為什麼是自下而上的,讀取的時候又是依附於下一個subscriber並且自上而下的。
public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) { return new FluxContextStart<>(this, doOnContext); } FluxContextStart(Flux<? extends T> source, Function<Context, Context> doOnContext) { super(source); this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext"); } @Override public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) { Context c = doOnContext.apply(actual.currentContext()); return new ContextStartSubscriber<>(actual, c); } ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) { this.actual = actual; this.context = context; if (actual instanceof ConditionalSubscriber) { this.actualConditional = (ConditionalSubscriber<? super T>) actual; } else { this.actualConditional = null; } } @Override public Context currentContext() { return this.context; }
上面擷取了subscriberContext方法的原始碼,可以看到subscriberContext方法最終會建立ContextStartSubscriber的物件,並將生成的context賦值Context c = doOnContext.apply(actual.currentContext())
,所以context是伴隨subscriberContext方法對應的subscriber裡的。
由於context賦值操作Context c = doOnContext.apply(actual.currentContext())
是發生在subscribeOrReturn方法裡,即發生在subscribe()訂閱階段,所以整個執行的順序是自下而上的(沿著整個flow自下而上至源頭的publisher)。
那讀取context的時候為什麼是自上而下的呢?我們來看下讀取操作Mono.subscribeContext()的原始碼。
public static Mono<Context> subscriberContext() { return onAssembly(MonoCurrentContext.INSTANCE); } final class MonoCurrentContext extends Mono<Context> implements Fuseable, Scannable { static final MonoCurrentContext INSTANCE = new MonoCurrentContext(); public void subscribe(CoreSubscriber<? super Context> actual) { Context ctx = actual.currentContext(); actual.onSubscribe(Operators.scalarSubscription(actual, ctx)); } } interface InnerOperator<I, O> extends InnerConsumer<I>, InnerProducer<O> { @Override default Context currentContext() { return actual().currentContext(); } }
Mono.subscribeContext()
方法返回的是一個MonoCurrentContext的靜態物件,在訂閱subscribe時期,就會去讀取當前的context,即Context ctx = actual.currentContext()
。而對於一個InnerOperator的介面而言,其currentContext()方法會不斷尋找下一個subscriber的context,即 actual().currentContext()
,直到有哪個subscriber覆寫了currentContext方法,如先前的ContextStartSubscriber物件。對於InnerOperator介面,是大多數subscriber都會實現的介面,例如map、filter、flatmap這些,都會實現這個介面。
在找到context之後,通過Operators.scalarSubscription(actual, ctx)
寫入,這個方法其實也是Mono.just()的實現,所以相當於把context當做value,生成了一個Mono.just(ctx)來完成了context讀取。
所以,context讀取的是從當前操作operator之後的那個最接近的subscriber的context。這也解釋了前面使用案例中,subscriberContext(Context.of(KEY, "NotUsed"))
,沒有作用的緣故。
雖然reactor提供了context來替代ThreadLocal的使用,但目前大多數的程式碼庫仍然是指令式程式設計的,使用的方式仍然是基於ThreadLocal的,如Logger裡的MDC。本小節以Logger中的MDC來介紹,如何利用context實現與舊系統中的基於ThreadLocal方式的打通。
我們假設有這樣的一個場景,每一次的Http請求都有一個trace id,我們稱為request id,並通過Http Header "X-Request-Id"來命名,列印紀錄檔的時候,希望每條紀錄檔裡都包含請求id,這樣方便跟蹤整個請求鏈路的情況。
為此,我們把紀錄檔設定裡的pattern設定為:[%X{X-Request-Id}] [%thread] %-5level - %msg %n
。
可以在SpringBoot
的application.yml
裡設定,如:
logging.pattern.level: "[%X{X-Request-Id}] [%thread] %-5level - %msg %n"
因此,要使得每條紀錄檔裡有request id,那就必須要MDC裡有key為X-Request-Id
的內容。下面來看下,reactor中是如何實現的。
@SpringBootApplication @Slf4j @RestController public class MdcApplication { public static void main(String[] args) { SpringApplication.run(MdcApplication.class, args); } private final static String X_REQUEST_ID_KEY = "X-Request-Id"; @GetMapping("/") Flux<String> split(@RequestParam("value") String value, @RequestHeader(X_REQUEST_ID_KEY) String requestId) { return Flux.fromArray(value.split("_")) .doOnEach(logWithContext(ch -> log.info("handling one item: {}", ch))) .subscriberContext(Context.of(X_REQUEST_ID_KEY, requestId)); } private static <T> Consumer<Signal<T>> logWithContext(Consumer<T> logStatement) { return signal -> { if (!signal.isOnNext()) { return; } String requestId = signal.getContext().get(X_REQUEST_ID_KEY); try (MDC.MDCCloseable closeable = MDC.putCloseable(X_REQUEST_ID_KEY, requestId)) { logStatement.accept(signal.get()); } }; } }
這是一個簡單的範例程式,對於請求輸入的value值通過"-"分割後,再一個個返回給使用者端。首先利用subscriberContext方法,將http header裡的X-Request-Id
作為context來傳入。然後利用doOnEach的方式獲取signal。doOnEach的方法可以工作在onNext、onComplete、onError等所有事件,每一個訊號signal裡都包含有context,當為onNext則還包含value值,當為onError時,則還包含有exception。因此可以通過signal來獲取context。
在從context獲取X-Request-Id後,可以利用try-with-resource方式來更新MDC,其效果是在執行完try裡面的程式後,將更新的value回退。等價於:
try { MDC.put(X_REQUEST_ID_KEY, requestId); logStatement.accept(signal.get()); } finally { MDC.remove(X_REQUEST_ID_KEY); }
置於為什麼需要操作完之後回退掉MDC中的更新,那是因為reactor中所有的操作都是非同步執行在不同執行緒中的,如果不回退的話,很有可能造成汙染,其原因還是MDC內部是用ThreadLocal實現的,所以跨執行緒的時候,如果不把ThreadLocal值清理乾淨,很容易造成互相汙染。
用curl命令傳送請求:curl --header "X-Request-Id:12345" localhost:8080?value=a_b_c
,返回的結果是abc
,列印的紀錄檔如下:
[12345] [reactor-http-nio-2] INFO - handling one item: a
[12345] [reactor-http-nio-2] INFO - handling one item: b
[12345] [reactor-http-nio-2] INFO - handling one item: c
其中12345
就是從context裡獲取到的request id。
如果想要將request id繼續貫穿後續請求流程,如請求第三方服務,可以在用webClient傳送請求的時候,把request id作為header加入到它的request請求裡,如:
Mono.subscriberContext().map(ctx -> { RequestHeadersSpec<?> request = webClient.get().uri(uri); request = request.header("X-Request-ID", ctx.get(X_REQUEST_ID_KEY)); // The rest of your request logic... });
本文介紹了reactor中context的概念,並用程式碼範例的方式介紹瞭如何使用。再然後,通過原始碼的解讀來加深對context使用規則的理解:自下而上的context寫入,以及與subscriber繫結後的自上而下的讀取。 在這之後,用以傳遞並列印紀錄檔中包含request id的一個實際例子,來介紹如何使用context與log的MDC一起使用。
雖然reactor自3.1開始提供了context來彌補無法使用ThreadLocal的不足,但與ThreaLocal相比,context仍然有不少侷限。比如使用上的不方便,要麼利用Mono.subscribeContext().map並搭配flatmap來使用,要麼需要將資料流轉化成訊號signal流來使用,總之遠不如ThreadLocal來的簡單易用。另外,context的不可變特性,雖然有助於thread safe,但使得不同方法之間無法傳遞更新,比如方法A內修改後再傳遞給方法B,因為context是唯讀的,但這在ThreadLocal上卻是輕而易舉就能實現。
好訊息的是,reactor在3.5開始,提供了新的方法deferContextual來簡化context的使用。以及提出了context view的概念來簡化context傳遞問題,感興趣的可以閱讀reactor檔案。
到此這篇關於詳解Reactor中Context的用法的文章就介紹到這了,更多相關Reactor Context內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45