# -*- coding:utf-8 -*-
"""Multi-process Chinese word segmentation with jieba.

Producer processes segment slices of the input file and push batches of
results onto a shared queue; a single consumer process drains the queue and
writes one ``key,value`` line per segmented input line.
"""
import datetime
import itertools
import math
import os
from multiprocessing import Manager, Process

import jieba

# Path of the input data file to segment.
DATA_FILE = './data/合并结果.txt'
# Path the segmentation results are written to.
CUT_OUTPUT_FILE = './data/分词结果.txt'
# Number of consumer processes (NOTE: main() currently always starts exactly one).
CONSUMER_NUM = 1
# Number of producer processes.
# Alternative that uses every remaining core: os.cpu_count() - CONSUMER_NUM
PRODUCER_NUM = 1
# When True, only the first TEST_DATA_NUM lines of DATA_FILE are processed.
IS_TEST_MODE = False
# Number of lines processed in test mode.
TEST_DATA_NUM = 100 * 10000
# Text encoding used for both the input and the output file.
ENCODING_CHARSET = "UTF-8"
# Number of segmented lines a producer batches into one queue message.
SEND_INTERNAL = 1 * 10000
# A producer prints a progress line every this many input lines.
PROCESS_TIPS_INTERNAL = 10 * 10000


def cut_word(word):
    """Tokenize ``word`` with ``jieba.cut_for_search`` and return the tokens as a list."""
    word_root = jieba.cut_for_search(word)
    return list(word_root)


def consumer(queue):
    """Drain ``queue`` and write every segmentation result to CUT_OUTPUT_FILE.

    Messages are dicts. ``{"command": "quit"}`` stops the loop; any other
    message must carry a ``payload`` list of ``{"key": ..., "value": ...}``
    dicts, each written as one ``key,value`` line (``value`` is formatted
    with ``%s``, so a token list appears as its Python repr).

    :param queue: queue shared with the producers (a Manager queue in main()).
    """
    print("消费者:启动")
    with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
        while True:
            msg = queue.get()
            if "quit" == msg.get("command"):
                print("消费者:接收到结束命令")
                break
            # One buffered writelines call per batch instead of one write per item;
            # an empty payload simply writes nothing.
            f.writelines("%s,%s\n" % (item['key'], item['value'])
                         for item in msg['payload'])
    print("消费者:结束")


def producer(data, queue, config):
    """Segment one slice of ``data`` and send the results to ``queue`` in batches.

    :param data: sliceable sequence of text lines (a Manager list proxy in main()).
    :param queue: queue read by :func:`consumer`.
    :param config: dict with ``process_name`` (label used in log output),
        ``current_pos`` (first index to process) and ``end_pos`` (exclusive
        end index, or ``None`` to run to the end of ``data``).

    Each queue message is ``{'payload': [...]}`` holding at most
    SEND_INTERNAL ``{"key": line, "value": tokens}`` dicts.
    """
    process_name = config['process_name']
    print('进程:%s -> 分词处理开始' % process_name)

    # Fetch this worker's slice in a single round-trip to the shared list.
    lines = data[config['current_pos']:config['end_pos']]
    total_num = len(lines)
    print("进程:%s ->剩余待处理数量:%d" % (process_name, total_num))

    payload = []
    for i, line in enumerate(lines):
        line = line.replace("\n", "")
        payload.append({"key": line, "value": cut_word(line)})

        # Measure the batch list itself: the message dict always has exactly one
        # key, so checking its length would never trigger a flush.
        if len(payload) >= SEND_INTERNAL:
            queue.put({'payload': payload})
            payload = []

        if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
            print("进程:%s -> 当前处理进度:%d / %d" % (process_name, i, total_num))

    # Flush the last, partially filled batch.
    if payload:
        queue.put({'payload': payload})
    print('进程:%s -> 分词处理结束' % process_name)


def main():
    """Load DATA_FILE, fan it out to PRODUCER_NUM producers and collect results."""
    print("开始时间:", datetime.datetime.now())

    # The context manager guarantees the manager server process is shut down.
    with Manager() as manager:
        # Input lines shared with all producer processes.
        global_list = manager.list()
        # Channel from the producers to the consumer.
        global_queue = manager.Queue()

        print("加载数据")
        with open(DATA_FILE, "r", encoding=ENCODING_CHARSET) as f:
            if IS_TEST_MODE:
                print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
                # Read only the lines we need instead of the whole file; the
                # proxy pickles its argument, so hand it a concrete list.
                global_list.extend(list(itertools.islice(f, TEST_DATA_NUM)))
            else:
                global_list.extend(f.readlines())

        total_len = len(global_list)
        count = math.ceil(total_len / PRODUCER_NUM)
        print("待处理总数量:%d, 数量区间:%d" % (total_len, count))

        config_list = []
        for i in range(PRODUCER_NUM):
            start_pos = i * count
            end_pos = start_pos + count
            # The last slice is open-ended (None) so no trailing lines are dropped.
            if end_pos >= total_len:
                end_pos = None
            config_list.append({
                "start_pos": start_pos,
                "current_pos": start_pos,
                "end_pos": end_pos,
                "process_name": "线程-%d" % i,
            })
        print("配置", config_list)

        consumer_process = Process(target=consumer, args=(global_queue,))
        consumer_process.start()

        process_list = []
        try:
            for config in config_list:
                p = Process(target=producer, args=(global_list, global_queue, config))
                p.start()
                process_list.append(p)
        finally:
            # Even if starting a producer failed, wait for the ones that did start
            # and always send "quit" so the consumer never blocks forever.
            for p in process_list:
                p.join()
            global_queue.put({"command": "quit"})
            consumer_process.join()

    print("结束时间:", datetime.datetime.now())


if __name__ == '__main__':
    main()