找回密码
 立即注册
首页 业界区 业界 【Zookeeper从入门到实战】SpringBoot整合完整指南 ...

【Zookeeper从入门到实战】SpringBoot整合完整指南

恙髡 2025-6-9 15:55:11
Zookeeper从入门到实战:SpringBoot整合完整指南

一、Zookeeper概述

1.1 什么是Zookeeper

Zookeeper是一个开源的分布式协调服务,由Apache软件基金会维护。它最初是为Hadoop生态系统设计的,但现在已被广泛应用于各种分布式系统中。Zookeeper提供了一种简单而健壮的方式,用于管理分布式环境中的配置信息、命名服务、分布式同步和组服务等。
1.2 Zookeeper的核心特性


  • 顺序一致性:客户端的更新请求将按照它们被发送的顺序进行应用
  • 原子性:更新操作要么成功要么失败,没有中间状态
  • 单一系统映像:无论客户端连接到哪个服务器,都将看到相同的服务视图
  • 可靠性:一旦更新被应用,它将从那时起保持,直到客户端覆盖更新
  • 及时性:系统的客户端视图保证在一定时间范围内是最新的
1.3 Zookeeper的典型应用场景


  • 配置管理:集中式配置管理
  • 分布式锁:实现跨JVM的互斥机制
  • 集群管理:监控集群节点状态
  • 命名服务:类似DNS的服务
  • 分布式队列:简单的队列实现
  • Leader选举:分布式系统中的主节点选举
二、Zookeeper安装与配置

2.1 单机模式安装

下载Zookeeper
  1. wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
  2. tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
  3. cd apache-zookeeper-3.7.0-bin
复制代码
配置Zookeeper

创建配置文件conf/zoo.cfg:
  1. # 基本时间单位(毫秒)
  2. tickTime=2000
  3. # 数据目录
  4. dataDir=/tmp/zookeeper
  5. # 客户端连接端口
  6. clientPort=2181
  7. # 初始化连接时能容忍的最长心跳间隔(tickTime的倍数)
  8. initLimit=10
  9. # 发送请求和接收响应能容忍的最长心跳间隔
  10. syncLimit=5
复制代码
启动Zookeeper
  1. bin/zkServer.sh start
复制代码
验证安装
  1. bin/zkCli.sh -server 127.0.0.1:2181
复制代码
2.2 集群模式安装

对于生产环境,建议至少部署3个节点的Zookeeper集群。修改每个节点的zoo.cfg:
  1. tickTime=2000
  2. dataDir=/var/lib/zookeeper
  3. clientPort=2181
  4. initLimit=10
  5. syncLimit=5
  6. # 集群配置 server.id=host:port1:port2
  7. server.1=node1:2888:3888
  8. server.2=node2:2888:3888
  9. server.3=node3:2888:3888
复制代码
在每个节点的dataDir目录下创建myid文件,内容为对应的server.id中的id:
  1. # 在node1上
  2. echo 1 > /var/lib/zookeeper/myid
  3. # 在node2上
  4. echo 2 > /var/lib/zookeeper/myid
  5. # 在node3上
  6. echo 3 > /var/lib/zookeeper/myid
复制代码
三、Zookeeper基础操作

3.1 基本命令操作

通过zkCli.sh连接后可以执行以下命令:
  1. # 查看根节点
  2. ls /
  3. # 创建持久节点
  4. create /myapp "myapp data"
  5. # 创建临时节点(会话结束自动删除)
  6. create -e /myapp/tempnode "temp data"
  7. # 创建顺序节点
  8. create -s /myapp/seqnode "seq data"
  9. # 获取节点数据
  10. get /myapp
  11. # 设置节点数据
  12. set /myapp "new data"
  13. # 删除节点
  14. delete /myapp/seqnode0000000001
  15. # 递归删除节点
  16. rmr /myapp
  17. # 查看节点状态
  18. stat /myapp
复制代码
3.2 Zookeeper节点类型


  • 持久节点(PERSISTENT):创建后一直存在,除非显式删除
  • 临时节点(EPHEMERAL):客户端会话结束时自动删除
  • 持久顺序节点(PERSISTENT_SEQUENTIAL):持久节点,但节点名后会附加一个单调递增的数字后缀
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):临时节点,带有序号后缀
3.3 Zookeeper Watch机制

