@@ -1,24 +1,55 @@
# -*- coding:utf-8 -*-
+import datetime
import os
import math
import pickle
import jieba
from multiprocessing import Process, Manager

+# TODO
+# 1. Investigate multiprocess segmentation with jieba (its built-in parallel mode cannot be used on Windows; see the note below)
+# 2. Further reduce the number of checkpoint saves (i.e. tune the save interval, or keep everything in memory)
+# 3. Save the results once segmentation finishes, to guard against data loss
+# 4. Save the segmentation results per process, then merge them at the end
+
+# Path of the data file to process
+DATA_FILE = './data/合并结果.txt'
+
+# Output file for the segmentation results
+CUT_OUTPUT_FILE = './data/分词结果.txt'
+
+# Whether to save the results once segmentation finishes
+IS_ASSUME_TOTAL = True
+
+# Whether to resume from checkpoints
+IS_ASSUME = False
+
+# Whether test mode is enabled
+IS_TEST_MODE = False
+
+# Number of lines used in test mode (100 * 10000 = 1,000,000)
+TEST_DATA_NUM = 100 * 10000
+
+# Checkpoint save interval in test mode
+TEST_SAVE_INTERNAL = 200
+
+# Encoding
+ENCODING_CHARSET = "UTF-8"
+
+# Checkpoint (config) file path
+CONFIG_PATH = "./data/pkl/cut_config_%d.pkl"
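+# (The %d is filled with each worker's index; note the ./data/pkl directory must
+# already exist, or writing the first checkpoint will fail with FileNotFoundError.)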
+
# Number of worker processes
-PROCESS_NUM = 5
+PROCESS_NUM = os.cpu_count() or 1  # cpu_count() can return None; fall back to 1
+
# Save interval (how often to checkpoint)
-SAVE_INTERNAL = 100000
-# Config file path
-CONFIG_PATH = "./cut_config_%d.pkl"
-# Path of the data file to process
-DATA_FILE = './merge.txt'
+SAVE_INTERNAL = TEST_SAVE_INTERNAL if IS_TEST_MODE else 1000000

-# Container for worker processes
-process_list = []
-# Container for configs
-config_list = []
+# Progress report interval
+PROCESS_TIPS_INTERNAL = 10 * 10000

def save_config(config_path, config_obj):
@@ -43,7 +74,7 @@ def cut_word(word):
    word_root = jieba.cut_for_search(word)
    return list(word_root)
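+# For reference, cut_for_search() emits overlapping sub-words to aid recall, e.g.
+# cut_word("中华人民共和国") returns something like
+# ['中华', '华人', '人民', '共和', '共和国', '中华人民共和国'] (exact tokens depend on the dictionary version).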

-def multiprocess_cut_word(process_name, config_path, cut_config):
+def multiprocess_cut_word(process_name, data_list, result_dict, config_path, cut_config):

    """
    Perform word segmentation in a worker process
@@ -51,61 +82,127 @@ def multiprocess_cut_word(process_name, config_path, cut_config):

    print('Process %s -> segmentation started' % process_name)

-    if os.path.exists(config_path) :
+    if (IS_ASSUME_TOTAL or IS_ASSUME) and os.path.exists(config_path):
        cut_config = load_config(config_path)
        print("Process %s -> resumed from checkpoint, state: %s, start position: %d" % (process_name, cut_config["state"], cut_config["current_pos"]))

    if cut_config['state'] == 'run':
|
|
|
- with open(DATA_FILE, "r", encoding="UTF-8") as f:
|
|
|
- lines = f.readlines()
|
|
|
- lines = lines[cut_config['current_pos']:cut_config['end_pos']]
|
|
|
- print("进程:%s ->剩余待处理数量:%d" % (process_name, len(lines)))
|
|
|
- for i, line in enumerate(lines):
|
|
|
- line = line.replace("\n", "")
|
|
|
- word_root = cut_word(line)
|
|
|
- cut_config["word_dict"][line]=word_root
|
|
|
-
|
|
|
- if i > 0 and i % SAVE_INTERNAL == 0:
|
|
|
- cut_config["current_pos"] = cut_config["current_pos"] + SAVE_INTERNAL
|
|
|
- print("进程:%s -> 保存位置 当前状态:%s,开始处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
|
|
|
- save_config(config_path, cut_config)
+
+        # Fetch the lines still waiting to be segmented
+        lines = data_list[cut_config['current_pos']:cut_config['end_pos']]
+
+        # Count how much data needs processing
+        total_num = len(lines)
+        print("Process %s -> remaining lines to process: %d" % (process_name, total_num))
+
+        for i, line in enumerate(lines):
+            # Remove the newline
+            line = line.replace("\n", "")
+            # Segment
+            cut_config["word_dict"][line] = cut_word(line)
+
+            # Checkpoint save
+            if IS_ASSUME and i > 0 and i % SAVE_INTERNAL == 0:
+                cut_config["current_pos"] = cut_config["current_pos"] + SAVE_INTERNAL
+                print("Process %s -> checkpoint saved, state: %s, current position: %d" % (process_name, cut_config["state"], cut_config["current_pos"]))
+                save_config(config_path, cut_config)
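+                # A crash between checkpoints re-segments at most SAVE_INTERNAL lines on the next run.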

+            # Progress report
+            if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
+                print("Process %s -> progress: %d / %d" % (process_name, i, total_num))
+
+        # Save the final result
+        if IS_ASSUME_TOTAL or IS_ASSUME:
+            print("Process %s -> saving the final segmentation result" % process_name)
            cut_config["state"] = "end"
+            cut_config["current_pos"] = cut_config['end_pos']
            save_config(config_path, cut_config)
+
+        result_dict[process_name] = cut_config["word_dict"]

-    print('Process %s -> segmentation finished' % process_name)
+        print('Process %s -> segmentation finished' % process_name)
    else:
-        print('Process %s -> resumed from checkpoint, segmentation already finished' % process_name)
+        result_dict[process_name] = cut_config["word_dict"]
+        print('Process %s -> resumed from checkpoint, segmentation already finished' % process_name)

def main():
+
+    print("Start time:", datetime.datetime.now())
+
+    print(("Config: checkpoint resume enabled, save interval: %d" % SAVE_INTERNAL) if IS_ASSUME else "Config: checkpoint resume disabled")
+    print("Config: the final segmentation result will be saved" if IS_ASSUME_TOTAL else "Config: the final segmentation result will not be saved")
+
+    # Container for worker processes
+    process_list = []
+    # Container for per-process configs
+    config_list = []
+
+    # Shared state for multiprocessing
+    manager = Manager()
+    # Data source shared across all processes
+    global_list = manager.list()
+    # Results returned by the workers
+    result_dict = manager.dict()
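+    # NOTE: manager.list()/manager.dict() are proxies served by a separate process;
+    # mutations to a dict nested inside result_dict are not propagated back, which
+    # is why each worker assigns its whole word_dict to one key in a single step.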
+
+    print("Loading data")
    with open(DATA_FILE, "r", encoding="UTF-8") as f:
-        lines = f.readlines()
-        total_len = len(lines)
-        count = math.ceil(total_len / PROCESS_NUM)
-        print("Total lines: %d, chunk size: %d" % (total_len, count))
-        for i in range(PROCESS_NUM):
-            start_pos = i * count
-            end_pos = i * count + count
-            if end_pos >= total_len:
-                end_pos = -1
-            cut_config = {
-                "state": "run",
-                "start_pos": start_pos,
-                "current_pos": start_pos,
-                "end_pos": end_pos,
-                "word_dict": {}
-            }
-            config_list.append(cut_config)
+        if IS_TEST_MODE:
+            print("Running in test mode, test data size: %d" % TEST_DATA_NUM)
+            global_list.extend(f.readlines()[:TEST_DATA_NUM])
+        else:
+            global_list.extend(f.readlines())
+
+    total_len = len(global_list)
+    count = math.ceil(total_len / PROCESS_NUM)
+    print("Total lines to process: %d, chunk size: %d" % (total_len, count))
+
+    # Build one config per process
+    for i in range(PROCESS_NUM):
+        start_pos = i * count
+        end_pos = i * count + count
+        if end_pos >= total_len:
+            # Clamp to total_len; slicing with -1 here would silently drop the last line
+            end_pos = total_len
+        cut_config = {
+            "state": "run",
+            "start_pos": start_pos,
+            "current_pos": start_pos,
+            "end_pos": end_pos,
+            "word_dict": {}
+        }
+        config_list.append(cut_config)
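+    # e.g. 10 lines across 4 processes: count = ceil(10/4) = 3, giving chunks
+    # [0:3], [3:6], [6:9], [9:10] (the last chunk is clamped to total_len).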

    print("Configs", config_list)

    for i, config in enumerate(config_list):
-        p = Process(target=multiprocess_cut_word, args=("Process-%d" % i, CONFIG_PATH % i, config))
+        p = Process(target=multiprocess_cut_word, args=("Process-%d" % i, global_list, result_dict, CONFIG_PATH % i, config))
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

+    print("Merging the final segmentation results: start")
+
+    result = []
+    print("Flattening into a list for writing")
+    for (process_name, word_dict) in result_dict.items():
+        result.extend("%s,%s\n" % (key, value) for (key, value) in word_dict.items())
+    print("Writing to file")
+    with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
+        f.writelines(result)
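+    # Each output line is "<original text>,<token list rendered via %s>", e.g. for
+    # an input line "今天天气不错" something like: 今天天气不错,['今天', '天气', '不错']
+    # (the exact tokens depend on the jieba dictionary).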
+    print("Merging the final segmentation results: done")
+
+    print("End time:", datetime.datetime.now())
+
if __name__ == '__main__':
    main()