Переглянути джерело

feat:聚合优化,嵌入Java调用

ChenYL 2 роки тому
батько
коміт
f675450774
2 змінених файлів з 82 додано та 285 видалено
  1. 78 283
      src/agg.py
  2. 4 2
      src/money.py

+ 78 - 283
src/agg.py

@@ -3,22 +3,18 @@ import math
 import os
 import re
 import shutil
-import threading
 import time
-from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
+from concurrent.futures import ProcessPoolExecutor, as_completed
 
 import jieba
 import redis
-from bitarray import bitarray
 from tqdm import tqdm
 
 import utils
-import logging
-
 from constant import FILE_LONG_TAIL_MERGE
 
 # 文件:长尾词_合并_分词.txt
-FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
+FILE_LONG_TAIL_MERGE_STEM = "长尾词_合并_分词.txt"
 
 # 文件:长尾词_合并_聚合.txt
 FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
@@ -33,29 +29,22 @@ FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
 FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s_%s.txt"
 
 # 缓存前缀:分词词根
-CACHE_WORD_STEM = "word:stem"
+CACHE_WORD_STEM = "word:stem:%s"
 
 # 缓存前缀:倒排索引
-CACHE_WORD_REVERSE_INDEX = "word:reverse_index"
+CACHE_WORD_REVERSE_INDEX = "word:reverse_index:%s"
 
 # 缓存:长尾词缓存
-CACHE_WORD = "word"
+CACHE_WORD = "word:normal_index:%s"
 
 # 缓存:聚合位图
-CACHE_UNUSED_BITMAP = "unused_bitmap"
+CACHE_UNUSED_BITMAP = "word:bitmap"
 
 # 字符集:UTF-8
 CHARSET_UTF_8 = "UTF-8"
 
