您好,欢迎访问代理记账网站
  • 价格透明
  • 信息保密
  • 进度掌控
  • 售后无忧

推荐系统之局部敏感哈希(LSH)

推荐系统之局部敏感哈希(LSH)

前言

局部敏感哈希的基本思想:是希望让相邻的点落入同一个“桶”,这样在进行最近邻搜索时,我们仅需要在一个桶内,或相邻几个桶内的元素中进行搜索即可。如果保持每个桶中的元素个数在一个常数附近,我们就可以把最近邻搜索的时间复杂度降低到常数级别。
高维空间映射到低维空间相似度的定性结论:欧式空间中,将高维空间的点映射到低维空间,原本接近的点在低维空间中肯定依然接近,但原本远离的点则有一定概率变成接近的点。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eBUZv8Ix-1622533742078)(./imgs/LSH.png)]
构造原则

  1. 利用低维空间可以保留高维空间相近距离关系的性质,我们就可以构造局部敏感哈希“桶”。
  2. 对于 Embedding 向量来说,由于 Embedding 大量使用内积操作计算相似度,因此我们也可以用内积操作来构建局部敏感哈希桶。假设 v 是高维空间中的 k 维 Embedding 向量,x 是随机生成的 k 维映射向量。那我们利用内积操作可以将 v 映射到一维空间,得到数值 h ( v ) = v ⋅ x h(v)=v⋅x h(v)=vx
  3. 而且,一维空间也会部分保存高维空间的近似距离信息。因此,我们可以使用哈希函数 h(v) 进行分桶,公式为: h ( v ) = ⌊ ( x ⋅ v + b ) / w ⌋ h(v)=⌊(x⋅v+b)/w⌋ h(v)=(xv+b)/w 。其中, ⌊⌋ 是向下取整操作, w 是分桶宽度,b 是 0 到 w 间的一个均匀分布随机变量,避免分桶边界固化。
  4. 不过,映射操作会损失部分距离信息,如果我们仅采用一个哈希函数进行分桶,必然存在相近点误判的情况,因此,我们可以采用 m 个哈希函数同时进行分桶。如果两个点同时掉进了 m 个桶,那它们是相似点的概率将大大增加。通过分桶找到相邻点的候选集合后,我们就可以在有限的候选集合中通过遍历找到目标点真正的 K 近邻了。

最后,局部敏感哈希能在常数时间得到最近邻的结果吗?
答案是可以的,如果我们能够精确地控制每个桶内的点的规模是 C,假设每个 Embedding 的维度是 N,那么找到最近邻点的时间开销将永远在 O ( C ⋅ N ) O(C⋅N) O(CN)量级。采用多桶策略之后,假设分桶函数数量是 K,那么时间开销也在 O ( K ⋅ C ⋅ N ) O(K⋅C⋅N) O(KCN)量级,这仍然是一个常数。

LSH代码

代码流程:
1、首先,在离线状态下,将所有的向量通过特定的哈希函数映射到对应的索引位置;

2、输入一个向量,用同样的哈希函数计算哈希值,找到对应哈希值位置的所有向量;

3、根据对应的距离度量方法,与第2步查到的所有向量计算距离;

4、筛选出距离最小的n个向量,即为与输入向量最为相似的n个结果。

