红弘丽 发表于 2026-1-23 10:55:00

玩转 ZooKeeper之分布式锁

上一篇已经给出了选举leader执行任务的案例,接下来将领导者选举例子改成分布式锁(Distributed Lock)的实现方式。 模拟一个高并发扣减库存的场景:多个节点同时抢购同一商品(库存=100),使用 ZooKeeper 分布式锁确保同一时刻只有一个节点能扣库存,避免超卖。
核心区别回顾:

[*]领导者选举:全局只有一个 Leader 长期持有,一直执行任务。
[*]分布式锁:谁需要操作资源就去抢锁,用完立即释放,其他节点可以继续抢。
distributed-lock-service/
├── pom.xml
├── src/
│   └── main/
│       ├── java/
│       │   └── com/example/
│       │       ├── DistributedLock.java// 分布式锁核心
│       │       ├── InventoryService.java // 库存扣减服务
│       │       └── App.java             // 主入口
│       └── resources/
│         └── application.propertiespom.xml(依赖同前,添加 Curator 简化实现,生产中推荐 Curator 而非原生 ZooKeeper API)
1 <dependencies>
2   
3   <dependency>
4         <groupId>org.apache.zookeeper</groupId>
5         zookeeper</artifactId>
6         <version>3.8.0</version>
7   </dependency>
8   
9   <dependency>
10         <groupId>org.apache.curator</groupId>
11         curator-recipes</artifactId>
12         <version>5.7.0</version>
13   </dependency>
14   <dependency>
15         <groupId>org.apache.curator</groupId>
16         curator-framework</artifactId>
17         <version>5.7.0</version>
18   </dependency>
19   
20   <dependency>
21         <groupId>org.slf4j</groupId>
22         slf4j-simple</artifactId>
23         <version>1.7.36</version>
24   </dependency>
25 </dependencies>1. 配置(application.properties)
1 zk.connectString=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
2 zk.sessionTimeout=5000
3 zk.connectionTimeout=3000
4
5 # 分布式锁根路径
6 lock.rootPath=/locks
7 # 具体锁路径(这里模拟商品库存锁)
8 lock.path=/locks/inventory/product-123452. 核心代码
DistributedLock.java(基于 Curator 的可重入公平分布式锁)
1 package com.example;
2
3 import org.apache.curator.framework.CuratorFramework;
4 import org.apache.curator.framework.CuratorFrameworkFactory;
5 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
6 import org.apache.curator.retry.ExponentialBackoffRetry;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import java.util.concurrent.TimeUnit;
11
12 public class DistributedLock {
13   private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
14
15   private final CuratorFramework client;
16   private final InterProcessMutex lock;
17   private final String lockPath;
18
19   public DistributedLock(String zkConnectString, int sessionTimeout, String lockPath) {
20         this.lockPath = lockPath;
21
22         // Curator 客户端(带重试机制)
23         ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
24         this.client = CuratorFrameworkFactory.newClient(zkConnectString, sessionTimeout, 3000, retryPolicy);
25         client.start();
26
27         // 可重入公平分布式锁
28         this.lock = new InterProcessMutex(client, lockPath);
29   }
30
31   /**
32      * 尝试获取锁,超时则返回 false
33      */
34   public boolean acquire(long timeout, TimeUnit unit) {
35         try {
36             boolean acquired = lock.acquire(timeout, unit);
37             if (acquired) {
38               logger.info("Lock acquired: {}", lockPath);
39             } else {
40               logger.warn("Failed to acquire lock within timeout: {}", lockPath);
41             }
42             return acquired;
43         } catch (Exception e) {
44             logger.error("Error acquiring lock", e);
45             return false;
46         }
47   }
48
49   /**
50      * 释放锁
51      */
52   public void release() {
53         try {
54             if (lock.isAcquiredInThisProcess()) {
55               lock.release();
56               logger.info("Lock released: {}", lockPath);
57             }
58         } catch (Exception e) {
59             logger.error("Error releasing lock", e);
60         }
61   }
62
63   public void close() {
64         client.close();
65   }
66 }InventoryService.java(模拟库存扣减)
1 package com.example;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5
6 import java.util.concurrent.atomic.AtomicInteger;
7
8 public class InventoryService {
9   private static final Logger logger = LoggerFactory.getLogger(InventoryService.class);
10
11   // 模拟库存(实际应从数据库读取)
12   private final AtomicInteger stock = new AtomicInteger(100);
13
14   /**
15      * 扣减库存(模拟业务逻辑)
16      */
17   public void deductStock(int quantity) {
18         if (stock.get() < quantity) {
19             logger.warn("库存不足!当前库存: {}", stock.get());
20             return;
21         }
22
23         // 模拟数据库操作耗时
24         try {
25             Thread.sleep(50); // 模拟网络延迟
26         } catch (InterruptedException e) {
27             Thread.currentThread().interrupt();
28         }
29
30         int newStock = stock.addAndGet(-quantity);
31         logger.info("扣减成功!扣减数量: {},剩余库存: {}", quantity, newStock);
32   }
33
34   public int getStock() {
35         return stock.get();
36   }
37 }App.java(主入口:模拟 10 个并发请求抢锁)
1 package com.example;
2
3 import java.io.IOException;
4 import java.util.Properties;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7 import java.util.concurrent.TimeUnit;
8
9 public class App {
10   public static void main(String[] args) throws IOException, InterruptedException {
11         Properties props = new Properties();
12         props.load(App.class.getClassLoader().getResourceAsStream("application.properties"));
13
14         String zkConnect = props.getProperty("zk.connectString");
15         int sessionTimeout = Integer.parseInt(props.getProperty("zk.sessionTimeout"));
16         String lockPath = props.getProperty("lock.path");
17
18         InventoryService inventory = new InventoryService();
19         DistributedLock lock = new DistributedLock(zkConnect, sessionTimeout, lockPath);
20
21         // 模拟 10 个并发请求(实际生产中来自不同服务实例或线程)
22         ExecutorService executor = Executors.newFixedThreadPool(10);
23         for (int i = 0; i < 10; i++) {
24             final int orderId = i + 1;
25             executor.submit(() -> {
26               System.out.println("订单 " + orderId + " 开始尝试扣库存...");
27
28               // 尝试获取锁(超时 3 秒)
29               if (lock.acquire(3, TimeUnit.SECONDS)) {
30                     try {
31                         // 临界区:扣库存
32                         inventory.deductStock(1);
33                     } finally {
34                         // 必须释放锁!
35                         lock.release();
36                     }
37               } else {
38                     System.out.println("订单 " + orderId + " 获取锁超时,放弃本次扣减");
39               }
40             });
41         }
42
43         executor.shutdown();
44         executor.awaitTermination(30, TimeUnit.SECONDS);
45
46         System.out.println("最终剩余库存: " + inventory.getStock());
47         lock.close();
48   }
49 }部署方式(与领导者选举完全相同)

