cut_multiprocess.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. # -*- coding:utf-8 -*-
  2. import datetime
  3. import os
  4. import math
  5. import pickle
  6. from time import sleep
  7. import jieba
  8. from multiprocessing import Process, Manager
  9. from zmq import QUEUE
  10. # TODO
  11. # 1. 研究jieba多进程切词(windows上无法使用自带的多进程切词功能)
  12. # 2. 进一步减少断点保存的次数(即调整保存间隔,或者全内存中)
  13. # 3. 加入在分词完成后保存结果以防丢失
  14. # 4. 在每个进程中分别保存分词结果,然后再统一合并
  15. # 待处理的数据文件路径
  16. DATA_FILE = './data/合并结果.txt'
  17. # 分词保存
  18. CUT_OUTPUT_FILE = './data/分词结果.txt'
  19. # 是否分词结束后保存结果
  20. IS_ASSUME_TOTAL = True
  21. # 是否断点续存
  22. IS_ASSUME = False
  23. # 是否测试模式
  24. IS_TEST_MODE = False
  25. # 测试使用的数据量
  26. TEST_DATA_NUM = 100 * 10000
  27. # 测试模式时断点续存的保存间隔
  28. TEST_SAVE_INTERNAL = 200
  29. # 编码
  30. ENCODING_CHARSET = "UTF-8"
  31. # 配置文件路径
  32. CONFIG_PATH = "./data/pkl/cut_config_%d.pkl"
  33. # 处理进程数量
  34. PROCESS_NUM = os.cpu_count()
  35. # 保存间隔(多久保存一次)
  36. SAVE_INTERNAL = TEST_SAVE_INTERNAL if IS_TEST_MODE else 1000000
  37. # 处理进度提醒间隔
  38. PROCESS_TIPS_INTERNAL = 10 * 10000
  39. def save_config(config_path, config_obj):
  40. """
  41. 保存配置文件
  42. """
  43. with open(config_path, "wb") as f:
  44. pickle.dump(config_obj, f)
  45. def load_config(config_path):
  46. """
  47. 加载配置文件
  48. """
  49. with open(config_path, "rb") as f:
  50. return pickle.load(f)
  51. def cut_word(word):
  52. """
  53. 分词
  54. """
  55. word_root = jieba.cut_for_search(word)
  56. return list(word_root)
  57. def multiprocess_cut_word(process_name, data_list, result_dict, config_path, cut_config):
  58. """
  59. 多进程进行分词处理
  60. """
  61. print('进程:%s -> 分词处理开始' % process_name)
  62. if (IS_ASSUME_TOTAL or IS_ASSUME) and os.path.exists(config_path) :
  63. cut_config = load_config(config_path)
  64. print("进程:%s -> 进断点恢复 当前状态:%s,开始处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
  65. if cut_config['state'] == 'run':
  66. # 获取带分词的数据
  67. lines = data_list[cut_config['current_pos']:cut_config['end_pos']]
  68. # 统计需要处理的数据量
  69. total_num = len(lines)
  70. print("进程:%s ->剩余待处理数量:%d" % (process_name, total_num))
  71. for i, line in enumerate(lines):
  72. # 数据处理
  73. line = line.replace("\n", "")
  74. # 分词
  75. cut_config["word_dict"][line]=cut_word(line)
  76. # 断点保存
  77. if IS_ASSUME and i > 0 and i % SAVE_INTERNAL == 0:
  78. cut_config["current_pos"] = cut_config["current_pos"] + SAVE_INTERNAL
  79. print("进程:%s -> 断点保存 当前状态:%s,当前处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
  80. save_config(config_path, cut_config)
  81. # 处理进度提示
  82. if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
  83. print("进程:%s -> 当前处理进度:%d / %d" % (process_name, i, total_num))
  84. # 最终结果保存
  85. if IS_ASSUME_TOTAL or IS_ASSUME:
  86. print("进程:%s -> 保存最终的分词结果" % process_name)
  87. cut_config["state"] = "end"
  88. cut_config["current_pos"] = cut_config['end_pos']
  89. save_config(config_path, cut_config)
  90. # result_dict.update(cut_config["word_dict"])
  91. result_dict[process_name]=cut_config["word_dict"]
  92. print('进程:%s -> 分词处理结束' % process_name)
  93. else :
  94. # result_dict.update(cut_config['word_dict'])
  95. result_dict[process_name]=cut_config["word_dict"]
  96. print('进程:%s -> 断点恢复,分词处理结束' % process_name)
  97. def main():
  98. print("开始时间:", datetime.datetime.now())
  99. print("配置:启动用断点续存,保存间隔:%d" % SAVE_INTERNAL if IS_ASSUME else "配置:不启用断点续存")
  100. print("配置:保存最终的分词结果" if IS_ASSUME_TOTAL else "配置:不保存最终的分词结果")
  101. # 处理进程容器
  102. process_list = []
  103. # 配置文件容器
  104. config_list = []
  105. # 设置多进程共享变量
  106. manager = Manager()
  107. # 多进程共享的数据源
  108. global_list = manager.list()
  109. # 多进程返回的结果
  110. result_dict = manager.dict()
  111. print("加载数据")
  112. with open(DATA_FILE, "r", encoding="UTF-8") as f:
  113. if IS_TEST_MODE:
  114. print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
  115. global_list.extend(f.readlines()[:TEST_DATA_NUM])
  116. else:
  117. global_list.extend(f.readlines())
  118. total_len = len(global_list)
  119. count = math.ceil(total_len / PROCESS_NUM)
  120. print("待处理总数量:%d, 数量区间:%d" % (total_len, count))
  121. # 构造配置
  122. for i in range(PROCESS_NUM):
  123. start_pos = i * count
  124. end_pos = i * count + count
  125. if end_pos >= total_len :
  126. end_pos = -1
  127. cut_config = {
  128. "state": "run",
  129. "start_pos": start_pos,
  130. "current_pos": start_pos,
  131. "end_pos": end_pos,
  132. "word_dict": {}
  133. }
  134. config_list.append(cut_config)
  135. print("配置", config_list)
  136. for i, config in enumerate(config_list):
  137. p = Process(target=multiprocess_cut_word, args=("进程-%d" % i, global_list, result_dict, CONFIG_PATH % i, config))
  138. p.start()
  139. process_list.append(p)
  140. for p in process_list:
  141. p.join()
  142. print("合并最终的分词结果:开始")
  143. result = []
  144. print("处理成list便于写入文件")
  145. for (process_name, word_dict) in result_dict.items():
  146. tmp = None
  147. for (key, value) in word_dict.items():
  148. tmp = ["%s,%s\n" % (key, value) for (key, value) in word_dict.items() ]
  149. result.extend(tmp)
  150. print("写入文件")
  151. with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
  152. f.writelines(result)
  153. # with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
  154. # for (process_name, word_dict) in result_dict.items():
  155. # for (key, value) in word_dict.items():
  156. # f.write("%s,%s\n" % (key, value))
  157. # # f.write("\n")
  158. print("合并最终的分词结果:结束")
  159. print("结束时间:", datetime.datetime.now())
  160. def main2():
  161. print("开始时间:", datetime.datetime.now())
  162. with open(CUT_OUTPUT_FILE, "a", encoding=ENCODING_CHARSET) as f:
  163. for i in range(4):
  164. config_p = CONFIG_PATH % i
  165. print("时间:%s, 读取:%s —— 开始" % (datetime.datetime.now(), config_p))
  166. config = load_config(config_p)
  167. print("时间:%s, 读取:%s —— 结束" % (datetime.datetime.now(), config_p))
  168. print("时间:%s,写入文件 -- 开始"% datetime.datetime.now())
  169. for (key, value) in config["word_dict"].items():
  170. f.write("%s,%s\n" % (key, value))
  171. print("时间:%s,写入文件 -- 结束"% datetime.datetime.now())
  172. print("结束时间:", datetime.datetime.now())
# Script entry point. The guard also keeps worker processes started by
# multiprocessing from re-running this code when they import the module.
if __name__ == '__main__':
    # Only the merge step runs here: main2 rebuilds the output file from the
    # per-worker checkpoint pickles. Call main() instead to run segmentation.
    main2()