亿级数据去重之布隆过滤器

6,657 阅读8分钟

标题很唬人吧,标题党就是本喵了哈哈哈哈哈哈哈。好了回归正题,在我们进行一些爬虫爬取数据的时候,如果保证去重呢,今天和大家聊一聊使用布隆过滤器去重。

首先什么是布隆过滤器呢,让我们依旧来看看百度百科。

  • 布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。

看傻了吧,其实我刚开始也给直接干懵了。实际上上边所说的二进制向量就是一个位数组(比如redis中就提供了一个2^32长度的位数组,大小是512MB),而一系列随机映射函数实际上就是多个哈希函数。那么先和大家来扯扯哈希吧

hash函数

哈希函数一般也叫散列函数,就是可以把任意长度的输入,通过散列算法,变成固定长度数输出,输出的结果就是散列值。常见的哈希函数有md5、sha1、sha256等,也可以将哈希函数理解成一种加密手段,但是哈希函数是不可逆的,就是你不可以通过输出域得到散列值逆向推出输入域的值。(因为它里边使用很厉害的科学计算导致它不可逆。)然后说说哈希的几个特性吧

  • 输入域无限,输出域有限(就是你输入域放一句helloword和放一个10G的小电影,输出域都输出的是一段固定长度的输出。输入域哪怕只有一个字符的不同,得到的散列值都是千差万别毫无规律的)
  • 相同的输入一定得到相同的输出(这个就没得说了吧,你用同一个哈希函数放两次同样的10G小电影得到的散列值是一样的)
  • 不同的输入也可能得到相同的输出结果,这种现象被称为哈希碰撞(因为输入域无限输出域有限,必然会导致有可能你输入的两个不同的值,输出域却得到了相同的散列值)
  • 输出域的每个结果在整个输出域中是均匀分布的,也成为哈希函数的离散性(一个哈希函数的好一般就体现在它的离散性上,输出结果分布的越平均,离散性就越好)

哈希函数的用途非常非常多,举个栗子

看这个,当你下载完英雄联盟的时候,如何确定你完整的下载成功了?

这个时候官方会把这个文件丢到哈希函数(md5)中加密,得到如上图所示的散列值,然后当你下载完成的时候,把你下载好的文件用同样的哈希函数加密,如果得到的值和官方所提供的值一样的话,这时候你绝对是完整的下载了这个文件。这种文件校验使用哈希函数比较常见。

所以我这样说你们应该听得懂吧(小声bb....)

嗯!既然大家都懂了那么接下来进入主题说说布隆过滤器 (来自本喵不要脸的自问自答哈哈哈哈哈哈~)

布隆过滤器

首先来说说布隆过滤器的流程

  • 首先将数据通过k个哈希函数进行哈希得到k个散列值(h1,h2,...hk)
  • 确定一个位数组的长度假设为m,将这k个散列值分别对m取模运算,得到k个值(v1,v2....vk)确保每个值在位数组长度范围之内[0,m-1]
  • 如果需要保存,在维数组中,将k个由上一步取模得到的值作为下标值,找到这个k个下标所对应的值改为1即可
  • 如果需要查询这个数据是否存在,看这k个下标所对应的值是否全部为1,如果全部为1表示存在,否则不存在

我猜你看完之后肯定有一些懵逼,这个时候你把位数组想象成一张白纸

然后假设当前还没有添加过数据,你通过了4个哈希函数并且取模得到了4个值(这4个值是一定在这个 位数组长度范围之内的,因为进行了取模操作),这4个值就在这张白纸的某些位置,把这4个位置涂黑

所以当你添加了好多数据的时候,这张白纸会有好多的小黑点(我就不画了,画的丑,还累。。。。)

如何判断重复呢,当你通过多个哈希计算并取模得出来的值所对应的位置在这张纸上都是黑的,这时候就认为这条数据是重复的。只要有一个位置为白,那么就不重复,继续添加,把位置为白的涂黑,已经是黑色的位置就不用管了。

所以判断数据重复只需要看看位数组中对应的下标位置是否都为1,都为1,就重复,这条数据就可以不要了。

我估摸着你们 都看明白了吧......emmmm不管你们明不明白,我接着说!

当我们采用布隆过滤器进行去重的时候,是一定存在失误率的(因为哈希碰撞的特性),但是能够保证重复的数据一定能判断出来!(这个意思就是会有很小的概率当你数据没有重复的时候,我就说你重复了!就不让你添加数据!理直气壮!!!)

并且布隆过滤器还有个特点,就是它只关注数据量的多少,不考虑单个数据样本的大小(这个由哈希的性质1,输入域无限,输出域有限决定的),所以布隆过滤器对于大数据大样本的海量数据去重特别有效。

这个时候就会发现这个失误率和位数组的长度m、和哈希函数的个数k息息相关。这个时候如何设计布隆过滤器呢?不用担心!已经有科学家帮我们研究出了公式,我们只需要根据自己样本数量和预计失误率计算出你需要的东西啦!就是这么得劲!

通过样本数据个数n,和失误率p,可以计算出m(位数组的长度)

根据m、n,计算哈希函数的个数k

通过m、k、n的值,计算出实际真实的失误率

所得到的数组长度和函数个数都向上取整,比如计算出来的k是8.23个 ,那么就直接取9个哈希函数,这样失误率也会比预期的有所下降

所以说,布隆过滤器可以通过很小的代价来进行大量数据的去重,大概就这样!

扯蛋扯了这么多,也该上代码了哈!

代码(Python)

位数组,我这里就直接使用redis中提供的bit了哈,首先需要多个不同的哈希函数(这个是在麻省理工上找的哈)

