金坛区建设工程质量监督网站,专业网站建设哪家效果好,电子商务ppt课件,佛山关键词排名工具实战指南#xff1a;使用 Spring Cloud Stream 集成 Kafka 构建高效消息驱动微服务 视频地址#xff1a; Stream为什么被引入-尚硅谷SCS-1-内容介绍-图灵诸葛 官方文档#xff1a; Spring Cloud Stream 什么是 Spring Cloud Stream?
Spring Cloud Stream(SCS) 是一个用于构…实战指南使用 Spring Cloud Stream 集成 Kafka 构建高效消息驱动微服务 视频地址 Stream为什么被引入-尚硅谷SCS-1-内容介绍-图灵诸葛 官方文档 Spring Cloud Stream 什么是 Spring Cloud Stream?
Spring Cloud Stream(SCS) 是一个用于构建消息驱动微服务的框架它基于 Spring Boot提供了一种简化的方式来处理消息和事件的传递。它旨在为不同消息代理如 Kafka、RabbitMQ、Apache Kafka 等提供统一的编程模型使开发者能够更轻松地在微服务架构中使用消息通信。
以下是 Spring Cloud Stream 的一些关键概念和特性
Binder绑定器Binder 是 Spring Cloud Stream 的核心概念之一它提供了与底层消息代理之间的连接和交互。通过 BinderSpring Cloud Stream 能够与不同的消息代理集成例如 Kafka、RabbitMQ 等。每个消息代理都有自己的 Binder 实现使开发者可以在不同的消息代理之间切换而无需修改应用代码。消息通道Message ChannelsSpring Cloud Stream 通过消息通道来实现消息的发送和接收。应用程序可以通过绑定到消息通道来与消息代理进行交互。消息通道可以是输入通道用于接收消息或输出通道用于发送消息。消息转换Message ConversionSpring Cloud Stream 会自动进行消息的序列化和反序列化将消息从 Java 对象转换为消息代理支持的格式以及将从消息代理接收的消息转换回 Java 对象。发布-订阅模式Spring Cloud Stream 支持发布-订阅模式可以让多个消费者订阅同一个主题topic的消息实现了一对多的消息通信。消息分组Message Grouping消息分组可以将一组消费者组织在一起共同处理相同分组 ID 的消息。这对于实现负载均衡和消息去重非常有用。函数式编程模型Spring Cloud Stream 鼓励使用函数式编程模型通过定义处理消息的函数来实现业务逻辑。这种方式使得编写简洁、可测试的消息处理逻辑变得更加容易。实时数据处理Spring Cloud Stream 不仅用于消息传递还可以用于实时数据处理。您可以在消息到达之后立即对其进行处理从而支持实时分析、转换和处理。
总体而言Spring Cloud Stream 简化了在微服务架构中使用消息传递的复杂性提供了一种与消息代理集成的高级抽象让开发者能够更专注于业务逻辑的实现。它的灵活性使得您能够轻松地在不同的消息代理之间切换同时提供了强大的工具来处理消息和事件的传递从而使您的微服务系统更具可扩展性和弹性。
消息中间件的切换只需要 更换依赖 即可。
讲讲 Kafka
Kafka 是一个开源的分布式流数据平台最初由 LinkedIn 开发并捐赠给 Apache 软件基金会。它被设计用于处理高吞吐量、可持久化的实时数据流。Kafka 的主要目标是提供一种高效、可扩展、持久化的消息传递系统能够处理大规模的实时数据流同时保证数据的可靠性和可用性。
以下是 Kafka 的一些关键特性和概念
发布-订阅模式Kafka 采用发布-订阅模式生产者Publisher将消息发布到主题Topic而消费者Consumer可以订阅一个或多个主题来接收消息。这使得多个消费者能够独立地从同一个主题订阅消息实现一对多的消息传递。分区和副本Kafka 将每个主题分为多个分区每个分区可以在不同的服务器上进行副本复制从而提高可用性和容错性。分区和副本的组合允许 Kafka 处理大规模的消息流并保证数据的持久性。持久化Kafka 将消息以持久化的方式存储在磁盘上确保消息在生产者发送和消费者接收之间不会丢失。消息被保存在分区中可以根据需要保留一段时间甚至可以通过配置来保留特定时间段的历史消息。高吞吐量Kafka 在处理消息时具有高吞吐量的能力它能够同时处理成千上万的消息适用于大规模的实时数据处理场景。水平扩展Kafka 支持水平扩展可以通过添加新的服务器节点来增加吞吐量和存储容量从而适应不断增长的数据量。消息保序性Kafka 保证在同一分区内的消息保持顺序这对于一些需要按照顺序处理的场景非常重要。流处理Kafka 不仅用于消息传递还可以用于实时流数据处理。Kafka Streams 是一个用于处理和分析流数据的库可以在 Kafka 上进行流式处理支持流数据的转换、聚合和计算等操作。社区生态系统Kafka 拥有丰富的社区生态系统提供了许多与 Kafka 集成的工具和库如消费者和生产者客户端、连接器Connectors用于将 Kafka 与其他数据源集成、Kafka 管理工具等。
总体而言Kafka 是一个强大的分布式消息流平台适用于许多实时数据处理和消息传递的应用场景。它的可靠性、高性能和可扩展性使得它成为构建大规模实时数据处理系统的重要组件之一。
Spring Cloud Stream 和 Kafka 之间的联系和区别
Spring Cloud Stream 是一个用于构建基于 Spring Boot 的消息驱动微服务的框架它提供了统一的编程模型和抽象来处理消息流而 Kafka 是 Spring Cloud Stream 支持的消息中间件之一。下面我们来讲一下它们之间的联系和区别
联系 消息驱动架构Spring Cloud Stream 和 Kafka 都支持消息驱动架构通过将消息作为信息传递的核心来构建应用程序。它们都支持发布-订阅模式允许不同的微服务之间通过消息进行通信。 微服务和云原生Spring Cloud Stream 是 Spring Cloud 生态系统中的一部分专注于帮助开发人员构建云原生的微服务应用程序。Kafka 作为 Spring Cloud Stream 的一种消息中间件实现与 Spring Cloud Stream 一起可以支持在微服务架构中使用消息传递来解耦微服务之间的通信。 可插拔的消息中间件Spring Cloud Stream 提供了一个抽象层使得在不同的消息中间件之间进行切换变得容易。它支持多种消息中间件包括 Kafka、RabbitMQ 等使开发人员可以根据实际需求选择合适的消息中间件。
区别 定位和用途Spring Cloud Stream 是一个用于构建消息驱动微服务的框架它提供了统一的编程模型和抽象来简化消息传递。而 Kafka 是一个分布式流数据平台专注于处理高吞吐量、可持久化的实时数据流。 功能广度Kafka 是一个功能丰富的消息中间件除了消息传递外还提供了分区、副本、持久化、高吞吐量等特性。而 Spring Cloud Stream 更加专注于在微服务架构中实现消息传递。 编程模型Spring Cloud Stream 提供了更加抽象的编程模型通过 Binder 将应用程序与消息中间件解耦。开发人员只需关注业务逻辑而不必过多关注底层的消息传递细节。Kafka 则需要更多的配置和代码来实现消息的生产和消费。 生态系统Spring Cloud Stream 作为 Spring Cloud 生态系统的一部分可以与其他 Spring Cloud 组件无缝集成如服务发现、负载均衡等。Kafka 作为独立的消息中间件可以在不同的技术栈中使用。
总的来说Spring Cloud Stream 和 Kafka 都是用于处理消息的技术但它们的定位和功能略有不同。Spring Cloud Stream 提供了更加抽象和便捷的方式来构建消息驱动的微服务而 Kafka 提供了更丰富的特性来处理实时数据流。在使用时开发人员可以根据项目需求和技术栈的选择来决定是否使用 Spring Cloud Stream 以及选择哪种消息中间件。
实践
集成 Kafka
引入依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.7.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdorg.example/groupIdartifactIdstream-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependencyManagementdependenciesdependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversionHoxton.SR3/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagementdependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.26/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-stream-binder-kafka/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build
/project配置文件
server:port: 7888
spring:# application:# name: producercloud:stream:kafka:binder:brokers: ip:9092 #Kafka的消息中间件服务器zk-nodes: ip:2181 #Zookeeper的节点如果集群后面加,号分隔auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。bindings:output: #这里用stream给我们提供的默认output后面会讲到自定义outputdestination: topic #消息发往的目的地content-type: text/plain #消息发送的格式接收端不用指定格式但是发送端要input:destination: topic发送业务类
source - output
EnableBinding(Source.class)
public class SendService {Resourceprivate Source source;public void sendMsg(String msg) {source.output().send(MessageBuilder.withPayload(msg).build());}public void sendBody(Object object) {source.output().send(MessageBuilder.withPayload(object).build());}
}消费者
sink - input
EnableBinding(Sink.class)
public class RecieveService {StreamListener(Sink.INPUT)public void recieve(Object payload){System.out.println(payload);}
}控制器 Controller
RestController
RequestMapping(/send)
public class StreamController {Resourceprivate SendService sendService;/*** 发送* localhost:7888/send/hello** param msg 消息*/GetMapping(/{msg})public void send(PathVariable(msg) String msg){sendService.sendMsg(msg);}GetMapping(/body)public void sendFace() {// 创建一个对象Q_FACE_INPUT qFaceInput new Q_FACE_INPUT();// set 写入数据...sendService.sendBody(qFaceInput);System.out.printf(发送 Face Data 成功);}}自定义通道生产消息
1、创建 MyChannel 接口
/*** 自定义通道 - 模仿source接口造轮子**/
public interface MyChannel {String FACE_OUTPUT face_output;String HUMAN_OUTPUT human_output;String VEHICLE_INPUT vehicle_output;Output(FACE_OUTPUT)MessageChannel faceOutput();Output(HUMAN_OUTPUT)MessageChannel humanOutput();Output(VEHICLE_INPUT)MessageChannel vehicleOutput();}注意事项:
一般接口上不写 Component 注解。接口主要用于定义行为契约而具体的实现通常由类来提供。Component 注解用于将类标识为 Spring 容器管理的组件而不是接口。实际上将 Component 注解放在接口上可能会引发问题。
踩坑小记
有时候在指定类中使用 Resource 注入的时候会报错 – 创建不了这个接口 bean但是使用 Spring 自带的 Autowired 注解就会没问题。可能是因为是两个注解默认注入 bean 的方式不一样引起的
Autowired 默认的注入方式为 byType根据类型进行匹配Resource 默认注入方式为 byName根据名称进行匹配
详细可参考autowired-和-resource-的区别是什么
2、修改 application.yml 文件
server:port: 7888
spring:profiles:active: dev # 表示开发环境cloud:stream:kafka:binder:brokers: ip:9092 #Kafka的消息中间件服务器auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。bindings:# 自定义output - 生产face_output:destination: Q_FACE_INPUT_TWO_DX #消息发往的目的地human_output:destination: Q_HUMAN_INPUT_TWO_DXvehicle_output:destination: Q_VEHICLE_INPUT_TWO_DX3、编写生产者业务类
EnableBinding(value {MyChannel.class})
public class SendService {Resourceprivate MyChannel myChannel;public void sendFaceMsg(String msg) {myChannel.faceOutput().send(MessageBuilder.withPayload(msg).build());}public void sendHumanMsg(String msg) {myChannel.humanOutput().send(MessageBuilder.withPayload(msg).build());}public void sendVehicleMsg(String msg) {myChannel.vehicleOutput().send(MessageBuilder.withPayload(msg).build());}}4、编写 Controller 控制器
RestController
RequestMapping(/send)
public class StreamController {Resourceprivate SendService sendService;/*** 发送人脸数据* localhost:7888/send/face/hello** param msg 消息*/GetMapping(/face/{msg})public void sendFace(PathVariable(msg) String msg){sendService.sendFaceMsg(msg);}/*** 发送人体数据** param msg */GetMapping(/human/{msg})public void sendHuman(PathVariable(msg) String msg){sendService.sendHumanMsg(msg);}/*** 发送车辆数据标准** param msg */GetMapping(/veh/{msg})public void sendVehicle(PathVariable(msg) String msg){sendService.sendVehicleMsg(msg);}}自定义订阅频道
1、创建一个订阅通道注解
Component
public interface MySubscribableChannel {String INPUT1 input1;String INPUT2 input2;String INPUT3 input3;Input(INPUT1)SubscribableChannel input1();Input(INPUT2)SubscribableChannel input2();}2、application.yml 文件配置对应的主题信息
spring:profiles:active: dev # 表示开发环境cloud:stream:kafka:binder:brokers: ip:9092 #Kafka的消息中间件服务器auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。bindings:# 自定义output - 生产output1:destination: topic1 #消息发往的目的地output2:destination: topic2# input订阅消息input1:destination: topic1input2:destination: topic23、监听消息 StreamListener(INPUT1)public void subChannel (String msg){System.out.println(msg);}StreamListener(INPUT2)public void subChannel2 (String msg){System.out.println(msg);}其他写法参考
注意配置文件中 input-in-0 的写法
https://blog.csdn.net/qq_42221396/article/details/128409521Spring Cloud Stream 3.1以后的使用方法_org.springframework.cloud.stream.annotation.enable
学习参考 Spring Cloud 系列之 Spring Cloud Stream - 风的姿态 - 博客园 (cnblogs.com) Spring Cloud 十五Stream 入门、主要概念与自定义消息发送与接收 - 东北小狐狸 - 博客园 (cnblogs.com) Spring Cloud Stream中文指导手册_书上有云的博客-CSDN博客 官方文档中文版Spring Cloud Stream 快速入门 Springcloud Stream详解及整合kafka - 简书 (jianshu.com)