redis个人源码分析1----hyperloglog(golang实现)

4,155 阅读12分钟

1 基数统计

HLL算法用来进行基数统计。

什么是基数统计:例如给你一个数组[1,2,2,3,3,5,5] ,这个数组的基数是4(一共有4个不重复的元素)。 好了现在知道什么是基数统计了。
对于这个问题,最容易想到的办法当然是使用bitmap来实现,每个bit位表示一个数字是否出现过,比如要表示上面这串数字使用下面的bitmap来表示:

011101

优点:相对省空间,且合并操作简单,比如上面的应用场景1, 如果想统计某2天有多少个ip地址访问,只需要把两天的bitmap结构拿出来做“或”操作即可。
缺点: 空间复杂度还是太大,1个byte只有8个bit,也就是1个byte只能唯一表示8个IP地址(8个不同的客户)那么:
1k才能表示 1024 * 8 = 8192
1M才能表示 1024 * 1024 * 8 = 8388608 (约800多万)
如果商品链接很多,还需要统计每天的数据等等,每个商品每天的链接需要1M以上的内存,太大,内存扛不住。

相反: 使用HLL ,对于精度要求不是特别高的时候,只需要12k的内存,很神奇!!!

2 HLL算法的原理

举一个我们最熟悉的抛硬币例子,出现正反面的概率都是1/2,一直抛硬币直到出现正面,记录下投掷次数k,将这种抛硬币多次直到出现正面的过程记为一次伯努利过程,对于n次伯努利过程,我们会得到n个出现正面的投掷次数值 $ k_1, k_2 ... k_n $, 其中这里的最大值是k_max
看到这里可能有点懵逼,我们把问题缕一缕,联系我们的问题,在做基数统计的时候,其实是这么个问题:

现在的目标是来了一组投掷次数的数据($ k_1, k_2 ... k_n $),想要预测出,一共做了多少次伯努利过程(假设为n),才能得到这样的数据的啊?

根据一顿数学推导---直接给结论: 用 $2^{k_ max}$ 来作为n的估计值。

image

例如:做了一组伯努利过程,发现连续出现5次反面后,出现了1次正面,请问出现这种情况,大概需要做多少次伯努利过程呢? 答案是: $ 2^6 = 64 $ 次实验。

这个其实就是我们进行HLL基数统计的基础。

回到基数统计的问题,我们需要统计一组数据中不重复元素的个数,集合中每个元素的经过hash函数后可以表示成0和1构成的二进制数串,一个二进制串可以类比为一次抛硬币实验,1是抛到正面,0是反面。二进制串中从低位开始第一个1出现的位置可以理解为抛硬币试验中第一次出现正面的抛掷次数kk,那么基于上面的结论,我们可以通过多次抛硬币实验的最大抛到正面的次数来预估总共进行了多少次实验,同样可以可以通过第一个1出现位置的最大值$ k_{max}k ​max ​​ 来预估总共有多少个不同的数字(整体基数)。

所以HLL的基本思想是利用集合中数字的比特串第一个1出现位置的最大值来预估整体基数,但是这种预估方法存在较大误差,为了改善误差情况,HLL中引入分桶平均的概念。

例如 同样举抛硬币的例子,如果只有一组抛硬币实验,运气较好,第一次实验过程就抛了10次才第一次抛到正面,显然根据公式推导得到的实验次数的估计误差较大;如果100个组同时进行抛硬币实验,同时运气这么好的概率就很低了,每组分别进行多次抛硬币实验,并上报各自实验过程中抛到正面的抛掷次数的最大值,就能根据100组的平均值预估整体的实验次数了。

redis里面就是使用了分桶的原理,具体的实现原理如下: 首先来了一个redis object(字符串), 经过hash后,生成了一个8字节的hash值。

graph LR
A[redis object]-->|hash function| B(64 bit)

然后将 64个bit位的前14位作为桶的下标,这样桶大小就是$ 2^{14} = 16348 $
后面50个bit位,相当于是随机的那个伯努利过程,我们找到1第一次出现的位置count,如果当前count比桶里面的oldcount大, 则更新oldcount=count。

