|
|
@@ -7,6 +7,7 @@ from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExec
|
|
|
|
|
|
import jieba
|
|
|
import redis
|
|
|
+from bitarray import bitarray
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
import utils
|
|
|
@@ -63,7 +64,7 @@ def agg_word(file_path: str):
|
|
|
|
|
|
# 总长尾词数量
|
|
|
word_total_num = 0
|
|
|
- # word_total_num = 100000
|
|
|
+ # word_total_num = 1000000
|
|
|
|
|
|
# 聚合阈值
|
|
|
agg_threshold = 0.8
|
|
|
@@ -146,7 +147,7 @@ def agg_word(file_path: str):
|
|
|
|
|
|
# 先清除,然后重新构建长尾词使用位图
|
|
|
m_redis_cache.delete(CACHE_UNUSED_BITMAP)
|
|
|
- m_redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 1, 1)
|
|
|
+ m_redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 2, 1)
|
|
|
|
|
|
# 提交任务 并输出结果
|
|
|
word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
|
|
|
@@ -398,6 +399,10 @@ def agg_word_thread(main_word_position: int, agg_threshold: float):
|
|
|
return
|
|
|
main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
|
|
|
|
|
|
+ # 获取已使用位图副本
|
|
|
+ unused_bitmap = bitarray()
|
|
|
+ unused_bitmap.frombytes(redis_cache.get(CACHE_UNUSED_BITMAP))
|
|
|
+
|
|
|
# 从倒排索引中获取候选词的位置索引
|
|
|
candidate_position_set = set()
|
|
|
temp_candidate_position_list = redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
|
|
|
@@ -406,7 +411,7 @@ def agg_word_thread(main_word_position: int, agg_threshold: float):
|
|
|
continue
|
|
|
# 排除已聚合
|
|
|
for candidate_position in temp_candidate_position.decode(CHARSET_UTF_8).split(","):
|
|
|
- if redis_cache.getbit(CACHE_UNUSED_BITMAP, candidate_position):
|
|
|
+ if unused_bitmap[int(candidate_position)]:
|
|
|
continue
|
|
|
candidate_position_set.add(candidate_position)
|
|
|
|
|
|
@@ -417,20 +422,22 @@ def agg_word_thread(main_word_position: int, agg_threshold: float):
|
|
|
# 结果列表
|
|
|
result_list = []
|
|
|
|
|
|
+ # 从缓存获取关键词列表
|
|
|
+ candidate_word_cache_list = redis_cache.hmget(CACHE_WORD, candidate_position_set)
|
|
|
+ # 从缓存获取分词列表
|
|
|
+ candidate_word_stem_cache_list = redis_cache.hmget(CACHE_WORD_STEM, candidate_position_set)
|
|
|
+
|
|
|
# 计算相似度
|
|
|
- for candidate_position in candidate_position_set:
|
|
|
- # 跳过重复
|
|
|
- if main_word_position == candidate_position:
|
|
|
- continue
|
|
|
+ for candidate_position in range(len(candidate_position_set)):
|
|
|
|
|
|
- # 获取关键词
|
|
|
- candidate_word = redis_cache.hget(CACHE_WORD, candidate_position)
|
|
|
+ # 优化获取关键词
|
|
|
+ candidate_word = candidate_word_cache_list[int(candidate_position)]
|
|
|
if not candidate_word:
|
|
|
continue
|
|
|
candidate_word = candidate_word.decode(CHARSET_UTF_8)
|
|
|
|
|
|
- # 获取分词结果
|
|
|
- candidate_word_stem = redis_cache.hget(CACHE_WORD_STEM, candidate_position)
|
|
|
+ # 优化获取分词结果
|
|
|
+ candidate_word_stem = candidate_word_stem_cache_list[int(candidate_position)]
|
|
|
# 为空则跳过
|
|
|
if not candidate_word_stem:
|
|
|
continue
|
|
|
@@ -458,6 +465,7 @@ def agg_word_thread(main_word_position: int, agg_threshold: float):
|
|
|
# 清除容器数据
|
|
|
candidate_position_set.clear()
|
|
|
result_list.clear()
|
|
|
+ unused_bitmap.clear()
|
|
|
|
|
|
except Exception as e:
|
|
|
logging.error("子进程发生异常", e)
|