布隆过滤器的实现

Posted by hang.li on November 20, 2022

BloomFilter

  • 请求到达正式业务之前, 判断该请求是否有效
  • 维护一个大的bit数组, 把有效key的一次或多次的hash索引位置标志已存在. 当有请求进来时, 计算进来的key的hash索引, 判断每一个索引的值是否为true.
  • 常用于处理==缓存穿透==问题

    选用redis实现的好处

  • 部署高可用节点时, 减少每一个节点的开销
  • redis可以持久化, 避免因服务器宕机导致需要重新灌数据的开销
  • redis中可实现的数组长度更长,大数据量下,hash索引可散列的范围更大

    实现

    索引个数,bit数组长度

    1. 使用google中根据原有数据长度计算索引个数, bit数组长度的方法

      hash算法

    2. 使用一个key衍生出多个key, 根据对bit长度求余的hash算法

      initBloomFilter方法

    3. 根据数据的长度, 计算hash个数,和bit数组长度, 将以上信息和容错率信息保存到redis中(这样可以实现多个过滤器)
    4. 使用通道将信息,所有数据压到redis中,使用通道可以提高效率,减少网络开销(测试时初始数据,可以减少75%时间)

      代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.util.List;

@Component
public class BloomFilterUtil {

    /**
     * 布隆过滤器信息key的后缀
     */
    private final static String BLOOM_FILTER_INFO_SUFFIX = ":info";

    /**
     * 布隆过滤器信息容错率的key
     */
    private final static String FPP = "fpp";

    /**
     * 布隆过滤器信息bit数组长度的key
     */
    private final static String NUM_BITS = "numBits";
    /**
     * 布隆过滤器信息hash次数的key
     */
    private final static String NUM_HASH_FUNCTIONS = "numHashFunctions";

    /**
     * 布隆过滤器数组key的后缀
     */
    private final static String BLOOM_FILTER_BIT_ARRAY_SUFFIX = ":bitarray";

    /**
     * 注入redistemplate
     */
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 添加一个值
     * @param bloomFilterName bloomFilter名称
     * @param key 添加的key
     * @throws Exception 没有找到当前过滤器
     */
    public void put(String bloomFilterName, String key) throws Exception {
        long[] hashIndexArray = hash(bloomFilterName, key);

        StringBuilder sbName = new StringBuilder(bloomFilterName);
        sbName.append(BLOOM_FILTER_BIT_ARRAY_SUFFIX);

        for (long hashIndex : hashIndexArray) {
            redisTemplate.opsForValue().setBit(sbName.toString().intern(), hashIndex, true);
        }
    }

    /**
     * 判断当前key是否可能存在
     * @param bloomFilterName bloomFilter名称
     * @param key 判断的key
     * @return 是否可能存在
     * @throws Exception 没有找到bloomFilter
     */
    public boolean mightContain(String bloomFilterName, String key) throws Exception {
        boolean keyIsContain = false;
        long[] hashIndexArray = hash(bloomFilterName, key);

        StringBuilder sbName = new StringBuilder(bloomFilterName);
        sbName.append(BLOOM_FILTER_BIT_ARRAY_SUFFIX);

        for (long hashIndex : hashIndexArray) {
            if (keyIsContain = redisTemplate.opsForValue().getBit(sbName.toString().intern(), hashIndex)) {
                return keyIsContain;
            }
        }
        return keyIsContain;
    }


    /**
     * 计算key的hash
     * 根据名称拿到bloomFilter信息
     * @param key
     * @return
     */
    private long[] hash(String bloomFilterName, String key) throws Exception {
        long numBits;
        int numHashFunctions;

        StringBuilder sbName = new StringBuilder(bloomFilterName);
        sbName.append(BLOOM_FILTER_INFO_SUFFIX);

        Object numBitsString = redisTemplate.opsForHash().get(sbName.toString().intern(), NUM_BITS);
        Object numHashFunctionsString = redisTemplate.opsForHash().get(sbName.toString().intern(), NUM_HASH_FUNCTIONS);
        if (ObjectUtils.isEmpty(numBitsString) || ObjectUtils.isEmpty(numHashFunctionsString)) {
            // redis中找不到当前BloomFilter信息
            throw new Exception();
        } else {
            numBits = Integer.valueOf(numBitsString.toString());
            numHashFunctions = Integer.valueOf(numHashFunctionsString.toString());
        }
        return hash(key, numHashFunctions, numBits);
    }

