|
|
@@ -1,155 +1,289 @@
|
|
|
# -*- coding:utf-8 -*-
|
|
|
|
|
|
+from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
|
+from functools import reduce
|
|
|
+from itertools import combinations
|
|
|
+import math
|
|
|
import mmap
|
|
|
+import os
|
|
|
+from time import sleep, time
|
|
|
+from cal import cal_cos_sim
|
|
|
+
|
|
|
import config
|
|
|
import tools
|
|
|
import stop_word
|
|
|
import re
|
|
|
-import ast
|
|
|
-import cal
|
|
|
import logging
|
|
|
-import ast
|
|
|
-from bitmap import BitMap
|
|
|
|
|
|
-TITLE = "聚合文件"
|
|
|
+# 问题
|
|
|
+# 用线程处理IO高的部分
|
|
|
+# 主线程利用率极低
|
|
|
+# 优化代码,加快速度(目前速度:约1分钟100个关键词)
|
|
|
+
|
|
|
+# 已解决
|
|
|
+# 输出的格式不正确
|
|
|
+# 分析结果内容没有写入结果中
|
|
|
+# 移除词根数等于1的词,不做分析
|
|
|
+# 减少重复加载 -> 解决:加入仅在子进程时才加载的判断
|
|
|
+
|
|
|
+tools.init_log()
|
|
|
|
|
|
-def re_extract_key(pattern, line):
|
|
|
+def intesect(x, y):
|
|
|
"""
|
|
|
- 正则提取关键词信息
|
|
|
+ 计算集合的交集
|
|
|
"""
|
|
|
- m = pattern.match(line)
|
|
|
- # 关键词 序号
|
|
|
- index = m.group(1)
|
|
|
- # 关键词
|
|
|
- key = m.group(2)
|
|
|
- # 关键词 分词词根
|
|
|
- word_root = m.group(3)
|
|
|
- # 把index转换成数字方便使用
|
|
|
- return int(index), key, word_root
|
|
|
-
|
|
|
-def main():
|
|
|
- # 初始化日志配置
|
|
|
- tools.init_log()
|
|
|
- tools.log_start_msg(TITLE)
|
|
|
+ return x & y
|
|
|
|
|
|
+if __name__ != "__main__":
|
|
|
# 停用词
|
|
|
- logging.info("加载停用词")
|
|
|
- stop_word_cache = stop_word.load_stop_word()
|
|
|
- # 关键词索引
|
|
|
- logging.info("加载关键词索引")
|
|
|
- key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
|
|
|
- # 倒排索引
|
|
|
- logging.info("加载倒排索引")
|
|
|
- key_reverse_index_cache = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
|
|
|
- # 正则 提取数据
|
|
|
- s = r"(\d+),([^,]*),(.*)"
|
|
|
- pattern = re.compile(s, re.I)
|
|
|
-
|
|
|
- with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
|
|
|
- open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as freverse, \
|
|
|
- mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap, \
|
|
|
- mmap.mmap(freverse.fileno(), 0, access=mmap.ACCESS_READ) as f_reverse_mmap:
|
|
|
-
|
|
|
- # 计算总关键词数
|
|
|
-
|
|
|
- # TODO 这里要改成从统计信息中获取
|
|
|
- total_count = 14500029
|
|
|
+ stop_word_index = stop_word.load_stop_word()
|
|
|
+
|
|
|
+ # KEY表索引
|
|
|
+ key_index = tools.load_obj(config.KEY_INDEX_CACHE)
|
|
|
+
|
|
|
+ # 倒排表索引
|
|
|
+ reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
|
|
|
+
|
|
|
+ # 聚合阈值
|
|
|
+ agg_threshold = 0.8
|
|
|
+
|
|
|
+ # 正则提取
|
|
|
+ # 倒排表 索引
|
|
|
+ index_re = r"'(\d+)'"
|
|
|
+ index_pattern = re.compile(index_re, re.I)
|
|
|
+ # 关键词
|
|
|
+ key_re = r"[^,]*,(.*),\["
|
|
|
+ key_pattern = re.compile(key_re, re.I)
|
|
|
+ # KEY表 词根
|
|
|
+ stem_re = r"'([^,]*)'"
|
|
|
+ stem_pattern = re.compile(stem_re, re.I)
|
|
|
+
|
|
|
+def sub_process(start_pos, end_pos):
|
|
|
+ """
|
|
|
+ 子进程
|
|
|
+ """
|
|
|
+ pid = os.getpid()
|
|
|
+
|
|
|
+ logging.debug("子进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid,start_pos, end_pos))
|
|
|
+
|
|
|
+ # 聚合结果
|
|
|
+ agg_result = []
|
|
|
+
|
|
|
+ # 开始时间
|
|
|
+ start_time = time()
|
|
|
+
|
|
|
+ with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
|
|
|
+ mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap, \
|
|
|
+ open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as f_reverse, \
|
|
|
+ mmap.mmap(f_reverse.fileno(), 0, access=mmap.ACCESS_READ) as f_reverse_mmap :
|
|
|
|
|
|
- # 生成位图bitmap
|
|
|
- bm = BitMap(total_count)
|
|
|
+ # 把关键词索引转换成对应的位置
|
|
|
+ lower_pos = key_index[start_pos]
|
|
|
+ upper_pos = key_index[end_pos]
|
|
|
|
|
|
- # 待处理的文件总大小
|
|
|
- total_num = f_key_mmap.size()
|
|
|
+ # 移动到开始位置
|
|
|
+ f_key_mmap.seek(lower_pos)
|
|
|
|
|
|
+ # 读取主关键词信息
|
|
|
+ a_keys = {}
|
|
|
while True:
|
|
|
- # 当前处理位置
|
|
|
+ # 校验当前位置是否越界
|
|
|
cur_pos = f_key_mmap.tell()
|
|
|
+ if cur_pos >= upper_pos:
|
|
|
+ break
|
|
|
+
|
|
|
+ line = f_key_mmap.readline().decode("UTF-8")
|
|
|
+ # 提取 关键词、词根
|
|
|
+ key_m = key_pattern.match(line)
|
|
|
+ a_key = key_m.group(1)
|
|
|
+ a_stem = []
|
|
|
+ # 过滤停用词
|
|
|
+ tmp_stem = stem_pattern.findall(line)
|
|
|
+ for stem in tmp_stem:
|
|
|
+ if stem in stop_word_index:
|
|
|
+ continue
|
|
|
+ a_stem.append(stem)
|
|
|
+            # 保存到容器,如果词根数等于1则没有比较的价值
|
|
|
+ if len(a_stem) > 1:
|
|
|
+ a_keys[a_key]=a_stem
|
|
|
+
|
|
|
+ # 合并词根
|
|
|
+ all_stem = set()
|
|
|
+ for a_stem in a_keys.values():
|
|
|
+ for stem in a_stem:
|
|
|
+ all_stem.add(stem)
|
|
|
|
|
|
- # 进度提示
|
|
|
- tools.tip_in_size(total_num, cur_pos)
|
|
|
+ # 获取倒排信息
|
|
|
+ reverse_dict = {}
|
|
|
+ for stem in all_stem:
|
|
|
+ # 读取倒排表
|
|
|
+ f_reverse_mmap.seek(reverse_index[stem])
|
|
|
+ reverse_line = f_reverse_mmap.readline().decode("UTF-8")
|
|
|
+ # 提取 位置信息
|
|
|
+ b_indexs = index_pattern.findall(reverse_line)
|
|
|
+ reverse_dict[stem]=set(b_indexs)
|
|
|
+
|
|
|
+ # 计算相关性
|
|
|
+ for a_key, a_stem in a_keys.items():
|
|
|
+ # 计算词根组合
|
|
|
+ logging.debug("子进程-%d 主关键词:%s 开始计算词根组合" % (pid, a_key))
|
|
|
+ tmp_stem = []
|
|
|
+ for stem in a_stem:
|
|
|
+ tmp_stem.append(stem)
|
|
|
+ num = math.ceil(len(tmp_stem) * 0.7)
|
|
|
+ stem_combs = list (combinations(tmp_stem, num))
|
|
|
+ logging.debug("子进程-%d 主关键词:%s 计算词根组合结束" % (pid, a_key))
|
|
|
+
|
|
|
+ logging.debug("子进程-%d 主关键词:%s 开始获取词根涉及的关键词信息" % (pid, a_key))
|
|
|
+ # 计算词根涉及的关键词的交集
|
|
|
+ b_indexs = set()
|
|
|
+ for stem_comb in stem_combs:
|
|
|
+ indexs = [reverse_dict[a_stem] for a_stem in stem_comb]
|
|
|
+ for b_index in reduce(intesect, indexs):
|
|
|
+ b_indexs.add(b_index)
|
|
|
+            logging.debug("子进程-%d 主关键词:%s 总词根数:%d" % (pid, a_key, len(b_indexs)))
|
|
|
+ # 获取关键词信息
|
|
|
+ b_keys = []
|
|
|
+ for b_index in b_indexs:
|
|
|
+ # 读取关键词数据
|
|
|
+ f_key_mmap.seek(key_index[int(b_index)])
|
|
|
+ line = f_key_mmap.readline().decode("UTF-8")
|
|
|
+ # 提取 关键词、词根
|
|
|
+ key_m = key_pattern.match(line)
|
|
|
+ b_key = key_m.group(1)
|
|
|
+ b_stem = stem_pattern.findall(line)
|
|
|
+ b_keys.append((b_key, b_stem))
|
|
|
+ logging.debug("子进程-%d 主关键词:%s 获取词根涉及的关键词信息结束,涉及计算关键词数量:%d" % (pid, a_key, len(b_keys)))
|
|
|
+
|
|
|
+ logging.debug("子进程-%d 主关键词:%s 开始计算相关性" % (pid, a_key))
|
|
|
+ # 结果容器
|
|
|
+ correlation_key = []
|
|
|
+ correlation_key.append(a_key)
|
|
|
+ # 计算相关性
|
|
|
+ if b_keys:
|
|
|
+ for b_key, b_stem in b_keys:
|
|
|
+ try:
|
|
|
+ val = cal_cos_sim(a_key, a_stem, b_key, b_stem)
|
|
|
+ if val >= agg_threshold:
|
|
|
+ correlation_key.append(b_key)
|
|
|
+ except Exception as e:
|
|
|
+ logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (a_key, b_key, b_stem), e)
|
|
|
+
|
|
|
+ # 有内容则进行保存
|
|
|
+ if len(correlation_key) > 1:
|
|
|
+ agg_result.append(correlation_key)
|
|
|
+ logging.debug("子进程-%d 主关键词:%s 计算相关性结束,相关的关键词数据量:%d" % (pid, a_key, (len(correlation_key)-1)))
|
|
|
|
|
|
- # 获取要处理的关键词
|
|
|
- line = f_key_mmap.readline().decode(config.ENCODING_CHARSET)
|
|
|
|
|
|
- # 如果没有任何内容则结束
|
|
|
- if not line:
|
|
|
- logging.info("发现空白line")
|
|
|
- break
|
|
|
+ logging.debug("子进程-%d 执行任务结束,耗时:%f" % (pid, (time() - start_time)))
|
|
|
|
|
|
- # 提取信息
|
|
|
- index, key, word_root = re_extract_key(pattern, line)
|
|
|
+ return {
|
|
|
+ "agg_result": agg_result,
|
|
|
+ "start_pos": start_pos
|
|
|
+ }
|
|
|
|
|
|
- # bitmap校验,如果已经处理过则跳过
|
|
|
- if bm.test(index):
|
|
|
- logging.debug("主关键词:%s 已处理,跳过" % key)
|
|
|
- continue
|
|
|
+def main_process():
|
|
|
+ """
|
|
|
+ 主进程
|
|
|
+ """
|
|
|
|
|
|
- # 通过bitmap校验,设置对应的bit为0
|
|
|
- bm.set(index)
|
|
|
+ # 进程数
|
|
|
+ process_num = 4
|
|
|
|
|
|
- # 聚合结果存放容器
|
|
|
- agg_cache = []
|
|
|
+ # KEY 表总长度
|
|
|
+ total_task = 14500028
|
|
|
|
|
|
- # 记录主要关键词
|
|
|
- agg_cache.append(key)
|
|
|
+ # 任务数量
|
|
|
+ per_task_num = 100
|
|
|
|
|
|
- # 转换成真正的list对象
|
|
|
- logging.debug("当前处理的主关键词:%s, 词根数量:%d" % (key, len(word_root)))
|
|
|
- for item in ast.literal_eval(word_root):
|
|
|
- # 排除停用词
|
|
|
- if item in stop_word_cache:
|
|
|
- continue
|
|
|
-
|
|
|
- # 根据倒排索引,获取相关的关键词序号
|
|
|
- other_key_pos = key_reverse_index_cache.get(item)
|
|
|
- f_reverse_mmap.seek(other_key_pos)
|
|
|
- other_key_line = f_reverse_mmap.readline().decode(config.ENCODING_CHARSET)
|
|
|
- # 截取关键词索引部分
|
|
|
- other_index = other_key_line.index(",")
|
|
|
- other_key_indexs = other_key_line[other_index+1:]
|
|
|
- # 转换成真正的list对象
|
|
|
- other_key_indexs = ast.literal_eval(other_key_indexs)
|
|
|
- if not other_key_indexs:
|
|
|
- continue
|
|
|
+ # 处理进度保存间隔(单位:秒)
|
|
|
+ save_process_internal = 300
|
|
|
+
|
|
|
+ # 划分子任务:任务进度记录、任务列表
|
|
|
+ process_record, tasks = avg_split_task(total_task, per_task_num)
|
|
|
+
|
|
|
+ with ProcessPoolExecutor(max_workers=process_num) as process_pool, \
|
|
|
+ open(config.AGG_FILE, "a", encoding=config.ENCODING_CHARSET) as f:
|
|
|
|
|
|
- logging.debug("词根:%s, 涉及的其它关键词数量:%d" % (item, len(other_key_indexs)))
|
|
|
- for other_key_index in other_key_indexs:
|
|
|
- # bitmap校验,如果已经处理过则跳过
|
|
|
- if bm.test(int(other_key_index)):
|
|
|
- logging.debug("待比较关键词:%s 已处理,跳过" % other_key_index)
|
|
|
- continue
|
|
|
-
|
|
|
- # 从关键词索引中获取关键词位置
|
|
|
- pos = key_index_cache[other_key_index]
|
|
|
- # 获取待比较的关键词
|
|
|
- f_key_mmap.seek(pos)
|
|
|
- other_key_line = f_key_mmap.readline().decode(config.ENCODING_CHARSET)
|
|
|
- other_key_index, other_key,other_word_root = re_extract_key(pattern, other_key_line)
|
|
|
-
|
|
|
- # 计算相关性
|
|
|
- val = cal.cal_cos(key, other_key, word_root, other_word_root)
|
|
|
- if val >= 0.8:
|
|
|
- # 设置bitmap,该关键词已经处理过
|
|
|
- bm.set(other_key_index)
|
|
|
-
|
|
|
- # 记录类似的关键词
|
|
|
- agg_cache.append(other_key)
|
|
|
+ save_start_time = time()
|
|
|
+
|
|
|
+ logging.info("主进程:提交任务到子进程")
|
|
|
+ process_futures = [process_pool.submit(sub_process, task[0], task[1]) for task in tasks]
|
|
|
|
|
|
- # 保存到本地
|
|
|
- with open(config.AGG_ANALYSE_FILE % key, "w", encoding=config.ENCODING_CHARSET) as f:
|
|
|
- for item in agg_cache:
|
|
|
- f.write(item)
|
|
|
- f.write("\n")
|
|
|
-
|
|
|
- # 如果所有的关键词都处理完则结束
|
|
|
- if bm.all():
|
|
|
- logging.info("bitmap全部为1")
|
|
|
- break
|
|
|
- else:
|
|
|
- count = bm.count()
|
|
|
- logging.info("已处理数量:%d / %d,剩余数量:%d / %d" % (count, total_count, (total_count - count), total_count))
|
|
|
+ for p_future in as_completed(process_futures):
|
|
|
+ logging.debug("主进程:子进程返回部分数据")
|
|
|
+ result = p_future.result()
|
|
|
+
|
|
|
+ # 记录处理进度
|
|
|
+ cur_pos = result["start_pos"]
|
|
|
+ process_record[cur_pos]=1
|
|
|
+
|
|
|
+ # 保存分析结果
|
|
|
+ if result:
|
|
|
+ logging.debug("主进程:存在有效数据开始处理")
|
|
|
+ for correlation_key in result["agg_result"]:
|
|
|
+ f.write("\n######开始######\n")
|
|
|
+ for key in correlation_key:
|
|
|
+ f.write("%s\n" % key)
|
|
|
+
|
|
|
+ # 保存处理进度
|
|
|
+ if (time() - save_start_time) > save_process_internal:
|
|
|
+ logging.debug("保存处理进度")
|
|
|
+ # 更新开始时间
|
|
|
+ save_start_time = time()
|
|
|
+ tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
|
|
|
+
|
|
|
+ tools.tip(total_task, cur_pos)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def avg_split_task(total:int, split_internal:int):
|
|
|
+ """
|
|
|
+ 平分任务
|
|
|
+ """
|
|
|
+ # 任务列表
|
|
|
+ tasks = None
|
|
|
+ # 任务进度记录
|
|
|
+ process_record = None
|
|
|
|
|
|
- tools.log_end_msg(TITLE)
|
|
|
+ # 分割的任务份数
|
|
|
+ split_num = math.ceil(total / split_internal)
|
|
|
+
|
|
|
+ # 平分
|
|
|
+ tmp_lists = []
|
|
|
+ for i in range(split_num):
|
|
|
+ # 计算平分点在列表中的位置
|
|
|
+ start_pos = i * split_internal
|
|
|
+ end_pos = i * split_internal + split_internal
|
|
|
+ # 如果超过列表大小需要额外处理
|
|
|
+ if end_pos >= total:
|
|
|
+ end_pos = None
|
|
|
+ tmp_lists.append([start_pos,end_pos])
|
|
|
+
|
|
|
+ # 加载进度缓存
|
|
|
+ if os.path.exists(config.ANALYSE_PROCESS_CACHE):
|
|
|
+ logging.debug("存在分析进度缓存")
|
|
|
+ process_record = tools.load_obj(config.ANALYSE_PROCESS_CACHE)
|
|
|
+
|
|
|
+ # 更新任务列表
|
|
|
+ if process_record:
|
|
|
+ tasks = []
|
|
|
+ for task in tmp_lists:
|
|
|
+ pos = task[0] // split_internal
|
|
|
+ if not process_record[pos]:
|
|
|
+ tasks.append(task)
|
|
|
+ else:
|
|
|
+ tasks = tmp_lists
|
|
|
+ process_record = [0 for i in range(len(tmp_lists))]
|
|
|
+
|
|
|
+ return process_record, tasks
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
- main()
|
|
|
+
|
|
|
+ TITLE = "(多进程版 fast_14.py)聚合文件"
|
|
|
+ tools.log_start_msg(TITLE)
|
|
|
+
|
|
|
+ main_process()
|
|
|
+
|
|
|
+ tools.log_end_msg(TITLE)
|