在传统单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。 但是在分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁的由来。 当多个进程不在同一个系统中,就需要用分布式锁控制多个进程对资源的访问。
Redis实现分布式锁主要是使用Redis提供的setnx命令,setnx 是『SET if Not eXists』(如果不存在,则 SET)的简写。 命令格式:SETNX key value;使用:只在键 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。返回值:命令在设置成功时返回 1 ,设置失败时返回 0 ,Redis保证setnx的原子性,所以如果setnx返回1就说明获取到锁,如果0就说明获取锁失败。
主要问题就是超时,例如当业务执行时间大于key的过期时间就可能产生并发问题,同时也可能将其他线程持有的锁误删。
解决方案就是自动续期,当线程成功获得锁后就开启一个WatchDog监听当前线程,如果key超时之前不能完成业务逻辑那就将key的过期时间延长,同时如果持有锁的服务宕机就不自动续期,让Redis将key删除,当然这个功能已经有现成的实现方式了,就是Redisson。
首先引入Redisson依赖
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.1</version>
</dependency>
实现方式
public class Main {
private static RedissonClient redissonClient;
static {
//添加Redis配置信息
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
redissonClient = Redisson.create(config);
}
public static void main(String[] args) throws InterruptedException {
Thread myThread1 = new MyThread();
Thread myThread2 = new MyThread();
Thread myThread3 = new MyThread();
Thread myThread4 = new MyThread();
myThread1.start();
myThread2.start();
myThread3.start();
myThread4.start();
myThread1.join();
myThread2.join();
myThread3.join();
myThread4.join();
}
static class MyThread extends Thread {
@Override
public void run() {
//初始化锁对象
RLock watchDogLock = redissonClient.getLock("watchDogLock");
for (; ; ) {
//利用Redisson的API成功获取锁时Redisson会自动启动WatchDog监听当前线程,底层实现是通过Lua脚本。
boolean success = watchDogLock.tryLock();
System.out.println(Thread.currentThread().getName() + " 是否成功获取锁:" + success);
if (success) {
System.out.println(Thread.currentThread().getName() + " 开始执行业务逻辑");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " 业务执行成功,进行解锁");
watchDogLock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
}
}
}
}
就是因为临时节点的特性,所以Zookeeper不会遇到和Redis相同的问题,如果客户端运行正常运行就不会删除临时节点,也就不会释放锁,如果客户端宕机那临时节点就会在客户端断开一段时间后自动删除。
代码使用了Curator包,这个包封装了底层Zookeeper的操作,使用起来更加方便,可以有效减少样板代码。
首先引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
代码示例
public class Main {
private static final CuratorFramework ZK_CLIENT;
static {
//初始化Zookeeper配置
String zkServerAddress = "127.0.0.1:12181";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
ZK_CLIENT = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
ZK_CLIENT.start();
}
private static final String LOCK_PATH = "/myLock";
//创建锁对象
private static final InterProcessMutex LOCK = new InterProcessMutex(ZK_CLIENT, LOCK_PATH);
public static void main(String[] args) throws InterruptedException {
Thread myThread1 = new MyThread();
Thread myThread2 = new MyThread();
Thread myThread3 = new MyThread();
Thread myThread4 = new MyThread();
myThread1.start();
myThread2.start();
myThread3.start();
myThread4.start();
myThread1.join();
myThread2.join();
myThread3.join();
myThread4.join();
}
static class MyThread extends Thread {
@Override
public void run() {
for (; ; ) {
try {
//加锁
LOCK.acquire();
System.out.println(Thread.currentThread().getName() + " 获取到锁");
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
System.out.println(Thread.currentThread().getName() + " 释放锁");
//解锁
LOCK.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}
Redis和Zookeeper都能够实现分布式锁,但是他们之间有相同点和不同点,使用的时候需要根据需求来选择。
相同点:由于工具封装的都比较好,实现起来都很简单。
不同点:
因篇幅问题不能全部显示,请点此查看更多更全内容