#
#**************************************************************************
#*                                                                        *
#*          General Purpose Hash Function Algorithms Library              *
#*                                                                        *
#* Author: Arash Partow - 2002                                            *
#* URL: http://www.partow.net                                             *
#* URL: http://www.partow.net/programming/hashfunctions/index.html        *
#*                                                                        *
#* Copyright notice:                                                      *
#* Free use of the General Purpose Hash Function Algorithms Library is    *
#* permitted under the guidelines and in accordance with the MIT License. *
#* http://www.opensource.org/licenses/MIT                                 *
#*                                                                        *
#**************************************************************************



def rs_hash(key):
    a = 378551
    b = 63689
    hash_value = 0
    for i in range(len(key)):
        hash_value = hash_value * a + ord(key[i])
        a = a * b
    return hash_value


def js_hash(key):
    hash_value = 1315423911
    for i in range(len(key)):
        hash_value ^= ((hash_value << 5) + ord(key[i]) + (hash_value >> 2))
    return hash_value


def pjw_hash(key):
    bits_in_unsigned_int = 4 * 8
    three_quarters = (bits_in_unsigned_int * 3) / 4
    one_eighth = bits_in_unsigned_int / 8
    high_bits = 0xFFFFFFFF << int(bits_in_unsigned_int - one_eighth)
    hash_value = 0
    test = 0

    for i in range(len(key)):
        hash_value = (hash_value << int(one_eighth)) + ord(key[i])
        test = hash_value & high_bits
    if test != 0:
        hash_value = ((hash_value ^ (test >> int(three_quarters))) & (~high_bits))
    return hash_value & 0x7FFFFFFF


def elf_hash(key):
    hash_value = 0
    for i in range(len(key)):
        hash_value = (hash_value << 4) + ord(key[i])
        x = hash_value & 0xF0000000
        if x != 0:
            hash_value ^= (x >> 24)
        hash_value &= ~x
    return hash_value


def bkdr_hash(key):
    seed = 131  # 31 131 1313 13131 131313 etc..
    hash_value = 0
    for i in range(len(key)):
        hash_value = (hash_value * seed) + ord(key[i])
    return hash_value


def sdbm_hash(key):
    hash_value = 0
    for i in range(len(key)):
        hash_value = ord(key[i]) + (hash_value << 6) + (hash_value << 16) - hash_value;
    return hash_value


def djb_hash(key):
    hash_value = 5381
    for i in range(len(key)):
        hash_value = ((hash_value << 5) + hash_value) + ord(key[i])
    return hash_value


def dek_hash(key):
    hash_value = len(key);
    for i in range(len(key)):
        hash_value = ((hash_value << 5) ^ (hash_value >> 27)) ^ ord(key[i])
    return hash_value


def bp_hash(key):
    hash_value = 0
    for i in range(len(key)):
        hash_value = hash_value << 7 ^ ord(key[i])
    return hash_value


def fnv_hash(key):
    fnv_prime = 0x811C9DC5
    hash_value = 0
    for i in range(len(key)):
        hash_value *= fnv_prime
        hash_value ^= ord(key[i])
    return hash_value


def ap_hash(key):
    hash_value = 0xAAAAAAAA
    for i in range(len(key)):
        if (i & 1) == 0:
            hash_value ^= ((hash_value << 7) ^ ord(key[i]) * (hash_value >> 3))
        else:
            hash_value ^= (~((hash_value << 11) + ord(key[i]) ^ (hash_value >> 5)))
    return hash_value

接下来才是bloom的实现了

from BloomFilter.GeneralHashFunctions import *
import redis
import pickle
import base64


class BloomFilterRedis(object):
    # 哈希函数列表
    hash_list = [rs_hash, js_hash, pjw_hash, elf_hash, bkdr_hash, sdbm_hash, djb_hash, dek_hash]

    def __init__(self, key, host='127.0.0.1', port=6379):
        self.key = key
        self.redis = redis.StrictRedis(host=host, port=port, charset='utf-8')

    def random_generator(self, hash_value):
        '''
        将hash函数得出的函数值映射到[0, 2^32-1]区间内
        '''
        return hash_value % (1 << 32)

    def do_filter(self, item, save=True):
        """
        过滤,判断是否存在
        :param item:
        :return:
        """
        flag = True  # 默认存在
        for hash in self.hash_list:
            # 计算哈希值
            hash_value = hash(item)
            # 获取映射到位数组的下标值
            index_value = self.random_generator(hash_value)
            # 判断指定位置标记是否为 0
            if self.redis.getbit(self.key, index_value) == 0:
                # 如果不存在需要保存,则写入
                if save:
                    self.redis.setbit(self.key, index_value, 1)
                flag = False
        return flag


class Stu(object):
    def __init__(self, name, age):
        self.name = name
        self.age = age


if __name__ == '__main__':
    bloom = BloomFilterRedis("bloom_obj")
	# 对对象进行去重,必须实现序列化
    data = pickle.dumps(Stu("xiaohong", 18))
    data1 = base64.b64encode(data).decode()
    ret = bloom.do_filter(data1)
    print(ret)

    data = pickle.dumps(Stu("xiaohong", 18))
    data1 = base64.b64encode(data).decode()
    ret = bloom.do_filter(data1)
    print(ret)

    bloom = BloomFilterRedis("bloom_url")
    ret = bloom.do_filter("http://www.baidu.com")
    print(ret)
    ret = bloom.do_filter("http://www.baidu.com")
    print(ret)

好了 ,代码也撸上来了,欢迎各位小伙伴一起提意见哈哈哈哈哈哈,那么今天就到这里,我先撤!!!