# key_reverse.py
# -*- coding:utf-8 -*-
import ast
import logging
import math
import mmap
import multiprocessing
import os
import re
import sys
import threading
from asyncio import FIRST_COMPLETED
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
from multiprocessing import Manager
from queue import Queue
from time import time
from unittest import result

import config
import stop_word
import tools
  20. TITLE = "生成关键词倒排和统计信息"
  21. def thread_handle(path, start_pos, end_pos):
  22. pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
  23. t = threading.current_thread()
  24. print("线城-%d 开始任务,开始位置:%d,结束位置:%d" % (t.ident, start_pos, end_pos))
  25. # 临时容器
  26. key_reverse = []
  27. with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
  28. mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
  29. # 移动到开始位置
  30. fmmap.seek(start_pos)
  31. while True:
  32. # 获取当前处理位置
  33. cur_pos = fmmap.tell()
  34. # 越界检查
  35. if cur_pos > end_pos:
  36. break
  37. line = fmmap.readline()
  38. key_reverse.append(line)
  39. # 暂留
  40. # # 读取关键词数据
  41. # line = fmmap.readline().decode("UTF-8")
  42. # # 如果到行尾则结束
  43. # if not line:
  44. # break
  45. # # 提取数据
  46. # m = pattern.match(line)
  47. # # 获取关键词序号、词根
  48. # index = m.group(1)
  49. # key_root = m.group(3)
  50. # # 转换成真正的list对象
  51. # key_root = ast.literal_eval(key_root)
  52. # key_reverse.append((index, key_root))
  53. # print("线城-%d 结束任务" % t.ident)
  54. # return key_reverse
  55. return {
  56. "tid": t.ident,
  57. "key_reverse": key_reverse
  58. }
  59. def process_handle(stop_word_cache, lines):
  60. pid = os.getpid()
  61. print("进程-%d 接收到数据,开始计算")
  62. pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
  63. key_reverse_dict={}
  64. for line in lines:
  65. line = line.decode("UTF-8")
  66. # 提取数据
  67. m = pattern.match(line)
  68. # 获取关键词序号、词根
  69. index = m.group(1)
  70. key_root = m.group(3)
  71. # 转换成真正的list对象
  72. for item in ast.literal_eval(key_root):
  73. # 排除停用词
  74. if item in stop_word_cache:
  75. continue
  76. # 构建倒排表和统计数据量
  77. val = key_reverse_dict.get(item)
  78. if val:
  79. key_reverse_dict[item]["count"] = key_reverse_dict[item]["count"] + 1
  80. key_reverse_dict[item]["indexs"].append(index)
  81. else:
  82. key_reverse_dict[item]= {
  83. "count":1,
  84. "indexs":[]
  85. }
  86. return {
  87. "pid": pid,
  88. "key_reverse_dict":key_reverse_dict
  89. }
  90. def multi_thread():
  91. """
  92. 构建待排表(多线程)
  93. """
  94. key_reverse_dict = {}
  95. def process_handle_result(future):
  96. print("进程-%d 执行结束,返回部分数据 合并数据" % result["pid"])
  97. result = future.result()
  98. for key,value in result["key_reverse_dict"].items():
  99. val = key_reverse_dict.get(key)
  100. if val:
  101. key_reverse_dict[key]["count"] = key_reverse_dict[key]["count"] + value["count"]
  102. key_reverse_dict[key]["indexs"].append(value["indexs"])
  103. else:
  104. key_reverse_dict[key]= value
  105. print("进程-%d 合并数据结束" % result["pid"])
  106. def thread_handle_result(future):
  107. result = future.result()
  108. logging.info("线程-%d 执行结束,返回部分数据,提交至计算进程" % result["tid"])
  109. process_future = process_pool.submit(process_handle, stop_word_cache, result["key_reverse"])
  110. process_future.add_done_callback(process_handle_result)
  111. tools.init_log()
  112. tools.log_start_msg(TITLE)
  113. logging.info("目前执行的是多线程版")
  114. logging.info("开始执行初始化")
  115. # 提取规则
  116. pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
  117. # 停用表
  118. stop_word_cache = stop_word.load_stop_word()
  119. # 进程处理数
  120. thread_num = 1
  121. process_num = 2
  122. worker_num = 100
  123. # 关键表索引
  124. key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
  125. logging.info("开始对数据进行分段计算")
  126. # 对索引文件中的元素进行平分
  127. # 转成列表,计算总长 和 平分后的处理区间
  128. key_index_list = [key for key in key_index_cache.keys()]
  129. total_len = len(key_index_list)
  130. internal = math.ceil(total_len / worker_num )
  131. # 利用 缓存索引文件 生成处理区间的位置信息
  132. # 位置信息容器
  133. pos_list = []
  134. for i in range(worker_num + 1):
  135. # 计算平分点在列表中的位置
  136. l_pos = i * internal
  137. # 如果大于等于列表大小需要额外处理
  138. if l_pos >= total_len:
  139. l_pos = total_len -1
  140. # 获取列表中的词根
  141. key_index = key_index_list[l_pos:l_pos+1]
  142. # 根据词根获取位置信息
  143. pos = key_index_cache[key_index[0]]
  144. # 记录位置信息
  145. pos_list.append(pos)
  146. logging.info("把分段结果提交至多线程执行")
  147. # 生成任务
  148. with ThreadPoolExecutor(thread_num) as thread_pool, \
  149. ProcessPoolExecutor(process_num) as process_pool:
  150. # thread_futures = []
  151. for i in range(0, len(pos_list)-1):
  152. pos = pos_list[i: i+2]
  153. # thread_futures.append()
  154. thread_future = thread_pool.submit(thread_handle, config.KEY_FILE, pos[0], pos[1])
  155. thread_future.add_done_callback(thread_handle_result)
  156. # 等待数据返回
  157. logging.info("等待多线程执行结束")
  158. thread_pool.shutdown(wait=True)
  159. process_pool.shutdown(wait=True)
  160. logging.info("已获取全部子进程返回部分结果,总数据量:%d" % len(key_reverse_dict))
  161. return
  162. # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
  163. logging.info("根据关键词数量进行倒序排列")
  164. sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
  165. # 保存到本地文件
  166. logging.info("保存到本地")
  167. with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
  168. for key, value in sorted_reverse_list:
  169. f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
  170. tools.log_end_msg(TITLE)
  171. def handle(path, start_pos, end_pos, share_pattern, stop_word_cache):
  172. pid = os.getpid()
  173. print("进程-%d 开始任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
  174. # 临时容器
  175. key_reverse = {}
  176. # 提取数据用的正则
  177. pattern = share_pattern.value
  178. with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
  179. mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
  180. # 移动到开始位置
  181. fmmap.seek(start_pos)
  182. while True:
  183. # 获取当前处理位置
  184. cur_pos = fmmap.tell()
  185. # 越界检查
  186. if cur_pos > end_pos:
  187. break
  188. # 读取关键词数据
  189. line = fmmap.readline().decode("UTF-8")
  190. # 如果到行尾则结束
  191. if not line:
  192. break
  193. # 提取数据
  194. m = pattern.match(line)
  195. # 获取关键词序号、词根
  196. index = m.group(1)
  197. key_root = m.group(3)
  198. # 转换成真正的list对象
  199. for item in ast.literal_eval(key_root):
  200. # 排除停用词
  201. if item in stop_word_cache:
  202. continue
  203. # 构建倒排表和统计数据量
  204. val = key_reverse.get(item)
  205. if val:
  206. count = key_reverse[item]["count"]
  207. key_reverse[item]["count"] = count + 1
  208. key_reverse[item]["indexs"].append(index)
  209. else:
  210. key_reverse[item]= {
  211. "count":1,
  212. "indexs":[]
  213. }
  214. print("进程-%d 任务执行结束" % pid)
  215. return key_reverse
  216. def multi_process():
  217. """
  218. 构建待排表(多进程)
  219. """
  220. tools.init_log()
  221. tools.log_start_msg(TITLE)
  222. logging.info("目前执行的是多进程版")
  223. logging.info("开始执行初始化")
  224. manager = Manager()
  225. # 提取规则
  226. share_pattern = manager.Value("pattern", re.compile(config.KEY_RE_PATTERAN, re.I))
  227. # 停用表
  228. stop_word_cache = manager.dict(stop_word.load_stop_word())
  229. # 进程处理数
  230. process_num = os.cpu_count() - 1
  231. worker_num = process_num
  232. pool = ProcessPoolExecutor(max_workers=process_num)
  233. # 关键表索引
  234. key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
  235. logging.info("开始对数据进行分段计算")
  236. # 对索引文件中的元素进行平分
  237. # 转成列表,计算总长 和 平分后的处理区间
  238. key_index_list = [key for key in key_index_cache.keys()]
  239. total_len = len(key_index_list)
  240. internal = math.ceil(total_len / worker_num )
  241. # 利用 缓存索引文件 生成处理区间的位置信息
  242. # 位置信息容器
  243. pos_list = []
  244. for i in range(worker_num + 1):
  245. # 计算平分点在列表中的位置
  246. l_pos = i * internal
  247. # 如果大于等于列表大小需要额外处理
  248. if l_pos >= total_len:
  249. l_pos = total_len -1
  250. # 获取列表中的词根
  251. key_index = key_index_list[l_pos:l_pos+1]
  252. # 根据词根获取位置信息
  253. pos = key_index_cache[key_index[0]]
  254. # 记录位置信息
  255. pos_list.append(pos)
  256. logging.info("把分段结果提交至多进程执行")
  257. # 生成任务
  258. process_futures = []
  259. for i in range(0, len(pos_list)-1):
  260. pos = pos_list[i: i+2]
  261. process_futures.append(pool.submit(handle, config.KEY_FILE, pos[0], pos[1], share_pattern, stop_word_cache))
  262. # 等待数据返回
  263. logging.info("等待多进程执行结束")
  264. wait(process_futures,return_when=FIRST_COMPLETED)
  265. key_reverse_dict = {}
  266. for future in as_completed(process_futures):
  267. logging.info("有子进程执行结束,返回部分结果")
  268. result = future.result()
  269. for key, value in result.items():
  270. # 进行数据合并
  271. val_dict = key_reverse_dict.get(key)
  272. if val_dict:
  273. count = val_dict["count"]
  274. key_reverse_dict[key]["count"] = count + value["count"]
  275. key_reverse_dict[key]["indexs"].extend(value["indexs"])
  276. else:
  277. key_reverse_dict[key] = value
  278. logging.info("已获取全部子进程返回部分结果")
  279. # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
  280. logging.info("根据关键词数量进行倒序排列")
  281. sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
  282. # 保存到本地文件
  283. logging.info("保存到本地")
  284. with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
  285. for key, value in sorted_reverse_list:
  286. f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
  287. tools.log_end_msg(TITLE)
  288. def one_process():
  289. """
  290. 构建倒排表(单进程版)
  291. """
  292. tools.init_log()
  293. tools.log_start_msg(TITLE)
  294. logging.info("目前执行的是单进程版")
  295. # 提取规则
  296. pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
  297. # 倒排表 容器
  298. key_reverse = {}
  299. # 停用表
  300. stop_word_cache = stop_word.load_stop_word()
  301. with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
  302. # 获取文件总大小,获取后需要复原光标位置
  303. fkey.seek(0, os.SEEK_END)
  304. total_num = fkey.tell()
  305. fkey.seek(0)
  306. while True:
  307. # 获取当前处理位置
  308. cur_pos = fkey.tell()
  309. # 进度提示
  310. tools.tip_in_size(total_num, cur_pos)
  311. # 读取关键词数据
  312. line = fkey.readline()
  313. # 如果到行尾则结束
  314. if not line:
  315. break
  316. # 提取数据
  317. m = pattern.match(line)
  318. # 获取关键词序号
  319. index = m.group(1)
  320. # 获取词根
  321. key_root = m.group(3)
  322. # 转换成真正的list对象
  323. for item in ast.literal_eval(key_root):
  324. # 排除停用词
  325. if item in stop_word_cache:
  326. continue
  327. # 构建倒排表
  328. val = key_reverse.get(item)
  329. if val:
  330. count = key_reverse[item]["count"]
  331. key_reverse[item]["count"] = count + 1
  332. key_reverse[item]["indexs"].append(index)
  333. else:
  334. key_reverse[item]= {
  335. "count":1,
  336. "indexs":[]
  337. }
  338. logging.info("根据关键词数量进行倒序排列")
  339. sorted_reverse_list = sorted(key_reverse.items(), key=lambda x: x[1]["count"], reverse=True)
  340. # 保存到本地文件
  341. logging.info("保存到本地")
  342. with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
  343. for key, value in sorted_reverse_list:
  344. f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
  345. tools.log_end_msg(TITLE)
  346. if __name__ == "__main__":
  347. multi_thread()