恙髡 发表于 2025-6-9 15:55:11

【Zookeeper从入门到实战】SpringBoot整合完整指南

Zookeeper从入门到实战:SpringBoot整合完整指南

一、Zookeeper概述

1.1 什么是Zookeeper

Zookeeper是一个开源的分布式协调服务,由Apache软件基金会维护。它最初是为Hadoop生态系统设计的,但现在已被广泛应用于各种分布式系统中。Zookeeper提供了一种简单而健壮的方式,用于管理分布式环境中的配置信息、命名服务、分布式同步和组服务等。
1.2 Zookeeper的核心特性


[*]顺序一致性:客户端的更新请求将按照它们被发送的顺序进行应用
[*]原子性:更新操作要么成功要么失败,没有中间状态
[*]单一系统映像:无论客户端连接到哪个服务器,都将看到相同的服务视图
[*]可靠性:一旦更新被应用,它将从那时起保持,直到客户端覆盖更新
[*]及时性:系统的客户端视图保证在一定时间范围内是最新的
1.3 Zookeeper的典型应用场景


[*]配置管理:集中式配置管理
[*]分布式锁:实现跨JVM的互斥机制
[*]集群管理:监控集群节点状态
[*]命名服务:类似DNS的服务
[*]分布式队列:简单的队列实现
[*]Leader选举:分布式系统中的主节点选举
二、Zookeeper安装与配置

2.1 单机模式安装

下载Zookeeper

wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
cd apache-zookeeper-3.7.0-bin配置Zookeeper

创建配置文件conf/zoo.cfg:
# 基本时间单位(毫秒)
tickTime=2000
# 数据目录
dataDir=/tmp/zookeeper
# 客户端连接端口
clientPort=2181
# 初始化连接时能容忍的最长心跳间隔(tickTime的倍数)
initLimit=10
# 发送请求和接收响应能容忍的最长心跳间隔
syncLimit=5启动Zookeeper

bin/zkServer.sh start验证安装

bin/zkCli.sh -server 127.0.0.1:21812.2 集群模式安装

对于生产环境,建议至少部署3个节点的Zookeeper集群。修改每个节点的zoo.cfg:
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=10
syncLimit=5
# 集群配置 server.id=host:port1:port2
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888在每个节点的dataDir目录下创建myid文件,内容为对应的server.id中的id:
# 在node1上
echo 1 > /var/lib/zookeeper/myid

# 在node2上
echo 2 > /var/lib/zookeeper/myid

# 在node3上
echo 3 > /var/lib/zookeeper/myid三、Zookeeper基础操作

3.1 基本命令操作

通过zkCli.sh连接后可以执行以下命令:
# 查看根节点
ls /

# 创建持久节点
create /myapp "myapp data"

# 创建临时节点(会话结束自动删除)
create -e /myapp/tempnode "temp data"

# 创建顺序节点
create -s /myapp/seqnode "seq data"

# 获取节点数据
get /myapp

# 设置节点数据
set /myapp "new data"

# 删除节点
delete /myapp/seqnode0000000001

# 递归删除节点
rmr /myapp

# 查看节点状态
stat /myapp3.2 Zookeeper节点类型


[*]持久节点(PERSISTENT):创建后一直存在,除非显式删除
[*]临时节点(EPHEMERAL):客户端会话结束时自动删除
[*]持久顺序节点(PERSISTENT_SEQUENTIAL):持久节点,但节点名后会附加一个单调递增的数字后缀
[*]临时顺序节点(EPHEMERAL_SEQUENTIAL):临时节点,带有序号后缀
3.3 Zookeeper Watch机制

Watch是Zookeeper的一个重要特性,它允许客户端在节点发生变化时收到通知。
# 设置watch
get /myapp watch

# 另一个会话修改/myapp节点数据
set /myapp "changed data"

# 原会话会收到NodeDataChanged事件四、Zookeeper Java客户端API

4.1 原生Java客户端

首先添加Maven依赖:
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    zookeeper</artifactId>
    <version>3.7.0</version>
