首頁 > 軟體

Flink實現特定統計的歸約聚合reduce操作

2023-02-08 22:02:34

如果說簡單聚合是對一些特定統計需求的實現,那麼 reduce 運算元就是一個一般化的聚合統計操作了。從大名鼎鼎的 MapReduce 開始,我們對 reduce 操作就不陌生:它可以對已有的

資料進行歸約處理,把每一個新輸入的資料和當前已經歸約出來的值,再做一個聚合計算。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉換為 DataStream。它不會改變流的元

素資料型別,所以輸出型別和輸入型別是一樣的。呼叫 KeyedStream 的 reduce 方法時,需要傳入一個引數,實現 ReduceFunction 介面。介面在原始碼中的定義如下:

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
    /**
     * The core method of ReduceFunction, combining two values into one value of the same type. The
     * reduce function is consecutively applied to all values of a group until only a single value
     * remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 介面裡需要實現 reduce()方法,這個方法接收兩個輸入事件,經過轉換處理之後輸出一個相同型別的事件;所以,對於一組資料,我們可以先取兩個進行合併,然後再

將合併的結果看作一個資料、再跟後面的資料合併,最終會將它“簡化”成唯一的一個資料,這也就是 reduce“歸約”的含義。在流處理的底層實現過程中,實際上是將中間“合併的結果”

作為任務的一個狀態儲存起來的;之後每來一個新的資料,就和之前的聚合狀態進一步做歸約。

其實,reduce 的語意是針對列表進行規約操作,運算規則由 ReduceFunction 中的 reduce方法來定義,而在 ReduceFunction 內部會維護一個初始值為空的累加器,注意累加器的型別

和輸入元素的型別相同,當第一條元素到來時,累加器的值更新為第一條元素的值,當新的元素到來時,新元素會和累加器進行累加操作,這裡的累加操作就是 reduce 函數定義的運算規

則。然後將更新以後的累加器的值向下遊輸出。

我們可以單獨定義一個函數類實現 ReduceFunction 介面,也可以直接傳入一個匿名類。當然,同樣也可以通過傳入 Lambda 表示式實現類似的功能。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉換為 DataStrema。它不會改變流的元素資料型別,所以輸出型別和輸入型別是一樣的。下面我們來看一個稍複雜的例子。

我們將資料流按照使用者 id 進行分割區,然後用一個 reduce 運算元實現 sum 的功能,統計每個使用者存取的頻次;進而將所有統計結果分到一組,用另一個 reduce 運算元實現 maxBy 的功能,記錄所有使用者中存取頻次最高的那個,也就是當前存取量最大的使用者是誰。

package com.rosh.flink.test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * 我們將資料流按照使用者 id 進行分割區,然後用一個 reduce 運算元實現 sum 的功能,統計每個
 * 使用者存取的頻次;進而將所有統計結果分到一組,用另一個 reduce 運算元實現 maxBy 的功能,
 * 記錄所有使用者中存取頻次最高的那個,也就是當前存取量最大的使用者是誰。
 */
public class TransReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //隨機生成資料
        Random random = new Random();
        List<Integer> userIds = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            userIds.add(random.nextInt(5));
        }
        DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);
        //每個ID存取記錄一次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> map(Integer value) throws Exception {
                return new Tuple2<>(value, 1L);
            }
        });
        //統計每個user存取多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });
        sumDS.print("sumDS  ->>>>>>>>>>>>>");
        //把所有分割區合併,求出最大的存取量
        SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                if (value1.f1 > value2.f1) {
                    return value1;
                } else {
                    return value2;
                }
            }
        });
        maxDS.print("maxDS ->>>>>>>>>>>");
        env.execute("TransReduceTest");
    }
}

到此這篇關於Flink實現特定統計的歸約聚合reduce操作的文章就介紹到這了,更多相關Flink歸約聚合內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com