import numpy as np
from typing import List, Union
class EuclideanLSH:

    def __init__(self, tables_num: int, a: int, depth: int):
        """

        :param tables_num: hash_table的个数
        :param a: a越大,被纳入同个位置的向量就越多,即可以提高原来相似的向量映射到同个位置的概率,
                反之,则可以降低原来不相似的向量映射到同个位置的概率。
        :param depth: 向量的维度数
        """
        self.tables_num = tables_num
        self.a = a
        # 为了方便矩阵运算,调整了shape,每一列代表一个hash_table的随机向量
        self.R = np.random.random([depth, tables_num])
        self.b = np.random.uniform(0, a, [1, tables_num])
        # 初始化空的hash_table
        self.hash_tables = [dict() for i in range(tables_num)]

    def _hash(self, inputs: Union[List[List], np.ndarray]):
        """
        将向量映射到对应的hash_table的索引
        :param inputs: 输入的单个或多个向量
        :return: 每一行代表一个向量输出的所有索引,每一列代表位于一个hash_table中的索引
        """
        # H(V) = |V·R + b| / a,R是一个随机向量,a是桶宽,b是一个在[0,a]之间均匀分布的随机变量
        hash_val = np.floor(np.abs(np.matmul(inputs, self.R) + self.b) / self.a)

        return hash_val

    def insert(self, inputs):
        """
        将向量映射到对应的hash_table的索引,并插入到所有hash_table中
        :param inputs:
        :return:
        """
        # 将inputs转化为二维向量
        inputs = np.array(inputs)
        if len(inputs.shape) == 1:
            inputs = inputs.reshape([1, -1])
        # indexs: [21. 23. 24. 26. 25. 22. 25. 25. 22. 26.]
        hash_index = self._hash(inputs)
        for inputs_one, indexs in zip(inputs, hash_index):
            for i, key in enumerate(indexs):
                # i代表第i个hash_table,key则为当前hash_table的索引位置
                # inputs_one代表当前向量, 具有相同hash index的输入放入到同一个桶中
                # [{"hash_key": [inputs_ones]}]
                self.hash_tables[i].setdefault(key, []).append(tuple(inputs_one))

    def query(self, inputs, nums=20, op = "or"):
        """
        查询与inputs相似的向量,并输出相似度最高的nums个
        :param inputs: 输入向量
        :param nums:
        :return:
        """
        hash_val = self._hash(inputs).ravel()
        candidates = set()

        # 将相同索引位置的向量添加到候选集中
        if op == "or":
            for i, key in enumerate(hash_val):
                candidates.update(self.hash_tables[i][key])
                
        elif op == "and":
            for i, key in enumerate(hash_val[1:], start=1):
                if i == 1:
                    candidates = self.hash_tables[i - 1][key]
                l2 = self.hash_tables[i][key]
                candidates = set(candidates)&set(l2)
        # 根据向量距离进行排序
        candidates = sorted(candidates, key=lambda x: self.euclidean_dis(x, inputs))
        
        return candidates[:nums]

    @staticmethod
    def euclidean_dis(x, y):
        """
        计算欧式距离
        :param x:
        :param y:
        :return:
        """
        x = np.array(x)
        y = np.array(y)

        return np.sqrt(np.sum(np.power(x - y, 2)))


if __name__ == '__main__':
    
    data = np.random.random([10000, 100])
    query = np.random.random([100])

    lsh = EuclideanLSH(10, 1, 100)
    lsh.insert(data)
    res = lsh.query(query, 20)
    res = np.array(res)
    print("LSH: \n", np.sum(np.power(res - query, 2), axis=-1))
    
    # 获得排序后的index数值
    # 一一计算欧式距离,进行比较
    sort = np.argsort(np.sum(np.power(data - query, 2), axis=-1))
    print("Original data distance(Top-k): \n", np.sum(np.power(data[sort[:20]] - query, 2), axis=-1))
    print("Original data distance(Bottom-k): \n",np.sum(np.power(data[sort[-20:]] - query, 2), axis=-1))
LSH: 
 [10.08980867 10.11797684 10.12562829 10.278338   10.93401227 11.07314636
 11.07426626 11.12684973 11.17300039 11.17381982 11.17470708 11.1872176
 11.18894438 11.33127372 11.34651465 11.36570334 11.37687416 11.39542495
 11.46208599 11.47289804]
Original data distance(Top-k): 
 [10.08980867 10.11797684 10.12562829 10.13820015 10.278338   10.55935714
 10.68896141 10.78527213 10.93401227 10.994057   11.07314636 11.07426626
 11.12684973 11.17300039 11.17381982 11.17470708 11.1872176  11.18894438
 11.33127372 11.34651465]
Original data distance(Bottom-k): 
 [21.88635607 21.96086613 21.96727453 22.06513118 22.09067843 22.09252059
 22.10877624 22.13209293 22.25154688 22.33314858 22.39606409 22.40801702
 22.43777882 22.45125568 22.54360093 22.71898604 22.87555687 22.93491961
 22.96644845 23.05905718]

参考

12-Embedding召回中的局部敏感哈希策略
局部敏感哈希(LSH):高维数据下的最近邻查找


分享:

低价透明

统一报价,无隐形消费

金牌服务

一对一专属顾问7*24小时金牌服务

信息保密

个人信息安全有保障

售后无忧

服务出问题客服经理全程跟进