1.2 基数统计的应用场景

    1. 统计一个网站一天有多少个ip地址访问;
    1. 统计某个商品链接每天被多少个不同客户访问;
  • ...

还有很多应用,大致就是统计类的需求其实都很明确, 不需要很准确的值,只需要一个类似的估计值即可,同时不用set来存储,因为set其实很消耗内存,希望这个统计的结构越节约内存越好。 其实都可以用这HLL算法,节约内存。

1.4 HyperLogLog的算法流程

1.4.1 创建一个HLL对象

HLL的头结构体定义:


struct hllhdr {
    char magic[4];      /* "HYLL" 魔数,前面4个字节表示这是一个hll对象*/
    uint8_t encoding;   /* 存储方式,后面会讲到,分为HLL_DENSE or HLL_SPARSE两种存储方式 */
    uint8_t notused[3]; /*保留字段,因为redis是自然字节对齐的,所以空着也是空着,不如定义一下 Reserved for future use, must be zero. */
    uint8_t card[8];    /*缓存的当前hll对象的基数值 Cached cardinality, little endian. */
    uint8_t registers[]; /* Data bytes. 对于dense存储方式,这里就是一个12k的连续数组,对于sparse存储方式,这里长度是不定的,后面会讲到*/
};


创建一个hll对象:

/* Create an HLL object. We always create the HLL using sparse encoding.
 * This will be upgraded to the dense representation as needed.
 这里英文注释其实已经写的很清楚了,默认hll对象使用sparse的编码方式,这样比较节约内存,但是sparse方式存储其实比较难以理解,代码实现也比较复杂,但是对于理解来说,其实就是对于里面hll桶的存储方式的不同,HLL算法本身逻辑上没有区别
 */
robj *createHLLObject(void) {
    robj *o;
    struct hllhdr *hdr;
    sds s;
    uint8_t *p;
    int sparselen = HLL_HDR_SIZE +
                    (((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) /
                     HLL_SPARSE_XZERO_MAX_LEN)*2);  
    //头长度+(16384 + (16384-1) / 16384 * 2),也就是2个字节,默认因为基数统计里面所有的桶都是0,用spase方式存储,只需要2个字节
    int aux;

    /* Populate the sparse representation with as many XZERO opcodes as
     * needed to represent all the registers. */
    aux = HLL_REGISTERS;
    s = sdsnewlen(NULL,sparselen);
    p = (uint8_t*)s + HLL_HDR_SIZE;
    while(aux) {
        int xzero = HLL_SPARSE_XZERO_MAX_LEN;
        if (xzero > aux) xzero = aux;
        HLL_SPARSE_XZERO_SET(p,xzero);
        p += 2;
        aux -= xzero;
    }
    serverAssert((p-(uint8_t*)s) == sparselen);

    /* Create the actual object. */
    o = createObject(OBJ_STRING,s);
    hdr = o->ptr;
    memcpy(hdr->magic,"HYLL",4);
    hdr->encoding = HLL_SPARSE;
    return o;
}



3 pfadd 流程

image

3.1 使用dense方式存储

来一个byte流,传入 是一个void * 指针和一个长度len, 通过MurmurHash64A 函数 计算一个64位的hash值。64位的前14位(这个值是可以修改的)作为index,后面作为50位作为bit流。 2 ^ 14 == 16384 也就是一共有16384个桶。每个桶使用6个bit存储。

后面的50位bit流,如下样子:
00001000....11000 其中第一次出现1的位置我们记为count, 所以count最大值是50, 用6个bit位就够表示了。 2 ^ 6 = 64

故一个HLL对象实际用来存储的空间是16384(个桶) * ( 每个桶6个bit) / 8 = 12288 byte。 也就是使用了约12k的内存。这个其实redis比较牛逼的地方,其实用一个字节来存的话,其实也就是16k的内存,但是为了能省4k的内存,搞出一堆。这个只是dense方式存储,相对是浪费空间的,下面讲的sparse方式存储更加节约空间。

