# -*- coding:utf-8 -*-
"""Build a keyword inverted index ("倒排表") with per-keyword counts.

Three interchangeable drivers are provided:

* ``one_process``   -- sequential scan of the key file.
* ``multi_process`` -- splits the key file into byte ranges and parses each
  range in a worker process.
* ``multi_thread``  -- threads read raw line chunks, worker processes parse
  them, and partial results are merged via future callbacks.

Each driver writes ``key,count,[indexes]`` lines to
``config.KEY_REVERSE_FILE``, sorted by descending count.
"""
# BUG FIX: FIRST_COMPLETED was imported from ``asyncio``; it only worked by
# accident because both modules define it as the same string constant.  The
# futures used below are concurrent.futures futures, so import it from there.
from concurrent.futures import (
    FIRST_COMPLETED,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    as_completed,
    wait,
)
from multiprocessing import Manager
import multiprocessing
from queue import Queue
import sys
from time import time
import os
from unittest import result  # NOTE(review): unused; looks like an accidental IDE auto-import
import config
import tools
import ast
import re
import math
import stop_word
import logging
import mmap
import threading

TITLE = "生成关键词倒排和统计信息"


def thread_handle(path, start_pos, end_pos):
    """Read the raw lines of *path* between byte offsets start_pos..end_pos.

    Runs inside the thread pool of ``multi_thread``.  Parsing is deferred to
    ``process_handle`` in a worker process, so this only collects the raw
    ``bytes`` lines of the segment.

    :param path:      key file to read
    :param start_pos: byte offset of the first line to read
    :param end_pos:   byte offset at which to stop; a line starting exactly
                      at ``end_pos`` is still read, matching the segment
                      boundaries produced by the driver
    :return: dict with the reader's thread id (``"tid"``) and the collected
             raw lines (``"key_reverse"``)
    """
    t = threading.current_thread()
    print("线城-%d 开始任务,开始位置:%d,结束位置:%d" % (t.ident, start_pos, end_pos))
    # Raw, still-unparsed bytes lines for this segment.
    raw_lines = []
    with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
            mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        # Jump to the start of this segment.
        fmmap.seek(start_pos)
        while True:
            # Stop once the read cursor moves past the segment end.
            if fmmap.tell() > end_pos:
                break
            line = fmmap.readline()
            # BUG FIX: without this EOF check the loop spun forever when
            # end_pos pointed past the end of the file (readline() keeps
            # returning b"" at EOF without ever advancing past end_pos).
            if not line:
                break
            raw_lines.append(line)
    return {"tid": t.ident, "key_reverse": raw_lines}


def process_handle(stop_word_cache, lines):
    """Parse raw key-file lines into a partial inverted index.

    Runs in a worker process of ``multi_thread``.

    :param stop_word_cache: stop words to exclude (any container supporting ``in``)
    :param lines: raw ``bytes`` lines as collected by ``thread_handle``
    :return: dict with the worker ``"pid"`` and the partial index
             ``"key_reverse_dict"`` mapping word -> {"count": n, "indexs": [...]}
    """
    pid = os.getpid()
    # BUG FIX: the original print was missing the "% pid" argument and
    # printed the literal format string.
    print("进程-%d 接收到数据,开始计算" % pid)
    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
    key_reverse_dict = {}
    for raw in lines:
        line = raw.decode("UTF-8")
        m = pattern.match(line)
        # group(1): record index; group(3): textual list of word roots.
        index = m.group(1)
        key_root = m.group(3)
        # literal_eval turns the stored "['a', 'b', ...]" text into a real list.
        for item in ast.literal_eval(key_root):
            # Skip stop words.
            if item in stop_word_cache:
                continue
            entry = key_reverse_dict.get(item)
            if entry:
                entry["count"] += 1
                entry["indexs"].append(index)
            else:
                # BUG FIX: the first occurrence's index was dropped
                # ("indexs" started empty); seed it with the current index.
                key_reverse_dict[item] = {"count": 1, "indexs": [index]}
    return {"pid": pid, "key_reverse_dict": key_reverse_dict}


