redis分布式锁解决超卖_为什么大家都在抢茅台酒

Python (54) 2023-03-24 22:42

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说redis分布式锁解决超卖_为什么大家都在抢茅台酒,希望能够帮助你!!!。

大家好呀,我是狂野君,最近总听说茅台的很火,正好今日闲来无事,准备自食其力,自己搞瓶茅台尝尝,怎么搞呢,当然是靠技术了,哈哈哈

好了,废话不多说了,上干货了

在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常,我们以synchronized 、Lock来使用它(单机情况)

我们来看一个案例:

高并发下单超卖问题

 @Autowired
     RedisTemplate<String,String> redisTemplate;
 ​
     String maotai = "maotai20210321001";//茅台商品编号
 ​
     @PostConstruct
     public void init(){
         //此处模拟向缓存中存入商品库存操作
         redisTemplate.opsForValue().set(maotai,"100");
     }
 ​
 ​
     @GetMapping("/get/maotai2")
     public String seckillMaotai2() {
         synchronized (this) {
             Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
             //如果还有库存
             if (count > 0) {
                 //抢到了茅台,库存减一
                 redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
                 //后续操作 do something
                 log.info("我抢到茅台了!");
                 return "ok";
             }else {
                 return "no";
             }
         }
     }

问题分析:

  • 现象:本地锁在多节点下失效(集群/分布式)
  • 原因:本地锁它只能锁住本地JVM进程中的多个线程,对于多个JVM进程的不同线程间是锁不住的
  • 解决:分布式锁(在分布式环境下提供锁服务,并且达到本地锁的效果)

何为分布式锁

  • 当在分布式架构下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
  • 用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

分布式锁特点

  • 互斥性:不仅要在同一jvm进程下的不同线程间互斥,更要在不同jvm进程下的不同线程间互斥
  • 锁超时:支持锁的自动释放,防止死锁
  • 正确,高效,高可用:解铃还须系铃人(加锁和解锁必须是同一个线程),加锁和解锁操作一定要高效,提供锁的服务要具备容错性
  • 可重入:如果一个线程拿到了锁之后继续去获取锁还能获取到,我们称锁是可重入的(方法的递归调用)
  • 阻塞/非阻塞:如果获取不到直接返回视为非阻塞的,如果获取不到会等待锁的释放直到获取锁或者等待超时,视为阻塞的
  • 公平/非公平:按照请求的顺序获取锁视为公平的

基于Redis实现分布式锁

实现思路:

锁的实现主要基于redis的SETNX命令:

SETNX key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

返回值: 设置成功,返回 1 设置失败,返回 0

使用SETNX完成同步锁的流程及事项如下:

  1. 使用SETNX命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功
  2. 为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间
  3. 释放锁,使用DEL命令将锁数据删除

实现代码版本1:

 @GetMapping("/get/maotai3")
    public String seckillMaotai3() {
		
        //获取锁
        Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey, "1");
        if (islock) {
            //设置锁的过期时间
            redisTemplate.expire(lockey,5, TimeUnit.SECONDS);
            try {
                Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
                //如果还有库存
                if (count > 0) {
                    //抢到了茅台,库存减一
                    redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
                    //后续操作 do something
                    log.info("我抢到茅台了!");
                    return "ok";
                }else {
                    return "no";
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                redisTemplate.delete(lockey);
            }
        }
        return "dont get lock";
    }

问题分析:

    1. setnx 和 expire是非原子性操作(解决:2.6以前可用使用lua脚本,2.6以后可用set命令)
  • 2.错误解锁(如何保证解铃还须系铃人:给锁加一个唯一标识)

