|
|
@@ -0,0 +1,218 @@
|
|
|
+# -*- coding:utf-8 -*-
|
|
|
+
|
|
|
+from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
|
+import mmap
|
|
|
+from multiprocessing.connection import wait
|
|
|
+import random
|
|
|
+import sys
|
|
|
+from time import sleep, time
|
|
|
+import os
|
|
|
+import config
|
|
|
+import tools
|
|
|
+import ast
|
|
|
+import re
|
|
|
+import stop_word
|
|
|
+import logging
|
|
|
+import math
|
|
|
+from multiprocessing import Process, Pool
|
|
|
+
|
|
|
# Job title handed to the tools.log_start_msg / tools.log_end_msg helpers.
TITLE = "关键词倒排文件 统计"
|
|
|
+
|
|
|
+# def reverse_statistics(start_pos, end_pos):
|
|
|
+
|
|
|
def handle(start_pos, end_pos):
    """Count posting-list sizes for one byte range of the key reverse file.

    Worker entry point executed in a child process. Every line that *starts*
    inside ``[start_pos, end_pos)`` of ``config.KEY_REVERSE_FILE`` is parsed
    as ``<key>,<python literal>``; the literal is evaluated with
    ``ast.literal_eval`` and its ``len()`` is stored under ``key``.

    Args:
        start_pos: Byte offset at which reading starts. Presumably the start
            of a line (the caller takes it from the index cache) -- confirm.
        end_pos: Byte offset at which reading stops (exclusive).

    Returns:
        dict: ``{"pid": <worker pid>, "statistics": {key: length, ...}}``.

    Raises:
        ValueError: If a non-blank line contains no ``,`` separator, or its
            payload is not a valid Python literal.
    """
    print("进程:%d, 统计开始,开始位置:%d,结束位置:%d" % (os.getpid(), start_pos, end_pos))

    # Statistics container: key -> number of elements in its posting list.
    reverse_statistics = {}

    # An empty range needs no file access at all (this also avoids mmap
    # raising on an empty file).
    if start_pos < end_pos:
        # Binary mode: only the file descriptor is used, decoding is done
        # per line on the bytes returned by the memory map.
        with open(config.KEY_REVERSE_FILE, "rb") as fr, \
                mmap.mmap(fr.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
            # Jump to the beginning of this worker's range.
            fmmap.seek(start_pos)

            # Stop as soon as the cursor leaves the assigned range.
            while fmmap.tell() < end_pos:
                raw_line = fmmap.readline()
                if not raw_line:
                    # End of file reached before end_pos; without this guard
                    # the empty line would crash in line.index below.
                    break

                line = raw_line.decode(config.ENCODING_CHARSET)
                if not line.strip():
                    # Tolerate blank lines instead of failing on them.
                    continue

                # Split on the first comma only: the payload itself is a
                # literal that may contain further commas.
                index = line.index(",")
                key = line[:index]
                word_root = ast.literal_eval(line[index + 1:])

                reverse_statistics[key] = len(word_root)

    logging.info("进程:%d, 统计结束" % os.getpid())

    return {
        "pid": os.getpid(),
        "statistics": reverse_statistics,
    }
|
|
|
+
|
|
|
+
|
|
|
def main2():
    """Compute the key reverse-file statistics with a process pool.

    Steps:
      1. Load the cached index (``config.KEY_REVERSE_INDEX_CACHE``), which
         maps each key to a position in ``config.KEY_REVERSE_FILE``.
      2. Split the keys into at most ``os.cpu_count()`` contiguous groups and
         turn the groups into byte ranges of the reverse file.
      3. Run ``handle`` on every range in a ``ProcessPoolExecutor``.
      4. Write all ``key,count`` pairs to
         ``config.KEY_REVERSE_STATISTICS_FILE``.

    NOTE(review): this relies on the index cache iterating its keys in the
    same order as their lines appear in the reverse file, and on the stored
    positions being byte offsets of line starts -- confirm against the code
    that builds the cache.
    """
    # Logging setup.
    tools.init_log()
    tools.log_start_msg(TITLE)

    # Number of worker processes; os.cpu_count() may return None.
    process_num = os.cpu_count() or 1

    # Load the cached index file.
    key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)

    key_list = list(key_reverse_index)
    key_list_len = len(key_list)

    results = []
    if key_list_len:
        # Number of keys handled by each worker.
        internal = math.ceil(key_list_len / process_num)

        # Start offset of every group. Stepping through the key list with
        # range() can never index past its end, which the previous
        # "l_pos > key_list_len" clamp did whenever the key count was a
        # multiple of the process count.
        pos_list = [
            key_reverse_index[key_list[l_pos]]
            for l_pos in range(0, key_list_len, internal)
        ]
        # The last group must run to the end of the file; ending it at the
        # offset of the last key would silently drop that key's line.
        pos_list.append(os.path.getsize(config.KEY_REVERSE_FILE))

        # The context manager shuts the pool down even if a task raises.
        with ProcessPoolExecutor(process_num) as pool:
            # One task per pair of neighbouring boundaries.
            process_futures = [
                pool.submit(handle, start_pos, end_pos)
                for start_pos, end_pos in zip(pos_list, pos_list[1:])
            ]

            for future in as_completed(process_futures):
                result = future.result()
                logging.info("进程:%d, 统计结束" % result["pid"])
                results.append(result)

    logging.info("统计结束,保存至本地 - 开始")
    with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
        for r in results:
            for key, value in r["statistics"].items():
                fw.write("%s,%s\n" % (key, value))
    logging.info("部分子任务统计结束,保存至本地 - 结束")

    tools.log_end_msg(TITLE)
|
|
|
+
|
|
|
+
|
|
|
def main():
    """Compute the key reverse-file statistics in a single process.

    Reads ``config.KEY_REVERSE_FILE`` line by line. Each line is parsed as
    ``<key>,<python literal>``; the literal is evaluated with
    ``ast.literal_eval`` and its ``len()`` is recorded for ``key``. The
    resulting ``key,count`` pairs are written, one per line, to
    ``config.KEY_REVERSE_STATISTICS_FILE``.

    Raises:
        ValueError: If a non-blank line contains no ``,`` separator, or its
            payload is not a valid Python literal.
    """
    tools.init_log()
    tools.log_start_msg(TITLE)

    # key -> number of elements in its posting list.
    reverse_statistics = {}
    logging.info("统计开始")
    # The file is iterated directly; the memory map that used to be created
    # here was never read and made the function fail on an empty file.
    with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr:
        for line in fr:
            if not line.strip():
                # Tolerate blank lines instead of failing in line.index.
                continue

            # Split on the first comma only: the payload is a literal that
            # may itself contain commas.
            index = line.index(",")
            key = line[:index]
            word_root = ast.literal_eval(line[index + 1:])

            reverse_statistics[key] = len(word_root)

    logging.info("统计结束,保存至本地")
    with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
        # .items() is required: iterating the dict itself yields only keys,
        # so unpacking into (key, value) failed or produced garbage.
        for key, value in reverse_statistics.items():
            fw.write("%s,%s\n" % (key, value))

    tools.log_end_msg(TITLE)
|
|
|
+
|
|
|
# Script entry point: run the multi-process variant. The guard also keeps
# worker processes that re-import this module from starting the job again.
if __name__ == "__main__":
    main2()
|