背景

为了保证数据的一致性,在一些业务处理中都会选择加锁来保证数据的一致性。在单机模式下我们通常选择使用synchronized等这种JAVA提供好的jvm锁来实现,但是在集群和分布式情况下,这种jvm级别的锁式无法满足我们的需求,因为一个服务部署在多台服务器上,这些服务器上的jvm是无法通讯的,所以我们需要一种方案来解决分布式情况下数据一致性。

在互联网公司,基本上企业内部都会有自己的一套分布式锁开发框架

前言

分布式锁一般有三种实现方式:

  1. 数据库乐观锁
  2. 基于redis实现分布式锁
  3. 基于zooKeeper实习哪分布式锁

本次讲着重介绍redis实现分布式锁,和与数据库和zooKeeper实习分布式锁的对比。

可靠性

为了保证分布式锁的可用,我们至少要保证锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只能有一个客户端持有锁
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁
  3. 具有容错性。只要大部分的redis节点正常运行,客户端就可以加锁和解锁
  4. 解铃还须系铃人。加锁和解锁的对象必须是同一个客户端,客户端不能把别人加的锁给解了。

redis锁演变过程

redis本地部署

1
-Dserver.port=8082

Dashboard复制项目后不同端口号运行

单机不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public String decuctStock() {
// 从redis中拿到stock的数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stock")));
// stock数量大于零的时候才进行扣减
if (stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余库存:" + realStock + " 卖出数量为:" + count.addAndGet(1));
return "扣减库存成功";
}else {
System.out.println("扣减库存失败,库存不足");
return "扣减库存失败,库存不足";
}

}

在高并发情况下,这种写法肯定会导致超卖现象。假如有3个线程同时从redis中读取stock数据,假如这个时候均为200,然后同时判断都大于零,然后进入减库存操作,都减去一,这个时候在写入数据,那么这个时候三个线程下订单结果库存才减了一;

  • jmeter压测

    • 100个线程并发
    • 延迟0秒
    • 循环三次
  • 总库存为200

image-20210901095800868
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
扣减库存成功,剩余库存:199         卖出数量为:1
扣减库存成功,剩余库存:198 卖出数量为:2
扣减库存成功,剩余库存:197 卖出数量为:3
扣减库存成功,剩余库存:196 卖出数量为:4
扣减库存成功,剩余库存:196 卖出数量为:5
扣减库存成功,剩余库存:196 卖出数量为:6
扣减库存成功,剩余库存:195 卖出数量为:7
扣减库存成功,剩余库存:195 卖出数量为:8
.....
扣减库存成功,剩余库存:2 卖出数量为:257
扣减库存成功,剩余库存:1 卖出数量为:258
扣减库存成功,剩余库存:0 卖出数量为:259
扣减库存失败,库存不足
扣减库存失败,库存不足
扣减库存失败,库存不足

单机安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public synchronized String decuctStock() {
// 从redis中拿到stock的数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stock")));
// stock数量大于零的时候才进行扣减
if (stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余库存:" + realStock + " 卖出数量为:" + count.addAndGet(1));
return "扣减库存成功";
}else {
System.out.println("扣减库存失败,库存不足");
return "扣减库存失败,库存不足";
}
}

单机压测

  • jmeter压测

    • 100个线程并发
    • 延迟0秒
    • 循环三次
  • 总库存为200

  • redis单机本地部署

image-20210901100029412
1
2
3
4
5
6
7
8
9
10
扣减库存成功,剩余库存:199         卖出数量为:1
扣减库存成功,剩余库存:198 卖出数量为:2
扣减库存成功,剩余库存:197 卖出数量为:3
扣减库存成功,剩余库存:196 卖出数量为:4
......
......
扣减库存成功,剩余库存:3 卖出数量为:197
扣减库存成功,剩余库存:2 卖出数量为:198
扣减库存成功,剩余库存:1 卖出数量为:199
扣减库存成功,剩余库存:0 卖出数量为:200

加入synchronized后能保证单机的数据一致性,但是当这个服务是集群部署的时候,加上负载均衡,jvm级别的锁不能保证线程安全

