当前位置: 首页 > news >正文

Spark streaming的窗口操作(window、reduceByWindow等)和foreachRDD结合

一、背景

最近有一个需求是:要求有一个类对象为Order,它有string类型的字段orderNo和Long类型的字段cost,生产者写到kafka的value是Order对象序列化后的字节数组、key值是orderNo字段,要求spark以手动提交的方式消费kafka,并将数据依次写入到hive表中,并且spark中有一个5分钟滑动窗口,滑动步长为1分钟,统计5分钟内的cost总值并输出。

然后实现代码里用到了reduceByKeyAndWindow的窗口操作,毕竟配合了foreachRDD进行了操作,所以对这一块做了一个研究。代码如下:

  1 package com.example;
  2 
  3 import org.apache.kafka.clients.consumer.ConsumerConfig;
  4 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
  5 import org.apache.kafka.common.serialization.StringDeserializer;
  6 import org.apache.spark.SparkConf;
  7 import org.apache.spark.api.java.JavaRDD;
  8 import org.apache.spark.api.java.JavaSparkContext;
  9 import org.apache.spark.api.java.function.Function;
 10 import org.apache.spark.sql.*;
 11 import org.apache.spark.streaming.*;
 12 import org.apache.spark.streaming.api.java.*;
 13 import org.apache.spark.streaming.kafka010.*;
 14 
 15 import java.io.ByteArrayInputStream;
 16 import java.io.ObjectInputStream;
 17 import java.sql.Timestamp;
 18 import java.util.*;
 19 import java.util.stream.Collectors;
 20 
 21 import scala.Tuple2;
 22 
 23 public class SparkKafkaToHive {
 24     public static Order deserialize(byte[] bytes) throws Exception {
 25         if (bytes == null) return null;
 26         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
 27         ObjectInputStream ois = new ObjectInputStream(bis);
 28         Object o = ois.readObject();
 29         return (Order) o;
 30     }
 31 
 32     // SparkSession 单例帮助类(在 foreachRDD 中复用 SparkSession)
 33     public static class JavaSparkSessionSingleton {
 34         private static transient SparkSession instance = null;
 35 
 36         public static SparkSession getInstance(SparkConf conf) {
 37             if (instance == null) {
 38                 synchronized (JavaSparkSessionSingleton.class) {
 39                     if (instance == null) {
 40                         instance = SparkSession.builder()
 41                                 .config(conf)
 42                                 .enableHiveSupport()
 43                                 .getOrCreate();
 44                     }
 45                 }
 46             }
 47             return instance;
 48         }
 49     }
 50 
 51     public static void main(String[] args) throws Exception {
 52         String brokers = "localhost:9092";
 53         String topic = "orders-topic";
 54         String groupId = "spark-orders-consumer-group";
 55 
 56         // SparkConf & StreamingContext(微批间隔:1 分钟,符合滑动步长)
 57         SparkConf sparkConf = new SparkConf()
 58                 .setAppName("SparkKafkaToHiveDemo")
 59                 .setIfMissing("spark.master", "local[2]") // 本地测试时使用
 60                 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 61 
 62         // 这里设置 1 分钟 batch interval(因为滑动步长是 1 分钟)
 63         Duration batchInterval = Durations.minutes(1);
 64         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval);
 65 
 66         // Kafka params
 67         Map<String, Object> kafkaParams = new HashMap<>();
 68         kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 69         kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 70         kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 71         kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 72         kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
 73         // 禁用自动提交,由我们手动提交
 74         kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 75 
 76         Collection<String> topics = Collections.singletonList(topic);
 77 
 78         // Create Direct Stream
 79         JavaInputDStream<org.apache.kafka.clients.consumer.ConsumerRecord<String, byte[]>> stream =
 80                 KafkaUtils.createDirectStream(
 81                         jssc,
 82                         LocationStrategies.PreferConsistent(),
 83                         ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)
 84                 );
 85 
 86         // --- 1) 将每个 micro-batch 的所有订单写入 Hive ---
 87         stream.foreachRDD((rdd, time) -> {
 88             if (rdd.isEmpty()) {
 89                 // 没有数据也应该尝试提交 offsets?这里跳过提交以示例简单(可视需求决定)
 90                 return;
 91             }
 92 
 93             // 获取 offset ranges
 94             OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
 95 
 96             // 反序列化为 Order 对象 JavaRDD<Order>
 97             JavaRDD<Order> ordersRDD = rdd.map(record -> {
 98                 try {
 99                     return deserialize(record.value());
100                 } catch (Exception e) {
101                     // 处理反序列化异常(记录日志并丢弃)
102                     e.printStackTrace();
103                     return null;
104                 }
105             }).filter(Objects::nonNull);
106 
107             // 写入 Hive
108             if (!ordersRDD.isEmpty()) {
109                 // 获取(或创建)SparkSession
110                 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);
111                 Dataset<Row> df = spark.createDataFrame(ordersRDD, Order.class);
112                 // 写入 Hive 表(append)
113                 df.write().mode(SaveMode.Append).insertInto("demo_db.orders");
114             }
115 
116             // 处理完业务后,提交 offsets 到 Kafka(手动提交)
117             // 需要对 stream 进行类型转换以调用 commitAsync
118             try {
119                 ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
120                     if (exception != null) {
121                         System.err.println("CommitAsync failed: " + exception.getMessage());
122                         exception.printStackTrace();
123                     } else {
124                         System.out.println("Committed offsets: " + Arrays.toString(offsetRanges));
125                     }
126                 });
127             } catch (Exception commitEx) {
128                 commitEx.printStackTrace();
129             }
130         });
131 
132         // --- 2) 窗口聚合:5 分钟窗口,1 分钟滑动,统计 cost 总和,并写入 Hive ---
133         // 先将 stream 转成 PairDStream<"all", cost>
134         JavaPairDStream<String, Long> costPairs = stream.mapToPair(record -> {
135             try {
136                 Order o = deserialize(record.value());
137                 if (o != null && o.getCost() != null) {
138                     return new Tuple2<>("all", o.getCost());
139                 } else {
140                     return new Tuple2<>("all", 0L);
141                 }
142             } catch (Exception e) {
143                 e.printStackTrace();
144                 return new Tuple2<>("all", 0L);
145             }
146         });
147 
148         // reduceByKeyAndWindow (windowDuration=5min, slideDuration=1min)
149         JavaPairDStream<String, Long> windowed = costPairs.reduceByKeyAndWindow(
150                 (a, b) -> a + b,
151                 Durations.minutes(5),
152                 Durations.minutes(1)
153         );
154 
155         // 在每个窗口的 RDD 中写入 Hive(可以插入到 order_cost_window_agg)
156         windowed.foreachRDD((rdd, time) -> {
157             if (rdd.isEmpty()) return;
158 
159             // time 是窗口的结束时间(即当前批次时间)
160             long windowEndMs = time.milliseconds();
161             long windowStartMs = windowEndMs - 5 * 60 * 1000 + 1; // 包含窗口起点
162 
163             List<Row> rows = rdd.map(tuple -> {
164                 long total = tuple._2();
165                 return RowFactory.create(new Timestamp(windowStartMs), new Timestamp(windowEndMs), total);
166             }).collect();
167 
168             if (!rows.isEmpty()) {
169                 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);
170                 StructType schema = new StructType(new StructField[]{
171                         new StructField("window_start", DataTypes.TimestampType, false, Metadata.empty()),
172                         new StructField("window_end", DataTypes.TimestampType, false, Metadata.empty()),
173                         new StructField("total_cost", DataTypes.LongType, false, Metadata.empty())
174                 });
175                 Dataset<Row> df = spark.createDataFrame(rows, schema);
176                 // 写入 Hive 聚合表
177                 df.write().mode(SaveMode.Append).insertInto("demo_db.order_cost_window_agg");
178 
179                 // 同时打印到控制台
180                 df.show(false);
181             }
182         });
183 
184         // 启动流
185         jssc.start();
186         jssc.awaitTermination();
187     }
188 }

