首页 > 编程语言 > 基于springboot实现redis分布式锁的方法
2020
11-29

基于springboot实现redis分布式锁的方法

在公司的项目中用到了分布式锁,但只会用却不明白其中的规则
所以写一篇文章来记录
使用场景:交易服务,使用redis分布式锁,防止重复提交订单,出现超卖问题

分布式锁的实现方式

  1. 基于数据库乐观锁/悲观锁
  2. Redis分布式锁(本文)
  3. Zookeeper分布式锁

redis是如何实现加锁的?

在redis中,有一条命令,实现锁

1
SETNX key value

该命令的作用是将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。设置成功,返回 1 ;设置失败,返回 0

使用 redis 来实现锁的逻辑就是这样的

线程 1 获取锁  -- > setnx lockKey lockvalue
              -- >  1  获取锁成功
线程 2 获取锁  -- > setnx lockKey lockvalue
              -- >  0  获取锁失败  (继续等待,或者其他逻辑)
线程 1 释放锁  -- >
线程 2 获取锁  -- > setnx lockKey lockvalue
              -- > 1 获取成功

接下来我们将基于springboot实现redis分布式锁

1. 引入redis、springmvc、lombok依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<?xml version="1.0" encoding="UTF-8"?>
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.0</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>cn.miao.redis</groupId>
  <artifactId>springboot-caffeine-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>springboot-redis-lock-demo</name>
  <description>Demo project for Redis Distribute Lock</description>
 
  <properties>
    <java.version>1.8</java.version>
  </properties>
 
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
 
    <!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.1.4.RELEASE</version>
    </dependency>
 
    <!--springMvc-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>2.3.3.RELEASE</version>
    </dependency>
 
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.12</version>
      <scope>provided</scope>
    </dependency>
 
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
 
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
 
</project>

2. 新建RedisDistributedLock.java并书写加锁解锁逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
 
import java.nio.charset.StandardCharsets;
 
/**
 * @author miao
 * redis 加锁工具类
 */
@Slf4j
public class RedisDistributedLock {
 
  /**
   * 超时时间
   */
  private static final long TIMEOUT_MILLIS = 15000;
 
  /**
   * 重试次数
   */
  private static final int RETRY_TIMES = 10;
 
  /***
   * 睡眠时间
   */
  private static final long SLEEP_MILLIS = 500;
 
  /**
   * 用来加锁的lua脚本
   * 因为新版的redis加锁操作已经为原子性操作
   * 所以放弃使用lua脚本
   */
  private static final String LOCK_LUA =
      "if redis.call(\"setnx\",KEYS[1],ARGV[1]) == 1 " +
          "then " +
          "  return redis.call('expire',KEYS[1],ARGV[2]) " +
          "else " +
          "  return 0 " +
          "end";
 
  /**
   * 用来释放分布式锁的lua脚本
   * 如果redis.get(KEYS[1]) == ARGV[1],则redis delete KEYS[1]
   * 否则返回0
   * KEYS[1] , ARGV[1] 是参数,我们只调用的时候 传递这两个参数就可以了
   * KEYS[1] 主要用?泶?菰?edis 中用作key值的参数
   * ARGV[1] 主要用来传递在redis中用做 value值的参数
   */
  private static final String UNLOCK_LUA =
      "if redis.call(\"get\",KEYS[1]) == ARGV[1] "
          + "then "
          + "  return redis.call(\"del\",KEYS[1]) "
          + "else "
          + "  return 0 "
          + "end ";
 
  /**
   * 检查 redisKey 是否上锁
   *
   * @param redisKey redisKey
   * @param template template
   * @return Boolean
   */
  public static Boolean isLock(String redisKey, String value, RedisTemplate<Object, Object> template) {
 
    return lock(redisKey, value, template, RETRY_TIMES);
  }
 