错误解锁问题解决:

 @GetMapping("/get/maotai4")
    public String seckillMaotai4() {
        String requestid = UUID.randomUUID().toString() + Thread.currentThread().getId();
        /*String locklua ="" +
                "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then redis.call('expire',KEYS[1],ARGV[2]) ; return true " +
                "else return false " +
                "end";
        Boolean islock = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
                Boolean eval = redisConnection.eval(
                        locklua.getBytes(),
                        ReturnType.BOOLEAN,
                        1,
                        lockey.getBytes(),
                        requestid.getBytes(),
                        "5".getBytes()
                );
                return eval;
            }
        });*/
        //获取锁
        Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey,requestid,5,TimeUnit.SECONDS);
        if (islock) {
            try {
                Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
                //如果还有库存
                if (count > 0) {
                    //抢到了茅台,库存减一
                    redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
                    //后续操作 do something
                    log.info("我抢到茅台了!");
                    return "ok";
                }else {
                    return "no";
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                //判断是自己的锁才能去释放 这种操作不是原子性的
                /*String id = redisTemplate.opsForValue().get(lockey);
                if (id !=null && id.equals(requestid)) {
                    redisTemplate.delete(lockey);
                }*/
                String unlocklua = "" +
                        "if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('del',KEYS[1]) ; return true " +
                        "else return false " +
                        "end";
                redisTemplate.execute(new RedisCallback<Boolean>() {
                    @Override
                    public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
                        Boolean eval = redisConnection.eval(
                                unlocklua.getBytes(),
                                ReturnType.BOOLEAN,
                                1,
                                lockey.getBytes(),
                                requestid.getBytes()
                        );
                        return eval;
                    }
                });
            }
        }
        return "dont get lock";
    }

锁续期/锁续命

 /** * 3,锁续期/锁续命 * 拿到锁之后执行业务,业务的执行时间超过了锁的过期时间 * * 如何做? * 给拿到锁的线程创建一个守护线程(看门狗),守护线程定时/延迟 判断拿到锁的线程是否还继续持有锁,如果持有则为其续期 * */
    //模拟一下守护线程为其续期
    ScheduledExecutorService executorService;//创建守护线程池
    ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<String>();//队列

    @PostConstruct
    public void init2(){
        executorService = Executors.newScheduledThreadPool(1);

        //编写续期的lua
        String expirrenew = "" +
                "if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('expire',KEYS[1],ARGV[2]) ; return true " +
                "else return false " +
                "end";

        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Iterator<String> iterator = set.iterator();
                while (iterator.hasNext()) {
                    String rquestid = iterator.next();

                    redisTemplate.execute(new RedisCallback<Boolean>() {
                        @Override
                        public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
                            Boolean eval = false;
                            try {
                                eval = redisConnection.eval(
                                        expirrenew.getBytes(),
                                        ReturnType.BOOLEAN,
                                        1,
                                        lockey.getBytes(),
                                        rquestid.getBytes(),
                                        "5".getBytes()
                                );
                            } catch (Exception e) {
                                log.error("锁续期失败,{}",e.getMessage());
                            }
                            return eval;
                        }
                    });

                }
            }
        },0,1,TimeUnit.SECONDS);
    }

@GetMapping("/get/maotai5")
    public String seckillMaotai5() {
        String requestid = UUID.randomUUID().toString() + Thread.currentThread().getId();
        //获取锁
        Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey,requestid,5,TimeUnit.SECONDS);
        if (islock) {
            //获取锁成功后让守护线程为其续期
            set.add(requestid);
            try {
                Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
                //如果还有库存
                if (count > 0) {
                    //抢到了茅台,库存减一
                    redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
                    //后续操作 do something
                    //seckillMaotai5();
                    //模拟业务超时
                    TimeUnit.SECONDS.sleep(10);
                    log.info("我抢到茅台了!");
                    return "ok";
                }else {
                    return "no";
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //解除锁续期
               set.remove(requestid);
                //释放锁
                String unlocklua = "" +
                        "if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('del',KEYS[1]) ; return true " +
                        "else return false " +
                        "end";
                redisTemplate.execute(new RedisCallback<Boolean>() {
                    @Override
                    public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
                        Boolean eval = redisConnection.eval(
                                unlocklua.getBytes(),
                                ReturnType.BOOLEAN,
                                1,
                                lockey.getBytes(),
                                requestid.getBytes()
                        );
                        return eval;
                    }
                });
            }
        }
        return "dont get lock";
    }

