挡缭 发表于 2025-9-24 17:46:37

基于CentOS Stream 8的物联网数据采集与展示方案

系统架构全景图

图表 
 
一、系统平台优化(CentOS Stream 8)

1. 系统基础配置

bash# 1. 系统更新与加固
sudo dnf update -y
sudo dnf install epel-release -y
sudo dnf install fail2ban firewalld -y

# 2. 创建专用运维账户
sudo useradd -m -s /bin/bash iotadmin
sudo passwd iotadmin
sudo usermod -aG wheel iotadmin

# 3. SSH安全加固
sudo sed -i 's/^PermitRootLogin yes/PermitRootLogin no/' /etc/ssh/sshd_config
sudo sed -i 's/^PasswordAuthentication yes/PasswordAuthentication no/' /etc/ssh/sshd_config
sudo systemctl restart sshd

# 4. 防火墙配置
sudo systemctl enable --now firewalld
sudo firewall-cmd --permanent --add-port=1883/tcp   # MQTT
sudo firewall-cmd --permanent --add-port=8883/tcp   # MQTT/SSL
sudo firewall-cmd --permanent --add-port=9092/tcp   # Kafka
sudo firewall-cmd --permanent --add-port=3000/tcp   # Grafana
sudo firewall-cmd --reload2. 内核参数优化(/etc/sysctl.conf)

conf# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 15二、MQTT Broker集群部署(EMQX企业版)

1. 集群化部署

bash# 安装EMQX企业版
curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash
sudo dnf install emqx-enterprise -y

# 配置集群(3节点示例)
# 节点1(10.0.0.1):
echo "cluster.name = iot_platform" >> /etc/emqx/emqx.conf
echo "node.name = emqx@10.0.0.1" >> /etc/emqx/emqx.conf

# 节点2(10.0.0.2):
emqx_ctl cluster join emqx@10.0.0.12. 安全加固配置