def multi_thread():
    """构建待排表(多线程) -- build the inverted index, thread/process hybrid.

    Threads read raw byte ranges of the key file; each finished read is
    forwarded (via a done-callback) to a process pool for parsing; each
    parsed partial index is merged (via another done-callback) into
    ``key_reverse_dict``; finally the merged index is sorted and saved.
    """
    # Shared accumulator, mutated from future callbacks.
    # NOTE(review): callbacks may run on executor-internal threads and there
    # is no lock around this dict -- confirm single-threaded callback
    # delivery, or add a lock, before raising thread_num/process_num.
    key_reverse_dict = {}

    def process_handle_result(future):
        # Merge one worker process's partial index into the accumulator.
        # BUG FIX: the original read result["pid"] before ``result`` was
        # assigned, raising UnboundLocalError inside the callback.
        result = future.result()
        print("进程-%d 执行结束,返回部分数据 合并数据" % result["pid"])
        for key, value in result["key_reverse_dict"].items():
            existing = key_reverse_dict.get(key)
            if existing:
                existing["count"] += value["count"]
                # BUG FIX: was .append(value["indexs"]), which nested a whole
                # list inside the index list; extend() keeps it flat,
                # matching the merge logic in multi_process().
                existing["indexs"].extend(value["indexs"])
            else:
                key_reverse_dict[key] = value
        print("进程-%d 合并数据结束" % result["pid"])

    def thread_handle_result(future):
        # Hand a finished raw-read segment over to the process pool.
        result = future.result()
        logging.info("线程-%d 执行结束,返回部分数据,提交至计算进程" % result["tid"])
        process_future = process_pool.submit(
            process_handle, stop_word_cache, result["key_reverse"])
        process_future.add_done_callback(process_handle_result)

    tools.init_log()
    tools.log_start_msg(TITLE)
    logging.info("目前执行的是多线程版")
    logging.info("开始执行初始化")
    # Stop-word table, shared (read-only) with the parsing processes.
    stop_word_cache = stop_word.load_stop_word()
    # Pool sizes and number of file segments.
    thread_num = 1
    process_num = 2
    worker_num = 100
    # Cached mapping of word-root -> byte offset in the key file.
    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
    logging.info("开始对数据进行分段计算")
    # Split the index entries evenly into worker_num segments and translate
    # each split point into a byte offset via the index cache.
    key_index_list = list(key_index_cache.keys())
    total_len = len(key_index_list)
    interval = math.ceil(total_len / worker_num)
    pos_list = []
    for i in range(worker_num + 1):
        l_pos = i * interval
        # Clamp split points that run past the end of the list.
        if l_pos >= total_len:
            l_pos = total_len - 1
        pos_list.append(key_index_cache[key_index_list[l_pos]])
    logging.info("把分段结果提交至多线程执行")
    with ThreadPoolExecutor(thread_num) as thread_pool, \
            ProcessPoolExecutor(process_num) as process_pool:
        # Each consecutive offset pair [start, end] becomes one read task.
        for start_pos, end_pos in zip(pos_list, pos_list[1:]):
            thread_future = thread_pool.submit(
                thread_handle, config.KEY_FILE, start_pos, end_pos)
            thread_future.add_done_callback(thread_handle_result)
        # Wait for all readers, then for all parsers, to finish.
        logging.info("等待多线程执行结束")
        thread_pool.shutdown(wait=True)
        process_pool.shutdown(wait=True)
    logging.info("已获取全部子进程返回部分结果,总数据量:%d" % len(key_reverse_dict))
    # BUG FIX: a stray ``return`` here made the sort/save code below
    # unreachable, so the multithreaded run never persisted its result.
    # Sort by keyword count, descending (items() yields sortable tuples).
    logging.info("根据关键词数量进行倒序排列")
    sorted_reverse_list = sorted(
        key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
    # Persist as "key,count,[indexes]" lines.
    logging.info("保存到本地")
    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
        for key, value in sorted_reverse_list:
            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
    tools.log_end_msg(TITLE)


def handle(path, start_pos, end_pos, share_pattern, stop_word_cache):
    """Parse one byte range of the key file into a partial inverted index.

    Runs in a worker process of ``multi_process``.

    :param path:      key file to read
    :param start_pos: byte offset of the first line to process
    :param end_pos:   byte offset at which to stop (a line starting exactly
                      at ``end_pos`` is still processed)
    :param share_pattern: manager ``Value`` wrapping the compiled extraction regex
    :param stop_word_cache: shared dict of stop words
    :return: partial index mapping word -> {"count": n, "indexs": [...]}
    """
    pid = os.getpid()
    print("进程-%d 开始任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
    # Partial inverted index for this segment.
    key_reverse = {}
    # Unwrap the regex shared through the manager proxy.
    pattern = share_pattern.value
    with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
            mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        fmmap.seek(start_pos)
        while True:
            # Stop once past the segment boundary.
            if fmmap.tell() > end_pos:
                break
            line = fmmap.readline().decode("UTF-8")
            # End of file.
            if not line:
                break
            m = pattern.match(line)
            # group(1): record index; group(3): textual list of word roots.
            index = m.group(1)
            key_root = m.group(3)
            for item in ast.literal_eval(key_root):
                # Skip stop words.
                if item in stop_word_cache:
                    continue
                entry = key_reverse.get(item)
                if entry:
                    entry["count"] += 1
                    entry["indexs"].append(index)
                else:
                    # BUG FIX: seed with the current index instead of [] so
                    # the first occurrence is not lost.
                    key_reverse[item] = {"count": 1, "indexs": [index]}
    print("进程-%d 任务执行结束" % pid)
    return key_reverse


def multi_process():
    """构建待排表(多进程) -- build the inverted index with worker processes.

    Splits the key file into byte ranges (via the cached key index), parses
    each range in a separate process, merges the partial indexes, then sorts
    and saves the result.
    """
    tools.init_log()
    tools.log_start_msg(TITLE)
    logging.info("目前执行的是多进程版")
    logging.info("开始执行初始化")
    manager = Manager()
    # Compiled extraction regex, shared with workers through the manager.
    share_pattern = manager.Value("pattern", re.compile(config.KEY_RE_PATTERAN, re.I))
    # Stop-word table, shared with workers.
    stop_word_cache = manager.dict(stop_word.load_stop_word())
    # ROBUSTNESS: os.cpu_count() may return None, and cpu_count()-1 could be
    # 0 on a single-core machine; clamp to at least one worker.
    process_num = max(1, (os.cpu_count() or 1) - 1)
    worker_num = process_num
    # Cached mapping of word-root -> byte offset in the key file.
    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
    logging.info("开始对数据进行分段计算")
    # Split the index entries evenly into worker_num segments and translate
    # each split point into a byte offset via the index cache.
    key_index_list = list(key_index_cache.keys())
    total_len = len(key_index_list)
    interval = math.ceil(total_len / worker_num)
    pos_list = []
    for i in range(worker_num + 1):
        l_pos = i * interval
        # Clamp split points that run past the end of the list.
        if l_pos >= total_len:
            l_pos = total_len - 1
        pos_list.append(key_index_cache[key_index_list[l_pos]])
    logging.info("把分段结果提交至多进程执行")
    # LEAK FIX: the pool was never shut down; the ``with`` block guarantees
    # worker cleanup even if merging raises.
    with ProcessPoolExecutor(max_workers=process_num) as pool:
        # Each consecutive offset pair [start, end] becomes one parse task.
        process_futures = [
            pool.submit(handle, config.KEY_FILE, start_pos, end_pos,
                        share_pattern, stop_word_cache)
            for start_pos, end_pos in zip(pos_list, pos_list[1:])
        ]
        logging.info("等待多进程执行结束")
        # Block until at least one worker has produced data (the
        # as_completed loop below then drains the rest as they finish).
        wait(process_futures, return_when=FIRST_COMPLETED)
        key_reverse_dict = {}
        for future in as_completed(process_futures):
            logging.info("有子进程执行结束,返回部分结果")
            result = future.result()
            # Merge the partial index into the accumulator.
            for key, value in result.items():
                val_dict = key_reverse_dict.get(key)
                if val_dict:
                    val_dict["count"] += value["count"]
                    val_dict["indexs"].extend(value["indexs"])
                else:
                    key_reverse_dict[key] = value
    logging.info("已获取全部子进程返回部分结果")
    # Sort by keyword count, descending (items() yields sortable tuples).
    logging.info("根据关键词数量进行倒序排列")
    sorted_reverse_list = sorted(
        key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
    # Persist as "key,count,[indexes]" lines.
    logging.info("保存到本地")
    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
        for key, value in sorted_reverse_list:
            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
    tools.log_end_msg(TITLE)


def one_process():
    """构建倒排表(单进程版) -- build the inverted index sequentially.

    Scans the key file line by line, extracts each record's index and word
    roots, accumulates counts and index lists per word (excluding stop
    words), then sorts by count and saves.
    """
    tools.init_log()
    tools.log_start_msg(TITLE)
    logging.info("目前执行的是单进程版")
    # Extraction regex, compiled once outside the read loop.
    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
    # Inverted index accumulator: word -> {"count": n, "indexs": [...]}.
    key_reverse = {}
    # Stop-word table.
    stop_word_cache = stop_word.load_stop_word()
    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
        # Measure total size for progress reporting, then rewind.
        fkey.seek(0, os.SEEK_END)
        total_num = fkey.tell()
        fkey.seek(0)
        while True:
            cur_pos = fkey.tell()
            # Progress hint.
            tools.tip_in_size(total_num, cur_pos)
            line = fkey.readline()
            # End of file.
            if not line:
                break
            m = pattern.match(line)
            # group(1): record index; group(3): textual list of word roots.
            index = m.group(1)
            key_root = m.group(3)
            for item in ast.literal_eval(key_root):
                # Skip stop words.
                if item in stop_word_cache:
                    continue
                entry = key_reverse.get(item)
                if entry:
                    entry["count"] += 1
                    entry["indexs"].append(index)
                else:
                    # BUG FIX: seed with the current index instead of [] so
                    # the first occurrence is not lost.
                    key_reverse[item] = {"count": 1, "indexs": [index]}
    # Sort by keyword count, descending.
    logging.info("根据关键词数量进行倒序排列")
    sorted_reverse_list = sorted(
        key_reverse.items(), key=lambda x: x[1]["count"], reverse=True)
    # Persist as "key,count,[indexes]" lines.
    logging.info("保存到本地")
    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
        for key, value in sorted_reverse_list:
            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
    tools.log_end_msg(TITLE)


if __name__ == "__main__":
    multi_thread()