锁的可重入/阻塞锁(redisson)

    /** * * 4,如何支持可重入 * 重入次数/过期时间 * 获取 * 获取 * 获取 * * 释放 * 释放 * 释放 * * 基于本地实现 * 还是基于redis但是更换了数据类型,采用hash类型来实现 * key field value * 锁key 请求id 重入次数 * 用lua实现 * * * 5,阻塞/非阻塞的问题:现在的锁是非阻塞的,一旦获取不到锁直接返回了 * 如何做一个阻塞锁呢? * 获取不到就等待锁的释放,直到获取到锁或者等待超时 * 1:基于客户端轮询的方案 * 2:基于redis的发布/订阅方案 * * * 有没有好的实现呢? * Redisson * */
    @Value("${spring.redis.host}")
    String host;
    @Value("${spring.redis.port}")
    String port;

 @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+host+":"+port);
        return Redisson.create(config);
    }

    @Autowired
    RedissonClient redissonClient;


    @GetMapping("/get/maotai6")
    public String seckillMaotai6() {
        //要去获取锁
        RLock lock = redissonClient.getLock(lockey);
        lock.lock();
        try {
            Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
            //如果还有库存
            if (count > 0) {
                //抢到了茅台,库存减一
                redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
                //后续操作 do something
                log.info("我抢到茅台了!");
                return "ok";
            }else {
                return "no";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();;
        }
        return "";
    }

redisson

概述

Redisson内置了一系列的分布式对象分布式集合分布式锁分布式服务等诸多功能特性,是一款基于Redis实现,拥有一系列分布式系统功能特性的工具包,是实现分布式系统架构中缓存中间件的最佳选择。

下载地址:github.com/redisson/re…

实现

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.8.2</version>
</dependency>
 @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+host+":"+port);
        return Redisson.create(config);
    }

    @Autowired
    RedissonClient redissonClient;


    @GetMapping("/get/maotai6")
    public String seckillMaotai6() {
        //要去获取锁
        RLock lock = redissonClient.getLock(lockey);
        lock.lock();
        try {
            Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
            //如果还有库存
            if (count > 0) {
                //抢到了茅台,库存减一
                redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
                //后续操作 do something
                log.info("我抢到茅台了!");
                return "ok";
            }else {
                return "no";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();;
        }
        return "";
    }

源码剖析

1,加锁的(是否支持重入)
2,锁续期的
3,阻塞获取
4,释放
/**
     * 源码如下:
     * 1,加锁
     * <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
     *         internalLockLeaseTime = unit.toMillis(leaseTime);
     *
     *         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
     *                    #如果锁key不存在
     *                   "if (redis.call('exists', KEYS[1]) == 0) then " +
     *                        #设置锁key,field是唯一标识,value是重入次数
     *                       "redis.call('hset', KEYS[1], ARGV[2], 1); " +
     *                       #设置锁key的过期时间 默认30s
     *                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     *                       "return nil; " +
     *                   "end; " +
     *                   #如果锁key存在
     *                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
     *                        #重入次数+1
     *                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
     *                        #重置过期时间
     *                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     *                       "return nil; " +
     *                   "end; " +
     *                   "return redis.call('pttl', KEYS[1]);",
     *                     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
     *     }
     *
     *  2,锁续期
     *   private void scheduleExpirationRenewal(final long threadId) {
     *         if (expirationRenewalMap.containsKey(getEntryName())) {
     *             return;
     *         }
     *
     *         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
     *             @Override
     *             public void run(Timeout timeout) throws Exception {
     *                 //续期函数的真正实现
     *                 RFuture<Boolean> future = renewExpirationAsync(threadId);
     *
     *                 future.addListener(new FutureListener<Boolean>() {
     *                     @Override
     *                     public void operationComplete(Future<Boolean> future) throws Exception {
     *                         expirationRenewalMap.remove(getEntryName());
     *                         if (!future.isSuccess()) {
     *                             log.error("Can't update lock " + getName() + " expiration", future.cause());
     *                             return;
     *                         }
     *
     *                         if (future.getNow()) {
     *                             // reschedule itself  再次调用自己,最终形成的结果就是每隔10秒续期一次
     *                             scheduleExpirationRenewal(threadId);
     *                         }
     *                     }
     *                 });
     *             }
     *          // internalLockLeaseTime=30 * 100030秒
     *         }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //30/3=10秒后异步执行续期函数
     *
     *         if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
     *             task.cancel();
     *         }
     *     }
     *
     *     续期的lua脚本:判断key,field存在则重置过期时间
     *     protected RFuture<Boolean> renewExpirationAsync(long threadId) {
     *         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
     *                 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
     *                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     *                     "return 1; " +
     *                 "end; " +
     *                 "return 0;",
     *             Collections.<Object>singletonList(getName()),
     *             internalLockLeaseTime, getLockName(threadId));
     *     }
     *
     *
     *
     * 4,阻塞锁实现
     *  public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
     *         long threadId = Thread.currentThread().getId();
     *         Long ttl = tryAcquire(leaseTime, unit, threadId);
     *         // lock acquired
     *         if (ttl == null) {
     *             return;
     *         }
     *         //如果没有获取到锁,则订阅:redisson_lock__channel:{key} 频道
     *         RFuture<RedissonLockEntry> future = subscribe(threadId);
     *         commandExecutor.syncSubscription(future);
     *
     *         try {
     *             while (true) {
     *                //尝试再获取一次
     *                 ttl = tryAcquire(leaseTime, unit, threadId);
     *                 // lock acquired
     *                 if (ttl == null) {
     *                     break;
     *                 }
     *
     *                 // waiting for message 阻塞等待锁订阅频道的消息,一旦锁被释放,就会得到信号通知,继续尝试获取锁
     *                 if (ttl >= 0) {
     *                     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
     *                 } else {
     *                     getEntry(threadId).getLatch().acquire();
     *                 }
     *             }
     *         } finally {
     *            //获取到锁后取消订阅
     *             unsubscribe(future, threadId);
     *         }
     * //        get(lockAsync(leaseTime, unit));
     *     }
     *
     *
     * 5,解锁
     * protected RFuture<Boolean> unlockInnerAsync(long threadId) {
     *         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
     *                 //key已经不存在了,则向redisson_lock__channel:{key}频道发布锁释放消息
     *                 "if (redis.call('exists', KEYS[1]) == 0) then " +
     *                     "redis.call('publish', KEYS[2], ARGV[1]); " +
     *                     "return 1; " +
     *                 "end;" +
     *                  // hash 中的field 不存在时直接返回,
     *                 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
     *                     "return nil;" +
     *                 "end; " +
     *                 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
     *                 //重入次数-1后如果还大于0,延长过期时间
     *                 "if (counter > 0) then " +
     *                     "redis.call('pexpire', KEYS[1], ARGV[2]); " +
     *                     "return 0; " +
     *                 "else " +
     *                 //重入次数-1后如果归0,则删除key,并向redisson_lock__channel:{key}频道发布锁释放消息
     *                     "redis.call('del', KEYS[1]); " +
     *                     "redis.call('publish', KEYS[2], ARGV[1]); " +
     *                     "return 1; "+
     *                 "end; " +
     *                 "return nil;",
     *                 Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
     *
     *     }
     */

布隆过滤器(BloomFilter)

引言:

问题1:什么是Redis缓存穿透?缓存穿透如何解决?

问题2:如何在海量元素中(例如 10 亿无序、不定长、不重复)快速判断一个元素是否存在?

什么是 BloomFilter

布隆过滤器(英语:Bloom Filter)是 1970 年由Burton Howard Bloom提出的,是一种空间效率高的概率型数据结构。

本质上其实就是一个很长的二进制向量和一系列随机映射函数。专门用来检测集合中是否存在特定的元素

产生的契机

回想一下,我们平常在检测集合中是否存在某元素时,都会采用比较的方法。考虑以下情况:

  • 如果集合用线性表存储,查找的时间复杂度为O(n)。
  • 如果用平衡BST(如AVL树、红黑树)存储,时间复杂度为O(logn)。
  • 如果用哈希表存储,并用链地址法与平衡BST解决哈希冲突(参考JDK8的HashMap实现方法),时间复杂度也要有O[log(n/m)],m为哈希分桶数。

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第1张

总而言之,当集合中元素的数量极多时,不仅查找会变得很慢,而且占用的空间也会大到无法想象。BF就是解决这个矛盾的利器。

数据结构&设计思想

BF是由一个长度为m比特的位数组(bit array)k个哈希函数(hash function) 组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第2张

基于BitMap:

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第3张

如果我们要映射一个值到布隆过滤器中,我们需要使用多个不同的哈希函数生成多个哈希值, 并对每个生成的哈希值指向的 bit 位,设置为1

例:

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第4张

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第5张

当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。

当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响,这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。

  • 如果这些点有任何一个 0,则被检索元素一定不在
  • 如果都是 1,则被检索元素很可能在。

误判率问题分析

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第6张

哈希函数有以下两个特点:

  • 如果两个散列值是不相同的(根据同一函数),那么这两个散列值的原始输入也是不相同的。
  • 散列函数的输入和输出不是唯一对应关系的,如果两个散列值相同,两个输入值很可能是相同的。但也可能不同,这种情况称为 “散列碰撞”(或者 “散列冲突”)

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第7张

布隆过滤器的误判是指多个输入经过哈希之后在相同的bit位置1了,这样就无法判断究竟是哪个输入产生的,因此误判的根源在于相同的 bit 位被多次映射且置 1。

不支持删除

hash碰撞这种情况也造成了布隆过滤器的删除问题,传统的布隆过滤器并不支持删除操作,因为布隆过滤器的每一个 bit 并不是独占的,很有可能多个元素共享了某一位。如果我们直接删除这一位的话,会影响其他的元素。

如何选择哈希函数个数和布隆过滤器长度

很显然,过小的布隆过滤器很快所有的 bit 位均为 1,那么查询任何值都会返回“可能存在”,起不到过滤的目的了。布隆过滤器的长度会直接影响误报率,布隆过滤器越长其误报率越小。

另外,哈希函数的个数也需要权衡,个数越多则布隆过滤器 bit 位置位 1 的速度越快,且布隆过滤器的效率越低;但是如果太少的话,那我们的误报率会变高。

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第8张

如何选择适合业务的 k 和 m 值呢,这里直接贴一个公式:

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第9张

布隆过滤器实现

第一种方式:Guava

1、引入Guava pom配置

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>29.0-jre</version>
</dependency>

2、代码实现

public class BloomFilterTest {

   @Test
    public  void test1() {
        BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size, fpp);
        // 插入10万样本数据
        for (int i = 0; i < size; i++) {
            bloomFilter.put(i);
        }

        // 用另外十万测试数据,测试误判率
        int count = 0;
        for (int i = capacity; i < size + 100000; i++) {
            if (bloomFilter.mightContain(i)) {
                count++;
                System.out.println(i + "误判了");
            }
        }
        System.out.println("总共的误判数:" + count);
    }
	}
}

