# -*- coding:utf-8 -*-
"""Generate the keyword table (``config.KEY_FILE``) from ``config.MERGE_FILE``.

Every line of the merged file is one keyword. The file is cut into byte
ranges that worker processes tokenise with ``jieba.cut_for_search``; stop
words are removed and each keyword that keeps at least one stem is written
as ``<sequence number>,<keyword>,<stem list>``.

Optimisation history:
    1. Switched to multiple processes.
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
import mmap
import os
from time import time

import jieba

import config
import tools

# Initialise logging at import time so it is also set up when a worker
# process re-imports this module (e.g. under the "spawn" start method).
tools.init_log()


def sub_process(start_pos, end_pos, stop_word):
    """Tokenise the keywords whose lines start inside a byte range of the merged file.

    Executed in a worker process.

    :param start_pos: byte offset of the first line to process (a line start).
    :param end_pos: exclusive byte offset; a line starting at or after it
        belongs to another task.
    :param stop_word: container of words to drop from the tokenisation output
        (only ``in`` is used on it).
    :return: list of ``(keyword, stems)`` tuples in file order; ``stems`` is a
        non-empty list of distinct tokens in first-occurrence order.
    """
    pid = os.getpid()
    logging.debug("子进程-%d 开始执行分词任务,开始位置:%d,结束位置:%d", pid, start_pos, end_pos)
    tmp_list = []
    start_time = time()
    # Binary mode: positions are byte offsets and each line is decoded below.
    with open(config.MERGE_FILE, "rb") as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        fmmap.seek(start_pos)
        while fmmap.tell() < end_pos:
            line = fmmap.readline()
            if not line:
                # End of file: stop instead of looping on empty reads.
                break
            # Decode with the configured charset, the same one used when writing
            # the KEY file (this was previously hard-coded to UTF-8).
            key = line.decode(config.ENCODING_CHARSET).replace("\r", "").replace("\n", "")
            if not key:
                # Blank line: nothing to tokenise.
                continue
            # Distinct non-stop-word stems. dict.fromkeys keeps first-occurrence
            # order, whereas a set's order varies between runs (str hashes are
            # randomised per process).
            stems = list(dict.fromkeys(
                stem for stem in jieba.cut_for_search(key) if stem not in stop_word))
            # Skip keywords that have no stem left after stop-word removal.
            if not stems:
                continue
            tmp_list.append((key, stems))
    logging.debug("子进程-%d 执行分词任务结束,耗时:%f", pid, time() - start_time)
    return tmp_list


def _build_line_index(path):
    """Return the byte offset of every line start in *path*, followed by the file size.

    The trailing file-size entry is an end-of-file sentinel: line ``i`` spans
    ``index[i]:index[i + 1]``, so the last line has an end offset as well.
    """
    if os.path.getsize(path) == 0:
        # mmap refuses to map an empty file; an empty file simply has no lines.
        return [0]
    pos_index = []
    with open(path, "rb") as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        while True:
            cur_pos = fmmap.tell()
            if not fmmap.readline():
                break
            pos_index.append(cur_pos)
        # The loop only exits at EOF, so tell() is now the file size.
        pos_index.append(fmmap.tell())
    return pos_index


def _split_tasks(pos_index, split_num):
    """Group consecutive lines into byte ranges of at most *split_num* lines.

    :param pos_index: line-start offsets followed by the end-of-file sentinel
        (as returned by ``_build_line_index``).
    :param split_num: maximum number of lines per range (must be positive).
    :return: list of ``(start_pos, end_pos)`` pairs, end exclusive, that cover
        every line exactly once, in file order.
    """
    total_num = len(pos_index) - 1
    return [(pos_index[first], pos_index[min(first + split_num, total_num)])
            for first in range(0, total_num, split_num)]


def main_process(process_num=4, split_num=500000):
    """Build the KEY table from the merged keyword file using a process pool.

    :param process_num: number of worker processes.
    :param split_num: maximum number of lines handed to one worker task.
    """
    # Stop words are sent to every task; a set makes each ``in`` check O(1).
    # NOTE(review): assumes tools.load_stop_word() returns an iterable of words.
    stop_word = set(tools.load_stop_word())
    start_time = time()

    logging.info("主进程 开始构建位置索引信息")
    pos_index = _build_line_index(config.MERGE_FILE)

    logging.info("主进程 开始划分子任务")
    tasks = _split_tasks(pos_index, split_num)
    # Release the index before the workers start producing results.
    del pos_index

    with ProcessPoolExecutor(process_num) as process_pool, \
            open(config.KEY_FILE, "w", encoding=config.ENCODING_CHARSET) as f_key:
        logging.info("主进程 提交任务到子进程")
        process_futures = [process_pool.submit(sub_process, start_pos, end_pos, stop_word)
                           for start_pos, end_pos in tasks]
        del tasks

        count = -1
        # Consume results in submission order (not completion order) so the
        # sequence numbers follow the file's line order and are reproducible.
        for idx, p_future in enumerate(process_futures):
            for key, stems in p_future.result():
                count += 1
                f_key.write("%d,%s,%s\n" % (count, key, stems))
            # Drop the finished future (and the result it holds) to free memory.
            process_futures[idx] = None

    logging.info("主进程 构建KEY表耗时:%f", time() - start_time)


if __name__ == '__main__':
    TITLE = "关键词表 生成"
    tools.log_start_msg(TITLE)
    main_process()
    tools.log_end_msg(TITLE)