计算出index(桶的下标), count(后面50个bit中第一次出现1的位置)后,下一步就是更新桶的操作。 根据index找到桶,然后看当前的count 是否大于oldcount,大于则更新下oldcount = count。此时为了性能考虑,是不会去统计当前的基数的,而是将HLL的头里面的一个标志位置为1,表示下次进行pfcount操作的时候,当前的缓存值已经失效了,需要重新统计缓存值。在后面pfcount流程的时候,发现这个标记为失效,就会去重新统计新的基数,放入基数缓存。

/* Call hllDenseAdd() or hllSparseAdd() according to the HLL encoding. */
int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
    struct hllhdr *hdr = o->ptr;
    switch(hdr->encoding) {
    case HLL_DENSE: return hllDenseAdd(hdr->registers,ele,elesize);
    case HLL_SPARSE: return hllSparseAdd(o,ele,elesize);//sparse
    default: return -1; /* Invalid representation. */
    }
}



/* "Add" the element in the dense hyperloglog data structure.
 * Actually nothing is added, but the max 0 pattern counter of the subset
 * the element belongs to is incremented if needed.
 *
 * This is just a wrapper to hllDenseSet(), performing the hashing of the
 * element in order to retrieve the index and zero-run count. */
int hllDenseAdd(uint8_t *registers, unsigned char *ele, size_t elesize) {
    long index;
    uint8_t count = hllPatLen(ele,elesize,&index);//index就是桶的下标, count则是后面50个bit位中1第一次出现的位置
    /* Update the register if this element produced a longer run of zeroes. */
    return hllDenseSet(registers,index,count);
}


/* ================== Dense representation implementation  ================== */

/* Low level function to set the dense HLL register at 'index' to the
 * specified value if the current value is smaller than 'count'.
 *
 * 'registers' is expected to have room for HLL_REGISTERS plus an
 * additional byte on the right. This requirement is met by sds strings
 * automatically since they are implicitly null terminated.
 *
 * The function always succeed, however if as a result of the operation
 * the approximated cardinality changed, 1 is returned. Otherwise 0
 * is returned. */
int hllDenseSet(uint8_t *registers, long index, uint8_t count) {
    uint8_t oldcount;
    //找到对应的index获取其中的值
    HLL_DENSE_GET_REGISTER(oldcount,registers,index);
    if (count > oldcount) { //如果新的值比老的大,就更新来的
        HLL_DENSE_SET_REGISTER(registers,index,count);
        return 1;
    } else {
        return 0;
    }
}



/* Given a string element to add to the HyperLogLog, returns the length
 * of the pattern 000..1 of the element hash. As a side effect 'regp' is
 * set to the register index this element hashes to. */
int hllPatLen(unsigned char *ele, size_t elesize, long *regp) {
    uint64_t hash, bit, index;
    int count;

    /* Count the number of zeroes starting from bit HLL_REGISTERS
     * (that is a power of two corresponding to the first bit we don't use
     * as index). The max run can be 64-P+1 = Q+1 bits.
     *
     * Note that the final "1" ending the sequence of zeroes must be
     * included in the count, so if we find "001" the count is 3, and
     * the smallest count possible is no zeroes at all, just a 1 bit
     * at the first position, that is a count of 1.
     *
     * This may sound like inefficient, but actually in the average case
     * there are high probabilities to find a 1 after a few iterations. */
    hash = MurmurHash64A(ele,elesize,0xadc83b19ULL);
    index = hash & HLL_P_MASK; /* Register index. */
    hash >>= HLL_P; /* Remove bits used to address the register. */
    hash |= ((uint64_t)1<<HLL_Q); /* Make sure the loop terminates
                                     and count will be <= Q+1. */
    bit = 1;
    count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */
    while((hash & bit) == 0) {
        count++;
        bit <<= 1;
    }
    *regp = (int) index;

    serverLog(LL_NOTICE,"pf hash idx=%d, count=%d", index, count);
    return count;
}

3.2 pfcount流程

统计基数流程,就如果cache标志位是有效的,直接返回缓存值,否则重新计算HLL的所有16384个桶,然后进行统计修正,具体的修正的原理,涉及很多的数学知识和论文,这里就不提及了。

