找回密码
 立即注册
首页 业界区 业界 Apache NiFi C2 Client 实现详解

Apache NiFi C2 Client 实现详解

博咱 2025-6-29 21:20:44
项目标题与描述

Apache NiFi C2 Client 是Apache NiFi项目中的Command and Control (C2)协议客户端实现,主要用于与C2服务器进行通信,实现远程管理和配置更新功能。该客户端支持心跳机制、配置更新、资源同步等核心功能,使得NiFi实例能够被集中管理和控制。
核心功能包括:

  • 定期发送心跳信息到C2服务器
  • 接收并执行来自C2服务器的操作指令
  • 支持配置和资源的动态更新
  • 提供安全通信机制(双向TLS认证)
功能特性

核心功能


  • 心跳机制:定期向C2服务器发送心跳信息,报告自身状态
  • 操作处理:支持多种操作类型(START/STOP/UPDATE等)和操作对象(FLOW/CONFIGURATION等)
  • 配置更新:动态更新流配置和资源文件
  • 资源同步:同步资源文件到本地
  • 安全通信:支持HTTPS和双向TLS认证
  • 状态管理:跟踪和管理操作执行状态
独特价值


  • 标准化协议:基于标准化的C2协议,易于与其他系统集成
  • 灵活扩展:通过操作处理器机制支持自定义操作处理逻辑
  • 可靠通信:实现完善的错误处理和重试机制
  • 状态持久化:支持操作状态的持久化存储,应对重启等场景
安装指南

系统要求


  • Java 8+
  • Apache NiFi C2服务器
依赖项

项目依赖包括:

  • OkHttp (HTTP客户端)
  • Jackson (JSON处理)
  • SLF4J (日志)
  • Apache Commons Lang
安装步骤


  • 将项目编译打包:
    1. mvn clean package
    复制代码
  • 将生成的jar包添加到NiFi或MiNiFi的classpath中
  • 配置C2客户端参数(见下文配置部分)
使用说明

基本配置

C2客户端通过C2ClientConfig类进行配置,主要配置项包括:
  1. // 示例配置
  2. C2ClientConfig config = new C2ClientConfig(
  3.     "https://c2-server/nifi-api/c2-protocol/heartbeat",  // C2服务器URL
  4.     "https://c2-server/nifi-api/c2-protocol/acknowledge", // ACK URL
  5.     "default",  // Agent类
  6.     "agent-123", // Agent标识
  7.     "/conf",    // 配置目录
  8.     "manifest-1.0", // 运行时清单标识
  9.     "minifi-java",  // 运行时类型
  10.     30000L,     // 心跳间隔(ms)
  11.     "keystore.jks", // 密钥库文件
  12.     "keystorepass", // 密钥库密码
  13.     "keypass",      // 密钥密码
  14.     "JKS",          // 密钥库类型
  15.     "truststore.jks", // 信任库文件
  16.     "truststorepass", // 信任库密码
  17.     "JKS"           // 信任库类型
  18. );
复制代码
初始化客户端
  1. // 创建HTTP客户端
  2. OkHttpClient okHttpClient = new OkHttpClientProvider(config).okHttpClient();
  3. // 创建C2序列化器
  4. C2Serializer serializer = new C2JacksonSerializer();
  5. // 创建C2客户端
  6. C2Client c2Client = new C2HttpClient(config, okHttpClient, serializer);
复制代码
发送心跳示例
  1. // 创建心跳信息
  2. C2Heartbeat heartbeat = new C2Heartbeat();
  3. heartbeat.setIdentifier("heartbeat-001");
  4. heartbeat.setCreated(System.currentTimeMillis());
  5. // 设置设备信息
  6. DeviceInfo deviceInfo = new DeviceInfo();
  7. deviceInfo.setIdentifier("device-001");
  8. heartbeat.setDeviceInfo(deviceInfo);
  9. // 发送心跳
  10. Optional<C2HeartbeatResponse> response = c2Client.publishHeartbeat(heartbeat);
  11. response.ifPresent(r -> {
  12.     // 处理响应中的操作请求
  13.     List<C2Operation> operations = r.getRequestedOperations();
  14.     operations.forEach(op -> {
  15.         // 执行操作...
  16.     });
  17. });
复制代码
处理配置更新
  1. // 当收到UPDATE CONFIGURATION操作时
  2. String callbackUrl = "https://c2-server/config/update";
  3. Optional<byte[]> configContent = c2Client.retrieveUpdateConfigurationContent(callbackUrl);
  4. configContent.ifPresent(content -> {
  5.     // 将新配置保存到文件系统
  6.     try {
  7.         Files.write(Paths.get("/conf/new-flow.json"), content);
  8.         // 触发配置更新逻辑...
  9.         
  10.         // 发送ACK确认
  11.         C2OperationAck ack = new C2OperationAck();
  12.         ack.setOperationId("op-123");
  13.         C2OperationState state = new C2OperationState();
  14.         state.setState(C2OperationState.OperationState.FULLY_APPLIED);
  15.         ack.setOperationState(state);
  16.         c2Client.acknowledgeOperation(ack);
  17.     } catch (IOException e) {
  18.         // 错误处理...
  19.     }
  20. });
