分布式锁
是为了解决在多个jvm下,synchronized失效的问题。
模拟分布式锁
public interface ILock {
boolean tryLock(long timeoutSec);
void unlock();
}
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.yaml.snakeyaml.events.Event;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate redisTemplate;
public SimpleRedisLock(String name,StringRedisTemplate redisTemplate) {
this.name=name;
this.redisTemplate = redisTemplate;
}
private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
String threadId=ID_PREFIX+ Thread.currentThread().getId();
//使用setnx
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); //防止拆箱空指针
}
@Override
public void unlock() {
String threadId=ID_PREFIX+Thread.currentThread().getId();
String id = redisTemplate.opsForValue().get(KEY_PREFIX + name);
if(id.equals(threadId)) {
//在这里需要判断 存入redis的分布式锁的值 是否与当前的线程id相同 因为如果不判断可能会存在锁误删的问题 比如线程A进入 存入的值是1,但是线程A执行过程中阻塞了 在阻塞的过程中 锁自己释放了 后面线程B来了 B拿到锁 存入的值是2 但是A这时 可以正常执行了 于是没有判断的情况下 把B的锁删掉了 这样出现了问题
redisTemplate.delete(KEY_PREFIX + name);
}
}
}
//用户进行下单的代码
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, redisTemplate);
boolean isLock = redisLock.tryLock(1200);
if(!isLock){
//获取锁失败
return Result.fail("不允许重复下单");
}
try {
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
} finally {
//释放锁
redisLock.unlock();
}
}
但是这样设计的分布式锁 依然会存在问题
问题如下:
判断锁标识和释放锁时两个动作,当一个线程A已经判断了是自己的锁,但是在准备释放锁时,阻塞了,可能是jvm垃圾回收影响。被阻塞后,A的锁过期了自动释放了,这时又来一个线程B,线程B获取锁,这时上一个线程A又开始执行了,把线程B的锁给释放了,因此又造成了误删
@Override
public void unlock() {
String threadId=ID_PREFIX+Thread.currentThread().getId();
String id = redisTemplate.opsForValue().get(KEY_PREFIX + name);
if(id.equals(threadId)) {
//线程在这一步阻塞
//删掉锁的代码
redisTemplate.delete(KEY_PREFIX + name);
}
}
解决办法:保证判断锁和删除锁的原子性 LUA脚本
使用LUA脚本首先保证有LUA的环境 并且下载IDEA中的EmmyLua更方便
if(redis.call('get',KEYS[1])== ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0
判断是否是自己的锁,自己的锁就释放,不是返回0
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.yaml.snakeyaml.events.Event;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate redisTemplate;
public SimpleRedisLock(String name,StringRedisTemplate redisTemplate) {
this.name=name;
this.redisTemplate = redisTemplate;
}
private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
String threadId=ID_PREFIX+ Thread.currentThread().getId();
//使用setnx
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); //放置拆箱空指针
}
@Override
public void unlock() {
/**
* 基于lua脚本 解决原子性
Collections里面放的key
第三个参数是argv
*/
redisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX+ Thread.currentThread().getId());
}
}
这样就可以保证原子性了
我们自己实现的redisLock他存在这一定的问题 比如可重入的问题,可重试,超时续约的问题。Redisson都可以帮助我们解决。
Redisson
可重入机制:采用的是hash接口 放入当前线程id的数据和锁重入的次数 通过LUA脚本来保证原子性
可重试:如果获取锁失败了,并不是立刻重试,而是得到释放锁发出的信号,得到信号后再次重试
超时续约问题:每隔一段时间就会重置超时时间
使用Redisson解决上面遇到的误删问题
public Result seckillVoucher(Long voucherId) {
//查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
//判断秒杀是否结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
//判断库存是否充足
Integer stock = voucher.getStock();
if(stock<1){
return Result.fail("库存不足");
}
return createVoucherOrder(voucherId);
}
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
//基于redisson 实现
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = redisLock.tryLock();
if(!isLock){
//获取锁失败
return Result.fail("不允许重复下单");
}
try {
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
} finally {
//释放锁
redisLock.unlock();
}
}