| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- # -*- coding:utf-8 -*-
- from concurrent.futures import ProcessPoolExecutor, as_completed
- from functools import reduce
- from itertools import combinations
- import math
- import mmap
- import os
- from time import sleep, time
- from cal import cal_cos_sim
- import config
- import tools
- import stop_word
- import re
- import logging
- # 问题
- # 用线程处理IO高的部分
- # 主线程利用率极低
- # 优化代码,加快速度(目前速度:约1分钟100个关键词)
- # 已解决
- # 输出的格式不正确
- # 分析结果内容没有写入结果中
- # 移除祠根数等于1的词,不做分析
- # 减少重复加载 -> 解决:加入仅在子进程时才加载的判断
- tools.init_log()
def intesect(x, y):
    """
    Return the intersection of two sets.

    Used as the binary step function for ``functools.reduce`` when
    intersecting many inverted-index row sets.
    (Name kept as-is, typo included — callers reference ``intesect``.)
    """
    return x.intersection(y)
- if __name__ != "__main__":
- # 停用词
- stop_word_index = stop_word.load_stop_word()
- # KEY表索引
- key_index = tools.load_obj(config.KEY_INDEX_CACHE)
- # 倒排表索引
- reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
- # 聚合阈值
- agg_threshold = 0.8
- # 正则提取
- # 倒排表 索引
- index_re = r"'(\d+)'"
- index_pattern = re.compile(index_re, re.I)
- # 关键词
- key_re = r"[^,]*,(.*),\["
- key_pattern = re.compile(key_re, re.I)
- # KEY表 词根
- stem_re = r"'([^,]*)'"
- stem_pattern = re.compile(stem_re, re.I)
def sub_process(start_pos, end_pos):
    """
    Worker-process task: for every keyword in the KEY-table slice
    [start_pos, end_pos), find the other keywords whose cosine similarity
    is at least ``agg_threshold`` and bundle them into aggregation groups.

    Relies on the module globals loaded in the ``__name__ != "__main__"``
    branch (stop_word_index, key_index, reverse_index, agg_threshold and
    the compiled regex patterns).

    :param start_pos: row index of the first keyword of this slice
    :param end_pos: row index one past the last keyword of this slice
    :return: dict with
             "agg_result" — list of lists, each ``[main_key, related, ...]``;
             "start_pos"  — echoed back so the parent can mark progress.
    """
    pid = os.getpid()
    logging.debug("子进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
    # Aggregation groups produced by this slice.
    agg_result = []
    start_time = time()
    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
            mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap, \
            open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as f_reverse, \
            mmap.mmap(f_reverse.fileno(), 0, access=mmap.ACCESS_READ) as f_reverse_mmap:

        # Translate row indexes into byte offsets inside the KEY file.
        lower_pos = key_index[start_pos]
        upper_pos = key_index[end_pos]
        f_key_mmap.seek(lower_pos)

        # Main keywords of this slice: keyword -> list of non-stop-word stems.
        # NOTE(review): lines are decoded with hard-coded "UTF-8" while the
        # files are opened with config.ENCODING_CHARSET — confirm they agree.
        a_keys = {}
        while f_key_mmap.tell() < upper_pos:
            line = f_key_mmap.readline().decode("UTF-8")
            key_m = key_pattern.match(line)
            a_key = key_m.group(1)
            # Drop stop words; a keyword left with a single stem has no
            # comparison value, so it is skipped entirely.
            a_stem = [stem for stem in stem_pattern.findall(line)
                      if stem not in stop_word_index]
            if len(a_stem) > 1:
                a_keys[a_key] = a_stem

        # Union of all stems appearing in this slice.
        all_stem = set()
        for stems in a_keys.values():
            all_stem.update(stems)

        # Inverted index restricted to this slice:
        # stem -> set of KEY-table row indexes (as strings) containing it.
        reverse_dict = {}
        for stem in all_stem:
            f_reverse_mmap.seek(reverse_index[stem])
            reverse_line = f_reverse_mmap.readline().decode("UTF-8")
            reverse_dict[stem] = set(index_pattern.findall(reverse_line))

        # Score each main keyword against its candidates.
        for a_key, a_stem in a_keys.items():
            logging.debug("子进程-%d 主关键词:%s 开始计算词根组合" % (pid, a_key))
            # All combinations of ~70% of the keyword's stems.
            num = math.ceil(len(a_stem) * 0.7)
            stem_combs = list(combinations(a_stem, num))
            logging.debug("子进程-%d 主关键词:%s 计算词根组合结束" % (pid, a_key))
            logging.debug("子进程-%d 主关键词:%s 开始获取词根涉及的关键词信息" % (pid, a_key))
            # Candidate rows: those containing every stem of at least one
            # combination. (The original comprehension shadowed ``a_stem``;
            # renamed for clarity — behavior unchanged.)
            b_indexs = set()
            for stem_comb in stem_combs:
                indexs = [reverse_dict[stem] for stem in stem_comb]
                b_indexs.update(reduce(intesect, indexs))
            logging.debug("子进程-%d 主关键词:%s 总祠根数:%d" % (pid, a_key, len(b_indexs)))
            # Fetch the candidate keywords (stems deliberately unfiltered here).
            b_keys = []
            for b_index in b_indexs:
                f_key_mmap.seek(key_index[int(b_index)])
                line = f_key_mmap.readline().decode("UTF-8")
                key_m = key_pattern.match(line)
                b_keys.append((key_m.group(1), stem_pattern.findall(line)))
            logging.debug("子进程-%d 主关键词:%s 获取词根涉及的关键词信息结束,涉及计算关键词数量:%d" % (pid, a_key, len(b_keys)))
            logging.debug("子进程-%d 主关键词:%s 开始计算相关性" % (pid, a_key))
            # First element is the main keyword; related keywords follow.
            correlation_key = [a_key]
            for b_key, b_stem in b_keys:
                try:
                    val = cal_cos_sim(a_key, a_stem, b_key, b_stem)
                    if val >= agg_threshold:
                        correlation_key.append(b_key)
                except Exception:
                    # BUG FIX: the original passed the exception object as an
                    # extra %-format argument to an already-formatted message,
                    # which made logging raise an internal formatting error and
                    # mangled the record. Use lazy %-args and log the traceback.
                    logging.exception("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s", a_key, b_key, b_stem)
            # Keep the group only if at least one related keyword was found.
            if len(correlation_key) > 1:
                agg_result.append(correlation_key)
            logging.debug("子进程-%d 主关键词:%s 计算相关性结束,相关的关键词数据量:%d" % (pid, a_key, (len(correlation_key)-1)))

    logging.debug("子进程-%d 执行任务结束,耗时:%f" % (pid, (time() - start_time)))
    return {
        "agg_result": agg_result,
        "start_pos": start_pos
    }
-
def main_process():
    """
    Parent process: split the KEY table into fixed-size slices, run
    ``sub_process`` over them in a process pool, append each slice's
    aggregation groups to config.AGG_FILE, and checkpoint a done-flag
    list so an interrupted run can resume where it left off.
    """
    # Number of worker processes.
    process_num = 4
    # Total number of rows in the KEY table.
    total_task = 14500028
    # Rows handled per sub-task.
    per_task_num = 100
    # Seconds between progress checkpoints.
    save_process_interval = 300
    # Done-flag list (one flag per task) + list of tasks still to run.
    process_record, tasks = avg_split_task(total_task, per_task_num)
    with ProcessPoolExecutor(max_workers=process_num) as process_pool, \
            open(config.AGG_FILE, "a", encoding=config.ENCODING_CHARSET) as f:
        save_start_time = time()
        logging.info("主进程:提交任务到子进程")
        process_futures = [process_pool.submit(sub_process, task[0], task[1]) for task in tasks]

        cur_pos = 0
        for p_future in as_completed(process_futures):
            logging.debug("主进程:子进程返回部分数据")
            result = p_future.result()
            cur_pos = result["start_pos"]
            # Mark this slice done.
            # BUG FIX: process_record is indexed by task number (see
            # avg_split_task's resume logic ``task[0] // split_internal``),
            # but the original wrote process_record[cur_pos] — a row offset —
            # flagging the wrong slot and raising IndexError once
            # start_pos exceeded the list length.
            process_record[cur_pos // per_task_num] = 1
            # Append this slice's aggregation groups to the output file.
            # (The original tested ``if result:``, which is always true for
            # the returned dict; test the payload instead — same output.)
            if result["agg_result"]:
                logging.debug("主进程:存在有效数据开始处理")
                for correlation_key in result["agg_result"]:
                    f.write("\n######开始######\n")
                    for key in correlation_key:
                        f.write("%s\n" % key)

            # Periodic checkpoint of the progress record.
            if (time() - save_start_time) > save_process_interval:
                logging.debug("保存处理进度")
                # Reset the checkpoint timer.
                save_start_time = time()
                tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
                tools.tip(total_task, cur_pos)

        # BUG FIX: final checkpoint — in the original, tasks completed after
        # the last periodic save were never persisted and re-ran on restart.
        tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
-
def avg_split_task(total: int, split_internal: int):
    """
    Split ``total`` rows into consecutive chunks of ``split_internal``.

    Returns ``(process_record, tasks)``: a done-flag list with one entry
    per chunk (restored from config.ANALYSE_PROCESS_CACHE when a cache
    file exists, otherwise all zeros) and the list of ``[start, end]``
    chunks still to process. The last chunk's end is None, meaning
    "through the end of the table".
    """
    # Total number of chunks.
    chunk_total = math.ceil(total / split_internal)
    # Build every chunk as a [start, end] pair.
    all_chunks = []
    for chunk_no in range(chunk_total):
        begin = chunk_no * split_internal
        end = begin + split_internal
        # The final chunk is open-ended.
        all_chunks.append([begin, None if end >= total else end])

    # Restore the done-flags from a previous run, if any.
    record = None
    if os.path.exists(config.ANALYSE_PROCESS_CACHE):
        logging.debug("存在分析进度缓存")
        record = tools.load_obj(config.ANALYSE_PROCESS_CACHE)

    if record:
        # Resume: keep only the chunks not yet flagged as done.
        remaining = [chunk for chunk in all_chunks
                     if not record[chunk[0] // split_internal]]
        return record, remaining
    # Fresh run: nothing done yet, every chunk pending.
    return [0] * chunk_total, all_chunks
- if __name__ == "__main__":
- TITLE = "(多进程版 fast_14.py)聚合文件"
- tools.log_start_msg(TITLE)
- main_process()
- tools.log_end_msg(TITLE)
|