复制代码
核心代码

C2客户端接口
  1. public interface C2Client {
  2.     /**
  3.      * 发送心跳到C2服务器
  4.      * @param heartbeat 心跳信息
  5.      * @return 可选的响应信息
  6.      */
  7.     Optional<C2HeartbeatResponse> publishHeartbeat(C2Heartbeat heartbeat);
  8.     /**
  9.      * 确认操作完成
  10.      * @param operationAck 操作确认信息
  11.      */
  12.     void acknowledgeOperation(C2OperationAck operationAck);
  13.     /**
  14.      * 获取更新配置内容
  15.      * @param callbackUrl 回调URL
  16.      * @return 可选的配置内容
  17.      */
  18.     Optional<byte[]> retrieveUpdateConfigurationContent(String callbackUrl);
  19.     /**
  20.      * 获取资源内容
  21.      * @param callbackUrl 回调URL
  22.      * @return 可选的资源内容
  23.      */
  24.     Optional<byte[]> retrieveUpdateAssetContent(String callbackUrl);
  25. }
复制代码
心跳管理器
  1. public class C2HeartbeatManager implements Runnable {
  2.     private final C2Client client;
  3.     private final C2HeartbeatFactory heartbeatFactory;
  4.     private final ReentrantLock heartbeatLock;
  5.    
  6.     @Override
  7.     public void run() {
  8.         if (!heartbeatLock.tryLock()) {
  9.             LOGGER.debug("心跳锁被其他线程持有,跳过本次心跳发送");
  10.             return;
  11.         }
  12.         try {
  13.             C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoSupplier.get());
  14.             client.publishHeartbeat(heartbeat).ifPresent(this::processResponse);
  15.         } finally {
  16.             heartbeatLock.unlock();
  17.         }
  18.     }
  19.    
  20.     private void processResponse(C2HeartbeatResponse response) {
  21.         // 处理响应中的操作请求
  22.         ofNullable(response.getRequestedOperations())
  23.             .filter(not(List::isEmpty))
  24.             .ifPresent(ops -> ops.forEach(c2OperationManager::add));
  25.     }
  26. }
复制代码
操作处理器
  1. public class UpdateConfigurationOperationHandler implements C2OperationHandler {
  2.     private final UpdateConfigurationStrategy updateStrategy;
  3.    
  4.     @Override
  5.     public C2OperationAck handle(C2Operation operation) {
  6.         String callbackUrl = operation.getArgs().get("callbackUrl");
  7.         Optional<byte[]> configContent = client.retrieveUpdateConfigurationContent(callbackUrl);
  8.         
  9.         if (configContent.isPresent()) {
  10.             try {
  11.                 updateStrategy.update(configContent.get());
  12.                 return createSuccessAck(operation);
  13.             } catch (Exception e) {
  14.                 return createFailedAck(operation, e.getMessage());
  15.             }
  16.         }
  17.         return createFailedAck(operation, "无法获取配置内容");
  18.     }
  19. }
复制代码
HTTP客户端实现
  1. public class C2HttpClient implements C2Client {
  2.     private final C2ClientConfig clientConfig;
  3.     private final OkHttpClient httpClient;
  4.     private final C2Serializer serializer;
  5.    
  6.     @Override
  7.     public Optional<C2HeartbeatResponse> publishHeartbeat(C2Heartbeat heartbeat) {
  8.         Request request = new Request.Builder()
  9.             .url(clientConfig.getC2Url())
  10.             .post(createRequestBody(heartbeat))
  11.             .build();
  12.             
  13.         try (Response response = httpClient.newCall(request).execute()) {
  14.             return serializer.deserialize(response.body().string(), C2HeartbeatResponse.class);
  15.         } catch (IOException e) {
  16.             LOGGER.error("发送心跳失败", e);
  17.             return Optional.empty();
  18.         }
  19.     }
  20.    
  21.     private RequestBody createRequestBody(C2Heartbeat heartbeat) {
  22.         String json = serializer.serialize(heartbeat).orElseThrow();
  23.         return RequestBody.create(json, MediaType.get("application/json"));
  24.     }
  25. }
复制代码
通过以上核心代码,可以看出Apache NiFi C2 Client提供了完整的C2协议实现,支持与C2服务器的各种交互场景,是NiFi生态系统中的重要组件。
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
公众号二维码
1.png


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册