其中通过reduceByKeyAndWindow方法,使用了窗口大小为5分钟,滑动步长为1分钟的滑动窗口,并将窗口大小内的多个批次的RDD(每分钟从kafka拉一批数据,就是一个批次)汇总成一个窗口DStream(一个滑动窗口对应一个RDD,RDD包含该滑动窗口的所有数据),并进行了reduce操作,而最后通过foreachRDD对每个滑动窗口(一个滑动窗口对应一个RDD)聚合后的数据写入到hive表中,触发写入的时机为每个滑动窗口的结束时间边界。

二、原理解析

 window() 操作中,foreachRDD() 会处理每个滑动窗口内的数据,并且每个滑动窗口只对应一个RDD,这个RDD包含该滑动窗口内的所有数据,包含了所有小批次的RDD数据(比如上面1分钟读一次kafka,那么一分钟kafka就是一个批次)的聚合

window() 和 foreachRDD() 解释

  1. window() 操作:

    • window() 用于将一个时间范围内的数据分成窗口,并且每个窗口的 时间区间是独立的。例如,假设你使用了 5 分钟的窗口和 1 分钟的滑动步长,每个窗口都是独立的时间段,不会跨越多个窗口。

    • 这个操作会创建多个 时间窗口(滑动窗口),而每个窗口的聚合结果通常会被传递到 foreachRDD() 中。

  2. foreachRDD() 执行次数:

    • 对于时间窗口(滑动窗口)foreachRDD() 会被调用一次。滑动窗口中的数据是当前窗口的所有数据。每个窗口的聚合结果会作为一个独立的 RDD 传递给 foreachRDD

    • 所以每个滑动窗口的数据聚合 只会在对应的窗口内 计算一次。