运行结果:

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第10张

10万数据里有947个误判,约等于0.01%,也就是代码里设置的误判率:fpp = 0.01

代码分析:

核心BloomFilter.create方法:

@VisibleForTesting
  static <T> BloomFilter<T> create( Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
    。。。。
}

这里有四个参数:

  • funnel:数据类型(通常是调用Funnels工具类中的)
  • expectedInsertions:指望插入的值的个数
  • fpp:误判率(默认值为0.03)
  • strategy:哈希算法

咱们重点讲一下fpp参数

fpp误判率

情景一:fpp = 0.01

  • 误判个数:947 占内存大小:9585058位数

情景二:fpp = 0.03(默认参数)

  • 误判个数:3033 占内存大小:7298440位数
总结
  • 误判率能够经过fpp参数进行调节
  • fpp越小,须要的内存空间就越大:0.01须要900多万位数,0.03须要700多万位数。
  • fpp越小,集合添加数据时,就须要更多的hash函数运算更多的hash值,去存储到对应的数组下标里。(忘了去看上面的布隆过滤存入数据的过程)

第二种方式:Redisson

上面使用Guava实现的布隆过滤器是把数据放在了本地内存中。分布式的场景中就不合适了,没法共享内存

还能够用Redis来实现布隆过滤器,这里使用Redis封装好的客户端工具Redisson

