Kafka消费日志查询指南
1. 基于 MessageConsumer 的消费模式
目前基于这种模式的队列有
- 流程-开始节点-子流程定时任务:project_auto_replay
- 流程-环节节点-环节超时自动化:step_agent_auto_inspect
- 流程-当前环节负责人分配-脚本和矩阵事件订阅: subscription_event
1.1 消费进度查询
消费进度可以通过以下日志关键字进行查询:
批次消费开始:
[processBatch][partition:{分区号}][offset:{起始offset}:{结束offset}:{消息数量}:{offsetLag}][start]
分片消费进度:
[progress:{当前分片}/{总分片数}][size:{分片大小}][start]
单条消息消费:
[processSingleMessage:{batch_id}:{message_id}][offset:{offset}][start]
消费完成:
[success][cost:{耗时}s][chunk_offset:{offset}][success]
1.2 消息积压排查
可以通过以下方式排查消息积压:
通过 offsetLag 查看积压情况:
[processBatch][partition:{分区号}][offset:{起始offset}:{结束offset}:{消息数量}:{offsetLag}]
- offsetLag 表示当前消费位点距离最新消息的差值,数值越大表示积压越严重
通过消费耗时分析:
[success][cost:{耗时}s]
- 如果单条消息消费耗时过长,可能导致积压
长耗时告警日志:
[long time cost][cost:{耗时}s]
- 当消息处理时间超过配置的 sessionTimeout 时会打印此日志
消费异常:
[error][retryStrategy:{重试策略}][message_id:{消息ID}][over_limit:{是否超过重试限制}][retry:{当前重试次数}:{最大重试次数}]
1.3 常见积压原因分析
消费性能问题:
- 单条消息处理耗时过长
- 并发处理数量配置过低
- 外部依赖服务响应慢
消费异常:
- 消息处理失败导致重试
- 消息格式错误
- 业务逻辑异常
系统资源问题:
- 消费者节点CPU/内存使用率过高
- 网络延迟
- 外部依赖服务不可用
2. 基于 KafkaConsumer 的原生消费模式
目前基于这种消费模式的队列有
- 流程-环节节点-自动创建子流程: auto_create_flow_sub_project_list_task
- 流程事件任务: flow_event_list_task
- 用户事件任务: user_event_list_task
- 项目自动创建任务: auto_create_project_task
- ai环节流转: plugin_urgent_reminder_flow_task
2.1 消费进度查询
可通过以下日志关键字查询:
[kafka.consumer:eatch batch start][topic:{主题}][partition:{分区号}][isRunning:{是否运行中}][isStale:{是否过期}][message length:{消息数量}][highWatermark:{最高水位}][firstOffset:{首个偏移量}][lastOffset:{最后偏移量}]
2.2 消息积压排查
批次消息处理:
[kafka.consumer][topic:{主题}][partition:{分区号}][message length:{消息数量}][highWatermark:{最高水位}][firstOffset:{首个偏移量}][lastOffset:{最后偏移量}]
消费耗时统计:
[kafka.consumer:each batch cost][topic:{主题}][length:{消息数量}][firstOffset:{首个偏移量}][lastOffset:{最后偏移量}][sessionTimeout:{超时时间}][cost:{耗时}s]
3. 实际应用示例 - project_auto_replay
3.1 消费进度查询
[创建子项目开始][batch_id:{批次ID}]
[创建子项目完成][batch_id:{批次ID}][msg_id:{消息ID}][耗时:{耗时}s]
3.2 异常排查
[创建子项目失败][batch_id:{批次ID}][msg_id:{消息ID}][error:{错误信息}]
4. 通用最佳实践
日志查询技巧:
- 使用 batch_id 关联同一批次的消息
- 使用 message_id 追踪单条消息的完整生命周期
- 结合 cost 字段分析性能瓶颈
监控指标:
- offsetLag:消息积压程度
- 消费耗时:处理性能
- 错误率:系统稳定性
- 重试次数:业务异常情况
告警配置建议:
- offsetLag 超过阈值
- 单条消息处理时间超过 sessionTimeout
- 错误率超过阈值
- 重试次数过多
排查步骤:
- 查看 offsetLag 确认积压情况
- 分析消费耗时找出性能瓶颈
- 检查错误日志定位异常原因
- 评估系统资源使用情况
- 分析外部依赖服务状态
5. 飞书组织架构同步消费模式
5.1 消费进度查询
飞书组织架构同步的日志包含以下几个维度:
- 消费批次信息:
[飞书KAFKA组织架构同步:eachBatch][pid:{进程ID}][firstOffset:{起始offset}][lastOffset:{结束offset}][lag:{积压数}][feishuEvents:{飞书事件数量}][highWatermark:{最高水位}][partition:{分区号}][message_length:{消息数量}]
- 飞书事件处理:
[飞书组织架构同步-消息同步][oneByOne][i:{当前处理序号}/{总事件数}][length:{批次总长度}]
- 批次处理完成:
[飞书KAFKA组织架构同步:eachBatchEnd][pid:{进程ID}][firstOffset:{起始offset}][lastOffset:{结束offset}]
5.2 消息积压排查
- 通过 lag 值查看积压情况:
[lag:{当前积压数}] // lag = highWatermark - lastOffset
- 事件过滤统计:
[飞书组织架构同步-消息同步][oneByOne][filterEventsLength:{过滤后事件数}][length:{原始事件数}]
- 异常日志:
[飞书消息同步失败][i:{序号}][offset:{偏移量}][message:{错误信息}][data:{消息数据}][firstOffset:{起始offset}][lastOffset:{结束offset}]
5.3 特殊处理逻辑
- 用户数据去重:
- 按照 schema、app_id、union_id 作为唯一键
- 只保留同一用户最新的一条数据
- 部门数据去重:
- 按照 schema、app_id、department_id 作为唯一键
- 只保留同一部门最新的一条数据
- 茶百道特殊处理:
- 部门数据需要去掉 name 字段
- 部门 ID 需要添加 'D' 前缀
- 基本用户模型中的 leader 字段需要转换到 partner_leader_id 字段
5.4 常见问题排查
- 数据同步问题:
[非法的用户信息][offset:{偏移量}][identifier:{标识符}][groupMessage:{消息内容}]
[非法的团队信息][offset:{偏移量}][identifier:{标识符}][groupMessage:{消息内容}]
- 消费者启动问题:
[飞书组织架构同步: getSingleKafkaConsumer][msg:{错误信息}]
[飞书组织架构同步:未找到的consumer][consumerConfig:{配置信息}]
- 消息处理问题:
[飞书组织架构同步][eachBatchErroMessage: {错误信息}][firstOffset:{起始offset}][lastOffset:{结束offset}]
5.5 监控建议
- 关键监控指标:
- lag 值监控:反映消息积压情况
- 事件过滤率:filterEventsLength/length 比率
- 错误率:失败事件数/总事件数
- 处理耗时:每批次处理时间
- 告警设置:
- lag 值超过阈值
- 事件过滤率异常(过高或过低)
- 错误率超过阈值
- 批次处理超时
- 运维建议:
- 定期检查消费者状态
- 监控数据同步一致性
- 关注特殊处理逻辑的执行情况
- 定期清理过期的消费者组和主题