首頁 > 軟體

Flink 側流輸出原始碼範例解析

2022-09-16 22:04:50

Flink 側流輸出原始碼解析

Flink 的 side output 為我們提供了側流(分流)輸出的功能,根據條件可以把一條流分為多個不同的流,之後做不同的處理邏輯,下面就來看下側流輸出相關的原始碼。

先來看下面的一個 Demo,一個流被分成了 3 個流,一個主流,兩個側流輸出。

SingleOutputStreamOperator<JasonLeePOJO> process =
        kafka_source1.process(
                new ProcessFunction<JasonLeePOJO, JasonLeePOJO>() {
                    @Override
                    public void processElement(
                            JasonLeePOJO value,
                            ProcessFunction<JasonLeePOJO, JasonLeePOJO>.Context ctx,
                            Collector<JasonLeePOJO> out)
                            throws Exception {
                        // 這個是主流輸出
                        if (value.getName().equals("flink")) {
                            out.collect(value);
                        // 下面兩個是測流輸出
                        } else if (value.getName().equals("spark")) {
                            ctx.output(test, value);
                        // 測流
                        } else if (value.getName().equals("hadoop")) {
                            ctx.output(test1, value);
                        }
                    }
                });

為了更加清楚的檢視每一個運算元,我禁用了 operator chain,任務的 DAG 圖如下所示:

這樣就比較清晰了,很明顯從 process 運算元開始,1 個資料流分為了 3 個資料流,當然,在預設情況下沒有禁止

operator chain 所有的運算元都是 chain 在一起的。

原始碼解析

我們先來看第一個主流輸出也就是 out.collect(value) 的原始碼,這裡的 out 實際上是 TimestampedCollector 物件。

TimestampedCollector#collect

@Override
public void collect(T record) {
    output.collect(reuse.replace(record));
}

在 collect 方法中持有一個 output 物件,用來輸出資料,在這裡實際上是一個 CountingOutput 它是一個包裝了 Output 的物件,主要用於更新傳送資料的 metric,並輸出資料。

CountingOutput#collect

@Override
public void collect(StreamRecord<OUT> record) {
    numRecordsOut.inc();
    output.collect(record);
}

在 CountingOutput 中也持有一個 output 物件,但是這裡的 output 是 BroadcastingOutputCollector 物件,從名字就可以看出它是往下游廣播資料的,這裡就有一個疑問?把資料廣播到下游,那豈不是下游的每個資料流都有這條資料嗎?這樣的話是怎麼實現分流的呢?帶著這個疑問,我們來看 BroadcastingOutputCollector 的 collect 方法是怎麼實現的。

BroadcastingOutputCollector#collect

@Override
public void collect(StreamRecord<T> record) {
    // 這裡的 outputs 陣列有三個 output 分別對應上面的三個輸出流
    for (Output<StreamRecord<T>> output : outputs) {
        output.collect(record);
    }
}

在 BroadcastingOutputCollector 物件裡也持有一個 output 物件,其實他們都實現了 Output 介面,用來往下游傳送資料,這裡的 outputs 是一個 Output 陣列,代表了下游的所有 Output,因為上面有三個輸出流,所以陣列裡面就包含了 3 個 Output 物件。

迴圈的呼叫 output 的 collect 方法往下游傳送資料,因為我打斷了 operator chain,所以 process 運算元和下游的 Print 運算元不在同一個 operatorChain 內,那麼上下游運算元之間資料傳輸用的就是 RecordWriterOutput,否則用的是 CopyingChainingOutput 或者 ChainingOutput,具體使用的是哪個 Output 這裡就不多介紹了,後面有時間的話會單獨介紹。

RecordWriterOutput#collect

@Override
public void collect(StreamRecord<OUT> record) {
    // 主流是沒有 outputTag 的,只有測流有 outputTag
    if (this.outputTag != null) {
        // we are not responsible for emitting to the main output.
        return;
    }

    pushToRecordWriter(record);
}

接著來看 RecordWriterOutput 的 collect 方法,在 collect 方法裡面會先判斷 outputTag 是否為空,如果不為空不做任何處理,直接返回,否則就把資料推播到下游運算元,只有側流輸出才需要定義 outputTag,主流(正常流)是沒有 outputTag 的,所以這裡會走 pushToRecordWriter 方法把資料寫入到下游,也就是說雖然會以廣播的形式把資料廣播到所有下游,但其實另外兩個側流是直接返回的,只有主流才會把資料推播到下游,這也就解釋了上面的疑問。