-# redis缓存池
-redis_pool: redis.ConnectionPool = None
-
-# 线程池
-thread_pool: ThreadPoolExecutor = None
-
-# 线程本地变量
-local_var = threading.local()
-
+# 正则表达式:聚合文件分文件
+agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+_\d+.txt", re.I)
 
 def agg_word(file_path: str):
     """
@@ -74,24 +63,11 @@ def agg_word(file_path: str):
     # 每份任务计算量
     task_cal_num = 10000
 
-    # 工作现成(减1是为了留一个处理器给redis)
-    worker_num = os.cpu_count() - 1
-    # worker_num = 1
-
-    # 正则表达式:聚合文件分文件
-    agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+_\d+.txt", re.I)
-
-    # 最大线程数
-    max_threads = 2
-
-    # redis最大连接数(和工作线程数保持一致,免得浪费)
-    redis_max_conns = max_threads
-
     # redis缓存
-    m_redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379)
+    redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True)
 
     # 判断文件是否存在
-    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT,
+    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_STEM,
                       FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
         input_file = os.path.join(file_path, file_name)
         if os.path.exists(input_file) and not os.path.isfile(input_file):
@@ -105,91 +81,81 @@ def agg_word(file_path: str):
         for history_agg_file in history_agg_file_list:
             shutil.move(os.path.join(file_path, history_agg_file), archive_path)
 
-    # 缓存关键词位置
-    # word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
-    # word_dict = {}
-    # with open(word_file, "r", encoding="utf-8") as f:
-    #     for position, word in enumerate(f, start=1):
-    #         word = utils.remove_line_break(word)
-    #         if not word:
-    #             continue
-    #         word_dict[position] = word
-    # m_redis_cache.hset(CACHE_WORD, mapping=word_dict)
-    # # 记录总关键词数
-    # word_total_num = len(word_dict)
-    # # 释放内存
-    # del word_dict
-    #
-    # # 缓存分词
-    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
-    # word_split_dict = {}
-    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
-    #     for position, word_split_line in enumerate(f, start=1):
-    #         word_split_line = utils.remove_line_break(word_split_line)
-    #         if not word_split_line:
-    #             continue
-    #         word_split_dict[position] = word_split_line
-    # m_redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
-    # # 释放内存
-    # del word_split_dict
-    #
-    # # 缓存倒排索引
-    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
-    # word_reverse_index_dict = {}
-    # # 分词
-    # key_pattern = re.compile(r"([^,]+),\[", re.I)
-    # # 索引
-    # index_pattern = re.compile(r"\d+", re.I)
-    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
-    #     for word_split_line in f:
-    #         key_m = key_pattern.match(word_split_line)
-    #         key = key_m.group(1)
-    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
-    #         word_reverse_index_dict[key] = ",".join(val)
-    # m_redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
-    # # 释放内存
-    # del word_reverse_index_dict
-
-    # 先清除,然后重新构建长尾词使用位图
-    m_redis_cache.delete(CACHE_UNUSED_BITMAP)
-    m_redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 2, 1)
+    with redis_cache.pipeline(transaction=False) as redis_pipeline:
+        # 删除历史缓存
+        keys = redis_cache.keys("word*")
+        for key in keys:
+            redis_pipeline.delete(key)
+        redis_pipeline.execute()
+
+        # 缓存关键词位置
+        word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
+        word_dict = {}
+        with open(word_file, "r", encoding="utf-8") as f:
+            for position, word in enumerate(f, start=1):
+                word = utils.remove_line_break(word)
+                if not word:
+                    continue
+                word_dict[CACHE_WORD % position] = word
+        redis_cache.mset(mapping=word_dict)
+        # 记录总关键词数
+        word_total_num = len(word_dict)
+        # 释放内存
+        del word_dict
+
+        # 缓存分词
+        word_stem_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_STEM)
+        with open(word_stem_file, "r", encoding=CHARSET_UTF_8) as f:
+            for position, word_stem_line in enumerate(f, start=1):
+                word_stem_line = utils.remove_line_break(word_stem_line)
+                if not word_stem_line:
+                    continue
+                redis_key = CACHE_WORD_STEM % position
+                for word_stem in word_stem_line.split(","):
+                    redis_pipeline.lpush(redis_key, word_stem)
+        redis_pipeline.execute()
+
+        # 缓存倒排索引
+        word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
+        # 分词
+        key_pattern = re.compile(r"([^,]+),\[", re.I)
+        # 索引
+        index_pattern = re.compile(r"\d+", re.I)
+        with open(word_reverse_index_file, "r", encoding="utf-8") as f:
+            for word_stem_line in f:
+                key_m = key_pattern.match(word_stem_line)
+                key = key_m.group(1)
+                val = index_pattern.findall(word_stem_line[word_stem_line.index(","):])
+                redis_pipeline.sadd(CACHE_WORD_REVERSE_INDEX % key, val)
+        redis_pipeline.execute()
+
+        # 先清除,然后重新构建长尾词使用位图
+        redis_pipeline.delete(CACHE_UNUSED_BITMAP)
+        redis_pipeline.setbit(CACHE_UNUSED_BITMAP, word_total_num + 2, 1)
+        redis_pipeline.execute()
 
-    # 提交任务 并输出结果
-    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
+    return
 
-    with ProcessPoolExecutor(max_workers=worker_num, initializer=init_process,
-                             initargs=(redis_max_conns, max_threads, file_path)) as process_pool:
-        # 计算任务边界
-        task_list = utils.avg_split_task(word_total_num, task_cal_num, 1)
+    # subprocess.run(["java", "-jar", "D:\\Documents\\ChenYL\\CodeRepository\\money-mining-java\\target\\money-mining-1.0-SNAPSHOT.jar"])
 
-        # 提交任务
-        process_futures = []
-        for skip_line, pos in enumerate(task_list, start=1):
-            skip_line = (skip_line % worker_num) + 1
-            p_future = process_pool.submit(agg_word_process, agg_threshold, pos[0], pos[1], word_total_num,
-                                           skip_line)
-            process_futures.append(p_future)
-
-        # 显示任务进度
-        with tqdm(total=len(process_futures), desc='文本聚合进度', unit='份', unit_scale=True) as pbar:
-            for p_future in as_completed(process_futures):
-                p_result = p_future.result()
-                # 更新发呆进度
-                pbar.update(1)
-
-        # 关闭线程
-        process_pool.shutdown()
+    # 获取分结果文件列表并合并
+    result_list = [file for file in os.listdir(file_path) if agg_file_pattern.match(file)]
 
-    # 获取子进程结果文件列表,并合并
-    with open(word_agg_file, "w", encoding="UTF-8") as fo:
-        for file in os.listdir(file_path):
-            # 不是处理结果部分跳过
-            if not agg_file_pattern.match(file):
-                continue
+    if not result_list:
+        print("没有找到待合并的计算结果分文件")
+        return
+
+    # 提交任务 并输出结果
+    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
 
+    # 显示任务进度,
+    with (tqdm(total=len(result_list), desc='主进程:合并分文件', unit='份', unit_scale=True) as pbar,
+          open(word_agg_file, "w", encoding="UTF-8") as fo):
+        for file in result_list:
             with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi:
                 for word in fi:
                     fo.write(word)
+            pbar.update(1)
 
 
 def prepare_word_split_and_reverse_index(file_path: str):
@@ -235,7 +201,7 @@ def prepare_word_split_and_reverse_index(file_path: str):
         # 分词结果排序
         p_result_list = sorted(p_result_list, key=lambda v: v[0])
         # 输出分词结果
-        split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
+        split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_STEM)
         with open(split_output_file, "w", encoding="UTF-8") as fo:
             for start_pos, word_arr_list, reverse_index in p_result_list:
                 for word_arr in word_arr_list:
@@ -315,174 +281,3 @@ def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
                 word_arr_list.append(word_filter_arr)
 
     return start_pos, word_arr_list, word_reverse_index
-
-
-def init_process(max_conns: int, max_threads: int, file_path: str):
-    """
-    初始化进程
-    :param max_conns: redis最大连接数量
-    :param max_threads: 线程最大数量
-    :param file_path: 输出文件路径
-    :return:
-    """
-    # redis缓存池 初始化
-    global redis_pool
-    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=max_conns)
-
-    global thread_pool
-    thread_pool = ThreadPoolExecutor(max_threads, initializer=init_thread, initargs=(file_path,))
-
-
-def agg_word_process(agg_threshold: float, start_pos: int, end_pos: int, final_pos: int,
-                     skip_line: int):
-    """
-    长尾词聚合处理
-    :param agg_threshold: 聚合阈值
-    :param start_pos: 任务处理开始边界(包含)
-    :param end_pos: 任务处理结束边界(不包含)
-    :param final_pos: 总任务边界
-    :param skip_line: 进度条显示位置
-    :return:
-    """
-
-    # 进度长度
-    process_len = 0
-    if end_pos == -1:
-        process_len = final_pos - start_pos
-    else:
-        process_len = end_pos - start_pos
-
-    with tqdm(total=process_len, desc='子进程-%s:文本聚合进度' % os.getpid(), unit='份', unit_scale=True,
-              position=skip_line) as pbar:
-
-        thread_futures = [thread_pool.submit(agg_word_thread, main_word_position, agg_threshold) for main_word_position
-                          in
-                          range(start_pos, end_pos)]
-
-        for t_future in as_completed(thread_futures):
-            t_result = t_future.result()
-            # 更新发呆进度
-            pbar.update(1)
-
-    return
-
-
-def init_thread(file_path: str):
-    """
-    聚合线程初始化
-    :param file_path: 输出文件路径
-    :return:
-    """
-    # 初始化redis客户端
-    local_var.redis_cache = redis.StrictRedis(connection_pool=redis_pool)
-    # 初始化redis管道
-    local_var.redis_pipeline = local_var.redis_cache.pipeline(transaction=False)
-
-    # 生成临时结果文件
-    word_agg_file = os.path.join(file_path,
-                                 FILE_LONG_TAIL_MERGE_AGG_PID % (os.getpid(), threading.current_thread().ident))
-    local_var.file_writer = open(word_agg_file, "w", encoding=CHARSET_UTF_8)
-
-    # 已使用位图副本
-    local_var.unused_bitmap = bitarray()
-
-    # 从倒排索引中获取候选词的位置索引
-    local_var.candidate_position_set = set()
-
-    # 结果列表
-    local_var.result_list = []
-
-
-def agg_word_thread(main_word_position: int, agg_threshold: float):
-    try:
-        # 获取已使用位图副本
-        local_var.unused_bitmap.frombytes(local_var.redis_cache.get(CACHE_UNUSED_BITMAP))
-
-        # 判断主词是否为已使用,是则跳过,否则设置为已使用
-        if local_var.unused_bitmap[main_word_position]:
-            return
-        else:
-            local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
-            local_var.unused_bitmap[main_word_position] = 1
-
-        # 获取主词和对应的词根
-        local_var.redis_pipeline.hget(CACHE_WORD, main_word_position)
-        local_var.redis_pipeline.hget(CACHE_WORD_STEM, main_word_position)
-        main_result = local_var.redis_pipeline.execute()
-        main_word = main_result[0]
-        main_word_stem = main_result[1]
-
-        # 如果存在为空则返回
-        if not main_word or not main_word_stem:
-            return
-
-        main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
-
-        # 从倒排索引获取长尾词位置
-        temp_candidate_position_list = local_var.redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
-        for temp_candidate_position in temp_candidate_position_list:
-            if not temp_candidate_position:
-                continue
-            # 排除已聚合
-            for candidate_position in temp_candidate_position.decode(CHARSET_UTF_8).split(","):
-                if local_var.unused_bitmap[int(candidate_position)]:
-                    continue
-                local_var.candidate_position_set.add(candidate_position)
-
-        # 没有找到需要计算的候选词则跳过
-        if not local_var.candidate_position_set:
-            return
-
-        # 从缓存获取关键词列表、分词列表,如果为空则跳过
-        local_var.redis_pipeline.hmget(CACHE_WORD, local_var.candidate_position_set)
-        local_var.redis_pipeline.hmget(CACHE_WORD_STEM, local_var.candidate_position_set)
-        candidate_result = local_var.redis_pipeline.execute()
-        candidate_word_cache_list = candidate_result[0]
-        candidate_word_stem_cache_list = candidate_result[1]
-        if not candidate_word_cache_list or not candidate_word_stem_cache_list:
-            return
-
-        # 延后编码成字符,以防前面直接返回
-        main_word = main_word.decode(CHARSET_UTF_8)
-
-        # 计算相似度
-        for candidate_position in range(len(local_var.candidate_position_set)):
-
-            # 获取关键词、分词,如果存在为空则跳过
-            candidate_word = candidate_word_cache_list[int(candidate_position)]
-            if not candidate_word:
-                continue
-            candidate_word_stem = candidate_word_stem_cache_list[int(candidate_position)]
-            if not candidate_word_stem:
-                continue
-            candidate_word = candidate_word.decode(CHARSET_UTF_8)
-            candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",")
-
-            # 计算相关性
-            try:
-                val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
-                                        candidate_word_stem_list)
-                if val >= agg_threshold:
-                    local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
-                    local_var.result_list.append(candidate_word)
-            except Exception as e:
-                logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (
-                    main_word, candidate_word, candidate_word_stem_list), e)
-
-        # 保存结果
-        if not local_var.result_list:
-            return
-        local_var.file_writer.write("%s\n" % main_word)
-        for candidate_word in local_var.result_list:
-            local_var.file_writer.write("%s\n" % candidate_word)
-        local_var.file_writer.write("\n")
-
-    except Exception as e:
-        logging.error("子进程发生异常", e)
-    finally:
-        # 清除容器数据
-        local_var.candidate_position_set.clear()
-        local_var.result_list.clear()
-        local_var.unused_bitmap.clear()
-
-    return

+ 4 - 2
src/money.py

@@ -1,13 +1,14 @@
 # -*- coding:utf-8 -*-
 import os
+import subprocess
 import time
 import zipfile
 
 import jieba
 
 import utils
-from agg import agg_word
 from constant import FILE_LONG_TAIL_MERGE
+import agg
 
 # 文件后缀:长尾词.txt
 FILE_SUFFIX_LONG_TAIL = "_长尾词.txt"
@@ -134,10 +135,11 @@ if __name__ == "__main__":
     # extract_word_from_5118(filePath)
     # merge_word(filePath)
     # prepare_word_split_and_reverse_index(filePath)
-    agg_word(filePath)
+    agg.agg_word(filePath)
     # word_split_statistics(file_path)
     # tasks = utils.avg_split_task(100, 12, 1)
     # 两者计算余弦值等于:0.8
     # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写",
     #                         ["QQ", "邮箱", "格式", "如何", "写"])
+    # r = subprocess.run(["java", "-jar", "D:\\Documents\\ChenYL\\CodeRepository\\money-mining-java\\target\\money-mining-1.0-SNAPSHOT.jar"])
     print("结束时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))