Zookeeper从入门到实战:SpringBoot整合完整指南
一、Zookeeper概述
1.1 什么是Zookeeper
Zookeeper是一个开源的分布式协调服务,由Apache软件基金会维护。它最初是为Hadoop生态系统设计的,但现在已被广泛应用于各种分布式系统中。Zookeeper提供了一种简单而健壮的方式,用于管理分布式环境中的配置信息、命名服务、分布式同步和组服务等。
1.2 Zookeeper的核心特性
- 顺序一致性:客户端的更新请求将按照它们被发送的顺序进行应用
- 原子性:更新操作要么成功要么失败,没有中间状态
- 单一系统映像:无论客户端连接到哪个服务器,都将看到相同的服务视图
- 可靠性:一旦更新被应用,它将从那时起保持,直到客户端覆盖更新
- 及时性:系统的客户端视图保证在一定时间范围内是最新的
1.3 Zookeeper的典型应用场景
- 配置管理:集中式配置管理
- 分布式锁:实现跨JVM的互斥机制
- 集群管理:监控集群节点状态
- 命名服务:类似DNS的服务
- 分布式队列:简单的队列实现
- Leader选举:分布式系统中的主节点选举
二、Zookeeper安装与配置
2.1 单机模式安装
下载Zookeeper
- wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
- tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
- cd apache-zookeeper-3.7.0-bin
复制代码 配置Zookeeper
创建配置文件conf/zoo.cfg:- # 基本时间单位(毫秒)
- tickTime=2000
- # 数据目录
- dataDir=/tmp/zookeeper
- # 客户端连接端口
- clientPort=2181
- # 初始化连接时能容忍的最长心跳间隔(tickTime的倍数)
- initLimit=10
- # 发送请求和接收响应能容忍的最长心跳间隔
- syncLimit=5
复制代码 启动Zookeeper
验证安装
- bin/zkCli.sh -server 127.0.0.1:2181
复制代码 2.2 集群模式安装
对于生产环境,建议至少部署3个节点的Zookeeper集群。修改每个节点的zoo.cfg:- tickTime=2000
- dataDir=/var/lib/zookeeper
- clientPort=2181
- initLimit=10
- syncLimit=5
- # 集群配置 server.id=host:port1:port2
- server.1=node1:2888:3888
- server.2=node2:2888:3888
- server.3=node3:2888:3888
复制代码 在每个节点的dataDir目录下创建myid文件,内容为对应的server.id中的id:- # 在node1上
- echo 1 > /var/lib/zookeeper/myid
- # 在node2上
- echo 2 > /var/lib/zookeeper/myid
- # 在node3上
- echo 3 > /var/lib/zookeeper/myid
复制代码 三、Zookeeper基础操作
3.1 基本命令操作
通过zkCli.sh连接后可以执行以下命令:- # 查看根节点
- ls /
- # 创建持久节点
- create /myapp "myapp data"
- # 创建临时节点(会话结束自动删除)
- create -e /myapp/tempnode "temp data"
- # 创建顺序节点
- create -s /myapp/seqnode "seq data"
- # 获取节点数据
- get /myapp
- # 设置节点数据
- set /myapp "new data"
- # 删除节点
- delete /myapp/seqnode0000000001
- # 递归删除节点
- rmr /myapp
- # 查看节点状态
- stat /myapp
复制代码 3.2 Zookeeper节点类型
- 持久节点(PERSISTENT):创建后一直存在,除非显式删除
- 临时节点(EPHEMERAL):客户端会话结束时自动删除
- 持久顺序节点(PERSISTENT_SEQUENTIAL):持久节点,但节点名后会附加一个单调递增的数字后缀
- 临时顺序节点(EPHEMERAL_SEQUENTIAL):临时节点,带有序号后缀
3.3 Zookeeper Watch机制
Watch是Zookeeper的一个重要特性,它允许客户端在节点发生变化时收到通知。- # 设置watch
- get /myapp watch
- # 另一个会话修改/myapp节点数据
- set /myapp "changed data"
- # 原会话会收到NodeDataChanged事件
复制代码 四、Zookeeper Java客户端API
4.1 原生Java客户端
首先添加Maven依赖:- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- zookeeper</artifactId>
- <version>3.7.0</version>
- </dependency>
复制代码 基本操作示例
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import java.io.IOException;
- import java.util.concurrent.CountDownLatch;
- public class ZookeeperDemo {
- private static final String CONNECT_STRING = "localhost:2181";
- private static final int SESSION_TIMEOUT = 5000;
- private static ZooKeeper zk;
- private static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
- // 创建连接
- zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (Event.KeeperState.SyncConnected == event.getState()) {
- connectedSemaphore.countDown();
- }
- }
- });
- connectedSemaphore.await();
- System.out.println("Zookeeper连接成功");
-
- // 创建节点
- String path = "/test-node";
- String data = "test data";
- zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- System.out.println("创建节点: " + path);
-
- // 获取节点数据
- byte[] nodeData = zk.getData(path, false, null);
- System.out.println("节点数据: " + new String(nodeData));
-
- // 修改节点数据
- String newData = "new test data";
- zk.setData(path, newData.getBytes(), -1);
- System.out.println("修改节点数据");
-
- // 删除节点
- zk.delete(path, -1);
- System.out.println("删除节点: " + path);
-
- // 关闭连接
- zk.close();
- }
- }
复制代码 Watch示例
- public class ZookeeperWatchDemo {
- // ... 同上连接代码
-
- public static void watchDemo() throws KeeperException, InterruptedException {
- String path = "/watch-node";
-
- // 创建节点
- zk.create(path, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- // 设置watch
- Stat stat = new Stat();
- byte[] data = zk.getData(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- System.out.println("收到事件: " + event);
- if (event.getType() == Event.EventType.NodeDataChanged) {
- try {
- // 再次设置watch,实现持续监听
- byte[] newData = zk.getData(path, this, null);
- System.out.println("数据已修改,新值为: " + new String(newData));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }, stat);
-
- System.out.println("初始数据: " + new String(data));
-
- // 修改数据触发watch
- zk.setData(path, "changed".getBytes(), stat.getVersion());
-
- // 等待watch触发
- Thread.sleep(1000);
-
- // 删除节点
- zk.delete(path, -1);
- }
- }
复制代码 4.2 Curator客户端
Curator是Netflix开源的Zookeeper客户端,提供了更高级的API和常用模式的实现。
添加依赖
- <dependency>
- <groupId>org.apache.curator</groupId>
- curator-framework</artifactId>
- <version>5.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- curator-recipes</artifactId>
- <version>5.2.0</version>
- </dependency>
复制代码 基本操作示例
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.data.Stat;
- public class CuratorDemo {
- private static final String CONNECT_STRING = "localhost:2181";
- private static final int SESSION_TIMEOUT = 5000;
- private static final int CONNECTION_TIMEOUT = 3000;
- public static void main(String[] args) throws Exception {
- // 创建客户端
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString(CONNECT_STRING)
- .sessionTimeoutMs(SESSION_TIMEOUT)
- .connectionTimeoutMs(CONNECTION_TIMEOUT)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
-
- // 启动客户端
- client.start();
-
- // 创建节点
- String path = "/curator-node";
- client.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, "init".getBytes());
- System.out.println("创建节点: " + path);
-
- // 获取节点数据
- byte[] data = client.getData().forPath(path);
- System.out.println("节点数据: " + new String(data));
-
- // 修改节点数据
- Stat stat = client.setData().forPath(path, "changed".getBytes());
- System.out.println("修改节点数据,版本号: " + stat.getVersion());
-
- // 删除节点
- client.delete()
- .guaranteed()
- .deletingChildrenIfNeeded()
- .forPath(path);
- System.out.println("删除节点: " + path);
-
- // 关闭客户端
- client.close();
- }
- }
复制代码 五、SpringBoot整合Zookeeper
5.1 项目搭建
创建SpringBoot项目并添加依赖:- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- curator-framework</artifactId>
- <version>5.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- curator-recipes</artifactId>
- <version>5.2.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- lombok</artifactId>
- <optional>true</optional>
- </dependency>
- </dependencies>
复制代码 5.2 配置类
创建Zookeeper配置类:- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class ZookeeperConfig {
-
- @Value("${zookeeper.connect-string}")
- private String connectString;
-
- @Value("${zookeeper.session-timeout}")
- private int sessionTimeout;
-
- @Value("${zookeeper.connection-timeout}")
- private int connectionTimeout;
-
- @Bean(initMethod = "start", destroyMethod = "close")
- public CuratorFramework curatorFramework() {
- return CuratorFrameworkFactory.builder()
- .connectString(connectString)
- .sessionTimeoutMs(sessionTimeout)
- .connectionTimeoutMs(connectionTimeout)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .namespace("springboot-demo") // 命名空间隔离
- .build();
- }
- }
复制代码 在application.properties中添加配置:- # Zookeeper配置
- zookeeper.connect-string=localhost:2181
- zookeeper.session-timeout=5000
- zookeeper.connection-timeout=3000
复制代码 5.3 服务类
创建Zookeeper操作服务类:- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.cache.ChildData;
- import org.apache.curator.framework.recipes.cache.CuratorCache;
- import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.data.Stat;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- @Slf4j
- @Service
- public class ZookeeperService {
-
- @Autowired
- private CuratorFramework curatorFramework;
-
- /**
- * 创建节点
- */
- public String createNode(String path, String data, CreateMode createMode) throws Exception {
- byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
- String createdPath = curatorFramework.create()
- .creatingParentsIfNeeded()
- .withMode(createMode)
- .forPath(path, dataBytes);
- log.info("节点创建成功: {}", createdPath);
- return createdPath;
- }
-
- /**
- * 获取节点数据
- */
- public String getNodeData(String path) throws Exception {
- byte[] dataBytes = curatorFramework.getData().forPath(path);
- return new String(dataBytes, StandardCharsets.UTF_8);
- }
-
- /**
- * 更新节点数据
- */
- public Stat updateNodeData(String path, String data) throws Exception {
- byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
- return curatorFramework.setData().forPath(path, dataBytes);
- }
-
- /**
- * 删除节点
- */
- public void deleteNode(String path) throws Exception {
- curatorFramework.delete()
- .guaranteed()
- .deletingChildrenIfNeeded()
- .forPath(path);
- log.info("节点删除成功: {}", path);
- }
-
- /**
- * 检查节点是否存在
- */
- public boolean isNodeExist(String path) throws Exception {
- Stat stat = curatorFramework.checkExists().forPath(path);
- return stat != null;
- }
-
- /**
- * 获取子节点列表
- */
- public List<String> getChildren(String path) throws Exception {
- return curatorFramework.getChildren().forPath(path);
- }
-
- /**
- * 添加节点监听
- */
- public void addNodeListener(String path, NodeListenerCallback callback) {
- CuratorCache cache = CuratorCache.build(curatorFramework, path);
-
- CuratorCacheListener listener = CuratorCacheListener.builder()
- .forCreates(node -> callback.onNodeCreated(node.getPath(), new String(node.getData())))
- .forChanges((oldNode, node) ->
- callback.onNodeUpdated(node.getPath(), new String(node.getData())))
- .forDeletes(node -> callback.onNodeDeleted(node.getPath()))
- .forInitialized(() -> callback.onInitialized()))
- .build();
-
- cache.listenable().addListener(listener);
- cache.start();
- }
-
- public interface NodeListenerCallback {
- default void onNodeCreated(String path, String data) {}
- default void onNodeUpdated(String path, String data) {}
- default void onNodeDeleted(String path) {}
- default void onInitialized() {}
- }
- }
复制代码 5.4 控制器类
创建REST控制器:- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.*;
- @RestController
- @RequestMapping("/zk")
- public class ZookeeperController {
-
- @Autowired
- private ZookeeperService zookeeperService;
-
- @PostMapping("/node")
- public String createNode(@RequestParam String path,
- @RequestParam String data,
- @RequestParam(defaultValue = "PERSISTENT") String mode) throws Exception {
- return zookeeperService.createNode(path, data, CreateMode.valueOf(mode));
- }
-
- @GetMapping("/node")
- public String getNodeData(@RequestParam String path) throws Exception {
- return zookeeperService.getNodeData(path);
- }
-
- @PutMapping("/node")
- public String updateNodeData(@RequestParam String path,
- @RequestParam String data) throws Exception {
- zookeeperService.updateNodeData(path, data);
- return "更新成功";
- }
-
- @DeleteMapping("/node")
- public String deleteNode(@RequestParam String path) throws Exception {
- zookeeperService.deleteNode(path);
- return "删除成功";
- }
-
- @GetMapping("/node/exists")
- public boolean isNodeExist(@RequestParam String path) throws Exception {
- return zookeeperService.isNodeExist(path);
- }
-
- @GetMapping("/node/children")
- public List<String> getChildren(@RequestParam String path) throws Exception {
- return zookeeperService.getChildren(path);
- }
- }
复制代码 六、Zookeeper高级应用
6.1 分布式锁实现
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- @Service
- public class DistributedLockService {
-
- private static final String LOCK_PATH = "/locks";
-
- @Autowired
- private CuratorFramework curatorFramework;
-
- /**
- * 获取分布式锁
- */
- public boolean acquireLock(String lockName, long waitTime, TimeUnit timeUnit) {
- InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
- try {
- return lock.acquire(waitTime, timeUnit);
- } catch (Exception e) {
- log.error("获取分布式锁失败", e);
- return false;
- }
- }
-
- /**
- * 释放分布式锁
- */
- public void releaseLock(String lockName) {
- InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
- try {
- if (lock.isAcquiredInThisProcess()) {
- lock.release();
- }
- } catch (Exception e) {
- log.error("释放分布式锁失败", e);
- }
- }
-
- /**
- * 执行带锁的操作
- */
- public <T> T executeWithLock(String lockName, long waitTime, TimeUnit timeUnit, LockOperation<T> operation) throws Exception {
- InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH + "/" + lockName);
- try {
- if (lock.acquire(waitTime, timeUnit)) {
- return operation.execute();
- } else {
- throw new RuntimeException("获取锁超时");
- }
- } finally {
- if (lock.isAcquiredInThisProcess()) {
- lock.release();
- }
- }
- }
-
- public interface LockOperation<T> {
- T execute() throws Exception;
- }
- }
复制代码 6.2 配置中心实现
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.cache.CuratorCache;
- import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import java.nio.charset.StandardCharsets;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- @Slf4j
- @Service
- public class ConfigCenterService {
-
- private static final String CONFIG_PATH = "/config";
- private final Map<String, String> configCache = new ConcurrentHashMap<>();
-
- @Autowired
- private CuratorFramework curatorFramework;
-
- @PostConstruct
- public void init() throws Exception {
- // 初始化配置
- loadAllConfigs();
-
- // 监听配置变化
- watchConfigChanges();
- }
-
- /**
- * 加载所有配置
- */
- private void loadAllConfigs() throws Exception {
- if (curatorFramework.checkExists().forPath(CONFIG_PATH) == null) {
- curatorFramework.create().creatingParentsIfNeeded().forPath(CONFIG_PATH);
- }
-
- List<String> children = curatorFramework.getChildren().forPath(CONFIG_PATH);
- for (String child : children) {
- String path = CONFIG_PATH + "/" + child;
- byte[] data = curatorFramework.getData().forPath(path);
- configCache.put(child, new String(data, StandardCharsets.UTF_8));
- }
- }
-
- /**
- * 监听配置变化
- */
- private void watchConfigChanges() {
- CuratorCache cache = CuratorCache.build(curatorFramework, CONFIG_PATH);
-
- CuratorCacheListener listener = CuratorCacheListener.builder()
- .forCreates(node -> {
- String key = node.getPath().replace(CONFIG_PATH + "/", "");
- configCache.put(key, new String(node.getData()));
- log.info("配置新增: {}={}", key, configCache.get(key));
- })
- .forChanges((oldNode, node) -> {
- String key = node.getPath().replace(CONFIG_PATH + "/", "");
- configCache.put(key, new String(node.getData()));
- log.info("配置修改: {}={}", key, configCache.get(key));
- })
- .forDeletes(node -> {
- String key = node.getPath().replace(CONFIG_PATH + "/", "");
- configCache.remove(key);
- log.info("配置删除: {}", key);
- })
- .build();
-
- cache.listenable().addListener(listener);
- cache.start();
- }
-
- /**
- * 获取配置
- */
- public String getConfig(String key) {
- return configCache.get(key);
- }
-
- /**
- * 获取所有配置
- */
- public Map<String, String> getAllConfigs() {
- return new HashMap<>(configCache);
- }
-
- /**
- * 设置配置
- */
- public void setConfig(String key, String value) throws Exception {
- String path = CONFIG_PATH + "/" + key;
- byte[] data = value.getBytes(StandardCharsets.UTF_8);
-
- if (curatorFramework.checkExists().forPath(path) == null) {
- curatorFramework.create().forPath(path, data);
- } else {
- curatorFramework.setData().forPath(path, data);
- }
- }
-
- /**
- * 删除配置
- */
- public void deleteConfig(String key) throws Exception {
- String path = CONFIG_PATH + "/" + key;
- curatorFramework.delete().forPath(path);
- }
- }
复制代码 6.3 服务注册与发现
服务注册
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.zookeeper.CreateMode;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import java.net.InetAddress;
- @Slf4j
- @Service
- public class ServiceRegistry {
-
- private static final String REGISTRY_PATH = "/services";
-
- @Autowired
- private CuratorFramework curatorFramework;
-
- @Value("${server.port}")
- private int port;
-
- private String servicePath;
-
- @PostConstruct
- public void register() throws Exception {
- // 创建服务根节点(持久节点)
- if (curatorFramework.checkExists().forPath(REGISTRY_PATH) == null) {
- curatorFramework.create()
- .creatingParentsIfNeeded()
- .forPath(REGISTRY_PATH, "Service Registry".getBytes());
- }
-
- // 获取本机IP地址
- String ip = InetAddress.getLocalHost().getHostAddress();
- String serviceInstance = ip + ":" + port;
-
- // 创建临时顺序节点
- servicePath = curatorFramework.create()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
- .forPath(REGISTRY_PATH + "/instance-", serviceInstance.getBytes());
-
- log.info("服务注册成功: {}", servicePath);
- }
-
- public String getServicePath() {
- return servicePath;
- }
- }
复制代码 服务发现
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.cache.CuratorCache;
- import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import java.util.ArrayList;
- import java.util.List;
- @Slf4j
- @Service
- public class ServiceDiscovery {
-
- private static final String REGISTRY_PATH = "/services";
- private final List<String> serviceInstances = new ArrayList<>();
-
- @Autowired
- private CuratorFramework curatorFramework;
-
- @PostConstruct
- public void init() throws Exception {
- // 初始化服务列表
- discoverServices();
-
- // 监听服务变化
- watchServices();
- }
-
- /**
- * 发现可用服务
- */
- private void discoverServices() throws Exception {
- serviceInstances.clear();
-
- if (curatorFramework.checkExists().forPath(REGISTRY_PATH) != null) {
- List<String> instances = curatorFramework.getChildren().forPath(REGISTRY_PATH);
- for (String instance : instances) {
- String instancePath = REGISTRY_PATH + "/" + instance;
- byte[] data = curatorFramework.getData().forPath(instancePath);
- serviceInstances.add(new String(data));
- }
- }
-
- log.info("当前可用服务实例: {}", serviceInstances);
- }
-
- /**
- * 监听服务变化
- */
- private void watchServices() {
- CuratorCache cache = CuratorCache.build(curatorFramework, REGISTRY_PATH);
-
- CuratorCacheListener listener = CuratorCacheListener.builder()
- .forCreates(node -> {
- try {
- discoverServices();
- } catch (Exception e) {
- log.error("处理服务新增事件失败", e);
- }
- })
- .forChanges((oldNode, node) -> {
- try {
- discoverServices();
- } catch (Exception e) {
- log.error("处理服务变更事件失败", e);
- }
- })
- .forDeletes(node -> {
- try {
- discoverServices();
- } catch (Exception e) {
- log.error("处理服务删除事件失败", e);
- }
- })
- .build();
-
- cache.listenable().addListener(listener);
- cache.start();
- }
-
- /**
- * 获取所有服务实例
- */
- public List<String> getAllServiceInstances() {
- return new ArrayList<>(serviceInstances);
- }
-
- /**
- * 随机获取一个服务实例(简单的负载均衡)
- */
- public String getRandomServiceInstance() {
- if (serviceInstances.isEmpty()) {
- return null;
- }
- int index = (int) (Math.random() * serviceInstances.size());
- return serviceInstances.get(index);
- }
- }
复制代码 七、生产环境注意事项
7.1 Zookeeper性能优化
- 数据目录和事务日志目录分离:将dataDir和dataLogDir配置到不同的物理磁盘
- JVM调优:适当增加JVM堆内存,设置合适的GC参数
- 快照清理:配置autopurge.snapRetainCount和autopurge.purgeInterval自动清理旧快照
- 限制客户端连接数:合理设置maxClientCnxns参数
7.2 监控与运维
- 使用四字命令监控:
- echo stat | nc localhost 2181 查看服务器状态
- echo mntr | nc localhost 2181 查看监控信息
- echo cons | nc localhost 2181 查看客户端连接
- JMX监控:启用JMX监控Zookeeper运行状态
- 日志管理:合理配置日志级别和日志滚动策略
7.3 常见问题解决
- 连接问题:
- 检查防火墙设置
- 确认Zookeeper服务是否正常运行
- 检查客户端和服务端版本是否兼容
- 节点已存在错误:
- 使用带版本号的API操作节点
- 先检查节点是否存在再操作
- 会话过期:
八、总结
本文从Zookeeper的基本概念讲起,详细介绍了安装配置、基本操作、Java客户端使用,到SpringBoot整合,再到高级应用如分布式锁、配置中心、服务注册与发现等。通过完整的代码示例和详细注释,希望能帮助读者从零开始掌握Zookeeper的使用。
Zookeeper作为分布式系统的基石,其强大的协调能力可以帮助我们解决分布式环境中的各种难题。但在实际生产环境中,还需要根据具体场景合理设计和使用,并注意性能优化和监控运维。
完整的SpringBoot整合Zookeeper项目代码可以在GitHub上找到:项目地址
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |