找回密码
 立即注册
首页 业界区 安全 InfluxDB 订阅(Subscription)

InfluxDB 订阅(Subscription)

岭猿 2025-8-7 20:53:38
转载请注明出处:
一、InfluxDB 订阅推送机制

1. 推送原理



    • 触发条件:当数据写入指定数据库的指定保留策略(RP)时
    • 推送方式:通过 HTTP/HTTPS 协议以 POST 请求发送 原始行协议数据
    • 数据格式:与直接写入 InfluxDB 的格式完全相同
      示例:cpu,host=server1 usage=75 1625097600000000000

2. 推送流程

                        
1.png

二、完整配置示例

1. 在 InfluxDB 中配置订阅
  1. # 进入InfluxDB命令行
  2. influx
  3. # 创建订阅(推送到Kapacitor)
  4. CREATE SUBSCRIPTION "kapacitor-alerts"
  5. ON "monitor"."rp30"
  6. DESTINATIONS ALL 'http://kapacitor:9092/write'
  7. # 创建订阅(推送到另一个InfluxDB)
  8. CREATE SUBSCRIPTION "influxdb-backup"
  9. ON "monitor"."rp30"
  10. DESTINATIONS ALL 'http://backup-influxdb:8086/write?db=monitor'
复制代码
     查看订阅:
                        
2.png

2. InfluxDB 配置文件优化 (/etc/influxdb/influxdb.conf)
  1. [subscriber]
  2.   enabled = true
  3.   http-timeout = "30s"       # 推送超时时间
  4.   insecure-skip-verify = true # 跳过HTTPS证书验证(测试用)
  5.   buffer-size = 2000         # 网络中断时的缓存条数
复制代码
三、接收端配置示例

1. Kapacitor 接收配置

  Kapacitor 无需特殊配置,默认在 9092 端口提供 /write 端点接收数据。
2. 自定义HTTP服务接收(Python示例)
  1. from flask import Flask, request
  2. app = Flask(__name__)
  3. @app.route('/write', methods=['POST'])
  4. def handle_data():
  5.     print(f"Received data: {request.data.decode()}")
  6.     return '', 204  # 必须返回204
  7. if __name__ == '__main__':
  8.     app.run(host='0.0.0.0', port=8080)
复制代码
四、验证推送是否成功

1. 检查订阅状态
  1. # 查看所有订阅
  2. influx -execute "SHOW SUBSCRIPTIONS"
  3. # 输出示例:
  4. name: monitor
  5. retention_policy name            mode destinations
  6. --------------- ----            ---- ------------
  7. rp30           kapacitor-alerts ALL  [http://kapacitor:9092/write]
复制代码
2. 监控推送日志

3. 接收端验证

  Kapacitor 端: 
  1. # 查看接收统计
  2. kapacitor stats ingress
  3. # 输出示例:
  4. Database   Retention Policy Measurement Points Received
  5. monitor    rp30            cpu              1500
复制代码
  自定义HTTP服务端:
    直接查看程序输出的接收日志。
五、故障排查命令

1. 测试推送手动触发
  1. # 生成测试数据
  2. echo 'test_metric,host=test1 value=1 $(date +%s%N)' | \
  3. curl -XPOST 'http://influxdb:8086/write?db=monitor&rp=rp30' --data-binary @-
  4. # 检查是否推送(InfluxDB端)
  5. journalctl -u influxdb | grep -A 5 "test_metric"
复制代码
2. 网络连通性测试
  1. # 从InfluxDB容器测试Kapacitor连通性
  2. docker exec -it influxdb curl -v -X POST http://kapacitor:9092/write -d "test value=1"
复制代码
3. 查看失败记录
  1. # 查看未推送的缓存数据
  2. influx -execute "SHOW DIAGNOSTICS"
复制代码
六、高级配置技巧

1. 调试模式启用
  1. # 在InfluxDB配置中增加调试日志
  2. [logging]
  3.   level = "debug"
  4.   subscriber = true
复制代码
七、关键注意事项


  • 数据一致性

    • 订阅推送是 至少一次(at-least-once) 语义
    • 网络中断可能导致重复数据

  • 性能影响

    • 每个订阅目标会增加约 5% 的 CPU 开销
    • 建议订阅目标不超过 3 个

  • 版本兼容性

    • InfluxDB 1.x 和 2.x 的订阅机制不同
    • Kapacitor 1.6+ 版本需使用 /write 端点


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册