集群压测

  • jmeter压测
    • 100个线程并发
    • 延迟0秒
    • 循环4次
  • 总库存为200
  • redis单机本地部署
  • 三个服务集群
  • geteway(本地部署)轮询负载均衡
image-20210901104535887
1
2
3
4
5
6
7
8
9
10
11
// 第一台服务
扣减库存成功,剩余库存:0 卖出数量为:122
扣减库存失败,库存不足
// 第二台服务
扣减库存成功,剩余库存:0 卖出数量为:115
扣减库存失败,库存不足
// 第三台服务
扣减库存成功,剩余库存:0 卖出数量为:95
扣减库存失败,库存不足

//总卖出数量为:122+115+95=332

优势:单机情况下能够保证数据一致性

劣势:不能保证在分布式情况下的数据一致性

集群安全之redis的SETNX实现

redis是原子性,可以使用redis api的setnx方法解决。把所有的线程并行请求转为redis中的串行请求。

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

1
2
3
4
5
6
7
8
9
10
11
redis> EXISTS job                # job 不存在
(integer) 0

redis> SETNX job "programmer" # job 设置成功
(integer) 1

redis> SETNX job "code-farmer" # 尝试覆盖 job ,失败
(integer) 0

redis> GET job # 没有被覆盖
"programmer"

思想:

所有请求先去到redis中设置相同的key,只有设置成功的那个线程才能执行下面的业务逻辑,当业务逻辑执行完成后删除key,然后其他线程能够再次拿到key,执行。

基础

集群压测

  • jmeter压测
    • 100个线程并发
    • 延迟0秒
    • 循环4次
  • 总库存为200
  • redis单机本地部署
  • 三个服务集群
  • geteway(本地部署)轮询负载均衡
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public String decuctStock() {
//当先线程设置redis锁
Boolean isExist = stringRedisTemplate.opsForValue().setIfAbsent("stocklock", "");
// 如果没有拿到锁,怎返回,或者不断重试
if (isExist != null && !isExist){
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();

}
return decuctStock();
}else if (isExist == null){
return "程序错误";
}

// 从redis中拿到stock的数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stock")));
// stock数量大于零的时候才进行扣减
if (stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余库存:" + realStock + " 卖出数量为:" + count.addAndGet(1));
// 删除锁
stringRedisTemplate.delete("stocklock");
return "扣减库存成功";
}else {
stringRedisTemplate.delete("stocklock");
System.out.println("扣减库存失败,库存不足");
return "扣减库存失败,库存不足";
}
}
image-20210901111405543
1
2
3
4
5
6
7
8
9
10
// 第一台服务
扣减库存成功,剩余库存:0 卖出数量为:94
扣减库存失败,库存不足
// 第二台服务
扣减库存成功,剩余库存:1 卖出数量为:60
扣减库存失败,库存不足
// 第三台服务
扣减库存成功,剩余库存:9 卖出数量为:46
扣减库存失败,库存不足
//总卖出数量为:94+60+46=200

总卖出数量为200,符合我们的预期

不足

这样看来我们的程序是不是达到了分布式锁的目的,在分布式条件下,我们的库存和我们预想的情况下一样。但是我们在看一下上面保证分布式情况下的可靠性条件

1
2
3
4
5
6
7
为了保证分布式锁的`可用`,我们至少要保证锁的实现同时满足以下四个条件:

1. 互斥性。在任意时刻,只能有一个客户端持有锁
2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁
3. 具有容错性。只要大部分的redis节点正常运行,客户端就可以加锁和解锁
4. 解铃还须系铃人。加锁和解锁的对象必须是同一个客户端,客户端不能把别人加的锁给解了。

互斥性问题解决了,然后看我的程序会不会发生死锁

如果在运行的过程中,突然一台机器宕机会发生什么

为了模拟上述情况,更改代码,主要在删除stocklock前,设置一个长时间的休眠,方便我们能够宕机一个服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
public String decuctStock() {
//当先线程设置redis锁
Boolean isExist = stringRedisTemplate.opsForValue().setIfAbsent("stocklock", "stocklock");
// 如果没有拿到锁,怎返回,或者不断重试
if (isExist != null && !isExist){
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();

}
return decuctStock();
}else if (isExist == null){
return "程序错误";
}

