1.Kafka Logcleaner源码分析
2.源码解析kafka删除topic
3.SpringBoot系列SpringBoot整合Kafka(含源码)
4.kafka源码Topic的日日志创建源码分析(附视频)
5.记一次 Kafka 重启失败问题排查
6.Kafka消费者源码:重平衡(1)-初始化与FIND_COORDINATOR
Kafka Logcleaner源码分析
Kafka日志保留策略包括按时间/大小和compact两种。Logcleaner遵循compact策略清理日志,志源只保留最新的系统消息,当多个消息具有相同key时,日日志只保留最新的志源一个。
每个日志由两部分组成:clean和dirty。系统auxpi源码dirty部分可以进一步划分为cleanable和uncleanable。日日志uncleanable部分不允许清理,志源包括活跃段和未达到compact延迟时间的系统段。
清理过程由后台线程定期执行,日日志选择最脏的志源日志进行清理,脏度由dirty部分字节数与总字节数的系统比例决定。清理前,日日志Logcleaner构建一个key->last_offset映射,志源包含dirty部分的系统所有消息。清理后,日志文件过滤掉过期消息,并合并较小的连续段为较大文件。
payload为null的消息被Logcleaner删除,这类消息在topic配置的时间内保留,然后被清理。清理过程需与幂等性和事务性生产者兼容,保留活跃生产者最后一批消息,直到产生新消息或生产者不活跃。只清理提交或终止事物中的消息,未提交事物中的消息不清理。
Logcleaner通过cleanOrSleep方法启动清理,选择最脏日志,调用clean清理并合并段。在清理前计算tombstone的推币源码app移除时间,确保在clean部分驻留一定时间后移除。清理过程包括构建offset映射,分组段文件并清理合并。
Logcleaner的清理逻辑确保了高效和一致的日志管理,助力Kafka系统稳定运行。
源码解析kafka删除topic
本文以kafka0.8.2.2为例,解析如何删除一个topic以及其背后的关键技术和源码实现过程。
删除一个topic涉及两个关键点:配置删除参数以及执行删除操作。
首先,配置参数`delete.topic.enable`为`True`,这是Broker级别的配置,用于指示kafka是否允许执行topic删除操作。
其次,执行命令`bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name`,此命令指示kafka删除指定的topic。
若未配置`delete.topic.enable`为`True`,topic仅被标记为删除状态,而非立即清除。此时,通常的做法是手动删除Zookeeper中的topic信息和日志,但这仅会清除Zookeeper的数据,并不会真正清除kafkaBroker内存中的topic数据。因此,最佳做法是配置`delete.topic.enable`为`True`,然后重启kafka。
接下来,我们介绍几个关键类和它们在删除topic过程中的作用。
1. **PartitionStateMachine**:该类代表分区的状态机,决定分区的webrtc怎么编译源码当前状态及其转移。状态包括:NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition。
2. **ReplicaManager**:负责管理当前机器的所有副本,处理读写、删除等具体操作。读写操作流程包括获取partition对象,再获取Replica对象,接着获取Log对象,并通过其管理的Segment对象将数据写入、读出。
3. **ReplicaStateMachine**:副本的状态机,决定副本的当前状态和状态之间的转移。状态包括:NewReplica、OnlineReplica、OfflineReplica、ReplicaDeletionStarted、ReplicaDeletionSuccessful、ReplicaDeletionIneligible、NonExistentReplica。
4. **TopicDeletionManager**:管理topic删除的状态机,包括发布删除命令、监听并开始删除topic、以及执行删除操作。
在删除topic的过程中,分为四个阶段:客户端执行删除命令、未配置`delete.topic.enable`的流水、配置了`delete.topic.enable`的canfestival源码详细解析流水、以及手动删除Zookeeper上topic信息和磁盘数据。
客户端执行删除命令时,会在"/admin/delete_topics"目录下创建topicName节点。
未配置`delete.topic.enable`时,topic删除流程涉及监听topic删除命令、判断`delete.topic.enable`状态、标记topic为不可删除、以及队列删除topic任务。
配置了`delete.topic.enable`时,额外步骤包括停止删除topic、检查特定条件、更新删除topic集合、激活删除线程、执行删除操作,如解除分区变动监听、清除内存数据结构、删除副本数据、删除Zookeeper节点信息等。
关于手动删除Zookeeper上topic信息和磁盘数据,通常做法是删除Zookeeper的topic相关信息及磁盘数据,但这可能导致部分内存数据未清除。是否会有隐患,需要进一步测试。
总结而言,kafka的topic删除流程基于Zookeeper实现,通过配置参数、执行命令、管理状态机以及清理相关数据,以实现topic的35级源码配置有序删除。正确配置`delete.topic.enable`并执行删除操作是确保topic完全清除的关键步骤。
SpringBoot系列SpringBoot整合Kafka(含源码)
在现代微服务架构的构建中,消息队列扮演着关键角色,而Apache Kafka凭借其高吞吐量、可扩展性和容错性脱颖而出。本文将深入讲解如何在SpringBoot框架中集成Kafka,以实现实时数据传输和处理。
Kafka是一个开源的流处理平台,由LinkedIn开发,专为大型实时数据流处理应用设计。它基于发布/订阅模式,支持分布式系统中的数据可靠传递,并可与Apache Storm、Hadoop、Spark等集成,应用于日志收集、大规模消息系统、用户活动跟踪、实时数据处理、指标聚合以及事件分发等场景。
在集成SpringBoot和Kafka时,首先需要配置版本依赖。如果遇到如"Error connecting to node"的连接问题,可以尝试修改本地hosts文件,确保正确指定Kafka服务器的IP地址。成功整合后,SpringBoot将允许服务间高效地传递消息,避免消息丢失,极大地简化了开发过程。
完整源码可通过关注公众号"架构殿堂"获取,回复"SpringBoot+Kafka"即可。最后,感谢您的支持和持续关注,"架构殿堂"公众号将不断更新AIGC、Java基础面试题、Netty、Spring Boot、Spring Cloud等实用内容,期待您的持续关注和学习。
kafka源码Topic的创建源码分析(附视频)
关于Kafka Topic创建的源码分析,可以从kafka-topic.sh脚本的入口开始,它执行了kafka.admin.TopicCommand类。在创建Topic时,主要涉及AdminClientTopicService对象的创建和AdminClientClient创建Topics方法的调用,其中Controller负责处理客户端的CreateTopics请求。
服务端的处理逻辑在KafkaRequestHandler.run()方法中,通过apis.handle(request)调用对应接口,如KafkaApis.handleCreateTopicsRequest,这个方法会触发adminManager.createTopics(),创建主题并监控其完成状态。创建的Topic配置和分区副本信息会被写入Zookeeper,如Topic配置和Topic的分区副本分配。
当Controller监听到/brokers/topics/Topic名称的变更后,会触发Broker在磁盘上创建相关Log文件。如果Controller在创建过程中失败,如Controller挂掉,待重新选举后,创建过程会继续,直到Log文件被创建并同步到zk中。
创建Topic时,zk上会创建特定节点,包括主题配置和分区信息。手动添加或删除/brokers/topics/节点将影响Topic的创建和管理。完整参数可通过sh bin/kafka-topic -help查看。
记一次 Kafka 重启失败问题排查
在2月日下午1点左右,收到用户反馈,日志显示kafka集群A主题的分区选举不到leader,导致部分消息发送到该分区时,出现无leader错误。运维在kafka-manager中发现broker0节点处于假死状态,进程仍在运行,尝试多次重启后仍无反应。为了解决问题,运维使用kill命令终止了节点进程,接着尝试重启,但失败了。
kafka版本为0.,其中unclean.leader.election.enable参数默认为false,表示分区不能在ISR以外的副本中选举leader。因为A主题的分区leader副本位于broker0,且另一个副本速度跟不上leader,已被踢出ISR。这导致发送到分区的消息持续报错,且未消费的消息无法继续消费。
查看KafkaServer.log日志,发现Kafka启动过程中生成了大量日志,显示大量主题索引文件损坏,并在重建索引文件。定位到源码中kafka.log.OffsetIndex#sanityCheck方法,该方法用于检查每个logsegment的index文件,确保索引文件的完整性。判断索引文件是否损坏的依据是,entries索引块等于零时,表示索引没有内容,没有损坏;当entries不等于零,需要判断索引文件最后偏移量是否大于索引文件的基偏移量,不大于则认为索引文件损坏,需要重建。
日志显示非正常退出可能导致旧版本中发生此问题。然而,后续版本已经修复了这个问题,修复逻辑是删除损坏的日志文件并重建。接着,查看导致重启失败的错误信息,发现问题出在删除并重建索引过程中。在相关issues中,找到了关于这个问题的描述,表明它很难复现。为了解决问题,升级kafka版本是当务之急。
针对问题,解决思路是确保broker0启动成功,恢复A主题的分区。由于日志和索引文件导致启动失败,只需删除损坏的日志和索引文件并重启即可。但如果分区的日志索引文件也损坏,会导致该分区下未消费的数据丢失,因为此时分区的leader仍在broker0中,但由于broker0挂掉且分区ISR只有leader,导致分区不可用。在数据清空broker0上的leader数据并重启后,Kafka可能会将broker0的副本作为leader,但由于数据被清空,只能将follower数据截断为0,不大于leader的偏移量,这不合理。
提出一个可能的操作,即在分区不可用时,用户可以手动设置分区内的任意一个副本作为leader。后续将对这个问题进行深入分析。
张乘辉,中通科技信息中心技术平台部员工,主要负责消息平台与全链路压测项目的研发。热衷于技术分享,微信公众号「后端进阶」作者,技术博客作者,Seata Contributor,GitHub ID:objcoding。
Kafka消费者源码:重平衡(1)-初始化与FIND_COORDINATOR
在Kafka 2.5.2的消费者组中,重平衡是关键,它定义了消费者如何根据订阅关系调整对Topic分区的分配。当消费者数量、订阅的Topic或GroupCoordinator所在的Broker发生变更时,会触发重平衡。
消费者组状态由GroupState类管理,共有五个状态:Empty(无成员)、PreparingRebalance(加入中)、CompletingRebalance(等待分配)、Stable(已平衡)和Dead(元数据已删除)。状态间的转换基于预先定义的前置状态。例如,从Empty到PreparingRebalance,预示着重平衡的开始。
重平衡过程分为几个步骤,首先是消费者和Broker之间的协调。服务端启动时,GroupCoordinator组件即已就绪,而Consumer通过ConsumerCoordinator与之通信。在启动时,消费者首先会通过FindCoordinatorRequest找到GroupCoordinator,通过最小负载节点发送请求,然后服务端确定哪个Broker负责协调,如groupId的hash值对consumer_offsets分区数取模确定。
一旦找到GroupCoordinator,消费者会发送JoinGroupRequest。后续步骤如SYNC_GROUP和HEARTBEAT确保消费者与协调器保持同步。这部分详细内容在后续的文章中会进一步探讨。