Watch是Zookeeper的一个重要特性,它允许客户端在节点发生变化时收到通知。
  1. # 设置watch
  2. get /myapp watch
  3. # 另一个会话修改/myapp节点数据
  4. set /myapp "changed data"
  5. # 原会话会收到NodeDataChanged事件
复制代码
四、Zookeeper Java客户端API

4.1 原生Java客户端

首先添加Maven依赖:
  1. <dependency>
  2.     <groupId>org.apache.zookeeper</groupId>
  3.     zookeeper</artifactId>
  4.     <version>3.7.0</version>
  5. </dependency>
复制代码
基本操作示例
  1. import org.apache.zookeeper.*;
  2. import org.apache.zookeeper.data.Stat;
  3. import java.io.IOException;
  4. import java.util.concurrent.CountDownLatch;
  5. public class ZookeeperDemo {
  6.     private static final String CONNECT_STRING = "localhost:2181";
  7.     private static final int SESSION_TIMEOUT = 5000;
  8.     private static ZooKeeper zk;
  9.     private static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
  10.     public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
  11.         // 创建连接
  12.         zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
  13.             @Override
  14.             public void process(WatchedEvent event) {
  15.                 if (Event.KeeperState.SyncConnected == event.getState()) {
  16.                     connectedSemaphore.countDown();
  17.                 }
  18.             }
  19.         });
  20.         connectedSemaphore.await();
  21.         System.out.println("Zookeeper连接成功");
  22.         
  23.         // 创建节点
  24.         String path = "/test-node";
  25.         String data = "test data";
  26.         zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  27.         System.out.println("创建节点: " + path);
  28.         
  29.         // 获取节点数据
  30.         byte[] nodeData = zk.getData(path, false, null);
  31.         System.out.println("节点数据: " + new String(nodeData));
  32.         
  33.         // 修改节点数据
  34.         String newData = "new test data";
  35.         zk.setData(path, newData.getBytes(), -1);
  36.         System.out.println("修改节点数据");
  37.         
  38.         // 删除节点
  39.         zk.delete(path, -1);
  40.         System.out.println("删除节点: " + path);
  41.         
  42.         // 关闭连接
  43.         zk.close();
  44.     }
  45. }