</dependency>基本操作示例

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZookeeperDemo {
    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static ZooKeeper zk;
    private static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
      // 创建连接
      zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (Event.KeeperState.SyncConnected == event.getState()) {
                  connectedSemaphore.countDown();
                }
            }
      });
      connectedSemaphore.await();
      System.out.println("Zookeeper连接成功");
      
      // 创建节点
      String path = "/test-node";
      String data = "test data";
      zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      System.out.println("创建节点: " + path);
      
      // 获取节点数据
      byte[] nodeData = zk.getData(path, false, null);
      System.out.println("节点数据: " + new String(nodeData));
      
      // 修改节点数据
      String newData = "new test data";
      zk.setData(path, newData.getBytes(), -1);
      System.out.println("修改节点数据");
      
      // 删除节点
      zk.delete(path, -1);
      System.out.println("删除节点: " + path);
      
      // 关闭连接
      zk.close();
    }
}Watch示例

public class ZookeeperWatchDemo {
    // ... 同上连接代码
   
    public static void watchDemo() throws KeeperException, InterruptedException {
      String path = "/watch-node";
      
      // 创建节点
      zk.create(path, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      
      // 设置watch
      Stat stat = new Stat();
      byte[] data = zk.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("收到事件: " + event);
                if (event.getType() == Event.EventType.NodeDataChanged) {
                  try {
                        // 再次设置watch,实现持续监听
                        byte[] newData = zk.getData(path, this, null);
                        System.out.println("数据已修改,新值为: " + new String(newData));
                  } catch (Exception e) {
                        e.printStackTrace();
                  }
                }
            }
      }, stat);
      
      System.out.println("初始数据: " + new String(data));
      
      // 修改数据触发watch
      zk.setData(path, "changed".getBytes(), stat.getVersion());
      
      // 等待watch触发
      Thread.sleep(1000);
      
      // 删除节点
      zk.delete(path, -1);
    }
}4.2 Curator客户端

Curator是Netflix开源的Zookeeper客户端,提供了更高级的API和常用模式的实现。
添加依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>基本操作示例

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class CuratorDemo {
    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final int CONNECTION_TIMEOUT = 3000;

    public static void main(String[] args) throws Exception {
      // 创建客户端
      CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
      
      // 启动客户端
      client.start();
      
      // 创建节点
      String path = "/curator-node";
      client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, "init".getBytes());
      System.out.println("创建节点: " + path);
      
      // 获取节点数据
      byte[] data = client.getData().forPath(path);
      System.out.println("节点数据: " + new String(data));
      
      // 修改节点数据
      Stat stat = client.setData().forPath(path, "changed".getBytes());
      System.out.println("修改节点数据,版本号: " + stat.getVersion());
      
      // 删除节点
      client.delete()
                .guaranteed()
                .deletingChildrenIfNeeded()
                .forPath(path);
      System.out.println("删除节点: " + path);
      
      // 关闭客户端
      client.close();
    }
}五、SpringBoot整合Zookeeper

5.1 项目搭建

创建SpringBoot项目并添加依赖:
<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      spring-boot-starter-web</artifactId>
    </dependency>
   
    <dependency>
      <groupId>org.apache.curator</groupId>
      curator-framework</artifactId>
      <version>5.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      curator-recipes</artifactId>
      <version>5.2.0</version>
    </dependency>
   
    <dependency>
      <groupId>org.projectlombok</groupId>
      lombok</artifactId>
      <optional>true</optional>
    </dependency>
</dependencies>5.2 配置类

创建Zookeeper配置类:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZookeeperConfig {
   
    @Value("${zookeeper.connect-string}")
    private String connectString;
   
    @Value("${zookeeper.session-timeout}")
    private int sessionTimeout;
   
    @Value("${zookeeper.connection-timeout}")
    private int connectionTimeout;
   
    @Bean(initMethod = "start", destroyMethod = "close")
    public CuratorFramework curatorFramework() {
      return CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("springboot-demo")// 命名空间隔离
                .build();
    }
}在application.properties中添加配置:
# Zookeeper配置
zookeeper.connect-string=localhost:2181
zookeeper.session-timeout=5000
zookeeper.connection-timeout=30005.3 服务类

创建Zookeeper操作服务类:
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Slf4j
@Service
public class ZookeeperService {
   
    @Autowired
    private CuratorFramework curatorFramework;
   
    /**
   * 创建节点
   */
    public String createNode(String path, String data, CreateMode createMode) throws Exception {
      byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
      String createdPath = curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(createMode)
                .forPath(path, dataBytes);
      log.info("节点创建成功: {}", createdPath);
      return createdPath;
    }
   
