<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
如果說簡單聚合是對一些特定統計需求的實現,那麼 reduce 運算元就是一個一般化的聚合統計操作了。從大名鼎鼎的 MapReduce 開始,我們對 reduce 操作就不陌生:它可以對已有的
資料進行歸約處理,把每一個新輸入的資料和當前已經歸約出來的值,再做一個聚合計算。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉換為 DataStream。它不會改變流的元
素資料型別,所以輸出型別和輸入型別是一樣的。呼叫 KeyedStream 的 reduce 方法時,需要傳入一個引數,實現 ReduceFunction 介面。介面在原始碼中的定義如下:
@Public @FunctionalInterface public interface ReduceFunction<T> extends Function, Serializable { /** * The core method of ReduceFunction, combining two values into one value of the same type. The * reduce function is consecutively applied to all values of a group until only a single value * remains. * * @param value1 The first value to combine. * @param value2 The second value to combine. * @return The combined value of both input values. * @throws Exception This method may throw exceptions. Throwing an exception will cause the * operation to fail and may trigger recovery. */ T reduce(T value1, T value2) throws Exception; }
ReduceFunction 介面裡需要實現 reduce()方法,這個方法接收兩個輸入事件,經過轉換處理之後輸出一個相同型別的事件;所以,對於一組資料,我們可以先取兩個進行合併,然後再
將合併的結果看作一個資料、再跟後面的資料合併,最終會將它“簡化”成唯一的一個資料,這也就是 reduce“歸約”的含義。在流處理的底層實現過程中,實際上是將中間“合併的結果”
作為任務的一個狀態儲存起來的;之後每來一個新的資料,就和之前的聚合狀態進一步做歸約。
其實,reduce 的語意是針對列表進行規約操作,運算規則由 ReduceFunction 中的 reduce方法來定義,而在 ReduceFunction 內部會維護一個初始值為空的累加器,注意累加器的型別
和輸入元素的型別相同,當第一條元素到來時,累加器的值更新為第一條元素的值,當新的元素到來時,新元素會和累加器進行累加操作,這裡的累加操作就是 reduce 函數定義的運算規
則。然後將更新以後的累加器的值向下遊輸出。
我們可以單獨定義一個函數類實現 ReduceFunction 介面,也可以直接傳入一個匿名類。當然,同樣也可以通過傳入 Lambda 表示式實現類似的功能。與簡單聚合類似,reduce 操作也會將 KeyedStream 轉換為 DataStrema。它不會改變流的元素資料型別,所以輸出型別和輸入型別是一樣的。下面我們來看一個稍複雜的例子。
我們將資料流按照使用者 id 進行分割區,然後用一個 reduce 運算元實現 sum 的功能,統計每個使用者存取的頻次;進而將所有統計結果分到一組,用另一個 reduce 運算元實現 maxBy 的功能,記錄所有使用者中存取頻次最高的那個,也就是當前存取量最大的使用者是誰。
package com.rosh.flink.test; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; import java.util.Random; /** * 我們將資料流按照使用者 id 進行分割區,然後用一個 reduce 運算元實現 sum 的功能,統計每個 * 使用者存取的頻次;進而將所有統計結果分到一組,用另一個 reduce 運算元實現 maxBy 的功能, * 記錄所有使用者中存取頻次最高的那個,也就是當前存取量最大的使用者是誰。 */ public class TransReduceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //隨機生成資料 Random random = new Random(); List<Integer> userIds = new ArrayList<>(); for (int i = 1; i <= 10; i++) { userIds.add(random.nextInt(5)); } DataStreamSource<Integer> userIdDS = env.fromCollection(userIds); //每個ID存取記錄一次 SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> map(Integer value) throws Exception { return new Tuple2<>(value, 1L); } }); //統計每個user存取多少次 SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }); sumDS.print("sumDS ->>>>>>>>>>>>>"); //把所有分割區合併,求出最大的存取量 SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { if (value1.f1 > value2.f1) { return value1; } else { return value2; } } }); maxDS.print("maxDS ->>>>>>>>>>>"); env.execute("TransReduceTest"); } }
到此這篇關於Flink實現特定統計的歸約聚合reduce操作的文章就介紹到這了,更多相關Flink歸約聚合內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45