| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- # -*- coding:utf-8 -*-
- import datetime
- import os
- import math
- import jieba
- from multiprocessing import Process, Manager
# ---- Input / output paths -------------------------------------------------
# Text file whose lines are segmented (one record per line).
DATA_FILE = './data/合并结果.txt'
# Destination file for the "<line>,<tokens>" segmentation results.
CUT_OUTPUT_FILE = './data/分词结果.txt'

# ---- Process layout -------------------------------------------------------
# Number of consumer (writer) processes.
CONSUMER_NUM = 1
# Number of producer (segmentation) processes.
# To use every core instead: os.cpu_count() - CONSUMER_NUM
PRODUCER_NUM = 1

# ---- Test mode ------------------------------------------------------------
# When True, only the first TEST_DATA_NUM input lines are processed.
IS_TEST_MODE = False
# Line limit applied in test mode (one million).
TEST_DATA_NUM = 1000000

# ---- Misc -----------------------------------------------------------------
# Character encoding for the text files this script writes.
ENCODING_CHARSET = "UTF-8"
# Intended number of segmented records per queue message.
# ("INTERNAL" is a historical misspelling of "interval"; kept for callers.)
SEND_INTERNAL = 10000
# Producers print a progress line every this many input lines.
PROCESS_TIPS_INTERNAL = 100000
def cut_word(word):
    """Split *word* into tokens using jieba's search-engine mode.

    The iterable returned by ``jieba.cut_for_search`` is materialized so the
    caller gets a plain list that can be stored, pickled and queued.
    """
    return list(jieba.cut_for_search(word))
def consumer(queue):
    """Write segmentation results taken from *queue* to CUT_OUTPUT_FILE.

    The output file is opened in "w" mode, so it is truncated at start-up.
    Every message is a dict:

    * ``{"command": "quit"}`` stops the loop;
    * otherwise ``message["payload"]`` is a list of
      ``{"key": ..., "value": ...}`` records, each written on its own line
      as ``"<key>,<value>"`` (both formatted with ``%s``).
    """
    print("消费者:启动")
    with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as out:
        while True:
            message = queue.get()
            if message.get("command") == "quit":
                print("消费者:接收到结束命令")
                break
            # An empty payload simply writes nothing.
            out.writelines(
                "%s,%s\n" % (record['key'], record['value'])
                for record in message['payload']
            )
    print("消费者:结束")
def producer(data, queue, config):
    """Segment one slice of *data* and send the results to *queue* in batches.

    Args:
        data: Indexable sequence of text lines. It is sliced as
            ``data[config['current_pos']:config['end_pos']]``.
        queue: Receives messages of the form
            ``{'payload': [{'key': line, 'value': tokens}, ...]}``.
        config: Dict with ``'process_name'`` (used in log output),
            ``'current_pos'`` and ``'end_pos'`` (``None`` means "to the end").

    Each message carries at most SEND_INTERNAL records when SEND_INTERNAL is
    positive. A final partial batch is sent only if it is non-empty.
    """
    process_name = config['process_name']
    print('进程:%s -> 分词处理开始' % process_name)
    # This worker's share of the input.
    lines = data[config['current_pos']:config['end_pos']]
    total_num = len(lines)
    print("进程:%s ->剩余待处理数量:%d" % (process_name, total_num))

    payload = []
    for i, line in enumerate(lines):
        # Strip the line break so it does not end up in the key or tokens.
        line = line.replace("\n", "")
        payload.append({"key": line, "value": cut_word(line)})

        # Measure the batch itself. The previous code took len() of the
        # wrapper dict (always 1), so batches were never flushed and the
        # whole slice was sent as one huge message at the end.
        if len(payload) >= SEND_INTERNAL:
            queue.put({'payload': payload})
            payload = []

        # Periodic progress report.
        if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
            print("进程:%s -> 当前处理进度:%d / %d" % (process_name, i, total_num))

    # Flush the remainder; skip a useless empty message.
    if payload:
        queue.put({'payload': payload})
    print('进程:%s -> 分词处理结束' % process_name)
def main():
    """Segment every line of DATA_FILE in parallel and write the results.

    Pipeline:
      1. Load the input lines (only the first TEST_DATA_NUM in test mode)
         into a Manager-backed shared list.
      2. Split the list into PRODUCER_NUM contiguous slices and start one
         ``producer`` process per slice. All producers share one queue.
      3. One ``consumer`` process drains the queue into CUT_OUTPUT_FILE.
      4. After all producers have joined, send ``{"command": "quit"}`` to
         stop the consumer, then wait for it.
    """
    import itertools

    print("开始时间:", datetime.datetime.now())
    process_list = []
    config_list = []
    # The context manager shuts down the manager's server process when we
    # are done (or on error), rather than relying on exit-time finalizers.
    with Manager() as manager:
        # Input lines shared with the producer processes.
        global_list = manager.list()
        # Producer -> consumer channel.
        global_queue = manager.Queue()

        print("加载数据")
        with open(DATA_FILE, "r", encoding=ENCODING_CHARSET) as f:
            if IS_TEST_MODE:
                print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
                # Read only the needed prefix instead of the whole file.
                # It is turned into a list because proxy arguments are pickled.
                global_list.extend(list(itertools.islice(f, TEST_DATA_NUM)))
            else:
                global_list.extend(f.readlines())

        total_len = len(global_list)
        count = math.ceil(total_len / PRODUCER_NUM)
        print("待处理总数量:%d, 数量区间:%d" % (total_len, count))
        for i in range(PRODUCER_NUM):
            start_pos = i * count
            end_pos = start_pos + count
            # None makes the last slice run to the end of the list.
            if end_pos >= total_len:
                end_pos = None
            config_list.append({
                "start_pos": start_pos,
                "current_pos": start_pos,
                "end_pos": end_pos,
                "process_name": "线程-%d" % i,
            })
        print("配置", config_list)

        consumer_process = Process(target=consumer, args=(global_queue,))
        consumer_process.start()
        for config in config_list:
            p = Process(target=producer, args=(global_list, global_queue, config))
            p.start()
            process_list.append(p)
        for p in process_list:
            p.join()

        # Every producer has finished; tell the consumer to stop and wait for it.
        global_queue.put({"command": "quit"})
        consumer_process.join()
    print("结束时间:", datetime.datetime.now())
if __name__ == "__main__":
    # Run the pipeline only when executed as a script. This guard also stops
    # child processes started with the "spawn" method from re-running it
    # when they import this module.
    main()
|