    /**
   * 获取节点数据
   */
    public String getNodeData(String path) throws Exception {
      byte[] dataBytes = curatorFramework.getData().forPath(path);
      return new String(dataBytes, StandardCharsets.UTF_8);
    }
   
    /**
   * 更新节点数据
   */
    public Stat updateNodeData(String path, String data) throws Exception {
      byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
      return curatorFramework.setData().forPath(path, dataBytes);
    }
   
    /**
   * 删除节点
   */
    public void deleteNode(String path) throws Exception {
      curatorFramework.delete()
                .guaranteed()
                .deletingChildrenIfNeeded()
                .forPath(path);
      log.info("节点删除成功: {}", path);
    }
   
    /**
   * 检查节点是否存在
   */
    public boolean isNodeExist(String path) throws Exception {
      Stat stat = curatorFramework.checkExists().forPath(path);
      return stat != null;
    }
   
    /**
   * 获取子节点列表
   */
    public List<String> getChildren(String path) throws Exception {
      return curatorFramework.getChildren().forPath(path);
    }
   
    /**
   * 添加节点监听
   */
    public void addNodeListener(String path, NodeListenerCallback callback) {
      CuratorCache cache = CuratorCache.build(curatorFramework, path);
      
      CuratorCacheListener listener = CuratorCacheListener.builder()
                .forCreates(node -> callback.onNodeCreated(node.getPath(), new String(node.getData())))
                .forChanges((oldNode, node) ->
                        callback.onNodeUpdated(node.getPath(), new String(node.getData())))
                .forDeletes(node -> callback.onNodeDeleted(node.getPath()))
                .forInitialized(() -> callback.onInitialized()))
                .build();
      
      cache.listenable().addListener(listener);
      cache.start();
    }
   
    public interface NodeListenerCallback {
      default void onNodeCreated(String path, String data) {}
      default void onNodeUpdated(String path, String data) {}
      default void onNodeDeleted(String path) {}
      default void onInitialized() {}
    }
}5.4 控制器类

创建REST控制器:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/zk")
public class ZookeeperController {
   
    @Autowired
    private ZookeeperService zookeeperService;
   
    @PostMapping("/node")
    public String createNode(@RequestParam String path,
                           @RequestParam String data,
                           @RequestParam(defaultValue = "PERSISTENT") String mode) throws Exception {
      return zookeeperService.createNode(path, data, CreateMode.valueOf(mode));
    }
   
    @GetMapping("/node")
    public String getNodeData(@RequestParam String path) throws Exception {
      return zookeeperService.getNodeData(path);
    }
   
    @PutMapping("/node")
    public String updateNodeData(@RequestParam String path,
                               @RequestParam String data) throws Exception {
      zookeeperService.updateNodeData(path, data);
      return "更新成功";
    }
   
    @DeleteMapping("/node")
    public String deleteNode(@RequestParam String path) throws Exception {
      zookeeperService.deleteNode(path);
      return "删除成功";
    }
   
    @GetMapping("/node/exists")
    public boolean isNodeExist(@RequestParam String path) throws Exception {
      return zookeeperService.isNodeExist(path);
    }
   
    @GetMapping("/node/children")
    public List<String> getChildren(@RequestParam String path) throws Exception {
      return zookeeperService.getChildren(path);
    }
}六、Zookeeper高级应用

6.1 分布式锁实现

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class DistributedLockService {
   
    private static final String LOCK_PATH = "/locks";
   
    @Autowired
    private CuratorFramework curatorFramework;
   
    /**
   * 获取分布式锁
   */
    public boolean acquireLock(String lockName, long waitTime, TimeUnit timeUnit) {
      InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
      try {
            return lock.acquire(waitTime, timeUnit);
      } catch (Exception e) {
            log.error("获取分布式锁失败", e);
            return false;
      }
    }
   
    /**
   * 释放分布式锁
   */
    public void releaseLock(String lockName) {
      InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
      try {
            if (lock.isAcquiredInThisProcess()) {
                lock.release();
            }
      } catch (Exception e) {
            log.error("释放分布式锁失败", e);
      }
    }
   
    /**
   * 执行带锁的操作
   */
    public <T> T executeWithLock(String lockName, long waitTime, TimeUnit timeUnit, LockOperation<T> operation) throws Exception {
      InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
      try {
            if (lock.acquire(waitTime, timeUnit)) {
                return operation.execute();
            } else {
                throw new RuntimeException("获取锁超时");
            }
      } finally {
            if (lock.isAcquiredInThisProcess()) {
                lock.release();
            }
      }
    }
   
    public interface LockOperation<T> {
      T execute() throws Exception;
    }
}6.2 配置中心实现

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service
public class ConfigCenterService {
   
