找回密码
 立即注册
首页 业界区 业界 Influxdb订阅与kapacitor使用梳理

Influxdb订阅与kapacitor使用梳理

费卿月 2025-8-8 09:18:41
转载请注明出处:
一、订阅功能的核心作用

InfluxDB 的订阅是一种 数据自动推送机制,当指定数据库的写入操作发生时,InfluxDB 会 实时复制数据 并推送到预先配置的端点(如 Kapacitor)。
类比说明:


  • 类似于 MySQL 的 Binlog 复制
  • 或 Kafka 的 Producer-Consumer 模型
二、订阅的工作原理

         
1.png


    • 数据路径
      写入请求 → InfluxDB存储引擎 → 订阅分发器 → HTTP推送 → 接收端
    • 协议支持

      • 默认使用 HTTP 协议(可配置 HTTPS)
      • 数据格式与 InfluxDB 行协议(Line Protocol)一致


三、订阅的配置方法

1. 创建订阅
  1. -- 基本语法
  2. CREATE SUBSCRIPTION "<订阅名称>"
  3. ON "<数据库>"."<保留策略>"
  4. DESTINATIONS  "<协议>://<主机>:<端口>/<路径>"
  5. -- 实际示例(推送到Kapacitor)
  6. CREATE SUBSCRIPTION "kapacitor-sub"
  7. ON "monitor"."rp30"
  8. DESTINATIONS ALL 'http://kapacitor:9092'
复制代码
2. 参数说明

参数说明示例值ALL发送到所有目标ALL 或 ANYANY发送到任意一个可用目标 协议支持 http/https/udphttp路径Kapacitor 需使用 /write/kapacitor/v1/write四、订阅的管理与查看

1. 查看所有订阅
  1. -- 查看特定数据库的订阅
  2. SHOW SUBSCRIPTIONS ON "monitor"
  3. -- 输出示例:
  4. name: monitor
  5. retention_policy name            mode destinations
  6. --------------- ----            ---- ------------
  7. rp30           kapacitor-sub    ALL  [http://kapacitor:9092]
复制代码
2. 删除订阅
  1. DROP SUBSCRIPTION "kapacitor-sub" ON "monitor"."rp30"
复制代码
3. 查看订阅状态(需监控端点)
  1. kapacitor stats ingress
复制代码
                     
2.png

 
五、订阅的核心特点

1. 实时性


  • 数据写入 InfluxDB 后 毫秒级 推送到订阅端
  • 对比查询拉取模式,延迟降低 90% 以上
2. 可靠性

保障机制说明重试机制默认重试 3 次(可配置)离线缓存网络中断时缓存 1000 条数据(默认)数据去重通过 UUID 避免重复推送六、Kapacitor日志分析数据写入

 查看kapacitor得日志:
              
3.png

(1)数据来源


  • InfluxDB 订阅推送:

    • InfluxDB 的 monitor.rp30 数据通过 HTTP POST 推送到 Kapacitor 的 /write 端点。
    • 触发条件:InfluxDB 的 CREATE SUBSCRIPTION 配置生效。

(2)数据内容


  • 数据库:monitor
  • 保留策略:rp30
  • 时间精度:ns(纳秒级时间戳)
  • 一致性级别:未指定(默认 all)
(3)Kapacitor 处理


  • 成功接收(status=204):

    • Kapacitor 正确接收数据,未返回内容(204 No Content)。

七、Kapacitor命令分析

命令作用与 ingress 的关联性kapacitor stats general查看任务处理状态若 ingress 有数据但任务无处理,需检查任务逻辑kapacitor stats egress查看数据输出(如HTTP告警发送)确认数据是否被正确处理并转发influx -execute "SHOW STATS"查看InfluxDB推送统计对比InfluxDB发送量与Kapacitor接收量(1)kapacitor stats egress

  典型输出示例:
  1. Database   Retention Policy Measurement Points Received
  2. ---------  --------------- ----------- ---------------
  3. monitor    rp30            cpu         1500
  4. _kapacitor autogen         edges       39451
复制代码
字段说明Database数据来源的数据库名(如 monitor)Retention Policy数据所属的保留策略(如 rp30)Measurement指标名称(如 cpu)Points Received累计接收的数据点数(持续增长表示数据正常流动)                                
4.png

(2)kapacitor stats general
  1. root@kapacitor:/var/log/kapacitor# kapacitor stats  general
  2. ClusterID:                    183a5dd5-458f-4923-8c7c-d1951e1da259
  3. ServerID:                     675c36aa-e959-4a46-8713-cbe86346b01c
  4. Host:                         kapacitor
  5. <strong>Tasks:                        </strong><strong>16
  6. Enabled Tasks:                16
  7. Subscriptions:                4</strong>
  8. Platform:                     OSS
  9. Version:                      1.5.9
  10. root@kapacitor:/var/log/kapacitor#
复制代码
  可以查看订阅任务得数量

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册