分布式锁starter+自定义注解
项目在多个微服务中都需要进行幂等操作,我们把锁相关的操作封装在了分布式锁的组件中。这个分布式锁是基于Redisson+Redis+自定义注解+AOP 切面
首先在pom文件中引入相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.mifu</groupId>
<artifactId>nft-turbo-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>cn.mifu</groupId>
<artifactId>nft-turbo-lock</artifactId>
<description>分布式锁组件</description>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Redis -->
<dependency>
<groupId>cn.mifu</groupId>
<artifactId>nft-turbo-cache</artifactId>
</dependency>
</dependencies>
</project>
在nft-turbo-cache已经引入了redis的依赖
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.24.3</version>
</dependency>
定义切面类和注解
为什么要用@Order(Integer.Min_VALUE + 1),这里的作用是把分布式锁这个切面的优先级提高,保证分布式锁的控制范围覆盖整个业务逻辑及其他切面的处理过程,确保锁的有效性。比如在日志切面前置之前,日志切面后置之后。
分布式锁切面前置通知(获取锁)→ 日志切面前置通知 → 业务逻辑 → 日志切面后置通知 → 分布式锁切面后置通知(释放锁)
/**
* 分布式锁切面
*
* @author mifuRD
*/
@Aspect
@Component
@Order(Integer.MIN_VALUE + 1)
public class DistributeLockAspect {
}
@Aspect:表示这个类是一个切面类,它封装了交叉关注点的逻辑。
@Component:将这个类声明为Spring管理的bean,以便Spring AOP可以扫描到这个切面。
定义注解@DistributeLock
/**
* 分布式锁注解
*
* @author mifuRD
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributeLock {
/**
* 锁的场景
*
* @return
*/
public String scene();
/**
* 加锁的key,优先取key(),如果没有,则取keyExpression()
*
* @return
*/
public String key() default DistributeLockConstant.NONE_KEY;
/**
* SPEL表达式:
* <pre>
* #id
* #insertResult.id
* </pre>
*
* @return
*/
public String keyExpression() default DistributeLockConstant.NONE_KEY;
/**
* 超时时间,毫秒
* 默认情况下不设置超时时间,会自动续期
*
* @return
*/
public int expireTime() default DistributeLockConstant.DEFAULT_EXPIRE_TIME;
/**
* 加锁等待时长,毫秒
* 默认情况下不设置等待时长,不做等待
* @return
*/
public int waitTime() default DistributeLockConstant.DEFAULT_WAIT_TIME;
}
自定义的一个注解,用来标记需要分布式锁的方法
编写切面逻辑
通过@Around通知来围绕目标方法执行,在方法执行之前和之后插入锁的逻辑
/**
* 分布式锁切面
*
* @author mifuRD
*/
@Aspect
@Component
public class DistributeLockAspect {
private RedissonClient redissonClient;
public DistributeLockAspect(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
private static final Logger LOG = LoggerFactory.getLogger(DistributeLockAspect.class);
@Around("@annotation(cn.mifu.nft.turbo.lock.DistributeLock)")
public Object process(ProceedingJoinPoint pjp) throws Exception {
Object response = null;
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
DistributeLock distributeLock = method.getAnnotation(DistributeLock.class);
String key = distributeLock.key();
if (DistributeLockConstant.NONE_KEY.equals(key)) {
if (DistributeLockConstant.NONE_KEY.equals(distributeLock.keyExpression())) {
throw new DistributeLockException("no lock key found...");
}
SpelExpressionParser parser = new SpelExpressionParser();
Expression expression = parser.parseExpression(distributeLock.keyExpression());
EvaluationContext context = new StandardEvaluationContext();
// 获取参数值
Object[] args = pjp.getArgs();
// 获取运行时参数的名称
StandardReflectionParameterNameDiscoverer discoverer
= new StandardReflectionParameterNameDiscoverer();
String[] parameterNames = discoverer.getParameterNames(method);
// 将参数绑定到context中
if (parameterNames != null) {
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], args[i]);
}
}
// 解析表达式,获取结果
key = String.valueOf(expression.getValue(context));
}
String scene = distributeLock.scene();
String lockKey = scene + "#" + key;
int expireTime = distributeLock.expireTime();
int waitTime = distributeLock.waitTime();
RLock rLock = redissonClient.getLock(lockKey);
boolean lockResult = false;
if (waitTime == DistributeLockConstant.DEFAULT_WAIT_TIME) {
if (expireTime == DistributeLockConstant.DEFAULT_EXPIRE_TIME) {
LOG.info(String.format("lock for key : %s", lockKey));
rLock.lock();
} else {
LOG.info(String.format("lock for key : %s , expire : %s", lockKey, expireTime));
rLock.lock(expireTime, TimeUnit.MILLISECONDS);
}
lockResult = true;
} else {
if (expireTime == DistributeLockConstant.DEFAULT_EXPIRE_TIME) {
LOG.info(String.format("try lock for key : %s , wait : %s", lockKey, waitTime));
lockResult = rLock.tryLock(waitTime, TimeUnit.MILLISECONDS);
} else {
LOG.info(String.format("try lock for key : %s , expire : %s , wait : %s", lockKey, expireTime, waitTime));
lockResult = rLock.tryLock(waitTime, expireTime, TimeUnit.MILLISECONDS);
}
}
if (!lockResult) {
LOG.warn(String.format("lock failed for key : %s , expire : %s", lockKey, expireTime));
throw new DistributeLockException("acquire lock failed... key : " + lockKey);
}
try {
LOG.info(String.format("lock success for key : %s , expire : %s", lockKey, expireTime));
response = pjp.proceed();
} catch (Throwable e) {
throw new Exception(e);
} finally {
rLock.unlock();
LOG.info(String.format("unlock for key : %s , expire : %s", lockKey, expireTime));
}
return response;
}
}
当key为NONE_KEY时,通过SpEL表达式,需要解析该表达式获得最终的key
if (DistributeLockConstant.NONE_KEY.equals(key)) {
if (DistributeLockConstant.NONE_KEY.equals(distributeLock.keyExpression())) {
throw new DistributeLockException("no lock key found...");
}
SpelExpressionParser parser = new SpelExpressionParser();
Expression expression = parser.parseExpression(distributeLock.keyExpression());
EvaluationContext context = new StandardEvaluationContext();
// 获取参数值
Object[] args = pjp.getArgs();
// 获取运行时参数的名称
StandardReflectionParameterNameDiscoverer discoverer
= new StandardReflectionParameterNameDiscoverer();
String[] parameterNames = discoverer.getParameterNames(method);
// 将参数绑定到context中
if (parameterNames != null) {
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], args[i]);
}
}
// 解析表达式,获取结果
key = String.valueOf(expression.getValue(context));
}
尝试获取锁
int expireTime = distributeLock.expireTime();
int waitTime = distributeLock.waitTime();
RLock rLock = redissonClient.getLock(lockKey);
boolean lockResult = false;
if (waitTime == DistributeLockConstant.DEFAULT_WAIT_TIME) {
if (expireTime == DistributeLockConstant.DEFAULT_EXPIRE_TIME) {
LOG.info(String.format("lock for key : %s", lockKey));
rLock.lock();
} else {
LOG.info(String.format("lock for key : %s , expire : %s", lockKey, expireTime));
rLock.lock(expireTime, TimeUnit.MILLISECONDS);
}
lockResult = true;
} else {
if (expireTime == DistributeLockConstant.DEFAULT_EXPIRE_TIME) {
LOG.info(String.format("try lock for key : %s , wait : %s", lockKey, waitTime));
lockResult = rLock.tryLock(waitTime, TimeUnit.MILLISECONDS);
} else {
LOG.info(String.format("try lock for key : %s , expire : %s , wait : %s", lockKey, expireTime, waitTime));
lockResult = rLock.tryLock(waitTime, expireTime, TimeUnit.MILLISECONDS);
}
}
if (!lockResult) {
LOG.warn(String.format("lock failed for key : %s , expire : %s", lockKey, expireTime));
throw new DistributeLockException("acquire lock failed... key : " + lockKey);
}
try {
LOG.info(String.format("lock success for key : %s , expire : %s", lockKey, expireTime));
response = pjp.proceed();
} catch (Throwable e) {
throw new Exception(e);
} finally {
rLock.unlock();
LOG.info(String.format("unlock for key : %s , expire : %s", lockKey, expireTime));
}
return response;
通过AOP切面实现分布式锁可以在不修改原有方法的情况下,为方法添加分布式锁的功能,从而实现了代码解耦、增强可维护性
使用方式
@Override
@DistributeLock(keyExpression = "#request.identifier", scene = "ORDER_CREATE")
@Facade
public OrderResponse create(OrderCreateRequest request) {
try {
orderValidatorChain.validate(request);
} catch (Exception e) {
return new OrderResponse.OrderResponseBuilder().buildFail(ORDER_CREATE_VALID_FAILED.getCode(), e.getMessage());
}
Boolean preDeductResult = inventoryWrapperService.preDeduct(request);
if (preDeductResult) {
return orderService.create(request);
}
throw new OrderException(OrderErrorCode.INVENTORY_DEDUCT_FAILED);
}