# -*- coding:utf-8 -*-
"""Count, for every key of the keyword inverted-index file, how many entries it has.

Each line of ``config.KEY_REVERSE_FILE`` has the form ``key,<python literal>``.
The statistic written to ``config.KEY_REVERSE_STATISTICS_FILE`` is one
``key,count`` line per input line, where ``count`` is ``len()`` of the literal.

``main2`` does the work in parallel over byte ranges of the file; ``main`` is
the single-process equivalent.
"""

from concurrent.futures import ProcessPoolExecutor, as_completed
import ast
import logging
import math
import mmap
from multiprocessing import Process, Pool
from multiprocessing.connection import wait
import os
import random
import re
import sys
from time import sleep, time

import config
import tools
import stop_word

TITLE = "关键词倒排文件 统计"


def _count_entries(line):
    """Parse one ``key,<python literal>`` line and return ``(key, count)``.

    The key is everything before the first comma. The remainder is evaluated
    with ``ast.literal_eval`` and ``count`` is ``len()`` of the result.

    Raises:
        ValueError: if the line contains no comma, or the literal is invalid.
        SyntaxError: if the text after the comma is not parseable.
    """
    key, separator, payload = line.partition(",")
    if not separator:
        raise ValueError("malformed inverted-index line (no ','): %r" % line)
    return key, len(ast.literal_eval(payload))


def _segment_ranges(key_reverse_index, segments, end_offset):
    """Split the indexed file into at most ``segments`` ``(start, end)`` ranges.

    ``key_reverse_index`` maps each key to the offset at which ``handle``
    starts reading that key's line. Every ``ceil(len / segments)``-th key, in
    the index's iteration order, opens a new range; a range ends where the
    next one starts, and the last range ends at ``end_offset``.

    NOTE(review): this assumes the index iterates keys in ascending offset
    order (the original code relied on the same assumption) - confirm against
    the code that builds the index.

    Returns an empty list for an empty index.
    """
    key_list = list(key_reverse_index.keys())
    if not key_list:
        return []
    step = math.ceil(len(key_list) / segments)
    starts = [key_reverse_index[key] for key in key_list[::step]]
    return list(zip(starts, starts[1:] + [end_offset]))


def handle(start_pos, end_pos):
    """Count entries for every line that starts in ``[start_pos, end_pos)``.

    Runs in a worker process. The inverted file is memory-mapped read-only,
    the cursor is moved to ``start_pos`` and lines are consumed until the
    cursor reaches ``end_pos`` (or the end of the file).

    Args:
        start_pos: offset of the first line to process.
        end_pos: offset at which to stop; a line starting here is not read.

    Returns:
        ``{"pid": <worker pid>, "statistics": {key: count, ...}}``.

    Raises:
        ValueError: if ``start_pos`` is outside the file, the file is empty,
            or a non-blank line is malformed.
    """
    print("进程:%d, 统计开始,开始位置:%d,结束位置:%d"
          % (os.getpid(), start_pos, end_pos))

    # Container for the statistics of this range.
    reverse_statistics = {}

    # The mapping works on raw bytes, so the file is opened in binary mode and
    # each line is decoded individually.
    with open(config.KEY_REVERSE_FILE, "rb") as fr, \
            mmap.mmap(fr.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        # Never run past the end of the mapping: readline() returns b"" there
        # and the cursor would stop advancing.
        end_pos = min(end_pos, len(fmmap))

        # Move to the start of the range.
        fmmap.seek(start_pos)
        while fmmap.tell() < end_pos:
            line = fmmap.readline().decode(config.ENCODING_CHARSET)
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            key, count = _count_entries(line)
            reverse_statistics[key] = count

    logging.info("进程:%d, 统计结束", os.getpid())
    return {"pid": os.getpid(), "statistics": reverse_statistics}


def main2():
    """Compute the statistics in parallel and save them to disk.

    The cached key index is used to cut the inverted file into one byte range
    per CPU; each range is processed by ``handle`` in a process pool, and the
    combined result is written to ``config.KEY_REVERSE_STATISTICS_FILE`` in
    range order.
    """
    # Configure logging.
    tools.init_log()
    tools.log_start_msg(TITLE)

    # Number of worker processes; cpu_count() may return None.
    process_num = os.cpu_count() or 1

    # Load the cached index and derive the byte range of every task. The last
    # range ends at the file size so the final line is included.
    key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
    ranges = _segment_ranges(
        key_reverse_index,
        process_num,
        os.path.getsize(config.KEY_REVERSE_FILE),
    )

    # The context manager shuts the pool down even if a worker raises.
    results = [None] * len(ranges)
    with ProcessPoolExecutor(process_num) as pool:
        slot_of = {
            pool.submit(handle, start, end): slot
            for slot, (start, end) in enumerate(ranges)
        }
        for future in as_completed(slot_of):
            result = future.result()
            logging.info("进程:%d, 统计结束", result["pid"])
            # Store by range index so the output order is deterministic.
            results[slot_of[future]] = result

    logging.info("统计结束,保存至本地 - 开始")
    with open(config.KEY_REVERSE_STATISTICS_FILE, "w",
              encoding=config.ENCODING_CHARSET) as fw:
        for result in results:
            fw.writelines(
                "%s,%s\n" % (key, value)
                for key, value in result["statistics"].items()
            )
    logging.info("统计结束,保存至本地 - 结束")

    tools.log_end_msg(TITLE)


def main():
    """Compute the statistics in a single process and save them to disk."""
    tools.init_log()
    tools.log_start_msg(TITLE)

    reverse_statistics = {}

    logging.info("统计开始")
    with open(config.KEY_REVERSE_FILE, "r",
              encoding=config.ENCODING_CHARSET) as fr:
        for line in fr:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            key, count = _count_entries(line)
            reverse_statistics[key] = count

    logging.info("统计结束,保存至本地")
    with open(config.KEY_REVERSE_STATISTICS_FILE, "w",
              encoding=config.ENCODING_CHARSET) as fw:
        fw.writelines(
            "%s,%s\n" % (key, value)
            for key, value in reverse_statistics.items()
        )

    tools.log_end_msg(TITLE)


if __name__ == "__main__":
    main2()