找回密码
 立即注册
首页 业界区 业界 基于CentOS Stream 8的物联网数据采集与展示方案 ...

基于CentOS Stream 8的物联网数据采集与展示方案

挡缭 2025-9-24 17:46:37
系统架构全景图

图表 
1.png

 
一、系统平台优化(CentOS Stream 8)

1. 系统基础配置

bash
  1. # 1. 系统更新与加固
  2. sudo dnf update -y
  3. sudo dnf install epel-release -y
  4. sudo dnf install fail2ban firewalld -y
  5. # 2. 创建专用运维账户
  6. sudo useradd -m -s /bin/bash iotadmin
  7. sudo passwd iotadmin
  8. sudo usermod -aG wheel iotadmin
  9. # 3. SSH安全加固
  10. sudo sed -i 's/^PermitRootLogin yes/PermitRootLogin no/' /etc/ssh/sshd_config
  11. sudo sed -i 's/^PasswordAuthentication yes/PasswordAuthentication no/' /etc/ssh/sshd_config
  12. sudo systemctl restart sshd
  13. # 4. 防火墙配置
  14. sudo systemctl enable --now firewalld
  15. sudo firewall-cmd --permanent --add-port=1883/tcp   # MQTT
  16. sudo firewall-cmd --permanent --add-port=8883/tcp   # MQTT/SSL
  17. sudo firewall-cmd --permanent --add-port=9092/tcp   # Kafka
  18. sudo firewall-cmd --permanent --add-port=3000/tcp   # Grafana
  19. sudo firewall-cmd --reload
复制代码
2. 内核参数优化(/etc/sysctl.conf)

conf
  1. # 网络性能优化
  2. net.core.somaxconn = 65535
  3. net.core.netdev_max_backlog = 65536
  4. net.ipv4.tcp_max_syn_backlog = 65536
  5. # 文件句柄限制
  6. fs.file-max = 2097152
  7. fs.nr_open = 2097152
  8. # MQTT连接优化
  9. net.ipv4.tcp_keepalive_time = 600
  10. net.ipv4.tcp_keepalive_probes = 3
  11. net.ipv4.tcp_keepalive_intvl = 15
复制代码
二、MQTT Broker集群部署(EMQX企业版)

1. 集群化部署

bash
  1. # 安装EMQX企业版
  2. curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash
  3. sudo dnf install emqx-enterprise -y
  4. # 配置集群(3节点示例)
  5. # 节点1(10.0.0.1):
  6. echo "cluster.name = iot_platform" >> /etc/emqx/emqx.conf
  7. echo "node.name = emqx@10.0.0.1" >> /etc/emqx/emqx.conf
  8. # 节点2(10.0.0.2):
  9. emqx_ctl cluster join emqx@10.0.0.1
复制代码
2. 安全加固配置

