Ver Fonte

增加长尾词聚合

ChenYL há 2 anos atrás
pai
commit
9b0517c474
3 ficheiros alterados com 452 adições e 10 exclusões
  1. 0 0
      src/logging.conf
  2. 379 9
      src/money.py
  3. 73 1
      src/utils.py

+ 0 - 0
src/tmp/logging.conf → src/logging.conf


+ 379 - 9
src/money.py

@@ -1,15 +1,32 @@
 # -*- coding:utf-8 -*-
-
+import math
+import mmap
 import os
+import re
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+
+from bitmap import BitMap
+
 import utils
 import jieba
 import zipfile
+import logging
 
 # 文件后缀:长尾词.txt
 FILE_SUFFIX_LONG_TAIL = "_长尾词.txt"
 
 # 文件后缀:长尾词_合并.txt
-FILE_SUFFIX_LONG_TAIL_MERGE = "_长尾词_合并.txt"
+FILE_LONG_TAIL_MERGE = "长尾词_合并.txt"
+
+# 文件:长尾词_合并_分词.txt
+FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
+
+# 文件:长尾词_合并_分词倒排索引.txt
+FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
+
+# 文件:长尾词_合并_聚合.txt
+FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
 
 
 def extract_word_from_5118(file_path: str):
@@ -75,7 +92,7 @@ def merge_word(file_path: str):
                 word_set.add(word.replace("\n", ""))
 
     # 保存合并结果
-    with open(os.path.join(file_path, str(len(file_list)) + FILE_SUFFIX_LONG_TAIL_MERGE), "w", encoding="utf-8") as f:
+    with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE), "w", encoding="utf-8") as f:
         for item in word_set:
             f.write(item)
             f.write("\n")
@@ -102,9 +119,9 @@ def word_split_statistics(file_path: str):
         key_dict = {}
 
         with open(file, "r", encoding="utf-8") as f:
-            for word in f:
+            for tmp_word in f:
                 # 分词
-                word_list = jieba.cut_for_search(word.replace("\n", ""))
+                word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
                 # 统计
                 for word in word_list:
                     # 过滤停用词
@@ -126,10 +143,363 @@ def word_split_statistics(file_path: str):
                 f.write("%s,%d\n" % (key, count))
 
 