/* Return the approximated cardinality of the set based on the harmonic
 * mean of the registers values. 'hdr' points to the start of the SDS
 * representing the String object holding the HLL representation.
 *
 * If the sparse representation of the HLL object is not valid, the integer
 * pointed by 'invalid' is set to non-zero, otherwise it is left untouched.
 *
 * hllCount() supports a special internal-only encoding of HLL_RAW, that
 * is, hdr->registers will point to an uint8_t array of HLL_REGISTERS element.
 * This is useful in order to speedup PFCOUNT when called against multiple
 * keys (no need to work with 6-bit integers encoding). */
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
    double m = HLL_REGISTERS;
    double E;
    int j;
    int reghisto[HLL_Q+2] = {0};

    /* Compute register histogram */
    if (hdr->encoding == HLL_DENSE) {
        hllDenseRegHisto(hdr->registers,reghisto);
    } else if (hdr->encoding == HLL_SPARSE) {
        hllSparseRegHisto(hdr->registers,
                         sdslen((sds)hdr)-HLL_HDR_SIZE,invalid,reghisto);
    } else if (hdr->encoding == HLL_RAW) {
        hllRawRegHisto(hdr->registers,reghisto);
    } else {
        serverPanic("Unknown HyperLogLog encoding in hllCount()");
    }

    /* Estimate cardinality form register histogram. See:
     * "New cardinality estimation algorithms for HyperLogLog sketches"
     * Otmar Ertl, arXiv:1702.01284 */
    //这里具体的修正流程,要去看论文,就照着抄过来实现就可以了。 
    double z = m * hllTau((m-reghisto[HLL_Q+1])/(double)m);
    for (j = HLL_Q; j >= 1; --j) {
        z += reghisto[j];
        z *= 0.5;
    }
    z += m * hllSigma(reghisto[0]/(double)m);
    E = llroundl(HLL_ALPHA_INF*m*m/z);

    return (uint64_t) E;
}

4. 后记

其实原理是很简单的,而且里面涉及到很多的数学知识,也是不能全部看懂,不得不感慨,redis对内存的节约是真的很变态的。对于sparse模式,节约的内存更加恐怖,因为这个其实对于hll算法的原理理解其实影响不大,本文就不做详细介绍了。

最后贴上我用golang模仿写的一个hyperloglog代码:

package goRedis

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
)

const seed = 0xadc83b19

const hll_dense = 1
const hll_sparse = 2

const hll_p = 14
const hll_q = 64 - hll_p
const hll_registers = 1 << hll_p
const hll_p_mask = hll_registers - 1
const hll_bits = 6
const hll_sparse_val_max_value = 32
const hll_alpha_inf = 0.721347520444481703680

type hllhdr struct {
	magic      string
	encoding   uint8
	notused    [3]uint8
	card       [8]uint64
	registers  []byte //实际存储的,因为后面如果encoding方式采用sparse的话,长度会变化,所以使用slice比较好
	vaildCache bool
}

func initHLL(encoding uint8) *hllhdr {
	hdr := new(hllhdr)
	hdr.magic = "HYLL"
	hdr.encoding = encoding

	if encoding == hll_dense {
		hdr.registers = make([]byte, hll_registers*1) // 先简单实现下 用一个字节存6个bit
	} else {
		panic("HLL SPARSE encoding format doesn't support.")
	}
	return hdr
}

func hllDenseSet(hllObj *hllhdr, index uint64, count int) bool {
	if count > int(hllObj.registers[index]) {
		hllObj.registers[index] = byte(count)
		return true
	}
	return false
}

func PfAddCommand(hllObj *hllhdr, val []byte) {
	index, count := hllPartLen(val)
	if hllObj.encoding == hll_dense {
		hllDenseSet(hllObj, index, count)
		hllObj.vaildCache = false
	} else {
		panic("HLL SPARSE encoding format doesn't support.")
	}
}

func hllTau(x float64) float64 {
	if x == 0. || x == 1. {
		return 0.
	}
	var zPrime float64
	y := 1.0
	z := 1 - x
	for {
		x = math.Sqrt(x)
		zPrime = z
		y *= 0.5
		z -= math.Pow(1-x, 2) * y
		if zPrime == z {
			break
		}
	}
	return z / 3

}

