cut_multiprocess2.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # -*- coding:utf-8 -*-
  2. import datetime
  3. import os
  4. import math
  5. import jieba
  6. from multiprocessing import Process, Manager
  7. # 待处理的数据文件路径
  8. DATA_FILE = './data/合并结果.txt'
  9. # 分词保存
  10. CUT_OUTPUT_FILE = './data/分词结果.txt'
  11. # 消费者进程数量
  12. CONSUMER_NUM = 1
  13. # 生产者进程数量
  14. # PRODUCER_NUM = os.cpu_count() - CONSUMER_NUM
  15. PRODUCER_NUM = 1
  16. # 是否测试模式
  17. IS_TEST_MODE = False
  18. # 测试使用的数据量
  19. TEST_DATA_NUM = 100 * 10000
  20. # 编码
  21. ENCODING_CHARSET = "UTF-8"
  22. # 发送至消息队列的间隔
  23. SEND_INTERNAL = 1 * 10000
  24. # 处理进度提醒间隔
  25. PROCESS_TIPS_INTERNAL = 10 * 10000
  26. def cut_word(word):
  27. """
  28. 分词
  29. """
  30. word_root = jieba.cut_for_search(word)
  31. return list(word_root)
  32. def consumer(queue):
  33. """
  34. 消费者,把数据保存在指定位置
  35. """
  36. print("消费者:启动")
  37. with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
  38. while True:
  39. msg = queue.get()
  40. if "quit" == msg.get("command"):
  41. print("消费者:接收到结束命令")
  42. break
  43. if len(msg['payload']) > 0:
  44. for item in msg['payload']:
  45. f.write("%s,%s\n" % (item['key'], item['value']))
  46. print("消费者:结束")
  47. def producer(data, queue, config):
  48. """
  49. 多进程进行分词处理
  50. """
  51. process_name = config['process_name']
  52. print('进程:%s -> 分词处理开始' % process_name)
  53. # 获取待分词的数据
  54. lines = data[config['current_pos']:config['end_pos']]
  55. # 统计需要处理的数据量
  56. total_num = len(lines)
  57. print("进程:%s ->剩余待处理数量:%d" % (process_name, total_num))
  58. msg_content = {
  59. 'payload': []
  60. }
  61. for i, line in enumerate(lines):
  62. # 数据处理
  63. line = line.replace("\n", "")
  64. # 分词
  65. word_root = cut_word(line)
  66. #
  67. msg_content['payload'].append({"key": line, "value": word_root})
  68. if len(msg_content) >= SEND_INTERNAL:
  69. queue.put(msg_content)
  70. msg_content = {
  71. 'payload': []
  72. }
  73. # 处理进度提示
  74. if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
  75. print("进程:%s -> 当前处理进度:%d / %d" % (process_name, i, total_num))
  76. queue.put(msg_content)
  77. print('进程:%s -> 分词处理结束' % process_name)
  78. def main():
  79. print("开始时间:", datetime.datetime.now())
  80. # 处理进程容器
  81. process_list = []
  82. # 配置文件容器
  83. config_list = []
  84. # 设置多进程共享变量
  85. manager = Manager()
  86. # 多进程共享的数据源
  87. global_list = manager.list()
  88. # 多进程通信队列
  89. global_queue = manager.Queue()
  90. print("加载数据")
  91. with open(DATA_FILE, "r", encoding="UTF-8") as f:
  92. if IS_TEST_MODE:
  93. print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
  94. global_list.extend(f.readlines()[:TEST_DATA_NUM])
  95. else:
  96. global_list.extend(f.readlines())
  97. total_len = len(global_list)
  98. count = math.ceil(total_len / PRODUCER_NUM)
  99. print("待处理总数量:%d, 数量区间:%d" % (total_len, count))
  100. # 构造配置
  101. for i in range(PRODUCER_NUM):
  102. start_pos = i * count
  103. end_pos = i * count + count
  104. if end_pos >= total_len :
  105. end_pos = None
  106. cut_config = {
  107. "start_pos": start_pos,
  108. "current_pos": start_pos,
  109. "end_pos": end_pos,
  110. "process_name": "线程-%d" % i
  111. }
  112. config_list.append(cut_config)
  113. print("配置", config_list)
  114. # 启动消费者
  115. cosumer = Process(target=consumer, args=(global_queue,))
  116. cosumer.start()
  117. # 启动生产者
  118. for i, config in enumerate(config_list):
  119. p = Process(target=producer, args=(global_list, global_queue, config))
  120. p.start()
  121. process_list.append(p)
  122. for p in process_list:
  123. p.join()
  124. # 给消费者发送结束指令
  125. global_queue.put({"command":"quit"})
  126. # 等待消费者结束执行
  127. cosumer.join()
  128. print("结束时间:", datetime.datetime.now())
  129. if __name__ == '__main__':
  130. main()