pom配置:

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.13.4</version>
</dependency>

java代码:

public class RedissonBloomFilter {

  public static void main(String[] args) {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    config.useSingleServer().setPassword("1234");
    //构造Redisson
    RedissonClient redisson = Redisson.create(config);

    RBloomFilter<String> bloomFilter = redisson.getBloomFilter("phoneList");
    //初始化布隆过滤器:预计元素为100000000L,偏差率为3%
    bloomFilter.tryInit(100000000L,0.03);
    //将号码10086插入到布隆过滤器中
    bloomFilter.add("10086");

    //判断下面号码是否在布隆过滤器中
    //输出false
    System.out.println(bloomFilter.contains("123456"));
    //输出true
    System.out.println(bloomFilter.contains("10086"));
  }
}

Twemproxy

1.1.1 简介

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第11张

cluster是redis官方提供的集群方案,功能确实强大(在线扩容,缩容等等),除了官方的cluster,业界有很多三方的缓存代理中间件,比如: predixy, codis, redis-cerberus,squirrel ,cellar act。Twemproxy是使用最广泛、同时也是redis官方所认可的实现方案。

 Twemproxy(又称为nutcracker)由Twitter开源。是一个轻量级的Redis和Memcached代理,主要用来减少对后端缓存服务器的连接数。 