复制代码
Watch示例
  1. public class ZookeeperWatchDemo {
  2.     // ... 同上连接代码
  3.    
  4.     public static void watchDemo() throws KeeperException, InterruptedException {
  5.         String path = "/watch-node";
  6.         
  7.         // 创建节点
  8.         zk.create(path, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  9.         
  10.         // 设置watch
  11.         Stat stat = new Stat();
  12.         byte[] data = zk.getData(path, new Watcher() {
  13.             @Override
  14.             public void process(WatchedEvent event) {
  15.                 System.out.println("收到事件: " + event);
  16.                 if (event.getType() == Event.EventType.NodeDataChanged) {
  17.                     try {
  18.                         // 再次设置watch,实现持续监听
  19.                         byte[] newData = zk.getData(path, this, null);
  20.                         System.out.println("数据已修改,新值为: " + new String(newData));
  21.                     } catch (Exception e) {
  22.                         e.printStackTrace();
  23.                     }
  24.                 }
  25.             }
  26.         }, stat);
  27.         
  28.         System.out.println("初始数据: " + new String(data));
  29.         
  30.         // 修改数据触发watch
  31.         zk.setData(path, "changed".getBytes(), stat.getVersion());
  32.         
  33.         // 等待watch触发
  34.         Thread.sleep(1000);
  35.         
  36.         // 删除节点
  37.         zk.delete(path, -1);
  38.     }
  39. }
复制代码
4.2 Curator客户端

Curator是Netflix开源的Zookeeper客户端,提供了更高级的API和常用模式的实现。
添加依赖
  1. <dependency>
  2.     <groupId>org.apache.curator</groupId>
  3.     curator-framework</artifactId>
  4.     <version>5.2.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.curator</groupId>
  8.     curator-recipes</artifactId>
  9.     <version>5.2.0</version>
  10. </dependency>
复制代码
基本操作示例
  1. import org.apache.curator.framework.CuratorFramework;
  2. import org.apache.curator.framework.CuratorFrameworkFactory;
  3. import org.apache.curator.retry.ExponentialBackoffRetry;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.data.Stat;
  6. public class CuratorDemo {
  7.     private static final String CONNECT_STRING = "localhost:2181";
  8.     private static final int SESSION_TIMEOUT = 5000;
  9.     private static final int CONNECTION_TIMEOUT = 3000;
  10.     public static void main(String[] args) throws Exception {
  11.         // 创建客户端
  12.         CuratorFramework client = CuratorFrameworkFactory.builder()
  13.                 .connectString(CONNECT_STRING)
  14.                 .sessionTimeoutMs(SESSION_TIMEOUT)
  15.                 .connectionTimeoutMs(CONNECTION_TIMEOUT)
  16.                 .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  17.                 .build();
  18.         
  19.         // 启动客户端
  20.         client.start();
  21.         
  22.         // 创建节点
  23.         String path = "/curator-node";
  24.         client.create()
  25.                 .creatingParentsIfNeeded()
  26.                 .withMode(CreateMode.PERSISTENT)
  27.                 .forPath(path, "init".getBytes());
  28.         System.out.println("创建节点: " + path);
  29.         
  30.         // 获取节点数据
  31.         byte[] data = client.getData().forPath(path);
  32.         System.out.println("节点数据: " + new String(data));
  33.         
  34.         // 修改节点数据
  35.         Stat stat = client.setData().forPath(path, "changed".getBytes());
  36.         System.out.println("修改节点数据,版本号: " + stat.getVersion());
  37.         
  38.         // 删除节点
  39.         client.delete()
  40.                 .guaranteed()
  41.                 .deletingChildrenIfNeeded()
  42.                 .forPath(path);
  43.         System.out.println("删除节点: " + path);
  44.         
  45.         // 关闭客户端
  46.         client.close();
  47.     }
  48. }
复制代码
五、SpringBoot整合Zookeeper

5.1 项目搭建

创建SpringBoot项目并添加依赖:
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         spring-boot-starter-web</artifactId>
  5.     </dependency>
  6.    
  7.     <dependency>
  8.         <groupId>org.apache.curator</groupId>
  9.         curator-framework</artifactId>
  10.         <version>5.2.0</version>
  11.     </dependency>
  12.     <dependency>
  13.         <groupId>org.apache.curator</groupId>
  14.         curator-recipes</artifactId>
  15.         <version>5.2.0</version>
  16.     </dependency>
  17.    
  18.     <dependency>
  19.         <groupId>org.projectlombok</groupId>
  20.         lombok</artifactId>
  21.         <optional>true</optional>
  22.     </dependency>
  23. </dependencies>
复制代码
5.2 配置类

创建Zookeeper配置类:
  1. import org.apache.curator.framework.CuratorFramework;
  2. import org.apache.curator.framework.CuratorFrameworkFactory;
  3. import org.apache.curator.retry.ExponentialBackoffRetry;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class ZookeeperConfig {
  9.    
  10.     @Value("${zookeeper.connect-string}")
  11.     private String connectString;
  12.    
  13.     @Value("${zookeeper.session-timeout}")
  14.     private int sessionTimeout;
  15.    
  16.     @Value("${zookeeper.connection-timeout}")
  17.     private int connectionTimeout;
  18.    
  19.     @Bean(initMethod = "start", destroyMethod = "close")
  20.     public CuratorFramework curatorFramework() {
  21.         return CuratorFrameworkFactory.builder()
  22.                 .connectString(connectString)
  23.                 .sessionTimeoutMs(sessionTimeout)
  24.                 .connectionTimeoutMs(connectionTimeout)
  25.                 .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  26.                 .namespace("springboot-demo")  // 命名空间隔离
  27.                 .build();
  28.     }
  29. }
复制代码
在application.properties中添加配置:
  1. # Zookeeper配置
  2. zookeeper.connect-string=localhost:2181
  3. zookeeper.session-timeout=5000
  4. zookeeper.connection-timeout=3000
复制代码
5.3 服务类

创建Zookeeper操作服务类:
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.recipes.cache.ChildData;
  4. import org.apache.curator.framework.recipes.cache.CuratorCache;
  5. import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
  6. import org.apache.zookeeper.CreateMode;
  7. import org.apache.zookeeper.data.Stat;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Service;
  10. import java.nio.charset.StandardCharsets;
  11. import java.util.List;
  12. @Slf4j
  13. @Service
  14. public class ZookeeperService {
  15.    
  16.     @Autowired
  17.     private CuratorFramework curatorFramework;
  18.    
  19.     /**
  20.      * 创建节点
  21.      */
  22.     public String createNode(String path, String data, CreateMode createMode) throws Exception {
  23.         byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
  24.         String createdPath = curatorFramework.create()
  25.                 .creatingParentsIfNeeded()
  26.                 .withMode(createMode)
  27.                 .forPath(path, dataBytes);
  28.         log.info("节点创建成功: {}", createdPath);
  29.         return createdPath;
  30.     }
  31.    
  32.     /**
  33.      * 获取节点数据
  34.      */
  35.     public String getNodeData(String path) throws Exception {
  36.         byte[] dataBytes = curatorFramework.getData().forPath(path);
  37.         return new String(dataBytes, StandardCharsets.UTF_8);
  38.     }
  39.    
  40.     /**
  41.      * 更新节点数据
  42.      */
  43.     public Stat updateNodeData(String path, String data) throws Exception {
  44.         byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
  45.         return curatorFramework.setData().forPath(path, dataBytes);
  46.     }
  47.    
  48.     /**
  49.      * 删除节点
  50.      */
  51.     public void deleteNode(String path) throws Exception {
  52.         curatorFramework.delete()
  53.                 .guaranteed()
  54.                 .deletingChildrenIfNeeded()
  55.                 .forPath(path);
  56.         log.info("节点删除成功: {}", path);
  57.     }
  58.    
  59.     /**
  60.      * 检查节点是否存在
  61.      */
  62.     public boolean isNodeExist(String path) throws Exception {
  63.         Stat stat = curatorFramework.checkExists().forPath(path);
  64.         return stat != null;
  65.     }
  66.    
  67.     /**
  68.      * 获取子节点列表
  69.      */
  70.     public List<String> getChildren(String path) throws Exception {
  71.         return curatorFramework.getChildren().forPath(path);
  72.     }
  73.    
  74.     /**
  75.      * 添加节点监听
  76.      */
  77.     public void addNodeListener(String path, NodeListenerCallback callback) {
  78.         CuratorCache cache = CuratorCache.build(curatorFramework, path);
  79.         
  80.         CuratorCacheListener listener = CuratorCacheListener.builder()
  81.                 .forCreates(node -> callback.onNodeCreated(node.getPath(), new String(node.getData())))
  82.                 .forChanges((oldNode, node) ->
  83.                         callback.onNodeUpdated(node.getPath(), new String(node.getData())))
  84.                 .forDeletes(node -> callback.onNodeDeleted(node.getPath()))
  85.                 .forInitialized(() -> callback.onInitialized()))
  86.                 .build();
  87.         
  88.         cache.listenable().addListener(listener);
  89.         cache.start();
  90.     }
  91.    
  92.     public interface NodeListenerCallback {
  93.         default void onNodeCreated(String path, String data) {}
  94.         default void onNodeUpdated(String path, String data) {}
  95.         default void onNodeDeleted(String path) {}
  96.         default void onInitialized() {}
  97.     }
  98. }