  private static Boolean lock(String redisKey,
                String value,
                RedisTemplate<Object, Object> template,
                int retryTimes) {
 
    boolean result = lockKey(redisKey, value, template);
 
    while (!(result) && retryTimes-- > 0) {
      try {
 
        log.debug("lock failed, retrying...{}", retryTimes);
        Thread.sleep(RedisDistributedLock.SLEEP_MILLIS);
      } catch (InterruptedException e) {
 
        return false;
      }
      result = lockKey(redisKey, value, template);
    }
 
    return result;
  }
 
 
  private static Boolean lockKey(final String key,
                  final String value,
                  RedisTemplate<Object, Object> template) {
    try {
 
      RedisCallback<Boolean> callback = (connection) -> connection.set(
          key.getBytes(StandardCharsets.UTF_8),
          value.getBytes(StandardCharsets.UTF_8),
          Expiration.milliseconds(RedisDistributedLock.TIMEOUT_MILLIS),
          RedisStringCommands.SetOption.SET_IF_ABSENT
      );
 
      return template.execute(callback);
    } catch (Exception e) {
 
      log.info("lock key fail because of ", e);
    }
 
    return false;
  }
 
 
  /**
   * 释放分布式锁资源
   *
   * @param redisKey key
   * @param value  value
   * @param template redis
   * @return Boolean
   */
  public static Boolean releaseLock(String redisKey,
                   String value,
                   RedisTemplate<Object, Object> template) {
    try {
      RedisCallback<Boolean> callback = (connection) -> connection.eval(
          UNLOCK_LUA.getBytes(),
          ReturnType.BOOLEAN,
          1,
          redisKey.getBytes(StandardCharsets.UTF_8),
          value.getBytes(StandardCharsets.UTF_8)
      );
 
      return template.execute(callback);
    } catch (Exception e) {
 
      log.info("release lock fail because of ", e);
    }
 
    return false;
  }
 
}

补充:
1. spring-data-redis 有StringRedisTempla和RedisTemplate两种,但是我选择了RedisTemplate,因为他比较万能。他们的区别是:当你的redis数据库里面本来存的是字符串数据或者你要存取的数据就是字符串类型数据的时候,那么你就使用StringRedisTemplate即可, 但是如果你的数据是复杂的对象类型,而取出的时候又不想做任何的数据转换,直接从Redis里面取出一个对象,那么使用RedisTemplate是 更好的选择。
2.  选择lua脚本是因为,脚本运行是原子性的,在脚本运行期间没有客户端可以操作,所以在释放锁的时候用了lua脚本,
而redis最新版加锁时保证了Redis值和自动过期时间的原子性,所用没用lua脚本

3. 创建测试类 TestController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
import javax.annotation.Resource;
 
/**
 * @author miao
 */
@RestController
@Slf4j
public class TestController {
 
  @Resource
  private RedisTemplate<Object, Object> redisTemplate;
 
  @PostMapping("/order")
  public String createOrder() throws InterruptedException {
 
    log.info("开始创建订单");
 
    Boolean isLock = RedisDistributedLock.isLock("testLock", "456789", redisTemplate);
 
    if (!isLock) {
 
      log.info("锁已经被占用");
      return "fail";
    } else {
      //.....处理逻辑
    }
 
    Thread.sleep(10000);
    //一定要记得释放锁,否则会出现问题
    RedisDistributedLock.releaseLock("testLock", "456789", redisTemplate);
 
    return "success";
  }
}

4. 使用postman进行测试

5. redis分布式锁的缺点

上面我们说的是redis,是单点的情况。如果是在redis sentinel集群中情况就有所不同了。在redis sentinel集群中,我们具有多台redis,他们之间有着主从的关系,例如一主二从。我们的set命令对应的数据写到主库,然后同步到从库。当我们申请一个锁的时候,对应就是一条命令 setnx mykey myvalue ,在redis sentinel集群中,这条命令先是落到了主库。假设这时主库down了,而这条数据还没来得及同步到从库,sentinel将从库中的一台选举为主库了。这时,我们的新主库中并没有mykey这条数据,若此时另外一个client执行 setnx mykey hisvalue , 也会成功,即也能得到锁。这就意味着,此时有两个client获得了锁。这不是我们希望看到的,虽然这个情况发生的记录很小,只会在主从failover的时候才会发生,大多数情况下、大多数系统都可以容忍,但是不是所有的系统都能容忍这种瑕疵。

6.redis分布式锁的优化

为了解决故障转移情况下的缺陷,Antirez 发明了 Redlock 算法,使用redlock算法,需要多个redis实例,加锁的时候,它会想多半节点发送 setex mykey myvalue 命令,只要过半节点成功了,那么就算加锁成功了。释放锁的时候需要想所有节点发送del命令。这是一种基于【大多数都同意】的一种机制。感兴趣的可以查询相关资料。在实际工作中使用的时候,我们可以选择已有的开源实现,python有redlock-py,java 中有Redisson redlock。

redlock确实解决了上面所说的“不靠谱的情况”。但是,它解决问题的同时,也带来了代价。你需要多个redis实例,你需要引入新的库 代码也得调整,性能上也会有损。所以,果然是不存在“完美的解决方案”,我们更需要的是能够根据实际的情况和条件把问题解决了就好。

我大致讲清楚了redis分布式锁方面的问题(日后如果有新的领悟就继续更新)。更多相关springboot redis分布式锁内容请搜索自学编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持自学编程网!

编程技巧