特点:

memcached时代可以称王称霸,但随着redis自身发展,尤其高版本cluster出现,已逐渐被弱化

优点:

简单可靠,具备生产级别应用能力

减少了redis连接数,降低redis连接成本,cluster的所有节点之间都需要互相建立连接。

除了redis,Twemproxy可以对Memcached 协议做代理,在缓存界是个通用性的解决方案。

缺点:

和cluster相比,性能有一定的损失(twitter测试约20%)

自身也会成为一个单点,所以,做双活很重要!

它只是一个代理转发,底层的主从切换等依然靠redis自身的主从和哨兵或cluster。这一点上cluster已经完虐它

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第12张

1.1.2 下载与部署

yum install -y autoconf automake libtool
yum remove -y autoconf 

export twemproxy_path=/opt/redis/latest/twemproxy/

mkdir -p $twemproxy_path

cd $twemproxy_path
wget ftp://ftp.gnu.org/gnu/autoconf/autoconf-2.69.tar.gz
tar -zxvf autoconf-2.69.tar.gz
cd autoconf-2.69 
.configure --prefix=/usr
make && make install

cd $twemproxy_path
wget http://ftp.gnu.org/gnu/automake/automake-1.14.tar.gz
tar -zxvf automake-1.14.tar.gz
cd automake-1.14
.bootstrap.sh
.configure --prefix=/usr
make && make install

cd $twemproxy_path
wget http://ftp.gnu.org/gnu/libtool/libtool-2.4.2.tar.gz
tar -zxvf libtool-2.4.2.tar.gz
cd libtool-2.4.2
.configure --prefix=/usr
make && make install

cd $twemproxy_path
wget https://github.com/twitter/twemproxy/archive/v0.4.1.tar.gz
tar -zxvf v0.4.1.tar.gz
cd twemproxy-0.4.1
.configure --prefix=/usr
make && make install

#编译完,启动文件在src目录中。

1.1.3 配置与启动

1)先准备好两台redis

#将redis.conf拷贝一份,注意以下配置项
#后台启动
daemonize yes
#bind这一行一定要注释掉!允许外部ip连接,否则将来用redis-cli连接操作命令的时候会报一个错误:
#Error: Connection reset by peer
#bind 127.0.0.1 -::1

