一、背景
最近有一个需求是:要求有一个类对象为Order,它有string类型的字段orderNo和Long类型的字段cost,生产者写到kafka的value是Order对象序列化后的字节数组、key值是orderNo字段,要求spark以手动提交的方式消费kafka,并将数据依次写入到hive表中,并且spark中有一个5分钟滑动窗口,滑动步长为1分钟,统计5分钟内的cost总值并输出。
然后实现代码里用到了reduceByKeyAndWindow的窗口操作,毕竟配合了foreachRDD进行了操作,所以对这一块做了一个研究。代码如下:
1 package com.example; 2 3 import org.apache.kafka.clients.consumer.ConsumerConfig; 4 import org.apache.kafka.common.serialization.ByteArrayDeserializer; 5 import org.apache.kafka.common.serialization.StringDeserializer; 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaRDD; 8 import org.apache.spark.api.java.JavaSparkContext; 9 import org.apache.spark.api.java.function.Function; 10 import org.apache.spark.sql.*; 11 import org.apache.spark.streaming.*; 12 import org.apache.spark.streaming.api.java.*; 13 import org.apache.spark.streaming.kafka010.*; 14 15 import java.io.ByteArrayInputStream; 16 import java.io.ObjectInputStream; 17 import java.sql.Timestamp; 18 import java.util.*; 19 import java.util.stream.Collectors; 20 21 import scala.Tuple2; 22 23 public class SparkKafkaToHive { 24 public static Order deserialize(byte[] bytes) throws Exception { 25 if (bytes == null) return null; 26 ByteArrayInputStream bis = new ByteArrayInputStream(bytes); 27 ObjectInputStream ois = new ObjectInputStream(bis); 28 Object o = ois.readObject(); 29 return (Order) o; 30 } 31 32 // SparkSession 单例帮助类(在 foreachRDD 中复用 SparkSession) 33 public static class JavaSparkSessionSingleton { 34 private static transient SparkSession instance = null; 35 36 public static SparkSession getInstance(SparkConf conf) { 37 if (instance == null) { 38 synchronized (JavaSparkSessionSingleton.class) { 39 if (instance == null) { 40 instance = SparkSession.builder() 41 .config(conf) 42 .enableHiveSupport() 43 .getOrCreate(); 44 } 45 } 46 } 47 return instance; 48 } 49 } 50 51 public static void main(String[] args) throws Exception { 52 String brokers = "localhost:9092"; 53 String topic = "orders-topic"; 54 String groupId = "spark-orders-consumer-group"; 55 56 // SparkConf & StreamingContext(微批间隔:1 分钟,符合滑动步长) 57 SparkConf sparkConf = new SparkConf() 58 .setAppName("SparkKafkaToHiveDemo") 59 .setIfMissing("spark.master", "local[2]") // 本地测试时使用 60 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 61 62 // 这里设置 1 分钟 batch interval(因为滑动步长是 1 分钟) 63 Duration batchInterval = Durations.minutes(1); 64 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval); 65 66 // Kafka params 67 Map<String, Object> kafkaParams = new HashMap<>(); 68 kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); 69 kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 70 kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); 71 kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 72 kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 73 // 禁用自动提交,由我们手动提交 74 kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 75 76 Collection<String> topics = Collections.singletonList(topic); 77 78 // Create Direct Stream 79 JavaInputDStream<org.apache.kafka.clients.consumer.ConsumerRecord<String, byte[]>> stream = 80 KafkaUtils.createDirectStream( 81 jssc, 82 LocationStrategies.PreferConsistent(), 83 ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams) 84 ); 85 86 // --- 1) 将每个 micro-batch 的所有订单写入 Hive --- 87 stream.foreachRDD((rdd, time) -> { 88 if (rdd.isEmpty()) { 89 // 没有数据也应该尝试提交 offsets?这里跳过提交以示例简单(可视需求决定) 90 return; 91 } 92 93 // 获取 offset ranges 94 OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 95 96 // 反序列化为 Order 对象 JavaRDD<Order> 97 JavaRDD<Order> ordersRDD = rdd.map(record -> { 98 try { 99 return deserialize(record.value()); 100 } catch (Exception e) { 101 // 处理反序列化异常(记录日志并丢弃) 102 e.printStackTrace(); 103 return null; 104 } 105 }).filter(Objects::nonNull); 106 107 // 写入 Hive 108 if (!ordersRDD.isEmpty()) { 109 // 获取(或创建)SparkSession 110 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf); 111 Dataset<Row> df = spark.createDataFrame(ordersRDD, Order.class); 112 // 写入 Hive 表(append) 113 df.write().mode(SaveMode.Append).insertInto("demo_db.orders"); 114 } 115 116 // 处理完业务后,提交 offsets 到 Kafka(手动提交) 117 // 需要对 stream 进行类型转换以调用 commitAsync 118 try { 119 ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> { 120 if (exception != null) { 121 System.err.println("CommitAsync failed: " + exception.getMessage()); 122 exception.printStackTrace(); 123 } else { 124 System.out.println("Committed offsets: " + Arrays.toString(offsetRanges)); 125 } 126 }); 127 } catch (Exception commitEx) { 128 commitEx.printStackTrace(); 129 } 130 }); 131 132 // --- 2) 窗口聚合:5 分钟窗口,1 分钟滑动,统计 cost 总和,并写入 Hive --- 133 // 先将 stream 转成 PairDStream<"all", cost> 134 JavaPairDStream<String, Long> costPairs = stream.mapToPair(record -> { 135 try { 136 Order o = deserialize(record.value()); 137 if (o != null && o.getCost() != null) { 138 return new Tuple2<>("all", o.getCost()); 139 } else { 140 return new Tuple2<>("all", 0L); 141 } 142 } catch (Exception e) { 143 e.printStackTrace(); 144 return new Tuple2<>("all", 0L); 145 } 146 }); 147 148 // reduceByKeyAndWindow (windowDuration=5min, slideDuration=1min) 149 JavaPairDStream<String, Long> windowed = costPairs.reduceByKeyAndWindow( 150 (a, b) -> a + b, 151 Durations.minutes(5), 152 Durations.minutes(1) 153 ); 154 155 // 在每个窗口的 RDD 中写入 Hive(可以插入到 order_cost_window_agg) 156 windowed.foreachRDD((rdd, time) -> { 157 if (rdd.isEmpty()) return; 158 159 // time 是窗口的结束时间(即当前批次时间) 160 long windowEndMs = time.milliseconds(); 161 long windowStartMs = windowEndMs - 5 * 60 * 1000 + 1; // 包含窗口起点 162 163 List<Row> rows = rdd.map(tuple -> { 164 long total = tuple._2(); 165 return RowFactory.create(new Timestamp(windowStartMs), new Timestamp(windowEndMs), total); 166 }).collect(); 167 168 if (!rows.isEmpty()) { 169 SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf); 170 StructType schema = new StructType(new StructField[]{ 171 new StructField("window_start", DataTypes.TimestampType, false, Metadata.empty()), 172 new StructField("window_end", DataTypes.TimestampType, false, Metadata.empty()), 173 new StructField("total_cost", DataTypes.LongType, false, Metadata.empty()) 174 }); 175 Dataset<Row> df = spark.createDataFrame(rows, schema); 176 // 写入 Hive 聚合表 177 df.write().mode(SaveMode.Append).insertInto("demo_db.order_cost_window_agg"); 178 179 // 同时打印到控制台 180 df.show(false); 181 } 182 }); 183 184 // 启动流 185 jssc.start(); 186 jssc.awaitTermination(); 187 } 188 }
其中通过reduceByKeyAndWindow方法,使用了窗口大小为5分钟,滑动步长为1分钟的滑动窗口,并将窗口大小内的多个批次的RDD(每分钟从kafka拉一批数据,就是一个批次)汇总成一个窗口DStream(一个滑动窗口对应一个RDD,RDD包含该滑动窗口的所有数据),并进行了reduce操作,而最后通过foreachRDD对每个滑动窗口(一个滑动窗口对应一个RDD)聚合后的数据写入到hive表中,触发写入的时机为每个滑动窗口的结束时间边界。
二、原理解析
在 window()
操作中,foreachRDD()
会处理每个滑动窗口内的数据,并且每个滑动窗口只对应一个RDD,这个RDD包含该滑动窗口内的所有数据,包含了所有小批次的RDD数据(比如上面1分钟读一次kafka,那么一分钟kafka就是一个批次)的聚合
window()
和 foreachRDD()
解释
-
window()
操作:-
window()
用于将一个时间范围内的数据分成窗口,并且每个窗口的 时间区间是独立的。例如,假设你使用了 5 分钟的窗口和 1 分钟的滑动步长,每个窗口都是独立的时间段,不会跨越多个窗口。 -
这个操作会创建多个 时间窗口(滑动窗口),而每个窗口的聚合结果通常会被传递到 foreachRDD() 中。
-
-
foreachRDD()
执行次数:-
对于时间窗口(滑动窗口),
foreachRDD()
会被调用一次。滑动窗口中的数据是当前窗口的所有数据。每个窗口的聚合结果会作为一个独立的 RDD 传递给foreachRDD
。 -
所以每个滑动窗口的数据聚合 只会在对应的窗口内 计算一次。
-
例子:
假设你有一个 5 分钟的窗口,1 分钟滑动步长,数据从 10:00 到 10:10(包括10:10)。你使用 window()
对数据进行分组操作,类似这样:
JavaDStream<Tuple2<String, Integer>> windowedStream = dstream.window(new Duration(5 * 60 * 1000), new Duration(1 * 60 * 1000)).reduceByKey((x, y) -> x + y);
对于上面的代码:
-
每个窗口会处理 5 分钟的时间段。
-
如果你以 1 分钟的滑动步长处理数据,那么你会有如下的窗口区间:
-
10:00 到 10:05
-
10:01 到 10:06
-
10:02 到 10:07
-
10:03 到 10:08
-
10:04 到 10:09
-
10:05 到 10:10
-
每个窗口内的数据会被独立地处理,并且在 foreachRDD()
中输出。
foreachRDD()
执行的次数
-
对于每个时间窗口(滑动窗口),
foreachRDD()
会被调用一次,每次传递一个包含该时间段数据的 RDD。 -
窗口会按时间滑动,并且每个窗口的数据会生成一个独立的 RDD,并在
foreachRDD()
中执行。 -
foreachRDD()
的执行次数等于窗口的数量,具体由窗口的大小和滑动步长决定。
示例:
假设你有以下数据流:
10:00, ORD1, 100 10:01, ORD2, 150 10:02, ORD3, 200 10:03, ORD4, 250 10:04, ORD5, 300 10:05, ORD6, 350 10:06, ORD7, 400 10:07, ORD8, 450 10:08, ORD9, 500 10:09, ORD10, 550 10:10, ORD11, 600
如果你使用了一个 5 分钟窗口,1 分钟滑动步长:
window()
和 foreachRDD()
结果:
-
第一个窗口(10:00-10:05):
-
数据:ORD1, ORD2, ORD3, ORD4, ORD5
-
聚合结果:
ORD1 + ORD2 + ORD3 + ORD4 + ORD5
= 100 + 150 + 200 + 250 + 300 = 1000 -
结果会在 10:05 输出
-
-
第二个窗口(10:01-10:06):
-
数据:ORD2, ORD3, ORD4, ORD5, ORD6
-
聚合结果:
ORD2 + ORD3 + ORD4 + ORD5 + ORD6
= 150 + 200 + 250 + 300 + 350 = 1250 -
结果会在 10:06 输出
-
-
第三个窗口(10:02-10:07):
-
数据:ORD3, ORD4, ORD5, ORD6, ORD7
-
聚合结果:
ORD3 + ORD4 + ORD5 + ORD6 + ORD7
= 200 + 250 + 300 + 350 + 400 = 1500 -
结果会在 10:07 输出
-
-
第四个窗口(10:03-10:08):
-
数据:ORD4, ORD5, ORD6, ORD7, ORD8
-
聚合结果:
ORD4 + ORD5 + ORD6 + ORD7 + ORD8
= 250 + 300 + 350 + 400 + 450 = 1750 -
结果会在 10:08 输出
-
输出示例:
关键点总结:
-
window()
会根据时间区间分割数据,每个窗口处理一次foreachRDD()
。 -
每个批次的数据是基于时间段的,因此每个窗口的输出结果是独立的。
-
foreachRDD()
会在每个窗口对应的批次内执行一次,而每个批次只会处理一个窗口的数据,并不会汇总多个窗口的数据。
参考:DStream 中有几个RDD ?
转发请注明出处:https://www.cnblogs.com/fnlingnzb-learner/p/19076296