# -*- coding:utf-8 -*-
"""Multi-process word segmentation (jieba) over a large text file.

Splits the input lines across PROCESS_NUM worker processes, segments each
line with jieba's search mode, and merges the per-process results into one
output file.  Supports periodic checkpointing to pickle files so an
interrupted run can be resumed (main), and a recovery path that merges
already-saved checkpoints without re-segmenting (main2).
"""
import datetime
import math
import os
import pickle
from multiprocessing import Manager, Process

import jieba

# NOTE(review): two unused imports from the original were removed:
#   `from time import sleep` (never called) and `from zmq import QUEUE`
#   (an accidental IDE auto-import that added a hard zmq dependency).

# TODO
# 1. Investigate jieba's multi-process segmentation (its built-in
#    multi-process mode does not work on Windows).
# 2. Further reduce the number of checkpoint saves (tune the interval,
#    or keep everything in memory).
# 3. Save the result as soon as segmentation finishes, to avoid loss.
# 4. Save per-process partial results, then merge them at the end.

# Input data file to segment
DATA_FILE = './data/合并结果.txt'
# Output file for the segmentation result
CUT_OUTPUT_FILE = './data/分词结果.txt'
# Save the final result when segmentation completes
IS_ASSUME_TOTAL = True
# Enable periodic checkpoint saving (resume support)
IS_ASSUME = False
# Test mode flag
IS_TEST_MODE = False
# Number of lines processed in test mode
TEST_DATA_NUM = 100 * 10000
# Checkpoint interval used in test mode
TEST_SAVE_INTERNAL = 200
# File encoding
ENCODING_CHARSET = "UTF-8"
# Per-process checkpoint file path template (%d = process index)
CONFIG_PATH = "./data/pkl/cut_config_%d.pkl"
# Number of worker processes
PROCESS_NUM = os.cpu_count()
# Checkpoint interval: lines processed between saves
SAVE_INTERNAL = TEST_SAVE_INTERNAL if IS_TEST_MODE else 1000000
# Progress report interval (lines)
PROCESS_TIPS_INTERNAL = 10 * 10000


def save_config(config_path, config_obj):
    """Pickle *config_obj* to *config_path*."""
    with open(config_path, "wb") as f:
        pickle.dump(config_obj, f)


def load_config(config_path):
    """Load and return a pickled config object from *config_path*.

    NOTE(review): unpickling untrusted files is unsafe; these files are
    produced locally by save_config, so this is acceptable here.
    """
    with open(config_path, "rb") as f:
        return pickle.load(f)


def cut_word(word):
    """Return jieba search-mode segmentation of *word* as a list of tokens."""
    return list(jieba.cut_for_search(word))


def multiprocess_cut_word(process_name, data_list, result_dict, config_path, cut_config):
    """Worker entry point: segment this process's slice of *data_list*.

    Parameters:
        process_name: display name used in log output and as the key in
            *result_dict*.
        data_list: shared (Manager) list of input lines.
        result_dict: shared (Manager) dict; this worker stores its
            {line: token_list} mapping under *process_name*.
        config_path: path of this worker's pickle checkpoint file.
        cut_config: dict with keys "state", "start_pos", "current_pos",
            "end_pos", "word_dict"; replaced by the on-disk checkpoint
            when one exists and resume is enabled.
    """
    print('进程:%s -> 分词处理开始' % process_name)
    # Resume from an existing checkpoint if one is present.
    if (IS_ASSUME_TOTAL or IS_ASSUME) and os.path.exists(config_path):
        cut_config = load_config(config_path)
        print("进程:%s -> 进断点恢复 当前状态:%s,开始处理位置:%d"
              % (process_name, cut_config["state"], cut_config["current_pos"]))
    if cut_config['state'] == 'run':
        # Slice out this worker's remaining share of the data.
        lines = data_list[cut_config['current_pos']:cut_config['end_pos']]
        total_num = len(lines)
        print("进程:%s ->剩余待处理数量:%d" % (process_name, total_num))
        for i, line in enumerate(lines):
            # Strip the trailing newline; the cleaned line doubles as the
            # result key.
            line = line.replace("\n", "")
            cut_config["word_dict"][line] = cut_word(line)
            # Periodic checkpoint save.
            if IS_ASSUME and i > 0 and i % SAVE_INTERNAL == 0:
                cut_config["current_pos"] = cut_config["current_pos"] + SAVE_INTERNAL
                print("进程:%s -> 断点保存 当前状态:%s,当前处理位置:%d"
                      % (process_name, cut_config["state"], cut_config["current_pos"]))
                save_config(config_path, cut_config)
            # Periodic progress report.
            if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
                print("进程:%s -> 当前处理进度:%d / %d" % (process_name, i, total_num))
        # Persist the finished state so a rerun skips this worker's slice.
        if IS_ASSUME_TOTAL or IS_ASSUME:
            print("进程:%s -> 保存最终的分词结果" % process_name)
            cut_config["state"] = "end"
            cut_config["current_pos"] = cut_config['end_pos']
            save_config(config_path, cut_config)
        result_dict[process_name] = cut_config["word_dict"]
        print('进程:%s -> 分词处理结束' % process_name)
    else:
        # Checkpoint says this slice already finished; just publish it.
        result_dict[process_name] = cut_config["word_dict"]
        print('进程:%s -> 断点恢复,分词处理结束' % process_name)


