首頁 > 軟體

Storm Windowing storm滑動視窗簡介

2020-06-16 17:23:15

Storm Windowing

簡介

Storm可同時處理視窗內的所有tuple。視窗可以從時間或數量上來劃分,由如下兩個因素決定:

  • 視窗的長度,可以是時間間隔或Tuple數量;
  • 滑動間隔(sliding Interval),可以是時間間隔或Tuple數量;

要確保topo的過期時間大於視窗的大小加上滑動間隔

Sliding Window:滑動視窗

按照固定的時間間隔或者Tuple數量滑動視窗。

  • 如果滑動間隔和視窗大小一樣則等同於滾窗,
  • 如果滑動間隔大於視窗大小則會丟失資料,
  • 如果滑動間隔小於視窗大小則會視窗重疊。

Tumbling Window:捲動視窗

元組被單個視窗處理,一個元組只屬於一個視窗,不會有視窗重疊。
根據我自己的經驗其實一般用捲動就可以了

構造builder的時候支援以下的設定

(時間和數量的排列組合):

  • withWindow(Count windowLength, Count slidingInterval)
    滑窗 視窗長度:tuple數, 滑動間隔: tuple數
  • withWindow(Count windowLength)
    滑窗 視窗長度:tuple數, 滑動間隔: 每個tuple進來都滑
  • withWindow(Count windowLength, Duration slidingInterval)
    滑窗 視窗長度:tuple數, 滑動間隔: 時間間隔
  • withWindow(Duration windowLength, Duration slidingInterval)
    滑窗 視窗長度:時間間隔, 滑動間隔: 時間間隔
  • withWindow(Duration windowLength)
    滑窗 視窗長度:時間間隔, 滑動間隔: 每個tuple進來都滑
  • withWindow(Duration windowLength, Count slidingInterval)
    滑窗 視窗長度:時間間隔, 滑動間隔: 時間間隔
  • withTumblingWindow(BaseWindowedBolt.Count count)
    滾窗 視窗長度:Tuple數
  • withTumblingWindow(BaseWindowedBolt.Duration duration)
    滾窗 視窗長度:時間間隔

Tuple時間戳和亂序

storm支援追蹤源資料的時間戳。
Event time 和Process time
預設的時間戳是處理元組時的bolt視窗生成的,
Event time,事件時間,通常這個時間會帶在Tuple中;
Process time,到某一個處理環節的時間。
舉例:A今天早上9點告訴B,說C昨天晚上9點在濱江國際;
這條資訊中,可以認為C在濱江國際的Event time是昨天晚上9點,B接收到這條資訊的時間,即Process time,是今天早上9點。

設定時間戳欄位(timestamp field)

windows按照時間劃分時,預設是Process time,也可以指定為Tuple中的Event time。
如果以Event time來劃分視窗:

  1. Tuple落入到哪個視窗,是看tuple裡的Event time。
  2. 視窗向後推進,主要依靠Event time的增長;
public BaseWindowedBolt withTimestampField(String fieldName)

延時(lag)和水位線(watermark)

從當前最後一條資料算起,往前減去lag,得到一個時間,這個時間就是watermark;
認為watermark之前的資料都已經到了。收到06:01:00的資料時,認為06:00:00的資料都到了。給他們入window。
這樣實際是一個延時處理,等到了06:01:00時,我才開始將06:00:00的資料放入視窗。

如果很不巧,06:00:00的資料在06:01:00之後,lag為60s,不好意思,進不了視窗。此資料不會被處理,並且會在worker的紀錄檔中加一行INFO資訊。

public class SlidingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    publicvoidprepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    publicvoidexecute(TupleWindow inputWindow) {
      for(Tuple tuple: inputWindow.get()) {
        // do the windowing computation
        ...
      }
      // emit the results
      collector.emit(new Values(computedValue));
    }
}

publicstaticvoidmain(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
     builder.setSpout("spout", new RandomSentenceSpout(), 1);
     builder.setBolt("slidingwindowbolt",
                     new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
                     1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(1);

    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

}

Storm的安裝步驟 http://www.linuxidc.com/Linux/2016-08/134184.htm

Kafka-Storm 整合部署 http://www.linuxidc.com/Linux/2016-03/129063.htm

Storm在Ubuntu環境下的單機部署 http://www.linuxidc.com/Linux/2016-03/129060.htm

本文永久更新連結地址http://www.linuxidc.com/Linux/2017-01/139946.htm


IT145.com E-mail:sddin#qq.com