例子:

假设你有一个 5 分钟的窗口,1 分钟滑动步长,数据从 10:00 到 10:10(包括10:10)。你使用 window() 对数据进行分组操作,类似这样:

JavaDStream<Tuple2<String, Integer>> windowedStream = dstream.window(new Duration(5 * 60 * 1000), new Duration(1 * 60 * 1000)).reduceByKey((x, y) -> x + y);

对于上面的代码:

  • 每个窗口会处理 5 分钟的时间段。

  • 如果你以 1 分钟的滑动步长处理数据,那么你会有如下的窗口区间:

    • 10:00 到 10:05

    • 10:01 到 10:06

    • 10:02 到 10:07

    • 10:03 到 10:08

    • 10:04 到 10:09

    • 10:05 到 10:10

每个窗口内的数据会被独立地处理,并且在 foreachRDD() 中输出。

foreachRDD() 执行的次数

  • 对于每个时间窗口(滑动窗口)foreachRDD() 会被调用一次,每次传递一个包含该时间段数据的 RDD。

  • 窗口会按时间滑动,并且每个窗口的数据会生成一个独立的 RDD,并在 foreachRDD() 中执行。

  • foreachRDD() 的执行次数等于窗口的数量,具体由窗口的大小和滑动步长决定。

示例:

假设你有以下数据流:

10:00, ORD1, 100 10:01, ORD2, 150 10:02, ORD3, 200 10:03, ORD4, 250 10:04, ORD5, 300 10:05, ORD6, 350 10:06, ORD7, 400 10:07, ORD8, 450 10:08, ORD9, 500 10:09, ORD10, 550 10:10, ORD11, 600

如果你使用了一个 5 分钟窗口,1 分钟滑动步长:

