温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

什么是Flink windows和Time操作

发布时间:2021-09-13 09:22:14 来源:亿速云 阅读:129 作者:柒染 栏目:大数据

本篇文章为大家展示了什么是Flink windows和Time操作,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

Time类型

在Flink中常用的Time类型:

  • 处理时间

  • 摄取时间

  • 事件时间

什么是Flink windows和Time操作

处理时间

是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间。例如一些算子操作时间,在服务器上面的时间。

如果你以处理时间作为流处理的时间处理方式,那么所有的基于时间的操作都会使用服务器的时间,来运行相关的操作。例如:一个小时的处理时间窗口,将会包含一个小时内的到达服务器内的所有数据。例如应用程序9:15am开始执行,第一个小时的时间处理窗口会包含所有的9:15到10:15内的事件数据,下一个时间窗口是10:15到11:15内的所有数据。

处理时间是最简单的事件处理方式,并不需要流和机器的时间协调。因此提供了高性能和低延迟。然而在分布式环境中或者异步环境中处理时间并不能够提供准确性(也就是说在处理数据时,由于网络的抖动在一个处理时间窗口中例如9:15到10:15,很大可能包括9:00的事件数据)。

事件时间

事件时间是每一个设备上每一个单独事件发生的时间例如手机登录APP的日志时间。这个时间就是这条数据记录的时间。每一条数据都有一个时间戳表示这条数据的事件发生时间。这个时间取决于每条数据,而并不会依赖于机器的时间。事件时间处理时必须指定如何获得Event Time watermarks(用来描述Event Time如何处理)。

按照事件时间处理数据,处理结果应该是完全一致,也就是说无论处理多少次结果都是一样的,这就是所谓的大数据处理的幂等性。 不管事件到达时间和事件是不是有序到达(在生产环境中,数据往往进入到服务器中的时间和顺序是不一定的,有可能先产生的数据后到达服务器,这取决于很多网络因素)

摄取时间

摄取时间表示某个事件数据进入到Flink的时间。在source操作中,每条记录都会得到source的当前时间戳,也就是接收到的数据自动会有一个摄取时间,也就是例如时间窗都是基于这个时间来处理的。

摄取时间是处于事件时间和处理时间之间。如上图所示。摄取时间是有成本的,但是却是结果可预测的。因为摄取时间使用了稳定的时间戳(在source端只会分配一次),每一条数据的时间戳都是固定的。并且同一摄取时间的数据有可能被分配到不同的处理时间窗口中。

Windows

Windows使我们处理无限数据流(源源不断的进来)的核心部件。Windows把我们的数据流拆成一个个的buckets。我们需要把算子作用到buckets上面去。

第一件事情就是需要指定我们的流数据是不是有key,有key和没有key对应的算子是完全不一样的。

Keyed windows

带keyby,会结合windows一起使用。输入的数据内容中的任意属性都可以作为一个key。在这个流上可以允许窗口多任务并行计算,每一个逻辑key都可以被独立计算,相同的key的数据会被发送到相同的并行任务中去处理。

Non-Keyed windows

通过使用windowAll来指定。原始的数据流不会被拆分成多个逻辑任务,所有窗口逻辑都是一个窗口任务来执行,所以并行度是1。

windows 生命周期

简而言之,当第一个元素到达对应的窗口时,一个windows就会被开始创建。当时间(不管是event时间还是processing时间)达到时间戳范围,就会移除窗口。另外,每一个窗口都有一个Trigger和window Functions,当数据到达窗口后,执行的函数就是window Functions,这个函数包含了对这个窗口内容的所有计算,当Trigger达到一定条件之后,就会触发。

Windows Assigners

在指定流数据是否带key之后,下一步就是定义窗口的分配器(windows assigner),windows assigner的职责是定义每一个传入的元素如何分配到窗口内。对于keyby使用window()方法,对于non-keyby使用windowAll()方法。

WindowAssigner is responsible for assigning each incoming element to one or more windows. 

 每个传入的数据分配给一个或多个窗口。

Flink内置的window assigner对于大多数场景来讲基本上是够用的(tumbling windows滚动窗口, sliding windows滑动窗口, session windows会话窗口 and global windows全局窗口)。也可以通过继承WindowAssigner来自定义一个window assigner。所有的内置window assigner(除了全局窗口)都是基于时间(处理时间或事件时间)来分配数据的。

基于时间的窗口有一个开始的timestamp(inclusive)和结束timestamp(exclusive)表示窗口的大小。

Flink中对于窗口的划分有两大类,第一大类是基于time(用的最多),第二大类是基于count。

Tumbling Windows 滚动窗口

滚动窗口分配器将分配每一个元素到一个指定大小的窗口,这种类型的窗口有一个固定的大小而且不会有重叠的。上面这张图就是随着时间流按照指定的时间间隔拆开。

简单实例代码:

Scala

object WindosApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1)

    env.execute("WindosApp")
  }

}

上面的代码表示监听socket数据流,每隔5秒获取一次数据。timeWindow表示根据时间来划分窗口,(此外还有countWindow根据数量来划分窗口)。默认时间是processTime处理时间。

Java

