一.背景
有时候我们需要过滤数据,有些中间数据是不需要的,比如场景:
binlog 数据更新的时候,我们仅仅需要最新数据。会根据ID 分组,然后取version 最大的一条,存储
二.简单实例
@Data @ToString public class Order { // 主键id private Integer id; // 版本 private Integer version; private Timestamp mdTime; public Order(int id, Integer version) { this.id = id; this.version = version; this.mdTime = new Timestamp(System.currentTimeMillis()); } public Order() { } }
public class OrderSource implements SourceFunction<Order> { Random random = new Random(); @Override public void run(SourceContext<Order> ctx) throws Exception { while (true) { TimeUnit.MILLISECONDS.sleep(100); // 为了区分,我们简单生0~2的id, 和版本0~99 int id = random.nextInt(3); Order o = new Order(id, random.nextInt(100)); ctx.collect(o); } } @Override public void cancel() { } }
public class ReduceApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Order> userInfoDataStream = env.addSource(new OrderSource()); DataStream<Order> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() { @Override public long extractAscendingTimestamp(Order element) { return element.getMdTime().getTime(); } }); SingleOutputStreamOperator<Order> reduce = timedData .keyBy("id") .timeWindow(Time.seconds(10), Time.seconds(5)) .reduce((ReduceFunction<Order>) (v1, v2) -> v1.getVersion() >= v2.getVersion() ? v1 : v2); reduce.print(); env.execute("test"); } }
结果:
Order(id=2, version=97, mdTime=2019-03-11 17:39:34.052)
Order(id=0, version=99, mdTime=2019-03-11 17:39:32.913)
Order(id=1, version=96, mdTime=2019-03-11 17:39:34.155)
Order(id=2, version=97, mdTime=2019-03-11 17:39:34.052)
Order(id=1, version=96, mdTime=2019-03-11 17:39:34.155)
Order(id=0, version=99, mdTime=2019-03-11 17:39:32.913)
这个会对同一个窗口做过滤,比如同步到另一个mysql,hdfs,就能减少数据量
相关推荐
f-2.springboot构建上报服务,f-23.flink分析之hadoop环境搭建,f-28.flink分析之封装dao层结合hbase进行判重代码编写,f-39.flink分析之频道新鲜度map逻辑代码编写,f-40.flink分析之频道新鲜度reduce逻辑代码编写...
flink demo应用 讲述flatmap reducemap datastream等等
谈及Hadoop大家自然不会对 MapReduce感到陌生,它将计算分为两个阶段,分别为 Map 和 Reduce。MapReduce计算框架虽然借鉴了函数式编程和矢量编程的思想完成了分布式计算。但不得不承认MapReduce在矢量编程结构过于...
构建并优化树后,引擎会将其编译为由隐式并行运算符(例如 Map、Reduce、Match、CoGroup 或 Cross)组成的 PACT 计划。 结果提供给 Apache Flink 平台,该平台负责 PACT 计划优化及其并行执行,例如通过 HDFS 或...
开源项目-chrislusf-glow.zip,Glow is an easy-to-use distributed computation system written in Go, similar to Hadoop Map Reduce, Spark, Flink, Samza, etc. Currently just started and not feature rich yet...
针对以下概念设计Map Reduce Java程序: 问题1:计数和筛选数据:计算的实体数问题2:过滤复杂数据:使用公司地址作为过滤列列出公司ID 问题3:计算出的每个企业ID的平均评分,并列出前10名问题4:减少侧加入和工作...
在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce,而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以...
NimData受到Pandas / Spark / Flink / Thrill等框架的启发,位于Pandas与Spark / Flink / Thrill一方之间。 与Pandas相似,NimData当前是非分布式的,但共享Spark / Flink / Thrill的类型安全的惰性API。 多亏了Nim...
流式计算主要针对unboundeddata(无界数据流)...2003年,Google的MapReduce横空出世,通过经典的Map&Reduce定义和系统容错等保障来方便处理各种大数据。很快就到了Hadoop,被认为是开源版的MapReduce,带动了整个ap
MapReduce,Map函数和Reduce函数,编程容易,屏蔽底层分布式并⾏编程细节。采⽤分⽽治之思想,并⾮所有任务都可以分⽽治之。 YARN实现⼀个集群多个框架,例如⼀千台机器,同时部署了三个框架(MapReduce、Storm、...
早期的处理模型(Map/Reduce)早已经力不从心,而且也很难应用到处理流程长且复杂的数据流水线上。另外,近年来涌现出诸多大数据应用组件,如HBase、Hive、Kafka、Spark、Flink等。开发者经常要用到不同的技术、框架、...
下⽂将介绍这些框架: · 仅批处理框架: Apache Hadoop · 仅流处理框架: Apache Storm Apache Samza · 混合框架: Apache Spark Apache Flink ⼤数据处理框架是什么? ⼤数据处理框架是什么? 处理框架和处理...
菲特兰 Featran,也称为Featran77或F77(明白吗?),是一个用于功能转换的Scala库。 它旨在简化在数据科学和机器学习过程中进行要素工程的...我们可以使用reduce和map以幼稚的方式实现它。 case class Point ( score