背景

在公司的某些业务场景中,存在多个节点操作同一数据库的需求。这些操作可能涉及对同一个表的并发读写。如果不加以控制,就可能出现以下问题:

  • 多个节点同时操作同一条记录,导致数据不一致或冲突。

  • 数据库抛出异常(如死锁或资源竞争),影响系统的稳定性。

为了解决这个问题设计并实现了一个基于数据库的简单分布式锁,以确保在并发场景下对关键资源的访问是可控且安全的。

需求

  1. 锁的唯一性:
    每个分布式锁都需要以唯一的 key 标识,确保不同业务场景间的锁不会互相干扰。

  2. 并发控制:
    在并发场景下,只有一个请求能成功获取到锁,其它请求需要等待锁的释放或直接返回获取失败的状态。

  3. 锁的归属性:
    锁归属于特定的请求,获取到锁的请求可以释放锁,但其它未持有锁的请求不能释放这把锁。这是为了防止锁被误释放,确保锁的安全性和正确性。

  4. 自动过期:
    每个分布式锁都需要设置一个过期时间expire_at),用于防止因异常情况导致的锁长时间未释放问题。当锁超过过期时间后,其他节点可以获取锁,从而避免资源被长时间占用的问题。需要注意,锁的过期时间应根据业务需求灵活配置,既要确保锁的有效性,也不能让锁持续占用资源过久。

  5. 可重入性(扩展需求,暂未实现):
    如果同一节点在持有锁的情况下,再次请求同一锁,不需要重新获取锁,而是直接允许进入。同时,需记录锁的重入次数,避免误释放。

解决思路

  • 数据库表设计:
    创建一个 distributed_lock 表,用来存储锁的相关信息,包括锁的 key、锁的持有者(归属)、锁的过期时间等。

    CREATE TABLE distributed_lock (
        lock_key VARCHAR(255) NOT NULL PRIMARY KEY, -- 锁的唯一标识
        lock_owner VARCHAR(255),                    -- 持有锁的节点或请求标识
        expire_at TIMESTAMP,                        -- 锁的过期时间
        reentrant_count INT DEFAULT 0               -- 锁的重入次数(可选)
    );

  • 锁的获取:
    通过 SELECT ... FOR UPDATE NOWAIT 的方式实现对锁的互斥访问。在查询到锁不存在或当前锁属于自己的情况下,可以成功获取锁。

  • 锁的释放:
    根据锁的归属进行校验,只有当前锁的持有者才能释放锁。如果存在重入机制,则在释放时减少计数器,直到计数为 0 时真正释放锁。

  • 锁的过期处理:
    为防止因异常未释放锁导致资源长时间被占用,锁需要设置一个过期时间expire_at)。当锁过期后,其他节点可以获取到锁。

实现

  1. 流程图:

  2. 代码实现

    // 获取锁
    public boolean getLock(DataSourceConfigDTO dataSourceConfigDTO, String lockKey, LocalDateTime expireTime) {
    
            DbTableBO sysApplicationConfig = DbTableServiceFactory.getService().getTable(dataSourceConfigDTO, "distributed_lock");
            if (sysApplicationConfig == null) {
                // 创建表 
                createDistributedLockTable(dataSourceConfigDTO, DistributedLockModel.class);
            }
    
            // 获取锁记录
            Map<String, Object> args = new HashMap<>();
            args.put("lockKey", lockKey);
            DistributedLockModel distributedLockModel;
            String sql = "SELECT * FROM distributed_lock WHERE lock_key = #lockKey# FOR UPDATE NOWAIT";
            try {
                distributedLockModel = this.getDaoService().queryForMap(dataSourceConfigDTO, sql, args, DistributedLockModel.class);
            } catch (Exception e) {
                logError(e.getMessage(), e);
                return false;
            }
    
            String lockOwner = getLockOwner();
    
            // 为空,写入数据,获取成功
            if (ObjectUtil.isNull(distributedLockModel)) {
                distributedLockModel = new DistributedLockModel();
                distributedLockModel.setLockKey(lockKey);
                distributedLockModel.setLockOwner(lockOwner);
                distributedLockModel.setExpireAt(expireTime);
                Result insertResult;
                try {
                    insertResult = this.getDaoService().insert(this.getModelBO(dataSourceConfigDTO, DistributedLockModel.class), BeanUtil.beanToMap(distributedLockModel));
                } catch (Exception e) {
                    // 多节点写入,主键冲突
                    return false;
                }
                return insertResult.isSuccess();
            }
    
            // 不为空,未过期
            if (distributedLockModel.getExpireAt().isAfter(LocalDateTime.now())) {
                // 同一个拥有者,获取成功
                return distributedLockModel.getLockOwner().equals(lockOwner);
            } else {
                // 不为空,已过期,获取成功
                Map<String, Object> updateArgs = new HashMap<>();
                updateArgs.put("lockOwner", lockOwner);
                updateArgs.put("expireTime", expireTime);
                updateArgs.put("lockKey", lockKey);
                updateArgs.put("expireAt", distributedLockModel.getExpireAt());
                String updateSql = "UPDATE distributed_lock SET lock_owner = #lockOwner#, expire_at = #expireTime# WHERE lock_key = #lockKey# AND expire_at = #expireAt#";
                // 更新锁信息
                int update = this.getDaoService().update(dataSourceConfigDTO, updateSql, updateArgs);
                return update == 1;
            }
        }
    
    
    // 获取当前请求归属人
    protected String getLockOwner() {
            if (!ThreadLocalContext.isInit()) {
                ThreadLocalContext.init();
            }
    
            String lockOwner = (String) ThreadLocalContext.get("lockOwner");
            if (lockOwner == null) {
                lockOwner = UUID.randomUUID().toString();
            }
    
            ThreadLocalContext.put("lockOwner", lockOwner);
    
            return lockOwner;
        }
    
    
    // 释放锁
    public boolean releaseLock(DataSourceConfigDTO dataSourceConfigDTO, String lockKey) {
            try {
                String lockOwner = getLockOwner();
                Map<String, Object> args = new HashMap<>();
                args.put("lock_key", lockKey);
                args.put("lock_owner", lockOwner);
                Result deleteResult = this.getDaoService().delete(getModelBO(dataSourceConfigDTO, DistributedLockModel.class), args);
                if (!deleteResult.isSuccess()) {
                    logError("释放锁失败");
                }
                return deleteResult.isSuccess();
            } finally {
                ThreadLocalContext.remove();
            }
        }

总结

通过基于数据库的分布式锁机制,可以有效解决多节点并发访问共享资源时的数据冲突问题。在设计时,需要重点关注锁的唯一性、并发控制、归属性,以及在复杂业务场景下对锁重入和过期处理的支持。

下辈子还是当只猫吧