|
|
@@ -1,228 +0,0 @@
|
|
|
-# -*- coding:utf-8 -*-
|
|
|
-
|
|
|
-import datetime
|
|
|
-import os
|
|
|
-import math
|
|
|
-import pickle
|
|
|
-from time import sleep
|
|
|
-import jieba
|
|
|
-from multiprocessing import Process, Manager
|
|
|
-
|
|
|
-from zmq import QUEUE
|
|
|
-
|
|
|
-# TODO
|
|
|
-# 1. 研究jieba多进程切词(windows上无法使用自带的多进程切词功能)
|
|
|
-# 2. 进一步减少断点保存的次数(即调整保存间隔,或者全内存中)
|
|
|
-# 3. 加入在分词完成后保存结果以防丢失
|
|
|
-# 4. 在每个进程中分别保存分词结果,然后再统一合并
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
# Path of the input data file to be processed
DATA_FILE = './data/合并结果.txt'

# Output path for the segmentation results
CUT_OUTPUT_FILE = './data/分词结果.txt'

# Whether to save the final result once segmentation finishes
IS_ASSUME_TOTAL = True

# Whether to checkpoint-and-resume during processing
IS_ASSUME = False

# Whether test mode is enabled
IS_TEST_MODE = False

# Number of records used in test mode
TEST_DATA_NUM = 100 * 10000

# Checkpoint save interval while in test mode
TEST_SAVE_INTERNAL = 200

# Character encoding used for file I/O
ENCODING_CHARSET = "UTF-8"

# Template for the per-process checkpoint (pickle) file paths
CONFIG_PATH = "./data/pkl/cut_config_%d.pkl"

# Number of worker processes
PROCESS_NUM = os.cpu_count()

# How many items are processed between checkpoint saves
SAVE_INTERNAL = TEST_SAVE_INTERNAL if IS_TEST_MODE else 1000000

# How many items are processed between progress messages
PROCESS_TIPS_INTERNAL = 10 * 10000
|
|
|
def save_config(config_path, config_obj):
    """Serialize the checkpoint object *config_obj* to *config_path* with pickle."""
    with open(config_path, mode="wb") as handle:
        pickle.dump(config_obj, handle)
|
|
|
-
|
|
|
-
|
|
|
def load_config(config_path):
    """Deserialize and return a checkpoint object previously written with pickle."""
    handle = open(config_path, "rb")
    try:
        return pickle.load(handle)
    finally:
        handle.close()
|
|
|
-
|
|
|
def cut_word(word):
    """Segment *word* with jieba's search-engine mode and return the tokens as a list."""
    return list(jieba.cut_for_search(word))
