| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- # -*- coding:utf-8 -*-
- from concurrent.futures import ProcessPoolExecutor, as_completed
- import logging
- import os
- from time import time
- import config
- import tools
- import jieba
- import mmap
# Optimization notes:
# 1. Switched to a multi-process implementation
# Initialize the logging configuration for this module
tools.init_log()
def sub_process(start_pos, end_pos, stop_word):
    """
    Worker-process task: segment the keywords stored between two byte
    offsets of the merged keyword file.

    :param start_pos: byte offset (inclusive) where this worker starts reading
    :param end_pos: byte offset (exclusive) where this worker stops reading
    :param stop_word: collection of stop words excluded from the stems
    :return: list of (keyword, stems) tuples, where stems is a list of
             unique non-stop-word tokens from jieba's search-mode cut
    """
    pid = os.getpid()
    logging.debug("子进程-%d 开始执行分词任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
    # Accumulates (keyword, stems) results for this worker's slice.
    tmp_list = []
    start_time = time()

    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        fmmap.seek(start_pos)
        while fmmap.tell() < end_pos:
            raw = fmmap.readline()
            # BUGFIX: an empty read means EOF -- break instead of the
            # original `continue`, which could spin forever when end_pos
            # exceeded the actual file size (tell() never advances at EOF).
            if not raw:
                break
            # BUGFIX: decode with the configured charset; the original
            # hard-coded "UTF-8" here, inconsistent with the charset used
            # to open the file above.
            key = raw.decode(config.ENCODING_CHARSET).replace("\r", "").replace("\n", "")
            # Skip blank lines inside the assigned range.
            if not key:
                continue

            # Segment the keyword, dropping stop words and duplicates
            # in a single pass.
            stems = {stem for stem in jieba.cut_for_search(key) if stem not in stop_word}
            # Skip keywords whose every token was a stop word.
            if not stems:
                continue

            tmp_list.append((key, list(stems)))

    logging.debug("子进程-%d 执行分词任务结束,耗时:%f" % (pid, (time() - start_time)))

    return tmp_list
def main_process():
    """
    Driver process: index the byte offset of every line in the merged
    keyword file, fan the segmentation work out to a process pool, and
    write the numbered KEY table to config.KEY_FILE.
    """
    # Size of the worker pool.
    process_num = 4
    # Number of keywords handled per sub-task.
    split_num = 500000
    # Byte offset of each line start in the merged file.
    pos_index = []
    # Load stop words once; the collection is shipped to every worker.
    stop_word = tools.load_stop_word()
    start_time = time()

    logging.info("主进程 开始构建位置索引信息")
    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        while True:
            # Remember where this line begins before consuming it.
            offset = fmmap.tell()
            if not fmmap.readline():
                break
            pos_index.append(offset)
    # Total number of keywords equals the number of indexed lines.
    total_num = len(pos_index)

    logging.info("主进程 开始划分子任务")
    tasks = tools.avg_split_task(total_num, split_num)
    with ProcessPoolExecutor(process_num) as process_pool, \
            open(config.KEY_FILE, "w", encoding=config.ENCODING_CHARSET) as f_key:
        logging.info("主进程 提交任务到子进程")
        # task[0]/task[1] are line indices; translate them to byte offsets.
        process_futures = [
            process_pool.submit(sub_process, pos_index[task[0]], pos_index[task[1]], stop_word)
            for task in tasks
        ]
        # Drop references that are no longer needed to keep memory down.
        del pos_index
        del tasks
        # Running sequence number written in front of each keyword row.
        count = -1
        for p_future in as_completed(process_futures):
            result = p_future.result()
            if result:
                for key, stems in result:
                    count += 1
                    f_key.write("%d,%s,%s\n" % (count, key, list(stems)))
            # Release the completed future so its result can be reclaimed.
            process_futures.remove(p_future)

    logging.info("主进程 构建KEY表耗时:%f" % (time() - start_time))
if __name__ == '__main__':
    # Banner title for the run (runtime string, kept as-is).
    TITLE = "关键词表 生成"
    # Log a start banner, build the KEY table, then log the end banner.
    tools.log_start_msg(TITLE)
    main_process()
    tools.log_end_msg(TITLE)
|