<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Trigger確定視窗(由視窗分配器形成)何時準備好由視窗函數處理。每個WindowAssigner都帶有一個預設值Trigger。如果預設觸發器不符合您的需求,您可以使用trigger(…)。
public abstract class Trigger<T, W extends Window> implements Serializable { /** 只要有元素落⼊到當前窗⼝, 就會調⽤該⽅法 * @param element 收到的元素 * @param timestamp 元素抵達時間. * @param window 元素所屬的window視窗. * @param ctx ⼀個上下⽂物件,通常⽤該物件註冊 timer(ProcessingTime/EventTime) 回撥. */ public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception; /** * processing-time 定時器回撥函數 * * @param time 定時器觸發的時間. * @param window 定時器觸發的視窗物件. * @param ctx ⼀個上下⽂物件,通常⽤該物件註冊 timer(ProcessingTime/EventTime) 回撥. */ public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * event-time 定時器回撥函數 * * @param time 定時器觸發的時間. * @param window 定時器觸發的視窗物件. * @param ctx ⼀個上下⽂物件,通常⽤該物件註冊 timer(ProcessingTime/EventTime) 回撥. */ public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * 當 多個視窗合併到⼀個窗⼝的時候,呼叫該方法法,例如系統SessionWindow * * @param window 合併後的新視窗物件 * @param ctx ⼀個上下⽂物件,通常用該物件註冊 timer(ProcessingTime/EventTime)回撥以及存取狀態 */ public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } /** * 當視窗被刪除後執⾏所需的任何操作。例如:可以清除定時器或者刪除狀態資料 */ public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception; }
public enum TriggerResult { // 表示對視窗不執行任何操作。即不觸發視窗計算,也不刪除元素。 CONTINUE(false, false), // 觸發視窗計算,輸出結果,然後將視窗中的資料和視窗進行清除。 FIRE_AND_PURGE(true, true), // 觸發視窗計算,但是保留視窗元素 FIRE(true, false), // 不觸發視窗計算,丟棄視窗,並且刪除視窗的元素。 PURGE(false, true); private final boolean fire; private final boolean purge; private TriggerResult(boolean fire, boolean purge) { this.purge = purge; this.fire = fire; } public boolean isFire() { return this.fire; } public boolean isPurge() { return this.purge; } }
一旦觸發器確定視窗已準備好進行處理,就會觸發,返回狀態可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發視窗計算並保留視窗內容,而FIRE_AND_PURGE是觸發視窗計算並刪除視窗內容。預設情況下,預實現的觸發器只是簡單地FIRE不清除視窗狀態。
主要看看EventTimeTrigger和ProcessingTimeTrigger的原始碼。
public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); } }
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } public String toString() { return "ProcessingTimeTrigger()"; } public static ProcessingTimeTrigger create() { return new ProcessingTimeTrigger(); } }
在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會註冊一個ProcessingTime定時器,時間引數是window.maxTimestamp(),也就是視窗的最終時間,當時間到達這個視窗最終時間,定時器觸發並呼叫 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發視窗中資料的計算,但是會保留視窗元素。
需要注意的是ProcessingTimeTrigger類只會在視窗的最終時間到達的時候觸發視窗函數的計算,計算完成後並不會清除視窗中的資料,這些資料儲存在記憶體中,除非呼叫PURGE或FIRE_AND_PURGE,否則資料將一直存在記憶體中。實際上,Flink中提供的Trigger類,除了PurgingTrigger類,其他的都不會對視窗中的資料進行清除。
TumblingEventTimeWindows :EventTimeTrigger public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
TumblingProcessingTimeWindows :ProcessingTimeTrigger
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
SlidingEventTimeWindows:EventTimeTrigger public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
SlidingProcessingTimeWindows :ProcessingTimeTrigger
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
EventTimeSessionWindows:EventTimeTrigger public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
ProcessingTimeSessionWindows:ProcessingTimeTrigger
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
GlobalWindows :NeverTrigger public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return new GlobalWindows.NeverTrigger(); } }
到此這篇關於Java Flink視窗觸發器Trigger的用法詳解的文章就介紹到這了,更多相關Java Flink視窗觸發器內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45