2021-05-12 14:32:11
Storm Windowing storm滑動視窗簡介
Storm Windowing
簡介
Storm可同時處理視窗內的所有tuple。視窗可以從時間或數量上來劃分,由如下兩個因素決定:
- 視窗的長度,可以是時間間隔或Tuple數量;
- 滑動間隔(sliding Interval),可以是時間間隔或Tuple數量;
要確保topo的過期時間大於視窗的大小加上滑動間隔
Sliding Window:滑動視窗
按照固定的時間間隔或者Tuple數量滑動視窗。
- 如果滑動間隔和視窗大小一樣則等同於滾窗,
- 如果滑動間隔大於視窗大小則會丟失資料,
- 如果滑動間隔小於視窗大小則會視窗重疊。
Tumbling Window:捲動視窗
元組被單個視窗處理,一個元組只屬於一個視窗,不會有視窗重疊。
根據我自己的經驗其實一般用捲動就可以了
構造builder的時候支援以下的設定
(時間和數量的排列組合):
- withWindow(Count windowLength, Count slidingInterval)
滑窗 視窗長度:tuple數, 滑動間隔: tuple數 - withWindow(Count windowLength)
滑窗 視窗長度:tuple數, 滑動間隔: 每個tuple進來都滑 - withWindow(Count windowLength, Duration slidingInterval)
滑窗 視窗長度:tuple數, 滑動間隔: 時間間隔 - withWindow(Duration windowLength, Duration slidingInterval)
滑窗 視窗長度:時間間隔, 滑動間隔: 時間間隔 - withWindow(Duration windowLength)
滑窗 視窗長度:時間間隔, 滑動間隔: 每個tuple進來都滑 - withWindow(Duration windowLength, Count slidingInterval)
滑窗 視窗長度:時間間隔, 滑動間隔: 時間間隔 - withTumblingWindow(BaseWindowedBolt.Count count)
滾窗 視窗長度:Tuple數 - withTumblingWindow(BaseWindowedBolt.Duration duration)
滾窗 視窗長度:時間間隔
Tuple時間戳和亂序
storm支援追蹤源資料的時間戳。
Event time 和Process time
預設的時間戳是處理元組時的bolt視窗生成的,
Event time,事件時間,通常這個時間會帶在Tuple中;
Process time,到某一個處理環節的時間。
舉例:A今天早上9點告訴B,說C昨天晚上9點在濱江國際;
這條資訊中,可以認為C在濱江國際的Event time是昨天晚上9點,B接收到這條資訊的時間,即Process time,是今天早上9點。
設定時間戳欄位(timestamp field)
windows按照時間劃分時,預設是Process time,也可以指定為Tuple中的Event time。
如果以Event time來劃分視窗:
- Tuple落入到哪個視窗,是看tuple裡的Event time。
- 視窗向後推進,主要依靠Event time的增長;
public BaseWindowedBolt withTimestampField(String fieldName)
延時(lag)和水位線(watermark)
從當前最後一條資料算起,往前減去lag,得到一個時間,這個時間就是watermark;
認為watermark之前的資料都已經到了。收到06:01:00的資料時,認為06:00:00的資料都到了。給他們入window。
這樣實際是一個延時處理,等到了06:01:00時,我才開始將06:00:00的資料放入視窗。
如果很不巧,06:00:00的資料在06:01:00之後,lag為60s,不好意思,進不了視窗。此資料不會被處理,並且會在worker的紀錄檔中加一行INFO資訊。
public class SlidingWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
publicvoidprepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
publicvoidexecute(TupleWindow inputWindow) {
for(Tuple tuple: inputWindow.get()) {
// do the windowing computation
...
}
// emit the results
collector.emit(new Values(computedValue));
}
}
publicstaticvoidmain(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("slidingwindowbolt",
new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
1).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
Storm的安裝步驟 http://www.linuxidc.com/Linux/2016-08/134184.htm
Kafka-Storm 整合部署 http://www.linuxidc.com/Linux/2016-03/129063.htm
Storm在Ubuntu環境下的單機部署 http://www.linuxidc.com/Linux/2016-03/129060.htm
相關文章