
feat: aggregation optimization; optimize Redis usage

ChenYL 2 years ago
parent
commit
72e2d9edb2
1 changed file with 108 additions and 102 deletions

src/agg.py  +108 -102
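
The core of the change: the module-level redis_cache client shared by all threads is replaced with a thread-local client plus a non-transactional pipeline, so the two per-word hget calls (word, stem) and the two hmget calls (candidate words, candidate stems) each collapse into a single network round trip. A minimal sketch of that batching pattern, using placeholder key names and connection settings rather than the constants agg.py actually defines:

    import redis

    # Placeholder pool settings; agg.py builds its own pool elsewhere.
    pool = redis.ConnectionPool(host="localhost", port=6379, max_connections=2)
    client = redis.StrictRedis(connection_pool=pool)

    # Before: two round trips per main word.
    #   word = client.hget("word", 42)
    #   stem = client.hget("word_stem", 42)

    # After: buffer both commands and send them together.
    # transaction=False skips MULTI/EXEC, since atomicity is not needed here.
    pipe = client.pipeline(transaction=False)
    pipe.hget("word", 42)
    pipe.hget("word_stem", 42)
    word, stem = pipe.execute()  # replies return in submission order

Because execute() drains the pipeline's command buffer, the same pipeline object can be created once per thread in init_thread and reused for every task.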

@@ -50,14 +50,11 @@ CHARSET_UTF_8 = "UTF-8"
 # Redis connection pool
 redis_pool: redis.ConnectionPool = None
 
-# Redis client
-redis_cache: redis.StrictRedis = None
-
 # Thread pool
 thread_pool: ThreadPoolExecutor = None
 
 # Thread-local variables
-local_variable = threading.local()
+local_var = threading.local()
 
 
 def agg_word(file_path: str):
@@ -68,8 +65,8 @@ def agg_word(file_path: str):
     """
 
     # Total number of long-tail words
-    word_total_num = 0
-    # word_total_num = 1000000
+    # word_total_num = 0
+    word_total_num = 1000000
 
     # Aggregation threshold
     agg_threshold = 0.8
@@ -85,7 +82,7 @@ def agg_word(file_path: str):
     agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+_\d+.txt", re.I)
 
     # Maximum number of threads
-    max_threads = 3
+    max_threads = 2
 
     # Maximum Redis connections (matches the worker thread count to avoid wasting connections)
     redis_max_conns = max_threads
@@ -108,51 +105,50 @@ def agg_word(file_path: str):
         for history_agg_file in history_agg_file_list:
             shutil.move(os.path.join(file_path, history_agg_file), archive_path)
 
-    return
     # Cache keyword positions
-    word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
-    word_dict = {}
-    with open(word_file, "r", encoding="utf-8") as f:
-        for position, word in enumerate(f, start=1):
-            word = utils.remove_line_break(word)
-            if not word:
-                continue
-            word_dict[position] = word
-    m_redis_cache.hset(CACHE_WORD, mapping=word_dict)
-    # Record the total keyword count
-    word_total_num = len(word_dict)
-    # Free memory
-    del word_dict
-
-    # Cache word segmentations
-    word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
-    word_split_dict = {}
-    with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
-        for position, word_split_line in enumerate(f, start=1):
-            word_split_line = utils.remove_line_break(word_split_line)
-            if not word_split_line:
-                continue
-            word_split_dict[position] = word_split_line
-    m_redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
-    # Free memory
-    del word_split_dict
-
-    # Cache the inverted index
-    word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
-    word_reverse_index_dict = {}
-    # Segmented word
-    key_pattern = re.compile(r"([^,]+),\[", re.I)
-    # Index
-    index_pattern = re.compile(r"\d+", re.I)
-    with open(word_reverse_index_file, "r", encoding="utf-8") as f:
-        for word_split_line in f:
-            key_m = key_pattern.match(word_split_line)
-            key = key_m.group(1)
-            val = index_pattern.findall(word_split_line[word_split_line.index(","):])
-            word_reverse_index_dict[key] = ",".join(val)
-    m_redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
-    # Free memory
-    del word_reverse_index_dict
+    # word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
+    # word_dict = {}
+    # with open(word_file, "r", encoding="utf-8") as f:
+    #     for position, word in enumerate(f, start=1):
+    #         word = utils.remove_line_break(word)
+    #         if not word:
+    #             continue
+    #         word_dict[position] = word
+    # m_redis_cache.hset(CACHE_WORD, mapping=word_dict)
+    # # Record the total keyword count
+    # word_total_num = len(word_dict)
+    # # Free memory
+    # del word_dict
+    #
+    # # Cache word segmentations
+    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
+    # word_split_dict = {}
+    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
+    #     for position, word_split_line in enumerate(f, start=1):
+    #         word_split_line = utils.remove_line_break(word_split_line)
+    #         if not word_split_line:
+    #             continue
+    #         word_split_dict[position] = word_split_line
+    # m_redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
+    # # Free memory
+    # del word_split_dict
+    #
+    # # Cache the inverted index
+    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
+    # word_reverse_index_dict = {}
+    # # Segmented word
+    # key_pattern = re.compile(r"([^,]+),\[", re.I)
+    # # Index
+    # index_pattern = re.compile(r"\d+", re.I)
+    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
+    #     for word_split_line in f:
+    #         key_m = key_pattern.match(word_split_line)
+    #         key = key_m.group(1)
+    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
+    #         word_reverse_index_dict[key] = ",".join(val)
+    # m_redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
+    # # Free memory
+    # del word_reverse_index_dict
 
     # Clear first, then rebuild the long-tail word usage bitmap
     m_redis_cache.delete(CACHE_UNUSED_BITMAP)
@@ -169,7 +165,7 @@ def agg_word(file_path: str):
         # Submit tasks
         process_futures = []
         for skip_line, pos in enumerate(task_list, start=1):
-            skip_line =  (skip_line % worker_num ) + 1
+            skip_line = (skip_line % worker_num) + 1
             p_future = process_pool.submit(agg_word_process, agg_threshold, pos[0], pos[1], word_total_num,
                                            skip_line)
             process_futures.append(p_future)
@@ -378,78 +374,88 @@ def init_thread(file_path: str):
     :return:
     """
     # Initialize the Redis client
-    global redis_cache
-    redis_cache = redis.StrictRedis(connection_pool=redis_pool)
+    local_var.redis_cache = redis.StrictRedis(connection_pool=redis_pool)
+    # Initialize the Redis pipeline
+    local_var.redis_pipeline = local_var.redis_cache.pipeline(transaction=False)
 
     # Create the temporary result file
     word_agg_file = os.path.join(file_path,
                                  FILE_LONG_TAIL_MERGE_AGG_PID % (os.getpid(), threading.current_thread().ident))
-    local_variable.file_writer = open(word_agg_file, "w", encoding=CHARSET_UTF_8)
+    local_var.file_writer = open(word_agg_file, "w", encoding=CHARSET_UTF_8)
+
+    # Thread-local copy of the used bitmap
+    local_var.unused_bitmap = bitarray()
+
+    # Candidate word positions pulled from the inverted index
+    local_var.candidate_position_set = set()
+
+    # Result list
+    local_var.result_list = []
 
 
 def agg_word_thread(main_word_position: int, agg_threshold: float):
     try:
+        # Pull a local copy of the used bitmap
+        local_var.unused_bitmap.frombytes(local_var.redis_cache.get(CACHE_UNUSED_BITMAP))
+
         # Skip the main word if it is already used; otherwise mark it as used
-        if redis_cache.getbit(CACHE_UNUSED_BITMAP, main_word_position):
+        if local_var.unused_bitmap[main_word_position]:
             return
         else:
-            redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
-
-        # Get the main word and strip the line break
-        main_word = redis_cache.hget(CACHE_WORD, main_word_position)
-        if not main_word:
+            local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
+            local_var.unused_bitmap[main_word_position] = 1
+
+        # Fetch the main word and its stems in a single round trip
+        local_var.redis_pipeline.hget(CACHE_WORD, main_word_position)
+        local_var.redis_pipeline.hget(CACHE_WORD_STEM, main_word_position)
+        main_result = local_var.redis_pipeline.execute()
+        main_word = main_result[0]
+        main_word_stem = main_result[1]
+
+        # Return if either value is missing
+        if not main_word or not main_word_stem:
             return
-        main_word = main_word.decode(CHARSET_UTF_8)
 
-        # Get the main word's segmentation result
-        main_word_stem = redis_cache.hget(CACHE_WORD_STEM, main_word_position)
-        # Skip if empty
-        if not main_word_stem:
-            return
         main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
 
-        # Get a copy of the used bitmap
-        unused_bitmap = bitarray()
-        unused_bitmap.frombytes(redis_cache.get(CACHE_UNUSED_BITMAP))
-
-        # Get candidate positions from the inverted index
-        candidate_position_set = set()
-        temp_candidate_position_list = redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
+        # Fetch long-tail word positions from the inverted index
+        temp_candidate_position_list = local_var.redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
         for temp_candidate_position in temp_candidate_position_list:
             if not temp_candidate_position:
                 continue
             # Exclude words that are already aggregated
             for candidate_position in temp_candidate_position.decode(CHARSET_UTF_8).split(","):
-                if unused_bitmap[int(candidate_position)]:
+                if local_var.unused_bitmap[int(candidate_position)]:
                     continue
-                candidate_position_set.add(candidate_position)
+                local_var.candidate_position_set.add(candidate_position)
 
         # Skip if there are no candidate words to score
-        if not candidate_position_set:
+        if not local_var.candidate_position_set:
             return
 
-        # Result list
-        result_list = []
+        # Fetch the keyword and segmentation lists from the cache in one pipeline; skip if either is empty
+        local_var.redis_pipeline.hmget(CACHE_WORD, local_var.candidate_position_set)
+        local_var.redis_pipeline.hmget(CACHE_WORD_STEM, local_var.candidate_position_set)
+        candidate_result = local_var.redis_pipeline.execute()
+        candidate_word_cache_list = candidate_result[0]
+        candidate_word_stem_cache_list = candidate_result[1]
+        if not candidate_word_cache_list or not candidate_word_stem_cache_list:
+            return
 
-        # Get the keyword list from the cache
-        candidate_word_cache_list = redis_cache.hmget(CACHE_WORD, candidate_position_set)
-        # Get the segmentation list from the cache
-        candidate_word_stem_cache_list = redis_cache.hmget(CACHE_WORD_STEM, candidate_position_set)
+        # Defer decoding until here, since earlier branches may return early
+        main_word = main_word.decode(CHARSET_UTF_8)
 
         # Compute similarity
-        for candidate_position in range(len(candidate_position_set)):
+        # enumerate keeps the reply-list index separate from the actual word position
+        for candidate_index, candidate_position in enumerate(local_var.candidate_position_set):
 
-            # Optimized keyword fetch
+            # Get the keyword and its segmentation; skip if either is missing
-            candidate_word = candidate_word_cache_list[int(candidate_position)]
+            candidate_word = candidate_word_cache_list[candidate_index]
             if not candidate_word:
                 continue
-            candidate_word = candidate_word.decode(CHARSET_UTF_8)
-
-            # Optimized segmentation fetch
-            candidate_word_stem = candidate_word_stem_cache_list[int(candidate_position)]
-            # Skip if empty
+            candidate_word_stem = candidate_word_stem_cache_list[candidate_index]
             if not candidate_word_stem:
                 continue
+            candidate_word = candidate_word.decode(CHARSET_UTF_8)
             candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",")
 
             # Compute relevance
@@ -457,26 +463,26 @@ def agg_word_thread(main_word_position: int, agg_threshold: float):
                 val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
                                         candidate_word_stem_list)
                 if val >= agg_threshold:
-                    redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
-                    result_list.append(candidate_word)
+                    # Mark the actual word position, not the loop index
+                    local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, int(candidate_position), 1)
+                    local_var.result_list.append(candidate_word)
             except Exception as e:
                 logging.error("Main keyword %s raised an exception; candidate keyword: %s, segmentation: %s" % (
                     main_word, candidate_word, candidate_word_stem_list), exc_info=e)
 
         # Save the results
-        if len(result_list) > 0:
-            local_variable.file_writer.write("%s\n" % main_word)
-            for candidate_word in result_list:
-                local_variable.file_writer.write("%s\n" % candidate_word)
-            local_variable.file_writer.write("\n")
-            local_variable.file_writer.flush()
-
-        # Clear container data
-        candidate_position_set.clear()
-        result_list.clear()
-        unused_bitmap.clear()
+        if not local_var.result_list:
+            return
+        local_var.file_writer.write("%s\n" % main_word)
+        for candidate_word in local_var.result_list:
+            local_var.file_writer.write("%s\n" % candidate_word)
+        local_var.file_writer.write("\n")
 
     except Exception as e:
         logging.error("Exception in child process", exc_info=e)
+    finally:
+        # Clear container data
+        local_var.candidate_position_set.clear()
+        local_var.result_list.clear()
+        local_var.unused_bitmap.clear()
 
     return
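
A note on wiring: init_thread only has an effect if every worker thread runs it before its first agg_word_thread call. The pool construction is outside this diff, but a plausible setup (hypothetical wiring; the initializer hook is standard on ThreadPoolExecutor since Python 3.7) looks like:

    from concurrent.futures import ThreadPoolExecutor

    # Hypothetical wiring: each worker thread builds its own Redis client,
    # pipeline, file writer and scratch containers exactly once.
    with ThreadPoolExecutor(max_workers=2,
                            initializer=init_thread,
                            initargs=(file_path,)) as t_pool:
        futures = [t_pool.submit(agg_word_thread, pos, agg_threshold)
                   for pos in range(1, word_total_num + 1)]
        for f in futures:
            f.result()  # wait for completion; agg_word_thread logs its own errors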