背景
在公司的某些业务场景中,存在多个节点操作同一数据库的需求。这些操作可能涉及对同一个表的并发读写。如果不加以控制,就可能出现以下问题:
多个节点同时操作同一条记录,导致数据不一致或冲突。
数据库抛出异常(如死锁或资源竞争),影响系统的稳定性。
为了解决这个问题设计并实现了一个基于数据库的简单分布式锁,以确保在并发场景下对关键资源的访问是可控且安全的。
需求
锁的唯一性:
每个分布式锁都需要以唯一的key
标识,确保不同业务场景间的锁不会互相干扰。并发控制:
在并发场景下,只有一个请求能成功获取到锁,其它请求需要等待锁的释放或直接返回获取失败的状态。锁的归属性:
锁归属于特定的请求,获取到锁的请求可以释放锁,但其它未持有锁的请求不能释放这把锁。这是为了防止锁被误释放,确保锁的安全性和正确性。自动过期:
每个分布式锁都需要设置一个过期时间(expire_at
),用于防止因异常情况导致的锁长时间未释放问题。当锁超过过期时间后,其他节点可以获取锁,从而避免资源被长时间占用的问题。需要注意,锁的过期时间应根据业务需求灵活配置,既要确保锁的有效性,也不能让锁持续占用资源过久。可重入性(扩展需求,暂未实现):
如果同一节点在持有锁的情况下,再次请求同一锁,不需要重新获取锁,而是直接允许进入。同时,需记录锁的重入次数,避免误释放。
解决思路
数据库表设计:
创建一个distributed_lock
表,用来存储锁的相关信息,包括锁的key
、锁的持有者(归属)、锁的过期时间等。CREATE TABLE distributed_lock ( lock_key VARCHAR(255) NOT NULL PRIMARY KEY, -- 锁的唯一标识 lock_owner VARCHAR(255), -- 持有锁的节点或请求标识 expire_at TIMESTAMP, -- 锁的过期时间 reentrant_count INT DEFAULT 0 -- 锁的重入次数(可选) );
锁的获取:
通过SELECT ... FOR UPDATE NOWAIT
的方式实现对锁的互斥访问。在查询到锁不存在或当前锁属于自己的情况下,可以成功获取锁。锁的释放:
根据锁的归属进行校验,只有当前锁的持有者才能释放锁。如果存在重入机制,则在释放时减少计数器,直到计数为 0 时真正释放锁。锁的过期处理:
为防止因异常未释放锁导致资源长时间被占用,锁需要设置一个过期时间(expire_at
)。当锁过期后,其他节点可以获取到锁。
实现
流程图:
代码实现
// 获取锁 public boolean getLock(DataSourceConfigDTO dataSourceConfigDTO, String lockKey, LocalDateTime expireTime) { DbTableBO sysApplicationConfig = DbTableServiceFactory.getService().getTable(dataSourceConfigDTO, "distributed_lock"); if (sysApplicationConfig == null) { // 创建表 createDistributedLockTable(dataSourceConfigDTO, DistributedLockModel.class); } // 获取锁记录 Map<String, Object> args = new HashMap<>(); args.put("lockKey", lockKey); DistributedLockModel distributedLockModel; String sql = "SELECT * FROM distributed_lock WHERE lock_key = #lockKey# FOR UPDATE NOWAIT"; try { distributedLockModel = this.getDaoService().queryForMap(dataSourceConfigDTO, sql, args, DistributedLockModel.class); } catch (Exception e) { logError(e.getMessage(), e); return false; } String lockOwner = getLockOwner(); // 为空,写入数据,获取成功 if (ObjectUtil.isNull(distributedLockModel)) { distributedLockModel = new DistributedLockModel(); distributedLockModel.setLockKey(lockKey); distributedLockModel.setLockOwner(lockOwner); distributedLockModel.setExpireAt(expireTime); Result insertResult; try { insertResult = this.getDaoService().insert(this.getModelBO(dataSourceConfigDTO, DistributedLockModel.class), BeanUtil.beanToMap(distributedLockModel)); } catch (Exception e) { // 多节点写入,主键冲突 return false; } return insertResult.isSuccess(); } // 不为空,未过期 if (distributedLockModel.getExpireAt().isAfter(LocalDateTime.now())) { // 同一个拥有者,获取成功 return distributedLockModel.getLockOwner().equals(lockOwner); } else { // 不为空,已过期,获取成功 Map<String, Object> updateArgs = new HashMap<>(); updateArgs.put("lockOwner", lockOwner); updateArgs.put("expireTime", expireTime); updateArgs.put("lockKey", lockKey); updateArgs.put("expireAt", distributedLockModel.getExpireAt()); String updateSql = "UPDATE distributed_lock SET lock_owner = #lockOwner#, expire_at = #expireTime# WHERE lock_key = #lockKey# AND expire_at = #expireAt#"; // 更新锁信息 int update = this.getDaoService().update(dataSourceConfigDTO, updateSql, updateArgs); return update == 1; } } // 获取当前请求归属人 protected String getLockOwner() { if (!ThreadLocalContext.isInit()) { ThreadLocalContext.init(); } String lockOwner = (String) ThreadLocalContext.get("lockOwner"); if (lockOwner == null) { lockOwner = UUID.randomUUID().toString(); } ThreadLocalContext.put("lockOwner", lockOwner); return lockOwner; } // 释放锁 public boolean releaseLock(DataSourceConfigDTO dataSourceConfigDTO, String lockKey) { try { String lockOwner = getLockOwner(); Map<String, Object> args = new HashMap<>(); args.put("lock_key", lockKey); args.put("lock_owner", lockOwner); Result deleteResult = this.getDaoService().delete(getModelBO(dataSourceConfigDTO, DistributedLockModel.class), args); if (!deleteResult.isSuccess()) { logError("释放锁失败"); } return deleteResult.isSuccess(); } finally { ThreadLocalContext.remove(); } }
总结
通过基于数据库的分布式锁机制,可以有效解决多节点并发访问共享资源时的数据冲突问题。在设计时,需要重点关注锁的唯一性、并发控制、归属性,以及在复杂业务场景下对锁重入和过期处理的支持。