    private static final String CONFIG_PATH = "/config";
    private final Map<String, String> configCache = new ConcurrentHashMap<>();
   
    @Autowired
    private CuratorFramework curatorFramework;
   
    @PostConstruct
    public void init() throws Exception {
      // 初始化配置
      loadAllConfigs();
      
      // 监听配置变化
      watchConfigChanges();
    }
   
    /**
   * 加载所有配置
   */
    private void loadAllConfigs() throws Exception {
      if (curatorFramework.checkExists().forPath(CONFIG_PATH) == null) {
            curatorFramework.create().creatingParentsIfNeeded().forPath(CONFIG_PATH);
      }
      
      List<String> children = curatorFramework.getChildren().forPath(CONFIG_PATH);
      for (String child : children) {
            String path = CONFIG_PATH + "/" + child;
            byte[] data = curatorFramework.getData().forPath(path);
            configCache.put(child, new String(data, StandardCharsets.UTF_8));
      }
    }
   
    /**
   * 监听配置变化
   */
    private void watchConfigChanges() {
      CuratorCache cache = CuratorCache.build(curatorFramework, CONFIG_PATH);
      
      CuratorCacheListener listener = CuratorCacheListener.builder()
                .forCreates(node -> {
                  String key = node.getPath().replace(CONFIG_PATH + "/", "");
                  configCache.put(key, new String(node.getData()));
                  log.info("配置新增: {}={}", key, configCache.get(key));
                })
                .forChanges((oldNode, node) -> {
                  String key = node.getPath().replace(CONFIG_PATH + "/", "");
                  configCache.put(key, new String(node.getData()));
                  log.info("配置修改: {}={}", key, configCache.get(key));
                })
                .forDeletes(node -> {
                  String key = node.getPath().replace(CONFIG_PATH + "/", "");
                  configCache.remove(key);
                  log.info("配置删除: {}", key);
                })
                .build();
      
      cache.listenable().addListener(listener);
      cache.start();
    }
   
    /**
   * 获取配置
   */
    public String getConfig(String key) {
      return configCache.get(key);
    }
   
    /**
   * 获取所有配置
   */
    public Map<String, String> getAllConfigs() {
      return new HashMap<>(configCache);
    }
   
    /**
   * 设置配置
   */
    public void setConfig(String key, String value) throws Exception {
      String path = CONFIG_PATH + "/" + key;
      byte[] data = value.getBytes(StandardCharsets.UTF_8);
      
      if (curatorFramework.checkExists().forPath(path) == null) {
            curatorFramework.create().forPath(path, data);
      } else {
            curatorFramework.setData().forPath(path, data);
      }
    }
   
    /**
   * 删除配置
   */
    public void deleteConfig(String key) throws Exception {
      String path = CONFIG_PATH + "/" + key;
      curatorFramework.delete().forPath(path);
    }
}6.3 服务注册与发现

服务注册

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.net.InetAddress;

@Slf4j
@Service
public class ServiceRegistry {
   
    private static final String REGISTRY_PATH = "/services";
   
    @Autowired
    private CuratorFramework curatorFramework;
   
    @Value("${server.port}")
    private int port;
   
    private String servicePath;
   
    @PostConstruct
    public void register() throws Exception {
      // 创建服务根节点(持久节点)
      if (curatorFramework.checkExists().forPath(REGISTRY_PATH) == null) {
            curatorFramework.create()
                  .creatingParentsIfNeeded()
                  .forPath(REGISTRY_PATH, "Service Registry".getBytes());
      }
      
      // 获取本机IP地址
      String ip = InetAddress.getLocalHost().getHostAddress();
      String serviceInstance = ip + ":" + port;
      
      // 创建临时顺序节点
      servicePath = curatorFramework.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(REGISTRY_PATH + "/instance-", serviceInstance.getBytes());
      
      log.info("服务注册成功: {}", servicePath);
    }
   