|
|
|
-
|
|
|
def multiprocess_cut_word(process_name, data_list, result_dict, config_path, cut_config):
    """Worker-process entry point: segment one slice of *data_list*.

    The slice boundaries live in *cut_config* ("current_pos"/"end_pos"). When
    checkpointing is enabled, an existing pickle at *config_path* overrides the
    passed-in config so work resumes where it stopped. Results are published
    into the shared *result_dict* under *process_name*.
    """
    print('进程:%s -> 分词处理开始' % process_name)

    # Restore a previously saved checkpoint from disk, if any.
    if (IS_ASSUME_TOTAL or IS_ASSUME) and os.path.exists(config_path):
        cut_config = load_config(config_path)
        print("进程:%s -> 进断点恢复 当前状态:%s,开始处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))

    # Guard clause: this slice already finished in an earlier run —
    # just re-publish the restored result.
    if cut_config['state'] != 'run':
        result_dict[process_name] = cut_config["word_dict"]
        print('进程:%s -> 断点恢复,分词处理结束' % process_name)
        return

    # Remaining lines for this worker.
    pending = data_list[cut_config['current_pos']:cut_config['end_pos']]
    remaining = len(pending)
    print("进程:%s ->剩余待处理数量:%d" % (process_name, remaining))

    for idx, raw_line in enumerate(pending):
        # Strip the trailing newline, then segment.
        text = raw_line.replace("\n", "")
        cut_config["word_dict"][text] = cut_word(text)

        # Periodic checkpoint save (idx is always a multiple of SAVE_INTERNAL
        # here, so current_pos stays aligned with the slice position).
        if IS_ASSUME and idx > 0 and idx % SAVE_INTERNAL == 0:
            cut_config["current_pos"] += SAVE_INTERNAL
            print("进程:%s -> 断点保存 当前状态:%s,当前处理位置:%d" % (process_name, cut_config["state"], cut_config["current_pos"]))
            save_config(config_path, cut_config)

        # Periodic progress message.
        if idx > 0 and idx % PROCESS_TIPS_INTERNAL == 0:
            print("进程:%s -> 当前处理进度:%d / %d" % (process_name, idx, remaining))

    # Persist the finished state so a re-run skips this slice.
    if IS_ASSUME_TOTAL or IS_ASSUME:
        print("进程:%s -> 保存最终的分词结果" % process_name)
        cut_config["state"] = "end"
        cut_config["current_pos"] = cut_config['end_pos']
        save_config(config_path, cut_config)

    result_dict[process_name] = cut_config["word_dict"]
    print('进程:%s -> 分词处理结束' % process_name)
|
|
|
-
|
|
|
def main():
    """Segment every line of DATA_FILE with PROCESS_NUM worker processes.

    Splits the input into contiguous slices, runs one multiprocess_cut_word
    worker per slice, then merges the per-process dictionaries and writes
    "line,tokens" records to CUT_OUTPUT_FILE.
    """
    print("开始时间:", datetime.datetime.now())

    print("配置:启动用断点续存,保存间隔:%d" % SAVE_INTERNAL if IS_ASSUME else "配置:不启用断点续存")
    print("配置:保存最终的分词结果" if IS_ASSUME_TOTAL else "配置:不保存最终的分词结果")

    # Worker process handles
    process_list = []
    # Per-process checkpoint configurations
    config_list = []

    # Shared state between the worker processes
    manager = Manager()
    # Shared input data
    global_list = manager.list()
    # Shared per-process results
    result_dict = manager.dict()

    print("加载数据")
    with open(DATA_FILE, "r", encoding="UTF-8") as f:
        if IS_TEST_MODE:
            print("当前处于测试模式,测试数据量:%d" % TEST_DATA_NUM)
            global_list.extend(f.readlines()[:TEST_DATA_NUM])
        else:
            global_list.extend(f.readlines())

    total_len = len(global_list)
    count = math.ceil(total_len / PROCESS_NUM)
    print("待处理总数量:%d, 数量区间:%d" % (total_len, count))

    # Build one checkpoint config per worker, each covering a contiguous slice.
    for i in range(PROCESS_NUM):
        start_pos = i * count
        # BUGFIX: the original clamped an overshooting end to -1, which made
        # the worker slice data_list[start:-1] and silently drop the final
        # input line. Clamp to total_len instead so every line is processed.
        end_pos = min(start_pos + count, total_len)
        cut_config = {
            "state": "run",
            "start_pos": start_pos,
            "current_pos": start_pos,
            "end_pos": end_pos,
            "word_dict": {}
        }
        config_list.append(cut_config)

    print("配置", config_list)

    # Fan out: one process per slice.
    for i, config in enumerate(config_list):
        p = Process(target=multiprocess_cut_word, args=("进程-%d" % i, global_list, result_dict, CONFIG_PATH % i, config))
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

    print("合并最终的分词结果:开始")

    result = []
    print("处理成list便于写入文件")
    # BUGFIX: the original rebuilt and re-extended the full "key,value" list
    # once per dictionary entry, duplicating every record len(word_dict)
    # times. Build each process's records exactly once.
    for process_name, word_dict in result_dict.items():
        result.extend("%s,%s\n" % (key, value) for (key, value) in word_dict.items())

    print("写入文件")
    with open(CUT_OUTPUT_FILE, "w", encoding=ENCODING_CHARSET) as f:
        f.writelines(result)
    print("合并最终的分词结果:结束")

    print("结束时间:", datetime.datetime.now())
|
|
|
-
|
|
|
def main2():
    """Merge previously saved per-process checkpoint pickles into CUT_OUTPUT_FILE.

    Reads the four checkpoint files (CONFIG_PATH % 0..3) and appends each
    stored "line,tokens" pair to the output file.
    """
    print("开始时间:", datetime.datetime.now())

    with open(CUT_OUTPUT_FILE, "a", encoding=ENCODING_CHARSET) as out:
        for idx in range(4):
            pkl_path = CONFIG_PATH % idx

            print("时间:%s, 读取:%s —— 开始" % (datetime.datetime.now(), pkl_path))
            checkpoint = load_config(pkl_path)
            print("时间:%s, 读取:%s —— 结束" % (datetime.datetime.now(), pkl_path))

            print("时间:%s,写入文件 -- 开始" % datetime.datetime.now())
            for key, value in checkpoint["word_dict"].items():
                out.write("%s,%s\n" % (key, value))
            print("时间:%s,写入文件 -- 结束" % datetime.datetime.now())

    print("结束时间:", datetime.datetime.now())
|
|
|
-
|
|
|
-
|
|
|
# Script entry point: currently merges saved checkpoints (main2) rather than
# running a fresh segmentation (main).
if __name__ == '__main__':
    main2()
|