# -*- coding:utf-8 -*-
from concurrent.futures import ProcessPoolExecutor, as_completed
import math
from time import time
import os
import config
import tools
import re
import logging
import mmap

tools.init_log()

# 正则提取 — pre-compiled regexes shared by the worker processes.
#
# BUG FIX: these used to be guarded by ``if __name__ != "__main__":`` so they
# were only compiled when a *spawned* child re-imports this module (where
# __name__ is not "__main__", e.g. on Windows).  Under the "fork" start
# method — the ProcessPoolExecutor default on Linux — the child inherits the
# parent's globals, the guard skipped compilation in the parent, and
# sub_process() crashed with NameError.  Compiling unconditionally at import
# time is cheap and correct under every start method.

# 倒排表 索引 — leading "<digits>," keyword-index field of a KEY-file line.
# (re.I dropped: it is meaningless for a digits-only pattern.)
index_re = r"(\d+),"
index_pattern = re.compile(index_re)
# KEY表 词根 — every comma-free single-quoted token, i.e. the word stems
stem_re = r"'([^,]*)'"
stem_pattern = re.compile(stem_re, re.I)


def sub_process(start_pos, end_pos):
    """
    子进程 — worker that builds a partial inverted index.

    Scans the byte range [start_pos, end_pos) of config.KEY_FILE (one record
    per line; the caller derives both offsets from a line-offset index, so
    start_pos sits on a line boundary) and returns a dict mapping each word
    stem to {"count": <occurrence count>, "indexs": <set of keyword indexes>}.

    :param start_pos: byte offset of the first line to process
    :param end_pos: byte offset at which to stop (exclusive)
    :return: partial reverse-index dict, merged later by the parent
    """
    pid = os.getpid()
    logging.debug("进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
    # 开始时间
    start_time = time()
    # 倒排表和统计信息容器: stem -> {"count": int, "indexs": set[str]}
    reverse_dict = {}
    # mmap gives each worker cheap random access to its own byte range
    # without reading the whole file; ACCESS_READ keeps the mapping
    # read-only and shareable.
    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
            mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_mmap:
        # 移动到开始位置
        f_mmap.seek(start_pos)
        # 越界检查 — stop once the read cursor passes the assigned range
        while f_mmap.tell() < end_pos:
            # 提取数据
            line = f_mmap.readline().decode(config.ENCODING_CHARSET)
            m = index_pattern.match(line)
            if m is None:
                # ROBUSTNESS FIX: the original called m.group(1) without a
                # None check, so one malformed line killed the whole worker
                # with AttributeError.  Skip and report instead.
                logging.warning("进程-%d 跳过无法解析的行:%r" % (pid, line))
                continue
            # 获取关键词序号、词根
            index = m.group(1)
            stems = stem_pattern.findall(line)
            # 构建倒排表和统计数据量 — every stem on the line points back
            # at this keyword index
            for stem in stems:
                obj = reverse_dict.get(stem)
                if obj is None:
                    reverse_dict[stem] = {"count": 1, "indexs": {index}}
                else:
                    obj["count"] += 1
                    obj["indexs"].add(index)
    logging.debug("子进程-%d 任务结束,耗时:%f" % (pid, (time() - start_time)))
    return reverse_dict


def main_process():
    """
    主进程 — fan the KEY file out to worker processes and merge the results.

    Splits the keyword table into roughly equal contiguous chunks (via the
    cached line-offset index), builds a partial inverted index in each
    worker, merges the partials, then writes two outputs sorted by
    descending keyword count:

    * config.KEY_REVERSE_FILE            — "stem,[sorted index list]" lines
    * config.KEY_REVERSE_STATISTICS_FILE — "stem,count" lines
    """
    logging.info("主进程 开始执行初始化")
    # 进程处理数
    process_num = 4
    # 关键表索引: line number -> byte offset into KEY_FILE
    # (assumes tools.load_obj returns the offset index built elsewhere —
    # TODO confirm against the producer of KEY_INDEX_CACHE)
    key_index = tools.load_obj(config.KEY_INDEX_CACHE)
    # 开始时间
    start_time = time()
    # 关键词总数
    total_num = len(key_index)
    # 任务分割大小 — ceil so the trailing partial chunk is not dropped
    split_num = math.ceil(total_num / process_num)
    logging.info("主进程 开始划分子任务")
    tasks = tools.avg_split_task(total_num, split_num)
    with ProcessPoolExecutor(process_num) as process_pool:
        logging.info("主进程 提交任务到子进程")
        # Translate each task's line-number bounds into byte offsets, which
        # is what sub_process expects for its mmap window.
        process_futures = [
            process_pool.submit(sub_process, key_index[task[0]], key_index[task[1]])
            for task in tasks
        ]
        # 移除无效变量 以防占用内存
        del tasks
        del key_index
        # 倒排表和统计信息容器 (merged result)
        reverse_dict = {}
        # 进行数据合并 — merge each partial index as soon as it completes
        for p_future in as_completed(process_futures):
            result = p_future.result()
            for key, val_obj in result.items():
                reverse_obj = reverse_dict.get(key)
                if reverse_obj:
                    reverse_obj["count"] += val_obj["count"]
                    reverse_obj["indexs"] |= val_obj["indexs"]
                else:
                    reverse_dict[key] = val_obj
            # 移除无效变量 以防占用内存 — safe: as_completed iterates over
            # its own snapshot of the futures, not this list
            process_futures.remove(p_future)
    logging.info("主进程 已获取全部子进程返回结果,总数据量:%d" % len(reverse_dict))
    logging.info("主进程 对词根关联的索引进行排序和转换")
    # Convert each index set to a sorted list so the output is deterministic
    # (indexes are digit strings, so this is a lexicographic sort — same as
    # the original behavior).
    for val_obj in reverse_dict.values():
        val_obj["indexs"] = sorted(val_obj["indexs"])
    # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
    logging.info("主进程 根据关键词数量进行排列")
    sorted_reverse_list = sorted(
        reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True
    )
    # 保存到本地文件
    logging.info("主进程 保存到本地")
    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f_reverse, \
            open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as f_statistics:
        for key, val_obj in sorted_reverse_list:
            f_reverse.write("%s,%s\n" % (key, val_obj["indexs"]))
            f_statistics.write("%s,%d\n" % (key, val_obj["count"]))
    logging.info("主进程 构建倒排索引耗时:%f" % (time() - start_time))


if __name__ == "__main__":
    TITLE = "生成关键词倒排和统计信息"
    tools.log_start_msg(TITLE)
    main_process()
    tools.log_end_msg(TITLE)