手机版网站建设合同,自己做网站花钱么,百色市右江区了建设局网站,在线装修设计师咨询文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费… 文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费者控制台 1、消费者组
1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本 1.2、创建生产者发送消息
package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;SpringBootTest
class SpringKafkaConsumerApplicationTests {ResourceKafkaTemplate kafkaTemplate;Testvoid contextLoads() {for (int i 0; i 10; i) {kafkaTemplate.send(my_topic1,i%6,, 消费者组i);}}
} 1.3、application.yml配置
server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别read-uncommitted会导致脏读可以读取生产者事务还未提交的消息# 消费者是否自动ack true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1.4、创建消费者监听器
package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
Component
public class MyKafkaListener {KafkaListener(topics {my_topic1},groupId my_group1)public void onMessage1(ConsumerRecordString, String record) {System.out.println(my_group1消费者1获取到消息topic record.topic()partition:record.partition()offset record.offset()key record.key()value record.value());}KafkaListener(topics {my_topic1},groupId my_group1)public void onMessage2(ConsumerRecordString, String record) {System.out.println(my_group1消费者2获取到消息topic record.topic()partition:record.partition()offset record.offset()key record.key()value record.value());}KafkaListener(topics {my_topic1},groupId my_group2)public void onMessage3(ConsumerRecordString, String record) {System.out.println(my_group2消费者获取到消息topic record.topic()partition:record.partition()offset record.offset()key record.key()value record.value());}}
1.5、创建SpringBoot启动类
package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 https://springdoc.cn
SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}
1.6、屏蔽 kafka debug 日志 logback.xml
configuration !-- 如果觉得idea控制台日志太多src\main\resources目录下新建logback.xml
屏蔽kafka debug --logger nameorg.apache.kafka.clients leveldebug /
/configuration1.7、引入spring-kafka依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.0.5/versionrelativePath/ !-- lookup parent from repository --/parent!-- Generated by https://start.springboot.io --!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 https://springdoc.cn --groupIdcom.atguigu/groupIdartifactIdspring-kafka-consumer/artifactIdversion0.0.1-SNAPSHOT/versionnamespring-kafka-consumer/namedescriptionspring-kafka-consumer/descriptionpropertiesjava.version17/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project
1.8、消费者控制台 . ____ _ __ _ _/\\ / ____ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | _ | _| | _ \/ _ | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) ) |____| .__|_| |_|_| |_\__, | / / / /|_||___//_/_/_/:: Spring Boot :: (v3.0.5)my_group2消费者获取到消息topic my_topic1partition:2offset 0key value 消费者组2
my_group2消费者获取到消息topic my_topic1partition:2offset 1key value 消费者组8
my_group2消费者获取到消息topic my_topic1partition:0offset 0key value 消费者组0
my_group2消费者获取到消息topic my_topic1partition:0offset 1key value 消费者组6
my_group2消费者获取到消息topic my_topic1partition:5offset 0key value 消费者组5
my_group2消费者获取到消息topic my_topic1partition:3offset 0key value 消费者组3
my_group2消费者获取到消息topic my_topic1partition:3offset 1key value 消费者组9
my_group2消费者获取到消息topic my_topic1partition:1offset 0key value 消费者组1
my_group2消费者获取到消息topic my_topic1partition:1offset 1key value 消费者组7
my_group2消费者获取到消息topic my_topic1partition:4offset 0key value 消费者组4
my_group1消费者2获取到消息topic my_topic1partition:2offset 0key value 消费者组2
my_group1消费者2获取到消息topic my_topic1partition:2offset 1key value 消费者组8
my_group1消费者1获取到消息topic my_topic1partition:5offset 0key value 消费者组5
my_group1消费者1获取到消息topic my_topic1partition:4offset 0key value 消费者组4
my_group1消费者1获取到消息topic my_topic1partition:3offset 0key value 消费者组3
my_group1消费者1获取到消息topic my_topic1partition:3offset 1key value 消费者组9
my_group1消费者2获取到消息topic my_topic1partition:1offset 0key value 消费者组1
my_group1消费者2获取到消息topic my_topic1partition:1offset 1key value 消费者组7
my_group1消费者2获取到消息topic my_topic1partition:0offset 0key value 消费者组0
my_group1消费者2获取到消息topic my_topic1partition:0offset 1key value 消费者组6