public class JavaWindowApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

        env.execute("JavaWindowApp");

    }
}

Sliding Windows滑动窗口

什么是Flink windows和Time操作

滑动窗口分配器分配每一个元素到一个固定大小的窗口,类似于滚动窗口,窗口大小可以通过配置进行修改,但是滑动窗口还有另外一个附加滑动参数控制滑动窗口什么时候启动,所以这个窗口是有可能重叠的。

上面图的意思是window1的窗口大小是10分钟,滑动大小是5分钟,也就是每隔5分钟产生一个窗口,这个窗口的大小是10分钟,这个窗口就是window2,然后window2又过5分钟产生一个窗口,窗口的大小是10分钟 window3,以此类推。所以滑动窗口处理的数据可能会有重叠。一个数据元素可能会在多个窗口中进行处理。

使用场景:每个半个小时统计前一个小时的TopN。

object WindosApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(",")).map((_,1)).keyBy(0)
      //.timeWindow(Time.seconds(5)) # 滚动窗口
      .timeWindow(Time.seconds(10),Time.seconds(5))
      .sum(1).print().setParallelism(1)

    env.execute("WindosApp")
  }

}

每隔5秒统计近10秒的数据。所以当服务器端输入:

a,a,a,b,b,b
a,a,a,b,b,b
a,b,a,b,a,a

时,控制台会打印两遍结果:

(a,10)
(b,8)
(b,8)
(a,10)

Window Functions

在定义窗口分配器之后,就需要指定基于每一个窗口的计算方法了(在上面的例子中我们做了一个keyby sum操作)。window function会处理窗口中的每一个元素。window function包括如下几个:

  • ReduceFunction

  • AggregationFunction

  • FoldFunction

  • ProcessWindowFunction

ReduceFunction和AggregationFunction的执行效率更高,因为Flink会在数据到达每一个窗口时首先做一个增量聚合操作。ProcessWindowFunction拿到的是包含在窗口中的所有的元素以及附加信息一个Iterable,是一个全量聚合。因此ProcessWindowFunction的执行效率不高,因为Flink会缓存窗口中的所有数据。

ReduceFunction

input中的两个元素进行结合产生一个同样类型的输出。这里我们举例,通过传入的数据类型是数值类型来演示增量效果。

Scala

object WindowReduceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(","))
      .map(x=>(1,x.toInt)) // 1,2,3,4,5 => (1,1) (1,2) (1,3) (1,4) (1,5)
      .keyBy(0) //因为key都是1, 所以所有的元素都到一个task去执行
      .timeWindow(Time.seconds(5)) // 滚动窗口
      .reduce((v1, v2) => { //// reduce函数作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的数据到达之后进行一次性处理,而是数据两两处理
      println(v1 + "....." + v2)
      (v1._1, v1._2 + v2._2)
    })
      .print().setParallelism(1)
    env.execute("WindowReduceApp")
  }
}

服务器端输入:

1,2,3,4,5

控制台中输出如下:

(1,1).....(1,2)
(1,3).....(1,3)
(1,6).....(1,4)
(1,10).....(1,5)
(1,15)

reduce函数作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的数据到达之后进行一次性处理,而是数据两两处理。

Java

public class JavaWindowReduceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                        System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]");
                        return new Tuple2<>(value1.f0,value1.f1 + value2.f1);
                    }
                })
                .print().setParallelism(1);

        env.execute("JavaWindowApp");

    }
}

输出结果如下:

value1 = [(1,1)], value2 = [(1,2)]
value1 = [(1,3)], value2 = [(1,3)]
value1 = [(1,6)], value2 = [(1,4)]
value1 = [(1,10)], value2 = [(1,5)]
(1,15)

ProcessWindowFunction

ProcessWindowFunction可以拿到一个Iterable,可以拿到窗口中的所有元素,并且有一个上下文对象可以访问时间和状态信息,比reducefunction可以提供更多的功能。但这样却可以带来资源和性能的开销,因为元素并不能通过增量的方式去聚合,相反,它需要把所有的数据都放在一个buffer中。

public class JavaWindowProcessApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception {
                        System.out.println("tuple = [" + tuple + "], context = [" + context + "], elements = [" + elements + "], out = [" + out + "]");
                        long count = 0;
                        for(Tuple2<Integer, Integer> in:elements) {
                            count++;
                        }
                        out.collect("window:" + context.window() + "count:" + count);
                    }
                })
                .print().setParallelism(1);

        env.execute("JavaWindowApp");

    }
}

服务器输入:

1,2,3,4,5

控制台输出:

tuple = [(1)], context = [org.apache.flink.streaming.runtime.operators.windowing.functions.InternalProcessWindowContext@40e09d6c], elements = [[(1,1), (1,2), (1,3), (1,4), (1,5)]], out = [org.apache.flink.streaming.api.operators.TimestampedCollector@4e277b00]
window:TimeWindow{start=1568542160000, end=1568542165000}count:5

只输出一次,说明是等待所有数据都拿到之后才进行处理。

使用场景:窗口内的数据进行排序。在Reduce中是无法进行排序的。

上述内容就是什么是Flink windows和Time操作,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI