Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说分布式锁中-基于 Redis 的实现需避坑 - Jedis 篇「建议收藏」,希望能够帮助你!!!。
篇幅太长看着也累,每天进步一点点
欢迎关注公众号「架构染色」交流和学习
分布式锁系列内容规划如下,本篇是第 4 篇:
Redis 应该是目前最受欢迎的高性能的缓存数据库了,在五一期间看到一则 Redis 7.0 发布的消息后,回想起多年前学习黄健宏老师《Redis 从入门到精通》2.x 的月伴时光,不由得感慨 Reids 发展之迅速。搜集了一下 3.0 及之后各版本的知名特性,整理出来方便读者朋友们有个简单了解(感兴趣的朋友还需自行深入研究),情况大致如下:
为满足本篇目标所需,这里着重介绍以下几个关键特性:
Redis 的数据结构(来自网络)
Redis 的集群模式(来自网络)
Redis lua 脚本的工作机制(来自网络)
Redis 的分布式锁正是基于以上特性来实现的,简单来说是:
TTL 机制:用于支撑异常情况下的锁自动释放的能力
顺序变更:用于支撑获取锁和排队等待的能力
集群+主从模式:用于支撑锁服务的高可用
Redis 没有提供对分布式锁亲和的监听机制,需要客户端主动轮询感知数据变更。
使用 Jedis 指令实现分布式锁的核心流程如下图所示:
其中第 2 和第 4 是核心环节,有几个版本的演进很有趣味:
SET key value [EX seconds] [PX milliseconds] [NX|XX]
if(jedis.set(key, lockValue, "NX", "EX", 100) == 1){ //加锁成功
try {
do work //执行业务
//这里缺点什么?
}catch(Exception e){
//...
}finally {
jedis.del(key); //释放锁,这里可能误删其他client的锁key
}
}
解决办法是:加锁时指定的 lockValue 为随机值,每次加锁时的值都是唯一的,释放锁时若 lockValue 与加锁时的值一致才可释放,否则什么都不做,逻辑如下:
if(jedis.set(key, randomLockValue, "NX", "EX", 100) == 1){ //加锁
try {
do something //业务处理
}catch(){
}
finally {
//判断是不是当前线程加的锁,是才释放
//但判断和释放锁两个操作不是原子性的
if (randomLockValue.equals(jedis.get(key))) {
jedis.del(key); //释放锁
}
}
}
以上代码遗留的问题是判断 randomlockValue 和释放锁两个操作不是原子性的。
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
Object result = jedis.eval(script, Collections.singletonList(key),
Collections.singletonList(randomLockValue));
if("1".equals(result.toString())){
return true;
}
至此依然存在的一个问题是:若持锁后,业务逻辑执行耗时 超过了 key 的过期时间,则锁 Key 会被 Reids 主动删除。
if(jedis.set(key, randomLockValue, "NX", "EX", 100) == 1){ //加锁成功
try {
do work //执行业务
//watchDog定时延后Key的过期时间
}catch(Exception e){
//...
}finally {
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
try {
Object result = jedis.eval(script, Collections.singletonList(key),
Collections.singletonList(randomLockValue));
if("1".equals(result.toString())){
return true;
}
return false;
}catch(Exception e){
//...
}
}
}
可能读者是单篇阅读,这里引入第一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:
基于上文对 Jedis 分布式锁的介绍,这里简单总结一下 Jedis 的能力矩阵,ZK 请看《分布式锁中-基于 Zookeeper 的实现》,etcd 请看《分布式锁中-基于 etcd 的实现很优雅》 ,表格中标题使用 Redis-简单锁,主要是跟 RedLock 做区分,这种简单锁使用 Jedis 、Lettuce、Redisson 都能实现,任何一把锁的信息只保存在一个 Redis master 实例中,而 RedLock 是 Redisson 提供的高阶分布式锁,它需要客户端同时跟多个 Redis master 实例协作才能完成,即一把锁的信息同时存在于多个 master 实例中。它的情况会在后续文章中补充(感兴趣的读者可以关注本号 【架构染色】 ,文章完成时会主动推送给你)
能力 | ZK | etcd | Redis-简单锁 | Redlock | MySql |
---|---|---|---|---|---|
互斥 | 是 | 是 | 是 | ||
安全 | 链接异常时,session 丢失自动释放锁 | 基于租约,超时自动释放锁 | 基于 TTL,超时自动释放锁 | ||
可用性 | 相对可用性还好 | 好 | 好 | ||
可重入 | 服务端非可重入,本地线程可重入 | 服务端非可重入,Resission本地线程可重入 | 服务端非可重入,本地线程可重入需自研 | ||
加解锁速度 | 速度不算快 | 速度快,GRPC 协议优势以及服务端能力的优势 | 速度快 | ||
阻塞非阻塞 | 客户端两种能力都提供 | jetcd-core 中,阻塞非阻塞由 Future#get 支撑 | Jedis非阻塞,# Redission提供阻塞能力 | ||
公平非公平 | 公平锁 | 公平锁 | 非公平锁,# Redission提供公平锁 | ||
可续期 | 天然支持 | 天然支持 | Jedis需自研 watchDog,Redission自带 | ||
其他因素 | 技术栈偏老,性能不佳 | 多数公司不熟悉 | 容易受业务缓存操作干扰 |
Jedis 是 Redis 官方推出的用于通过 Java 连接 Redis 客户端的一个工具包,提供了 Redis 的各种命令支持。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.0</version>
</dependency>
SetParams params = SetParams.setParams().nx().ex(lockState.getLeaseTTL());
String result = client.set(lockState.getLockKey(), lockState.getLockValue(), params);
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = client.eval(script, 1, lockState.getLockKey(), lockState.getLockValue());
package com.rock.dlock.jedis;
import com.rock.dlock.common.DtLockException;
import com.rock.dlock.common.KeepAliveAction;
import com.rock.dlock.common.KeepAliveTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.params.SetParams;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
/** * @author zs * @date 2022/11/13 4:44 PM */
public class DemoJedisLock {
private final static Logger log = LoggerFactory.getLogger(DemoJedisLock.class);
private JedisPooled client;
private LockState lockState;
private KeepAliveTask keepAliveTask;
private int sleepMillisecond;
private final static String RESULT_OK = "OK";
private static final Long UNLOCK_SUCCESS = 1L;
class LockState {
private String lockKey;
private String lockValue;
private String errorMsg;
private int leaseTTL;
private long leaseId;
private boolean lockSuccess;
public LockState(String lockKey, int leaseTTL) {
this.lockKey = lockKey;
this.leaseTTL = leaseTTL;
}
public LockState(String lockKey, String value, int leaseTTL) {
this.lockKey = lockKey;
this.lockValue = value;
this.leaseTTL = leaseTTL;
}
public String getLockKey() {
return lockKey;
}
public void setLockKey(String lockKey) {
this.lockKey = lockKey;
}
public String getLockValue() {
return lockValue;
}
public void setLockValue(String lockValue) {
this.lockValue = lockValue;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public long getLeaseId() {
return leaseId;
}
public void setLeaseId(long leaseId) {
this.leaseId = leaseId;
}
public boolean isLockSuccess() {
return lockSuccess;
}
public void setLockSuccess(boolean lockSuccess) {
this.lockSuccess = lockSuccess;
}
public int getLeaseTTL() {
return leaseTTL;
}
public void setLeaseTTL(int leaseTTL) {
this.leaseTTL = leaseTTL;
}
}
public DemoJedisLock(JedisPooled client, String key, String value, int ttlSeconds) {
//1.准备客户端
this.client = client;
this.lockState = new LockState(key, value, ttlSeconds);
this.sleepMillisecond = (ttlSeconds * 1000) / 3; //抢锁的重试间隔可由用户指定
}
public boolean tryLock(long waitTime, TimeUnit waitUnit) throws DtLockException {
long totalMillisSeconds = waitUnit.toMillis(waitTime);
long start = System.currentTimeMillis();
//重试,直到成功或超过指定时间
while (true) {
// 抢锁
try {
SetParams params = SetParams.setParams().nx().ex(lockState.getLeaseTTL());
String result = client.set(lockState.getLockKey(), lockState.getLockValue(), params);
if (RESULT_OK.equals(result)) {
manualKeepAlive();
log.info("[jedis-lock] lock success 线程:{} 加锁成功,key:{} , value:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
lockState.setLockSuccess(true);
return true;
} else {
if (System.currentTimeMillis() - start >= totalMillisSeconds) {
return false;
}
Thread.sleep(sleepMillisecond);
}
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause instanceof SocketTimeoutException) {//忽略网络抖动等异常
}
log.error("[jedis-lock] lock failed:" + e);
throw new DtLockException("[jedis-lock] lock failed:" + e.getMessage(), e);
}
}
}
//此实现中忽略,网络通信异常部分的处理,可参考tryLock
public void unlock() throws DtLockException {
try {
// 首先停止续约
if (keepAliveTask != null) {
keepAliveTask.close();
}
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = client.eval(script, 1, lockState.getLockKey(), lockState.getLockValue());
if (UNLOCK_SUCCESS.equals(result)) {
log.info("[jedis-lock] unlock success 线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
} else {
log.info("[jedis-lock] unlock del key failed ,线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
}
} catch (Exception e) {
log.error("[jedis-lock] unlock failed:" + e.getMessage(), e);
throw new DtLockException("[jedis-lock] unlock failed:" + e.getMessage(), e);
}
}
// 定时将Key的过期推迟
private void manualKeepAlive() {
final String t_key = lockState.getLockKey();
final int t_ttl = lockState.getLeaseTTL();
keepAliveTask = new KeepAliveTask(new KeepAliveAction() {
@Override
public void run() throws DtLockException {
// 刷新值
try {
client.expire(t_key, t_ttl);
} catch (Exception e) {
e.printStackTrace();
}
}
}, t_ttl);
keepAliveTask.start();
}
}
package com.rock.dlock.common;
public class DtLockException extends RuntimeException{
public DtLockException(String message) {
super(message);
}
public DtLockException(String message, Throwable cause) {
super(message, cause);
}
public static DtLockException clientException(){
return new DtLockException("client is empty");
}
}
package com.rock.dlock.common;
public interface KeepAliveAction {
void run() throws DtLockException;
}
package com.rock.dlock.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/** * @author zs * @date 2022/11/7 4:20 PM */
public class KeepAliveTask extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(KeepAliveTask.class);
public volatile boolean isRunning = true;
/** * 过期时间,单位s */
private long ttlSeconds;
private KeepAliveAction action;
public KeepAliveTask(KeepAliveAction action, long ttlSeconds) {
this.ttlSeconds = ttlSeconds;
this.action = action;
this.setDaemon(true);
}
@Override
public void run() {
final long sleep = this.ttlSeconds * 1000 / 3; // 每隔三分之一过期时间,续租一次
while (isRunning) {
try {
// 1、续租,刷新值
action.run();
LOGGER.debug("续租成功!");
TimeUnit.MILLISECONDS.sleep(sleep);
} catch (InterruptedException e) {
close();
} catch (DtLockException e) {
close();
}
}
}
public void close() {
isRunning = false;
this.interrupt();
}
}
import com.rock.dlock.jedis.DemoJedisLock;
import redis.clients.jedis.JedisPooled;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/** * @author zs * @date 2022/11/13 4:51 PM */
public class TestJedisLock {
public static void main(String[] args) {
JedisPooled jedis = new JedisPooled("127.0.0.1", 6379);
DemoJedisLock demoEtcdLock1 = new DemoJedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);
DemoJedisLock demoEtcdLock2 = new DemoJedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);
boolean lock1 = demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
if (lock1) {
try {
System.out.printf("do something");
} finally {
demoEtcdLock1.unlock();
}
}
demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
demoEtcdLock2.tryLock(20, TimeUnit.SECONDS);//等待锁,超时后放弃
}
}
通常分布式锁服务会和业务逻辑使用同一个Redis 集群,自然也使用同一个 Jedis 客户端;当业务逻辑侧对 Redis 的读写并发提高时,会给 Redis 集群和 Jedis 客户度带来压力;为应对一些异常情况,我们除了解功能层面的 API,还需要了解一下客户端的一些配置调优,主要是池化管理和网络通信两个方面
在使用 Jedis 时可以配置 JedisPool 连接池,池化处理有许多好处,如:提高响应的速度、降低资源的消耗、方便管理和维护;JedisPool 配置参数大部分是由 JedisPoolConfig 的对应项来赋值的,在生产中我们需要关注它的配置并合理的赋值,如此能够提升 Redis 的服务性能,降低资源开销。下边是对一些重要参数的说明、默认及设置建议:
参数 | 说明 | 默认值 | 建议 |
---|---|---|---|
maxTotal | 资源池中的最大连接数 | 8 | |
maxIdle | 资源池允许的最大空闲连接数 | 8 | |
minIdle | 资源池确保的最少空闲连接数 | 0 | |
blockWhenExhausted | 当资源池用尽后,调用者是否要等待。只有当值为 true 时,下面的maxWaitMillis才会生效。 | true | 建议使用默认值。 |
maxWaitMillis | 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)。 | -1(表示永不超时) | 不建议使用默认值。 |
testOnBorrow | 向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。 | false | 业务量很大时候建议设置为 false,减少一次 ping 的开销。 |
testOnReturn | 向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。 | false | 业务量很大时候建议设置为 false,减少一次 ping 的开销。 |
jmxEnabled | 是否开启 JMX 监控 | true | 建议开启,请注意应用本身也需要开启。 |
空闲 Jedis 对象的回收检测由以下四个参数组合完成,testWhileIdle是该功能的开关。
名称 | 说明 | 默认值 | 建议 |
---|---|---|---|
testWhileIdle | 是否开启空闲资源检测。 | false | true |
timeBetweenEvictionRunsMillis | 空闲资源的检测周期(单位为毫秒) | -1(不检测) | 建议设置,周期自行选择,也可以默认也可以使用下方JedisPoolConfig 中的配置。 |
minEvictableIdleTimeMillis | 资源池中资源的最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除。 | 180000(即 30 分钟) | 可根据自身业务决定,一般默认值即可,也可以考虑使用下方JeidsPoolConfig中的配置。 |
numTestsPerEvictionRun | 做空闲资源检测时,每次检测资源的个数。 | 3 | 可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。 |
通过源码可以发现这些配置是 GenericObjectPoolConfig 对象的属性,这个类实际上是 rg.apache.commons.pool2.impl apache 提供的,也就是说 jedis 的连接池是依托于 apache 提供的对象池来,这个对象池的声明周期如下图,感兴趣的可以看下:
Rsdis 节点故障或者网络抖动时,这两个值如果不合理可能会导致很严重的问题,比如 timeout 设置为 1000,maxRedirect 为 2,一旦出现 redis 连接问题,将会导致请求阻塞 3s 左右。而这个 3 秒的阻塞在可能导致常规业务流量下的线程池耗尽,需根据业务场景调整。
本篇介绍了如何基于 Redis 的特性来实现一个分布式锁,并基于 Jedis 库提供了一个分布式锁的示例,呈现了其关键 API 的用法;此示例尚未达到生产级可用,如异常、可重入、可重试、超时控制等功能都未补全,计划在下一篇介绍完 redlock 之后,再介绍一个健壮的分布式锁客户端要如何抽象设计,如何适配 ZK 、Redis 、etcd 。
分布式锁系列内容规划如下,本篇是第 4 篇:
如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下发二维码关注一下,关注公众号:【 架构染色 】,进行交流和学习。您的支持是我坚持写作最大的动力。
分布式锁中-基于 Redis 的实现需避坑 - Jedis 篇 view.inews.qq.com/a/20220211A… cloud.tencent.com/developer/a…