# agg.py
# -*- coding:utf-8 -*-
import math
import os
import re
import shutil
import threading
import time
from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor

import jieba
import redis
from bitarray import bitarray
from tqdm import tqdm

import utils
import logging
from constant import FILE_LONG_TAIL_MERGE

# File: merged long-tail words, segmented
FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
# File: merged long-tail words, aggregated
FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
# Directory: archive for historical aggregation output (timestamp appended)
DIR_AGG_FILE_ARCHIVE = "长尾词_聚合_归档_%s"
# File: inverted index built from the segmented long-tail words
FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
# Per-worker result file: "长尾词_合并_聚合_<pid>_<thread id>.txt"
FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s_%s.txt"
# Cache key: word stems (segmentation results)
CACHE_WORD_STEM = "word:stem"
# Cache key: inverted index
CACHE_WORD_REVERSE_INDEX = "word:reverse_index"
# Cache key: long-tail words
CACHE_WORD = "word"
# Cache key: "already aggregated" bitmap
CACHE_UNUSED_BITMAP = "unused_bitmap"
# Charset: UTF-8
CHARSET_UTF_8 = "UTF-8"
# Redis connection pool (initialized per worker process)
redis_pool: redis.ConnectionPool = None
# Thread pool for aggregation tasks
thread_pool: ThreadPoolExecutor = None
# Thread pool for similarity computation
cal_thread_pool: ThreadPoolExecutor = None
# Thread-local state
local_var = threading.local()
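
# Data layout, as implied by the readers and writers below (summarized here
# for reference):
#   FILE_LONG_TAIL_MERGE               one long-tail word per line
#   FILE_LONG_TAIL_MERGE_SPLIT         comma-joined stems per line, line-aligned
#                                      with FILE_LONG_TAIL_MERGE
#   FILE_LONG_TAIL_MERGE_REVERSE_INDEX "stem,[pos1, pos2, ...]" per line, where
#                                      positions are 1-based line numbers
#   CACHE_WORD / CACHE_WORD_STEM       redis hashes keyed by line position
#   CACHE_WORD_REVERSE_INDEX           redis hash: stem -> "pos1,pos2,..."
#   CACHE_UNUSED_BITMAP                redis bitmap, bit i set = word i aggregated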


def agg_word(file_path: str):
    """
    Aggregate long-tail words.
    :param file_path: directory containing the input files
    :return:
    """
    # Total number of long-tail words
    # word_total_num = 0
    word_total_num = 1000000
    # Aggregation threshold
    agg_threshold = 0.8
    # Workload per process task
    per_process_task_num = 10000
    # Workload per thread task
    per_thread_task_num = 50
    # Worker count (minus 1 leaves one core free for redis)
    worker_num = os.cpu_count() - 1
    # worker_num = 1
    # Maximum number of threads per worker
    max_threads = 2
    # Redis max connections (kept equal to the thread count to avoid waste)
    redis_max_conns = max_threads
    # Regex matching the per-worker aggregation result files
    agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+_\d+.txt", re.I)
    # Redis client
    m_redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379)
    # Verify that the input files exist
    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT,
                      FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
        input_file = os.path.join(file_path, file_name)
        if not os.path.isfile(input_file):
            raise Exception("File not found! Path: " + input_file)
    # Archive historical result files
    history_agg_file_list = [file for file in os.listdir(file_path) if agg_file_pattern.match(file)]
    if len(history_agg_file_list) > 0:
        archive_path = os.path.join(file_path, DIR_AGG_FILE_ARCHIVE % time.strftime('%Y%m%d%H%M%S'))
        os.makedirs(archive_path)
        for history_agg_file in history_agg_file_list:
            shutil.move(os.path.join(file_path, history_agg_file), archive_path)
    # Cache keyword positions
    # word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
    # word_dict = {}
    # with open(word_file, "r", encoding="utf-8") as f:
    #     for position, word in enumerate(f, start=1):
    #         word = utils.remove_line_break(word)
    #         if not word:
    #             continue
    #         word_dict[position] = word
    # m_redis_cache.hset(CACHE_WORD, mapping=word_dict)
    # # Record the total keyword count
    # word_total_num = len(word_dict)
    # # Free memory
    # del word_dict
    #
    # # Cache segmentation results
    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
    # word_split_dict = {}
    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
    #     for position, word_split_line in enumerate(f, start=1):
    #         word_split_line = utils.remove_line_break(word_split_line)
    #         if not word_split_line:
    #             continue
    #         word_split_dict[position] = word_split_line
    # m_redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
    # # Free memory
    # del word_split_dict
    #
    # # Cache the inverted index
    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
    # word_reverse_index_dict = {}
    # # Stem key
    # key_pattern = re.compile(r"([^,]+),\[", re.I)
    # # Position indexes
    # index_pattern = re.compile(r"\d+", re.I)
    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
    #     for word_split_line in f:
    #         key_m = key_pattern.match(word_split_line)
    #         key = key_m.group(1)
    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
    #         word_reverse_index_dict[key] = ",".join(val)
    # m_redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
    # # Free memory
    # del word_reverse_index_dict

    # Clear, then rebuild, the "already aggregated" bitmap
    m_redis_cache.delete(CACHE_UNUSED_BITMAP)
    m_redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 2, 1)
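    # Note: setting a bit just past the last word position (word_total_num + 2)
    # presumably pre-sizes the redis bitmap, so every later GET returns enough
    # bytes to index any word position.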
    # Submit the tasks and write out the results
    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
    with ProcessPoolExecutor(max_workers=worker_num, initializer=init_process,
                             initargs=(redis_max_conns, max_threads, file_path)) as process_pool:
        # Compute the task boundaries
        task_list = utils.avg_split_task(word_total_num, per_process_task_num, 1)
        # Submit the tasks
        process_futures = []
        for skip_line, pos in enumerate(task_list, start=1):
            # Progress-bar row assigned to the subprocess
            skip_line = (skip_line % worker_num) + 1
            p_future = process_pool.submit(agg_word_process, agg_threshold, pos[0], pos[1], word_total_num,
                                           skip_line, per_thread_task_num)
            process_futures.append(p_future)
        # Show overall progress
        with tqdm(total=len(process_futures), desc='Aggregation progress', unit='batch', unit_scale=True) as pbar:
            for p_future in as_completed(process_futures):
                p_result = p_future.result()
                # Update the progress bar
                pbar.update(1)
        # Shut down the pool (the with-block would do this as well)
        process_pool.shutdown()
    # Collect the per-worker result files and merge them into one
    with open(word_agg_file, "w", encoding="UTF-8") as fo:
        for file in os.listdir(file_path):
            # Skip anything that is not a result file
            if not agg_file_pattern.match(file):
                continue
            with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi:
                for word in fi:
                    fo.write(word)
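

# A minimal sketch of the boundary helper assumed above. The real
# implementation lives in utils.avg_split_task and may differ; this version is
# inferred from the call sites only: half-open (start, end) chunks of size
# per_task, an optional start offset, and -1 as the end marker of the final
# chunk (agg_word_process treats -1 as "run to final_pos").
def _avg_split_task_sketch(total: int, per_task: int, start: int = 0) -> list:
    boundaries = []
    pos = start
    last = total + start
    while pos < last:
        end = pos + per_task
        # The final chunk is marked with -1 instead of its real end position
        boundaries.append((pos, -1 if end >= last else end))
        pos = end
    return boundaries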


def prepare_word_split_and_reverse_index(file_path: str):
    """
    Preprocessing: segment the long-tail words and build the inverted index.
    :param file_path: directory of the files to process
    :return:
    """
    # Verify that the input file exists
    word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
    if not os.path.isfile(word_input_file):
        print("File not found! " + word_input_file)
        return
    # Total number of lines to process
    with open(word_input_file, "r", encoding="utf-8") as fi:
        total_line_num = sum(1 for line in fi)
    if total_line_num == 0:
        print("Nothing to process: the input file is empty")
        return
    # Split the work into per-process tasks
    task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))
    # Per-process results
    p_result_list = []
    # Process in parallel
    with ProcessPoolExecutor(os.cpu_count()) as process_pool:
        # Submit the tasks
        process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1])
                           for task in task_list]
        # Collect the results
        for p_future in as_completed(process_futures):
            p_result = p_future.result()
            if p_result:
                p_result_list.append(p_result)
        # Sort the results by start position so output lines stay aligned
        p_result_list = sorted(p_result_list, key=lambda v: v[0])
        # Write out the segmentation results
        split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
        with open(split_output_file, "w", encoding="UTF-8") as fo:
            for start_pos, word_arr_list, reverse_index in p_result_list:
                for word_arr in word_arr_list:
                    fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
        # Merged inverted index
        word_reverse_index_dict = dict()
        # Merge the per-process inverted indexes
        for start_pos, word_arr, reverse_index_dict in p_result_list:
            for key, value in reverse_index_dict.items():
                reverse_index_arr = word_reverse_index_dict.get(key)
                if reverse_index_arr:
                    reverse_index_arr.extend(value)
                else:
                    word_reverse_index_dict[key] = value
        # Write out the inverted index as "stem,[pos1, pos2, ...]"
        with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
            for key, value in word_reverse_index_dict.items():
                fo.write("%s,%s\n" % (key, value))
        # Shut down the pool (the with-block would do this as well)
        process_pool.shutdown()


def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
    """
    Segment words and build an inverted index for one slice of the file.
    :param input_file: file to process
    :param start_pos: start position (1-based, inclusive)
    :param end_pos: end position (exclusive; -1 means "to the end of file")
    :return: (start position, segmentation results, inverted index)
    """
    # Load stop words
    stop_word_dict = utils.load_stop_word()
    # Segmentation results, one entry per input line
    word_arr_list = []
    # Inverted index: stem -> list of word positions
    word_reverse_index = dict()
    with open(input_file, "r", encoding="utf-8") as fr:
        for i, tmp_word in enumerate(fr):
            # start_pos is a 1-based line number while i starts at 0
            if i + 1 < start_pos:
                continue
            # Current position
            cur_pos = i + 1
            # Stop at the task boundary
            if cur_pos == end_pos:
                break
            # Segment the word
            word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
            # Filtered segmentation result
            word_filter_arr = []
            # Drop stop words and record positions in the inverted index
            for word in word_list:
                if word in stop_word_dict:
                    continue
                word_index_arr = word_reverse_index.get(word)
                if word_index_arr:
                    word_index_arr.append(cur_pos)
                else:
                    word_reverse_index[word] = [cur_pos]
                word_filter_arr.append(word)
            # An empty list is still appended so line alignment is preserved
            word_arr_list.append(word_filter_arr)
    return start_pos, word_arr_list, word_reverse_index
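

# Illustrative example (hypothetical data). For an input file containing
#   line 1: "蓝牙耳机"
#   line 2: "无线蓝牙音箱"
# word_split_reverse(path, 1, -1) would return something like
#   (1,
#    [["蓝牙", "耳机"], ["无线", "蓝牙", "音箱"]],
#    {"蓝牙": [1, 2], "耳机": [1], "无线": [2], "音箱": [2]})
# (exact tokens depend on the jieba dictionary), and
# prepare_word_split_and_reverse_index would then write, e.g.,
#   蓝牙,[1, 2]
# which is the "stem,[pos, ...]" layout that the commented-out cache warm-up
# in agg_word parses back into redis.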


def init_process(max_conns: int, max_threads: int, file_path: str):
    """
    Initialize a worker process.
    :param max_conns: redis max connections
    :param max_threads: maximum number of threads
    :param file_path: output directory
    :return:
    """
    # Initialize the redis connection pool
    global redis_pool
    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=max_conns)
    global thread_pool
    thread_pool = ThreadPoolExecutor(max_threads, initializer=init_thread, initargs=(file_path,))
    global cal_thread_pool
    # No initializer is needed here: agg_word_cal uses no thread-local state
    cal_thread_pool = ThreadPoolExecutor(max_threads * 3)


def agg_word_process(agg_threshold: float, start_pos: int, end_pos: int, final_pos: int,
                     skip_line: int, per_thread_task_num: int):
    """
    Aggregate one slice of the long-tail words inside a worker process.
    :param agg_threshold: aggregation threshold
    :param start_pos: start of the slice (inclusive)
    :param end_pos: end of the slice (exclusive; -1 means "to final_pos")
    :param final_pos: overall task boundary
    :param skip_line: progress-bar row
    :param per_thread_task_num: workload per computation thread
    :return:
    """
    # Resolve the real end of the slice (-1 marks the last chunk)
    real_end_pos = final_pos if end_pos == -1 else end_pos
    # Length of this slice
    process_len = real_end_pos - start_pos
    with tqdm(total=process_len, desc='Subprocess %s: aggregation progress' % os.getpid(), unit='item',
              unit_scale=True, position=skip_line) as pbar:
        thread_futures = [thread_pool.submit(agg_word_thread, main_word_position, agg_threshold, per_thread_task_num)
                          for main_word_position in range(start_pos, real_end_pos)]
        for t_future in as_completed(thread_futures):
            t_result = t_future.result()
            # Update the progress bar
            pbar.update(1)
    return


def init_thread(file_path: str):
    """
    Initialize an aggregation thread.
    :param file_path: output directory
    :return:
    """
    # Redis client backed by the process-wide pool
    local_var.redis_cache = redis.StrictRedis(connection_pool=redis_pool)
    # Redis pipeline
    local_var.redis_pipeline = local_var.redis_cache.pipeline(transaction=False)
    # Per-thread temporary result file
    word_agg_file = os.path.join(file_path,
                                 FILE_LONG_TAIL_MERGE_AGG_PID % (os.getpid(), threading.current_thread().ident))
    local_var.file_writer = open(word_agg_file, "w", encoding=CHARSET_UTF_8)
    # Local copy of the "already aggregated" bitmap
    local_var.unused_bitmap = bitarray()
    # Candidate positions pulled from the inverted index
    local_var.candidate_position_set = set()
    # Result list
    local_var.result_list = []
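

# Design note: every aggregation thread owns its redis client, pipeline,
# bitmap copy and output file (named by pid and thread id), so threads never
# share mutable state and no locking is needed; agg_word in the parent
# process merges the per-thread result files afterwards.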


def agg_word_thread(main_word_position: int, agg_threshold: float, per_thread_task_num: int):
    """
    Aggregation worker: collect every candidate similar to one main word.
    :param main_word_position: position of the main keyword
    :param agg_threshold: aggregation threshold
    :param per_thread_task_num: workload per computation thread
    :return:
    """
    try:
        # Fetch a local copy of the "already aggregated" bitmap
        local_var.unused_bitmap.frombytes(local_var.redis_cache.get(CACHE_UNUSED_BITMAP))
        # Skip the main word if it is already used, otherwise mark it used
        if local_var.unused_bitmap[main_word_position]:
            return
        else:
            local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
            local_var.unused_bitmap[main_word_position] = 1
        # Fetch the main word and its stems
        local_var.redis_pipeline.hget(CACHE_WORD, main_word_position)
        local_var.redis_pipeline.hget(CACHE_WORD_STEM, main_word_position)
        main_result = local_var.redis_pipeline.execute()
        main_word = main_result[0]
        main_word_stem = main_result[1]
        # Bail out if either is missing
        if not main_word or not main_word_stem:
            return
        main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
        # Pull candidate positions from the inverted index
        temp_candidate_position_list = local_var.redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
        for temp_candidate_position in temp_candidate_position_list:
            if not temp_candidate_position:
                continue
            # Exclude candidates that are already aggregated
            for candidate_position in temp_candidate_position.decode(CHARSET_UTF_8).split(","):
                if local_var.unused_bitmap[int(candidate_position)]:
                    continue
                local_var.candidate_position_set.add(int(candidate_position))
        # No candidates left to compute against
        if not local_var.candidate_position_set:
            return
        # Snapshot the set as a list so positions, words and stems stay aligned
        candidate_position_list = list(local_var.candidate_position_set)
        # Fetch the candidate words and stems; bail out if either batch is empty
        local_var.redis_pipeline.hmget(CACHE_WORD, candidate_position_list)
        local_var.redis_pipeline.hmget(CACHE_WORD_STEM, candidate_position_list)
        candidate_result = local_var.redis_pipeline.execute()
        candidate_word_cache_list = candidate_result[0]
        candidate_word_stem_cache_list = candidate_result[1]
        if not candidate_word_cache_list or not candidate_word_stem_cache_list:
            return
        # Decode late, in case one of the early returns above fired
        main_word = main_word.decode(CHARSET_UTF_8)
        task_num = len(candidate_word_cache_list)
        if task_num <= per_thread_task_num:
            t_result = agg_word_cal(agg_threshold, main_word, main_word_stem_list, candidate_position_list,
                                    candidate_word_cache_list, candidate_word_stem_cache_list)
            local_var.result_list.extend(t_result)
        else:
            task_list = utils.avg_split_task(task_num, per_thread_task_num)
            cal_futures = []
            for start_pos, end_pos in task_list:
                # avg_split_task marks the last chunk with -1 ("to the end")
                real_end_pos = task_num if end_pos == -1 else end_pos
                cal_futures.append(cal_thread_pool.submit(
                    agg_word_cal, agg_threshold, main_word, main_word_stem_list,
                    candidate_position_list[start_pos:real_end_pos],
                    candidate_word_cache_list[start_pos:real_end_pos],
                    candidate_word_stem_cache_list[start_pos:real_end_pos]))
            for cal_future in as_completed(cal_futures):
                local_var.result_list.extend(cal_future.result())
        # Persist the results
        if not local_var.result_list:
            return
        local_var.file_writer.write("%s\n" % main_word)
        for candidate_position, candidate_word in local_var.result_list:
            local_var.file_writer.write("%s\n" % candidate_word)
            # Mark the aggregated candidate by its real word position
            local_var.redis_pipeline.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
        local_var.file_writer.write("\n")
        local_var.redis_pipeline.execute()
    except Exception:
        logging.exception("Aggregation thread failed")
    finally:
        # Reset the per-call containers
        local_var.candidate_position_set.clear()
        local_var.result_list.clear()
        local_var.unused_bitmap.clear()
    return


def agg_word_cal(agg_threshold: float, main_word: str, main_word_stem_list: list, candidate_position_list: list,
                 candidate_word_cache_list: list, candidate_word_stem_cache_list: list):
    """
    Compute the similarity between the main word and each candidate.
    :param agg_threshold: aggregation threshold
    :param main_word: main keyword
    :param main_word_stem_list: stems of the main keyword
    :param candidate_position_list: candidate word positions, aligned with the two lists below
    :param candidate_word_cache_list: candidate words
    :param candidate_word_stem_cache_list: candidate stems
    :return: list of (position, word) pairs whose similarity reaches the threshold
    """
    # Result container
    cal_result_list = []
    # Compute the similarity for every candidate
    for i in range(len(candidate_word_cache_list)):
        # Skip candidates whose word or stems are missing from the cache
        candidate_word = candidate_word_cache_list[i]
        if not candidate_word:
            continue
        candidate_word_stem_list = candidate_word_stem_cache_list[i]
        if not candidate_word_stem_list:
            continue
        candidate_word = candidate_word.decode(CHARSET_UTF_8)
        candidate_word_stem_list = candidate_word_stem_list.decode(CHARSET_UTF_8).split(",")
        # Cosine similarity
        try:
            val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
                                    candidate_word_stem_list)
            if val >= agg_threshold:
                # Record the real word position, not the local list index
                cal_result_list.append((candidate_position_list[i], candidate_word))
        except Exception:
            logging.exception("Main keyword %s failed against candidate %s (stems: %s)",
                              main_word, candidate_word, candidate_word_stem_list)
    return cal_result_list
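

# A minimal usage sketch. "./data" is a hypothetical directory that must
# contain FILE_LONG_TAIL_MERGE; a redis instance is assumed on
# 127.0.0.1:6379, and the cache warm-up that is commented out inside
# agg_word must have populated CACHE_WORD, CACHE_WORD_STEM and
# CACHE_WORD_REVERSE_INDEX for the aggregation step to match anything.
if __name__ == "__main__":
    data_dir = "./data"
    # Step 1: segment the raw long-tail words and build the index files
    prepare_word_split_and_reverse_index(data_dir)
    # Step 2: aggregate similar words into FILE_LONG_TAIL_MERGE_AGG
    agg_word(data_dir)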