
feat: aggregation optimization, switch to Redis for caching the data

ChenYL 2 years ago
parent
commit
9f3cf46045
1 changed file with 128 additions and 94 deletions

+ 128 - 94
src/money.py
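
The commit replaces the mmap position lists and the shared inverted-index dict that were handed to every worker process with data cached in Redis: keywords, their token stems, and the inverted index go into hashes keyed by 1-based line position, and already-aggregated words are tracked in a bitmap. A minimal sketch of that cache layout, assuming a local Redis at 127.0.0.1:6379 and redis-py; the sample words and stems are illustrative only, not taken from the repository:

    import redis

    r = redis.StrictRedis(host="127.0.0.1", port=6379)

    # Hashes keyed by 1-based line position (illustrative sample data)
    r.hset("word", mapping={1: "cheap flight tickets", 2: "cheap hotel deals"})
    r.hset("word:stem", mapping={1: "cheap,flight,tickets", 2: "cheap,hotel,deals"})
    # Inverted index: stem -> comma-separated positions of the keywords containing it
    r.hset("word:reverse_index", mapping={"cheap": "1,2", "flight": "1", "hotel": "2"})

    # Usage bitmap: one bit per keyword position, 1 = already aggregated
    r.setbit("unused_bitmap", 1, 1)
    print(r.getbit("unused_bitmap", 1))          # 1
    print(r.hget("word", 2).decode("UTF-8"))     # cheap hotel deals
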

@@ -5,9 +5,8 @@ import os
 import re
 import time
 from concurrent.futures import ProcessPoolExecutor, as_completed
-from multiprocessing import Manager
-from multiprocessing.managers import Array
 
+import redis
 from bitmap import BitMap
 from tqdm import tqdm
 
@@ -34,6 +33,21 @@ FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
 # Sub-file: 长尾词_合并_聚合_%s.txt
 FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s.txt"
 
+# Cache prefix: tokenized word stems
+CACHE_WORD_STEM = "word:stem"
+
+# Cache prefix: inverted index
+CACHE_WORD_REVERSE_INDEX = "word:reverse_index"
+
+# Cache: long-tail keywords
+CACHE_WORD = "word"
+
+# Cache: aggregation usage bitmap
+CACHE_UNUSED_BITMAP = "unused_bitmap"
+
+# Character set: UTF-8
+CHARSET_UTF_8 = "UTF-8"
+
 
 def extract_word_from_5118(file_path: str):
     """
@@ -274,56 +288,66 @@ def prepare_word_split_and_reverse_index(file_path: str):
         process_pool.shutdown()
 
 
-def agg_word_process(file_path: str, word_file: str, word_split_file: str, word_position_list: list,
-                     word_split_position_list: list, word_reverse_index_dict: dict, agg_threshold: float,
-                     start_pos: int, end_pos: int):
+def agg_word_process(file_path: str, agg_threshold: float, start_pos: int, end_pos: int, final_pos: int, skip_line: int):
     """
     Long-tail keyword aggregation worker
     :param file_path: file path
-    :param word_file: long-tail keyword file path
-    :param word_split_file: long-tail keyword tokenization file path
-    :param word_position_list: long-tail keyword position index
-    :param word_split_position_list: word-stem position index
-    :param word_reverse_index_dict: inverted index of the long-tail keyword tokens
     :param agg_threshold: aggregation threshold
     :param start_pos: start boundary of this task (inclusive)
     :param end_pos: end boundary of this task (exclusive)
+    :param final_pos: overall task boundary (total number of keywords)
+    :param skip_line: display row of the progress bar
     :return:
     """
 
     # Generate the temporary result file
     word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG_PID % os.getpid())
 
-    with (open(word_file, "r", encoding="UTF-8") as f_word,
-          mmap.mmap(f_word.fileno(), 0, access=mmap.ACCESS_READ) as f_word_mmap,
-          open(word_split_file, "r", encoding="UTF-8") as f_word_split,
-          mmap.mmap(f_word_split.fileno(), 0, access=mmap.ACCESS_READ) as f_word_split_mmap,
-          open(word_agg_file, "a", encoding="UTF-8") as fo):
+    # Redis connection pool
+    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=1)
+
+    # Redis client
+    redis_cache = redis.StrictRedis(connection_pool=redis_pool)
+
+    # Number of items this task will process; an end_pos of -1 means "process up to final_pos"
+    if end_pos == -1:
+        end_pos = final_pos
+    process_len = end_pos - start_pos
+
+    with (open(word_agg_file, "a", encoding="UTF-8") as fo,
+            tqdm(total=process_len, desc='子进程-%s:文本聚合进度' % os.getpid(), unit='份', unit_scale=True, position=skip_line) as pbar):
 
         for main_word_position in range(start_pos, end_pos):
-            # TODO 排除已使用
             try:
+                # Skip the main word if it is already marked as used, otherwise mark it as used.
+                # SETBIT returns the previous bit value, so the check-and-set is a single atomic call.
+                if redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1):
+                    continue
+
                 # Fetch the main word
-                real_main_word_position = word_position_list[main_word_position]
-                f_word_mmap.seek(real_main_word_position)
-                main_word = utils.remove_line_break(f_word_mmap.readline().decode("UTF-8"))
+                main_word = redis_cache.hget(CACHE_WORD, main_word_position)
+                if not main_word:
+                    continue
+                main_word = main_word.decode(CHARSET_UTF_8)
 
                 # Fetch the main word's tokenization result
-                # TODO add caching
-                main_word_split_position = word_split_position_list[main_word_position]
-                f_word_split_mmap.seek(main_word_split_position)
-                temp_main_word_stem = utils.remove_line_break(f_word_split_mmap.readline().decode("UTF-8"))
+                main_word_stem = redis_cache.hget(CACHE_WORD_STEM, main_word_position)
                 # Skip if empty
-                if len(temp_main_word_stem) == 0:
+                if not main_word_stem:
                     continue
-                main_word_stem_list = temp_main_word_stem.split(",")
+                main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
 
                 # Look up candidate positions from the inverted index
                 candidate_position_set = set()
-                for main_word_stem in main_word_stem_list:
-                    index_list = word_reverse_index_dict.get(main_word_stem)
-                    if index_list:
-                        candidate_position_set.update(index_list)
+                temp_candidate_position_list = redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
+                for temp_candidate_position in temp_candidate_position_list:
+                    if not temp_candidate_position:
+                        continue
+                    # Positions are cached as comma-separated strings; convert them back to int so
+                    # they compare and index consistently with main_word_position
+                    candidate_position_set.update(
+                        int(p) for p in temp_candidate_position.decode(CHARSET_UTF_8).split(","))
 
                 # Skip if no candidate words were found
                 if not candidate_position_set:
@@ -334,34 +358,33 @@ def agg_word_process(file_path: str, word_file: str, word_split_file: str, word_
 
                 # Compute similarity
                 for candidate_position in candidate_position_set:
-                    # TODO skip already-used words
                     # Skip duplicates
                     if main_word_position == candidate_position:
                         continue
 
                     # Fetch the candidate keyword
-                    word_position = word_position_list[candidate_position]
-                    f_word_mmap.seek(word_position)
-                    candidate_word = utils.remove_line_break(f_word_mmap.readline().decode("UTF-8"))
+                    candidate_word = redis_cache.hget(CACHE_WORD, candidate_position)
+                    if not candidate_word:
+                        continue
+                    candidate_word = candidate_word.decode(CHARSET_UTF_8)
 
                     # Fetch the candidate's tokenization result
-                    main_word_split_position = word_split_position_list[candidate_position]
-                    f_word_split_mmap.seek(main_word_split_position)
-                    temp_candidate_word_stem = utils.remove_line_break(f_word_split_mmap.readline().decode("UTF-8"))
+                    candidate_word_stem = redis_cache.hget(CACHE_WORD_STEM, candidate_position)
                     # Skip if empty
-                    if len(temp_candidate_word_stem) == 0:
+                    if not candidate_word_stem:
                         continue
-                    candidate_word_key_list = temp_candidate_word_stem.split(",")
+                    candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",")
 
                     # Compute relevance
                     try:
-                        val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word, candidate_word_key_list)
+                        val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
+                                                candidate_word_stem_list)
                         if val >= agg_threshold:
-                            # TODO record as used
+                            redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
                             result_list.append(candidate_word)
                     except Exception as e:
                         logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (
-                            main_word, candidate_word, candidate_word_key_list), e)
+                            main_word, candidate_word, candidate_word_stem_list), exc_info=e)
 
                 # 保存结果
                 if len(result_list) > 0:
@@ -374,9 +397,13 @@ def agg_word_process(file_path: str, word_file: str, word_split_file: str, word_
                 candidate_position_set.clear()
                 result_list.clear()
 
+                # Update the progress bar
+                pbar.update(1)
             except Exception as e:
                 logging.error("子进程发生异常", e)
 
+    return
+
 
 def agg_word(file_path: str):
     """
@@ -394,9 +421,16 @@ def agg_word(file_path: str):
     # Number of work items per task
     task_cal_num = 10000
 
+    # Number of worker processes (minus 1 to leave a CPU core for Redis)
+    worker_num = os.cpu_count() - 1
+    # worker_num = 1
+
     # Regex: per-process aggregation output files
     agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+.txt", re.I)
 
+    # Redis client
+    redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379)
+
     # Check that the required input files exist
     for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
         input_file = os.path.join(file_path, file_name)
@@ -408,73 +442,73 @@ def agg_word(file_path: str):
         if agg_file_pattern.match(file):
             os.remove(os.path.join(file_path, file))
 
-
-    # Record keyword positions
-    word_position_list = []
-    word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
-    with (open(word_file, "r", encoding="utf-8") as f,
-          mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
-        while True:
-            # Get the current position
-            cur_pos = fmmap.tell()
-            # Advance to the next line
-            line = fmmap.readline()
-            # End-of-file check
-            if not line:
-                break
-            # Record the position
-            word_position_list.append(cur_pos)
-            word_total_num = word_total_num + 1
-
-    # Record token-split positions
-    word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
-    word_split_position_list = []
-    with (open(word_split_file, "r", encoding="utf-8") as f,
-          mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
-        while True:
-            # Get the current position
-            cur_pos = fmmap.tell()
-            # Advance to the next line
-            line = fmmap.readline()
-            # End-of-file check
-            if not line:
-                break
-            # Record the position
-            word_split_position_list.append(cur_pos)
-
-    # Load the inverted index
-    word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
-    word_reverse_index_dict = {}
-    # Token key
-    key_re = r"([^,]+),\["
-    key_pattern = re.compile(key_re, re.I)
-    # Index values
-    index_pattern = re.compile(r"\d+", re.I)
-    with open(word_reverse_index_file, "r", encoding="utf-8") as f:
-        for line in f:
-            key_m = key_pattern.match(line)
-            key = key_m.group(1)
-            val = index_pattern.findall(line[line.index(","):])
-            word_reverse_index_dict[key] = [int(v) for v in val]
+    # Cache the keywords by line position
+    word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
+    word_dict = {}
+    with open(word_file, "r", encoding="utf-8") as f:
+        for position, word in enumerate(f, start=1):
+            word = utils.remove_line_break(word)
+            if not word:
+                continue
+            word_dict[position] = word
+    redis_cache.hset(CACHE_WORD, mapping=word_dict)
+    # Record the total number of keywords
+    word_total_num = len(word_dict)
+    # Free memory
+    del word_dict
+    #
+    # # Cache the tokenization results
+    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
+    # word_split_dict = {}
+    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
+    #     for position, word_split_line in enumerate(f, start=1):
+    #         word_split_line = utils.remove_line_break(word_split_line)
+    #         if not word_split_line:
+    #             continue
+    #         word_split_dict[position] = word_split_line
+    # redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
+    # # Free memory
+    # del word_split_dict
+    #
+    # # Cache the inverted index
+    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
+    # word_reverse_index_dict = {}
+    # # Token key
+    # key_pattern = re.compile(r"([^,]+),\[", re.I)
+    # # Index values
+    # index_pattern = re.compile(r"\d+", re.I)
+    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
+    #     for word_split_line in f:
+    #         key_m = key_pattern.match(word_split_line)
+    #         key = key_m.group(1)
+    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
+    #         word_reverse_index_dict[key] = ",".join(val)
+    # redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
+    # # Free memory
+    # del word_reverse_index_dict
+
+    # Build the long-tail keyword usage bitmap (setting a bit one past the last position pre-allocates it)
+    redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 1, 1)
 
     # Submit the tasks and write out the results
     word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
 
-    with ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool:
+    with ProcessPoolExecutor(max_workers=worker_num) as process_pool:
         # Compute task boundaries
         task_list = utils.avg_split_task(word_total_num, task_cal_num, 1)
 
         # Submit the tasks
         process_futures = []
-        for start_pos, end_pos in task_list:
-            p_future = process_pool.submit(agg_word_process, file_path, word_file, word_split_file, word_position_list,
-                                           word_split_position_list, word_reverse_index_dict, agg_threshold, start_pos, end_pos)
+        for skip_line, pos in enumerate(task_list, start=1):
+            # skip_line =  (skip_line % 4 ) + 1
+            skip_line = 1
+            p_future = process_pool.submit(agg_word_process, file_path, agg_threshold, pos[0], pos[1], word_total_num, skip_line)
             process_futures.append(p_future)
 
         # Show overall task progress
-        with tqdm(total=len(process_futures), desc='文本聚合进度', unit='份', unit_scale=True) as pbar:
+        with tqdm(total=len(process_futures), desc='文本聚合进度', unit='份', unit_scale=True, position=0) as pbar:
             for p_future in as_completed(process_futures):
-                p_future.result()
+                p_result = p_future.result()
                 # Update the progress bar
                 pbar.update(1)
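
A condensed sketch of the read path each worker then takes against that cache: claim the word in the bitmap, pull its stems, and fan out over the inverted index to collect candidate positions. It assumes the keys shown before the diff are already populated and uses a made-up position; the similarity computation is omitted:

    import redis

    r = redis.StrictRedis(host="127.0.0.1", port=6379)

    main_pos = 1
    # Claim the word; SETBIT returns the previous bit, so an already-used word is skipped
    if not r.setbit("unused_bitmap", main_pos, 1):
        main_word = r.hget("word", main_pos).decode("UTF-8")
        stems = r.hget("word:stem", main_pos).decode("UTF-8").split(",")

        # One HMGET fans out over every stem of the main word; each field holds the
        # comma-separated positions of the candidate keywords sharing that stem
        candidates = set()
        for packed in r.hmget("word:reverse_index", stems):
            if packed:
                candidates.update(int(p) for p in packed.decode("UTF-8").split(","))
        candidates.discard(main_pos)
        print(main_word, sorted(candidates))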