// 从redis中拿到stock的数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stock")));
// stock数量大于零的时候才进行扣减
if (stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余库存:" + realStock + " 卖出数量为:" + count.addAndGet(1));
// 删除锁
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stringRedisTemplate.delete("stocklock");
return "扣减库存成功";
}else {
// 删除锁
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stringRedisTemplate.delete("stocklock");
System.out.println("扣减库存失败,库存不足");
return "扣减库存失败,库存不足";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 第一台服务
扣减库存成功,剩余库存:199 卖出数量为:1
扣减库存成功,剩余库存:195 卖出数量为:2
扣减库存成功,剩余库存:193 卖出数量为:3
扣减库存成功,剩余库存:192 卖出数量为:4
扣减库存成功,剩余库存:188 卖出数量为:5
// 第二台服务
扣减库存成功,剩余库存:197 卖出数量为:1
扣减库存成功,剩余库存:194 卖出数量为:2
扣减库存成功,剩余库存:191 卖出数量为:3
扣减库存成功,剩余库存:189 卖出数量为:4

// 第三台服务
扣减库存成功,剩余库存:198 卖出数量为:1
扣减库存成功,剩余库存:196 卖出数量为:2
扣减库存成功,剩余库存:190 卖出数量为:3

//在运行过程中把第一台服务器停止,然后整个项目陷入一直死锁状态,控制台没有打印

查看redis中的key,发现stocklock存在,说明刚刚在停止第一台服务器的时候,这个key没有删除,所以造成了整个项目的死锁

image-20210901142008719

改进一

基础版的主要问题是如果一个客户端在持有锁的过程中因为种种问题,没有主动解锁,那么整个项目将发生死锁。

所以在改进一种可以设置一个超时时间,如果客户端在持有锁的过程中宕机,那么可以等待过了超时时间后,解除死锁。

1
2
//设置锁的时候,添加一个超时时间
Boolean isExist = stringRedisTemplate.opsForValue().setIfAbsent("stocklock", "stocklock",5, TimeUnit.SECONDS);

image-20210901144358811

这样即时一个服务在持有锁的情况下宕机了,没有即时删除key,仍有一个过期时间,在过期时间到了后,整个项目仍然可以正常运行。

改进二

着看改进一就没有问题了吗,如果在程序运行过程中,一个程序因为未知原因(cpu卡顿,业务逻辑卡顿,业务逻辑中的网络卡顿等等),在过期时间内还没有运行完成,然后这个时候key过期了。这个时候就想当与锁释放了,然后第二个线程就拿到了锁,然后线程进来了,然后在第二个线程运行过程中,第一个线程删除了这个key,这个时候删除了就是第二个线程的key,然后如果整个项目这样一直迭代,那么肯定会出现大量的超卖现象。

之前的方案:

  • 可以保证互斥性
  • 可以保证不能发生死锁
  • 但是不能保证解铃还须系铃人

第一个线程把第二个线程的锁释放了。

所以,再次改进,然后想到了CAS中的ABA问题,CAS的解决方案是添加了一个线程id,这个线程在一个jvm中是唯一的,在改变值的时候不仅要判断值是否想当,还需要判断这个线程id是否相等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public String decuctStock() {
//当先线程设置redis锁
// 在这只value的时候,添加了额一个uuid,保证唯一性,只有当前线程能够释放
String clientUuid = UUID.randomUUID().toString();
Boolean isExist = stringRedisTemplate.opsForValue().setIfAbsent("stocklock", clientUuid,5, TimeUnit.SECONDS);
// 如果没有拿到锁,怎返回,或者不断重试
if (isExist != null && !isExist) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();

}
return decuctStock();
} else if (isExist == null) {
return "程序错误";
}

// 从redis中拿到stock的数量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stock")));
// stock数量大于零的时候才进行扣减
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余库存:" + realStock + " 卖出数量为:" + count.addAndGet(1));
// 删除锁
if (clientUuid.equals(stringRedisTemplate.opsForValue().get("stocklock")) ){
stringRedisTemplate.delete("stocklock");
}

