# agg_word.py (9.0 KB)
# -*- coding:utf-8 -*-
import logging
import math
import mmap
import multiprocessing
import os
import re
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import reduce
from itertools import combinations
from time import time

import config
import tools
from cal import cal_cos_sim
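# 待解决问题 (open issues):
# - 用线程处理 IO 密集的部分 (move the IO-heavy parts to threads)
# - 主线程利用率极低 (main-thread utilisation is very low)
# - 优化代码,加快速度(目前速度:约 1 分钟 100 个关键词) (speed up; currently about 100 keywords per minute)
# 已解决 (resolved):
# - 输出的格式不正确 (output format was wrong)
# - 分析结果内容没有写入结果中 (analysis results were not written to the output)
# - 移除词根数等于 1 的词,不做分析 (skip keywords that have only one stem)
# - 减少重复加载 -> 解决:加入仅在子进程时才加载的判断 (avoid repeated loading: load the indexes only in worker processes)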
# Configure logging through the project helper. This is a module-level call,
# so it runs whenever this module is executed or imported (presumably also in
# worker processes that re-import it -- confirm for the start method in use).
tools.init_log()
  24. def intesect(x, y):
  25. """
  26. 计算集合的交集
  27. """
  28. return x & y
  29. if __name__ != "__main__":
  30. # 停用词
  31. stop_word_index = tools.load_stop_word()
  32. # KEY表索引
  33. key_index = tools.load_obj(config.KEY_INDEX_CACHE)
  34. # 倒排表索引
  35. reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
  36. # 聚合阈值
  37. agg_threshold = 0.8
  38. # 正则提取
  39. # 倒排表 索引
  40. index_re = r"'(\d+)'"
  41. index_pattern = re.compile(index_re, re.I)
  42. # 关键词
  43. key_re = r"[^,]*,(.*),\["
  44. key_pattern = re.compile(key_re, re.I)
  45. # KEY表 词根
  46. stem_re = r"'([^,]*)'"
  47. stem_pattern = re.compile(stem_re, re.I)
  48. def sub_process(start_pos, end_pos):
  49. """
  50. 子进程
  51. """
  52. pid = os.getpid()
  53. logging.info("子进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid,start_pos, end_pos))
  54. # 聚合结果
  55. agg_result = []
  56. # 开始时间
  57. start_time = time()
  58. with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
  59. mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap, \
  60. open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as f_reverse, \
  61. mmap.mmap(f_reverse.fileno(), 0, access=mmap.ACCESS_READ) as f_reverse_mmap :
  62. # 把关键词索引转换成对应的位置
  63. lower_pos = key_index[start_pos]
  64. upper_pos = key_index[end_pos]
  65. # 移动到开始位置
  66. f_key_mmap.seek(lower_pos)
  67. # 读取主关键词信息
  68. a_keys = {}
  69. while True:
  70. # 校验当前位置是否越界
  71. cur_pos = f_key_mmap.tell()
  72. if cur_pos >= upper_pos:
  73. break
  74. line = f_key_mmap.readline().decode("UTF-8")
  75. # 提取 关键词、词根
  76. key_m = key_pattern.match(line)
  77. a_key = key_m.group(1)
  78. a_stem = []
  79. # 过滤停用词
  80. tmp_stem = stem_pattern.findall(line)
  81. for stem in tmp_stem:
  82. if stem in stop_word_index:
  83. continue
  84. a_stem.append(stem)
  85. # 保存到容器,如果祠根数等于1则没有比较的价值
  86. if len(a_stem) > 1:
  87. a_keys[a_key]=a_stem
  88. # 合并词根
  89. all_stem = set()
  90. for a_stem in a_keys.values():
  91. for stem in a_stem:
  92. all_stem.add(stem)
  93. # 获取倒排信息
  94. reverse_dict = {}
  95. for stem in all_stem:
  96. # 读取倒排表
  97. f_reverse_mmap.seek(reverse_index[stem])
  98. reverse_line = f_reverse_mmap.readline().decode("UTF-8")
  99. # 提取 位置信息
  100. b_indexs = index_pattern.findall(reverse_line)
  101. reverse_dict[stem]=set(b_indexs)
  102. # 计算相关性
  103. for a_key, a_stem in a_keys.items():
  104. # 计算词根组合
  105. logging.debug("子进程-%d 主关键词:%s 开始计算词根组合" % (pid, a_key))
  106. tmp_stem = []
  107. for stem in a_stem:
  108. tmp_stem.append(stem)
  109. num = math.ceil(len(tmp_stem) * 0.7)
  110. stem_combs = list (combinations(tmp_stem, num))
  111. logging.debug("子进程-%d 主关键词:%s 计算词根组合结束" % (pid, a_key))
  112. logging.debug("子进程-%d 主关键词:%s 开始获取词根涉及的关键词信息" % (pid, a_key))
  113. # 计算词根涉及的关键词的交集
  114. b_indexs = set()
  115. for stem_comb in stem_combs:
  116. indexs = [reverse_dict[a_stem] for a_stem in stem_comb]
  117. for b_index in reduce(intesect, indexs):
  118. b_indexs.add(b_index)
  119. logging.debug("子进程-%d 主关键词:%s 总祠根数:%d" % (pid, a_key, len(b_indexs)))
  120. # 获取关键词信息
  121. b_keys = []
  122. for b_index in b_indexs:
  123. # 读取关键词数据
  124. f_key_mmap.seek(key_index[int(b_index)])
  125. line = f_key_mmap.readline().decode("UTF-8")
  126. # 提取 关键词、词根
  127. key_m = key_pattern.match(line)
  128. b_key = key_m.group(1)
  129. b_stem = stem_pattern.findall(line)
  130. b_keys.append((b_key, b_stem))
  131. logging.debug("子进程-%d 主关键词:%s 获取词根涉及的关键词信息结束,涉及计算关键词数量:%d" % (pid, a_key, len(b_keys)))
  132. logging.debug("子进程-%d 主关键词:%s 开始计算相关性" % (pid, a_key))
  133. # 结果容器
  134. correlation_key = []
  135. correlation_key.append(a_key)
  136. # 计算相关性
  137. if b_keys:
  138. for b_key, b_stem in b_keys:
  139. try:
  140. val = cal_cos_sim(a_key, a_stem, b_key, b_stem)
  141. if val >= agg_threshold:
  142. correlation_key.append(b_key)
  143. except Exception as e:
  144. logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (a_key, b_key, b_stem), e)
  145. # 有内容则进行保存
  146. if len(correlation_key) > 1:
  147. agg_result.append(correlation_key)
  148. logging.debug("子进程-%d 主关键词:%s 计算相关性结束,相关的关键词数据量:%d" % (pid, a_key, (len(correlation_key)-1)))
  149. logging.info("子进程-%d 执行任务结束,耗时:%f" % (pid, (time() - start_time)))
  150. return {
  151. "agg_result": agg_result,
  152. "start_pos": start_pos
  153. }
  154. def main_process():
  155. """
  156. 主进程
  157. """
  158. # 进程数
  159. process_num = 4
  160. # KEY 表总长度
  161. total_task = 14500028
  162. # 任务数量
  163. per_task_num = 100
  164. # 划分子任务:任务进度记录、任务列表
  165. process_record, tasks = avg_split_task(total_task, per_task_num)
  166. with ProcessPoolExecutor(max_workers=process_num) as process_pool, \
  167. open(config.AGG_FILE, "a", encoding=config.ENCODING_CHARSET) as f:
  168. logging.info("主进程:提交任务到子进程")
  169. process_futures = [process_pool.submit(sub_process, task[0], task[1]) for task in tasks]
  170. for p_future in as_completed(process_futures):
  171. logging.debug("主进程:子进程返回部分数据")
  172. result = p_future.result()
  173. # 记录处理进度
  174. cur_pos = result["start_pos"]
  175. process_record[cur_pos//per_task_num]=1
  176. # 保存分析结果
  177. if result:
  178. logging.debug("主进程:存在有效数据开始处理")
  179. for correlation_key in result["agg_result"]:
  180. f.write("\n######开始######\n")
  181. for key in correlation_key:
  182. f.write("%s\n" % key)
  183. # 保存处理进度
  184. tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
  185. tools.tip(total_task, cur_pos)
  186. def avg_split_task(total:int, split_internal:int):
  187. """
  188. 平分任务
  189. """
  190. # 任务列表
  191. tasks = None
  192. # 任务进度记录
  193. process_record = None
  194. # 分割的任务份数
  195. split_num = math.ceil(total / split_internal)
  196. # 平分
  197. tmp_lists = []
  198. for i in range(split_num):
  199. # 计算平分点在列表中的位置
  200. start_pos = i * split_internal
  201. end_pos = i * split_internal + split_internal
  202. # 如果超过列表大小需要额外处理
  203. if end_pos >= total:
  204. end_pos = None
  205. tmp_lists.append([start_pos,end_pos])
  206. # 加载进度缓存
  207. if os.path.exists(config.ANALYSE_PROCESS_CACHE):
  208. logging.debug("存在分析进度缓存")
  209. process_record = tools.load_obj(config.ANALYSE_PROCESS_CACHE)
  210. # 更新任务列表
  211. if process_record:
  212. tasks = []
  213. for task in tmp_lists:
  214. pos = task[0] // split_internal
  215. if not process_record[pos]:
  216. tasks.append(task)
  217. else:
  218. tasks = tmp_lists
  219. process_record = [0 for i in range(len(tmp_lists))]
  220. return process_record, tasks
  221. if __name__ == "__main__":
  222. TITLE = "(多进程版 fast_14.py)聚合文件"
  223. tools.log_start_msg(TITLE)
  224. main_process()
  225. tools.log_end_msg(TITLE)