def main():
    """Run the full pipeline: load data, fan out workers, merge, write."""
    print("开始时间:", datetime.datetime.now())
    print("配置:启动用断点续存,保存间隔:%d" % SAVE_INTERNAL if IS_ASSUME else "配置:不启用断点续存")
    print("配置:保存最终的分词结果" if IS_ASSUME_TOTAL else "配置:不保存最终的分词结果")
    process_list = []
    config_list = []
    # Manager-backed containers are shareable across worker processes.
    manager = Manager()
    global_list = manager.list()   # shared input lines
    result_dict = manager.dict()   # per-process results, keyed by process name
    print("加载数据")
    with open(DATA_FILE, "r", encoding=ENCODING_CHARSET) as f:
        if IS_TEST_MODE:
            print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
            global_list.extend(f.readlines()[:TEST_DATA_NUM])
        else:
            global_list.extend(f.readlines())
    total_len = len(global_list)
    count = math.ceil(total_len / PROCESS_NUM)
    print("待处理总数量:%d, 数量区间:%d" % (total_len, count))
    # Build one slice config per worker.
    for i in range(PROCESS_NUM):
        start_pos = i * count
        end_pos = start_pos + count
        # BUG FIX: the original set end_pos = -1 when the slice ran past the
        # end, which made data[start:-1] silently drop the last input line.
        if end_pos > total_len:
            end_pos = total_len
        cut_config = {
            "state": "run",
            "start_pos": start_pos,
            "current_pos": start_pos,
            "end_pos": end_pos,
            "word_dict": {}
        }
        config_list.append(cut_config)
    print("配置", config_list)
    for i, config in enumerate(config_list):
        p = Process(target=multiprocess_cut_word,
                    args=("进程-%d" % i, global_list, result_dict,
                          CONFIG_PATH % i, config))
        p.start()
        process_list.append(p)
    for p in process_list:
        p.join()
    print("合并最终的分词结果:开始")
    result = []
    print("处理成list便于写入文件")
    # BUG FIX: the original rebuilt the whole "key,value" line list once per
    # entry of word_dict and extended the result inside that inner loop,
    # duplicating every entry len(word_dict) times.  Build it once per worker.
    for word_dict in result_dict.values():
        result.extend("%s,%s\n" % (key, value) for (key, value) in word_dict.items())
    print("写入文件")
    with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
        f.writelines(result)
    print("合并最终的分词结果:结束")
    print("结束时间:", datetime.datetime.now())


def main2(part_count=4):
    """Recovery path: merge already-saved checkpoint files into the output.

    Appends each checkpoint's word_dict to CUT_OUTPUT_FILE without
    re-running segmentation.

    Parameters:
        part_count: number of checkpoint files to read (the original
            hard-coded 4; pass PROCESS_NUM if the run used a different
            worker count).
    """
    print("开始时间:", datetime.datetime.now())
    with open(CUT_OUTPUT_FILE, "a", encoding=ENCODING_CHARSET) as f:
        for i in range(part_count):
            config_p = CONFIG_PATH % i
            print("时间:%s, 读取:%s —— 开始" % (datetime.datetime.now(), config_p))
            config = load_config(config_p)
            print("时间:%s, 读取:%s —— 结束" % (datetime.datetime.now(), config_p))
            print("时间:%s,写入文件 -- 开始" % datetime.datetime.now())
            for (key, value) in config["word_dict"].items():
                f.write("%s,%s\n" % (key, value))
            print("时间:%s,写入文件 -- 结束" % datetime.datetime.now())
    print("结束时间:", datetime.datetime.now())


if __name__ == '__main__':
    # NOTE(review): the entry point runs the checkpoint-merge path (main2),
    # not the full pipeline (main) — preserved from the original.
    main2()