复制代码
5.4 控制器类

创建REST控制器:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.web.bind.annotation.*;
  3. @RestController
  4. @RequestMapping("/zk")
  5. public class ZookeeperController {
  6.    
  7.     @Autowired
  8.     private ZookeeperService zookeeperService;
  9.    
  10.     @PostMapping("/node")
  11.     public String createNode(@RequestParam String path,
  12.                            @RequestParam String data,
  13.                            @RequestParam(defaultValue = "PERSISTENT") String mode) throws Exception {
  14.         return zookeeperService.createNode(path, data, CreateMode.valueOf(mode));
  15.     }
  16.    
  17.     @GetMapping("/node")
  18.     public String getNodeData(@RequestParam String path) throws Exception {
  19.         return zookeeperService.getNodeData(path);
  20.     }
  21.    
  22.     @PutMapping("/node")
  23.     public String updateNodeData(@RequestParam String path,
  24.                                @RequestParam String data) throws Exception {
  25.         zookeeperService.updateNodeData(path, data);
  26.         return "更新成功";
  27.     }
  28.    
  29.     @DeleteMapping("/node")
  30.     public String deleteNode(@RequestParam String path) throws Exception {
  31.         zookeeperService.deleteNode(path);
  32.         return "删除成功";
  33.     }
  34.    
  35.     @GetMapping("/node/exists")
  36.     public boolean isNodeExist(@RequestParam String path) throws Exception {
  37.         return zookeeperService.isNodeExist(path);
  38.     }
  39.    
  40.     @GetMapping("/node/children")
  41.     public List<String> getChildren(@RequestParam String path) throws Exception {
  42.         return zookeeperService.getChildren(path);
  43.     }
  44. }