return "扣减库存成功";
} else {
// 删除锁
if (clientUuid.equals(stringRedisTemplate.opsForValue().get("stocklock")) ){
stringRedisTemplate.delete("stocklock");
}
System.out.println("扣减库存失败,库存不足");
return "扣减库存失败,库存不足";
}
}

但是这样真的能够解决了吗,如果是在判断这两个值相等后,这个缓存时间过期了,然后第二个线程拿到了这个锁,然后在执行的过程中,第一个线程删除了这把锁。

这样不是还有上述问题。

所以我们必须要保证删除锁的过程中的原子性:

lua脚本

一下lua脚本不确定是否正确:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


local key = "rate.limit:" .. KEYS[1]
local limit = tonumber(ARGV[1])
local expire_time = ARGV[2]

local is_exists = redis.call("EXISTS", key)
if is_exists == 1 then
if redis.call("INCR", key) > limit then
return 0
else
return 1
end
else
redis.call("SET", key, 1)
redis.call("EXPIRE", key, expire_time)
return 1
end

1
2
3
4
5
6
7
8
9
10
11
12
13
private boolean accessLimit(String ip, int limit, int timeout, Jedis connection) throws IOException {
List<String> keys = Collections.singletonList(ip);
List<String> argv = Arrays.asList(String.valueOf(limit), String.valueOf(timeout));

return 1 == (long) connection.eval(loadScriptString("script.lua"), keys, argv);
}

// 加载Lua代码
private String loadScriptString(String fileName) throws IOException {
Reader reader = new InputStreamReader(Client.class.getClassLoader().getResourceAsStream(fileName));
return CharStreams.toString(reader);
}

lua脚本能够保证redis执行的原子性

这样能够解决解铃还须系铃人问题,但是还有一个问题没有解决,那么就是如果程序运行时间超过了设定的过期时间,第二个线程拿到了这个锁,虽然第一个线程不能删除第二个删除的锁,但是第一个线程可以修改这个库存值,这样还是会造成一个超卖问题。

存在问题:

  1. 非高可用
  2. 不支持阻塞和非阻塞

改进三

这个可以设置一个监听线程,如果线程在这个锁三份之一的时间内还有没完成业务逻辑,那么就为这把锁续命延长时间,在延长一定次数后,直接回滚,放弃本次操作

但是这样,为造成我们编写程序的复杂性,每次这样的业务逻辑都要写这么多的代码吗?所以引申出来了一个分布式redisson框架

redisson优势

  • redisson所有指令都通过lua脚本执行,redis支持lua脚本原子执行
  • rediss设置key默认时间为30s,如果一个客户端持有锁的时间超过了30秒呢?
    • redison中有watchdog(监听器),在你获取锁后,每隔10秒后帮你把key的超时时间设为30s,一直续期
    • 如果不想一直续期,可以不使用默认,设置过期时间,然后要设置回滚
  • redisson的的监听器保证了没有死锁发生
    • 如果及其宕机了,他也就不会自动续期了,过期后其他线程就可以拿到锁

使用

1
2
3
4
5
6
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>

1
2
3
4
5
6
7
8
@Bean
public Redisson redisson () {
Config cfg = new Config();
// 可以设置redis集群
cfg.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
return (Redisson) Redisson.create(cfg);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
    @Override
// 添加事务,如果中间遇到卡段,一直在运行,在超过30秒后redisson释放锁并抛出异常,然后@Transactional检测到异常事务后回滚
@Transactional(rollbackFor = Exception.class)
public String decuctStock() {
RLock stocklock = redisson.getLock("stocklock");
String returnResult = "";

try {
//设置加锁只有30秒,超过30秒则放弃
stocklock.lock(30,TimeUnit.SECONDS);
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stock")));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余库存:" + realStock + " 卖出数量为:" + count.addAndGet(1));
returnResult = "扣减库存成功";

} else {
System.out.println("扣减库存失败,库存不足");
returnResult = "扣减库存失败,库存不足";
}

} catch (InterruptedException e) {
e.printStackTrace();
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
} finally {
if (stocklock != null && stocklock.isHeldByCurrentThread()) {
stocklock.unlock();
}
}
return returnResult;
}

集群压测

  • jmeter压测
    • 100个线程并发
    • 延迟0秒
    • 循环4次
  • 总库存为200
  • redis单机本地部署
  • 三个服务集群
  • geteway(本地部署)轮询负载均衡
  • redisson分布式锁框架

