
Improve the aggregation processing logic

ChenYL, 2 years ago
Commit df83aeb01c
2 changed files with 143 additions and 137 deletions
  1. src/money.py +138 −135
  2. src/utils.py +5 −2

+ 138 - 135
src/money.py

@@ -5,8 +5,9 @@ import os
 import re
 import time
 from concurrent.futures import ProcessPoolExecutor, as_completed
 
 from bitmap import BitMap
+from tqdm import tqdm
 
 import utils
 import jieba
@@ -28,6 +29,9 @@ FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
 # File: 长尾词_合并_聚合.txt
 FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
 
+# Per-process part file: 长尾词_合并_聚合_%s.txt
+FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s.txt"
+
 
 def extract_word_from_5118(file_path: str):
     """
@@ -268,65 +272,109 @@ def prepare_word_split_and_reverse_index(file_path: str):
         process_pool.shutdown()
 
 
-def agg_word_cal(word_file: str, word_split_file: str, word_position_list: list, word_split_position_list: list,
-                 agg_threshold: float, main_word: str, main_key_list: list, candidate_position_list: list):
+def agg_word_process(file_path: str, word_file: str, word_split_file: str, word_position_list: list,
+                     word_split_position_list: list, word_reverse_index_dict: dict, agg_threshold: float,
+                     start_pos: int, end_pos: int):
     """
-    Long-tail word aggregation computation
+    Long-tail word aggregation processing
+    :param file_path: working directory path
     :param word_file: path to the long-tail word file
     :param word_split_file: path to the long-tail word segmentation file
     :param word_position_list: position index of the long-tail words
     :param word_split_position_list: position index of the word stems
+    :param word_reverse_index_dict: inverted index of the segmented long-tail words
     :param agg_threshold: aggregation threshold
-    :param main_word: main word
-    :param main_key_list: stems of the main word
-    :param candidate_position_list: candidate word positions
+    :param start_pos: start of the task range (inclusive)
+    :param end_pos: end of the task range (exclusive)
     :return:
     """
 
-    # Result container
-    result_list = []
-
-    with (open(word_file, "r", encoding="UTF-8") as f_key,
-          mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap,
-          open(word_split_file, "r", encoding="UTF-8") as f_key_split,
-          mmap.mmap(f_key_split.fileno(), 0, access=mmap.ACCESS_READ) as f_key_split_mmap):
+    # Create this worker's temporary result file, named by PID
+    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG_PID % os.getpid())
 
-        if not candidate_position_list:
-            logging.info("Subprocess: candidate word list is empty, exiting")
-            return
+    with (open(word_file, "r", encoding="UTF-8") as f_word,
+          mmap.mmap(f_word.fileno(), 0, access=mmap.ACCESS_READ) as f_word_mmap,
+          open(word_split_file, "r", encoding="UTF-8") as f_word_split,
+          mmap.mmap(f_word_split.fileno(), 0, access=mmap.ACCESS_READ) as f_word_split_mmap,
+          open(word_agg_file, "a", encoding="UTF-8") as fo):
 
-        for candidate_position in candidate_position_list:
+        for main_word_position in range(start_pos, end_pos):
+            # TODO: skip words that are already used
             try:
-                # Get the keyword
-                word_position = word_position_list[candidate_position]
-                f_key_mmap.seek(word_position)
-                candidate_word = f_key_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
-
-                if candidate_word == "小孩在肚子里怎么呼吸":
-                    print("子:出现拉拉")
-
-                # Get the segmentation result
-                key_position = word_split_position_list[candidate_position]
-                f_key_split_mmap.seek(key_position)
-                temp_candidate_word_stem = f_key_split_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
+                # Get the main word and strip the line break
+                real_main_word_position = word_position_list[main_word_position]
+                f_word_mmap.seek(real_main_word_position)
+                main_word = utils.remove_line_break(f_word_mmap.readline().decode("UTF-8"))
+
+                # Get the main word's segmentation result
+                # TODO: add caching
+                main_word_split_position = word_split_position_list[main_word_position]
+                f_word_split_mmap.seek(main_word_split_position)
+                temp_main_word_stem = utils.remove_line_break(f_word_split_mmap.readline().decode("UTF-8"))
                 # Skip if empty
-                if len(temp_candidate_word_stem) == 0:
+                if len(temp_main_word_stem) == 0:
+                    continue
+                main_word_stem_list = temp_main_word_stem.split(",")
+
+                # Look up candidate word positions from the inverted index
+                candidate_position_set = set()
+                for main_word_stem in main_word_stem_list:
+                    index_list = word_reverse_index_dict.get(main_word_stem)
+                    if index_list:
+                        candidate_position_set.update(index_list)
+
+                # Skip if no candidate words need to be evaluated
+                if not candidate_position_set:
                     continue
-                candidate_word_key_list = temp_candidate_word_stem.split(",")
-
-                # Compute relevance
-                try:
-                    val = utils.cal_cos_sim(main_word, main_key_list, candidate_word, candidate_word_key_list)
-                    if val >= agg_threshold:
-                        result_list.append((candidate_position, candidate_word))
-                except Exception as e:
-                    logging.error("Main keyword %s raised an exception; related candidate keyword: %s, segmentation: %s" % (
-                        main_word, candidate_word, candidate_word_key_list), e)
+
+                # Result list
+                result_list = []
+
+                # Compute similarity
+                for candidate_position in candidate_position_set:
+                    # TODO: skip already-used words
+                    # Skip the main word itself
+                    if main_word_position == candidate_position:
+                        continue
+
+                    # Get the candidate keyword
+                    word_position = word_position_list[candidate_position]
+                    f_word_mmap.seek(word_position)
+                    candidate_word = utils.remove_line_break(f_word_mmap.readline().decode("UTF-8"))
+
+                    # Get the candidate's segmentation result
+                    candidate_split_position = word_split_position_list[candidate_position]
+                    f_word_split_mmap.seek(candidate_split_position)
+                    temp_candidate_word_stem = utils.remove_line_break(f_word_split_mmap.readline().decode("UTF-8"))
+                    # Skip if empty
+                    if len(temp_candidate_word_stem) == 0:
+                        continue
+                    candidate_word_key_list = temp_candidate_word_stem.split(",")
+
+                    # Compute relevance
+                    try:
+                        val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word, candidate_word_key_list)
+                        if val >= agg_threshold:
+                            # TODO: mark as used
+                            result_list.append(candidate_word)
+                    except Exception as e:
+                        logging.error("Main keyword %s raised an exception; related candidate keyword: %s, segmentation: %s",
+                                      main_word, candidate_word, candidate_word_key_list, exc_info=e)
+
+                # Persist the results
+                if len(result_list) > 0:
+                    fo.write("%s\n" % main_word)
+                    for candidate_word in result_list:
+                        fo.write("%s\n" % candidate_word)
+                    fo.write("\n")
+
+                # Clear the containers for the next main word
+                candidate_position_set.clear()
+                result_list.clear()
+
             except Exception as e:
                 logging.error("子进程发生异常", e)
 
-    return result_list
-
 
 def agg_word(file_path: str):
     """
@@ -335,17 +383,32 @@ def agg_word(file_path: str):
     :return:
     """
 
+    # Total number of long-tail words
+    word_total_num = 0
+
+    # Aggregation (similarity) threshold
+    agg_threshold = 0.8
+
+    # Number of words handled per task
+    task_cal_num = 10000
+
+    # Regex matching the per-process aggregation part files
+    agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+\.txt", re.I)
+
     # Check that the required input files exist
     for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
         input_file = os.path.join(file_path, file_name)
         if not os.path.isfile(input_file):
             raise Exception("File does not exist! Path: " + input_file)
 
-    # Total number of long-tail words
-    word_total_num = 0
+    # Delete leftover result part files from previous runs
+    for file in os.listdir(file_path):
+        if agg_file_pattern.match(file):
+            os.remove(os.path.join(file_path, file))
+
 
     # Record the byte position of each keyword
-    word_position_list = [0]
+    word_position_list = []
     word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
     with (open(word_file, "r", encoding="utf-8") as f,
           mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
@@ -363,7 +426,7 @@
 
     # Record the byte position of each segmentation line
     word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
-    word_split_position_list = [0]
+    word_split_position_list = []
     with (open(word_split_file, "r", encoding="utf-8") as f,
           mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
         while True:
@@ -392,102 +455,41 @@ def agg_word(file_path: str):
             val = index_pattern.findall(line[line.index(","):])
             word_reverse_index_dict[key] = [int(v) for v in val]
 
-    # Bitmap of long-tail words already consumed
-    unused_bitmap = BitMap(word_total_num + 1)
-
-    # Aggregation threshold
-    agg_threshold = 0.8
-
-    # Threshold for dispatching work to subprocesses
-    process_threshold = os.cpu_count() * 30
-
     # Submit the tasks and write out the results
     word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
-    with (ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool,
-          open(word_agg_file, "w", encoding="UTF-8") as fo,
-          open(word_file, "r", encoding="UTF-8") as f_word,
-          open(word_split_file, "r", encoding="UTF-8") as f_word_split,
-          mmap.mmap(f_word_split.fileno(), 0, access=mmap.ACCESS_READ) as f_split_mmap):
-
-        # Prepare the data
-        for position_index, main_word in enumerate(f_word, 1):
-            # Skip if already aggregated, otherwise mark it as used
-            if unused_bitmap.test(position_index):
-                continue
-            else:
-                unused_bitmap.set(position_index)
-
-            # Strip the line break
-            main_word = utils.remove_line_break(main_word)
-            if main_word == "小孩在肚子里怎么呼吸":
-                print("出现拉拉")
-            # Get the segmentation result
-            key_position = word_split_position_list[position_index]
-            f_split_mmap.seek(key_position)
-            temp_main_word_stem = utils.remove_line_break(f_split_mmap.readline().decode("UTF-8"))
-            # Skip if empty
-            if len(temp_main_word_stem) == 0:
-                continue
-            main_word_stem_list = temp_main_word_stem.split(",")
-
-            # Look up the candidate word positions from the inverted index
-            candidate_position_set = set()
-            for main_word_stem in main_word_stem_list:
-                index_list = word_reverse_index_dict.get(main_word_stem)
-                if index_list:
-                    candidate_position_set.update(index_list)
-
-            # Exclude long-tail words that are already used
-            candidate_position_list = []
-            for candidate_position in candidate_position_set:
-                # Skip used words
-                if unused_bitmap.test(candidate_position):
-                    continue
-                candidate_position_list.append(candidate_position)
 
-            # Split the computation into tasks; skip if there is nothing to compute
-            candidate_position_len = len(candidate_position_list)
-            if candidate_position_len == 0:
-                continue
-
-            # Buffer for the analysis results
-            p_result_list = []
-            # Compute in the main process when the workload is small, otherwise submit to subprocesses
-            if candidate_position_len < process_threshold:
-                p_result = agg_word_cal(word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, candidate_position_list)
-                if len(p_result) > 0:
-                    p_result_list.extend(p_result)
-            else:
-                task_split_list = utils.avg_split_task(candidate_position_len, math.ceil(candidate_position_len / os.cpu_count()))
-                all_task_list = [candidate_position_list[start_pos:end_pos] for start_pos, end_pos in task_split_list]
-
-                # Submit the tasks
-                process_futures = []
-                for task_list in all_task_list:
-                    if not task_list:
-                        continue
-                    p_future = process_pool.submit(agg_word_cal, word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, task_list)
-                    process_futures.append(p_future)
-
-                for p_future in as_completed(process_futures):
-                    p_result = p_future.result()
-                    if p_result and len(p_result) > 0:
-                        p_result_list.extend(p_result)
+    with ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool:
+        # Compute the task boundaries
+        task_list = utils.avg_split_task(word_total_num, task_cal_num, 1)
 
-            # Process the analysis results and mark the handled data
-            if len(p_result_list) > 0:
-                fo.write("%s\n" % main_word)
-                for candidate_position, candidate_word in p_result_list:
-                    unused_bitmap.set(candidate_position)
-                    fo.write("%s\n" % candidate_word)
-                fo.write("\n")
-
-            # Clear the previous round's data
-            p_result_list.clear()
+        # Submit the tasks
+        process_futures = []
+        for start_pos, end_pos in task_list:
+            p_future = process_pool.submit(agg_word_process, file_path, word_file, word_split_file, word_position_list,
+                                           word_split_position_list, word_reverse_index_dict, agg_threshold, start_pos, end_pos)
+            process_futures.append(p_future)
+
+        # Show task progress
+        with tqdm(total=len(process_futures), desc='Aggregation progress', unit='task', unit_scale=True) as pbar:
+            for p_future in as_completed(process_futures):
+                p_future.result()
+                # Advance the progress bar
+                pbar.update(1)
 
         # Shut down the process pool (redundant inside the with block, but harmless)
         process_pool.shutdown()
 
+    # Collect the per-process result files and merge them
+    with open(word_agg_file, "w", encoding="UTF-8") as fo:
+        for file in os.listdir(file_path):
+            # Skip files that are not result parts
+            if not agg_file_pattern.match(file):
+                continue
+
+            with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi:
+                for word in fi:
+                    fo.write(word)
+
 
 if __name__ == "__main__":
     print("开始时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
@@ -498,8 +500,9 @@ if __name__ == "__main__":
     # prepare_word_split_and_reverse_index(filePath)
     agg_word(filePath)
     # word_split_statistics(file_path)
-    # tasks = utils.avg_split_task(100, 12)
+    # tasks = utils.avg_split_task(100, 12, 1)
     # The cosine similarity of these two phrases equals 0.8
     # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写",
     #                         ["QQ", "邮箱", "格式", "如何", "写"])
     print("结束时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
+
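Note on the new structure: the reworked agg_word no longer fans out per main word. It splits the whole index range into fixed-size chunks, lets each worker append its matches to a private part file named after its PID, and merges the parts once all futures complete. Below is a minimal, self-contained sketch of that scatter-to-part-files pattern; worker, part_%s.txt, and merged.txt are illustrative names, not this module's API.

import os
from concurrent.futures import ProcessPoolExecutor, as_completed


def worker(out_dir: str, start_pos: int, end_pos: int):
    # Each worker appends to its own per-PID part file, so no cross-process locking is needed.
    part_file = os.path.join(out_dir, "part_%s.txt" % os.getpid())
    with open(part_file, "a", encoding="UTF-8") as fo:
        for i in range(start_pos, end_pos):  # [start_pos, end_pos), as in agg_word_process
            fo.write("%d\n" % i)


if __name__ == "__main__":
    out_dir = "."
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as pool:
        futures = [pool.submit(worker, out_dir, s, s + 10) for s in range(0, 50, 10)]
        for future in as_completed(futures):
            future.result()  # propagate any worker exception
    # Merge the part files into one result, mirroring the final loop of agg_word.
    with open(os.path.join(out_dir, "merged.txt"), "w", encoding="UTF-8") as fo:
        for name in sorted(os.listdir(out_dir)):
            if name.startswith("part_") and name.endswith(".txt"):
                with open(os.path.join(out_dir, name), "r", encoding="UTF-8") as fi:
                    fo.write(fi.read())

One caveat of per-PID naming: ProcessPoolExecutor reuses worker processes, so several chunks can land in the same part file. Appending ("a") keeps that safe, which is presumably why agg_word_process opens its part file in append mode and why agg_word deletes stale part files up front.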

+ 5 - 2
src/utils.py

@@ -70,9 +70,10 @@ def load_stop_word():
     return stop_word_dict
 
 
-def avg_split_task(total: int, split_internal: int):
+def avg_split_task(total: int, split_internal: int, start=0):
     """
-    Split the work evenly; the start position is inclusive and the end position exclusive
+    Split the work evenly; the start position is inclusive, the end position exclusive, and positions count from 0
+    :param start: start position of the first chunk
     :param total: total number of tasks
     :param split_internal: number of items per chunk
     :return: (start position, end position)
@@ -87,6 +88,8 @@ def avg_split_task(total: int, split_internal: int):
         # Compute the chunk boundaries within the list
         start_pos = i * split_internal
         end_pos = i * split_internal + split_internal
+        if i == 0:
+            start_pos = start
         # If the boundary runs past the list size, clamp it to the total
         if end_pos >= total:
             end_pos = total
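For reference, a usage sketch of the updated helper, assuming the loop appends each (start_pos, end_pos) pair to a list and the clamped final chunk ends at total:

# Hypothetical example: split 25 items into chunks of 10, first chunk starting at 1
tasks = avg_split_task(25, 10, 1)
# Expected: [(1, 10), (10, 20), (20, 25)]
for start_pos, end_pos in tasks:
    for i in range(start_pos, end_pos):  # start inclusive, end exclusive
        ...  # process item i

The start=1 here mirrors agg_word's call avg_split_task(word_total_num, task_cal_num, 1), presumably because the word position indexes are 1-based.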