然後再來看第二個側流輸出 ctx.output(test, value) 的原始碼,這裡的 ctx 實際上是 ProcessOperator#ContextImpl 物件。

ProcessOperator#ContextImpl#output

@Override
public <X> void output(OutputTag<X> outputTag, X value) {
    if (outputTag == null) {
        throw new IllegalArgumentException("OutputTag must not be null.");
    }
    output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}

如果 outputTag 是空,直接丟擲異常,因為這個是側流,所以必須要定義 OutputTag。這裡的 output 實際上是父類別 AbstractStreamOperator 所持有的變數,如果 outputTag 不為空,就呼叫 output 的 collect 方法把資料傳送到下游,這裡的 output 和上面的一樣是 CountingOutput 但是 collect 方法是另外一個過載的方法。

CountingOutput#collect

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    numRecordsOut.inc();
    output.collect(outputTag, record);
}

可以發現,這個 collect 方法比上面那個多了一個 OutputTag 引數,也就是使用側流輸出的時候定義的 OutputTag 物件,然後呼叫 output 的 collect 方法傳送資料,這個也和上面的一樣,同樣是 BroadcastingOutputCollector 物件的另外一個過載方法,多了一個 OutputTag 引數。

BroadcastingOutputCollector#collect

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    for (Output<StreamRecord<T>> output : outputs) {
        output.collect(outputTag, record);
    }
}

這裡的邏輯和上面是一樣的,同樣的迴圈呼叫 collect 方法傳送資料。

RecordWriterOutput#collect

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    // 先要判斷兩個 OutputTag 是否一樣
    if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
        pushToRecordWriter(record);
    }
}

在這個 collect 方法中會先判斷傳入的 OutputTag 物件和成員變數 this.outputTag 是不是相等,如果是的話,就傳送資料,否則不做任何處理,所以這裡每次只會選擇一個下游側流輸出資料,這樣就實現了所謂的分流。

OutputTag#isResponsibleFor

public static boolean isResponsibleFor(
        @Nullable OutputTag<?> owner, @Nonnull OutputTag<?> other) {
    return other.equals(owner);
}

可以看到在 isResponsibleFor 方法內是直接呼叫 OutputTag 的 equals 方法判斷兩個物件是否相等的。

第三個側流 test1 ctx.output(test1, value) 和第二個側流 test 是完全一樣的情況,這裡就不在看程式碼了。

上面是完成了分流操作,那怎麼獲取到分流後結果呢(資料流)?我們可以通過 getSideOutput 方法獲取。

DataStream<JasonLeePOJO> sideOutput = process.getSideOutput(test);
DataStream<JasonLeePOJO> sideOutput1 = process.getSideOutput(test1);

getSideOutput 原始碼

public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
    sideOutputTag = clean(requireNonNull(sideOutputTag));

    // make a defensive copy
    sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());

    TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
    if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
        throw new UnsupportedOperationException(
                "A side output with a matching id was "
                        + "already requested with a different type. This is not allowed, side output "
                        + "ids need to be unique.");
    }

    requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

    SideOutputTransformation<X> sideOutputTransformation =
            new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
    return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
}

getSideOutput 方法裡先是構建了一個 SideOutputTransformation 物件,然後又構建了 DataStream 物件,這樣我們就可以基於分流後的 DataStream 做不同的處理邏輯了,從而實現了把一個 DataStream 分流成多個 DataStream 功能。

總結

通過對側流輸出的原始碼進行解析,在分流的時候,資料是通過廣播的方式傳送到下游運算元的,對於主流的資料來說,只有 OutputTag 為空的才會處理,側流因為 OutputTag 不為空,所以直接返回,不做任何處理,那對於側流的資料來說,是通過判斷兩個 OutputTag 是否相等,所以每次只會把資料傳送到下游對應的那一個側流上去,這樣即可實現分流邏輯。

以上就是Flink 側流輸出原始碼範例解析的詳細內容,更多關於Flink 側流輸出的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com