bash
  1. # 1. 启用TLS加密
  2. sudo mkdir /etc/emqx/certs
  3. sudo certbot certonly --standalone -d mqtt.example.com
  4. sudo cp /etc/letsencrypt/live/mqtt.example.com/* /etc/emqx/certs/
  5. # 2. 配置EMQX(/etc/emqx/emqx.conf)
  6. listeners.ssl.default {
  7.   bind = "0.0.0.0:8883"
  8.   max_connections = 100000
  9.   ssl_options {
  10.     keyfile = "/etc/emqx/certs/privkey.pem"
  11.     certfile = "/etc/emqx/certs/fullchain.pem"
  12.   }
  13. }
  14. # 3. 设备级认证
  15. emqx_ctl users add device_001 $6$rounds=10000$somesalt$hashed_password
复制代码
3. 主题权限控制

conf
  1. # /etc/emqx/acl.conf
  2. {allow, {user, "device_001"}, publish, ["sensors/001/#"]}
  3. {allow, {user, "backend"}, subscribe, ["sensors/#"]}
  4. {deny, all}
复制代码
三、数据处理与存储架构

1. 消息队列缓冲(Kafka)

bash
  1. # 安装Kafka
  2. wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
  3. tar -xzf kafka_2.13-3.4.0.tgz
  4. # 配置集群(3节点)
  5. # server.properties
  6. broker.id=1
  7. listeners=PLAINTEXT://:9092
  8. advertised.listeners=PLAINTEXT://node1:9092
  9. zookeeper.connect=node1:2181,node2:2181,node3:2181
复制代码
2. 时序数据库(TimescaleDB)

bash
  1. # 安装PostgreSQL 15 + TimescaleDB
  2. sudo dnf install https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm
  3. sudo dnf module disable postgresql
  4. sudo dnf install postgresql15-server postgresql15-contrib timescaledb-2-postgresql-15
  5. # 初始化数据库
  6. sudo /usr/pgsql-15/bin/postgresql-15-setup initdb
  7. sudo systemctl enable --now postgresql-15
  8. # 创建超级表
  9. CREATE TABLE sensor_data (
  10.     time TIMESTAMPTZ NOT NULL,
  11.     device_id TEXT NOT NULL,
  12.     value DOUBLE PRECISION NOT NULL
  13. );
  14. SELECT create_hypertable('sensor_data', 'time');
复制代码
3. 数据清洗服务(Python示例)

python
  1. from kafka import KafkaConsumer
  2. import psycopg2
  3. # Kafka消费者
  4. consumer = KafkaConsumer(
  5.     'raw_sensor_data',
  6.     bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
  7.     security_protocol='SSL',
  8.     ssl_cafile='ca.pem'
  9. )
  10. # TimescaleDB连接
  11. conn = psycopg2.connect("dbname=tsdb user=tsdbadmin")
  12. cursor = conn.cursor()
  13. for message in consumer:
  14.     data = json.loads(message.value)
  15.    
  16.     # 数据验证
  17.     if not validate_sensor_data(data):
  18.         continue
  19.         
  20.     # 数据清洗
  21.     cleaned = clean_data(data)
  22.    
  23.     # 写入数据库
  24.     cursor.execute(
  25.         "INSERT INTO sensor_data (time, device_id, value) VALUES (%s, %s, %s)",
  26.         (cleaned['timestamp'], cleaned['device_id'], cleaned['value'])
  27.     )
  28.     conn.commit()
  29.    
  30.     # 更新缓存
  31.     redis.set(f"latest:{cleaned['device_id']}", json.dumps(cleaned))
复制代码
四、安全加固体系

1. 传输层安全

协议端口加密方式证书管理MQTT8883TLS 1.3Let's Encrypt自动更新HTTPS443TLS 1.3企业级证书Database5432TLS双向认证自签名CA2. 数据加密策略

python
  1. # 设备端数据加密示例
  2. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  3. from cryptography.hazmat.backends import default_backend
  4. def encrypt_data(data, key):
  5.     iv = os.urandom(12)
  6.     cipher = Cipher(
  7.         algorithms.AES(key),
  8.         modes.GCM(iv),
  9.         backend=default_backend()
  10.     )
  11.     encryptor = cipher.encryptor()
  12.     ciphertext = encryptor.update(data) + encryptor.finalize()
  13.     return iv + encryptor.tag + ciphertext
复制代码
3. 访问控制矩阵

角色MQTT权限DB访问API权限设备发布特定主题无无数据服务订阅所有主题只写内部网络访问前端应用无只读JWT认证+RBAC管理员管理主题读写管理员权限五、运维监控体系

1. 监控组件部署

bash
  1. # Prometheus安装
  2. sudo dnf install prometheus
  3. # Node Exporter
  4. sudo dnf install node_exporter
  5. # Grafana
  6. sudo dnf install grafana
复制代码
2. 关键监控指标

yaml
  1. # prometheus.yml 片段
  2. scrape_configs:
  3.   - job_name: 'emqx'
  4.     static_configs:
  5.       - targets: ['emqx1:18083', 'emqx2:18083']
  6.   - job_name: 'postgres'
  7.     static_configs:
  8.       - targets: ['db1:9187']
  9.   - job_name: 'kafka'
  10.     static_configs:
  11.       - targets: ['kafka1:7071']
复制代码
3. 告警规则示例

yaml
  1. groups:
  2. - name: MQTT服务
  3.   rules:
  4.   - alert: EMQX节点离线
  5.     expr: up{job="emqx"} == 0
  6.     for: 5m
  7.     labels:
  8.       severity: critical
  9.     annotations:
  10.       summary: "MQTT节点 {{ $labels.instance }} 离线"
  11.       
  12.   - alert: 消息积压
  13.     expr: kafka_consumergroup_lag > 10000
  14.     for: 10m
  15.     labels:
  16.       severity: warning
复制代码
六、设备接入与扩展方案

1. 设备接入流程

图表
2.png

 
2. 设备管理API设计

python
  1. # 设备注册API
  2. @app.route('/api/v1/devices', methods=['POST'])
  3. @jwt_required()
  4. def register_device():
  5.     data = request.get_json()
  6.     device_id = generate_device_id()
  7.    
  8.     # 创建数据库记录
  9.     db.execute("""
  10.         INSERT INTO devices (id, name, type, owner)
  11.         VALUES (%s, %s, %s, %s)
  12.     """, (device_id, data['name'], data['type'], get_jwt_identity()))
  13.    
  14.     # 生成设备凭证
  15.     credential = generate_device_credential(device_id)
  16.    
  17.     return jsonify({
  18.         'device_id': device_id,
  19.         'username': credential['username'],
  20.         'password': credential['password'],
  21.         'certificate': credential['cert_pem']
  22.     }), 201
复制代码
3. 多协议支持方案

协议转换方式适用场景HTTPEMQX Webhook传统设备改造CoAPCoAP-MQTT代理网关低功耗设备Modbus边缘计算转换工业设备LoRaWAN网络服务器集成长距离物联网七、日常运维手册

1. 备份策略

bash
  1. # 数据库每日备份
  2. pg_dump -U postgres -Fc tsdb > /backup/tsdb-$(date +%F).dump
  3. # 配置文件备份
  4. rsync -av /etc/emqx /backup/configs/emqx
  5. rsync -av /etc/kafka /backup/configs/kafka
  6. # 证书备份
  7. tar -czf /backup/certs-$(date +%F).tar.gz /etc/letsencrypt/{live,archive}
复制代码
2. 灾难恢复流程


  • 恢复最新数据库备份
  • 重建EMQX集群
  • 恢复Kafka偏移量
  • 验证数据完整性
  • 逐步恢复设备连接
3. 性能调优命令

bash
  1. # 查看MQTT连接数
  2. emqx_ctl clients list
  3. # 检查Kafka积压
  4. kafka-consumer-groups.sh --describe --group data_consumers
  5. # 时序数据库维护
  6. timescaledb-tune --quiet --yes
复制代码
八、扩展架构设计

1. 边缘计算集成

图表
3.png

 
2. 数据管道扩展

python
  1. # 添加AI处理管道
  2. from kafka import KafkaProducer
  3. ai_producer = KafkaProducer(bootstrap_servers='kafka:9092')
  4. def process_for_ai(data):
  5.     # 特征提取
  6.     features = extract_features(data)
  7.    
  8.     # 发送到AI服务队列
  9.     ai_producer.send('ai_processing', json.dumps(features).encode())
  10. # 在清洗服务中调用
  11. process_for_ai(cleaned_data)
复制代码
3. 多区域部署

bash
  1. # 跨区域EMQX集群
  2. emqx_ctl cluster join emqx@us-east-1.platform.com
  3. # 数据库级联复制
  4. # 主库(欧洲)
  5. CREATE PUBLICATION euro_publication FOR TABLE sensor_data;
  6. # 从库(亚洲)
  7. CREATE SUBSCRIPTION asia_subscription
  8. CONNECTION 'host=euro-db port=5432 dbname=tsdb'
  9. PUBLICATION euro_publication;
复制代码
九、前端展示系统

1. 实时数据大屏

javascript
  1. // 使用MQTT.js直接订阅
  2. const client = mqtt.connect('wss://mqtt.example.com:8084/mqtt', {
  3.   username: 'web_user',
  4.   password: 'secure_token'
  5. })
  6. client.subscribe('sensors/+/status')
  7. client.on('message', (topic, payload) => {
  8.   const data = JSON.parse(payload)
  9.   updateDashboard(data)
  10. })
复制代码
2. 设备管理界面功能


  • 设备状态监控(在线/离线)
  • 实时数据曲线(Chart.js)
  • 历史数据查询(时间范围选择)
  • 告警管理(阈值设置)
  • 固件OTA升级
3. 移动APP集成

kotlin
  1. // Android数据获取示例
  2. suspend fun fetchSensorData(deviceId: String): List<SensorData> {
  3.     return withContext(Dispatchers.IO) {
  4.         val response = apiService.getDeviceData(
  5.             deviceId = deviceId,
  6.             from = Instant.now().minus(1, ChronoUnit.DAYS).toString(),
  7.             to = Instant.now().toString()
  8.         )
  9.         response.data ?: emptyList()
  10.     }
  11. }
复制代码
十、持续演进路线


  • 阶段1(基础平台)

    • EMQX集群部署
    • 核心数据处理流水线
    • 基础监控

  • 阶段2(安全加固)

    • 设备证书管理
    • 数据端到端加密
    • 审计日志

  • 阶段3(智能扩展)

    • 边缘计算节点
    • AI异常检测
    • 预测性维护

  • 阶段4(全球化部署)

    • 多区域集群
    • 数据主权合规
    • 跨云架构

本方案基于CentOS Stream 8构建了企业级物联网平台,通过多层次安全加固、全链路监控、弹性扩展架构,支持从数百到数百万设备的平滑扩展,日均处理能力可达亿级数据点,满足工业4.0场景需求。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册