redis cluster集群模式下实现批量可重入锁


    目录
  • 一、redis cluster 集群版
  • 二、redis 分布式锁
  • 三、如何使用 redis 实现批量可重入锁?
    • 1、方案一:Lua脚本批量加锁
    • 2、方案二:pipeline批量加锁
  • 四、总结

    一、redis cluster 集群版
    在Redis 3.0版本以后,Redis发布了Redis Cluster。该集群主要支持搞并发和海量数据处理等优势,当 Redis 在集群模式下运行时,它处理数据存储的方式与作为单个实例运行时不同。这是因为它应该准备好跨多个节点分发数据,从而实现水平可扩展性。具体能力表现为:
    
  • 自动分割数据到不同的节点上
  • 整个集群的部分节点失败或者不可达的情况下能够继续处理命令

    Redis没有使用一致性hash,而是引入哈希槽的概念,也就是 Hash Slot。Redis集群由16384个哈希槽slot,每个key通过CRC16校验后对16384取模来决定放置那个槽,集群的每个节点负责一部分hash槽,也就是说数据存放在hash槽里,而每个节点只负责部分hash槽(这样数据就存放在不同的节点)。
    例如:node1、node2、node3三个节点,node1节点负责0到5500号hash槽,node2节点负责5501到11000号hash槽,node3节点负责11001到16384号hash槽。这种结构很容易添加或者删除节点,比如如果我想新添加个节点node4, 我需要从节点 node1, node2, node3中得部分槽到node4上. 如果我想移除节点node1,需要将node1中的槽移到node2和node3节点上,然后将没有任何槽的node1节点从集群中移除即可. 由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽的数量都不会造成集群不可用的状态。
    
    在某些集群方案中,涉及多个key的操作会被限制在一个slot中,如Redis Cluster中的mget/mset操作。这里就会涉及到 哈希标签 Hash Tag 的概念。
    Hash Tag是用于计算哈希槽时的一个特殊场景,是一种确保多个键分配到同一个哈希槽中的方法。这是为了在Redis集群中实现多键操作而使用的。为了实现Hash Tag,在某些情况下,会以稍微不同的方式计算key的哈希槽。如果key包含"{…}"模式,则仅对{和}之间的子字符串进行散列以获取哈希槽。但由于可能存在多个{或}出现,因此该算法遵循以下规则:
    
  • 如果key包含字符 {
  • 并且如果 } 字符位于 { 的右侧
  • 并且在第一个 { 和第一个 } 之间存在一个或多个字符

    对于符合上述规则的key,则不会对整个key进行散列处理,而只会对第一次出现 { 和随后第一次出现 } 之间的内容进行散列。否则,对整个key进行散列处理。
    
    不使用hash tag批量设置不同名称的key:
    
127.0.0.1:6379> mset name name1 name2 name3
(error) CROSSSLOT Keys in request don't hash to the same slot

    显示错误信息:CROSSSLOT 请求中的key没有哈希到同一个插槽。这个问题是因为多键操作的时候每个键对应的slot可能不是一个,客户端没法做move操作。
    解决思路就是采用redis cluster的hashTag,当redis的key加上hashTag时,集群算key的slot是按照hashTag进行计算,即可保证hashTag一致的key能分配到相同的stlot中。:
    
127.0.0.1:6379> mset name {name} {name}1 {name}2 {name}3

    二、redis 分布式锁
    Redis锁使用起来比较简单,既可以锁定单个键,也可以批量锁定多个键,以实现更大规模的操作。它也是分布式应用中使用最广泛的分布式锁实施方式,可以有效解决单点故障、死锁和负载失衡等问题。
    大规模锁定Redis,实现批量操作,一般通过以下实现:
    
  • 使用Redis的消息订阅机制,创建消息频道,用于锁定指定键之间的多个键。消息频道的名字称为锁名,它代表锁定的范围和跨度。
  • 然后,通过Redis的SUBSCRIBE命令订阅消息频道名字,比如“ lock_key”,并调用Redis BLPOP,将锁定的键占据,以实现批量锁定。
  • 此外,也可以使用Redis的Lua脚本实现批量锁定。获取带锁的Key数组,这里以数组形式表示。同时,以原子的形式执行多个SETNX命令,一旦全部执行成功,则实现批量锁定:

    
local locks = red:lrange("lock_keys", 1, -1)
for i, v in iprs(locks) do
    if redis.call("setnx", v, field) == 1 then
        red.lpush("locked_keys", v)
    end
end

    释放锁定的键,实现批量解锁,语句如下:
    
local unlocked_locks = red:lrange("locked_keys",1, -1)
for i, v in iprs(unlocked_locks) do
    red.del(v)
end
red.del("locked_keys")

    使用Redis的WATCH功能,防止多个客户端同时更新同一键,即如果更新发生乐观锁的冲突的情况下,返回失败给客户端,从而保证了锁定的原子性:
    
-- 使用Redis watch,开始监听
red.watch("lock_keys")
-- 进行具体操作
-- …
-- 解锁操作
red.unwatch()

    Redis锁使用起来非常简单,可以用于单个键锁定和大规模锁定,从而实现批量操作,有效解决分布式应用中的死锁、负载失衡、单点故障等问题。
    三、如何使用 redis 实现批量可重入锁?
    1、方案一:Lua脚本批量加锁
    Lua加锁脚本处理:
    
	/**
	 * 加锁脚本
	 * KEYS[1] key
	 * ARGV[1] value
	 * ARGV[2] expire
	 * 判断key是否存在,不存在则加锁,并记录加锁次数+1;若存在,则判断value是否相等,相等则记录加锁次数+1,不相等则返回0
	 */
	private static final String REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
        "    redis.call('SET', KEYS[1], ARGV[1]) " +
        "    redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "    redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "    return 1 " +
        "else " +
        "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "        redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "        redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "        return 1 " +
        "    else " +
        "        return 0 " +
        "    end " +
        "end";
        
    @Bean
    public DefaultRedisScript<Long> reentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis reentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis reentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }


    Lua解锁脚本处理:
    
    private static final String RELEASE_REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
            "    return 0 " +
            "else " +
            "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
            "        redis.call('DECR', 'lockCount:' .. KEYS[1]) " +
            "        if redis.call('GET', 'lockCount:' .. KEYS[1]) == '0' then " +
            "            redis.call('DEL', KEYS[1]) " +
            "        end " +
            "        return 1 " +
            "    else " +
            "        return 0 " +
            "    end " +
            "end";
    @Bean
    public DefaultRedisScript<Long> releaseReentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis releaseReentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis releaseReentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }

    redis set 多个key场景需要hash tag处理:
    
