# cut_multiprocess.py (3.3 KB)
  1. # -*- coding:utf-8 -*-
  2. import os
  3. import math
  4. import pickle
  5. import jieba
  6. from multiprocessing import Process, Manager
  7. # 处理进程数量
  8. PROCESS_NUM = 5
  9. # 保存间隔(多久保存一次)
  10. SAVE_INTERNAL = 100000
  11. # 配置文件路径
  12. CONFIG_PATH = "./cut_config_%d.pkl"
  13. # 待处理的数据文件路径
  14. DATA_FILE = './merge.txt'
  15. # 处理进程容器
  16. process_list = []
  17. # 配置文件容器
  18. config_list = []
  19. def save_config(config_path, config_obj):
  20. """
  21. 保存配置文件
  22. """
  23. with open(config_path, "wb") as f:
  24. pickle.dump(config_obj, f)
  25. def load_config(config_path):
  26. """
  27. 加载配置文件
  28. """
  29. with open(config_path, "rb") as f:
  30. return pickle.load(f)
  31. def cut_word(word):
  32. """
  33. 分词
  34. """
  35. word_root = jieba.cut_for_search(word)
  36. return list(word_root)
  37. def multiprocess_cut_word(process_name, config_path, cut_config):
  38. """
  39. 多进程进行分词处理
  40. """
  41. print('进程:%s -> 分词处理开始' % process_name)
  42. if os.path.exists(config_path) :
  43. cut_config = load_config(config_path)
  44. print("进程:%s -> 进断点恢复 当前状态:%s,开始处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
  45. if cut_config['state'] == 'run':
  46. with open(DATA_FILE, "r", encoding="UTF-8") as f:
  47. lines = f.readlines()
  48. lines = lines[cut_config['current_pos']:cut_config['end_pos']]
  49. print("进程:%s ->剩余待处理数量:%d" % (process_name, len(lines)))
  50. for i, line in enumerate(lines):
  51. line = line.replace("\n", "")
  52. word_root = cut_word(line)
  53. cut_config["word_dict"][line]=word_root
  54. if i > 0 and i % SAVE_INTERNAL == 0:
  55. cut_config["current_pos"] = cut_config["current_pos"] + SAVE_INTERNAL
  56. print("进程:%s -> 保存位置 当前状态:%s,开始处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
  57. save_config(config_path, cut_config)
  58. cut_config["state"] = "end"
  59. save_config(config_path, cut_config)
  60. print('进程:%s -> 分词处理结束' % process_name)
  61. else :
  62. print('进程:%s -> 断点恢复 分词处理结束' % process_name)
  63. def main():
  64. with open(DATA_FILE, "r", encoding="UTF-8") as f:
  65. lines = f.readlines()
  66. total_len = len(lines)
  67. count = math.ceil(total_len / PROCESS_NUM)
  68. print("总数量:%d, 数量区间:%d" % (total_len, count))
  69. for i in range(PROCESS_NUM):
  70. start_pos = i * count
  71. end_pos = i * count + count
  72. if end_pos >= total_len :
  73. end_pos = -1
  74. cut_config = {
  75. "state": "run",
  76. "start_pos": start_pos,
  77. "current_pos": start_pos,
  78. "end_pos": end_pos,
  79. "word_dict": {}
  80. }
  81. config_list.append(cut_config)
  82. print("配置", config_list)
  83. for i, config in enumerate(config_list):
  84. p = Process(target=multiprocess_cut_word, args=("进程-%d" % i, CONFIG_PATH % i, config))
  85. p.start()
  86. process_list.append(p)
  87. for p in process_list:
  88. p.join()
  89. if __name__ == '__main__':
  90. main()