image-20210901162657634

1
2
3
4
5
6
7
8
9
10
// 第一台服务
扣减库存成功,剩余库存:1 卖出数量为:70
扣减库存失败,库存不足
// 第二台服务
扣减库存成功,剩余库存:0 卖出数量为:60
扣减库存失败,库存不足
// 第三台服务
扣减库存成功,剩余库存:2 卖出数量为:70
扣减库存失败,库存不足
//总卖出数量为:70+60+70=200

redisson分布式锁的实现原理

Redis如何实现高并发分布式锁?

但是这个架构还是存在问题的,因为redis服务器是主从的架构,当在master节点设置锁之后,slave节点会立刻同步。但是如果刚在master节点设置上了锁,slave节点还没来得及设置,master节点就挂掉了。还是会产生上同样的问题,新的线程获得锁。

redisson有解决方案(redlock),但是解决方案存在争议

redis分布式锁和zookeeper分布式锁

redis

redis实现分布式锁是AP高可用

redis在向主节点加锁后,可以直接返回给客户端,性能较高

redis适合在单机情况下,在主从模式下Redis作者antirez提出了RedLock算法

这个场景是假设有一个redis cluster,有5个redis master实例。然后执行如下步骤获取一把锁:

  • 获取当前时间戳,单位是毫秒

  • 轮流尝试在每个master节点上创建锁,过期时间较短,一般就几十毫秒

  • 尝试在大多数节点上建立一个锁,比如5个节点就要求是3个节点(n / 2 +1)

  • 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了

  • 要是锁建立失败了,那么就依次删除这个锁

  • 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁

img

但是不推荐使用redlock,还有写bug。如果非要保证强一致行,可以考虑使用zookeeper,如果为了高可用,高性能,允许少量的数据不一致(这个可以通过后期log日志维护),可以使用redis。

zookeeper

zookeeper实现分布式锁是CP强一致

zookeeper想主节点加锁后,会通知从节点,只有一半以上的节点都加锁成功后,才会返回给客户端

redisson(redis)分布式锁和curator(zookeeper)分布式锁对比

通过jmeter压测发现,在同样的并发请求,redis本地单机,zookeeper本地单机的情况下,redis的300线程的200库存量用时平均在3秒左右,而zookeeper的300线程的200库存量用时平均在28秒左右。

可以明显看的出来,针对本次的300线程200库存,redis单机和zookeeper单机

对比 zookeeper redis
时间 28秒 3秒
CAP CP AP
性能 较低
可靠 较低

工具对比

  1. redisTemplate是基于某个具体实现的再封装,比如说springBoot1.x时,具体实现是jedis;而到了springBoot2.x时,具体实现变成了lettuce。封装的好处就是隐藏了具体的实现,使调用更简单,但是有人测试过jedis效率要10-30倍的高于redisTemplate的执行效率,所以单从执行效率上来讲,jedis完爆redisTemplate。redisTemplate的好处就是基于springBoot自动装配的原理,使得整合redis时比较简单。

  2. jedis作为老牌的redis客户端,采用同步阻塞式IO,采用线程池时是线程安全的。优点是简单、灵活、api全面,缺点是某些redis高级功能需要自己封装。

  3. lettuce作为新式的redis客户端,基于netty采用异步非阻塞式IO,是线程安全的,优点是提供了很多redis高级功能,例如集群、哨兵、管道等,缺点是api抽象,学习成本高。lettuce好是好,但是jedis比他生得早。

  4. redission作为redis的分布式客户端,同样基于netty采用异步非阻塞式IO,是线程安全的,优点是提供了很多redis的分布式操作和高级功能,缺点是api抽象,学习成本高

面试题:

你们项目中在分布式中如何保证线程安全

zookeeper和redis如何选择

https://zhuanlan.zhihu.com/p/106333054

https://www.zhihu.com/question/300767410/answer/1698980571

参考文章:

https://blog.csdn.net/belongtocode/article/details/102212078

https://www.cnblogs.com/moxiaotao/p/10829799.html

https://blog.csdn.net/qq_40925189/article/details/109580439