bash# 1. 启用TLS加密
sudo mkdir /etc/emqx/certs
sudo certbot certonly --standalone -d mqtt.example.com
sudo cp /etc/letsencrypt/live/mqtt.example.com/* /etc/emqx/certs/

# 2. 配置EMQX(/etc/emqx/emqx.conf)
listeners.ssl.default {
bind = "0.0.0.0:8883"
max_connections = 100000
ssl_options {
    keyfile = "/etc/emqx/certs/privkey.pem"
    certfile = "/etc/emqx/certs/fullchain.pem"
}
}

# 3. 设备级认证
emqx_ctl users add device_001 $6$rounds=10000$somesalt$hashed_password3. 主题权限控制

conf# /etc/emqx/acl.conf
{allow, {user, "device_001"}, publish, ["sensors/001/#"]}
{allow, {user, "backend"}, subscribe, ["sensors/#"]}
{deny, all}三、数据处理与存储架构

1. 消息队列缓冲(Kafka)

bash# 安装Kafka
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz

# 配置集群(3节点)
# server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node1:9092
zookeeper.connect=node1:2181,node2:2181,node3:21812. 时序数据库(TimescaleDB)

bash# 安装PostgreSQL 15 + TimescaleDB
sudo dnf install https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm
sudo dnf module disable postgresql
sudo dnf install postgresql15-server postgresql15-contrib timescaledb-2-postgresql-15

# 初始化数据库
sudo /usr/pgsql-15/bin/postgresql-15-setup initdb
sudo systemctl enable --now postgresql-15

# 创建超级表
CREATE TABLE sensor_data (
    time TIMESTAMPTZ NOT NULL,
    device_id TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL
);
SELECT create_hypertable('sensor_data', 'time');3. 数据清洗服务(Python示例)

pythonfrom kafka import KafkaConsumer
import psycopg2

# Kafka消费者
consumer = KafkaConsumer(
    'raw_sensor_data',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    security_protocol='SSL',
    ssl_cafile='ca.pem'
)

# TimescaleDB连接
conn = psycopg2.connect("dbname=tsdb user=tsdbadmin")
cursor = conn.cursor()

for message in consumer:
    data = json.loads(message.value)
   
    # 数据验证
    if not validate_sensor_data(data):
      continue
      
    # 数据清洗
    cleaned = clean_data(data)
   
    # 写入数据库
    cursor.execute(
      "INSERT INTO sensor_data (time, device_id, value) VALUES (%s, %s, %s)",
      (cleaned['timestamp'], cleaned['device_id'], cleaned['value'])
    )
    conn.commit()
   
    # 更新缓存
    redis.set(f"latest:{cleaned['device_id']}", json.dumps(cleaned))四、安全加固体系

1. 传输层安全

协议端口加密方式证书管理MQTT8883TLS 1.3Let's Encrypt自动更新HTTPS443TLS 1.3企业级证书Database5432TLS双向认证自签名CA2. 数据加密策略

python# 设备端数据加密示例
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

def encrypt_data(data, key):
    iv = os.urandom(12)
    cipher = Cipher(
      algorithms.AES(key),
      modes.GCM(iv),
      backend=default_backend()
    )
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(data) + encryptor.finalize()
    return iv + encryptor.tag + ciphertext3. 访问控制矩阵

角色MQTT权限DB访问API权限设备发布特定主题无无数据服务订阅所有主题只写内部网络访问前端应用无只读JWT认证+RBAC管理员管理主题读写管理员权限五、运维监控体系

1. 监控组件部署

bash# Prometheus安装
sudo dnf install prometheus

# Node Exporter
sudo dnf install node_exporter

# Grafana
sudo dnf install grafana2. 关键监控指标

yaml# prometheus.yml 片段
scrape_configs:
- job_name: 'emqx'
    static_configs:
      - targets: ['emqx1:18083', 'emqx2:18083']
- job_name: 'postgres'
    static_configs:
      - targets: ['db1:9187']
- job_name: 'kafka'
    static_configs:
      - targets: ['kafka1:7071']3. 告警规则示例

yamlgroups:
- name: MQTT服务
rules:
- alert: EMQX节点离线
    expr: up{job="emqx"} == 0
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "MQTT节点 {{ $labels.instance }} 离线"
      
- alert: 消息积压
    expr: kafka_consumergroup_lag > 10000
    for: 10m
    labels:
      severity: warning六、设备接入与扩展方案

1. 设备接入流程

图表
 
2. 设备管理API设计

python# 设备注册API
@app.route('/api/v1/devices', methods=['POST'])
@jwt_required()
def register_device():
    data = request.get_json()
    device_id = generate_device_id()
   
    # 创建数据库记录
    db.execute("""
      INSERT INTO devices (id, name, type, owner)
      VALUES (%s, %s, %s, %s)
    """, (device_id, data['name'], data['type'], get_jwt_identity()))
   
    # 生成设备凭证
    credential = generate_device_credential(device_id)
   
    return jsonify({
      'device_id': device_id,
      'username': credential['username'],
      'password': credential['password'],
      'certificate': credential['cert_pem']
    }), 2013. 多协议支持方案

协议转换方式适用场景HTTPEMQX Webhook传统设备改造CoAPCoAP-MQTT代理网关低功耗设备Modbus边缘计算转换工业设备LoRaWAN网络服务器集成长距离物联网七、日常运维手册

1. 备份策略

bash# 数据库每日备份
pg_dump -U postgres -Fc tsdb > /backup/tsdb-$(date +%F).dump

# 配置文件备份
rsync -av /etc/emqx /backup/configs/emqx
rsync -av /etc/kafka /backup/configs/kafka

# 证书备份
tar -czf /backup/certs-$(date +%F).tar.gz /etc/letsencrypt/{live,archive}2. 灾难恢复流程


[*]恢复最新数据库备份
[*]重建EMQX集群
[*]恢复Kafka偏移量
[*]验证数据完整性
[*]逐步恢复设备连接
3. 性能调优命令

bash# 查看MQTT连接数
emqx_ctl clients list

# 检查Kafka积压
kafka-consumer-groups.sh --describe --group data_consumers

# 时序数据库维护
timescaledb-tune --quiet --yes八、扩展架构设计

1. 边缘计算集成

图表
 
2. 数据管道扩展

python# 添加AI处理管道
from kafka import KafkaProducer

ai_producer = KafkaProducer(bootstrap_servers='kafka:9092')

def process_for_ai(data):
    # 特征提取
    features = extract_features(data)
   
    # 发送到AI服务队列
    ai_producer.send('ai_processing', json.dumps(features).encode())

# 在清洗服务中调用
process_for_ai(cleaned_data)3. 多区域部署

bash# 跨区域EMQX集群
emqx_ctl cluster join emqx@us-east-1.platform.com

# 数据库级联复制
# 主库(欧洲)
CREATE PUBLICATION euro_publication FOR TABLE sensor_data;

# 从库(亚洲)
CREATE SUBSCRIPTION asia_subscription
CONNECTION 'host=euro-db port=5432 dbname=tsdb'
PUBLICATION euro_publication;九、前端展示系统

1. 实时数据大屏

javascript// 使用MQTT.js直接订阅
const client = mqtt.connect('wss://mqtt.example.com:8084/mqtt', {
username: 'web_user',
password: 'secure_token'
})

client.subscribe('sensors/+/status')
client.on('message', (topic, payload) => {
const data = JSON.parse(payload)
updateDashboard(data)
})2. 设备管理界面功能


[*]设备状态监控(在线/离线)
[*]实时数据曲线(Chart.js)
[*]历史数据查询(时间范围选择)
[*]告警管理(阈值设置)
[*]固件OTA升级
3. 移动APP集成

kotlin// Android数据获取示例
suspend fun fetchSensorData(deviceId: String): List<SensorData> {
    return withContext(Dispatchers.IO) {
      val response = apiService.getDeviceData(
            deviceId = deviceId,
            from = Instant.now().minus(1, ChronoUnit.DAYS).toString(),
            to = Instant.now().toString()
      )
      response.data ?: emptyList()
    }
}十、持续演进路线


[*]阶段1(基础平台)

[*]EMQX集群部署
[*]核心数据处理流水线
[*]基础监控

[*]阶段2(安全加固)

[*]设备证书管理
[*]数据端到端加密
[*]审计日志

[*]阶段3(智能扩展)

[*]边缘计算节点
[*]AI异常检测
[*]预测性维护

[*]阶段4(全球化部署)

[*]多区域集群
[*]数据主权合规
[*]跨云架构

本方案基于CentOS Stream 8构建了企业级物联网平台,通过多层次安全加固、全链路监控、弹性扩展架构,支持从数百到数百万设备的平滑扩展,日均处理能力可达亿级数据点,满足工业4.0场景需求。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 基于CentOS Stream 8的物联网数据采集与展示方案