private static final String KEY_FORMAT = "%s{%s}";
String slotKey = String.format(KEY_FORMAT, key, key);

    结合以上封装,批量可重入锁的方法处理如下:
    
/**
     * 可重入锁,value相同的情况下,可重复加锁,当所有加锁方都解锁后才会释放锁
     *
     * @param key           锁的key
     * @param value         锁的value
     * @param expireSeconds 锁过期时间
     * @param waitSeconds   等待时间
     * @param process
     * @return
     */
    public boolean reentrantLock(String key, String value, long expireSeconds, long waitSeconds, Runnable process) {
        boolean lock = false;
        try {
            //特殊处理key,为了保证原key和计数key落在同一个slot,将原key拼装成: key{key}
            String slotKey = String.format(KEY_FORMAT, key, key);
            long start = System.currentTimeMillis();
            while (!(lock = redisUtil.execute(reentrantLockRedisScript, Collections.singletonList(slotKey), value, expireSeconds) == 1)) {
                Thread.sleep(100);
                if ((System.currentTimeMillis() - start) / 1000 > waitSeconds) {
                    break;
                }
            }
            //加锁成功,执行传入的方法,最后用lua脚本判断锁的value是否还是当前的value,是则执行解锁
            if (lock) {
                try {
                    process.run();
                } catch (Exception ex) {
                    throw ex instanceof BusinessException ? ex : new BusinessException(ex.getMessage());
                } finally {
                    redisUtil.execute(releaseReentrantLockRedisScript, Collections.singletonList(slotKey), value);
                }
            }

        } catch (BusinessException businessException) {
            throw businessException;
        } catch (Exception e) {
            log.error("redis lockAndRun error!lockKey=" + key, e);
        }
        return lock;
    }

    /**
     * 先获取可重入锁,获取成功后批量加锁,执行传入的方法
     *
     * @param keys
     * @param reentrantKey
     * @param value
     * @param expireSeconds
     * @param waitSeconds
     * @param process
     * @return
     */
    public boolean batchReentrantLock(Set<String> keys, String reentrantKey, String value, long expireSeconds, long waitSeconds, Runnable process) {
        List<Boolean> result = new ArrayList<>(1);
        boolean reentrantLock = reentrantLock(reentrantKey, value, expireSeconds, waitSeconds, () -> {
            result.add(batchLockAndRun(keys, expireSeconds, waitSeconds, process));
        });
        return reentrantLock && result.get(0);
    }

    2、方案二:pipeline批量加锁
  • 不用lua以避免cross slot error
  • 批量加锁失败后立即全部解锁,防止死锁

    
	/**
	 * 使用redisTemplate
	 * @param script
	 * @param keys
	 * @param args
	 * @param <T>
	 */
	public List<Object> executePipelined(String[] keys, String[] values, long time, TimeUnit timeUnit) {
	      return redisTemplate.executePipelined(new SessionCallback<Object>() {
	           @Override
	           public Object execute(RedisOperations operations) throws DataAccessException {
	               for (int i = 0; i < keys.length; i++) {
	                   operations.opsForValue().setIfAbsent(keys[i], values[i], time, timeUnit);
	               }
	               return null;
	           }
	       });
	}
	
	/**
	 * pipeline批量加锁
	 * @param keys          需要加锁的key
	 * @param values        锁的value,用于解锁时判断是否是当前线程加的锁
	 * @param expireSeconds 锁过期时间
	 * @return boolean 是否加锁成功
	 */
	private boolean tryBatchLock(String[] keys, String[] values, long expireSeconds) {
	    List<Object> results = redisUtil.executePipelined(keys, values, expireSeconds, TimeUnit.SECONDS);
	    if (results == null || results.size() != keys.length || results.contains(false)) {
	        //加锁失败,立即解锁
	        redisUtil.executePipelined(releaseReentrantLockRedisScript, keys, values);
	        return false;
	    }
	    return true;
	}

    四、总结
    以上是使用redis cluster集群版所遇到的问题以及解决方案,主要在业务实现过程中,需要注意redis cluster key会被划分到不同的槽中的问题,以及redis可重入锁是否会有死锁的问题等,更多相关redis cluster 批量可重入锁内容请搜索电脑手机教程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持电脑手机教程网!