#启动两个实例,在8081和8082端口
[root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# pwd
/opt/redis/latest/twemproxy
[root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# ..src/redis-server redis.conf --port 8081
[root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# ..src/redis-server redis.conf --port 8082


#确认服务启动成功
[root@iZ8vb3a9qxofwannyywl6zZ twemproxy]# ps aux | grep redis-server
root     18209  0.1  0.0 162492  2680 ?        Ssl  13:35   0:00 ..src/redis-server 127.0.0.1:8081
root     18217  0.0  0.0 162492  2688 ?        Ssl  13:35   0:00 ..src/redis-server 127.0.0.1:8082

2)配置twemproxy

#将yml文件拷贝一份,test.yml,并修改内容为自己的redis地址
[root@iZ8vb3a9qxofwannyywl6zZ conf]# pwd
/opt/redis/latest/twemproxy/twemproxy-0.4.1/conf
[root@iZ8vb3a9qxofwannyywl6zZ conf]# cp nutcracker.yml test.yml
#test.yml文件说明
alpha:      #标记,如果多组,就alpha,beta……往上加,参考 nutcracker.yml 样本
  listen: 127.0.0.1:22121  # 这组集群暴露的端口,将来连这个
  hash: fnv1a_64   #hash散列算法
  distribution: modula  #分片算法,这里用取模方式,一共三种,后面详细介绍
  auto_eject_hosts: true  #自动摘除故障节点
  redis: true        #是不是redis,false则表示memcache
  server_retry_timeout: 2000     #每隔2秒判断故障节点是否正常,如果正常则放回一致性hash环
  server_failure_limit: 3   #多少次无响应,就从一致性hash环中摘除
  #redis实例列表,一定要加别名,否则宕机后更换机器,分片就不一样了
  #加了别名后,将用别名做分片节点名,否则用ip加端口权重,一旦ip变更会重新迁移
  servers:
   - 127.0.0.1:8081:1 redis-1
   - 127.0.0.1:8082:1 redis-2
   
#启动:-d后台启动,-c指定启动文件
[root@iZ8vb3a9qxofwannyywl6zZ conf]# ..src/nutcracker -d -c test.yml
[root@iZ8vb3a9qxofwannyywl6zZ conf]# ps aux | grep nutcracker
root     14601  0.0  0.0   1136   248 pts/0    Ssl+ 09:25   0:00 /usr/sbin/nutcracker -c /opt/nutcracker.yml

3)连接与验证

#连接非常的简单,用redis-cli和直连redis一样

#首先在twemproxy上设置多个key,均成功
[root@iZ8vb3a9qxofwannyywl6zZ ~]# redis-cli -p 22121
127.0.0.1:22121> set a a
OK
127.0.0.1:22121> set b b
OK
127.0.0.1:22121> set c c
OK
127.0.0.1:22121> set d d
OK

#先连redis-1 , 取到ac, bd取不到
[root@iZ8vb3a9qxofwannyywl6zZ ~]# redis-cli -p 8081
127.0.0.1:8081> get a
"a"
127.0.0.1:8081> get b
(nil)
127.0.0.1:8081> get c
"c"
127.0.0.1:8081> get d
(nil)
127.0.0.1:8081>


#再连redis-2 , 发现ac不存在,bd在这里,验证集群分片成功!
[root@iZ8vb3a9qxofwannyywl6zZ ~]# redis-cli -p 8082
127.0.0.1:8082> get a
(nil)
127.0.0.1:8082> get b
"b"
127.0.0.1:8082> get c
(nil)
127.0.0.1:8082> get d
"d"

1.1.4 分片策略

1)读写原理

写入时,twemproxy将多个对应的key计算hash值路由到对应的后端redis机器。

而要在redis集群中查询对应的key/value时,twemproxy同样计算hash值从对应的后端redis收集过来,然后拼接起来返回给用户。

2)策略

后台的redis或memcached集群可以通过以下几种算法进行key/value的分配(distribution属性):

  • ketama: 一个实现一致性hash算法的开源库
  • modula: 通过取模的hash算法来选择一个节点
  • random:随机选择一个节点

经典面试题

Redis6.x 之后为何引入了多线程?

答:

Redis6.0 引入多线程主要是为了提高网络 IO 读写性能(Redis 的瓶颈并不在 CPU,而在内存和网络。)

虽然,Redis6.0 引入了多线程,但是 Redis 的多线程只是在网络数据的读写这类耗时操作上使用了, 执行命令仍然是单线程顺序执行。因此,你也不需要担心线程安全问题。

Redis6.x多线程的实现机制?

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第13张

流程简述如下

  • 主线程负责接收建立连接请求,获取 Socket 放入全局等待读处理队列。
  • 主线程处理完读事件之后,通过 RR(Round Robin)将这些连接分配给这些 IO 线程。
  • 主线程阻塞等待 IO 线程读取 Socket 完毕。
  • 主线程通过单线程的方式执行请求命令,请求数据读取并解析完成,但并不执行。
  • 主线程阻塞等待 IO 线程将数据回写 Socket 完毕。
  • 解除绑定,清空等待队列。

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第14张

该设计有如下特点:

1、IO 线程要么同时在读 socket,要么同时在写,不会同时读或写

2、IO 线程只负责读写 socket 解析命令,不负责命令处理

Redis6.x默认是否开启了多线程?

Redis6.0 的多线程默认是禁用的,只使用主线程

