BPMAXBPMAX
  • 快速入门
  • 核心概念
  • 管理员手册
  • 仿真和回放
  • 流程相关脚本
  • 表单相关脚本
  • 数据集相关脚本
  • 界面相关脚本
  • 系统相关脚本
  • 流程集成
  • 数据集
  • OpenAPI
  • 实体列表
  • 插件开发
  • 日志排查
  • 飞书平台

    • 同步组织架构
    • 同步团队组织架构
    • 一键拉群
    • 高级卡片消息
    • 服务台能力
  • 实用功能

    • 系统公告
    • 项目日历
    • 超时自动化
    • 报告自动生成
  • 文档更新记录
  • 系统更新说明
  • 快速入门
  • 核心概念
  • 管理员手册
  • 仿真和回放
  • 流程相关脚本
  • 表单相关脚本
  • 数据集相关脚本
  • 界面相关脚本
  • 系统相关脚本
  • 流程集成
  • 数据集
  • OpenAPI
  • 实体列表
  • 插件开发
  • 日志排查
  • 飞书平台

    • 同步组织架构
    • 同步团队组织架构
    • 一键拉群
    • 高级卡片消息
    • 服务台能力
  • 实用功能

    • 系统公告
    • 项目日历
    • 超时自动化
    • 报告自动生成
  • 文档更新记录
  • 系统更新说明
  • 插件开发

    • Kafka消费日志查询指南

Kafka消费日志查询指南

1. 基于 MessageConsumer 的消费模式

目前基于这种模式的队列有

  • 流程-开始节点-子流程定时任务:project_auto_replay
  • 流程-环节节点-环节超时自动化:step_agent_auto_inspect
  • 流程-当前环节负责人分配-脚本和矩阵事件订阅: subscription_event

1.1 消费进度查询

消费进度可以通过以下日志关键字进行查询:

  • 批次消费开始:

    [processBatch][partition:{分区号}][offset:{起始offset}:{结束offset}:{消息数量}:{offsetLag}][start]
  • 分片消费进度:

    [progress:{当前分片}/{总分片数}][size:{分片大小}][start]
  • 单条消息消费:

    [processSingleMessage:{batch_id}:{message_id}][offset:{offset}][start]
  • 消费完成:

    [success][cost:{耗时}s][chunk_offset:{offset}][success]

1.2 消息积压排查

可以通过以下方式排查消息积压:

  1. 通过 offsetLag 查看积压情况:

    [processBatch][partition:{分区号}][offset:{起始offset}:{结束offset}:{消息数量}:{offsetLag}]
    • offsetLag 表示当前消费位点距离最新消息的差值,数值越大表示积压越严重
  2. 通过消费耗时分析:

    [success][cost:{耗时}s]
    • 如果单条消息消费耗时过长,可能导致积压
  3. 长耗时告警日志:

    [long time cost][cost:{耗时}s]
    • 当消息处理时间超过配置的 sessionTimeout 时会打印此日志
  4. 消费异常:

    [error][retryStrategy:{重试策略}][message_id:{消息ID}][over_limit:{是否超过重试限制}][retry:{当前重试次数}:{最大重试次数}]

1.3 常见积压原因分析

  1. 消费性能问题:

    • 单条消息处理耗时过长
    • 并发处理数量配置过低
    • 外部依赖服务响应慢
  2. 消费异常:

    • 消息处理失败导致重试
    • 消息格式错误
    • 业务逻辑异常
  3. 系统资源问题:

    • 消费者节点CPU/内存使用率过高
    • 网络延迟
    • 外部依赖服务不可用

2. 基于 KafkaConsumer 的原生消费模式

目前基于这种消费模式的队列有

  • 流程-环节节点-自动创建子流程: auto_create_flow_sub_project_list_task
  • 流程事件任务: flow_event_list_task
  • 用户事件任务: user_event_list_task
  • 项目自动创建任务: auto_create_project_task
  • ai环节流转: plugin_urgent_reminder_flow_task

2.1 消费进度查询

可通过以下日志关键字查询:

[kafka.consumer:eatch batch start][topic:{主题}][partition:{分区号}][isRunning:{是否运行中}][isStale:{是否过期}][message length:{消息数量}][highWatermark:{最高水位}][firstOffset:{首个偏移量}][lastOffset:{最后偏移量}]

