哈尔滨网站建设制作哪家好,网站app制作费用单,凤凰手机网官网,黄山旅游攻略1、概述
最近感觉上班实在是太无聊#xff0c;打算给大家分享一下Kafka的使用#xff0c;本篇文章首先给大家分享三种方式搭建Kafka环境#xff0c;接着给大家介绍kafka核心的基础概念以及Java API的使用#xff0c;最后分享一个SpringBoot的集成案例#xff0c;希望对大…1、概述
最近感觉上班实在是太无聊打算给大家分享一下Kafka的使用本篇文章首先给大家分享三种方式搭建Kafka环境接着给大家介绍kafka核心的基础概念以及Java API的使用最后分享一个SpringBoot的集成案例希望对大家有所帮助
2、环境搭建
2.1、安装包安装
关于环境搭建这块我们先来通过手动下载安装包的方式来完成首先下载安装包这是我使用的环境是CentOS7。
下载的地址
https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
可以使用wget 工具下载 下载完成后我们解压这里我是用的路径是 /usr/local 解压后如下图所示 来到bin路径下我们需要先启动zookeeper 然后再启动kafka相关命令如下
# 启动zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties # 启动 kafka
./kafka-server-start.sh ../config/server.properties
启动后 我们可以查看进程 kafka 已经启动成功了
2.2、Kraft 方式启动 kafka
上面我们使用zookeeper的方式运行了Kafkakafka从2.8的版本就引入了KRaft模式主要是用来取代zookeeper来管理元数据下面我们来试试使用KRaft的方式来启动kafka
# 停掉kafka
./kafka-server-stop.sh ../config/server.properties# 停掉zookeeper
./zookeeper-server-stop.sh ../config/zookeeper.properties# 生成Cluster UUID
./kafka-storage.sh random-uuid 这里我们能需要记录下 生成的这个uuid值
# 格式化日志目录
./kafka-storage.sh format -t y__hXaR2QVKaehk5FiNLfQ -c ../config/kraft/server.properties接着启动kafka
# 启动Kafka
./kafka-server-start.sh ../config/kraft/server.properties 2.3、 Docker 安装
相信大家一定喜欢这种方式直接使用 Docker 一把梭哈
# 拉取Kafka镜像
docker pull apache/kafka:3.7.0# 启动Kafka容器
docker run -p 9092:9092 apache/kafka:3.7.0# 复制出来一份配置文件
docker cp 7434ce960297:/etc/kafka/docker/server.properties /opt/docker/kafka然后我们需要修改配置文件 修改完成后我们使用以下命令启动kafka
docker run -d \
-v /opt/docker/kafka:/mnt/shared/config \
-v /opt/data/kafka-data:/mnt/kafka-data \
-p 9092:9092 \
--name kafka-container \
apache/kafka:3.7.0
完成之后我们可以使用客户端工具连接试试 至此我们的环境搭建完成了。
3、基础概念
3.1、Topic event 简述
在kafka中 消息(event) 被组织并持久地存储在Topic中。非常简单地说Topic 类似于文件系统中的一个文件夹而事件就是该文件夹中的文件。这既是官方给出的Topic和event的定义
https://kafka.apache.org/37/documentation.html#introduction
接下来我们来创建一个主题首先我们来到 kafka 安装目录的 bin 目录下 # 创建一个名为 tianlongbabu的主题
./kafka-topics.sh --create --topic tianlongbabu --bootstrap-server 192.168.200.100:9092# 列出所有的主题在主机上操作可以使用 localhost
./kafka-topics.sh --list --bootstrap-server localhost:9092 创建好了之后 我们可以回到客户端工具查看 同样的 我们可以直接在这个客户端上删除这个 topic 3.2、生产消息和消费消息
我们接下来看看 怎么往主题中写入事件(消息)
# 在主机上 指定topic 连接到kafka服务端
./kafka-console-producer.sh --topic tianlongbabu \
--bootstrap-server 192.168.200.100:9092
连接上之后 就可以发送事件了 同样的我们新开一个终端 使用 kafka-console-consumer.sh 这个脚本 就可以消费topic中的数据了
## 主机上操作 可以使用localhost
## --from-beginning 表示从kafka最早的消息开始消费./kafka-console-consumer.sh --topic tianlongbabu \
--from-beginning --bootstrap-server localhost:9092 我们连接上之后 就会打印出刚刚发送的事件(消息)了。
3.3、关于事件的组成
看到这里 相信大家对事件有了一定的了解关于事件这里给出一段官方文档上的原文 我们 从这段话中至少可以知道
1、event 在文档中也被称为记录(record)或消息(message)
2、 当你向Kafka读写数据时采用的是事件形式
3、概念上一个事件包含键(key)、值(value)、时间戳(timestamp)以及可选的元数据(metadata)。
所以一个事件可以设置一个key, 那么你可能会问key 是干什么用的呢作用是什么呢这个会在后面的编码的章节中给出解释大家先知道有这个概念就行了
3.4、关于 Partition (分区)
先来给出一张图出自官方文档的主要术语解释的章节 从这张图中我们可以看到topic 中存在多个 partition(分区)也就是说事件其实都是被保存在topic中的分区里并且一个topic 可以创建多个分区。
消息在分区中以追加的方式存储每条消息都有一个唯一的偏移量offset表示它在分区中的位置。
那么分区的作用是什么呢我自己根据文档上描述主要总结了4点
1、分区允许多个消费者并行消费同一主题中的消息不同的消费者可以消费不同的分区从而提高系统的吞吐量
2、Kafka会根据配置将消息均匀地分布到各个分区确保负载在多个消费者之间均匀分配
3、在同一分区内消息的顺序是有保障的。这意味着对于同一键的所有消息都会被发送到同一分区从而保持顺序
4、通过增加分区数可以扩展主题的并发能力和存储能力。Kafka支持动态增加分区但请注意这可能影响到现有数据的顺序。
这几点大家先了解即可。
4、入门程序开发
4.1、新建项目引入依赖
新建项目添加 kafka 客户端依赖
!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.7.0/version/dependency
4.2、编写生产者
我们先来创建消息生产者的代码
public class KafkaProducerTest {public static void main(String[] args) {// Kafka配置Properties props new Properties();props.put(bootstrap.servers, 192.168.200.100:9092); // Kafka服务器地址props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 创建Kafka生产者KafkaProducerString, String producer new KafkaProducer(props);String topic tianlongbabu; // 主题名称String key gaibang; // 消息键String value 乔峰; // 消息值// 发送消息ProducerRecordString, String record new ProducerRecord(topic, key, value);producer.send(record, (RecordMetadata metadata, Exception e) - {if (e ! null) {e.printStackTrace();} else {System.out.println(Sent message with key: key , value: value , to partition: metadata.partition() , offset: metadata.offset());}});// 关闭生产者producer.close();}
}上述代码的写法 在org.apache.kafka.clients.producer.KafkaProducer 类的注释中有详细的说明 这个简单的入门案例中用到了前面给大家介绍的key(事件键)了看了这个案例大家应该能够明白是什么意思了吧。这个时候 我们运行main方法 就能把测试数据写入到 topic 中了
4.3、编写消费者
相关代码如下
public static void main(String[] args) {// Kafka配置Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.200.100:9092); // Kafka服务器地址props.put(ConsumerConfig.GROUP_ID_CONFIG, test-group); // 消费者组IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 创建Kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(props);String topic tianlongbabu; // 主题名称// 订阅主题consumer.subscribe(Collections.singletonList(topic));// 不断消费消息while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {System.out.printf(Consumed message with key: %s, value: %s, from partition: %d, offset: %d%n,record.key(), record.value(), record.partition(), record.offset());}}// 关闭消费者通常不会到达这里// consumer.close();}
消费者的代码里我们需要指定一个消费组 id ,当存在多个消费者服务的时候 需要为他们各自的消费组id。
我们运行main方法即可收到刚才发送的消息了。 到这里一个简单的生产-消费的案例已经结束了相信你对Kafka也有了一个初步的认知。
4.4、关于消费组ID
上述入门程序的消费者程序中有一个消费组id 。在 Kafka 中消费组 IDConsumer Group ID是一个重要的概念用于标识一组协同工作的消费者。
简单来说就是
1、对于一个特定的主题如果多个消费者属于同一个消费组那么主题中的每条消息只会被该消费组内的一个消费者消费。 2、如果消费者属于不同的消费组则它们可以独立消费同一主题的消息并且不会互相影响
5、SpringBoot集成
5.1、相关依赖
SpringBoot版本
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.9/versionrelativePath/ !-- lookup parent from repository --/parent
项目相关依赖
dependencies!-- Spring Boot Starter --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependency!-- Spring Kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- Spring Boot Starter Web (optional, if you want to create a REST API) --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Add other dependencies as needed --
/dependencies
5.2、配置详解
我们在applicatio.properties 添加以下配置
spring.kafka.bootstrap-servers192.168.200.100:9092
## 消费组
spring.kafka.consumer.group-idmy-group
## 从最早的消息开始消费
spring.kafka.consumer.auto-offset-resetearliest
5.3、编写生产者
Service
public class KafkaProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}
5.4、编写消费者
Service
public class KafkaConsumer {KafkaListener(topics tianlongbabu, groupId my-group)public void listen(String message) {System.out.println(Received Message: message);}
}
5.5、测试
RestController
public class KafkaController {Autowiredprivate KafkaProducer kafkaProducer;PostMapping(/send)public void sendMessage(RequestBody String message) {kafkaProducer.sendMessage(tianlongbabu, message);}
}
启动类
SpringBootApplication
public class BootKafkaApplication {public static void main(String[] args) {SpringApplication.run(BootKafkaApplication.class, args);}}
启动服务由于我们配置的是earliest所以会打印出 topic中之前的消息 我们可以继续使用postman发送消息 然后观察控制台的输出 好了一个简单的消息生产和消费的流程就是这样了
6、Kafka 的应用
今天这篇文章主要给大家介绍了Kafka的一些基础知识并且实现了SpringBoot快速集成的案例大家后续可以使用上述案例作为模板添加自己的业务代码。后续我也会分享一些 Kafka工程化落地的实战案例在我的微信公众号(代码洁癖症患者)上感兴趣的小伙伴可以去查阅
最后我们聊聊Kafka的应用场景
首先 Kafka 可以作为高吞吐量的消息队列支持异步通信。生产者将消息发送到主题消费者从主题中读取消息适合实现解耦的微服务架构。
其次 Kafka 可以收集各个服务或应用的日志信息并将其集中存储以便后续的分析和监控。
在大数据领域 使用 Kafka 结合流处理框架如 Apache Flink、Apache Spark Streaming可以实时处理和分析数据流适用于实时监控、欺诈检测等场景。
在数据集成领域 Kafka 可以作为不同数据源之间的数据桥梁实现数据的实时传输和整合常用于 ETL提取、转换、加载流程。
这里仅仅列出几个比较常用的场景还有很多领域都可以见到Kafka的身影。
好了纸上得来终觉浅大家赶紧动手试试吧