key_reverse.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # -*- coding:utf-8 -*-
  2. from concurrent.futures import ProcessPoolExecutor, as_completed
  3. import math
  4. from time import time
  5. import os
  6. import config
  7. import tools
  8. import re
  9. import logging
  10. import mmap
  11. tools.init_log()
  12. if __name__ != "__main__":
  13. # 正则提取
  14. # 倒排表 索引
  15. index_re = r"(\d+),"
  16. index_pattern = re.compile(index_re, re.I)
  17. # KEY表 词根
  18. stem_re = r"'([^,]*)'"
  19. stem_pattern = re.compile(stem_re, re.I)
  20. def sub_process(start_pos, end_pos):
  21. """
  22. 子进程
  23. """
  24. pid = os.getpid()
  25. logging.debug("进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
  26. # 开始时间
  27. start_time = time()
  28. # 倒排表和统计信息容器
  29. reverse_dict = {}
  30. with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
  31. mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_mmap:
  32. # 移动到开始位置
  33. f_mmap.seek(start_pos)
  34. while True:
  35. # 获取当前处理位置
  36. cur_pos = f_mmap.tell()
  37. # 越界检查
  38. if cur_pos >= end_pos:
  39. break
  40. # 提取数据
  41. line = f_mmap.readline().decode(config.ENCODING_CHARSET)
  42. m = index_pattern.match(line)
  43. # 获取关键词序号、词根
  44. index = m.group(1)
  45. stems = stem_pattern.findall(line)
  46. # 构建倒排表和统计数据量
  47. for stem in stems:
  48. obj = reverse_dict.get(stem)
  49. if obj:
  50. obj["count"] = obj["count"] + 1
  51. obj["indexs"].add(index)
  52. else:
  53. tmp_indexs = set()
  54. tmp_indexs.add(index)
  55. reverse_dict[stem]= {
  56. "count": 1,
  57. "indexs": tmp_indexs
  58. }
  59. logging.debug("子进程-%d 任务结束,耗时:%f" % (pid, (time() - start_time)))
  60. return reverse_dict
  61. def main_process():
  62. logging.info("主进程 开始执行初始化")
  63. # 进程处理数
  64. process_num = 4
  65. # 关键表索引
  66. key_index = tools.load_obj(config.KEY_INDEX_CACHE)
  67. # 开始时间
  68. start_time = time()
  69. # 关键词总数
  70. total_num = len(key_index)
  71. # 任务分割大小
  72. split_num = math.ceil(total_num/process_num)
  73. logging.info("主进程 开始划分子任务")
  74. tasks = tools.avg_split_task(total_num, split_num)
  75. with ProcessPoolExecutor(process_num) as process_pool:
  76. logging.info("主进程 提交任务到子进程")
  77. process_futures = [process_pool.submit(sub_process, key_index[task[0]], key_index[task[1]]) for task in tasks]
  78. # 移除无效变量 以防占用内存
  79. del tasks
  80. del key_index
  81. # 倒排表和统计信息容器
  82. reverse_dict = {}
  83. # 进行数据合并
  84. for p_future in as_completed(process_futures):
  85. result = p_future.result()
  86. for key, val_obj in result.items():
  87. reverse_obj = reverse_dict.get(key)
  88. if reverse_obj:
  89. reverse_obj["count"] = reverse_obj["count"] + val_obj["count"]
  90. reverse_obj["indexs"] = reverse_obj["indexs"] | val_obj["indexs"]
  91. else:
  92. reverse_dict[key] = val_obj
  93. # 移除无效变量 以防占用内存
  94. process_futures.remove(p_future)
  95. logging.info("主进程 已获取全部子进程返回结果,总数据量:%d" % len(reverse_dict))
  96. logging.info("主进程 对词根关联的索引进行排序和转换")
  97. for val_obj in reverse_dict.values():
  98. val_obj["indexs"] = list(val_obj["indexs"])
  99. val_obj["indexs"].sort()
  100. # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
  101. logging.info("主进程 根据关键词数量进行排列")
  102. sorted_reverse_list = sorted(reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
  103. # 保存到本地文件
  104. logging.info("主进程 保存到本地")
  105. with open("./data/tmp/reverse_test.csv", "w", encoding=config.ENCODING_CHARSET) as f_reverse, \
  106. open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as f_statistics:
  107. for key, val_obj in sorted_reverse_list:
  108. f_reverse.write("%s,%s\n" % (key, val_obj["indexs"]))
  109. f_statistics.write("%s,%d\n" % (key, val_obj["count"]))
  110. logging.info("主进程 构建倒排索引耗时:%f" % (time() - start_time))
  111. if __name__ == "__main__":
  112. TITLE = "生成关键词倒排和统计信息"
  113. tools.log_start_msg(TITLE)
  114. main_process()
  115. tools.log_end_msg(TITLE)