项目标题与描述
Apache NiFi C2 Client 是Apache NiFi项目中的Command and Control (C2)协议客户端实现,主要用于与C2服务器进行通信,实现远程管理和配置更新功能。该客户端支持心跳机制、配置更新、资源同步等核心功能,使得NiFi实例能够被集中管理和控制。
核心功能包括:
- 定期发送心跳信息到C2服务器
- 接收并执行来自C2服务器的操作指令
- 支持配置和资源的动态更新
- 提供安全通信机制(双向TLS认证)
功能特性
核心功能
- 心跳机制:定期向C2服务器发送心跳信息,报告自身状态
- 操作处理:支持多种操作类型(START/STOP/UPDATE等)和操作对象(FLOW/CONFIGURATION等)
- 配置更新:动态更新流配置和资源文件
- 资源同步:同步资源文件到本地
- 安全通信:支持HTTPS和双向TLS认证
- 状态管理:跟踪和管理操作执行状态
独特价值
- 标准化协议:基于标准化的C2协议,易于与其他系统集成
- 灵活扩展:通过操作处理器机制支持自定义操作处理逻辑
- 可靠通信:实现完善的错误处理和重试机制
- 状态持久化:支持操作状态的持久化存储,应对重启等场景
安装指南
系统要求
- Java 8+
- Apache NiFi C2服务器
依赖项
项目依赖包括:
- OkHttp (HTTP客户端)
- Jackson (JSON处理)
- SLF4J (日志)
- Apache Commons Lang
安装步骤
- 将项目编译打包:
- 将生成的jar包添加到NiFi或MiNiFi的classpath中
- 配置C2客户端参数(见下文配置部分)
使用说明
基本配置
C2客户端通过C2ClientConfig类进行配置,主要配置项包括:- // 示例配置
- C2ClientConfig config = new C2ClientConfig(
- "https://c2-server/nifi-api/c2-protocol/heartbeat", // C2服务器URL
- "https://c2-server/nifi-api/c2-protocol/acknowledge", // ACK URL
- "default", // Agent类
- "agent-123", // Agent标识
- "/conf", // 配置目录
- "manifest-1.0", // 运行时清单标识
- "minifi-java", // 运行时类型
- 30000L, // 心跳间隔(ms)
- "keystore.jks", // 密钥库文件
- "keystorepass", // 密钥库密码
- "keypass", // 密钥密码
- "JKS", // 密钥库类型
- "truststore.jks", // 信任库文件
- "truststorepass", // 信任库密码
- "JKS" // 信任库类型
- );
复制代码 初始化客户端
- // 创建HTTP客户端
- OkHttpClient okHttpClient = new OkHttpClientProvider(config).okHttpClient();
- // 创建C2序列化器
- C2Serializer serializer = new C2JacksonSerializer();
- // 创建C2客户端
- C2Client c2Client = new C2HttpClient(config, okHttpClient, serializer);
复制代码 发送心跳示例
- // 创建心跳信息
- C2Heartbeat heartbeat = new C2Heartbeat();
- heartbeat.setIdentifier("heartbeat-001");
- heartbeat.setCreated(System.currentTimeMillis());
- // 设置设备信息
- DeviceInfo deviceInfo = new DeviceInfo();
- deviceInfo.setIdentifier("device-001");
- heartbeat.setDeviceInfo(deviceInfo);
- // 发送心跳
- Optional<C2HeartbeatResponse> response = c2Client.publishHeartbeat(heartbeat);
- response.ifPresent(r -> {
- // 处理响应中的操作请求
- List<C2Operation> operations = r.getRequestedOperations();
- operations.forEach(op -> {
- // 执行操作...
- });
- });
复制代码 处理配置更新
- // 当收到UPDATE CONFIGURATION操作时
- String callbackUrl = "https://c2-server/config/update";
- Optional<byte[]> configContent = c2Client.retrieveUpdateConfigurationContent(callbackUrl);
- configContent.ifPresent(content -> {
- // 将新配置保存到文件系统
- try {
- Files.write(Paths.get("/conf/new-flow.json"), content);
- // 触发配置更新逻辑...
-
- // 发送ACK确认
- C2OperationAck ack = new C2OperationAck();
- ack.setOperationId("op-123");
- C2OperationState state = new C2OperationState();
- state.setState(C2OperationState.OperationState.FULLY_APPLIED);
- ack.setOperationState(state);
- c2Client.acknowledgeOperation(ack);
- } catch (IOException e) {
- // 错误处理...
- }
- });
复制代码 核心代码
C2客户端接口
- public interface C2Client {
- /**
- * 发送心跳到C2服务器
- * @param heartbeat 心跳信息
- * @return 可选的响应信息
- */
- Optional<C2HeartbeatResponse> publishHeartbeat(C2Heartbeat heartbeat);
- /**
- * 确认操作完成
- * @param operationAck 操作确认信息
- */
- void acknowledgeOperation(C2OperationAck operationAck);
- /**
- * 获取更新配置内容
- * @param callbackUrl 回调URL
- * @return 可选的配置内容
- */
- Optional<byte[]> retrieveUpdateConfigurationContent(String callbackUrl);
- /**
- * 获取资源内容
- * @param callbackUrl 回调URL
- * @return 可选的资源内容
- */
- Optional<byte[]> retrieveUpdateAssetContent(String callbackUrl);
- }
复制代码 心跳管理器
- public class C2HeartbeatManager implements Runnable {
- private final C2Client client;
- private final C2HeartbeatFactory heartbeatFactory;
- private final ReentrantLock heartbeatLock;
-
- @Override
- public void run() {
- if (!heartbeatLock.tryLock()) {
- LOGGER.debug("心跳锁被其他线程持有,跳过本次心跳发送");
- return;
- }
- try {
- C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoSupplier.get());
- client.publishHeartbeat(heartbeat).ifPresent(this::processResponse);
- } finally {
- heartbeatLock.unlock();
- }
- }
-
- private void processResponse(C2HeartbeatResponse response) {
- // 处理响应中的操作请求
- ofNullable(response.getRequestedOperations())
- .filter(not(List::isEmpty))
- .ifPresent(ops -> ops.forEach(c2OperationManager::add));
- }
- }
复制代码 操作处理器
- public class UpdateConfigurationOperationHandler implements C2OperationHandler {
- private final UpdateConfigurationStrategy updateStrategy;
-
- @Override
- public C2OperationAck handle(C2Operation operation) {
- String callbackUrl = operation.getArgs().get("callbackUrl");
- Optional<byte[]> configContent = client.retrieveUpdateConfigurationContent(callbackUrl);
-
- if (configContent.isPresent()) {
- try {
- updateStrategy.update(configContent.get());
- return createSuccessAck(operation);
- } catch (Exception e) {
- return createFailedAck(operation, e.getMessage());
- }
- }
- return createFailedAck(operation, "无法获取配置内容");
- }
- }
复制代码 HTTP客户端实现
- public class C2HttpClient implements C2Client {
- private final C2ClientConfig clientConfig;
- private final OkHttpClient httpClient;
- private final C2Serializer serializer;
-
- @Override
- public Optional<C2HeartbeatResponse> publishHeartbeat(C2Heartbeat heartbeat) {
- Request request = new Request.Builder()
- .url(clientConfig.getC2Url())
- .post(createRequestBody(heartbeat))
- .build();
-
- try (Response response = httpClient.newCall(request).execute()) {
- return serializer.deserialize(response.body().string(), C2HeartbeatResponse.class);
- } catch (IOException e) {
- LOGGER.error("发送心跳失败", e);
- return Optional.empty();
- }
- }
-
- private RequestBody createRequestBody(C2Heartbeat heartbeat) {
- String json = serializer.serialize(heartbeat).orElseThrow();
- return RequestBody.create(json, MediaType.get("application/json"));
- }
- }
复制代码 通过以上核心代码,可以看出Apache NiFi C2 Client提供了完整的C2协议实现,支持与C2服务器的各种交互场景,是NiFi生态系统中的重要组件。
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
公众号二维码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |