# key.py
  1. # -*- coding:utf-8 -*-
  2. from concurrent.futures import ProcessPoolExecutor, as_completed
  3. import logging
  4. import os
  5. from time import time
  6. import config
  7. import tools
  8. import jieba
  9. import mmap
  10. # 优化
  11. # 1. 更改为使用多进程
  12. # 日志配置初始化
  13. tools.init_log()
  14. def sub_process(start_pos, end_pos, stop_word):
  15. """
  16. 子进程
  17. """
  18. pid = os.getpid()
  19. logging.debug("子进程-%d 开始执行分词任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
  20. # 临时容器
  21. tmp_list = []
  22. # 开始时间
  23. start_time = time()
  24. with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
  25. mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
  26. fmmap.seek(start_pos)
  27. while True:
  28. # 越界检测
  29. cur_pos = fmmap.tell()
  30. if cur_pos >= end_pos:
  31. break
  32. # 读取关键词
  33. key = fmmap.readline().decode("UTF-8").replace("\r","").replace("\n","")
  34. # 读取不到任何内容结束执行
  35. if not key :
  36. continue
  37. # 分词
  38. tmp_stems = list(jieba.cut_for_search(key))
  39. # 排除停用词
  40. stems = set()
  41. for stem in tmp_stems:
  42. if stem in stop_word:
  43. continue
  44. stems.add(stem)
  45. # 以防止词根数为0
  46. if len(stems) == 0:
  47. continue
  48. tmp_list.append((key , list(stems)))
  49. logging.debug("子进程-%d 执行分词任务结束,耗时:%f" % (pid, (time() - start_time)))
  50. return tmp_list
  51. def main_process():
  52. """
  53. 主进程
  54. """
  55. # 进程池数
  56. process_num = 4
  57. # 任务分割大小
  58. split_num = 500000
  59. # 位置信息索引
  60. pos_index = []
  61. # 总关键词数量
  62. total_num = 0
  63. # 加载停用词
  64. stop_word = tools.load_stop_word()
  65. start_time = time()
  66. # 记录位置信息
  67. logging.info("主进程 开始构建位置索引信息")
  68. with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
  69. mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
  70. while True:
  71. # 获取当前位置
  72. cur_pos = fmmap.tell()
  73. # 移动到一下行
  74. line = fmmap.readline()
  75. # 结束检测
  76. if not line:
  77. break
  78. # 记录
  79. pos_index.append(cur_pos)
  80. # 计算总关键词数量
  81. total_num = len(pos_index)
  82. # 划分子任务
  83. logging.info("主进程 开始划分子任务")
  84. tasks = tools.avg_split_task(total_num, split_num)
  85. with ProcessPoolExecutor(process_num) as process_pool, \
  86. open(config.KEY_FILE, "w", encoding=config.ENCODING_CHARSET) as f_key:
  87. logging.info("主进程 提交任务到子进程")
  88. process_futures = [process_pool.submit(sub_process, pos_index[task[0]], pos_index[task[1]], stop_word) for task in tasks]
  89. # 移除无效变量 以防占用内存
  90. del pos_index
  91. del tasks
  92. # 序号计算
  93. count = -1
  94. for p_future in as_completed(process_futures):
  95. result = p_future.result()
  96. if result:
  97. for key, stems in result:
  98. count = count + 1
  99. # 写入文件中
  100. f_key.write("%d,%s,%s\n"%(count, key, list(stems)))
  101. # 移除无效变量 以防占用内存
  102. process_futures.remove(p_future)
  103. logging.info("主进程 构建KEY表耗时:%f" % (time() - start_time))
  104. if __name__ == '__main__':
  105. TITLE = "关键词表 生成"
  106. tools.log_start_msg(TITLE)
  107. main_process()
  108. tools.log_end_msg(TITLE)