redis分布式锁


分布式锁

是为了解决在多个jvm下,synchronized失效的问题。

模拟分布式锁

public interface ILock {
    boolean tryLock(long timeoutSec);

    void unlock();
}
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.yaml.snakeyaml.events.Event;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock {
    private String name;
    private StringRedisTemplate redisTemplate;
    
    public SimpleRedisLock(String name,StringRedisTemplate redisTemplate) {
        this.name=name;
        this.redisTemplate = redisTemplate;
    }

    private static final String KEY_PREFIX="lock:";
    private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";

    @Override
    public boolean tryLock(long timeoutSec) {
        String threadId=ID_PREFIX+ Thread.currentThread().getId();
        //使用setnx
        Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);  //防止拆箱空指针
    }

    @Override
    public void unlock() {
        String threadId=ID_PREFIX+Thread.currentThread().getId();
        String id = redisTemplate.opsForValue().get(KEY_PREFIX + name);
        if(id.equals(threadId)) {
            //在这里需要判断 存入redis的分布式锁的值  是否与当前的线程id相同  因为如果不判断可能会存在锁误删的问题  比如线程A进入  存入的值是1,但是线程A执行过程中阻塞了 在阻塞的过程中 锁自己释放了 后面线程B来了  B拿到锁 存入的值是2 但是A这时 可以正常执行了 于是没有判断的情况下  把B的锁删掉了  这样出现了问题
            redisTemplate.delete(KEY_PREFIX + name);
        }
    }
}
//用户进行下单的代码
public Result createVoucherOrder(Long voucherId) {
    // 5.一人一单
    Long userId = UserHolder.getUser().getId();
    SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, redisTemplate);
    boolean isLock = redisLock.tryLock(1200);
    if(!isLock){
        //获取锁失败
        return Result.fail("不允许重复下单");
    }

    try {
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            return Result.fail("用户已经购买过一次!");
        }
        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            return Result.fail("库存不足!");
        }
        // 7.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 7.1.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 7.2.用户id
        voucherOrder.setUserId(userId);
        // 7.3.代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        // 7.返回订单id
        return Result.ok(orderId);
    } finally {
        //释放锁
        redisLock.unlock();
    }
}

但是这样设计的分布式锁 依然会存在问题

问题如下:
判断锁标识和释放锁时两个动作,当一个线程A已经判断了是自己的锁,但是在准备释放锁时,阻塞了,可能是jvm垃圾回收影响。被阻塞后,A的锁过期了自动释放了,这时又来一个线程B,线程B获取锁,这时上一个线程A又开始执行了,把线程B的锁给释放了,因此又造成了误删

 @Override
    public void unlock() {
        String threadId=ID_PREFIX+Thread.currentThread().getId();
        String id = redisTemplate.opsForValue().get(KEY_PREFIX + name);
        if(id.equals(threadId)) {
            //线程在这一步阻塞
            //删掉锁的代码  
            redisTemplate.delete(KEY_PREFIX + name);
        }
    }

解决办法:保证判断锁和删除锁的原子性 LUA脚本

使用LUA脚本首先保证有LUA的环境 并且下载IDEA中的EmmyLua更方便

if(redis.call('get',KEYS[1])== ARGV[1]) then
   return redis.call('del',KEYS[1])
end
return 0

判断是否是自己的锁,自己的锁就释放,不是返回0

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.yaml.snakeyaml.events.Event;
import java.util.Collections;
import java.util.concurrent.TimeUnit;


public class SimpleRedisLock implements ILock {

    private String name;
    private StringRedisTemplate redisTemplate;

    public SimpleRedisLock(String name,StringRedisTemplate redisTemplate) {
        this.name=name;
        this.redisTemplate = redisTemplate;
    }

    private static final String KEY_PREFIX="lock:";
    private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
    private static final DefaultRedisScript<Long>  UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT=new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        String threadId=ID_PREFIX+ Thread.currentThread().getId();
        //使用setnx
        Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);  //放置拆箱空指针

    }

    @Override
    public void unlock() {
        /**
         * 基于lua脚本  解决原子性
         Collections里面放的key  
         第三个参数是argv
         */
        redisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX+ Thread.currentThread().getId());

    }

}

这样就可以保证原子性了

我们自己实现的redisLock他存在这一定的问题 比如可重入的问题,可重试,超时续约的问题。Redisson都可以帮助我们解决。

Redisson


可重入机制:采用的是hash接口 放入当前线程id的数据和锁重入的次数 通过LUA脚本来保证原子性

可重试:如果获取锁失败了,并不是立刻重试,而是得到释放锁发出的信号,得到信号后再次重试

超时续约问题:每隔一段时间就会重置超时时间

使用Redisson解决上面遇到的误删问题

public Result seckillVoucher(Long voucherId) {
    //查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    //判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        return Result.fail("秒杀尚未开始");
    }
    //判断秒杀是否结束
    if(voucher.getEndTime().isBefore(LocalDateTime.now())){
        return Result.fail("秒杀已经结束");
    }
    //判断库存是否充足
    Integer stock = voucher.getStock();
    if(stock<1){
        return Result.fail("库存不足");
    }
    return createVoucherOrder(voucherId);
}



public Result createVoucherOrder(Long voucherId) {
    // 5.一人一单
    Long userId = UserHolder.getUser().getId();
    //基于redisson 实现
    RLock redisLock = redissonClient.getLock("lock:order:" + userId);
    boolean isLock = redisLock.tryLock();

    if(!isLock){
        //获取锁失败
        return Result.fail("不允许重复下单");
    }
    try {
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            return Result.fail("用户已经购买过一次!");
        }
        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            return Result.fail("库存不足!");
        }
        // 7.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 7.1.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 7.2.用户id
        voucherOrder.setUserId(userId);
        // 7.3.代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);

        // 7.返回订单id
        return Result.ok(orderId);
    } finally {
        //释放锁
        redisLock.unlock();
    }
}

文章作者: 蛰伏
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 蛰伏 !
  目录