| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- # -*- coding:utf-8 -*-
- from concurrent.futures import ProcessPoolExecutor, as_completed
- import mmap
- from multiprocessing.connection import wait
- import random
- import sys
- from time import sleep, time
- import os
- import config
- import tools
- import ast
- import re
- import stop_word
- import logging
- import math
- from multiprocessing import Process, Pool
- TITLE = "关键词倒排文件 统计"  # Job title passed to tools.log_start_msg / tools.log_end_msg by main() and main2().
- # def reverse_statistics(start_pos, end_pos):
def handle(start_pos, end_pos):
    """Count the entries of every key stored in one byte range of the inverted file.

    Memory-maps ``config.KEY_REVERSE_FILE`` and reads whole lines starting at
    ``start_pos`` until the read position reaches ``end_pos`` or the end of
    the file. Each line is expected to have the form ``<key>,<python literal>``;
    the literal is parsed with ``ast.literal_eval`` and its length is recorded
    under the key.

    Args:
        start_pos: Byte offset to start reading from. Presumably the start of
            a line (the caller takes it from the cached index) -- verify
            against the code that builds the index.
        end_pos: Byte offset at which reading stops (exclusive). A line that
            starts before ``end_pos`` is always read completely.

    Returns:
        dict: ``{"pid": <worker process id>, "statistics": {key: length}}``.

    Raises:
        ValueError: If a non-blank line contains no comma, if its literal is
            malformed, or if ``start_pos`` lies outside the file.
        SyntaxError: If the text after the first comma is not a valid literal.
    """
    print("进程:%d, 统计开始,开始位置:%d,结束位置:%d" % (os.getpid(), start_pos, end_pos))
    # Container for the per-key statistics.
    reverse_statistics = {}

    # Only the file descriptor is needed for the mapping, so open in binary
    # mode; every line is decoded explicitly below.
    with open(config.KEY_REVERSE_FILE, "rb") as fr, \
            mmap.mmap(fr.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        # Jump to the beginning of this worker's range.
        fmmap.seek(start_pos)
        while fmmap.tell() < end_pos:
            raw_line = fmmap.readline()
            # readline() returns b"" at end of file: stop instead of failing
            # on the comma lookup when end_pos lies beyond the file size.
            if not raw_line:
                break

            line = raw_line.decode(config.ENCODING_CHARSET)
            # A blank line carries no entry; skip it rather than crash.
            if not line.strip():
                continue

            comma = line.index(",")
            key = line[:comma]
            entries = ast.literal_eval(line[comma + 1:])
            reverse_statistics[key] = len(entries)

    logging.info("进程:%d, 统计结束", os.getpid())
    return {
        "pid": os.getpid(),
        "statistics": reverse_statistics,
    }
-
def _chunk_boundaries(line_offsets, file_size, parts):
    """Return byte boundaries splitting ``line_offsets`` into at most ``parts`` consecutive ranges."""
    if not line_offsets:
        return []
    step = math.ceil(len(line_offsets) / parts)
    # Every range starts at the offset of its first line ...
    boundaries = [line_offsets[i] for i in range(0, len(line_offsets), step)]
    # ... and the last range runs to the end of the file so that the final
    # line is included as well.
    boundaries.append(file_size)
    return boundaries


def main2():
    """Multi-process variant: count the entries of every key in the inverted file.

    Loads the cached index (``config.KEY_REVERSE_INDEX_CACHE``), splits the
    file ``config.KEY_REVERSE_FILE`` into one byte range per worker, runs
    ``handle`` on every range in a process pool and finally writes all
    ``key,count`` pairs to ``config.KEY_REVERSE_STATISTICS_FILE``.

    NOTE(review): this assumes the cached index maps each key to the byte
    offset of the start of its line in ``KEY_REVERSE_FILE`` (the values are
    passed to ``mmap.seek`` by ``handle``) -- confirm against the code that
    builds the index.

    Raises:
        Exception: Any exception raised inside a worker is re-raised here by
            ``future.result()``.
    """
    # Logging setup.
    tools.init_log()
    tools.log_start_msg(TITLE)

    # os.cpu_count() may return None when the count cannot be determined.
    process_num = os.cpu_count() or 1

    # Load the cached index and derive the byte ranges from its offsets.
    # Sorting makes the ranges independent of the mapping's key order.
    key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
    line_offsets = sorted(key_reverse_index.values())
    boundaries = _chunk_boundaries(
        line_offsets,
        os.path.getsize(config.KEY_REVERSE_FILE),
        process_num,
    )

    results = []
    # The context manager shuts the pool down even if a worker fails.
    with ProcessPoolExecutor(process_num) as pool:
        # One task per pair of neighbouring boundaries.
        process_futures = [
            pool.submit(handle, start, end)
            for start, end in zip(boundaries, boundaries[1:])
        ]
        for future in as_completed(process_futures):
            result = future.result()
            logging.info("进程:%d, 统计结束" % result["pid"])
            results.append(result)

    logging.info("统计结束,保存至本地 - 开始")
    with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
        for r in results:
            for key, value in r["statistics"].items():
                fw.write("%s,%s\n" % (key, value))
    logging.info("部分子任务统计结束,保存至本地 - 结束")
    tools.log_end_msg(TITLE)
- # 测试代码3
- # pool = ProcessPoolExecutor(3)
- # for i in range(1,5):
- # pool.submit(handle, "测试进程-%d"%i, i, i*10)
- # pool.shutdown(wait=True)
- # 测试代码2
- # pool = Pool(3)
- # for i in range(1,5):
- # pool.apply_async(handle, ("测试进程-%d"%i, i, i*10))
- # pool.close()
- # pool.join()
- # print("结束")
- # 测试代码1
- # p = Process(target=handle, args=('测试进程', 1, 10))
- # p.start()
- # p.join()
-
- # tools.init_log()
- # tools.log_start_msg(TITLE)
- # key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
- # tmp = [key for key in key_reverse_index.keys()]
-
- # l = len(tmp)
- # print("总长:", l)
- # internal = math.ceil(l / 4)
- # print("间隔:", internal)
- # pos = []
- # for i in range(5):
- # t = i*internal
- # if t > l:
- # t = l-1
- # pos.append(t)
- # print(pos)
- # for item in pos:
- # key = tmp[item:item+1]
- # print(key)
- # pos = key_reverse_index[key[0]]
- # print(key, pos)
-
- # reverse_statistics = {}
- # logging.info("统计开始")
- # with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr, \
- # mmap.mmap(fr.fileno(), 0 , access=mmap.ACCESS_READ) as fmmap:
- # for line in fr:
- # index=line.index(",")
- # key = line[:index]
- # word_root = line[index+1:]
- # word_root = ast.literal_eval(word_root)
- # l = len(word_root)
-
- # reverse_statistics[key]=l
- # logging.info("统计结束,保存至本地")
- # with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
- # for key, value in reverse_statistics:
- # fw.write("%s,%s\n"%(key,value))
- # tools.log_end_msg(TITLE)
def main():
    """Single-process variant: count the entries of every key in the inverted file.

    Reads ``config.KEY_REVERSE_FILE`` line by line. Each line is expected to
    have the form ``<key>,<python literal>``; the literal is parsed with
    ``ast.literal_eval`` and its length is recorded under the key. The result
    is written to ``config.KEY_REVERSE_STATISTICS_FILE`` as one ``key,count``
    line per key.

    Raises:
        ValueError: If a non-blank line contains no comma or its literal is
            malformed.
        SyntaxError: If the text after the first comma is not a valid literal.
    """
    tools.init_log()
    tools.log_start_msg(TITLE)

    reverse_statistics = {}
    logging.info("统计开始")
    # The file is iterated directly; the memory map the original also opened
    # was never read and made an empty input file raise, so it is gone.
    with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr:
        for line in fr:
            # A blank line carries no entry; skip it rather than crash.
            if not line.strip():
                continue
            comma = line.index(",")
            key = line[:comma]
            entries = ast.literal_eval(line[comma + 1:])
            reverse_statistics[key] = len(entries)

    logging.info("统计结束,保存至本地")
    with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
        # Iterate over items(): iterating the dict itself yields only keys.
        for key, value in reverse_statistics.items():
            fw.write("%s,%s\n" % (key, value))
    tools.log_end_msg(TITLE)
if __name__ == "__main__":
    # Executed as a script: run the multi-process statistics job.
    main2()
|