
feat: split functionality into separate modules

ChenYL 2 years ago
commit 829410c507
3 changed files with 421 additions and 405 deletions
  1. src/agg.py (+407, -0)
  2. src/constant.py (+10, -0)
  3. src/money.py (+4, -405)

+ 407 - 0
src/agg.py

@@ -0,0 +1,407 @@
+# -*- coding:utf-8 -*-
+import math
+import os
+import re
+from concurrent.futures import ProcessPoolExecutor, as_completed
+
+import jieba
+import redis
+from tqdm import tqdm
+
+import utils
+import logging
+
+from src.constant import FILE_LONG_TAIL_MERGE
+
+# File: 长尾词_合并_分词.txt (merged long-tail words, segmented)
+FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
+
+# File: 长尾词_合并_聚合.txt (merged long-tail words, aggregated)
+FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
+
+# File: 长尾词_合并_倒排索引.txt (inverted index over the segmented words)
+FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
+
+# Per-process sub-file: 长尾词_合并_聚合_%s.txt
+FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s.txt"
+
+# Cache key: word stems from segmentation
+CACHE_WORD_STEM = "word:stem"
+
+# Cache key: inverted index
+CACHE_WORD_REVERSE_INDEX = "word:reverse_index"
+
+# Cache key: long-tail words
+CACHE_WORD = "word"
+
+# Cache key: aggregation bitmap (marks words already aggregated)
+CACHE_UNUSED_BITMAP = "unused_bitmap"
+
+# Charset: UTF-8
+CHARSET_UTF_8 = "UTF-8"
+
+
+def agg_word(file_path: str):
+    """
+    Aggregate long-tail words into groups of similar keywords
+    :param file_path: directory containing the working files
+    :return:
+    """
+
+    # Total number of long-tail words
+    word_total_num = 0
+
+    # Aggregation threshold (cosine similarity)
+    agg_threshold = 0.8
+
+    # Number of words handled per task
+    task_cal_num = 10000
+
+    # Number of worker processes (minus 1 to leave one core for redis)
+    worker_num = os.cpu_count() - 1
+    # worker_num = 1
+
+    # Regex matching the per-process aggregation result files
+    agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+.txt", re.I)
+
+    # redis cache
+    redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379)
+
+    # Make sure the required input files exist
+    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT,
+                      FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
+        input_file = os.path.join(file_path, file_name)
+        if not os.path.isfile(input_file):
+            raise Exception("文件不存在!文件路径:" + input_file)
+
+    # Remove result files left over from previous runs
+    for file in os.listdir(file_path):
+        if agg_file_pattern.match(file):
+            os.remove(os.path.join(file_path, file))
+
+    # Cache each keyword in redis, keyed by its line position
+    word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
+    word_dict = {}
+    with open(word_file, "r", encoding="utf-8") as f:
+        for position, word in enumerate(f, start=1):
+            word = utils.remove_line_break(word)
+            if not word:
+                continue
+            word_dict[position] = word
+    redis_cache.hset(CACHE_WORD, mapping=word_dict)
+    # Record the total number of keywords
+    word_total_num = len(word_dict)
+    # Free memory
+    del word_dict
+    #
+    # # Cache the segmentation results
+    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
+    # word_split_dict = {}
+    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
+    #     for position, word_split_line in enumerate(f, start=1):
+    #         word_split_line = utils.remove_line_break(word_split_line)
+    #         if not word_split_line:
+    #             continue
+    #         word_split_dict[position] = word_split_line
+    # redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
+    # # Free memory
+    # del word_split_dict
+    #
+    # # Cache the inverted index
+    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
+    # word_reverse_index_dict = {}
+    # # Regex for the segmented word (key)
+    # key_pattern = re.compile(r"([^,]+),\[", re.I)
+    # # Regex for the index positions
+    # index_pattern = re.compile(r"\d+", re.I)
+    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
+    #     for word_split_line in f:
+    #         key_m = key_pattern.match(word_split_line)
+    #         key = key_m.group(1)
+    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
+    #         word_reverse_index_dict[key] = ",".join(val)
+    # redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
+    # # Free memory
+    # del word_reverse_index_dict
+
+    # Build the usage bitmap for the long-tail words
+    redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 1, 1)
+
+    # Submit the tasks and write out the results
+    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
+
+    with ProcessPoolExecutor(max_workers=worker_num) as process_pool:
+        # Compute the task boundaries
+        task_list = utils.avg_split_task(word_total_num, task_cal_num, 1)
+
+        # Submit the tasks
+        process_futures = []
+        for skip_line, pos in enumerate(task_list, start=1):
+            # skip_line =  (skip_line % 4 ) + 1
+            skip_line = 1
+            p_future = process_pool.submit(agg_word_process, file_path, agg_threshold, pos[0], pos[1], word_total_num,
+                                           skip_line)
+            process_futures.append(p_future)
+
+        # Show overall progress
+        with tqdm(total=len(process_futures), desc='文本聚合进度', unit='份', unit_scale=True, position=0) as pbar:
+            for p_future in as_completed(process_futures):
+                p_result = p_future.result()
+                # Advance the progress bar
+                pbar.update(1)
+
+        # Shut down the process pool
+        process_pool.shutdown()
+
+    # Collect the per-process result files and merge them
+    with open(word_agg_file, "w", encoding="UTF-8") as fo:
+        for file in os.listdir(file_path):
+            # Skip files that are not worker result files
+            if not agg_file_pattern.match(file):
+                continue
+
+            with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi:
+                for word in fi:
+                    fo.write(word)
+
+
+def prepare_word_split_and_reverse_index(file_path: str):
+    """
+    Preprocessing: segment the long-tail words and build the inverted index
+    :param file_path: directory of the files to process
+    :return:
+    """
+
+    # Make sure the input file exists
+    word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
+    if not os.path.isfile(word_input_file):
+        print("文件不存在! " + word_input_file)
+        return
+
+    # Total number of lines to process
+    total_line_num = 0
+    with open(word_input_file, "r", encoding="utf-8") as fi:
+        total_line_num = sum(1 for line in fi)
+
+    if total_line_num == 0:
+        print("没有待处理的数据,文本量为0")
+        return
+
+    # Split the work into per-process tasks
+    task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))
+
+    # Results returned by the worker processes
+    p_result_list = []
+
+    # Process in parallel with multiple processes
+    with ProcessPoolExecutor(os.cpu_count()) as process_pool:
+        # Submit the tasks
+        process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in
+                           task_list]
+
+        # Collect the returned results
+        for p_future in as_completed(process_futures):
+            p_result = p_future.result()
+            if p_result:
+                p_result_list.append(p_result)
+
+        # Sort the segmentation results by start position
+        p_result_list = sorted(p_result_list, key=lambda v: v[0])
+        # Write out the segmentation results
+        split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
+        with open(split_output_file, "w", encoding="UTF-8") as fo:
+            for start_pos, word_arr_list, reverse_index in p_result_list:
+                for word_arr in word_arr_list:
+                    fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
+
+        # Inverted index over all keywords
+        word_reverse_index_dict = dict()
+        # Merge the per-process inverted indexes
+        for start_pos, word_arr, reverse_index_dict in p_result_list:
+            for key, value in reverse_index_dict.items():
+                reverse_index_arr = word_reverse_index_dict.get(key)
+                if reverse_index_arr:
+                    reverse_index_arr.extend(value)
+                else:
+                    word_reverse_index_dict[key] = value
+        # Write out the inverted index
+        with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
+            for key, value in word_reverse_index_dict.items():
+                fo.write("%s,%s\n" % (key, value))
+
+        # Shut down the process pool
+        process_pool.shutdown()
+
+def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
+    """
+    Segment keywords and build an inverted index
+    :param input_file: file to process
+    :param start_pos: start line (1-based, inclusive)
+    :param end_pos: end line (exclusive)
+    :return: (start position, segmentation results, inverted index)
+    """
+
+    # Load the stop words
+    stop_word_dict = utils.load_stop_word()
+    # Container for the segmented keywords
+    word_arr_list = []
+    # Inverted index
+    word_reverse_index = dict()
+
+    with open(input_file, "r", encoding="utf-8") as fr:
+        for i, tmp_word in enumerate(fr):
+
+            # start_pos is a 1-based line number, while i starts at 0
+            if i + 1 < start_pos:
+                continue
+
+            # Current line position
+            cur_pos = i + 1
+
+            # Stop at the task boundary
+            if cur_pos == end_pos:
+                break
+
+            # Segment the keyword
+            word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
+
+            # Segmentation result after filtering
+            word_filter_arr = []
+
+            # Filter out stop words
+            for word in word_list:
+                if word in stop_word_dict:
+                    continue
+
+                word_index_arr = word_reverse_index.get(word)
+                if word_index_arr:
+                    word_index_arr.append(cur_pos)
+                else:
+                    word_reverse_index[word] = [cur_pos]
+
+                word_filter_arr.append(word)
+
+            if len(word_filter_arr) == 0:
+                word_arr_list.append([])
+            else:
+                word_arr_list.append(word_filter_arr)
+
+    return start_pos, word_arr_list, word_reverse_index
+
+def agg_word_process(file_path: str, agg_threshold: float, start_pos: int, end_pos: int, final_pos: int,
+                     skip_line: int):
+    """
+    Worker process: aggregate a range of long-tail words
+    :param file_path: directory of the working files
+    :param agg_threshold: aggregation threshold
+    :param start_pos: start of this task's range (inclusive)
+    :param end_pos: end of this task's range (exclusive)
+    :param final_pos: overall end position of all tasks
+    :param skip_line: tqdm display position for this worker's progress bar
+    :return:
+    """
+
+    # Per-process temporary result file
+    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG_PID % os.getpid())
+
+    # redis connection pool
+    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=1)
+
+    # redis cache
+    redis_cache = redis.StrictRedis(connection_pool=redis_pool)
+
+    # Number of words this task has to process
+    process_len = 0
+    if end_pos == -1:
+        process_len = final_pos - start_pos
+    else:
+        process_len = end_pos - start_pos
+
+    with (open(word_agg_file, "a", encoding="UTF-8") as fo,
+          tqdm(total=process_len, desc='子进程-%s:文本聚合进度' % os.getpid(), unit='份', unit_scale=True,
+               position=skip_line) as pbar):
+
+        for main_word_position in range(start_pos, end_pos):
+            try:
+                # Skip the main word if it is already marked as used; otherwise mark it as used now
+                if redis_cache.getbit(CACHE_UNUSED_BITMAP, main_word_position):
+                    continue
+                else:
+                    redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
+
+                # Fetch the main word (already stripped of line breaks)
+                main_word = redis_cache.hget(CACHE_WORD, main_word_position)
+                if not main_word:
+                    continue
+                main_word = main_word.decode(CHARSET_UTF_8)
+
+                # Fetch the main word's segmentation result
+                main_word_stem = redis_cache.hget(CACHE_WORD_STEM, main_word_position)
+                # Skip if empty
+                if not main_word_stem:
+                    continue
+                main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
+
+                # Collect candidate word positions from the inverted index
+                candidate_position_set = set()
+                temp_candidate_position_list = redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
+                for temp_candidate_position in temp_candidate_position_list:
+                    if not temp_candidate_position:
+                        continue
+                    candidate_position_set.update(temp_candidate_position.decode(CHARSET_UTF_8).split(","))
+
+                # No candidates to compare against, skip
+                if not candidate_position_set:
+                    continue
+
+                # Result list
+                result_list = []
+
+                # Compute the similarity against every candidate
+                for candidate_position in candidate_position_set:
+                    # Skip the main word itself (candidate positions come back as strings)
+                    if str(main_word_position) == candidate_position:
+                        continue
+
+                    # Fetch the candidate word
+                    candidate_word = redis_cache.hget(CACHE_WORD, candidate_position)
+                    if not candidate_word:
+                        continue
+                    candidate_word = candidate_word.decode(CHARSET_UTF_8)
+
+                    # Fetch the candidate's segmentation result
+                    candidate_word_stem = redis_cache.hget(CACHE_WORD_STEM, candidate_position)
+                    # Skip if empty
+                    if not candidate_word_stem:
+                        continue
+                    candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",")
+
+                    # Compute the cosine similarity
+                    try:
+                        val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
+                                                candidate_word_stem_list)
+                        if val >= agg_threshold:
+                            redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
+                            result_list.append(candidate_word)
+                    except Exception as e:
+                        logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (
+                            main_word, candidate_word, candidate_word_stem_list), exc_info=e)
+
+                # Save the results
+                if len(result_list) > 0:
+                    fo.write("%s\n" % main_word)
+                    for candidate_word in result_list:
+                        fo.write("%s\n" % candidate_word)
+                    fo.write("\n")
+
+                # Clear the containers
+                candidate_position_set.clear()
+                result_list.clear()
+
+                # Advance the progress bar
+                pbar.update(1)
+            except Exception as e:
+                logging.error("子进程发生异常", exc_info=e)
+
+    return
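
The new agg module exposes a two-phase flow: first segment the merged keyword file and build the inverted index, then run the Redis-backed aggregation. Below is a minimal, hypothetical driver sketch (not part of this commit); it assumes the working directory holds 长尾词_合并.txt, that redis-server is reachable on 127.0.0.1:6379, and that the word:stem / word:reverse_index hashes are already populated, since the code that loads them inside agg_word is currently commented out.

    # Hypothetical driver for the refactored module
    from agg import agg_word, prepare_word_split_and_reverse_index

    data_dir = "../data"  # assumed data directory

    # Phase 1: jieba segmentation + inverted index files
    prepare_word_split_and_reverse_index(data_dir)

    # Phase 2: multi-process aggregation backed by the Redis caches
    agg_word(data_dir)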

+ 10 - 0
src/constant.py

@@ -0,0 +1,10 @@
+# -*- coding:utf-8 -*-
+
+# File: 长尾词_合并.txt (merged long-tail words)
+FILE_LONG_TAIL_MERGE = "长尾词_合并.txt"
+
+
+
+
+
+

+ 4 - 405
src/money.py

@@ -1,53 +1,17 @@
 # -*- coding:utf-8 -*-
-import math
-import mmap
 import os
-import re
 import time
-from concurrent.futures import ProcessPoolExecutor, as_completed
+import zipfile
 
-import redis
-from bitmap import BitMap
-from tqdm import tqdm
+import jieba
 
 import utils
-import jieba
-import zipfile
-import logging
+from agg import agg_word
+from src.constant import FILE_LONG_TAIL_MERGE
 
 # File suffix: _长尾词.txt
 FILE_SUFFIX_LONG_TAIL = "_长尾词.txt"
 
-# File: 长尾词_合并.txt (merged long-tail words)
-FILE_LONG_TAIL_MERGE = "长尾词_合并.txt"
-
-# File: 长尾词_合并_分词.txt (merged long-tail words, segmented)
-FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
-
-# File: 长尾词_合并_倒排索引.txt (segmented-word inverted index)
-FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
-
-# File: 长尾词_合并_聚合.txt (merged long-tail words, aggregated)
-FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
-
-# Per-process sub-file: 长尾词_合并_聚合_%s.txt
-FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s.txt"
-
-# Cache key: word stems from segmentation
-CACHE_WORD_STEM = "word:stem"
-
-# Cache key: inverted index
-CACHE_WORD_REVERSE_INDEX = "word:reverse_index"
-
-# Cache key: long-tail words
-CACHE_WORD = "word"
-
-# Cache key: aggregation bitmap (marks words already aggregated)
-CACHE_UNUSED_BITMAP = "unused_bitmap"
-
-# Charset: UTF-8
-CHARSET_UTF_8 = "UTF-8"
-
 
 def extract_word_from_5118(file_path: str):
     """
@@ -163,370 +127,6 @@ def word_split_statistics(file_path: str):
                 f.write("%s,%d\n" % (key, count))
 
 
-def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
-    """
-    Segment keywords and build an inverted index
-    :param input_file: file to process
-    :param start_pos: start line (1-based, inclusive)
-    :param end_pos: end line (exclusive)
-    :return: (start position, segmentation results, inverted index)
-    """
-
-    # Load the stop words
-    stop_word_dict = utils.load_stop_word()
-    # Container for the segmented keywords
-    word_arr_list = []
-    # Inverted index
-    word_reverse_index = dict()
-
-    with open(input_file, "r", encoding="utf-8") as fr:
-        for i, tmp_word in enumerate(fr):
-
-            # start_pos is a 1-based line number, while i starts at 0
-            if i + 1 < start_pos:
-                continue
-
-            # Current line position
-            cur_pos = i + 1
-
-            # Stop at the task boundary
-            if cur_pos == end_pos:
-                break
-
-            # Segment the keyword
-            word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
-
-            # Segmentation result after filtering
-            word_filter_arr = []
-
-            # Filter out stop words
-            for word in word_list:
-                if word in stop_word_dict:
-                    continue
-
-                word_index_arr = word_reverse_index.get(word)
-                if word_index_arr:
-                    word_index_arr.append(cur_pos)
-                else:
-                    word_reverse_index[word] = [cur_pos]
-
-                word_filter_arr.append(word)
-
-            if len(word_filter_arr) == 0:
-                word_arr_list.append([])
-            else:
-                word_arr_list.append(word_filter_arr)
-
-    return start_pos, word_arr_list, word_reverse_index
-
-
-def prepare_word_split_and_reverse_index(file_path: str):
-    """
-    Preprocessing: segment the long-tail words and build the inverted index
-    :param file_path: directory of the files to process
-    :return:
-    """
-
-    # Check that the input file exists
-    word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
-    if os.path.exists(word_input_file) and not os.path.isfile(word_input_file):
-        print("文件不存在! " + word_input_file)
-        return
-
-    # Total number of lines to process
-    total_line_num = 0
-    with open(word_input_file, "r", encoding="utf-8") as fi:
-        total_line_num = sum(1 for line in fi)
-
-    if total_line_num == 0:
-        print("没有待处理的数据,文本量为0")
-        return
-
-    # Split the work into per-process tasks
-    task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))
-
-    # Results returned by the worker processes
-    p_result_list = []
-
-    # Process in parallel with multiple processes
-    with ProcessPoolExecutor(os.cpu_count()) as process_pool:
-        # Submit the tasks
-        process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in
-                           task_list]
-
-        # Collect the returned results
-        for p_future in as_completed(process_futures):
-            p_result = p_future.result()
-            if p_result:
-                p_result_list.append(p_result)
-
-        # Sort the segmentation results by start position
-        p_result_list = sorted(p_result_list, key=lambda v: v[0])
-        # Write out the segmentation results
-        split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
-        with open(split_output_file, "w", encoding="UTF-8") as fo:
-            for start_pos, word_arr_list, reverse_index in p_result_list:
-                for word_arr in word_arr_list:
-                    fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
-
-        # Inverted index over all keywords
-        word_reverse_index_dict = dict()
-        # Merge the per-process inverted indexes
-        for start_pos, word_arr, reverse_index_dict in p_result_list:
-            for key, value in reverse_index_dict.items():
-                reverse_index_arr = word_reverse_index_dict.get(key)
-                if reverse_index_arr:
-                    reverse_index_arr.extend(value)
-                else:
-                    word_reverse_index_dict[key] = value
-        # Write out the inverted index
-        with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
-            for key, value in word_reverse_index_dict.items():
-                fo.write("%s,%s\n" % (key, value))
-
-        # Shut down the process pool
-        process_pool.shutdown()
-
-
-def agg_word_process(file_path: str, agg_threshold: float, start_pos: int, end_pos: int, final_pos:int, skip_line: int):
-    """
-    Worker process: aggregate a range of long-tail words
-    :param file_path: directory of the working files
-    :param word_file: long-tail word file path
-    :param agg_threshold: aggregation threshold
-    :param start_pos: start of this task's range (inclusive)
-    :param end_pos: end of this task's range (exclusive)
-    :param final_pos: overall end position of all tasks
-    :param skip_line: tqdm display position for the progress bar
-    :return:
-    """
-
-    # Per-process temporary result file
-    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG_PID % os.getpid())
-
-    # redis connection pool
-    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=1)
-
-    # redis cache
-    redis_cache = redis.StrictRedis(connection_pool=redis_pool)
-
-    # Number of words this task has to process
-    process_len = 0
-    if end_pos == -1:
-        process_len = final_pos - start_pos
-    else:
-        process_len = end_pos - start_pos
-
-    with (open(word_agg_file, "a", encoding="UTF-8") as fo,
-            tqdm(total=process_len, desc='子进程-%s:文本聚合进度' % os.getpid(), unit='份', unit_scale=True, position=skip_line) as pbar):
-
-        for main_word_position in range(start_pos, end_pos):
-            try:
-                # Skip the main word if it is already marked as used; otherwise mark it as used now
-                if redis_cache.getbit(CACHE_UNUSED_BITMAP, main_word_position):
-                    continue
-                else:
-                    redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
-
-                # Fetch the main word (already stripped of line breaks)
-                main_word = redis_cache.hget(CACHE_WORD, main_word_position)
-                if not main_word:
-                    continue
-                main_word = main_word.decode(CHARSET_UTF_8)
-
-                # Fetch the main word's segmentation result
-                main_word_stem = redis_cache.hget(CACHE_WORD_STEM, main_word_position)
-                # Skip if empty
-                if not main_word_stem:
-                    continue
-                main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
-
-                # Collect candidate word positions from the inverted index
-                candidate_position_set = set()
-                temp_candidate_position_list = redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
-                for temp_candidate_position in temp_candidate_position_list:
-                    if not temp_candidate_position:
-                        continue
-                    candidate_position_set.update(temp_candidate_position.decode(CHARSET_UTF_8).split(","))
-
-                # No candidates to compare against, skip
-                if not candidate_position_set:
-                    continue
-
-                # Result list
-                result_list = []
-
-                # Compute the similarity against every candidate
-                for candidate_position in candidate_position_set:
-                    # Skip the main word itself
-                    if main_word_position == candidate_position:
-                        continue
-
-                    # Fetch the candidate word
-                    candidate_word = redis_cache.hget(CACHE_WORD, candidate_position)
-                    if not candidate_word:
-                        continue
-                    candidate_word = candidate_word.decode(CHARSET_UTF_8)
-
-                    # Fetch the candidate's segmentation result
-                    candidate_word_stem = redis_cache.hget(CACHE_WORD_STEM, candidate_position)
-                    # Skip if empty
-                    if not candidate_word_stem:
-                        continue
-                    candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",")
-
-                    # Compute the cosine similarity
-                    try:
-                        val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
-                                                candidate_word_stem_list)
-                        if val >= agg_threshold:
-                            redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
-                            result_list.append(candidate_word)
-                    except Exception as e:
-                        logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (
-                            main_word, candidate_word, candidate_word_stem_list), e)
-
-                # Save the results
-                if len(result_list) > 0:
-                    fo.write("%s\n" % main_word)
-                    for candidate_word in result_list:
-                        fo.write("%s\n" % candidate_word)
-                    fo.write("\n")
-
-                # Clear the containers
-                candidate_position_set.clear()
-                result_list.clear()
-
-                # Advance the progress bar
-                pbar.update(1)
-            except Exception as e:
-                logging.error("子进程发生异常", e)
-
-    return
-
-
-def agg_word(file_path: str):
-    """
-    Aggregate long-tail words into groups of similar keywords
-    :param file_path:
-    :return:
-    """
-
-    # Total number of long-tail words
-    word_total_num = 0
-
-    # Aggregation threshold (cosine similarity)
-    agg_threshold = 0.8
-
-    # Number of words handled per task
-    task_cal_num = 10000
-
-    # Number of worker processes (minus 1 to leave one core for redis)
-    worker_num = os.cpu_count() - 1
-    # worker_num = 1
-
-    # Regex matching the per-process aggregation result files
-    agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+.txt", re.I)
-
-    # redis cache
-    redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379)
-
-    # Check that the required files exist
-    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
-        input_file = os.path.join(file_path, file_name)
-        if os.path.exists(input_file) and not os.path.isfile(input_file):
-            raise Exception("文件不存在!文件路径:" + input_file)
-
-    # Remove result files left over from previous runs
-    for file in os.listdir(file_path):
-        if agg_file_pattern.match(file):
-            os.remove(os.path.join(file_path, file))
-
-    # Cache each keyword in redis, keyed by its line position
-    word_file = os.path.join(filePath, FILE_LONG_TAIL_MERGE)
-    word_dict = {}
-    with open(word_file, "r", encoding="utf-8") as f:
-        for position, word in enumerate(f, start=1):
-            word = utils.remove_line_break(word)
-            if not word:
-                continue
-            word_dict[position] = word
-    redis_cache.hset(CACHE_WORD, mapping=word_dict)
-    # Record the total number of keywords
-    word_total_num = len(word_dict)
-    # Free memory
-    del word_dict
-    #
-    # # Cache the segmentation results
-    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
-    # word_split_dict = {}
-    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
-    #     for position, word_split_line in enumerate(f, start=1):
-    #         word_split_line = utils.remove_line_break(word_split_line)
-    #         if not word_split_line:
-    #             continue
-    #         word_split_dict[position] = word_split_line
-    # redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
-    # # Free memory
-    # del word_split_dict
-    #
-    # # Cache the inverted index
-    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
-    # word_reverse_index_dict = {}
-    # # Regex for the segmented word (key)
-    # key_pattern = re.compile(r"([^,]+),\[", re.I)
-    # # Regex for the index positions
-    # index_pattern = re.compile(r"\d+", re.I)
-    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
-    #     for word_split_line in f:
-    #         key_m = key_pattern.match(word_split_line)
-    #         key = key_m.group(1)
-    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
-    #         word_reverse_index_dict[key] = ",".join(val)
-    # redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
-    # # Free memory
-    # del word_reverse_index_dict
-
-    # Build the usage bitmap for the long-tail words
-    redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 1, 1)
-
-    # Submit the tasks and write out the results
-    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
-
-    with ProcessPoolExecutor(max_workers=worker_num) as process_pool:
-        # Compute the task boundaries
-        task_list = utils.avg_split_task(word_total_num, task_cal_num, 1)
-
-        # Submit the tasks
-        process_futures = []
-        for skip_line, pos in enumerate(task_list, start=1):
-            # skip_line =  (skip_line % 4 ) + 1
-            skip_line = 1
-            p_future = process_pool.submit(agg_word_process, file_path, agg_threshold, pos[0], pos[1], word_total_num, skip_line)
-            process_futures.append(p_future)
-
-        # Show overall progress
-        with tqdm(total=len(process_futures), desc='文本聚合进度', unit='份', unit_scale=True, position=0) as pbar:
-            for p_future in as_completed(process_futures):
-                p_result = p_future.result()
-                # Advance the progress bar
-                pbar.update(1)
-
-        # Shut down the process pool
-        process_pool.shutdown()
-
-    # Collect the per-process result files and merge them
-    with open(word_agg_file, "w", encoding="UTF-8") as fo:
-        for file in os.listdir(file_path):
-            # Skip files that are not worker result files
-            if not agg_file_pattern.match(file):
-                continue
-
-            with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi:
-                for word in fi:
-                    fo.write(word)
-
-
 if __name__ == "__main__":
     print("开始时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
     # filePath = "../data"
@@ -541,4 +141,3 @@ if __name__ == "__main__":
     # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写",
     #                         ["QQ", "邮箱", "格式", "如何", "写"])
     print("结束时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
-
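
The aggregation step hinges on utils.cal_cos_sim, which is not part of this diff: it receives both the raw keywords and their jieba token lists, and the result is compared against the 0.8 threshold. A rough sketch of one way such a token-based cosine similarity could be computed (an assumption for illustration; the real utils implementation may differ and may also weight the raw strings):

    from collections import Counter
    import math

    def cal_cos_sim_sketch(main_word, main_tokens, candidate_word, candidate_tokens):
        """Illustrative cosine similarity over token counts; not the actual utils.cal_cos_sim."""
        a, b = Counter(main_tokens), Counter(candidate_tokens)
        # Dot product over the shared tokens
        dot = sum(a[t] * b[t] for t in set(a) & set(b))
        # Product of the two vector norms
        norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
        return dot / norm if norm else 0.0

Under this sketch, the commented-out example in the main block ("QQ邮箱格式怎么写" vs "QQ邮箱格式如何写", sharing 4 of 5 tokens) scores 4 / 5 = 0.8, exactly at the aggregation threshold.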