window() 和 foreachRDD() 结果:

  • 第一个窗口(10:00-10:05):

    • 数据:ORD1, ORD2, ORD3, ORD4, ORD5

    • 聚合结果:ORD1 + ORD2 + ORD3 + ORD4 + ORD5 = 100 + 150 + 200 + 250 + 300 = 1000

    • 结果会在 10:05 输出

  • 第二个窗口(10:01-10:06):

    • 数据:ORD2, ORD3, ORD4, ORD5, ORD6

    • 聚合结果:ORD2 + ORD3 + ORD4 + ORD5 + ORD6 = 150 + 200 + 250 + 300 + 350 = 1250

    • 结果会在 10:06 输出

  • 第三个窗口(10:02-10:07):

    • 数据:ORD3, ORD4, ORD5, ORD6, ORD7

    • 聚合结果:ORD3 + ORD4 + ORD5 + ORD6 + ORD7 = 200 + 250 + 300 + 350 + 400 = 1500

    • 结果会在 10:07 输出

  • 第四个窗口(10:03-10:08):

    • 数据:ORD4, ORD5, ORD6, ORD7, ORD8

    • 聚合结果:ORD4 + ORD5 + ORD6 + ORD7 + ORD8 = 250 + 300 + 350 + 400 + 450 = 1750

    • 结果会在 10:08 输出

输出示例:

Time: 2025-09-05 10:05:00, Window: [2025-09-05 10:00:00, 2025-09-05 10:05:00], Total Cost: 1000 Time: 2025-09-05 10:06:00, Window: [2025-09-05 10:01:00, 2025-09-05 10:06:00], Total Cost: 1250 Time: 2025-09-05 10:07:00, Window: [2025-09-05 10:02:00, 2025-09-05 10:07:00], Total Cost: 1500 Time: 2025-09-05 10:08:00, Window: [2025-09-05 10:03:00, 2025-09-05 10:08:00], Total Cost: 1750 ...

关键点总结:

  • window() 会根据时间区间分割数据,每个窗口处理一次 foreachRDD()

  • 每个批次的数据是基于时间段的,因此每个窗口的输出结果是独立的。

  • foreachRDD() 会在每个窗口对应的批次内执行一次,而每个批次只会处理一个窗口的数据,并不会汇总多个窗口的数据。

 

参考:DStream 中有几个RDD ?

转发请注明出处:https://www.cnblogs.com/fnlingnzb-learner/p/19076296

http://www.sczhlp.com/news/73142/

相关文章:

  • 东莞企业网站建设设计太仓有专门做网站的地方吗
  • 网站模块名称巴青网站制作
  • 建设牌安全带厂家网站手机网站开发入门
  • 义乌网站建设技术托管宣传广告设计模板
  • wordpress调用目录下如何做好网站针对搜索引擎的seo
  • 不可忽略的字符串终止符
  • 怎么完整下载网站模板广告牌
  • 小公司做网站需要注意什么问题网站设计太原
  • 美丽乡村建设网站php源码wordpress xml地图
  • 做网站最多的行业南宁seo
  • 在线软件开发平台seo信息编辑招聘
  • 网站建设维护书快速整站优化
  • 网站交互主要做什么网站开发的框架
  • 代驾网站开发关于做视频网站的一些代码
  • 网站扁平化设计理念深圳做网站建设的哪家效果好又便宜
  • 带音乐网站模板优化营商环境评价
  • 网站后台添加内容网页不显示wordpress+漂亮的博客
  • 做视频网站用什么开发大连市城市建设管理局网站
  • 熊猫头表情包制作网站天津建设工程信息网电脑版登录
  • 分销网站怎么做长沙网站推广公司
  • 供求信息网站开发背景宁波外贸建站公司
  • 怎么用linux做网站服务器计算机学校全国排名
  • 微网站模板源代码dede手机网站模板
  • 网页视频怎么下载高清找人做网站排名优化
  • 广州网站建设网络推广企业网站代码怎么优化
  • 重庆建设造价信息网站精品网站建设费用 干净磐石网络
  • vs中做网站设置背景图片什么网站做装修公司广告比较好
  • 盂县在线这个网站是谁做的安康市相亲平台
  • 如何修复 iPhone 的 UI 界面突然变大,导致 iOS UI 无法正常操作使用的问题 All In One
  • 网站模板如何删除wang域名 网站