[*]3 台服务器(node1、node2、node3)
[*]安装 ZooKeeper 集群(同前)
[*]打包 JAR:mvn clean package
[*]每台服务器上传 JAR + application.properties
[*]启动脚本(start.sh)同前,但 node.id 不需要了(分布式锁不需要唯一 ID)nohup java -jar distributed-lock-service-1.0-SNAPSHOT.jar > service.log 2>&1 &

[*]运行方式:在任意一台或多台服务器上启动多个实例(或在同一台机器启动多个进程),模拟并发。
[*]验证:启动后观察日志,只有一个线程/进程能成功扣库存,其他线程要么等待要么超时。

日志示例(运行后可能的输出)
订单 1 开始尝试扣库存...
订单 2 开始尝试扣库存...
订单 3 开始尝试扣库存...
...
Lock acquired: /locks/inventory/product-12345   // 订单1 抢到锁
扣减成功!扣减数量: 1,剩余库存: 99
Lock released: /locks/inventory/product-12345
订单 4 开始尝试扣库存...
Lock acquired: /locks/inventory/product-12345   // 订单4 抢到锁
扣减成功!扣减数量: 1,剩余库存: 98
Lock released: /locks/inventory/product-12345
...
最终剩余库存: 90   // 扣了 10 次,库存从 100 -> 90,无超卖与领导者选举的对比总结
方面领导者选举(前例)分布式锁(本例)日志出现频率选举只在启动或 Leader 宕机时触发一次每次业务请求都可能触发抢锁/释放日志(高频)持有锁时间长期(直到宕机)极短(扣库存 50ms + 网络延迟)日志关键词"I am the Leader"、"I am Follower, watching""Lock acquired"、"Lock released"、"Failed to acquire"并发场景所有节点只选一个干活多个节点/线程并发抢同一把锁释放时机通常不释放(宕机自动释放)必须在 finally 中释放,否则死锁典型日志量少(启动 + 宕机时)多(每个订单都有一条 acquire + release) 
生产环境建议

[*]用 Curator:原生 ZooKeeper 实现分布式锁容易出错(比如忘记释放、顺序节点管理复杂),Curator 的 InterProcessMutex 可解决。
[*]可重入性:Curator 锁默认支持可重入(同一线程可多次 acquire)。
[*]公平锁:默认公平(FIFO),避免饥饿。
[*]锁超时:业务设置合理超时,避免长时间阻塞。
[*]监控:监控 ZooKeeper 节点数、Watch 数量、锁竞争频率。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

高清宁 发表于 2026-1-26 10:30:34

感谢发布原创作品,程序园因你更精彩

湄圳啸 发表于 2026-1-28 04:51:58

收藏一下   不知道什么时候能用到

揉幽递 发表于 2026-1-29 05:49:49

yyds。多谢分享

喳谍 发表于 2026-2-2 19:36:31

感谢,下载保存了

蜴间囝 发表于 2026-2-5 09:31:38

这个好,看起来很实用

接快背 发表于 2026-2-5 11:45:50

感谢发布原创作品,程序园因你更精彩

昆拗干 发表于 2026-2-7 03:05:15

用心讨论,共获提升!

咫噎 发表于 2026-2-7 07:10:54

不错,里面软件多更新就更好了

杭环 发表于 2026-2-7 07:50:43

这个有用。

任佳湍 发表于 2026-2-8 22:10:28

感谢发布原创作品,程序园因你更精彩

皇甫佳文 发表于 2026-2-9 02:08:28

新版吗?好像是停更了吧。

玲液 发表于 2026-2-9 17:07:37

过来提前占个楼

欤夤 发表于 2026-2-10 22:32:09

感谢分享

孟茹云 发表于 2026-2-11 11:43:42

新版吗?好像是停更了吧。

各卧唯 发表于 2026-2-11 16:02:45

这个有用。

遏筒煽 发表于 2026-2-13 10:47:47

很好很强大我过来先占个楼 待编辑

乃阕饯 发表于 2026-2-25 15:34:58

懂技术并乐意极积无私分享的人越来越少。珍惜

陆菊 发表于 2026-3-8 17:42:00

感谢分享

赫连如冰 发表于 2026-3-9 04:56:10

这个有用。
页: [1] 2
查看完整版本: 玩转 ZooKeeper之分布式锁