网站图片切换js代码,seo关键词排名优化联系方式,做网站湖州,网页开发三件套Kafka入门4.0.0版本#xff08;基于Java、SpringBoot操作#xff09;
一、kafka概述
Kafka最初是由LinkedIn公司开发的#xff0c;是一个高可靠、高吞吐量、低延迟的分布式发布订阅消息系统#xff0c;它使用Scala语言编写#xff0c;并于2010年被贡献给了Apache基金会基于Java、SpringBoot操作
一、kafka概述
Kafka最初是由LinkedIn公司开发的是一个高可靠、高吞吐量、低延迟的分布式发布订阅消息系统它使用Scala语言编写并于2010年被贡献给了Apache基金会随后成为Apache的顶级开源项目。主要特点有: 为发布和订阅提供高吞吐量消息持久化分布式消费消息采用Pull模式支持在线和离线场景 本次采用最新的kafka版本4.0.0Kafka 4.0 最引人瞩目的变化之一当属其默认运行在 KRaftKafka Raft模式下彻底摆脱了对 Apache ZooKeeper 的依赖。在 Kafka 的发展历程中ZooKeeper 曾是其核心组件负责协调分布式系统中的元数据管理、Broker 注册、主题分区分配等关键任务。然而随着 Kafka 功能的不断丰富与用户规模的持续扩大ZooKeeper 逐渐成为系统部署和运维中的一个复杂性来源增加了运营成本与管理难度。
KRaft 模式的引入标志着 Kafka 在架构上的自我进化达到了一个新高度。通过采用基于 Raft 一致性算法的共识机制Kafka 将元数据管理内嵌于自身体系实现了对 ZooKeeper 的无缝替代。这一转变带来了多方面的显著优势
简化部署与运维运维人员无需再为维护 ZooKeeper 集群投入额外精力降低了整体运营开销。新架构减少了系统的复杂性使得 Kafka 的安装、配置和日常管理变得更加直观和高效。
增强可扩展性KRaft 模式下Kafka 集群的扩展性得到了进一步提升。新增 Broker 节点的加入流程更加简便能够更好地适应大规模数据处理场景下对系统资源动态调整的需求。
提升系统性能与稳定性去除 ZooKeeper 这一外部依赖后Kafka 在元数据操作的响应速度和一致性方面表现出色。尤其是在高并发写入和读取场景下系统的稳定性和可靠性得到了增强减少了因外部组件故障可能导致的单点问题。
之前的架构 现在的架构 kafka消费模型
不同消费者组可以消费全量的消息相同消费者组内的消费者只能消费一部分。 kafka基本概念
Producer生产者
消息的生产者负责将消息发送到Kafka集群中。
Consumer消费者
消息的消费者负责从Kafka集群中读取并处理消息
Broker服务代理节点
Kafka集群中的一个或多个服务器负责存储和转发消息。
Topic主题
Kafka中的消息以主题为单位进行归类生产者发送消息到特定主题消费者订阅并消费这些主题的消息。
Partition分区
每个主题可以细分为多个分区分区是Kafka存储消息的物理单位每个分区可以看作是一个有序的、不可变的消息序列。
Replica副本
Kafka为每个分区引入了多副本机制以提高数据的安全性和可靠性。副本分为leader和follower其中leader负责处理读写请求follower负责从leader同步数据。
Consumer Group消费者组
由多个消费者组成消费者组内的消费者共同消费同一个主题的消息但每个消费者只负责消费该主题的一个或多个分区避免消息重复消费。
kraft
通过采用基于 Raft 一致性算法的共识机制Kafka 将元数据管理内嵌于自身体系实现了对 ZooKeeper 的无缝替代
kafka发送端采用push模式
kafka消费端采用pull模式订阅并消费消息 Kafka的工作原理
可以概括为以下几个步骤 消息发布 生产者将消息发送到Kafka集群的特定主题并可以选择发送到该主题的哪个分区。如果未指定分区Kafka会根据负载均衡策略自动选择分区。 消息存储 Kafka将接收到的消息存储在磁盘上的分区中每个分区都是一个有序的消息序列。Kafka使用顺序写入和零拷贝技术来提高写入性能并通过多副本机制确保数据的安全性和可靠性。 消息消费 消费者组中的消费者从Kafka集群中订阅并消费消息。每个消费者负责消费一个或多个分区中的消息并确保消息至少被消费一次。消费者可以使用拉Pull模式或推Push模式从Kafka中拉取消息。 负载均衡 Kafka通过ZooKeeper维护集群的元数据信息包括分区和消费者的对应关系。当消费者数量或分区数量发生变化时Kafka会重新分配分区给消费者以实现负载均衡。 容错机制 Kafka通过多副本机制实现容错。当leader副本出现故障时Kafka会从ISRIn-Sync Replicas集合中选择一个新的leader副本继续对外提供服务。同时Kafka还提供了多种可靠性级别供用户选择以满足不同的业务需求。
kafka特点
一、Kafka的持久化机制
Kafka的持久化机制主要涉及消息的存储和复制。Kafka以日志的形式存储消息每个主题Topic被划分为多个分区Partition每个分区中的消息按照顺序进行存储。Kafka使用多个副本Replica来保证消息的持久性和可靠性每个分区的消息会被复制到多个副本中以防止数据丢失。此外Kafka还允许根据配置的保留策略来保留已消费的消息一段时间以便在需要时进行检索和恢复。
Kafka的副本机制是其实现高可用性和数据持久性的重要基石。每个主题的每个分区都配置有多个副本这些副本分散保存在不同的Broker上从而能够对抗部分Broker宕机带来的数据不可用问题。Kafka的副本机制包括领导者副本Leader Replica和追随者副本Follower Replica
领导者副本负责处理所有的读写请求包括生产者的消息写入和消费者的消息读取。
追随者副本从领导者副本异步拉取消息并写入到自己的提交日志中从而实现与领导者副本的同步。追随者副本不对外提供服务只作为数据的冗余备份。
Kafka还引入了ISRIn-Sync Replicas机制即与领导者副本保持同步的副本集合。只有处于ISR中的副本才能参与到消息的写入和读取过程中以确保数据的一致性和可靠性。当某个副本与领导者副本的同步延迟超过一定的阈值时它会被踢出ISR直到同步恢复正常。
二、Kafka的数据一致性
Kafka通过多个机制来确保数据的一致性包括副本同步、ISR机制、生产者事务和消费者事务等
副本同步确保主副本将数据同步到所有副本的过程在副本同步完成之前生产者才会认为消息已经被成功写入。
ISR机制通过动态调整ISR列表中的副本确保只有可靠的副本参与到数据的读写操作从而提高数据的一致性和可靠性。
生产者事务Kafka的生产者事务机制可以确保消息的Exactly-Once语义即消息不会被重复写入或丢失。生产者事务将消息的发送和位移提交等操作放在同一个事务中一旦事务提交成功就意味着消息已经被成功写入并且对应的位移也已经提交。
消费者事务虽然Kafka的消费者通常不直接支持事务但消费者可以通过提交位移Offset来确保消息的正确消费。消费者事务将消息的拉取和位移提交等操作放在同一个事务中以确保消息不会被重复消费或丢失。
二、kafka应用
2.1 win11安装kafka4.0.0
下载地址https://kafka.apache.org/downloads 下载最后一个kafka-2.13-4.0.0.tgz 下载好之后把这个压缩包解压就行了然后找到config下面的server.properties
找到log.dirs改成自己电脑上的目录
log.dirsE:\\runSoft\\kafka\\data第一步 获取uuid
先打开命令行进入到bin下面的windows目录下
命令
kafka-storage.bat random-uuid先获取uuid我的uuid为ANVnC_s-QYGJF1C7wu9Aww 第二步 格式化日志
命令
kafka-storage.bat format --standalone -t PPEZ2LW8T8yjZNWnfNHorQ -c ../../config/server.properties第三步 启动
打开命令行进入到bin下面的windows目录下 启动命令
kafka-server-start.bat ../../config/server.properties创建topic
kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092启动一个消费端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092启动一个生产端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092问题 1、如果提示如下 命令行 输入行太长。
命令语法不正确。则需要把目录变短目录太长win11不让输入。 2tgz需要解压两次 只解压一次是不行的tgz是打包之后压缩的。3、如果启动失败需要重新配置 重新配置时。把log.dirs的路径下面的东西清空2.2 java开发kafka
第一步引入依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion4.0.0/version
/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion2.0.17/version
/dependency第二步建立生产者
public class Producer {public static void main(String[] args) {MapString,Object props new HashMap();// kafka 集群 节点props.put(bootstrap.servers, localhost:9092);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);String topic test;KafkaProducerString, String producer new KafkaProducer(props);producer.send(new ProducerRecordString, String(topic, key, value-1));producer.send(new ProducerRecordString, String(topic, key, value-2));producer.send(new ProducerRecordString, String(topic, key, value-3));producer.close();}}ProducerRecord 是 Kafka 中的一个核心类它代表了一组 Kafka 需要发送的 key/value 键值对它由记录要发送到的主题名称Topic Name**可选的分区号Partition Number**以及可选的键值对构成。 第三步、建立消费者类
public class Consumer {public static void main(String[] args){Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, test);props.put(enable.auto.commit, true);props.put(auto.commit.interval.ms, 1000);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString , String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test));while (true) {ConsumerRecordsString,String records consumer.poll(Duration.ofDays(100));for (ConsumerRecordString, String record : records) {System.out.printf(partition %d ,offset %d, key %s, value %s%n,record.partition(), record.offset(), record.key(), record.value());}}}
}运行效果 2.3 spring boot整合kafka
第一步引入依赖
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.4.1/versionrelativePath/ !-- lookup parent in repository --/parentartifactIdspring_boot_kafka_demo/artifactIdpackagingjar/packagingnamespring_boot_kafka_demo Maven Webapp/nameurlhttp://maven.apache.org/urldependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion4.0.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build
/project
第二步编写配置文件
编写resources下的application.yml
spring:kafka:bootstrap-servers: localhost:9092consumer:auto-offset-reset: earliest第三步编写生产者
Service
public class Producer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(topic1, message);}PostConstructpublic void init() {sendMessage(Hello, Kafka!);}
}第四步编写消费者
Component
public class Consumer {KafkaListener(id myId, topics topic1)public void listen(String in) {System.out.println(in);}
}第五步编写启动类
SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}运行效果 2.4 记录日志到kafka中
第一步在2.3的基础上添加依赖
dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion2.0.12/version !-- Spring Boot 3.x 推荐版本 --
/dependency第二步添加kafka的日志appender类
public class KafkaLogbackAppender extends UnsynchronizedAppenderBaseILoggingEvent {private String topic application-logs;private String bootstrapServers localhost:9092;private KafkaProducerString, String producer;Overridepublic void start() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());this.producer new KafkaProducer(props);super.start();}Overrideprotected void append(ILoggingEvent eventObject) {String msg eventObject.getFormattedMessage();producer.send(new ProducerRecord(topic, msg));}Overridepublic void stop() {if (producer ! null) {producer.close();}super.stop();}// Getter and Setter for XML configpublic void setTopic(String topic) {this.topic topic;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers bootstrapServers;}
}第三步在resources下添加logback-spring.xml文件
configuration debugfalse scantrue scanPeriod30 seconds!-- 定义日志格式 --property namePATTERN value%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n/!-- 控制台输出 --appender nameSTDOUT classch.qos.logback.core.ConsoleAppenderencoderpattern${PATTERN}/pattern/encoder/appender!-- Kafka Appender --appender nameKAFKA classcom.demo.KafkaLogbackAppenderbootstrapServerslocalhost:9092/bootstrapServerstopicapplication-logs/topic/appender!-- 根日志输出 --root levelinfoappender-ref refSTDOUT/appender-ref refKAFKA//root/configuration第四步修改Producer类
Service
public class Producer {private static final Logger logger LoggerFactory.getLogger(Producer.class);Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(topic1, message);}PostConstructpublic void init() {sendMessage(Hello, Kafka!);logger.info(Message sent);logger.info(Message sent);logger.info(Message sent);logger.info(Message sent);logger.info(Message sent);}
}第五步修改Consumer类
Component
public class Consumer {KafkaListener(id myId, topics topic1)public void listen(String in) {System.out.println(in);}KafkaListener(id myId2, topics application-logs)public void listen2(String in) {System.out.println(resinfo:in);}
}