+def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
+    """
+    分词和建立倒排索引
+    :param input_file: 待处理的文件
+    :param start_pos: 处理的开始位置
+    :param end_pos: 处理的结束位置
+    :return: (开始位置,分词结果,倒排索引)
+    """
+
+    # 加载停用词
+    stop_word_dict = utils.load_stop_word()
+    # 关键词存放容器
+    word_arr_list = []
+    # 倒排索引
+    word_reverse_index = dict()
+
+    with open(input_file, "r", encoding="utf-8") as fr:
+        for i, tmp_word in enumerate(fr):
+
+            # start_pos是行数,而i要从0开始
+            if i + 1 < start_pos:
+                continue
+
+            # 当前位置
+            cur_pos = i + 1
+
+            # 到达任务边界,结束
+            if cur_pos == end_pos:
+                break
+
+            # 分词
+            word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
+
+            # 分词过滤结果
+            word_filter_arr = []
+
+            # 过滤停用词
+            for word in word_list:
+                if word in stop_word_dict:
+                    continue
+
+                word_index_arr = word_reverse_index.get(word)
+                if word_index_arr:
+                    word_index_arr.append(cur_pos)
+                else:
+                    word_reverse_index[word] = [cur_pos]
+
+                word_filter_arr.append(word)
+
+            if len(word_filter_arr) == 0:
+                word_arr_list.append([])
+            else:
+                word_arr_list.append(word_filter_arr)
+
+    return start_pos, word_arr_list, word_reverse_index
+
+
+def prepare_word_split_and_reverse_index(file_path: str):
+    """
+    预处理:长尾词分词、建立倒排索引
+    :param file_path: 待处理文件夹路径
+    :return:
+    """
+
+    # 判断文件是否存在
+    word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
+    if os.path.exists(word_input_file) and not os.path.isfile(word_input_file):
+        print("文件不存在! " + word_input_file)
+        return
+
+    # 总文本数量
+    total_line_num = 0
+    with open(word_input_file, "r", encoding="utf-8") as fi:
+        total_line_num = sum(1 for line in fi)
+
+    if total_line_num == 0:
+        print("没有待处理的数据,文本量为0")
+        return
+
+    # 分割任务数量
+    task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))
+
+    # 任务进程处理结果
+    p_result_list = []
+
+    # 多进程处理
+    with ProcessPoolExecutor(os.cpu_count()) as process_pool:
+        # 提交任务
+        process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in
+                           task_list]
+
+        # 处理返回结果
+        for p_future in as_completed(process_futures):
+            p_result = p_future.result()
+            if p_result:
+                p_result_list.append(p_result)
+
+        # 分词结果排序
+        p_result_list = sorted(p_result_list, key=lambda v: v[0])
+        # 输出分词结果
+        split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
+        with open(split_output_file, "w", encoding="UTF-8") as fo:
+            for start_pos, word_arr_list, reverse_index in p_result_list:
+                for word_arr in word_arr_list:
+                    fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
+
+        # 关键词倒排索引
+        word_reverse_index_dict = dict()
+        # 合并倒排索引
+        for start_pos, word_arr, reverse_index_dict in p_result_list:
+            for key, value in reverse_index_dict.items():
+                reverse_index_arr = word_reverse_index_dict.get(key)
+                if reverse_index_arr:
+                    reverse_index_arr.extend(value)
+                else:
+                    word_reverse_index_dict[key] = value
+        # 输出倒排索引
+        with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
+            for key, value in word_reverse_index_dict.items():
+                fo.write("%s,%s\n" % (key, value))
+
+        # 关闭进程池
+        process_pool.shutdown()
+
+
+def agg_word_cal(word_file: str, word_split_file: str, word_position_list: list, word_split_position_list: list,
+                 agg_threshold: float, main_word: str, main_key_list: list, candidate_position_list: list):
+    """
+    长尾词聚合计算
+    :param word_file: 长尾词文件路径
+    :param word_split_file: 长尾词分词文件路径
+    :param word_position_list: 长尾词位置索引
+    :param word_split_position_list 词根位置索引
+    :param agg_threshold: 聚合阈值
+    :param main_word: 主词
+    :param main_key_list: 主词词根
+    :param candidate_position_list: 候选词位置
+    :return:
+    """
+
+    # 结果容器
+    result_list = []
+
+    with (open(word_file, "r", encoding="UTF-8") as f_key,
+          mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap,
+          open(word_split_file, "r", encoding="UTF-8") as f_key_split,
+          mmap.mmap(f_key_split.fileno(), 0, access=mmap.ACCESS_READ) as f_key_split_mmap):
+
+        if not candidate_position_list:
+            logging.info("子进程:候选词列表为空,结束执行")
+            return
+
+        for candidate_position in candidate_position_list:
+            try:
+                # 获取关键词
+                word_position = word_position_list[candidate_position]
+                f_key_mmap.seek(word_position)
+                candidate_word = f_key_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
+
+                if candidate_word == "小孩在肚子里怎么呼吸":
+                    print("子:出现拉拉")
+
+                # 获取分词结果
+                key_position = word_split_position_list[candidate_position]
+                f_key_split_mmap.seek(key_position)
+                temp_candidate_word_stem = f_key_split_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
+                # 为空则跳过
+                if len(temp_candidate_word_stem) == 0:
+                    continue
+                candidate_word_key_list = temp_candidate_word_stem.split(",")
+
+                # 计算相关性
+                try:
+                    val = utils.cal_cos_sim(main_word, main_key_list, candidate_word, candidate_word_key_list)
+                    if val >= agg_threshold:
+                        result_list.append((candidate_position, candidate_word))
+                except Exception as e:
+                    logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (
+                        main_word, candidate_word, candidate_word_key_list), e)
+            except Exception as e:
+                logging.error("子进程发生异常", e)
+
+    return result_list
+
+
+def agg_word(file_path: str):
+    """
+    长尾词聚合
+    :param file_path:
+    :return:
+    """
+
+    # 判断文件是否存在
+    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
+        input_file = os.path.join(file_path, file_name)
+        if os.path.exists(input_file) and not os.path.isfile(input_file):
+            raise Exception("文件不存在!文件路径:" + input_file)
+
+    # 总长尾词数量
+    word_total_num = 0
+
+    # 记录关键词位置
+    word_position_list = [0]
+    word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
+    with (open(word_file, "r", encoding="utf-8") as f,
+          mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
+        while True:
+            # 获取当前位置
+            cur_pos = fmmap.tell()
+            # 移动到一下行
+            line = fmmap.readline()
+            # 结束检测
+            if not line:
+                break
+            # 记录
+            word_position_list.append(cur_pos)
+            word_total_num = word_total_num + 1
+
+    # 记录分词位置
+    word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
+    word_split_position_list = [0]
+    with (open(word_split_file, "r", encoding="utf-8") as f,
+          mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
+        while True:
+            # 获取当前位置
+            cur_pos = fmmap.tell()
+            # 移动到一下行
+            line = fmmap.readline()
+            # 结束检测
+            if not line:
+                break
+            # 记录
+            word_split_position_list.append(cur_pos)
+
+    # 获取倒排索引
+    word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
+    word_reverse_index_dict = {}
+    # 分词
+    key_re = r"([^,]+),\["
+    key_pattern = re.compile(key_re, re.I)
+    # 索引
+    index_pattern = re.compile(r"\d+", re.I)
+    with open(word_reverse_index_file, "r", encoding="utf-8") as f:
+        for line in f:
+            key_m = key_pattern.match(line)
+            key = key_m.group(1)
+            val = index_pattern.findall(line[line.index(","):])
+            word_reverse_index_dict[key] = [int(v) for v in val]
+
+    # 已使用长尾词位图
+    unused_bitmap = BitMap(word_total_num + 1)
+
+    # 聚合阈值
+    agg_threshold = 0.8
+
+    # 提交子进程阈值
+    process_threshold = os.cpu_count() * 30
+
+    # 提交任务 并输出结果
+    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
+    with (ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool,
+          open(word_agg_file, "w", encoding="UTF-8") as fo,
+          open(word_file, "r", encoding="UTF-8") as f_word,
+          open(word_split_file, "r", encoding="UTF-8") as f_word_split,
+          mmap.mmap(f_word_split.fileno(), 0, access=mmap.ACCESS_READ) as f_split_mmap):
+
+        # 准备数据
+        for position_index, main_word in enumerate(f_word, 1):
+            # 判断是否已聚合,否则置为已使用
+            if unused_bitmap.test(position_index):
+                continue
+            else:
+                unused_bitmap.set(position_index)
+
+            # 移除换行符
+            main_word = utils.remove_line_break(main_word)
+            if main_word == "小孩在肚子里怎么呼吸":
+                print("出现拉拉")
+            # 获取分词结果
+            key_position = word_split_position_list[position_index]
+            f_split_mmap.seek(key_position)
+            temp_main_word_stem = utils.remove_line_break(f_split_mmap.readline().decode("UTF-8"))
+            # 为空则跳过
+            if len(temp_main_word_stem) == 0:
+                continue
+            main_word_stem_list = temp_main_word_stem.split(",")
+
+            # 从倒排索引中获取候选词的位置索引
+            candidate_position_set = set()
+            for main_word_stem in main_word_stem_list:
+                index_list = word_reverse_index_dict.get(main_word_stem)
+                if index_list:
+                    candidate_position_set.update(index_list)
+
+            # 排除已使用的长尾词
+            candidate_position_list = []
+            for candidate_position in candidate_position_set:
+                # 跳过已使用
+                if unused_bitmap.test(candidate_position):
+                    continue
+                candidate_position_list.append(candidate_position)
+
+            # 分割计算任务,没有计算任务则跳过
+            candidate_position_len = len(candidate_position_list)
+            if candidate_position_len == 0:
+                continue
+
+            # 暂存分析结果
+            p_result_list = []
+            # 任务量不足直接在主进程上进行计算,否则提交子进程计算
+            if candidate_position_len < process_threshold:
+                p_result = agg_word_cal(word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, candidate_position_list)
+                if len(p_result) > 0:
+                    p_result_list.extend(p_result)
+            else:
+                task_split_list = utils.avg_split_task(candidate_position_len, math.ceil(candidate_position_len / os.cpu_count()))
+                all_task_list = [candidate_position_list[start_pos:end_pos] for start_pos, end_pos in task_split_list]
+
+                # 提交任务
+                process_futures = []
+                for task_list in all_task_list:
+                    if not task_list:
+                        continue
+                    p_future = process_pool.submit(agg_word_cal, word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, task_list)
+                    process_futures.append(p_future)
+
+                for p_future in as_completed(process_futures):
+                    p_result = p_future.result()
+                    if p_result and len(p_result) > 0:
+                        p_result_list.extend(p_result)
+
+            # 处理分析结果,并标记已处理数据
+            if len(p_result_list) > 0:
+                fo.write("%s\n" % main_word)
+                for candidate_position, candidate_word in p_result_list:
+                    unused_bitmap.set(candidate_position)
+                    fo.write("%s\n" % candidate_word)
+                fo.write("\n")
+
+            # 清除上一轮的数据
+            p_result_list.clear()
+
+        # 关闭线程
+        process_pool.shutdown()
+
+
 if __name__ == "__main__":
-    filePath = "../data"
-    # filePath = "E:\Download\测试"
+    print("开始时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
+    # filePath = "../data"
+    filePath = "../data/test"
     # extract_word_from_5118(filePath)
-    merge_word(filePath)
+    # merge_word(filePath)
+    # prepare_word_split_and_reverse_index(filePath)
+    agg_word(filePath)
     # word_split_statistics(file_path)
-
+    # tasks = utils.avg_split_task(100, 12)
+    # 两者计算余弦值等于:0.8
+    # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写",
+    #                         ["QQ", "邮箱", "格式", "如何", "写"])
+    print("结束时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))

+ 73 - 1
src/utils.py

@@ -1,7 +1,9 @@
 # -*- coding:utf-8 -*-
-
+import math
 import os
 import pickle
+import re
+import numpy as np
 
 # 停用词存放文件夹
 STOP_WORD_DIR = "./conf/stopwords"
@@ -12,6 +14,9 @@ TEMP_PATH = "../tmp"
 # 停用词模型
 STOP_WORD_CACHE = "stop_word.pkl"
 
+# 正则表达式中需要额外处理的特殊符号
+RE_SPECIAL_SYMBOL = [".", "?", "^", "$", "*", "+", "\\", "[", "]", "|", "{", "}", "(", ")"]
+
 
 def save_obj(path, obj):
     """
@@ -63,3 +68,70 @@ def load_stop_word():
     save_obj(stop_word_cache_path, stop_word_dict)
 
     return stop_word_dict
+
+
+def avg_split_task(total: int, split_internal: int):
+    """
+    平分任务,包含开始位置,不包含结束位置
+    :param total: 任务总数量
+    :param split_internal: 每份数量
+    :return: (开始位置,结束位置)
+    """
+
+    # 分割的任务份数
+    split_num = math.ceil(total / split_internal)
+
+    # 平分
+    tasks = []
+    for i in range(split_num):
+        # 计算平分点在列表中的位置
+        start_pos = i * split_internal
+        end_pos = i * split_internal + split_internal
+        # 如果超过列表大小需要额外处理
+        if end_pos >= total:
+            end_pos = -1
+        tasks.append([start_pos, end_pos])
+
+    return tasks
+
+
+def cal_cos_sim(a_word: str, a_stem: list, b_word: str, b_stem: list):
+    """
+    计算余弦相似性
+    :param a_word: A词
+    :param a_stem: A词根列表
+    :param b_word: B词
+    :param b_stem: B词根列表
+    :return: 余弦值
+    """
+    # 合并词根
+    union_stem = list(set(a_stem).union(set(b_stem)))
+
+    # 生成词向量
+    a_vec, b_vec = [], []
+    for word in union_stem:
+        if word in RE_SPECIAL_SYMBOL:
+            word = "\\" + word
+        if word == "c++":
+            word = "c\\+\\+"
+        a_vec.append(len(re.findall(word, a_word)))
+        b_vec.append(len(re.findall(word, b_word)))
+
+    # 计算余弦相关性
+    vec1 = np.array(a_vec)
+    vec2 = np.array(b_vec)
+    val = (np.linalg.norm(vec1) * np.linalg.norm(vec2))
+    if val == 0:
+        return 0
+    return vec1.dot(vec2) / val
+
+
+def remove_line_break(line: str):
+    """
+    移除换行符
+    :param line: 待处理文本
+    :return: 替换后的结果
+    """
+    if line:
+        return line.replace("\r", "").replace("\n", "")
+    return line