    public String getServicePath() {
      return servicePath;
    }
}服务发现

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

@Slf4j
@Service
public class ServiceDiscovery {
   
    private static final String REGISTRY_PATH = "/services";
    private final List<String> serviceInstances = new ArrayList<>();
   
    @Autowired
    private CuratorFramework curatorFramework;
   
    @PostConstruct
    public void init() throws Exception {
      // 初始化服务列表
      discoverServices();
      
      // 监听服务变化
      watchServices();
    }
   
    /**
   * 发现可用服务
   */
    private void discoverServices() throws Exception {
      serviceInstances.clear();
      
      if (curatorFramework.checkExists().forPath(REGISTRY_PATH) != null) {
            List<String> instances = curatorFramework.getChildren().forPath(REGISTRY_PATH);
            for (String instance : instances) {
                String instancePath = REGISTRY_PATH + "/" + instance;
                byte[] data = curatorFramework.getData().forPath(instancePath);
                serviceInstances.add(new String(data));
            }
      }
      
      log.info("当前可用服务实例: {}", serviceInstances);
    }
   
    /**
   * 监听服务变化
   */
    private void watchServices() {
      CuratorCache cache = CuratorCache.build(curatorFramework, REGISTRY_PATH);
      
      CuratorCacheListener listener = CuratorCacheListener.builder()
                .forCreates(node -> {
                  try {
                        discoverServices();
                  } catch (Exception e) {
                        log.error("处理服务新增事件失败", e);
                  }
                })
                .forChanges((oldNode, node) -> {
                  try {
                        discoverServices();
                  } catch (Exception e) {
                        log.error("处理服务变更事件失败", e);
                  }
                })
                .forDeletes(node -> {
                  try {
                        discoverServices();
                  } catch (Exception e) {
                        log.error("处理服务删除事件失败", e);
                  }
                })
                .build();
      
      cache.listenable().addListener(listener);
      cache.start();
    }
   
    /**
   * 获取所有服务实例
   */
    public List<String> getAllServiceInstances() {
      return new ArrayList<>(serviceInstances);
    }
   
    /**
   * 随机获取一个服务实例(简单的负载均衡)
   */
    public String getRandomServiceInstance() {
      if (serviceInstances.isEmpty()) {
            return null;
      }
      int index = (int) (Math.random() * serviceInstances.size());
      return serviceInstances.get(index);
    }
}七、生产环境注意事项

7.1 Zookeeper性能优化


[*]数据目录和事务日志目录分离:将dataDir和dataLogDir配置到不同的物理磁盘
[*]JVM调优:适当增加JVM堆内存,设置合适的GC参数
[*]快照清理:配置autopurge.snapRetainCount和autopurge.purgeInterval自动清理旧快照
[*]限制客户端连接数:合理设置maxClientCnxns参数
7.2 监控与运维


[*]使用四字命令监控:

[*]echo stat | nc localhost 2181 查看服务器状态
[*]echo mntr | nc localhost 2181 查看监控信息
[*]echo cons | nc localhost 2181 查看客户端连接

[*]JMX监控:启用JMX监控Zookeeper运行状态
[*]日志管理:合理配置日志级别和日志滚动策略
7.3 常见问题解决


[*]连接问题:

[*]检查防火墙设置
[*]确认Zookeeper服务是否正常运行
[*]检查客户端和服务端版本是否兼容

[*]节点已存在错误:

[*]使用带版本号的API操作节点
[*]先检查节点是否存在再操作

[*]会话过期:

[*]增加会话超时时间
[*]实现会话过期后的重连逻辑

八、总结

本文从Zookeeper的基本概念讲起,详细介绍了安装配置、基本操作、Java客户端使用,到SpringBoot整合,再到高级应用如分布式锁、配置中心、服务注册与发现等。通过完整的代码示例和详细注释,希望能帮助读者从零开始掌握Zookeeper的使用。
Zookeeper作为分布式系统的基石,其强大的协调能力可以帮助我们解决分布式环境中的各种难题。但在实际生产环境中,还需要根据具体场景合理设计和使用,并注意性能优化和监控运维。
完整的SpringBoot整合Zookeeper项目代码可以在GitHub上找到:项目地址

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 【Zookeeper从入门到实战】SpringBoot整合完整指南