key_reverse_statistics.py 6.4 KB


  1. # -*- coding:utf-8 -*-
  2. from concurrent.futures import ProcessPoolExecutor, as_completed
  3. import mmap
  4. from multiprocessing.connection import wait
  5. import random
  6. import sys
  7. from time import sleep, time
  8. import os
  9. import config
  10. import tools
  11. import ast
  12. import re
  13. import stop_word
  14. import logging
  15. import math
  16. from multiprocessing import Process, Pool
  17. TITLE = "关键词倒排文件 统计"
  18. # def reverse_statistics(start_pos, end_pos):
  19. def handle(start_pos, end_pos):
  20. print("进程:%d, 统计开始,开始位置:%d,结束位置:%d" % (os.getpid(), start_pos, end_pos))
  21. # 统计信息容器
  22. reverse_statistics = {}
  23. with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr, \
  24. mmap.mmap(fr.fileno(), 0 , access=mmap.ACCESS_READ) as fmmap:
  25. # 调整开始位置
  26. fmmap.seek(start_pos)
  27. while True:
  28. cur_pos = fmmap.tell()
  29. # 越界检测
  30. if cur_pos >= end_pos:
  31. break
  32. line = fmmap.readline().decode(config.ENCODING_CHARSET)
  33. index=line.index(",")
  34. key = line[:index]
  35. word_root = line[index+1:]
  36. word_root = ast.literal_eval(word_root)
  37. l = len(word_root)
  38. reverse_statistics[key]=l
  39. logging.info("进程:%d, 统计结束" % os.getpid())
  40. return {
  41. "pid":os.getpid(),
  42. "statistics":reverse_statistics
  43. }
  44. def main2():
  45. # 日志信息配置
  46. tools.init_log()
  47. tools.log_start_msg(TITLE)
  48. # 进程数
  49. process_num = os.cpu_count()
  50. # 加载缓存索引文件
  51. key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
  52. # 对索引文件中的元素进行平分
  53. # 转成列表,计算总长 和 平分后的处理区间
  54. key_list = [key for key in key_reverse_index.keys()]
  55. key_list_len = len(key_list)
  56. internal = math.ceil(key_list_len / process_num )
  57. # 利用 缓存索引文件 生成处理区间的位置信息
  58. # 位置信息容器
  59. pos_list = []
  60. for i in range(process_num + 1):
  61. # 计算平分点在列表中的位置
  62. l_pos = i * internal
  63. # 如果超过列表大小需要额外处理
  64. if l_pos > key_list_len:
  65. l_pos = key_list_len -1
  66. # 获取列表中的词根
  67. key = key_list[l_pos:l_pos+1]
  68. # 根据词根获取位置信息
  69. pos = key_reverse_index[key[0]]
  70. # 记录位置信息
  71. pos_list.append(pos)
  72. # 使用用进程池
  73. pool = ProcessPoolExecutor(process_num)
  74. # 生成任务
  75. process_futures = []
  76. for i in range(0, len(pos_list)-1):
  77. pos = pos_list[i: i+2]
  78. process_futures.append(pool.submit(handle, pos[0], pos[1]))
  79. # with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
  80. # for future in as_completed(process_futures):
  81. # logging.info("部分子任务统计结束,保存至本地 - 开始")
  82. # for key, value in future.result().items():
  83. # fw.write("%s,%s\n"%(key,value))
  84. # logging.info("部分子任务统计结束,保存至本地 - 结束")
  85. results = []
  86. for future in as_completed(process_futures):
  87. result = future.result()
  88. logging.info("进程:%d, 统计结束" % result["pid"])
  89. results.append(result)
  90. logging.info("统计结束,保存至本地 - 开始")
  91. with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
  92. for r in results:
  93. for key, value in r["statistics"].items():
  94. fw.write("%s,%s\n"%(key,value))
  95. logging.info("部分子任务统计结束,保存至本地 - 结束")
  96. pool.shutdown(wait=True)
  97. tools.log_end_msg(TITLE)
  98. # 测试代码3
  99. # pool = ProcessPoolExecutor(3)
  100. # for i in range(1,5):
  101. # pool.submit(handle, "测试进程-%d"%i, i, i*10)
  102. # pool.shutdown(wait=True)
  103. # 测试代码2
  104. # pool = Pool(3)
  105. # for i in range(1,5):
  106. # pool.apply_async(handle, ("测试进程-%d"%i, i, i*10))
  107. # pool.close()
  108. # pool.join()
  109. # print("结束")
  110. # 测试代码1
  111. # p = Process(target=handle, args=('测试进程', 1, 10))
  112. # p.start()
  113. # p.join()
  114. # tools.init_log()
  115. # tools.log_start_msg(TITLE)
  116. # key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
  117. # tmp = [key for key in key_reverse_index.keys()]
  118. # l = len(tmp)
  119. # print("总长:", l)
  120. # internal = math.ceil(l / 4)
  121. # print("间隔:", internal)
  122. # pos = []
  123. # for i in range(5):
  124. # t = i*internal
  125. # if t > l:
  126. # t = l-1
  127. # pos.append(t)
  128. # print(pos)
  129. # for item in pos:
  130. # key = tmp[item:item+1]
  131. # print(key)
  132. # pos = key_reverse_index[key[0]]
  133. # print(key, pos)
  134. # reverse_statistics = {}
  135. # logging.info("统计开始")
  136. # with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr, \
  137. # mmap.mmap(fr.fileno(), 0 , access=mmap.ACCESS_READ) as fmmap:
  138. # for line in fr:
  139. # index=line.index(",")
  140. # key = line[:index]
  141. # word_root = line[index+1:]
  142. # word_root = ast.literal_eval(word_root)
  143. # l = len(word_root)
  144. # reverse_statistics[key]=l
  145. # logging.info("统计结束,保存至本地")
  146. # with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
  147. # for key, value in reverse_statistics:
  148. # fw.write("%s,%s\n"%(key,value))
  149. # tools.log_end_msg(TITLE)
  150. def main():
  151. tools.init_log()
  152. tools.log_start_msg(TITLE)
  153. reverse_statistics = {}
  154. logging.info("统计开始")
  155. with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr, \
  156. mmap.mmap(fr.fileno(), 0 , access=mmap.ACCESS_READ) as fmmap:
  157. for line in fr:
  158. index=line.index(",")
  159. key = line[:index]
  160. word_root = line[index+1:]
  161. word_root = ast.literal_eval(word_root)
  162. l = len(word_root)
  163. reverse_statistics[key]=l
  164. logging.info("统计结束,保存至本地")
  165. with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
  166. for key, value in reverse_statistics:
  167. fw.write("%s,%s\n"%(key,value))
  168. tools.log_end_msg(TITLE)
  169. if __name__ == "__main__":
  170. main2()