# -*- coding:utf-8 -*-
import os
import math
import pickle
import jieba
from multiprocessing import Process

# Number of worker processes
PROCESS_NUM = 5
# Checkpoint interval (save progress every this many lines)
SAVE_INTERVAL = 100000
# Per-process checkpoint file path template
CONFIG_PATH = "./cut_config_%d.pkl"
# Input data file to process
DATA_FILE = './merge.txt'
# Worker process container
process_list = []
# Per-process config container
config_list = []


def save_config(config_path, config_obj):
    """
    Save a checkpoint/config file.
    """
    with open(config_path, "wb") as f:
        pickle.dump(config_obj, f)


def load_config(config_path):
    """
    Load a checkpoint/config file.
    """
    with open(config_path, "rb") as f:
        return pickle.load(f)


def cut_word(word):
    """
    Segment a line of text with jieba (search-engine mode).
    """
    word_root = jieba.cut_for_search(word)
    return list(word_root)


def multiprocess_cut_word(process_name, config_path, cut_config):
    """
    Word segmentation in a worker process, with checkpoint/resume support.
    """
    print('Process %s -> segmentation started' % process_name)
    # Resume from the checkpoint file if one exists
    if os.path.exists(config_path):
        cut_config = load_config(config_path)
        print("Process %s -> resumed from checkpoint, state: %s, resume position: %d"
              % (process_name, cut_config["state"], cut_config["current_pos"]))
    if cut_config['state'] == 'run':
        with open(DATA_FILE, "r", encoding="UTF-8") as f:
            lines = f.readlines()
        lines = lines[cut_config['current_pos']:cut_config['end_pos']]
        print("Process %s -> lines left to process: %d" % (process_name, len(lines)))
        for i, line in enumerate(lines):
            line = line.replace("\n", "")
            word_root = cut_word(line)
            cut_config["word_dict"][line] = word_root
            # Periodically persist progress so the work can resume after a crash
            if i > 0 and i % SAVE_INTERVAL == 0:
                cut_config["current_pos"] = cut_config["current_pos"] + SAVE_INTERVAL
                print("Process %s -> checkpoint saved, state: %s, next resume position: %d"
                      % (process_name, cut_config["state"], cut_config["current_pos"]))
                save_config(config_path, cut_config)

        cut_config["state"] = "end"
        save_config(config_path, cut_config)
        print('Process %s -> segmentation finished' % process_name)
    else:
        print('Process %s -> checkpoint says this chunk is already finished' % process_name)


def main():
    with open(DATA_FILE, "r", encoding="UTF-8") as f:
        lines = f.readlines()
    total_len = len(lines)
    # Number of lines assigned to each process
    count = math.ceil(total_len / PROCESS_NUM)
    print("Total lines: %d, lines per process: %d" % (total_len, count))
    for i in range(PROCESS_NUM):
        start_pos = i * count
        end_pos = i * count + count
        if end_pos >= total_len:
            # Clamp the last chunk to the end of the file
            # (the original -1 would silently drop the last line when slicing)
            end_pos = total_len
        cut_config = {
            "state": "run",
            "start_pos": start_pos,
            "current_pos": start_pos,
            "end_pos": end_pos,
            "word_dict": {}
        }
        config_list.append(cut_config)
    print("Configs:", config_list)
    for i, config in enumerate(config_list):
        p = Process(target=multiprocess_cut_word,
                    args=("process-%d" % i, CONFIG_PATH % i, config))
        p.start()
        process_list.append(p)
    for p in process_list:
        p.join()


if __name__ == '__main__':
    main()
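Each worker persists its own results into its checkpoint file (the `word_dict` inside `cut_config_<i>.pkl`); the script above never combines them. Below is a minimal sketch of how the per-process pickles could be merged after a complete run, assuming the same `CONFIG_PATH` naming as above; the output path `./word_dict_merged.pkl` and the `merge_results` helper are hypothetical names introduced only for illustration.

```python
# -*- coding:utf-8 -*-
import pickle

PROCESS_NUM = 5
CONFIG_PATH = "./cut_config_%d.pkl"
# Hypothetical output file for the combined result
MERGED_PATH = "./word_dict_merged.pkl"


def merge_results():
    merged = {}
    for i in range(PROCESS_NUM):
        with open(CONFIG_PATH % i, "rb") as f:
            cut_config = pickle.load(f)
        # Only merge chunks whose worker ran to completion
        if cut_config.get("state") == "end":
            merged.update(cut_config["word_dict"])
    with open(MERGED_PATH, "wb") as f:
        pickle.dump(merged, f)
    print("Merged %d entries" % len(merged))


if __name__ == '__main__':
    merge_results()
```

Note that because `word_dict` is keyed by the raw input line, duplicate lines collapse into a single entry both per worker and after merging.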