复制代码
六、Zookeeper高级应用

6.1 分布式锁实现
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import java.util.concurrent.TimeUnit;
  7. @Slf4j
  8. @Service
  9. public class DistributedLockService {
  10.    
  11.     private static final String LOCK_PATH = "/locks";
  12.    
  13.     @Autowired
  14.     private CuratorFramework curatorFramework;
  15.    
  16.     /**
  17.      * 获取分布式锁
  18.      */
  19.     public boolean acquireLock(String lockName, long waitTime, TimeUnit timeUnit) {
  20.         InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
  21.         try {
  22.             return lock.acquire(waitTime, timeUnit);
  23.         } catch (Exception e) {
  24.             log.error("获取分布式锁失败", e);
  25.             return false;
  26.         }
  27.     }
  28.    
  29.     /**
  30.      * 释放分布式锁
  31.      */
  32.     public void releaseLock(String lockName) {
  33.         InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
  34.         try {
  35.             if (lock.isAcquiredInThisProcess()) {
  36.                 lock.release();
  37.             }
  38.         } catch (Exception e) {
  39.             log.error("释放分布式锁失败", e);
  40.         }
  41.     }
  42.    
  43.     /**
  44.      * 执行带锁的操作
  45.      */
  46.     public <T> T executeWithLock(String lockName, long waitTime, TimeUnit timeUnit, LockOperation<T> operation) throws Exception {
  47.         InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
  48.         try {
  49.             if (lock.acquire(waitTime, timeUnit)) {
  50.                 return operation.execute();
  51.             } else {
  52.                 throw new RuntimeException("获取锁超时");
  53.             }
  54.         } finally {
  55.             if (lock.isAcquiredInThisProcess()) {
  56.                 lock.release();
  57.             }
  58.         }
  59.     }
  60.    
  61.     public interface LockOperation<T> {
  62.         T execute() throws Exception;
  63.     }
  64. }