    /**
     * 计算key的hash(动态key + 求余法)
     * @return
     */
    private long[] hash(String key, int numHashFunctions, long numBits) {
        long[] hashIndexArray = new long[numHashFunctions];
        for (int i = 0, j = hashIndexArray.length; i < j; i++) {
            hashIndexArray[i] = (key.hashCode() + numHashFunctions) % numBits;
        }
        return hashIndexArray;
    }

    /**
     * 计算bit数组长度
     * 来自google
     *
     * @param expectedInsertions 已有数据长度
     * @param fpp                容错率
     * @return 每个hash索引个数
     */
    static long optimalNumOfBits(long expectedInsertions, double fpp) {
        if (fpp == 0.0D) {
            fpp = 4.9E-324D;
        }

        return (long) ((double) (-expectedInsertions) * Math.log(fpp) / (Math.log(2.0D) * Math.log(2.0D)));
    }

    /**
     * 计算每个key的index个数
     * 来自google
     *
     * @param expectedInsertions 已有数据长度
     * @param numBits            bit数组长度
     * @return 每个key的index个数
     */
    static int optimalNumOfHashFunctions(long expectedInsertions, long numBits) {
        return Math.max(1, (int) Math.round((double) numBits / (double) expectedInsertions * Math.log(2.0D)));
    }

    /**
     * 默认容错率0.03
     */
    public boolean initBloomFilter(String bloomFilterName, List<String> dataList) {
        return initBloomFilter(bloomFilterName, dataList, 0.03d);
    }

    /**
     * 初始化数据
     * @param bloomFilterName 过滤器名称
     * @param dataList 数据id列表
     * @param fpp 容错率
     * @return 成功失败
     */
    public boolean initBloomFilter(String bloomFilterName, List<String> dataList, double fpp) {
        int numHashFunctions;
        long expectedInsertions, numBits;

        expectedInsertions = dataList.size();
        numBits = optimalNumOfBits(expectedInsertions, fpp);
        numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);

        // 1.executePipelined 重写 入参 RedisCallback 的doInRedis方法
        List<Object> resultList = redisTemplate.executePipelined((RedisConnection connection) -> {
            // 2.connection 打开管道
            connection.openPipeline();

            // 3.1 保存当前bloomFilter信息
            connection.hSet((bloomFilterName + BLOOM_FILTER_INFO_SUFFIX).getBytes(), FPP.getBytes(), String.valueOf(fpp).getBytes());
            connection.hSet((bloomFilterName + BLOOM_FILTER_INFO_SUFFIX).getBytes(), NUM_BITS.getBytes(), String.valueOf(numBits).getBytes());
            connection.hSet((bloomFilterName + BLOOM_FILTER_INFO_SUFFIX).getBytes(), NUM_HASH_FUNCTIONS.getBytes(), String.valueOf(numHashFunctions).getBytes());

            //  3.connection 给本次管道内添加 要一次性执行的多条命令
            for (String key : dataList) {
                long[] hashIndexArray = hash(key, numHashFunctions, numBits);
                for (long hashIndex : hashIndexArray) {
                    connection.setBit((bloomFilterName + BLOOM_FILTER_BIT_ARRAY_SUFFIX).getBytes(), hashIndex, true);
                }
            }

            // 4.关闭管道 不需要close 否则拿不到返回值
            connection.closePipeline();
            // 这里一定要返回null,最终pipeline的执行结果,才会返回给最外层
            return null;
        });
        return true;
    }
}