一.背景
当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计。这里的延迟统计分为两种:
模拟初始数据:早上10:10:00 用户点击了一次,但是延迟到10:10:05 才发送过来,允许最大延迟5秒, 5秒窗口统计。我们希望还是能统计到
二.基本代码
@Data public class UserTimeInfo implements Serializable { private String userId; /** 实际时间-偏移量 偏移后的时间*/ private Timestamp pTime; public UserTimeInfo() { } public UserTimeInfo(String userId, Timestamp pTime) { this.userId = userId; this.pTime = pTime; } }
public class UserTimeSource implements SourceFunction<UserTimeInfo> { /** * 为了id 统计方便,我们只留一个id */ static String[] userIds = {"id->"}; Random random = new Random(); /** * 模拟发送20次 */ int times = 20; @Override public void run(SourceContext sc) throws Exception { while (true) { TimeUnit.SECONDS.sleep(1); int m = (int) (System.currentTimeMillis() % userIds.length); // 随机延迟几秒 int defTime = random.nextInt(5); // 发送时间 DateTime dateTime = new DateTime(); // 计算延迟后的时间,并且打印时间 DateTime dateTimePrint = dateTime.plusSeconds(-defTime); System.out.println("实际时间:" + print(dateTime) + ",延迟:" + defTime + ":-->" + print(dateTimePrint)); // 发送延迟时间 dateTime = dateTime.plusSeconds(-defTime); sc.collect(new UserTimeInfo(userIds[m], new Timestamp(dateTime.getMillis()))); // 只持续固定时间方便观察 if (--times == 0) { break; } } } @Override public void cancel() { System.out.println("cancel to do ..."); } private static String print(DateTime dateTime) { return dateTime.toString("yyyy-MM-dd hh:mm:ss"); } }
三.定义我们的两种watermark
a. 基于系统时间
/** * 这里逻辑,模拟按系统时间进行统计 * 所有数据和系统自身时间有关 */ public class UserTimeWaterMarkBySystem implements AssignerWithPeriodicWatermarks<UserTimeInfo> { /** * 默认允许 5秒延迟 */ long maxDelayTime = 5000; /** * 该时间由于基于系统时间来做, * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15 * a.根据系统时间 想减,小于5秒就会统计到 * b.注意,如果程序挂了,12点重启消费这个数据,就统计不到了 * @return */ @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(System.currentTimeMillis() - maxDelayTime); } @Override public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) { long timestamp = element.getPTime().getTime(); return timestamp; } }
b.根据数据自生时间进行做延迟判断
public class UserTimeWaterMarkByRowTime implements AssignerWithPeriodicWatermarks<UserTimeInfo> { /** * 默认允许 5秒延迟 */ long maxDelayTime = 5000; /** * 该时间由于基于数据时间来做, * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15 * a.根据系统时间 想减,小于5秒就会统计到 * b.只要消息 时间延迟小于5 就能被统计。 * 这种对点击事件来说,更符合要求 * @return */ private long currentMaxTimestamp; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp - maxDelayTime); } @Override public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) { long timestamp = element.getPTime().getTime(); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } }
四.source 类,和以前一样
public class UserTimeWaterMarkApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource()); // UserTimeWaterMarkByRowTime 这个时间可以替换 DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime()); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime"); Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" + " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId "); // deal with (Tuple2<Boolean, Row> value) -> out.collect(row) SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class) .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> { out.collect(value.f1); }).returns(Row.class); // add sink or print allClick.print(); env.execute("test"); } }
public class UserTimeWaterMarkApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource()); DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime()); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime"); Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" + " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId "); // deal with (Tuple2<Boolean, Row> value) -> out.collect(row) SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class) .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> { out.collect(value.f1); }).returns(Row.class); // add sink or print allClick.print(); env.execute("test"); }
小结:
1.这个是基于flink 1.7 跑的
2.代码比较简单,也好理解,有问题直接私信我
相关推荐
本课程以FlinkSQL流批一体技术为主线,全面讲解Flink Table编程、SQL编程、Time与WaterMark、Window操作、函数使用、元数据管理,最后以一个完整的实战项目为例,详细讲解FlinkSQL的流式项目开发。学完本课程,希望...
文章会对Flink中基本API如:DataSet、DataStream、Table、Sql和常用特性如:Time&Window、窗口函数、Watermark、触发器、分布式缓存、异步IO、侧输出、广播和高级应用如:ProcessFunction、状态管理等知识点进行整理...
第七章 EventTime-Watermark(难点) 175 第八章 Flink的状态管理 200 第九章 Flink的容错 226 第1种:全局调整 235 第2种:单任务调整 235 第十章 flink 扩展知识 261 第十一章 flink-SQL开发 277 第十二章 总结 292
使用WaterMark技术实现了窗口计算中延迟数据的处理,同时对流式计算的窗口时间定义分类:处理时间,摄取时间,事件时间本人觉得flink的这些特性一定程序上可以窥探出大数据的未来方向,所以花了些时间来阅读源码,先...
内容包含(Flink简介-Flink编程模型-重要概念-Task划分-共享资源槽-Flink的时间-Flink的Window-Flink的WaterMark-重启策略)等内容
7.Flink watermark与侧道输出 8.Flink状态计算 9.Flink容错checkpoint与一致性语义 10.Flink进阶 异步IO,背压,内存管理 11.Flink Table API与SQL 课程目录介绍 第一章 Flink简介 01.Flink的引入 02.什么是Flink 03...
Thirdly, watermark needs to be supported too. 但是,如果不更改源代码,它看起来并不容易。 我想写我自己的redis-sink。 当您查看此项目时,有一些技巧需要记住: 该项目仅支持REDIS CLUSTER模式。 该项目使用...
flink
第一章 Flink简介 第二章 快速上手 第三章 Flink部署 第四章 Flink运行架构 ...第七章 时间语义与Watermark 第八章 ProcessFunction API(底层API) 第九章 状态编程与容错机制 第十章 Table API 与 SQL
flink全站式内容纲要,针对于flink的内容,学习思路,Flink保证ExactlyOnce,Flink的WaterMark,Flink侧流输出 等
在后半程的课程中,讲师又对Flink中的Watermark、Flink的CheckPoint案例及Flink底层技术进行了探讨,在课程收尾的核心部分进行了全面的优化技术指南教学,包括了网络、内存、监控等方面的优化技术,让Flink技术更加...
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:我们知道,流处理从事件产生,到流经source,再到operator,
2、支持事件时间(Event Time)概念,结合Watermark处理乱序数据 3、支持有状态计算,并且支持多种状态 内存、文件、RocksDB 4、支持高度灵活的窗口(Window)操作 time、count、session 5、基于轻量级分布式快照...
kafka的多分区watermark机制,这个是在工作中使用的kafka的watermark机制,调试代码,感兴趣可以一观。
Flink官网关于水印机制的解释文档
1.异步IO 2.Join 3.分区 4.Sideoutput 5.sink 6.source 7.transform 8.types 9.watermark 10.windowing
20210320添加Flink窗口和Trigger相关的Demo 20210319添加Flink watermark相关演示20210316添加Flink sink相关演示:kafka / redis / es / mysql 20210313添加netty的demo 20210308添加HBase操作相关的Demo 20210303...
谷歌大神对流式计算的精彩解析。将batch processing看做是stream processing的一个子集,对event time、watermark、trigger等概念的讲解绝对让你对流计算有一个全新的认识。