func hllDenseRegHisto(hllObj *hllhdr, reghisto *[hll_q + 2]int) {
	for i := 0; i < hll_registers; i++ {
		reg := hllObj.registers[i]
		reghisto[reg]++
	}
}

func hllSigma(x float64) float64 {
	if x == 1. {
		return math.MaxInt64
	}
	var zPrime float64
	y := float64(1)
	z := x
	for {
		x *= x
		zPrime = z
		z += x * y
		y += y
		if zPrime == z {
			break
		}
	}
	return z
}

func hllCount(hllObj *hllhdr) int {
	m := float64(hll_registers)
	var reghisto [hll_q + 2]int
	if hllObj.encoding == hll_dense {
		hllDenseRegHisto(hllObj, &reghisto)
	} else {
		panic("impliment me..")
	}

	z := m * hllTau((m - (float64(reghisto[hll_q+1]))/m))
	for j := hll_q; j >= 1; j-- {
		z += float64(reghisto[j])
		z *= 0.5
	}
	z += m * hllSigma(float64(reghisto[0])/m)
	E := math.Round(hll_alpha_inf * m * m / z)

	return int(E)

}

func PfCountCommand(hllObj *hllhdr) int {
	var ret int
	if hllObj.vaildCache {
		return 0
	} else {
		ret = hllCount(hllObj)
	}

	return ret
}

func CreateHLLObject() *hllhdr {
	hdr := initHLL(hll_dense)
	return hdr
}

func Murmurhash(buff []byte, seed uint32) uint64 {
	buffLen := uint64(len(buff))
	m := uint64(0xc6a4a7935bd1e995)
	r := uint32(47)
	h := uint64(seed) ^ (buffLen * m)

	for i := uint64(0); i < buffLen-(buffLen&7); {
		var k uint64
		bBuffer := bytes.NewBuffer(buff[i : i+8])
		binary.Read(bBuffer, binary.LittleEndian, &k)

		k *= m
		k ^= k >> r
		k *= m
		h ^= k
		h *= m

		binary.Write(bBuffer, binary.LittleEndian, &k)
		i += 8
	}
	switch buffLen & 7 {
	case 7:
		h ^= uint64(buff[6]) << 48
		fallthrough
	case 6:
		h ^= uint64(buff[5]) << 40
		fallthrough
	case 5:
		h ^= uint64(buff[4]) << 32
		fallthrough
	case 4:
		h ^= uint64(buff[3]) << 24
		fallthrough
	case 3:
		h ^= uint64(buff[2]) << 16
		fallthrough
	case 2:
		h ^= uint64(buff[1]) << 8
		fallthrough
	case 1:
		h ^= uint64(buff[0])
		fallthrough
	default:
		h *= m
	}

	h ^= h >> r
	h *= m
	h ^= h >> r
	return h
}

func hllPartLen(buff []byte) (index uint64, count int) {
	hash := Murmurhash(buff, seed)
	index = hash & uint64(hll_p_mask) //这里就是取出后14个bit,作为index
	hash >>= hll_p                    //右移把后面14个bit清理掉,注意这里的bit流其实是倒序的
	hash |= uint64(1) << hll_q        //当前的最高位设置1,其实是一个哨兵,避免count为0
	bit := uint64(1)
	count = 1
	for (hash & bit) == 0 {
		count++
		bit <<= 1
	}
	fmt.Printf("pf hash idx=%d, count=%d\n", index, count)

	return index, count
}

//func hllSparseSet(o, index int64, count int64) {
//	if count > hll_sparse_val_max_value {
//		goto promote
//	}
//
//promote:
//}

测试代码:

func TestAll(t *testing.T) {
	hllObj := CreateHLLObject()
	test1 := []string{"apple", "apple", "orange", "ttt", "aaa"}

	for _, str := range test1 {
		PfAddCommand(hllObj, []byte(str))
	}

	println(PfCountCommand(hllObj))
}