临沂360网站建设推广,移动网站建设初学视频教程,网站推广销售,百度 个人中心首页Kafka入门 
为什么要用消息中间件#xff1f; 
异步处理 
场景说明#xff1a;用户注册后#xff0c;需要发注册邮件和注册短信。传统的做法有两种1.串行的方式#xff1b;2.并行方式。 
串行方式#xff1a;将注册信息写入数据库成功后#xff0c;发送注册邮件#xff…Kafka入门 
为什么要用消息中间件 
异步处理 
场景说明用户注册后需要发注册邮件和注册短信。传统的做法有两种1.串行的方式2.并行方式。 
串行方式将注册信息写入数据库成功后发送注册邮件再发送注册短信。以上三个任务全部完成后返回给客户端。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QSbXkisA-1686193157416)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image002.jpg)] 
2并行方式将注册信息写入数据库成功后发送注册邮件的同时发送注册短信。以上三个任务完成后返回给客户端。与串行的差别是并行的方式可以提高处理的时间。 
假设三个业务节点每个使用50毫秒钟不考虑网络等其他开销则串行方式的时间是150毫秒并行的时间可能是100毫秒。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9p13A393-1686193157418)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image004.jpg)] 
小结如以上案例描述传统的方式系统的性能并发量吞吐量响应时间会有瓶颈。如何解决这个问题呢 
引入消息队列将不是必须的业务逻辑异步处理。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8RExvNKZ-1686193157419)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image006.jpg)] 
按照以上约定用户的响应时间相当于是注册信息写入数据库的时间也就是50毫秒。注册邮件发送短信写入消息队列后直接返回因此写入消息队列的速度很快基本可以忽略因此用户的响应时间可能是50毫秒。因此架构改变后系统的吞吐量提高到每秒20 QPS。比串行提高了3倍比并行提高了两倍。 
应用解耦 
场景说明用户下单后订单系统需要通知库存系统。传统的做法是订单系统调用库存系统的接口。 
传统模式的缺点 
1 假如库存系统无法访问则订单减库存将失败从而导致订单失败 
2 订单系统与库存系统耦合 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CZbRjNwt-1686193157420)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image008.jpg)] 
如何解决以上问题呢引入应用消息队列后的方案 
订单系统用户下单后订单系统完成持久化处理将消息写入消息队列返回用户订单下单成功。 
库存系统订阅下单的消息采用拉/推的方式获取下单信息库存系统根据下单信息进行库存操作。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-c7GtV11T-1686193157422)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image010.jpg)] 
假如在下单时库存系统不能正常使用。也不影响正常下单因为下单后订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。 
流量削峰 
流量削峰也是消息队列中的常用场景一般在秒杀或团抢活动中使用广泛。 
应用场景秒杀活动一般会因为流量过大导致流量暴增应用挂掉。为解决这个问题一般需要在应用前端加入消息队列可以控制活动的人数可以缓解短时间内高流量压垮应用。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KkntO33S-1686193157423)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image012.gif)] 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8cQLq6b0-1686193157425)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image014.gif)] 
用户的请求服务器接收后首先写入消息队列。假如消息队列长度超过最大数量则直接抛弃用户请求或跳转到错误页面秒杀业务根据消息队列中的请求信息再做后续处理。 
日志处理 
日志处理是指将消息队列用在日志处理中比如Kafka的应用解决大量日志传输的问题。架构简化如下 日志采集客户端负责日志数据采集定时写入Kafka队列Kafka消息队列负责日志数据的接收存储和转发日志处理应用订阅并消费kafka队列中的日志数据 
为什么选择Kafka 
消息中间件的编年史 Kafka的外在表现和内在设计 
kafka最初是LinkedIn的一个内部基础设施系统。最初开发的起因是LinkedIn虽然有了数据库和其他系统可以用来存储数据但是缺乏一个可以帮助处理持续数据流的组件。 
所以在设计理念上开发者不想只是开发一个能够存储数据的系统如关系数据库、Nosql数据库、搜索引擎等等更希望把数据看成一个持续变化和不断增长的流并基于这样的想法构建出一个数据系统一个数据架构。 
Kafka外在表现很像消息系统允许发布和订阅消息流但是它和传统的消息系统有很大的差异 
1、Kafka是个现代分布式系统以集群的方式运行可以自由伸缩。 
2、Kafka可以按照要求存储数据保存多久都可以 
3、流式处理将数据处理的层次提示到了新高度消息系统只会传递数据Kafka的流式处理能力可以让我们用很少的代码就能动态地处理派生流和数据集。所以Kafka不仅仅是个消息中间件。 
Kafka不仅仅是一个消息中间件同时它是一个流平台这个平台上可以发布和订阅数据流Kafka的流有一个单独的包Stream的处理并把他们保存起来进行处理这个是Kafka作者的设计理念。 
大数据领域Kafka还可以看成实时版的Hadoop但是还是有些区别Hadoop可以存储和定期处理大量的数据文件往往以TB计数而Kafka可以存储和持续处理大型的数据流。Hadoop主要用在数据分析上而Kafka因为低延迟更适合于核心的业务应用上。 
Kafka名字的由来卡夫卡与法国作家马塞尔·普鲁斯特爱尔兰作家詹姆斯·乔伊斯并称为西方现代主义文学的先驱和大师。《变形记》是卡夫卡的短篇代表作是卡夫卡的艺术成就中的一座高峰被认为是20世纪最伟大的小说作品之一达到管理层的高度同学可以多看下人文相关的书籍增长管理知识和人格魅力。 
本次课程将会以kafka_2.13-3.3.1版本做主讲这是讲课时的最新版本。 
市场主流消息中间件对比 Kafka中的基本概念 
消息和批次 
消息 Kafka里的数据单元也就是我们一般消息中间件里的消息的概念可以比作数据库中一条记录。消息由字节数组组成。消息还可以包含键可选元数据也是字节数组主要用于对消息选取分区。 
作为一个高效的消息系统为了提高效率消息可以被分批写入Kafka。批次就是一组消息这些消息属于同一个主题和分区。如果只传递单个消息会导致大量的网络开销把消息分成批次传输可以减少这开销。但是这个需要权衡时间延迟和吞吐量之间批次里包含的消息越多单位时间内处理的消息就越多单个消息的传输时间就越长吞吐量高延时也高。如果进行压缩可以提升数据的传输和存储能力但需要更多的计算处理。 
对于Kafka来说消息是晦涩难懂的字节数组一般我们使用序列化和反序列化技术格式常用的有JSON和XML还有AvroHadoop开发的一款序列化框架具体怎么使用依据自身的业务来定。 
主题和分区 
Kafka里的消息用主题进行分类主题好比数据库中的表主题下有可以被分为若干个 分区分表技术 。分区本质上是个提交日志文件有新消息这个消息就会以追加的方式写入分区写文件的形式然后用先入先出的顺序读取。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PG8I02O1-1686193157434)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image002.jpg)] 
但是因为主题会有多个分区所以在整个主题的范围内是无法保证消息的顺序的单个分区则可以保证。 
Kafka通过分区来实现数据冗余和伸缩性因为分区可以分布在不同的服务器上那就是说一个主题可以跨越多个服务器这是Kafka高性能的一个原因多台服务器的磁盘读写性能比单台更高。 
前面我们说Kafka可以看成一个流平台很多时候我们会把一个主题的数据看成一个流不管有多少个分区。 
生产者和消费者、偏移量、消费者群组 
就是一般消息中间件里生产者和消费者的概念。一些其他的高级客户端API像数据管道API和流式处理的Kafka Stream都是使用了最基本的生产者和消费者作为内部组件然后提供了高级功能。 
生产者默认情况下把消息均衡分布到主题的所有分区上如果需要指定分区则需要使用消息里的消息键和分区器。 
消费者订阅主题一个或者多个并且按照消息的生成顺序读取。消费者通过检查所谓的偏移量来区分消息是否读取过。偏移量是一种元数据一个不断递增的整数值创建消息的时候Kafka会把他加入消息。在一个主题中一个分区里每个消息的偏移量是唯一的。每个分区最后读取的消息偏移量会保存到Zookeeper或者Kafka上这样分区的消费者关闭或者重启读取状态都不会丢失。 
多个消费者可以构成一个消费者群组。怎么构成共同读取一个主题的消费者们就形成了一个群组。群组可以保证每个分区只被一个消费者使用。 消费者和分区之间的这种映射关系叫做消费者对分区的所有权关系很明显一个分区只有一个消费者而一个消费者可以有多个分区。 
吃饭的故事一桌一个分区多桌多个分区生产者不断生产消息(消费)消费者就是买单的人消费者群组就是一群买单的人一个分区只能被消费者群组中的一个消费者消费不能重复消费如果有一个消费者挂掉了James跑路了另外的消费者接上 
Broker和集群 
一个独立的Kafka服务器叫Broker。broker的主要工作是接收生产者的消息设置偏移量提交消息到磁盘保存为消费者提供服务响应请求返回消息。在合适的硬件上单个broker可以处理上千个分区和每秒百万级的消息量。要达到这个目的需要做操作系统调优和JVM调优 
多个broker可以组成一个集群。每个集群中broker会选举出一个集群控制器。控制器会进行管理包括将分区分配给broker和监控broker。 
集群里一个分区从属于一个broker这个broker被称为首领。但是分区可以被分配给多个broker这个时候会发生分区复制。 
集群中Kafka内部一般使用管道技术进行高效的复制。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-trE4nSw7-1686193157437)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image006.jpg)] 
分区复制带来的好处是提供了消息冗余。一旦首领broker失效其他broker可以接管领导权。当然相关的消费者和生产者都要重新连接到新的首领上。 
保留消息 
在一定期限内保留消息是Kafka的一个重要特性Kafka broker默认的保留策略是要么保留一段时间7天要么保留一定大小比如1个G。到了限制旧消息过期并删除。但是每个主题可以根据业务需求配置自己的保留策略开发时要注意Kafka不像Mysql之类的永久存储。 
为什么选择Kafka 
优点 
多生产者和多消费者 
基于磁盘的数据存储换句话说Kafka的数据天生就是持久化的。 
高伸缩性Kafka一开始就被设计成一个具有灵活伸缩性的系统对在线集群的伸缩丝毫不影响整体系统的可用性。 
高性能结合横向扩展生产者、消费者和brokerKafka可以轻松处理巨大的信息流LinkedIn公司每天处理万亿级数据同时保证亚秒级的消息延迟。 
常见场景 
活动跟踪 
跟踪网站用户和前端应用发生的交互比如页面访问次数和点击将这些信息作为消息发布到一个或者多个主题上这样就可以根据这些数据为机器学习提供数据更新搜素结果等等头条、淘宝等总会推送你感兴趣的内容其实在数据分析之前就已经做了活动跟踪。 
传递消息 
标准消息中间件的功能 
收集指标和日志 
收集应用程序和系统的度量监控指标或者收集应用日志信息通过Kafka路由到专门的日志搜索系统比如ES。国内用得较多 
提交日志 
收集其他系统的变动日志比如数据库。可以把数据库的更新发布到Kafka上应用通过监控事件流来接收数据库的实时更新或者通过事件流将数据库的更新复制到远程系统。 
还可以当其他系统发生了崩溃通过重放日志来恢复系统的状态。异地灾备 
流处理 
操作实时数据流进行统计、转换、复杂计算等等。随着大数据技术的不断发展和成熟无论是传统企业还是互联网公司都已经不再满足于离线批处理实时流处理的需求和重要性日益增长 。 
近年来业界一直在探索实时流计算引擎和API比如这几年火爆的Spark Streaming、Kafka Streaming、Beam和Flink其中阿里双11会场展示的实时销售金额就用的是流计算是基于Flink然后阿里在其上定制化的Blink。 
Kafka的安装、管理和配置 
安装 
预备环境 
Kafka是Java生态圈下的一员用Scala编写运行在Java虚拟机上所以安装运行和普通的Java程序并没有什么区别。 
安装Kafka官方说法Java环境推荐Java8。 
Kafka需要Zookeeper保存集群的元数据信息和消费者信息。Kafka一般会自带Zookeeper但是从稳定性考虑应该使用单独的Zookeeper而且构建Zookeeper集群。 
运行 
Kafka with ZooKeeper 
启动Zookeeper 
进入Kafka目录下的bin\windows 
执行kafka-server-start.bat …/…/config/server.properties 
Linux下与此类似进入bin后执行对应的sh文件即可 
Kafka with KRaft 
1、生成集群id 2、格式化存储目录 3、启动服务 启动正确后的界面如下 kafka基本的操作和管理 
## 列出所有主题 
./kafka-topics.sh --bootstrap-server localhost:9092 --list## 列出所有主题的详细信息 
./kafka-topics.sh --bootstrap-server localhost:9092 --describe## 创建主题主题名 my-topic 1副本8分区 
./kafka-topics.sh --bootstrap-server localhost:9092  --create --topic my-topic --replication-factor 1 --partitions 8## 增加分区注意分区无法被删除 
./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 16## 创建生产者控制台 
./kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic## 创建消费者控制台 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --consumer.config ../config/consumer.properties## kafka终止命令 
./kafka-server-stop.sh 总结就是 
Broker配置 
配置文件放在Kafka目录下的config目录中主要是server.properties文件 
常规配置 
broker.id 
在单机时无需修改但在集群下部署时往往需要修改。它是个每一个broker在集群中的唯一表示要求是正数。当该服务器的IP地址发生改变时broker.id没有变化则不会影响consumers的消息情况 
listeners 
监听列表(以逗号分隔 不同的协议(如plaintext,trace,ssl、不同的IP和端口)),hostname如果设置为0.0.0.0则绑定所有的网卡地址如果hostname为空则绑定默认的网卡。如果没有配置则默认为java.net.InetAddress.getCanonicalHostName()。 
如PLAINTEXT://myhost:9092,TRACE://:9091或 PLAINTEXT://0.0.0.0:9092, 
zookeeper.connect 
zookeeper集群的地址可以是多个多个之间用逗号分割。一组hostname:port/path列表,hostname是zk的机器名或IP、port是zk的端口、/path是可选zk的路径如果不指定默认使用根路径 
log.dirs 
Kafka把所有的消息都保存在磁盘上存放这些数据的目录通过log.dirs指定。可以使用多路径使用逗号分隔。如果是多路径Kafka会根据“最少使用”原则把同一个分区的日志片段保存到同一路径下。会往拥有最少数据分区的路径新增分区。 
num.recovery.threads.per.data.dir 
每数据目录用于日志恢复启动和关闭时的线程数量。因为这些线程只是服务器启动正常启动和崩溃后重启和关闭时会用到。所以完全可以设置大量的线程来达到并行操作的目的。注意这个参数指的是每个日志目录的线程数比如本参数设置为8而log.dirs设置为了三个路径则总共会启动24个线程。 
auto.create.topics.enable 
是否允许自动创建主题。如果设为true那么produce生产者往主题写消息consume消费者从主题读消息或者fetch metadata任意客户端向主题发送元数据请求时一个不存在的主题时就会自动创建。缺省为true。 
delete.topic.enabletrue 
删除主题配置默认未开启 
主题配置 
新建主题的默认参数 
num.partitions 
每个新建主题的分区个数分区个数只能增加不能减少 。这个参数一般要评估比如每秒钟要写入和读取1000M数据如果现在每个消费者每秒钟可以处理50MB的数据那么需要20个分区这样就可以让20个消费者同时读取这些分区从而达到设计目标。一般经验把分区大小限制在25G之内比较理想 
log.retention.hours 
日志保存时间默认为7天168小时。超过这个时间会清理数据。bytes和minutes无论哪个先达到都会触发。与此类似还有log.retention.minutes和log.retention.ms都设置的话优先使用具有最小值的那个。提示时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现的。也就是最后修改时间是指日志片段的关闭时间也就是文件里最后一个消息的时间戳 
log.retention.bytes 
topic每个分区的最大文件大小一个topic的大小限制  分区数*log.retention.bytes。-1没有大小限制。log.retention.bytes和log.retention.minutes任意一个达到要求都会执行删除。(注意如果是log.retention.bytes先达到了则是删除多出来的部分数据)一般不推荐使用最大文件删除策略而是推荐使用文件过期删除策略。 
log.segment.bytes 
分区的日志存放在某个目录下诸多文件中这些文件将分区的日志切分成一段一段的我们称为日志片段。这个属性就是每个文件的最大尺寸当尺寸达到这个数值时就会关闭当前文件并创建新文件。被关闭的文件就开始等待过期。默认为1G。 
如果一个主题每天只接受100MB的消息那么根据默认设置需要10天才能填满一个文件。而且因为日志片段在关闭之前消息是不会过期的所以如果log.retention.hours保持默认值的话那么这个日志片段需要17天才过期。因为关闭日志片段需要10天等待过期又需要7天。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-15U5n3yQ-1686193157450)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image010.jpg)] 
log.segment.ms 
作用和log.segment.bytes类似只不过判断依据是时间。同样的两个参数以先到的为准。这个参数默认是不开启的。 
message.max.bytes 
表示一个服务器能够接收处理的消息的最大字节数注意这个值producer和consumer必须设置一致且不要大于fetch.message.max.bytes属性的值(消费者能读取的最大消息,这个值应该大于或等于message.max.bytes)。该值默认是1000000字节大概900KB~1MB。如果启动压缩判断压缩后的值。这个值的大小对性能影响很大值越大网络和IO的时间越长还会增加磁盘写入的大小。 
Kafka设计的初衷是迅速处理短小的消息一般10K大小的消息吞吐性能最好LinkedIn的kafka性能测试 
硬件配置对Kafka性能的影响 
为Kafka选择合适的硬件更像是一门艺术就跟它的名字一样我们分别从磁盘、内存、网络和CPU上来分析确定了这些关注点就可以在预算范围之内选择最优的硬件配置。 
磁盘吞吐量/磁盘容量 
磁盘吞吐量IOPS 每秒的读写次数会影响生产者的性能。因为生产者的消息必须被提交到服务器保存大多数的客户端都会一直等待直到至少有一个服务器确认消息已经成功提交为止。也就是说磁盘写入速度越快生成消息的延迟就越低。SSD固态贵单个速度快HDD机械偏移可以多买几个设置多个目录加快速度具体情况具体分析 
磁盘容量的大小则主要看需要保存的消息数量。如果每天收到1TB的数据并保留7天那么磁盘就需要7TB的数据。 
内存 
Kafka本身并不需要太大内存内存则主要是影响消费者性能。在大多数业务情况下消费者消费的数据一般会从内存页面缓存从系统内存中分中获取这比在磁盘上读取肯定要快的多。一般来说运行Kafka的JVM不需要太多的内存剩余的系统内存可以作为页面缓存或者用来缓存正在使用的日志片段所以我们一般Kafka不会同其他的重要应用系统部署在一台服务器上因为他们需要共享页面缓存这个会降低Kafka消费者的性能。 
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vsyB4Z1r-1686193157452)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image012.jpg)] 
网络 
网络吞吐量决定了Kafka能够处理的最大数据流量。它和磁盘是制约Kafka拓展规模的主要因素。对于生产者、消费者写入数据和读取数据都要瓜分网络流量。同时做集群复制也非常消耗网络。 
CPU 
Kafka对cpu的要求不高主要是用在对消息解压和压缩上。所以cpu的性能不是在使用Kafka的首要考虑因素。 
总结 
我们要为Kafka选择合适的硬件时优先考虑存储包括存储的大小然后考虑生产者的性能也就是磁盘的吞吐量选好存储以后再来选择CPU和内存就容易得多。网络的选择要根据业务上的情况来定也是非常重要的一环。