如需开启需要修改 redis 配置文件 redis.conf

io-threads-do-reads yes

开启多线程后,还需要设置线程数,否则是不生效的。同样需要修改 redis 配置文件 redis.conf :

 io-threads 4 #官网建议4核的机器建议设置为2或3个线程,8核的建议设置为6个线程

缓存穿透

  • 缓存穿透: key中对应的缓存数据不存在,导致去请求数据库,造成数据库的压力倍增的情况
  • 缓存击穿: redis过期后的一瞬间,有大量用户请求同一个缓存数据,导致这些请求都去请求数据库,造成数据库压力倍增的情,针对一个key而言
  • 缓存雪崩: 缓存服务器宕机或者大量缓存集中某个时间段失效,导致请求全部去到数据库,造成数据库压力倍增的情况,这个是针对多个key而言

缓存穿透

概念:缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,造成数据库的压力倍增的情况

例:发起为id值为 -1 的数据或 id 为特别大不存在的数据

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第15张

解决方案:

(1)接口层增加校验,比如用户鉴权校验,参数做校验 比如:id 做基础校验,id <=0的直接拦截

(2)对于像ID为负数的非法请求直接过滤掉,采用布隆过滤器(Bloom Filter)

(3)针对在数据库中找不到记录的,我们仍然将该空数据存入缓存中,当然一般会设置一个较短的过期时间

缓存雪崩

概念:缓存服务器宕机或者大量缓存集中某个时间段失效,导致请求全部去到数据库,造成数据库压力倍增的情况,这个是针对多个key而言

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第16张

解决:

(1)实现缓存高可用,通过redis cluster将热点数据均匀分布在不同的Redis库中也能避免全部失效的问题

(2)批量往Redis存数据的时候,把每个Key的失效时间都加个随机值

 setRedis(Key,value,time + Math.random() * 10000);

缓存击穿

概念:redis过期后的一瞬间,有大量用户请求同一个缓存数据,导致这些请求都去请求数据库,造成数据库压力倍增的情,针对一个key而言

缓存击穿与缓存雪崩的区别是这里针对的是某一热门key缓存,而雪崩针对的是大量缓存的集中失效

redis分布式锁解决超卖_为什么大家都在抢茅台酒_https://bianchenghao6.com/blog_Python_第17张

解决方案

 ● 设置热点数据永远不过期。

 ● 使用互斥锁(mutex key)

好了,今天就先码到这里了,接着去搬砖了,茅台没抢着,倒是学习了不少技术,也是收获颇丰

希望大家不要吝啬你的小手,给狂野君点个赞,你的认可,是我最大的动力

以后,会持续为大家送上干货的,欢迎大家关注,以免迷路

以下是往期干货,欢迎收藏

往期干货:

  • 怎样才能快速成为一名架构师?
  • 如何从Java工程师成长为架构师?
  • 六种常用事务解决方案,你方唱罢,我登场(没有最好只有更好)
  • 超详细教程,一文入门Istio架构原理及实战应用
  • 【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程
  • 【图解源码】Zookeeper3.7源码分析,包含服务启动流程源、网络通信、RequestProcessor处理请求
  • 【知其然,知其所以然】配置中心 Apollo源码剖析
  • 【推荐】我认为这是最完整的Apollo教程从入门到精通
  • 探针技术-JavaAgent 和字节码增强技术-Byte Buddy
  • 100003字,带你解密 双11、618电商大促场景下的系统架构体系
  • 【开悟篇】Java多线程之JUC从入门到精通
  • 12437字,带你深入探究RPC通讯原理
  • JVM调优实战演练,妈妈再也不同担心我的性能优化了
  • 13651个字,给你解释清楚 JVM对象销毁
  • 搞不懂JVM的类加载机制,JVM性能优化从何谈起?
  • 4859字,609行,一次讲清楚JVM运行数据区
  • 让实习生搭个Redis集群,差点把我”搭“进去~~~
  • 我用Redis分布式锁,抢了瓶茅台,然后GG了~~
  • 新来的,你说下Redis的持久化机制,哪一种能解决我们遇到的这个业务问题?

本文由传智教育博学谷 - 狂野架构师教研团队发布 如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力 转载请注明出处!

发表回复