| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
# -*- coding:utf-8 -*-
import datetime
import os
import math
import pickle
from time import sleep
import jieba
from multiprocessing import Process, Manager
from zmq import QUEUE
# NOTE(review): `sleep` and `QUEUE` are never used below; `zmq` also drags in
# a third-party dependency for nothing — both are candidates for removal.
# TODO
# 1. Investigate multiprocess tokenization with jieba (jieba's built-in
#    parallel mode does not work on Windows)
# 2. Further reduce how often checkpoints are written (tune the save
#    interval, or keep everything in memory)
# 3. Save the result immediately after tokenization finishes to avoid loss
# 4. Save each process's tokenization result separately, then merge them
# Path of the input data file to tokenize
DATA_FILE = './data/合并结果.txt'
# Path the tokenization result is written to
CUT_OUTPUT_FILE = './data/分词结果.txt'
# Whether to save the final result once tokenization completes
IS_ASSUME_TOTAL = True
# Whether to checkpoint progress so a run can resume where it stopped
IS_ASSUME = False
# Whether test mode is enabled (process only TEST_DATA_NUM lines)
IS_TEST_MODE = False
# Number of input lines used in test mode
TEST_DATA_NUM = 100 * 10000
# Checkpoint save interval while in test mode
TEST_SAVE_INTERNAL = 200
# Encoding used for the output file
ENCODING_CHARSET = "UTF-8"
# Per-process checkpoint file path template (%d = process index)
CONFIG_PATH = "./data/pkl/cut_config_%d.pkl"
# Number of worker processes (one per CPU core)
PROCESS_NUM = os.cpu_count()
# Checkpoint save interval (how many lines between saves)
SAVE_INTERNAL = TEST_SAVE_INTERNAL if IS_TEST_MODE else 1000000
# Progress report interval (lines)
PROCESS_TIPS_INTERNAL = 10 * 10000
def save_config(config_path, config_obj):
    """Pickle *config_obj* to *config_path* (binary mode, overwrite)."""
    with open(config_path, mode="wb") as out_file:
        pickle.dump(config_obj, out_file)
def load_config(config_path):
    """Unpickle and return the object stored at *config_path*."""
    with open(config_path, mode="rb") as in_file:
        loaded = pickle.load(in_file)
    return loaded
def cut_word(word):
    """Tokenize *word* with jieba's search-engine mode and return a list."""
    return list(jieba.cut_for_search(word))
