# -*- coding:utf-8 -*-
"""Keyword aggregation, multi-process version.

Original author notes (translated):

Open issues:
- Use threads for the I/O-heavy parts; main-process utilization is very low.
- Optimize for speed (currently ~100 keywords per minute).

Resolved:
- Output format was incorrect.
- Analysis results were not written to the result file.
- Keywords with a single stem are skipped (nothing to compare).
- Duplicate loading reduced -> solved by loading the big indexes only when the
  module is imported by a worker (sub) process.
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import reduce
from itertools import combinations
import math
import mmap
import os
from time import sleep, time
from cal import cal_cos_sim
import config
import tools
import stop_word
import re
import logging

tools.init_log()


def intesect(x, y):
    """Return the intersection of two sets (reducer for functools.reduce)."""
    return x & y


if __name__ != "__main__":
    # This branch runs only when the module is imported by a
    # ProcessPoolExecutor child, so the parent process never pays for loading
    # these large caches.
    # Stop words (container supporting `in`).
    stop_word_index = stop_word.load_stop_word()
    # KEY-table index: keyword index -> byte offset in the KEY file.
    key_index = tools.load_obj(config.KEY_INDEX_CACHE)
    # Inverted-table index: stem -> byte offset in the reverse file.
    reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
    # Aggregation threshold: cosine similarity >= this counts as "related".
    agg_threshold = 0.8
    # Precompiled regexes:
    # keyword indexes inside an inverted-table line, e.g. '123'
    index_re = r"'(\d+)'"
    index_pattern = re.compile(index_re, re.I)
    # the keyword field of a KEY-table line (second comma-separated field)
    key_re = r"[^,]*,(.*),\["
    key_pattern = re.compile(key_re, re.I)
    # the quoted stems of a KEY-table line
    stem_re = r"'([^,]*)'"
    stem_pattern = re.compile(stem_re, re.I)


def sub_process(start_pos, end_pos):
    """Worker process: aggregate related keywords for KEY-table rows
    [start_pos, end_pos).

    :param start_pos: index of the first keyword row of this chunk.
    :param end_pos: index one past the last keyword row (may be None for the
        last chunk -- see NOTE below).
    :return: dict with
        "agg_result": list of lists, each [main_keyword, related_kw, ...];
        "start_pos": echo of start_pos so the parent can record progress.
    """
    pid = os.getpid()
    logging.info("子进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
    # Aggregation result for this chunk.
    agg_result = []
    start_time = time()
    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
            mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap, \
            open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as f_reverse, \
            mmap.mmap(f_reverse.fileno(), 0, access=mmap.ACCESS_READ) as f_reverse_mmap:
        # Translate keyword indexes to byte offsets in the KEY file.
        # NOTE(review): end_pos may be None for the final chunk (see
        # avg_split_task); key_index[None] fails for a list index -- TODO
        # confirm key_index supports it (e.g. dict) or fix the last chunk.
        lower_pos = key_index[start_pos]
        upper_pos = key_index[end_pos]
        f_key_mmap.seek(lower_pos)
        # Read the main keywords of this chunk: keyword -> filtered stems.
        a_keys = {}
        while f_key_mmap.tell() < upper_pos:
            # NOTE(review): hard-coded charset; presumably matches
            # config.ENCODING_CHARSET -- confirm.
            line = f_key_mmap.readline().decode("UTF-8")
            # Extract keyword and stems; drop stop words from the stems.
            key_m = key_pattern.match(line)
            a_key = key_m.group(1)
            a_stem = [stem for stem in stem_pattern.findall(line)
                      if stem not in stop_word_index]
            # A single-stem keyword has nothing worth comparing.
            if len(a_stem) > 1:
                a_keys[a_key] = a_stem
        # Union of all stems appearing in this chunk.
        all_stem = set()
        for stems in a_keys.values():
            all_stem.update(stems)
        # Load each stem's inverted-table entry exactly once:
        # stem -> set of keyword indexes (as strings) containing that stem.
        reverse_dict = {}
        for stem in all_stem:
            f_reverse_mmap.seek(reverse_index[stem])
            reverse_line = f_reverse_mmap.readline().decode("UTF-8")
            reverse_dict[stem] = set(index_pattern.findall(reverse_line))
        # Compute similarities for every main keyword.
        for a_key, a_stem in a_keys.items():
            logging.debug("子进程-%d 主关键词:%s 开始计算词根组合" % (pid, a_key))
            # Combinations of ~70% of the stems; a candidate keyword must
            # contain all stems of at least one combination.
            num = math.ceil(len(a_stem) * 0.7)
            stem_combs = combinations(a_stem, num)
            logging.debug("子进程-%d 主关键词:%s 计算词根组合结束" % (pid, a_key))
            logging.debug("子进程-%d 主关键词:%s 开始获取词根涉及的关键词信息" % (pid, a_key))
            # Candidate indexes: union over combinations of the intersection
            # of the combination stems' inverted lists.
            b_indexs = set()
            for stem_comb in stem_combs:
                indexs = [reverse_dict[stem] for stem in stem_comb]
                b_indexs.update(reduce(intesect, indexs))
            logging.debug("子进程-%d 主关键词:%s 总祠根数:%d" % (pid, a_key, len(b_indexs)))
            # Fetch the candidate keyword rows from the KEY file.
            b_keys = []
            for b_index in b_indexs:
                f_key_mmap.seek(key_index[int(b_index)])
                line = f_key_mmap.readline().decode("UTF-8")
                key_m = key_pattern.match(line)
                b_keys.append((key_m.group(1), stem_pattern.findall(line)))
            logging.debug("子进程-%d 主关键词:%s 获取词根涉及的关键词信息结束,涉及计算关键词数量:%d" % (pid, a_key, len(b_keys)))
            logging.debug("子进程-%d 主关键词:%s 开始计算相关性" % (pid, a_key))
            # Result group: main keyword first, related keywords appended.
            correlation_key = [a_key]
            for b_key, b_stem in b_keys:
                try:
                    val = cal_cos_sim(a_key, a_stem, b_key, b_stem)
                    if val >= agg_threshold:
                        correlation_key.append(b_key)
                except Exception:
                    # BUG FIX: the original passed the exception object as a
                    # %-format argument to logging.error, which makes the
                    # logging module's own formatting fail. logging.exception
                    # logs the same message plus the traceback.
                    logging.exception("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (a_key, b_key, b_stem))
            # Keep the group only if at least one related keyword was found.
            if len(correlation_key) > 1:
                agg_result.append(correlation_key)
            logging.debug("子进程-%d 主关键词:%s 计算相关性结束,相关的关键词数据量:%d" % (pid, a_key, (len(correlation_key) - 1)))
    logging.info("子进程-%d 执行任务结束,耗时:%f" % (pid, (time() - start_time)))
    return {
        "agg_result": agg_result,
        "start_pos": start_pos
    }


def main_process():
    """Main process: fan chunks of the KEY table out to worker processes,
    append the aggregation groups to the output file, and persist progress
    after every completed chunk so an interrupted run can resume."""
    # Number of worker processes.
    process_num = 4
    # Total number of rows in the KEY table.
    total_task = 14500028
    # Rows per task (chunk size).
    per_task_num = 100
    # Split into subtasks: progress record + remaining task list.
    process_record, tasks = avg_split_task(total_task, per_task_num)
    with ProcessPoolExecutor(max_workers=process_num) as process_pool, \
            open(config.AGG_FILE, "a", encoding=config.ENCODING_CHARSET) as f:
        logging.info("主进程:提交任务到子进程")
        process_futures = [process_pool.submit(sub_process, task[0], task[1])
                           for task in tasks]
        for p_future in as_completed(process_futures):
            logging.debug("主进程:子进程返回部分数据")
            result = p_future.result()
            # Mark this chunk as done.
            cur_pos = result["start_pos"]
            process_record[cur_pos // per_task_num] = 1
            # BUG FIX: the original tested `if result:`, which is always true
            # (result is a non-empty dict); gate on the actual payload.
            if result["agg_result"]:
                logging.debug("主进程:存在有效数据开始处理")
                for correlation_key in result["agg_result"]:
                    f.write("\n######开始######\n")
                    for key in correlation_key:
                        f.write("%s\n" % key)
            # Persist progress after every completed chunk.
            tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
            tools.tip(total_task, cur_pos)


def avg_split_task(total: int, split_internal: int):
    """Split `total` items into chunks of `split_internal` items.

    :param total: total number of items.
    :param split_internal: chunk size.
    :return: (process_record, tasks) where process_record is a 0/1 list
        marking finished chunks (restored from cache when present) and tasks
        is the list of [start, end) pairs still to be processed.
    """
    tasks = None
    process_record = None
    # Number of chunks.
    split_num = math.ceil(total / split_internal)
    tmp_lists = []
    for i in range(split_num):
        start_pos = i * split_internal
        end_pos = start_pos + split_internal
        # The last chunk is marked with end_pos=None.
        # NOTE(review): sub_process then does `key_index[end_pos]`; None works
        # only if key_index supports it (e.g. a dict key) -- TODO confirm.
        if end_pos >= total:
            end_pos = None
        tmp_lists.append([start_pos, end_pos])
    # Resume from a cached progress record when one exists.
    if os.path.exists(config.ANALYSE_PROCESS_CACHE):
        logging.debug("存在分析进度缓存")
        process_record = tools.load_obj(config.ANALYSE_PROCESS_CACHE)
    if process_record:
        # Keep only the chunks not yet marked done.
        tasks = [task for task in tmp_lists
                 if not process_record[task[0] // split_internal]]
    else:
        tasks = tmp_lists
        process_record = [0 for i in range(len(tmp_lists))]
    return process_record, tasks


if __name__ == "__main__":
    TITLE = "(多进程版 fast_14.py)聚合文件"
    tools.log_start_msg(TITLE)
    main_process()
    tools.log_end_msg(TITLE)