Prechádzať zdrojové kódy

新增分词处理类

ChenGanBin 3 rokov pred
rodič
commit
634197f4b5
2 zmenil súbory, kde vykonal 198 pridanie a 2 odobranie
  1. 22 2
      cut_multiprocess.py
  2. 176 0
      cut_multiprocess2.py

+ 22 - 2
cut_multiprocess.py

@@ -4,9 +4,12 @@ import datetime
 import os
 import math
 import pickle
+from time import sleep
 import jieba
 from multiprocessing import Process, Manager
 
+from zmq import QUEUE
+
 # TODO
 # 1. 研究jieba多进程切词(windows上无法使用自带的多进程切词功能)
 # 2. 进一步减少断点保存的次数(即调整保存间隔,或者全内存中)
@@ -51,7 +54,6 @@ SAVE_INTERNAL = TEST_SAVE_INTERNAL if IS_TEST_MODE else 1000000
 # 处理进度提醒间隔
 PROCESS_TIPS_INTERNAL = 10 * 10000
 
-
 def save_config(config_path, config_obj):
     """
     保存配置文件
@@ -204,5 +206,23 @@ def main():
 
     print("结束时间:", datetime.datetime.now())
 
+def main2():
+    print("开始时间:", datetime.datetime.now())
+
+    with open(CUT_OUTPUT_FILE, "a", encoding=ENCODING_CHARSET) as f:
+        for i in range(4):
+            config_p = CONFIG_PATH % i
+            print("时间:%s, 读取:%s —— 开始" % (datetime.datetime.now(), config_p))
+            config = load_config(config_p)
+            print("时间:%s, 读取:%s —— 结束" % (datetime.datetime.now(), config_p))
+
+            print("时间:%s,写入文件 -- 开始"% datetime.datetime.now())
+            for (key, value) in config["word_dict"].items():
+                f.write("%s,%s\n" % (key, value))
+            print("时间:%s,写入文件 -- 结束"% datetime.datetime.now())
+
+    print("结束时间:", datetime.datetime.now())
+
+
 if __name__ == '__main__':
-    main()
+    main2()

+ 176 - 0
cut_multiprocess2.py

@@ -0,0 +1,176 @@
+# -*- coding:utf-8 -*-
+
+import datetime
+import os
+import math
+import jieba
+from multiprocessing import Process, Manager
+
+
+# 待处理的数据文件路径
+DATA_FILE = './data/合并结果.txt'
+
+# 分词保存
+CUT_OUTPUT_FILE = './data/分词结果.txt'
+
+# 消费者进程数量
+CONSUMER_NUM = 1
+
+# 生产者进程数量
+# PRODUCER_NUM = os.cpu_count() - CONSUMER_NUM
+PRODUCER_NUM = 1
+
+# 是否测试模式
+IS_TEST_MODE = False
+
+# 测试使用的数据量
+TEST_DATA_NUM = 100 * 10000
+
+# 编码
+ENCODING_CHARSET = "UTF-8"
+
+# 发送至消息队列的间隔
+SEND_INTERNAL = 1 * 10000
+
+# 处理进度提醒间隔
+PROCESS_TIPS_INTERNAL = 10 * 10000
+
+
+def cut_word(word):
+    """
+    分词
+    """
+    word_root = jieba.cut_for_search(word)
+    return list(word_root)
+
+def consumer(queue):
+    """
+    消费者,把数据保存在指定位置
+    """
+
+    print("消费者:启动")
+
+    with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
+        
+        while True:
+
+            msg = queue.get()
+
+            if "quit" == msg.get("command"):
+                print("消费者:接收到结束命令")
+                break
+
+            if len(msg['payload']) > 0:
+                for item in msg['payload']:
+                    f.write("%s,%s\n" % (item['key'], item['value']))
+    
+    print("消费者:结束")
+
+def producer(data, queue, config):
+    """
+    多进程进行分词处理
+    """
+
+    process_name = config['process_name']
+
+    print('进程:%s -> 分词处理开始' % process_name)
+
+    # 获取待分词的数据
+    lines = data[config['current_pos']:config['end_pos']]
+
+    # 统计需要处理的数据量
+    total_num = len(lines)
+    print("进程:%s ->剩余待处理数量:%d" % (process_name, total_num))
+
+    msg_content = {
+        'payload': []
+    }
+
+    for i, line in enumerate(lines):
+        # 数据处理
+        line = line.replace("\n", "")
+        # 分词
+        word_root = cut_word(line)
+        # 
+        msg_content['payload'].append({"key": line, "value": word_root})
+        
+        if len(msg_content) >= SEND_INTERNAL:
+            queue.put(msg_content)
+            msg_content = {
+                'payload': []
+            }
+        # 处理进度提示
+        if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
+            print("进程:%s -> 当前处理进度:%d / %d" % (process_name, i, total_num))
+    
+    queue.put(msg_content)
+
+    print('进程:%s -> 分词处理结束' % process_name)
+
+def main():
+
+    print("开始时间:", datetime.datetime.now())
+
+    # 处理进程容器
+    process_list = []
+    # 配置文件容器
+    config_list = []
+
+    # 设置多进程共享变量
+    manager = Manager()
+    # 多进程共享的数据源
+    global_list = manager.list()
+    # 多进程通信队列
+    global_queue = manager.Queue()
+
+    print("加载数据")
+    with open(DATA_FILE, "r", encoding="UTF-8") as f:
+        if IS_TEST_MODE:
+            print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
+            global_list.extend(f.readlines()[:TEST_DATA_NUM])
+        else:
+            global_list.extend(f.readlines())
+    
+    total_len = len(global_list)
+    count = math.ceil(total_len / PRODUCER_NUM)
+    print("待处理总数量:%d, 数量区间:%d" % (total_len, count))
+
+    # 构造配置
+    for i in range(PRODUCER_NUM):
+        start_pos = i * count
+        end_pos = i * count + count
+        if end_pos >= total_len :
+            end_pos = None
+        cut_config = {
+            "start_pos": start_pos,
+            "current_pos": start_pos,
+            "end_pos": end_pos,
+            "process_name": "线程-%d" % i
+        }
+        config_list.append(cut_config)
+
+    print("配置", config_list)
+
+    # 启动消费者
+    cosumer = Process(target=consumer, args=(global_queue,))
+    cosumer.start()
+
+    # 启动生产者
+    for i, config in enumerate(config_list):
+        p = Process(target=producer, args=(global_list, global_queue, config))
+        p.start()
+        process_list.append(p)
+
+    for p in process_list:
+        p.join()
+    
+    # 给消费者发送结束指令
+    global_queue.put({"command":"quit"})
+    # 等待消费者结束执行
+    cosumer.join()
+
+    print("结束时间:", datetime.datetime.now())
+
+
+if __name__ == '__main__':
+    main()