复制代码
6.2 配置中心实现
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.recipes.cache.CuratorCache;
  4. import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.PostConstruct;
  8. import java.nio.charset.StandardCharsets;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. import java.util.concurrent.ConcurrentHashMap;
  12. @Slf4j
  13. @Service
  14. public class ConfigCenterService {
  15.    
  16.     private static final String CONFIG_PATH = "/config";
  17.     private final Map<String, String> configCache = new ConcurrentHashMap<>();
  18.    
  19.     @Autowired
  20.     private CuratorFramework curatorFramework;
  21.    
  22.     @PostConstruct
  23.     public void init() throws Exception {
  24.         // 初始化配置
  25.         loadAllConfigs();
  26.         
  27.         // 监听配置变化
  28.         watchConfigChanges();
  29.     }
  30.    
  31.     /**
  32.      * 加载所有配置
  33.      */
  34.     private void loadAllConfigs() throws Exception {
  35.         if (curatorFramework.checkExists().forPath(CONFIG_PATH) == null) {
  36.             curatorFramework.create().creatingParentsIfNeeded().forPath(CONFIG_PATH);
  37.         }
  38.         
  39.         List<String> children = curatorFramework.getChildren().forPath(CONFIG_PATH);
  40.         for (String child : children) {
  41.             String path = CONFIG_PATH + "/" + child;
  42.             byte[] data = curatorFramework.getData().forPath(path);
  43.             configCache.put(child, new String(data, StandardCharsets.UTF_8));
  44.         }
  45.     }
  46.    
  47.     /**
  48.      * 监听配置变化
  49.      */
  50.     private void watchConfigChanges() {
  51.         CuratorCache cache = CuratorCache.build(curatorFramework, CONFIG_PATH);
  52.         
  53.         CuratorCacheListener listener = CuratorCacheListener.builder()
  54.                 .forCreates(node -> {
  55.                     String key = node.getPath().replace(CONFIG_PATH + "/", "");
  56.                     configCache.put(key, new String(node.getData()));
  57.                     log.info("配置新增: {}={}", key, configCache.get(key));
  58.                 })
  59.                 .forChanges((oldNode, node) -> {
  60.                     String key = node.getPath().replace(CONFIG_PATH + "/", "");
  61.                     configCache.put(key, new String(node.getData()));
  62.                     log.info("配置修改: {}={}", key, configCache.get(key));
  63.                 })
  64.                 .forDeletes(node -> {
  65.                     String key = node.getPath().replace(CONFIG_PATH + "/", "");
  66.                     configCache.remove(key);
  67.                     log.info("配置删除: {}", key);
  68.                 })
  69.                 .build();
  70.         
  71.         cache.listenable().addListener(listener);
  72.         cache.start();
  73.     }
  74.    
  75.     /**
  76.      * 获取配置
  77.      */
  78.     public String getConfig(String key) {
  79.         return configCache.get(key);
  80.     }
  81.    
  82.     /**
  83.      * 获取所有配置
  84.      */
  85.     public Map<String, String> getAllConfigs() {
  86.         return new HashMap<>(configCache);
  87.     }
  88.    
  89.     /**
  90.      * 设置配置
  91.      */
  92.     public void setConfig(String key, String value) throws Exception {
  93.         String path = CONFIG_PATH + "/" + key;
  94.         byte[] data = value.getBytes(StandardCharsets.UTF_8);
  95.         
  96.         if (curatorFramework.checkExists().forPath(path) == null) {
  97.             curatorFramework.create().forPath(path, data);
  98.         } else {
  99.             curatorFramework.setData().forPath(path, data);
  100.         }
  101.     }
  102.    
  103.     /**
  104.      * 删除配置
  105.      */
  106.     public void deleteConfig(String key) throws Exception {
  107.         String path = CONFIG_PATH + "/" + key;
  108.         curatorFramework.delete().forPath(path);
  109.     }
  110. }
复制代码
6.3 服务注册与发现

服务注册
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.zookeeper.CreateMode;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.PostConstruct;
  8. import java.net.InetAddress;
  9. @Slf4j
  10. @Service
  11. public class ServiceRegistry {
  12.    
  13.     private static final String REGISTRY_PATH = "/services";
  14.    
  15.     @Autowired
  16.     private CuratorFramework curatorFramework;
  17.    
  18.     @Value("${server.port}")
  19.     private int port;
  20.    
  21.     private String servicePath;
  22.    
  23.     @PostConstruct
  24.     public void register() throws Exception {
  25.         // 创建服务根节点(持久节点)
  26.         if (curatorFramework.checkExists().forPath(REGISTRY_PATH) == null) {
  27.             curatorFramework.create()
  28.                     .creatingParentsIfNeeded()
  29.                     .forPath(REGISTRY_PATH, "Service Registry".getBytes());
  30.         }
  31.         
  32.         // 获取本机IP地址
  33.         String ip = InetAddress.getLocalHost().getHostAddress();
  34.         String serviceInstance = ip + ":" + port;
  35.         
  36.         // 创建临时顺序节点
  37.         servicePath = curatorFramework.create()
  38.                 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
  39.                 .forPath(REGISTRY_PATH + "/instance-", serviceInstance.getBytes());
  40.         
  41.         log.info("服务注册成功: {}", servicePath);
  42.     }
  43.    
  44.     public String getServicePath() {
  45.         return servicePath;
  46.     }
  47. }
