Flink Revisited Notes (19), FlinkSQL Top-Level API: FlinkSQL Windows (Handling Dynamically Accumulating Data)

马肤


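This article introduces windows in FlinkSQL, Flink's top-level API, as a way to handle business requirements over dynamically accumulating data. A window slices an unbounded stream into bounded groups so that aggregates can be computed over each group. The notes cover tumbling, sliding (hop), session and cumulative windows, then multi-dimensional analysis, Window Top-N and window definitions in the Table API.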

Flink Study Notes

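Preface: today is day 19 of studying Flink! I worked through windows in FlinkSQL, including tumbling, sliding, session and cumulative windows, and learned how to compute running totals (similar to the cumulative play-count requirement in a mid-length video creator program), along with multi-dimensional data analysis and other popular big-data topics. I have written down my own understanding and thoughts, and I hope they are helpful and spark some discussion!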


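Tips: "Sharing is a source of joy 💧. My blog offers not only an ocean of knowledge 🌊 but also plenty of positive energy 💪, so come and share the joy with me 😊!

If you like my blog, remember to leave a like ❤️ and a follow! Your support is what keeps me writing!"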


Table of Contents

  • Flink Study Notes
    • 6. FlinkSQL Windows
      • 1. Window Table-Valued Functions (TVFs)
      • 2. Window Functions by Type and Aggregation
        • 2.1 Tumbling Windows (Tumble Windows)
        • 2.2 Sliding Windows (Hop Windows)
        • 2.3 Session Windows (not yet supported as a Window TVF)
        • 2.4 Cumulative Windows (Cumulate Windows, new in Flink 1.13)
      • 3. Multi-Dimensional Data Analysis
        • 3.1 GROUPING SETS
        • 3.2 ROLLUP
        • 3.3 CUBE
        • 3.4 GROUPING and GROUPING_ID
          • 3.4.1 The GROUPING Function
          • 3.4.2 GROUPING_ID (Hive-compatible)
        • 3.5 Window Top-N
      • 4. Over Windows
        • 4.1 ROWS OVER WINDOW
        • 4.2 RANGE OVER WINDOW
      • 5. Table API Window Definitions
        • 5.1.1 Tumbling Windows
        • 5.1.2 Sliding Windows
        • 5.1.3 Session Windows

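                  6. FlinkSQL Windows

                  1. Window Table-Valued Functions (TVFs)

                  A window turns a stream into a special kind of "batch" that can be processed as a unit. Commonly used windows:

                  • Sliding windows
                  • Tumbling windows
                  • Session windows (not available as a window TVF in Flink 1.13; the SESSION TVF arrived later, in Flink 1.19 for streaming mode)
                  • Cumulative windows (new in Flink 1.13)

                    Before Flink 1.13, a window was expressed as a special group window function (GroupWindowFunction) in the GROUP BY clause: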

                    SELECT
                    	TUMBLE_START( bidtime, INTERVAL '10' MINUTE),
                    	TUMBLE_END( bidtime, INTERVAL '10' MINUTE),
                    	TUMBLE_ROWTIME( bidtime, INTERVAL '10' MINUTE),
                    	SUM(price)
                    FROM MyTable
                    GROUP BY TUMBLE( bidtime, INTERVAL '10' MINUTE);
                    

                    Since Flink 1.13, the syntax is standardized as a table-valued function (TVF):

                    SELECT window_start, window_end, window_time, SUM(price)
                    FROM TABLE(
                    	TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
                    )
                    GROUP BY window_start, window_end;
                    

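                    2. Window Functions by Type and Aggregation

                    2.1 Tumbling Windows (Tumble Windows)

                    Syntax:

                    TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
                    data: a table name.
                    timecol: a column descriptor indicating which time attribute column of the data is mapped to tumbling windows.
                    size: a duration specifying the width of each tumbling window.
                    

                    Data: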

                    2021-04-15 08:05:00,4.00,C
                    2021-04-15 08:07:00,2.00,A
                    2021-04-15 08:09:00,5.00,D
                    2021-04-15 08:11:00,3.00,B
                    2021-04-15 08:13:00,1.00,E
                    2021-04-15 08:17:00,6.00,F
                    

                    Requirement: a real-time dashboard needs the total GMV for every 10-minute interval.

                    package cn.itcast.day02.Window;
                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                    import org.apache.flink.table.api.EnvironmentSettings;
                    import org.apache.flink.table.api.Table;
                    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                    import org.apache.flink.types.Row;
                    /**
                     * @author lql
                     * @time 2024-03-16 17:33:47
                     * @description TODO
                     */
                    public class GroupWindowsSqlTumbleExample {
                        public static void main(String[] args) throws Exception {
                            // 1) Create the Flink streaming execution environment
                            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                            // 2) Set the parallelism
                            env.setParallelism(1);
                            // 3) Create the Flink table environment
                            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                                    .useBlinkPlanner()
                                    .inStreamingMode()
                                    .build();
                            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
                            String filePath = GroupWindowsSqlTumbleExample.class.getClassLoader().getResource("bid.csv").getPath();
                            tabEnv.executeSql("create table Bid(" +
                                    "bidtime TIMESTAMP(3)," +
                                    "price DECIMAL(10, 2), " +
                                    "item string," +
                                    "watermark for bidtime as bidtime - interval '1' second) " +
                                    "with("
                                    + "'connector' = 'filesystem',"
                                    + "'path' = 'file:///"+filePath+"',"
                                    + "'format' = 'csv'"
                                    + ")");
                            Table table = tabEnv.sqlQuery("" +
                                    "select window_start,window_end,sum(price) as sum_price " +
                                    " from table(" +
                                    "  tumble(table Bid, DESCRIPTOR(bidtime), interval '10' MINUTES))" +
                                    "  group by window_start,window_end");
                            tabEnv.toAppendStream(table, Row.class).print();
                            env.execute();
                        }
                    }
                    

                    Result:

                    +I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
                    +I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
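
The two result rows can be checked by hand. The following is a minimal plain-Java sketch (no Flink involved) of how a tumbling window assigns each row to exactly one window and sums the prices. The class name `TumbleSketch` and its methods are hypothetical, and the sketch assumes windows are aligned to midnight, which agrees with epoch alignment for a 10-minute size.

```java
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.Map;
import java.util.TreeMap;

class TumbleSketch {
    // Sample bids from the article: time of day on 2021-04-15 and price.
    static final String[] TIMES = {"08:05", "08:07", "08:09", "08:11", "08:13", "08:17"};
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    /** Start of the tumbling window that contains t (assumption: windows aligned to midnight). */
    static LocalTime windowStart(LocalTime t, int sizeMinutes) {
        int minuteOfDay = t.getHour() * 60 + t.getMinute();
        return LocalTime.MIDNIGHT.plusMinutes(minuteOfDay - minuteOfDay % sizeMinutes);
    }

    /** Plain-Java counterpart of SUM(price) ... GROUP BY window_start, window_end. */
    static Map<LocalTime, BigDecimal> sumPerWindow(int sizeMinutes) {
        Map<LocalTime, BigDecimal> sums = new TreeMap<>();
        for (int i = 0; i < TIMES.length; i++) {
            LocalTime start = windowStart(LocalTime.parse(TIMES[i]), sizeMinutes);
            sums.merge(start, new BigDecimal(PRICES[i]), BigDecimal::add);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Prints one line per non-empty window: start, end and summed price.
        sumPerWindow(10).forEach((start, sum) ->
                System.out.println(start + " - " + start.plusMinutes(10) + " : " + sum));
    }
}
```

The window [08:00, 08:10) collects the first three bids and [08:10, 08:20) the last three, which matches the sums 11.00 and 10.00 above.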
                    

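                    2.2 Sliding Windows (Hop Windows)

                    Syntax:

                    HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
                    data: a table name.
                    timecol: a column descriptor indicating which time attribute column of the data is mapped to hopping windows.
                    slide: a duration specifying the interval between the starts of consecutive hopping windows.
                    size: a duration specifying the width of each hopping window.
                    

                    Requirement: every 5 minutes, aggregate the data of the last 10 minutes.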

                    package cn.itcast.day02.Window;
                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                    import org.apache.flink.table.api.EnvironmentSettings;
                    import org.apache.flink.table.api.Table;
                    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                    import org.apache.flink.types.Row;
                    /**
                     * @author lql
                     * @time 2024-03-16 19:28:30
                     * @description TODO
                     */
                    public class GroupWindowsSqlHopExample {
                        public static void main(String[] args) throws Exception {
                            // 1) Create the Flink streaming execution environment
                            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                            // 2) Set the parallelism
                            env.setParallelism(1);
                            // 3) Create the Flink table environment
                            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                                    .useBlinkPlanner()
                                    .inStreamingMode()
                                    .build();
                            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
                            String filePath = GroupWindowsSqlHopExample.class.getClassLoader().getResource("bid.csv").getPath();
                            tabEnv.executeSql("create table Bid(" +
                                    "bidtime TIMESTAMP(3)," +
                                    "price DECIMAL(10, 2), " +
                                    "item string," +
                                    "watermark for bidtime as bidtime - interval '1' second) " +
                                    "with("
                                    + "'connector' = 'filesystem',"
                                    + "'path' = 'file:///"+filePath+"',"
                                    + "'format' = 'csv'"
                                    + ")");
                            Table table = tabEnv.sqlQuery("" +
                                    "select window_start,window_end,sum(price) as sum_price " +
                                    " from table(" +
                                    "  hop(table Bid, DESCRIPTOR(bidtime), interval '5' MINUTES, interval '10' MINUTES))" +
                                    "  group by window_start,window_end");
                            tabEnv.toAppendStream(table, Row.class).print();
                            env.execute();
                        }
                    }
                    

                    Result:

                    +I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
                    +I[2021-04-15T08:05, 2021-04-15T08:15, 15.00]
                    +I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
                    +I[2021-04-15T08:15, 2021-04-15T08:25, 6.00]
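
With a 10-minute size and a 5-minute slide, every row belongs to two overlapping windows. Here is a minimal plain-Java sketch of that assignment, again without Flink. `HopSketch` is a hypothetical name, and midnight alignment is an assumption that holds for the sample data.

```java
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.Map;
import java.util.TreeMap;

class HopSketch {
    // Sample bids from the article: time of day on 2021-04-15 and price.
    static final String[] TIMES = {"08:05", "08:07", "08:09", "08:11", "08:13", "08:17"};
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    /** SUM(price) per hopping window, keyed by window start. */
    static Map<LocalTime, BigDecimal> sumPerWindow(int slideMinutes, int sizeMinutes) {
        Map<LocalTime, BigDecimal> sums = new TreeMap<>();
        for (int i = 0; i < TIMES.length; i++) {
            LocalTime t = LocalTime.parse(TIMES[i]);
            int minute = t.getHour() * 60 + t.getMinute();
            // Latest window start that is <= the row time (assumption: aligned to midnight).
            int lastStart = minute - minute % slideMinutes;
            // Walk back over every earlier start whose window [start, start + size) still covers the row.
            for (int start = lastStart; start > minute - sizeMinutes; start -= slideMinutes) {
                sums.merge(LocalTime.MIDNIGHT.plusMinutes(start),
                        new BigDecimal(PRICES[i]), BigDecimal::add);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        sumPerWindow(5, 10).forEach((start, sum) ->
                System.out.println(start + " - " + start.plusMinutes(10) + " : " + sum));
    }
}
```

The bid at 08:11, for example, is counted in both [08:05, 08:15) and [08:10, 08:20), which is why the four sums above add up to more than the total of all prices.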
                    

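                    2.3 Session Windows (not yet supported as a Window TVF)

                    Flink 1.13 does not support session windows through the Window TVF syntax. At the time, support was expected in Flink 1.14; the SESSION TVF actually arrived later, in Flink 1.19 (streaming mode).

                    Requirement: using the legacy group window syntax, define a session gap of 3 minutes. If no new row arrives within 3 minutes after the last row of a window, that window closes and later rows fall into the next window.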

                    package cn.itcast.day02.Window;
                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                    /**
                     * @author lql
                     * @time 2024-03-16 19:37:20
                     * @description TODO
                     */
                    public class GroupWindowsSqlSessionExample {
                        public static void main(String[] args) {
                            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                            env.setParallelism(1);
                            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
                            String filePath = GroupWindowsSqlSessionExample.class.getClassLoader().getResource("bid.csv").getPath();
                            // The event-time column must be a TIMESTAMP type; bidtime is declared as TIMESTAMP(3) and carries the watermark
                            tEnv.executeSql("create table Bid(" +
                                    "bidtime TIMESTAMP(3)," +
                                    "price DECIMAL(10, 2), " +
                                    "item string," +
                                    "watermark for bidtime as bidtime - interval '1' second) " +
                                    "with("
                                    + "'connector' = 'filesystem',"
                                    + "'path' = 'file:///"+filePath+"',"
                                    + "'format' = 'csv'"
                                    + ")");
                            tEnv.sqlQuery("SELECT " +
                                                    "  SESSION_START(bidtime, INTERVAL '3' minute) as wStart,  " +
                                                    "  SESSION_END(bidtime, INTERVAL '3' minute) as wEnd,  " +
                                                    "  SUM(price) sum_price " +
                                                    "FROM Bid " +
                                                    "GROUP BY SESSION(bidtime, INTERVAL '3' minute)"
                                    )
                                    .execute()
                                    .print();
                        }
                    }
                    

                    Result:

                    +----+-------------------------+-------------------------+-----------+
                    | op |                  wStart |                    wEnd | sum_price |
                    +----+-------------------------+-------------------------+-----------+
                    | +I | 2021-04-15 08:05:00.000 | 2021-04-15 08:16:00.000 |     15.00 |
                    | +I | 2021-04-15 08:17:00.000 | 2021-04-15 08:20:00.000 |      6.00 |
                    +----+-------------------------+-------------------------+-----------+
                    2 rows in set
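
The session boundaries follow directly from the 3-minute gap. The sketch below merges the sample rows into sessions in plain Java; `SessionSketch` and `Session` are hypothetical names. It assumes rows arrive in time order, and how a row that lands exactly on a session end is treated is also an assumption, since the sample data never exercises that case.

```java
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

class SessionSketch {
    // Sample bids from the article: time of day on 2021-04-15 and price.
    static final String[] TIMES = {"08:05", "08:07", "08:09", "08:11", "08:13", "08:17"};
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    /** One session [start, end) with its running sum; end = time of last row + gap. */
    static final class Session {
        final LocalTime start;
        LocalTime end;
        BigDecimal sum;

        Session(LocalTime start, LocalTime end, BigDecimal sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }
    }

    /** Groups time-ordered rows into sessions separated by gapMinutes of inactivity. */
    static List<Session> sessions(int gapMinutes) {
        List<Session> result = new ArrayList<>();
        Session current = null;
        for (int i = 0; i < TIMES.length; i++) {
            LocalTime t = LocalTime.parse(TIMES[i]);
            BigDecimal price = new BigDecimal(PRICES[i]);
            if (current != null && t.isBefore(current.end)) {
                // The row arrives before the session times out: extend the session.
                current.end = t.plusMinutes(gapMinutes);
                current.sum = current.sum.add(price);
            } else {
                // Otherwise the row opens a new session.
                current = new Session(t, t.plusMinutes(gapMinutes), price);
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Session s : sessions(3)) {
            System.out.println(s.start + " - " + s.end + " : " + s.sum);
        }
    }
}
```

The first five bids are each at most 2 minutes apart, so they form one session ending at 08:13 + 3 minutes = 08:16. The bid at 08:17 comes 4 minutes after 08:13 and therefore starts a second session.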
                    

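                    2.4 Cumulative Windows (Cumulate Windows, new in Flink 1.13)

                    Syntax:

                    CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
                    TABLE: the table name.
                    DESCRIPTOR: the time attribute column of the table that the windows are built on.
                    step: the amount by which the window end grows between consecutive cumulative windows.
                    size: the maximum window width; once it is reached, a new cycle starts.
                    

                    Requirement: with 10 minutes as the window, report the running total every 2 minutes (similar to the cumulative play-count curve in a mid-length video creator program).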

                    package cn.itcast.day02.Window;
                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                    /**
                     * @author lql
                     * @time 2024-03-16 19:45:02
                     * @description TODO
                     */
                    public class GroupWindowsSqlCumulateExample {
                        public static void main(String[] args) {
                            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                            env.setParallelism(1);
                            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
                            String filePath = GroupWindowsSqlCumulateExample.class.getClassLoader().getResource("bid.csv").getPath();
                            // The event-time column must be a TIMESTAMP type; bidtime is declared as TIMESTAMP(3) and carries the watermark
                            tEnv.executeSql("create table Bid(" +
                                    "bidtime TIMESTAMP(3)," +
                                    "price DECIMAL(10, 2), " +
                                    "item string," +
                                    "watermark for bidtime as bidtime - interval '1' second) " +
                                    "with("
                                    + "'connector' = 'filesystem',"
                                    + "'path' = 'file:///"+filePath+"',"
                                    + "'format' = 'csv'"
                                    + ")");
                            tEnv.sqlQuery("SELECT window_start, window_end, SUM(price) as sum_price\n" +
                                            "  FROM TABLE(\n" +
                                            "    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))\n" +
                                            "  GROUP BY window_start, window_end"
                                    )
                                    .execute()
                                    .print();
                        }
                    }
                    

                    Result:

                    +----+-------------------------+-------------------------+-----------+
                    | op |            window_start |              window_end | sum_price |
                    +----+-------------------------+-------------------------+-----------+
                    | +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:06:00.000 |      4.00 |
                    | +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:08:00.000 |      6.00 |
                    | +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:10:00.000 |     11.00 |
                    | +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:12:00.000 |      3.00 |
                    | +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:14:00.000 |      4.00 |
                    | +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:16:00.000 |      4.00 |
                    | +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:18:00.000 |     10.00 |
                    | +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:20:00.000 |     10.00 |
                    +----+-------------------------+-------------------------+-----------+
                    8 rows in set
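
All cumulative windows of one cycle share the same start, and the end advances by one step at a time until the full size is reached. The following plain-Java sketch reproduces the eight rows above. `CumulateSketch` is a hypothetical name, and midnight alignment is an assumption that holds for the sample data.

```java
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.Map;
import java.util.TreeMap;

class CumulateSketch {
    // Sample bids from the article: time of day on 2021-04-15 and price.
    static final String[] TIMES = {"08:05", "08:07", "08:09", "08:11", "08:13", "08:17"};
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    /** SUM(price) per cumulative window, keyed by "start - end". */
    static Map<String, BigDecimal> sumPerWindow(int stepMinutes, int sizeMinutes) {
        Map<String, BigDecimal> sums = new TreeMap<>();
        for (int i = 0; i < TIMES.length; i++) {
            LocalTime t = LocalTime.parse(TIMES[i]);
            int minute = t.getHour() * 60 + t.getMinute();
            // Start shared by all windows of the cycle (assumption: aligned to midnight).
            int start = minute - minute % sizeMinutes;
            // First window end strictly after the row time, on a step boundary.
            int firstEnd = start + ((minute - start) / stepMinutes + 1) * stepMinutes;
            // The row counts toward that window and every longer window of the same cycle.
            for (int end = firstEnd; end <= start + sizeMinutes; end += stepMinutes) {
                String key = LocalTime.MIDNIGHT.plusMinutes(start) + " - "
                        + LocalTime.MIDNIGHT.plusMinutes(end);
                sums.merge(key, new BigDecimal(PRICES[i]), BigDecimal::add);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        sumPerWindow(2, 10).forEach((window, sum) -> System.out.println(window + " : " + sum));
    }
}
```

Windows such as [08:00, 08:02) and [08:00, 08:04) contain no rows, so they produce no output, which is why the first cycle shows three rows and the second shows five.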
                    

                    3. Multi-Dimensional Data Analysis

                    3.1 GROUPING SETS

                    Current approach, a single query with GROUPING SETS:

                    SELECT window_start, 
                    		window_end,
                    		userId,
                    		category,
                    		sum(price) as sum_price
                    FROM TABLE(
                       TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS)) 
                    GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ()) 
                    

                    Previous approach, one query per grouping set combined with UNION ALL:

                    -- ()
                    SELECT window_start, window_end, CAST(NULL AS STRING) as userId, CAST(NULL AS STRING) as category, sum(price) as sum_price
                    FROM TABLE(
                    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
                    GROUP BY window_start, window_end
                    UNION ALL
                    -- (userId)
                    SELECT window_start, window_end, userId as userId, CAST(NULL AS STRING) as category, sum(price) as sum_price
                    FROM TABLE(
                    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
                    GROUP BY window_start, window_end, userId
                    UNION ALL
                    -- (userId, category)
                    SELECT window_start, window_end,userId, category, sum(price) as sum_price
                    FROM TABLE(
                    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
                    GROUP BY window_start, window_end, userId, category
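
Conceptually, GROUPING SETS aggregates the same rows once per grouping set and fills every column that is not part of the set with NULL. A minimal plain-Java sketch of that idea follows; the class `GroupingSetsSketch` and the order rows (with prices in cents) are hypothetical and only serve as illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupingSetsSketch {
    // Hypothetical orders within one window: userId, category, price in cents.
    static final String[][] ORDERS = {
        {"user_001", "laptop", "1010"},
        {"user_001", "phone", "1410"},
        {"user_002", "phone", "8250"},
    };

    // GROUPING SETS((userId, category), (userId), ()) as {keepUserId, keepCategory} flags.
    static final boolean[][] SETS = {{true, true}, {true, false}, {false, false}};

    /**
     * Aggregates once per grouping set. A column that is not part of the
     * current set is replaced by null in the result key.
     */
    static Map<List<String>, Long> aggregate(boolean[][] groupingSets) {
        Map<List<String>, Long> sums = new LinkedHashMap<>();
        for (boolean[] set : groupingSets) {
            for (String[] order : ORDERS) {
                // Arrays.asList is used because the key may contain nulls.
                List<String> key = Arrays.asList(
                        set[0] ? order[0] : null,
                        set[1] ? order[1] : null);
                sums.merge(key, Long.parseLong(order[2]), Long::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        aggregate(SETS).forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

Because the placeholder is an ordinary null, a key such as (user_001, null) would collide with a real order whose category is NULL. That ambiguity is what the GROUPING function in section 3.4 resolves.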
                    

                    3.2 ROLLUP

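                    Mnemonic: drop columns from right to left, going from the most detailed grouping down to the grand total!

                    GROUP BY ROLLUP(a, b, c)
                    -- is equivalent to:
                    GROUPING SETS((a,b,c),(a,b),(a), ())
                    GROUP BY ROLLUP ( a, (b, c), d )
                    -- is equivalent to: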
                    GROUPING SETS (
                        ( a, b, c, d ),
                        ( a, b, c    ),
                        ( a          ),
                        (            )
                    )
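
The expansion rule is mechanical: ROLLUP produces every prefix of its argument list, longest first, and ends with the empty set. A minimal sketch in plain Java, with the hypothetical helper `RollupSketch.rollup`; a composite element such as (b, c) is passed as the single string "b, c" so that it is kept or dropped as a unit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RollupSketch {
    /** Expands ROLLUP(cols) into grouping sets: every prefix, longest first, ending with (). */
    static List<List<String>> rollup(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int n = cols.size(); n >= 0; n--) {
            sets.add(new ArrayList<>(cols.subList(0, n)));
        }
        return sets;
    }

    public static void main(String[] args) {
        System.out.println(rollup(Arrays.asList("a", "b", "c")));
        // A composite element is one unit: it is kept or dropped as a whole.
        System.out.println(rollup(Arrays.asList("a", "b, c", "d")));
    }
}
```

ROLLUP over n elements therefore yields n + 1 grouping sets.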
                    

                    3.3 CUBE

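                    Mnemonic: every possible combination (subset) of the listed columns.

                    GROUP BY CUBE(a, b, c)
                    -- is equivalent to:
                    GROUPING SETS((a,b,c),(a,b),(a,c),(b,c),(a),(b),(c),())
                    GROUP BY CUBE ( (a, b), (c, d) )
                    -- is equivalent to:
                    GROUPING SETS (
                        ( a, b, c, d ),
                        ( a, b       ),
                        (       c, d ),
                        (            )
                    )
                    -- Combining CUBE with GROUPING SETS: the CUBE subsets are crossed with the additional grouping elements
                    GROUP BY a, CUBE (b, c), GROUPING SETS ((d), (e))
                    -- is equivalent to: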
                    GROUP BY GROUPING SETS (
                        (a, b, c, d), (a, b, c, e),
                        (a, b, d),    (a, b, e),
                        (a, c, d),    (a, c, e),
                        (a, d),       (a, e)
                    )
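
CUBE expands to all 2^n subsets of its n elements. One way to enumerate them is to treat the bits of a counter as a keep-mask, as in this minimal plain-Java sketch (`CubeSketch.cube` is a hypothetical helper). The sets come out in a different order than in the listing above, but they are the same sets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CubeSketch {
    /** Expands CUBE(cols) into all 2^n subsets, using the bits of a counter as a keep-mask. */
    static List<List<String>> cube(List<String> cols) {
        int n = cols.size();
        List<List<String>> sets = new ArrayList<>();
        for (int mask = (1 << n) - 1; mask >= 0; mask--) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                // Bit (n - 1 - i) of the mask decides whether column i is kept.
                if ((mask & (1 << (n - 1 - i))) != 0) {
                    set.add(cols.get(i));
                }
            }
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        System.out.println(cube(Arrays.asList("a", "b", "c")));
    }
}
```

Three columns give 8 grouping sets and each additional column doubles that number, so CUBE over many columns becomes expensive quickly.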
                    

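                    3.4 GROUPING and GROUPING_ID

                    Background: GROUPING SETS uses NULL as a placeholder in its result, which makes it impossible to tell a placeholder NULL from a genuine NULL in the data.

                    3.4.1 The GROUPING Function
                    • Takes a column name as its argument
                    • Returns 0 when the column is part of the current grouping set, so the value (including any NULL) comes from the input data
                    • Returns 1 when the column is not part of the current grouping set, so the NULL is a GROUPING SETS placeholder

                      Example: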

                      SELECT  window_start, window_end, userId, category, 
                      		GROUPING(category) as categoryFlag,
                      		sum(price) as sum_price,
                      		IF(GROUPING(category) = 0, category, 'ALL') as `all`
                      FROM TABLE(
                         		TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS)) 
                      GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ())
                      

                      Result:

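                      +-------------------------+-------------------------+----------+----------+-----------+------+------+
                      |            window_start |              window_end |   userId | category | sum_price | flag |  all |
                      +-------------------------+-------------------------+----------+----------+-----------+------+------+
                      | 2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 |     NULL |     NULL |      10.1 |    1 |  ALL |
                      | 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 |     NULL |     NULL |      96.6 |    1 |  ALL |
                      | 2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 |     NULL |     NULL |      15.6 |    1 |  ALL |
                      | 2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 |     电脑 |      10.1 |    0 | 电脑 |
                      | 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 |     手机 |      14.1 |    0 | 手机 |
                      | 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 |     手机 |      82.5 |    0 | 手机 |
                      | 2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 |     电脑 |      15.6 |    0 | 电脑 |
                      | 2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 |     NULL |      10.1 |    1 |  ALL |
                      | 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 |     NULL |      14.1 |    1 |  ALL |
                      | 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 |     NULL |      82.5 |    1 |  ALL |
                      | 2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 |     NULL |      15.6 |    1 |  ALL |
                      +-------------------------+-------------------------+----------+----------+-----------+------+------+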

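                      3.4.2 GROUPING_ID (Hive-compatible)

                      MaxCompute also provides a parameterless GROUPING__ID function for compatibility with Hive queries.

                      The result packs the GROUPING results of the argument columns into one integer, one bit per column (a bitmap).

                      MaxCompute is compatible with this function as implemented in Hive 2.3.0 and later. In Hive versions below 2.3.0 the function's output is inconsistent, so using it is not recommended.

                      SELECT
                      a,b,c ,
                      COUNT(*),
                      GROUPING__ID
                      FROM VALUES (1,2,3) as t(a,b,c)
                      GROUP BY a, b, c GROUPING SETS ((a,b,c), (a));
                      GROUPING__ID takes neither arguments nor parentheses. In MaxCompute this form is equivalent to GROUPING_ID(a,b,c), with the arguments in the same order as in GROUP BY.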
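
The bitmap can be worked out by hand. The following minimal plain-Java sketch derives GROUPING and GROUPING_ID for the two grouping sets of the query above. The class and method names are hypothetical, and it assumes the convention where the first argument maps to the most significant bit.

```java
import java.util.Arrays;
import java.util.List;

class GroupingIdSketch {
    /** GROUPING(col): 0 if col is part of the grouping set, 1 if it was aggregated away. */
    static int grouping(String col, List<String> groupingSet) {
        return groupingSet.contains(col) ? 0 : 1;
    }

    /** Packs the GROUPING bits of cols into one integer, first column = most significant bit. */
    static int groupingId(List<String> cols, List<String> groupingSet) {
        int id = 0;
        for (String col : cols) {
            id = (id << 1) | grouping(col, groupingSet);
        }
        return id;
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("a", "b", "c");
        // Grouping set (a, b, c): no column is aggregated away.
        System.out.println(groupingId(cols, Arrays.asList("a", "b", "c")));
        // Grouping set (a): b and c are aggregated away, giving binary 011.
        System.out.println(groupingId(cols, Arrays.asList("a")));
    }
}
```

Under that convention the set (a, b, c) yields 0 and the set (a) yields binary 011, which is 3.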
                      

                      3.5 Window Top-N

                      Template: find the top 3 suppliers by sales within each 10-minute business window.

                      SELECT *
                        FROM (
                          SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
                          FROM (
                            SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
                            FROM TABLE(
                              TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
                            GROUP BY window_start, window_end, supplier_id
                          )
                      ) WHERE rownum <= 3;
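
The query has two stages: aggregate per window and supplier, then rank within each window and keep the first three. The ranking stage can be sketched in plain Java as follows. `WindowTopNSketch`, `Agg` and the sample aggregates are hypothetical and only illustrate the ROW_NUMBER() filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowTopNSketch {
    /** Hypothetical output row of the inner aggregation: window, supplier, summed price. */
    static final class Agg {
        final String windowStart;
        final String supplier;
        final int price;

        Agg(String windowStart, String supplier, int price) {
            this.windowStart = windowStart;
            this.supplier = supplier;
            this.price = price;
        }
    }

    // Hypothetical per-window aggregates.
    static final List<Agg> SAMPLE = Arrays.asList(
            new Agg("08:00", "s1", 6), new Agg("08:00", "s2", 9),
            new Agg("08:00", "s3", 4), new Agg("08:00", "s4", 7),
            new Agg("08:10", "s1", 5), new Agg("08:10", "s2", 2));

    /** Keeps the n highest-priced suppliers per window, like ROW_NUMBER() ... <= n. */
    static Map<String, List<String>> topN(List<Agg> aggs, int n) {
        // PARTITION BY window_start
        Map<String, List<Agg>> byWindow = new TreeMap<>();
        for (Agg a : aggs) {
            byWindow.computeIfAbsent(a.windowStart, k -> new ArrayList<>()).add(a);
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<Agg>> e : byWindow.entrySet()) {
            List<Agg> rows = e.getValue();
            // ORDER BY price DESC
            rows.sort(Comparator.comparingInt((Agg a) -> a.price).reversed());
            List<String> top = new ArrayList<>();
            // WHERE rownum <= n
            for (Agg a : rows.subList(0, Math.min(n, rows.size()))) {
                top.add(a.supplier);
            }
            result.put(e.getKey(), top);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(topN(SAMPLE, 3));
    }
}
```

Partitioning by window_start and window_end is what makes this a Window Top-N: the ranking restarts for every window instead of running over the whole stream.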
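                      5. Table API Window Definitions

                      5.1.1 Tumbling Windows

                      import lombok.AllArgsConstructor;
                      import lombok.Data;
                      import lombok.NoArgsConstructor;
                      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
                      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
                      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                      import org.apache.flink.table.api.Table;
                      import org.apache.flink.table.api.Tumble;
                      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                      import java.time.Duration;
                      import static org.apache.flink.table.api.Expressions.$;
                      import static org.apache.flink.table.api.Expressions.lit;
                      public class GroupWindowsTableApiTumbleExample {
                          public static void main(String[] args) throws Exception {
                              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                              env.setParallelism(1);
                              // Sample orders: assumed to be the same as in the sliding and session examples of this article
                              SingleOutputStreamOperator<OrderInfo> dataStream = env
                                      .fromElements(
                                              new OrderInfo("电脑", 1000L, 100D),
                                              new OrderInfo("手机", 2000L, 200D),
                                              new OrderInfo("电脑", 3000L, 300D),
                                              new OrderInfo("手机", 4000L, 400D),
                                              new OrderInfo("手机", 5000L, 500D),
                                              new OrderInfo("电脑", 6000L, 600D))
                                      .assignTimestampsAndWatermarks(
                                              WatermarkStrategy
                                                      .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                                      .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                                      );
                              StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
                              Table table = tableEnv
                                      .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
                              table
                                      .window(Tumble.over(lit(5).second())
                                              .on($("timestamp")).as("w"))  // define the tumbling window and give it an alias
                                      .groupBy($("category"), 
                                               $("w")) // the window alias must appear among the groupBy fields
                                      .select($("category"), 
                                              $("w").start().as("window_start"), 
                                              $("w").end().as("window_end"), 
                                              $("money").sum().as("total_money"))
                                      .execute()  // submits the job; a further env.execute() would fail with no operators defined
                                      .print();
                          }
                          @Data
                          @AllArgsConstructor
                          @NoArgsConstructor
                          public static class OrderInfo {
                              private String category;
                              private Long timestamp;
                              private Double money;
                          }
                      }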
                      

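                      5.1.2 Sliding Windows

                      Methods of the Slide class:

                      • over: defines the window length
                      • every: defines the slide step
                      • on: the time attribute used for grouping (by time interval) or ordering (by row count)
                      • as: the window alias, which must appear in the subsequent groupBy

                        Example: every 5 seconds, compute the total sales of each product category over the past 10 seconds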

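                        import lombok.AllArgsConstructor;
                        import lombok.Data;
                        import lombok.NoArgsConstructor;
                        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
                        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
                        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                        import org.apache.flink.table.api.Slide;
                        import org.apache.flink.table.api.Table;
                        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                        import java.time.Duration;
                        import static org.apache.flink.table.api.Expressions.$;
                        import static org.apache.flink.table.api.Expressions.lit;
                        public class GroupWindowsTableApiSlideExample {
                            public static void main(String[] args) throws Exception {
                                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                                env.setParallelism(1);
                                SingleOutputStreamOperator<OrderInfo> dataStream = env
                                        .fromElements(
                                                new OrderInfo("电脑", 1000L, 100D),
                                                new OrderInfo("手机", 2000L, 200D),
                                                new OrderInfo("电脑", 3000L, 300D),
                                                new OrderInfo("手机", 4000L, 400D),
                                                new OrderInfo("手机", 5000L, 500D),
                                                new OrderInfo("电脑", 6000L, 600D))
                                        .assignTimestampsAndWatermarks(
                                                WatermarkStrategy
                                                        .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                                        .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                                        );
                                StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
                                Table table = tableEnv
                                        .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
                                table
                                        .window(Slide.over(lit(10).second())
                                                .every(lit(5).second())
                                                .on($("timestamp"))
                                                .as("w"))  // define the sliding window and give it an alias
                                        .groupBy($("category"), 
                                                 $("w")) // the window alias must appear among the groupBy fields
                                        .select($("category"), 
                                                $("w").start().as("window_start"), 
                                                $("w").end().as("window_end"), 
                                                $("money").sum().as("total_money"))
                                        .execute()  // submits the job; a further env.execute() would fail with no operators defined
                                        .print();
                            }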
                            @Data
                            @AllArgsConstructor
                            @NoArgsConstructor
                            public static class OrderInfo {
                                private String category;
                                private Long timestamp;
                                private Double money;
                            }
                        }
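
The Table API examples assign watermarks with `forBoundedOutOfOrderness(Duration.ofSeconds(5))`. The sketch below is a plain-Java illustration of how such a bound relates to window firing. It assumes the watermark trails the largest timestamp seen so far by the bound plus 1 ms, and that an event-time window [start, end) fires once the watermark reaches end - 1. The class and method names are hypothetical.

```java
class WatermarkSketch {
    /** Watermark after observing maxTimestamp, given an out-of-orderness bound in milliseconds. */
    static long watermark(long maxTimestamp, long boundMillis) {
        return maxTimestamp - boundMillis - 1;
    }

    /** An event-time window [start, end) can fire once the watermark reaches end - 1. */
    static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /** Largest timestamp in the stream so far. */
    static long max(long[] timestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    public static void main(String[] args) {
        // Event timestamps of the six sample orders, in milliseconds.
        long[] timestamps = {1000L, 2000L, 3000L, 4000L, 5000L, 6000L};
        long wm = watermark(max(timestamps), 5000L);
        System.out.println("watermark after last order: " + wm);
        // The window [0, 10000) has not fired yet at this watermark.
        System.out.println("fires during the stream: " + canFire(10_000L, wm));
        // A bounded source finishes by emitting the maximum watermark, which closes all windows.
        System.out.println("fires at end of input: " + canFire(10_000L, Long.MAX_VALUE));
    }
}
```

Under these assumptions the watermark never gets past 999 ms while the six orders are read, so the windows in these examples only produce output once the bounded input ends.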
                        

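                        5.1.3 Session Windows

                        Methods of the Session class:

                        • withGap: the session gap
                        • on: the time attribute used for grouping (by time interval) or ordering (by row count)
                        • as: the window alias, which must appear in the subsequent groupBy

                          Example: once more than 6 seconds pass without a new order event, the window closes and the orders that fell into that window are aggregated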

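                          import lombok.AllArgsConstructor;
                          import lombok.Data;
                          import lombok.NoArgsConstructor;
                          import org.apache.flink.api.common.eventtime.WatermarkStrategy;
                          import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
                          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                          import org.apache.flink.table.api.Session;
                          import org.apache.flink.table.api.Table;
                          import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                          import java.time.Duration;
                          import static org.apache.flink.table.api.Expressions.$;
                          import static org.apache.flink.table.api.Expressions.lit;
                          public class GroupWindowsTableApiSessionExample {
                              public static void main(String[] args) throws Exception {
                                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                                  env.setParallelism(1);
                                  SingleOutputStreamOperator<OrderInfo> dataStream = env
                                          .fromElements(
                                                  new OrderInfo("电脑", 1000L, 100D),
                                                  new OrderInfo("手机", 2000L, 200D),
                                                  new OrderInfo("电脑", 3000L, 300D),
                                                  new OrderInfo("手机", 4000L, 400D),
                                                  new OrderInfo("手机", 5000L, 500D),
                                                  new OrderInfo("电脑", 6000L, 600D))
                                          .assignTimestampsAndWatermarks(
                                                  WatermarkStrategy
                                                          .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                                          .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                                          );
                                  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
                                  Table table = tableEnv
                                          .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
                                  table
                                          .window(Session.withGap(lit(6).second())
                                                  .on($("timestamp"))
                                                  .as("w"))  // define the session window and give it an alias
                                          .groupBy($("category"), 
                                                   $("w")) // the window alias must appear among the groupBy fields
                                          .select($("category"), 
                                                  $("w").start().as("window_start"), 
                                                  $("w").end().as("window_end"), 
                                                  $("money").sum().as("total_money"))
                                          .execute()  // submits the job; a further env.execute() would fail with no operators defined
                                          .print();
                              }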
                              @Data
                              @AllArgsConstructor
                              @NoArgsConstructor
                              public static class OrderInfo {
                                  private String category;
                                  private Long timestamp;
                                  private Double money;
                              }
                          }
                          


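Copyright notice: unless otherwise stated, all articles are original works of VPS857. When reposting or copying, please credit the source with a hyperlink.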
