首頁 > 軟體

Java Flink視窗觸發器Trigger的用法詳解

2022-07-08 18:03:08

定義

Trigger確定視窗(由視窗分配器形成)何時準備好由視窗函數處理。每個WindowAssigner都帶有一個預設值Trigger。如果預設觸發器不符合您的需求,您可以使用trigger(…)。

Trigger 原始碼

public abstract class Trigger<T, W extends Window> implements Serializable {
	/**
	 只要有元素落⼊到當前窗⼝, 就會調⽤該⽅法
	 * @param element 收到的元素
	 * @param timestamp 元素抵達時間.
	 * @param window 元素所屬的window視窗.
	 * @param ctx ⼀個上下⽂物件,通常⽤該物件註冊 timer(ProcessingTime/EventTime) 回撥.
	 */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
	
	 /**
	 * processing-time 定時器回撥函數
	 *
	 * @param time 定時器觸發的時間.
	 * @param window 定時器觸發的視窗物件.
	 * @param ctx ⼀個上下⽂物件,通常⽤該物件註冊 timer(ProcessingTime/EventTime) 回撥.
	 */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	/**
	 * event-time 定時器回撥函數
	 *
	 * @param time 定時器觸發的時間.
	 * @param window 定時器觸發的視窗物件.
	 * @param ctx ⼀個上下⽂物件,通常⽤該物件註冊 timer(ProcessingTime/EventTime) 回撥.
	 */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	 /**
	 * 當 多個視窗合併到⼀個窗⼝的時候,呼叫該方法法,例如系統SessionWindow
	 *
	 * @param window 合併後的新視窗物件
	 * @param ctx ⼀個上下⽂物件,通常用該物件註冊 timer(ProcessingTime/EventTime)回撥以及存取狀態
	 */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
	
	/**
	 * 當視窗被刪除後執⾏所需的任何操作。例如:可以清除定時器或者刪除狀態資料
	 */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
    }

TriggerResult 原始碼

public enum TriggerResult {
	// 表示對視窗不執行任何操作。即不觸發視窗計算,也不刪除元素。
    CONTINUE(false, false),
    // 觸發視窗計算,輸出結果,然後將視窗中的資料和視窗進行清除。
    FIRE_AND_PURGE(true, true),
    // 觸發視窗計算,但是保留視窗元素
    FIRE(true, false),
    // 不觸發視窗計算,丟棄視窗,並且刪除視窗的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

一旦觸發器確定視窗已準備好進行處理,就會觸發,返回狀態可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發視窗計算並保留視窗內容,而FIRE_AND_PURGE是觸發視窗計算並刪除視窗內容。預設情況下,預實現的觸發器只是簡單地FIRE不清除視窗狀態。

Flink 預置的Trigger

  • EventTimeTrigger:通過對比EventTime和視窗的Endtime確定是否觸發視窗計算,如果EventTime大於Window EndTime則觸發,否則不觸發,視窗將繼續等待。
  • ProcessTimeTrigger:通過對比ProcessTime和視窗EndTme確定是否觸發視窗,如果ProcessTime大於EndTime則觸發計算,否則視窗繼續等待。
  • ContinuousEventTimeTrigger:根據間隔時間週期性觸發視窗或者Window的結束時間小於當前EndTime觸發視窗計算。
  • ContinuousProcessingTimeTrigger:根據間隔時間週期性觸發視窗或者Window的結束時間小於當前ProcessTime觸發視窗計算。
  • CountTrigger:根據接入資料量是否超過設定的闕值判斷是否觸發視窗計算。
  • DeltaTrigger:根據接入資料計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發視窗計算。
  • PurgingTrigger:可以將任意觸發器作為引數轉換為Purge型別的觸發器,計算完成後資料將被清理。
  • NeverTrigger:任何時候都不觸發視窗計算

主要看看EventTimeTrigger和ProcessingTimeTrigger的原始碼。

EventTimeTrigger原始碼

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger原始碼

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會註冊一個ProcessingTime定時器,時間引數是window.maxTimestamp(),也就是視窗的最終時間,當時間到達這個視窗最終時間,定時器觸發並呼叫 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發視窗中資料的計算,但是會保留視窗元素。

需要注意的是ProcessingTimeTrigger類只會在視窗的最終時間到達的時候觸發視窗函數的計算,計算完成後並不會清除視窗中的資料,這些資料儲存在記憶體中,除非呼叫PURGE或FIRE_AND_PURGE,否則資料將一直存在記憶體中。實際上,Flink中提供的Trigger類,除了PurgingTrigger類,其他的都不會對視窗中的資料進行清除。

常見視窗的Trigger

捲動視窗

TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
}

TumblingProcessingTimeWindows :ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

滑動視窗

SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

SlidingProcessingTimeWindows :ProcessingTimeTrigger

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
}

對談視窗

EventTimeSessionWindows:EventTimeTrigger
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

ProcessingTimeSessionWindows:ProcessingTimeTrigger

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

全域性視窗

GlobalWindows :NeverTrigger
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
     public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new GlobalWindows.NeverTrigger();
        }
}

到此這篇關於Java Flink視窗觸發器Trigger的用法詳解的文章就介紹到這了,更多相關Java Flink視窗觸發器內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com