# agg_word.py — multiprocess keyword aggregation script
# -*- coding:utf-8 -*-
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import reduce
from itertools import combinations
import math
import mmap
import os
from time import sleep, time
from cal import cal_cos_sim
import config
import tools
import stop_word
import re
import logging
# Open issues:
# - hand the I/O-heavy parts to threads
# - main-process CPU utilization is very low
# - optimize for speed (currently about 100 keywords per minute)
# Resolved:
# - output format was incorrect
# - analysis results were not being written to the result file
# - words with a single stem are skipped, nothing to compare
# - redundant loading removed -> solved by loading only in child processes
tools.init_log()
  25. def intesect(x, y):
  26. """
  27. 计算集合的交集
  28. """
  29. return x & y
if __name__ != "__main__":
    # These globals are loaded only when the module is imported by a worker
    # (child) process; the parent never needs them, which avoids the
    # redundant loading noted in the header comments.
    # Stop-word lookup (membership-tested in sub_process).
    stop_word_index = stop_word.load_stop_word()
    # KEY-table index: keyword line number -> byte offset into the KEY file
    # (inferred from its use with mmap.seek in sub_process — TODO confirm).
    key_index = tools.load_obj(config.KEY_INDEX_CACHE)
    # Inverted index: stem -> byte offset into the reverse file.
    reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
    # Cosine-similarity threshold at or above which keywords aggregate.
    agg_threshold = 0.8
    # Precompiled regular expressions:
    # reverse-table entry — quoted numeric line indices, e.g. '123'
    index_re = r"'(\d+)'"
    index_pattern = re.compile(index_re, re.I)
    # keyword — the second comma-separated field, up to the '[' opening the stems
    key_re = r"[^,]*,(.*),\["
    key_pattern = re.compile(key_re, re.I)
    # KEY-table stem — any quoted, comma-free token
    stem_re = r"'([^,]*)'"
    stem_pattern = re.compile(stem_re, re.I)
  49. def sub_process(start_pos, end_pos):
  50. """
  51. 子进程
  52. """
  53. pid = os.getpid()
  54. logging.debug("子进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid,start_pos, end_pos))
  55. # 聚合结果
  56. agg_result = []
  57. # 开始时间
  58. start_time = time()
  59. with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
  60. mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap, \
  61. open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as f_reverse, \
  62. mmap.mmap(f_reverse.fileno(), 0, access=mmap.ACCESS_READ) as f_reverse_mmap :
  63. # 把关键词索引转换成对应的位置
  64. lower_pos = key_index[start_pos]
  65. upper_pos = key_index[end_pos]
  66. # 移动到开始位置
  67. f_key_mmap.seek(lower_pos)
  68. # 读取主关键词信息
  69. a_keys = {}
  70. while True:
  71. # 校验当前位置是否越界
  72. cur_pos = f_key_mmap.tell()
  73. if cur_pos >= upper_pos:
  74. break
  75. line = f_key_mmap.readline().decode("UTF-8")
  76. # 提取 关键词、词根
  77. key_m = key_pattern.match(line)
  78. a_key = key_m.group(1)
  79. a_stem = []
  80. # 过滤停用词
  81. tmp_stem = stem_pattern.findall(line)
  82. for stem in tmp_stem:
  83. if stem in stop_word_index:
  84. continue
  85. a_stem.append(stem)
  86. # 保存到容器,如果祠根数等于1则没有比较的价值
  87. if len(a_stem) > 1:
  88. a_keys[a_key]=a_stem
  89. # 合并词根
  90. all_stem = set()
  91. for a_stem in a_keys.values():
  92. for stem in a_stem:
  93. all_stem.add(stem)
  94. # 获取倒排信息
  95. reverse_dict = {}
  96. for stem in all_stem:
  97. # 读取倒排表
  98. f_reverse_mmap.seek(reverse_index[stem])
  99. reverse_line = f_reverse_mmap.readline().decode("UTF-8")
  100. # 提取 位置信息
  101. b_indexs = index_pattern.findall(reverse_line)
  102. reverse_dict[stem]=set(b_indexs)
  103. # 计算相关性
  104. for a_key, a_stem in a_keys.items():
  105. # 计算词根组合
  106. logging.debug("子进程-%d 主关键词:%s 开始计算词根组合" % (pid, a_key))
  107. tmp_stem = []
  108. for stem in a_stem:
  109. tmp_stem.append(stem)
  110. num = math.ceil(len(tmp_stem) * 0.7)
  111. stem_combs = list (combinations(tmp_stem, num))
  112. logging.debug("子进程-%d 主关键词:%s 计算词根组合结束" % (pid, a_key))
  113. logging.debug("子进程-%d 主关键词:%s 开始获取词根涉及的关键词信息" % (pid, a_key))
  114. # 计算词根涉及的关键词的交集
  115. b_indexs = set()
  116. for stem_comb in stem_combs:
  117. indexs = [reverse_dict[a_stem] for a_stem in stem_comb]
  118. for b_index in reduce(intesect, indexs):
  119. b_indexs.add(b_index)
  120. logging.debug("子进程-%d 主关键词:%s 总祠根数:%d" % (pid, a_key, len(b_indexs)))
  121. # 获取关键词信息
  122. b_keys = []
  123. for b_index in b_indexs:
  124. # 读取关键词数据
  125. f_key_mmap.seek(key_index[int(b_index)])
  126. line = f_key_mmap.readline().decode("UTF-8")
  127. # 提取 关键词、词根
  128. key_m = key_pattern.match(line)
  129. b_key = key_m.group(1)
  130. b_stem = stem_pattern.findall(line)
  131. b_keys.append((b_key, b_stem))
  132. logging.debug("子进程-%d 主关键词:%s 获取词根涉及的关键词信息结束,涉及计算关键词数量:%d" % (pid, a_key, len(b_keys)))
  133. logging.debug("子进程-%d 主关键词:%s 开始计算相关性" % (pid, a_key))
  134. # 结果容器
  135. correlation_key = []
  136. correlation_key.append(a_key)
  137. # 计算相关性
  138. if b_keys:
  139. for b_key, b_stem in b_keys:
  140. try:
  141. val = cal_cos_sim(a_key, a_stem, b_key, b_stem)
  142. if val >= agg_threshold:
  143. correlation_key.append(b_key)
  144. except Exception as e:
  145. logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (a_key, b_key, b_stem), e)
  146. # 有内容则进行保存
  147. if len(correlation_key) > 1:
  148. agg_result.append(correlation_key)
  149. logging.debug("子进程-%d 主关键词:%s 计算相关性结束,相关的关键词数据量:%d" % (pid, a_key, (len(correlation_key)-1)))
  150. logging.debug("子进程-%d 执行任务结束,耗时:%f" % (pid, (time() - start_time)))
  151. return {
  152. "agg_result": agg_result,
  153. "start_pos": start_pos
  154. }
  155. def main_process():
  156. """
  157. 主进程
  158. """
  159. # 进程数
  160. process_num = 4
  161. # KEY 表总长度
  162. total_task = 14500028
  163. # 任务数量
  164. per_task_num = 100
  165. # 处理进度保存间隔(单位:秒)
  166. save_process_internal = 300
  167. # 划分子任务:任务进度记录、任务列表
  168. process_record, tasks = avg_split_task(total_task, per_task_num)
  169. with ProcessPoolExecutor(max_workers=process_num) as process_pool, \
  170. open(config.AGG_FILE, "a", encoding=config.ENCODING_CHARSET) as f:
  171. save_start_time = time()
  172. logging.info("主进程:提交任务到子进程")
  173. process_futures = [process_pool.submit(sub_process, task[0], task[1]) for task in tasks]
  174. for p_future in as_completed(process_futures):
  175. logging.debug("主进程:子进程返回部分数据")
  176. result = p_future.result()
  177. # 记录处理进度
  178. cur_pos = result["start_pos"]
  179. process_record[cur_pos]=1
  180. # 保存分析结果
  181. if result:
  182. logging.debug("主进程:存在有效数据开始处理")
  183. for correlation_key in result["agg_result"]:
  184. f.write("\n######开始######\n")
  185. for key in correlation_key:
  186. f.write("%s\n" % key)
  187. # 保存处理进度
  188. if (time() - save_start_time) > save_process_internal:
  189. logging.debug("保存处理进度")
  190. # 更新开始时间
  191. save_start_time = time()
  192. tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
  193. tools.tip(total_task, cur_pos)
  194. def avg_split_task(total:int, split_internal:int):
  195. """
  196. 平分任务
  197. """
  198. # 任务列表
  199. tasks = None
  200. # 任务进度记录
  201. process_record = None
  202. # 分割的任务份数
  203. split_num = math.ceil(total / split_internal)
  204. # 平分
  205. tmp_lists = []
  206. for i in range(split_num):
  207. # 计算平分点在列表中的位置
  208. start_pos = i * split_internal
  209. end_pos = i * split_internal + split_internal
  210. # 如果超过列表大小需要额外处理
  211. if end_pos >= total:
  212. end_pos = None
  213. tmp_lists.append([start_pos,end_pos])
  214. # 加载进度缓存
  215. if os.path.exists(config.ANALYSE_PROCESS_CACHE):
  216. logging.debug("存在分析进度缓存")
  217. process_record = tools.load_obj(config.ANALYSE_PROCESS_CACHE)
  218. # 更新任务列表
  219. if process_record:
  220. tasks = []
  221. for task in tmp_lists:
  222. pos = task[0] // split_internal
  223. if not process_record[pos]:
  224. tasks.append(task)
  225. else:
  226. tasks = tmp_lists
  227. process_record = [0 for i in range(len(tmp_lists))]
  228. return process_record, tasks
if __name__ == "__main__":
    # Entry point: runs only in the parent process (the worker-only globals
    # above are deliberately not loaded here).
    TITLE = "(多进程版 fast_14.py)聚合文件"
    tools.log_start_msg(TITLE)
    main_process()
    tools.log_end_msg(TITLE)