复制代码
服务发现
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.recipes.cache.CuratorCache;
  4. import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.PostConstruct;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. @Slf4j
  11. @Service
  12. public class ServiceDiscovery {
  13.    
  14.     private static final String REGISTRY_PATH = "/services";
  15.     private final List<String> serviceInstances = new ArrayList<>();
  16.    
  17.     @Autowired
  18.     private CuratorFramework curatorFramework;
  19.    
  20.     @PostConstruct
  21.     public void init() throws Exception {
  22.         // 初始化服务列表
  23.         discoverServices();
  24.         
  25.         // 监听服务变化
  26.         watchServices();
  27.     }
  28.    
  29.     /**
  30.      * 发现可用服务
  31.      */
  32.     private void discoverServices() throws Exception {
  33.         serviceInstances.clear();
  34.         
  35.         if (curatorFramework.checkExists().forPath(REGISTRY_PATH) != null) {
  36.             List<String> instances = curatorFramework.getChildren().forPath(REGISTRY_PATH);
  37.             for (String instance : instances) {
  38.                 String instancePath = REGISTRY_PATH + "/" + instance;
  39.                 byte[] data = curatorFramework.getData().forPath(instancePath);
  40.                 serviceInstances.add(new String(data));
  41.             }
  42.         }
  43.         
  44.         log.info("当前可用服务实例: {}", serviceInstances);
  45.     }
  46.    
  47.     /**
  48.      * 监听服务变化
  49.      */
  50.     private void watchServices() {
  51.         CuratorCache cache = CuratorCache.build(curatorFramework, REGISTRY_PATH);
  52.         
  53.         CuratorCacheListener listener = CuratorCacheListener.builder()
  54.                 .forCreates(node -> {
  55.                     try {
  56.                         discoverServices();
  57.                     } catch (Exception e) {
  58.                         log.error("处理服务新增事件失败", e);
  59.                     }
  60.                 })
  61.                 .forChanges((oldNode, node) -> {
  62.                     try {
  63.                         discoverServices();
  64.                     } catch (Exception e) {
  65.                         log.error("处理服务变更事件失败", e);
  66.                     }
  67.                 })
  68.                 .forDeletes(node -> {
  69.                     try {
  70.                         discoverServices();
  71.                     } catch (Exception e) {
  72.                         log.error("处理服务删除事件失败", e);
  73.                     }
  74.                 })
  75.                 .build();
  76.         
  77.         cache.listenable().addListener(listener);
  78.         cache.start();
  79.     }
  80.    
  81.     /**
  82.      * 获取所有服务实例
  83.      */
  84.     public List<String> getAllServiceInstances() {
  85.         return new ArrayList<>(serviceInstances);
  86.     }
  87.    
  88.     /**
  89.      * 随机获取一个服务实例(简单的负载均衡)
  90.      */
  91.     public String getRandomServiceInstance() {
  92.         if (serviceInstances.isEmpty()) {
  93.             return null;
  94.         }
  95.         int index = (int) (Math.random() * serviceInstances.size());
  96.         return serviceInstances.get(index);
  97.     }
  98. }
复制代码
七、生产环境注意事项

7.1 Zookeeper性能优化


  • 数据目录和事务日志目录分离:将dataDir和dataLogDir配置到不同的物理磁盘
  • JVM调优:适当增加JVM堆内存,设置合适的GC参数
  • 快照清理:配置autopurge.snapRetainCount和autopurge.purgeInterval自动清理旧快照
  • 限制客户端连接数:合理设置maxClientCnxns参数
7.2 监控与运维


  • 使用四字命令监控

    • echo stat | nc localhost 2181 查看服务器状态
    • echo mntr | nc localhost 2181 查看监控信息
    • echo cons | nc localhost 2181 查看客户端连接

  • JMX监控:启用JMX监控Zookeeper运行状态
  • 日志管理:合理配置日志级别和日志滚动策略
7.3 常见问题解决


  • 连接问题

    • 检查防火墙设置
    • 确认Zookeeper服务是否正常运行
    • 检查客户端和服务端版本是否兼容

  • 节点已存在错误

    • 使用带版本号的API操作节点
    • 先检查节点是否存在再操作

  • 会话过期

    • 增加会话超时时间
    • 实现会话过期后的重连逻辑

八、总结

本文从Zookeeper的基本概念讲起,详细介绍了安装配置、基本操作、Java客户端使用,到SpringBoot整合,再到高级应用如分布式锁、配置中心、服务注册与发现等。通过完整的代码示例和详细注释,希望能帮助读者从零开始掌握Zookeeper的使用。
Zookeeper作为分布式系统的基石,其强大的协调能力可以帮助我们解决分布式环境中的各种难题。但在实际生产环境中,还需要根据具体场景合理设计和使用,并注意性能优化和监控运维。
完整的SpringBoot整合Zookeeper项目代码可以在GitHub上找到:项目地址

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册