def multiprocess_cut_word(process_name, data_list, result_dict, config_path, cut_config):
    """
    Worker entry point: tokenize one slice of *data_list* in a child process.

    Args:
        process_name: label used in log messages and as the key under which
            this worker publishes its result into *result_dict*.
        data_list: shared (Manager) list of input lines; this worker only
            reads the [current_pos:end_pos] slice described by *cut_config*.
        result_dict: shared (Manager) dict; on exit this worker stores its
            {line: token_list} mapping under *process_name*.
        config_path: per-process pickle file used for checkpoint save/restore.
        cut_config: dict with keys "state", "start_pos", "current_pos",
            "end_pos", "word_dict"; replaced wholesale by the pickled copy
            when a checkpoint file exists.
    """
    print('进程:%s -> 分词处理开始' % process_name)
    # Restore from the checkpoint file if one exists; the restored config
    # fully replaces the one passed in by the parent.
    if (IS_ASSUME_TOTAL or IS_ASSUME) and os.path.exists(config_path) :
        cut_config = load_config(config_path)
        print("进程:%s -> 进断点恢复 当前状态:%s,开始处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
    if cut_config['state'] == 'run':
        # Slice of lines still to be tokenized (resumes mid-chunk after a
        # checkpoint restore because current_pos has advanced).
        lines = data_list[cut_config['current_pos']:cut_config['end_pos']]
        # Number of lines left for this worker
        total_num = len(lines)
        print("进程:%s ->剩余待处理数量:%d" % (process_name, total_num))
        for i, line in enumerate(lines):
            # Strip the trailing newline; the bare line becomes the dict key,
            # so duplicate lines overwrite each other (last one wins).
            line = line.replace("\n", "")
            # Tokenize and record the result
            cut_config["word_dict"][line]=cut_word(line)
            # Periodic checkpoint: advance current_pos by one interval and
            # persist the whole config (including word_dict) to disk.
            if IS_ASSUME and i > 0 and i % SAVE_INTERNAL == 0:
                cut_config["current_pos"] = cut_config["current_pos"] + SAVE_INTERNAL
                print("进程:%s -> 断点保存 当前状态:%s,当前处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
                save_config(config_path, cut_config)

            # Periodic progress report
            if i > 0 and i % PROCESS_TIPS_INTERNAL == 0:
                print("进程:%s -> 当前处理进度:%d / %d" % (process_name, i, total_num))
        # Final save: mark this chunk finished so a restarted run skips it.
        if IS_ASSUME_TOTAL or IS_ASSUME:
            print("进程:%s -> 保存最终的分词结果" % process_name)
            cut_config["state"] = "end"
            cut_config["current_pos"] = cut_config['end_pos']
            save_config(config_path, cut_config)

        # result_dict.update(cut_config["word_dict"])
        # Publish this worker's tokenization result to the parent.
        result_dict[process_name]=cut_config["word_dict"]
        print('进程:%s -> 分词处理结束' % process_name)
    else :
        # Checkpoint says this chunk already finished ("end"): just publish
        # the restored word_dict without re-tokenizing anything.
        # result_dict.update(cut_config['word_dict'])
        result_dict[process_name]=cut_config["word_dict"]
        print('进程:%s -> 断点恢复,分词处理结束' % process_name)
def main():
    """
    Full pipeline: load the corpus, fan tokenization out over PROCESS_NUM
    worker processes, then merge every worker's word dict and write the
    combined result to CUT_OUTPUT_FILE.
    """
    print("开始时间:", datetime.datetime.now())
    print(("配置:启动用断点续存,保存间隔:%d" % SAVE_INTERNAL) if IS_ASSUME else "配置:不启用断点续存")
    print("配置:保存最终的分词结果" if IS_ASSUME_TOTAL else "配置:不保存最终的分词结果")
    # Worker process handles
    process_list = []
    # Per-worker checkpoint configs
    config_list = []
    # Manager provides the cross-process shared containers below
    manager = Manager()
    # Shared input data (read-only for workers)
    global_list = manager.list()
    # Per-process results, keyed by worker name
    result_dict = manager.dict()
    print("加载数据")
    with open(DATA_FILE, "r", encoding="UTF-8") as f:
        if IS_TEST_MODE:
            print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
            global_list.extend(f.readlines()[:TEST_DATA_NUM])
        else:
            global_list.extend(f.readlines())

    total_len = len(global_list)
    # Chunk size per worker (last chunk may be shorter)
    count = math.ceil(total_len / PROCESS_NUM)
    print("待处理总数量:%d, 数量区间:%d" % (total_len, count))
    # Build one checkpoint config per worker
    for i in range(PROCESS_NUM):
        start_pos = i * count
        # BUG FIX: the original clamped an overrunning end_pos to -1, but a
        # -1 slice bound excludes the final element, so the last line of the
        # corpus was never tokenized.  Clamp to total_len instead.
        end_pos = min(start_pos + count, total_len)
        cut_config = {
            "state": "run",
            "start_pos": start_pos,
            "current_pos": start_pos,
            "end_pos": end_pos,
            "word_dict": {},
        }
        config_list.append(cut_config)
    print("配置", config_list)
    for i, config in enumerate(config_list):
        p = Process(target=multiprocess_cut_word, args=("进程-%d" % i, global_list, result_dict, CONFIG_PATH % i, config))
        p.start()
        process_list.append(p)
    for p in process_list:
        p.join()
    print("合并最终的分词结果:开始")
    result = []
    print("处理成list便于写入文件")
    for process_name, word_dict in result_dict.items():
        # BUG FIX: the original rebuilt the identical comprehension once per
        # key of word_dict (O(n^2) work); build each worker's lines once.
        result.extend("%s,%s\n" % (key, value) for (key, value) in word_dict.items())
    print("写入文件")
    with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
        f.writelines(result)
    print("合并最终的分词结果:结束")
    print("结束时间:", datetime.datetime.now())
def main2(num_parts=None):
    """
    Merge-only step: read each worker's checkpoint pickle and append its
    word dict to CUT_OUTPUT_FILE, without re-running tokenization.

    Args:
        num_parts: how many per-process checkpoint files to read.  Defaults
            to PROCESS_NUM — the original hard-coded 4, which silently read
            the wrong number of files on machines whose cpu_count (and hence
            number of checkpoint files written by main()) was not 4.
    """
    if num_parts is None:
        num_parts = PROCESS_NUM
    print("开始时间:", datetime.datetime.now())
    # Append mode so repeated merge runs do not clobber earlier output
    with open(CUT_OUTPUT_FILE, "a", encoding=ENCODING_CHARSET) as f:
        for i in range(num_parts):
            config_p = CONFIG_PATH % i
            print("时间:%s, 读取:%s —— 开始" % (datetime.datetime.now(), config_p))
            config = load_config(config_p)
            print("时间:%s, 读取:%s —— 结束" % (datetime.datetime.now(), config_p))
            print("时间:%s,写入文件 -- 开始"% datetime.datetime.now())
            for (key, value) in config["word_dict"].items():
                f.write("%s,%s\n" % (key, value))
            print("时间:%s,写入文件 -- 结束"% datetime.datetime.now())
    print("结束时间:", datetime.datetime.now())
# NOTE(review): the script currently runs only the merge-only step (main2),
# not the full tokenization pipeline (main) — switch manually as needed.
if __name__ == '__main__':
    main2()
|