2.2 消息积压排查

  1. 批次消息处理:

    [kafka.consumer][topic:{主题}][partition:{分区号}][message length:{消息数量}][highWatermark:{最高水位}][firstOffset:{首个偏移量}][lastOffset:{最后偏移量}]
  2. 消费耗时统计:

    [kafka.consumer:each batch cost][topic:{主题}][length:{消息数量}][firstOffset:{首个偏移量}][lastOffset:{最后偏移量}][sessionTimeout:{超时时间}][cost:{耗时}s]

3. 实际应用示例 - project_auto_replay

3.1 消费进度查询

[创建子项目开始][batch_id:{批次ID}]
[创建子项目完成][batch_id:{批次ID}][msg_id:{消息ID}][耗时:{耗时}s]

3.2 异常排查

[创建子项目失败][batch_id:{批次ID}][msg_id:{消息ID}][error:{错误信息}]

4. 通用最佳实践

  1. 日志查询技巧:

    • 使用 batch_id 关联同一批次的消息
    • 使用 message_id 追踪单条消息的完整生命周期
    • 结合 cost 字段分析性能瓶颈
  2. 监控指标:

    • offsetLag:消息积压程度
    • 消费耗时:处理性能
    • 错误率:系统稳定性
    • 重试次数:业务异常情况
  3. 告警配置建议:

    • offsetLag 超过阈值
    • 单条消息处理时间超过 sessionTimeout
    • 错误率超过阈值
    • 重试次数过多
  4. 排查步骤:

    1. 查看 offsetLag 确认积压情况
    2. 分析消费耗时找出性能瓶颈
    3. 检查错误日志定位异常原因
    4. 评估系统资源使用情况
    5. 分析外部依赖服务状态

5. 飞书组织架构同步消费模式

5.1 消费进度查询

飞书组织架构同步的日志包含以下几个维度:

  1. 消费批次信息:
[飞书KAFKA组织架构同步:eachBatch][pid:{进程ID}][firstOffset:{起始offset}][lastOffset:{结束offset}][lag:{积压数}][feishuEvents:{飞书事件数量}][highWatermark:{最高水位}][partition:{分区号}][message_length:{消息数量}]
  1. 飞书事件处理:
[飞书组织架构同步-消息同步][oneByOne][i:{当前处理序号}/{总事件数}][length:{批次总长度}]
  1. 批次处理完成:
[飞书KAFKA组织架构同步:eachBatchEnd][pid:{进程ID}][firstOffset:{起始offset}][lastOffset:{结束offset}]

5.2 消息积压排查

  1. 通过 lag 值查看积压情况:
[lag:{当前积压数}] // lag = highWatermark - lastOffset
  1. 事件过滤统计:
[飞书组织架构同步-消息同步][oneByOne][filterEventsLength:{过滤后事件数}][length:{原始事件数}]
  1. 异常日志:
[飞书消息同步失败][i:{序号}][offset:{偏移量}][message:{错误信息}][data:{消息数据}][firstOffset:{起始offset}][lastOffset:{结束offset}]

5.3 特殊处理逻辑

  1. 用户数据去重:
  • 按照 schema、app_id、union_id 作为唯一键
  • 只保留同一用户最新的一条数据
  1. 部门数据去重:
  • 按照 schema、app_id、department_id 作为唯一键
  • 只保留同一部门最新的一条数据
  1. 茶百道特殊处理:
  • 部门数据需要去掉 name 字段
  • 部门 ID 需要添加 'D' 前缀
  • 基本用户模型中的 leader 字段需要转换到 partner_leader_id 字段

5.4 常见问题排查

  1. 数据同步问题:
[非法的用户信息][offset:{偏移量}][identifier:{标识符}][groupMessage:{消息内容}]
[非法的团队信息][offset:{偏移量}][identifier:{标识符}][groupMessage:{消息内容}]
  1. 消费者启动问题:
[飞书组织架构同步: getSingleKafkaConsumer][msg:{错误信息}]
[飞书组织架构同步:未找到的consumer][consumerConfig:{配置信息}]
  1. 消息处理问题:
[飞书组织架构同步][eachBatchErroMessage: {错误信息}][firstOffset:{起始offset}][lastOffset:{结束offset}]

5.5 监控建议

  1. 关键监控指标:
  • lag 值监控:反映消息积压情况
  • 事件过滤率:filterEventsLength/length 比率
  • 错误率:失败事件数/总事件数
  • 处理耗时:每批次处理时间
  1. 告警设置:
  • lag 值超过阈值
  • 事件过滤率异常(过高或过低)
  • 错误率超过阈值
  • 批次处理超时
  1. 运维建议:
  • 定期检查消费者状态
  • 监控数据同步一致性
  • 关注特殊处理逻辑的执行情况
  • 定期清理过期的消费者组和主题