|
|
@@ -0,0 +1,427 @@
|
|
|
+# -*- coding:utf-8 -*-
|
|
|
+
|
|
|
+from asyncio import FIRST_COMPLETED
|
|
|
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
|
|
|
+from multiprocessing import Manager
|
|
|
+import multiprocessing
|
|
|
+from queue import Queue
|
|
|
+import sys
|
|
|
+from time import time
|
|
|
+import os
|
|
|
+from unittest import result
|
|
|
+import config
|
|
|
+import tools
|
|
|
+import ast
|
|
|
+import re
|
|
|
+import math
|
|
|
+import stop_word
|
|
|
+import logging
|
|
|
+import mmap
|
|
|
+import threading
|
|
|
+
|
|
|
+TITLE = "生成关键词倒排和统计信息"
|
|
|
+
|
|
|
+def thread_handle(path, start_pos, end_pos):
|
|
|
+ pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
|
|
|
+ t = threading.current_thread()
|
|
|
+ print("线城-%d 开始任务,开始位置:%d,结束位置:%d" % (t.ident, start_pos, end_pos))
|
|
|
+ # 临时容器
|
|
|
+ key_reverse = []
|
|
|
+ with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
|
|
|
+ mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
|
|
|
+ # 移动到开始位置
|
|
|
+ fmmap.seek(start_pos)
|
|
|
+
|
|
|
+ while True:
|
|
|
+ # 获取当前处理位置
|
|
|
+ cur_pos = fmmap.tell()
|
|
|
+
|
|
|
+ # 越界检查
|
|
|
+ if cur_pos > end_pos:
|
|
|
+ break
|
|
|
+
|
|
|
+ line = fmmap.readline()
|
|
|
+ key_reverse.append(line)
|
|
|
+ # 暂留
|
|
|
+ # # 读取关键词数据
|
|
|
+ # line = fmmap.readline().decode("UTF-8")
|
|
|
+
|
|
|
+ # # 如果到行尾则结束
|
|
|
+ # if not line:
|
|
|
+ # break
|
|
|
+
|
|
|
+ # # 提取数据
|
|
|
+ # m = pattern.match(line)
|
|
|
+ # # 获取关键词序号、词根
|
|
|
+ # index = m.group(1)
|
|
|
+ # key_root = m.group(3)
|
|
|
+
|
|
|
+ # # 转换成真正的list对象
|
|
|
+ # key_root = ast.literal_eval(key_root)
|
|
|
+
|
|
|
+ # key_reverse.append((index, key_root))
|
|
|
+ # print("线城-%d 结束任务" % t.ident)
|
|
|
+ # return key_reverse
|
|
|
+ return {
|
|
|
+ "tid": t.ident,
|
|
|
+ "key_reverse": key_reverse
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def process_handle(stop_word_cache, lines):
|
|
|
+ pid = os.getpid()
|
|
|
+ print("进程-%d 接收到数据,开始计算")
|
|
|
+ pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
|
|
|
+
|
|
|
+ key_reverse_dict={}
|
|
|
+
|
|
|
+ for line in lines:
|
|
|
+ line = line.decode("UTF-8")
|
|
|
+ # 提取数据
|
|
|
+ m = pattern.match(line)
|
|
|
+ # 获取关键词序号、词根
|
|
|
+ index = m.group(1)
|
|
|
+ key_root = m.group(3)
|
|
|
+
|
|
|
+ # 转换成真正的list对象
|
|
|
+ for item in ast.literal_eval(key_root):
|
|
|
+ # 排除停用词
|
|
|
+ if item in stop_word_cache:
|
|
|
+ continue
|
|
|
+ # 构建倒排表和统计数据量
|
|
|
+ val = key_reverse_dict.get(item)
|
|
|
+ if val:
|
|
|
+ key_reverse_dict[item]["count"] = key_reverse_dict[item]["count"] + 1
|
|
|
+ key_reverse_dict[item]["indexs"].append(index)
|
|
|
+ else:
|
|
|
+ key_reverse_dict[item]= {
|
|
|
+ "count":1,
|
|
|
+ "indexs":[]
|
|
|
+ }
|
|
|
+ return {
|
|
|
+ "pid": pid,
|
|
|
+ "key_reverse_dict":key_reverse_dict
|
|
|
+ }
|
|
|
+
|
|
|
+def multi_thread():
|
|
|
+ """
|
|
|
+ 构建待排表(多线程)
|
|
|
+ """
|
|
|
+
|
|
|
+ key_reverse_dict = {}
|
|
|
+
|
|
|
+ def process_handle_result(future):
|
|
|
+ print("进程-%d 执行结束,返回部分数据 合并数据" % result["pid"])
|
|
|
+ result = future.result()
|
|
|
+ for key,value in result["key_reverse_dict"].items():
|
|
|
+ val = key_reverse_dict.get(key)
|
|
|
+ if val:
|
|
|
+ key_reverse_dict[key]["count"] = key_reverse_dict[key]["count"] + value["count"]
|
|
|
+ key_reverse_dict[key]["indexs"].append(value["indexs"])
|
|
|
+ else:
|
|
|
+ key_reverse_dict[key]= value
|
|
|
+ print("进程-%d 合并数据结束" % result["pid"])
|
|
|
+
|
|
|
+
|
|
|
+ def thread_handle_result(future):
|
|
|
+ result = future.result()
|
|
|
+ logging.info("线程-%d 执行结束,返回部分数据,提交至计算进程" % result["tid"])
|
|
|
+ process_future = process_pool.submit(process_handle, stop_word_cache, result["key_reverse"])
|
|
|
+ process_future.add_done_callback(process_handle_result)
|
|
|
+
|
|
|
+ tools.init_log()
|
|
|
+ tools.log_start_msg(TITLE)
|
|
|
+
|
|
|
+ logging.info("目前执行的是多线程版")
|
|
|
+
|
|
|
+ logging.info("开始执行初始化")
|
|
|
+ # 提取规则
|
|
|
+ pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
|
|
|
+ # 停用表
|
|
|
+ stop_word_cache = stop_word.load_stop_word()
|
|
|
+ # 进程处理数
|
|
|
+ thread_num = 1
|
|
|
+ process_num = 2
|
|
|
+ worker_num = 100
|
|
|
+ # 关键表索引
|
|
|
+ key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
|
|
|
+
|
|
|
+ logging.info("开始对数据进行分段计算")
|
|
|
+
|
|
|
+ # 对索引文件中的元素进行平分
|
|
|
+ # 转成列表,计算总长 和 平分后的处理区间
|
|
|
+ key_index_list = [key for key in key_index_cache.keys()]
|
|
|
+ total_len = len(key_index_list)
|
|
|
+ internal = math.ceil(total_len / worker_num )
|
|
|
+
|
|
|
+ # 利用 缓存索引文件 生成处理区间的位置信息
|
|
|
+ # 位置信息容器
|
|
|
+ pos_list = []
|
|
|
+ for i in range(worker_num + 1):
|
|
|
+ # 计算平分点在列表中的位置
|
|
|
+ l_pos = i * internal
|
|
|
+ # 如果大于等于列表大小需要额外处理
|
|
|
+ if l_pos >= total_len:
|
|
|
+ l_pos = total_len -1
|
|
|
+ # 获取列表中的词根
|
|
|
+ key_index = key_index_list[l_pos:l_pos+1]
|
|
|
+ # 根据词根获取位置信息
|
|
|
+ pos = key_index_cache[key_index[0]]
|
|
|
+ # 记录位置信息
|
|
|
+ pos_list.append(pos)
|
|
|
+
|
|
|
+ logging.info("把分段结果提交至多线程执行")
|
|
|
+ # 生成任务
|
|
|
+ with ThreadPoolExecutor(thread_num) as thread_pool, \
|
|
|
+ ProcessPoolExecutor(process_num) as process_pool:
|
|
|
+
|
|
|
+ # thread_futures = []
|
|
|
+
|
|
|
+ for i in range(0, len(pos_list)-1):
|
|
|
+ pos = pos_list[i: i+2]
|
|
|
+ # thread_futures.append()
|
|
|
+ thread_future = thread_pool.submit(thread_handle, config.KEY_FILE, pos[0], pos[1])
|
|
|
+ thread_future.add_done_callback(thread_handle_result)
|
|
|
+
|
|
|
+ # 等待数据返回
|
|
|
+ logging.info("等待多线程执行结束")
|
|
|
+ thread_pool.shutdown(wait=True)
|
|
|
+ process_pool.shutdown(wait=True)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ logging.info("已获取全部子进程返回部分结果,总数据量:%d" % len(key_reverse_dict))
|
|
|
+ return
|
|
|
+ # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
|
|
|
+ logging.info("根据关键词数量进行倒序排列")
|
|
|
+ sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
|
|
|
+
|
|
|
+ # 保存到本地文件
|
|
|
+ logging.info("保存到本地")
|
|
|
+ with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
|
|
|
+ for key, value in sorted_reverse_list:
|
|
|
+ f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
|
|
|
+
|
|
|
+ tools.log_end_msg(TITLE)
|
|
|
+
|
|
|
+def handle(path, start_pos, end_pos, share_pattern, stop_word_cache):
|
|
|
+ pid = os.getpid()
|
|
|
+ print("进程-%d 开始任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
|
|
|
+ # 临时容器
|
|
|
+ key_reverse = {}
|
|
|
+ # 提取数据用的正则
|
|
|
+ pattern = share_pattern.value
|
|
|
+ with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
|
|
|
+ mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
|
|
|
+ # 移动到开始位置
|
|
|
+ fmmap.seek(start_pos)
|
|
|
+
|
|
|
+ while True:
|
|
|
+ # 获取当前处理位置
|
|
|
+ cur_pos = fmmap.tell()
|
|
|
+
|
|
|
+ # 越界检查
|
|
|
+ if cur_pos > end_pos:
|
|
|
+ break
|
|
|
+
|
|
|
+ # 读取关键词数据
|
|
|
+ line = fmmap.readline().decode("UTF-8")
|
|
|
+
|
|
|
+ # 如果到行尾则结束
|
|
|
+ if not line:
|
|
|
+ break
|
|
|
+
|
|
|
+ # 提取数据
|
|
|
+ m = pattern.match(line)
|
|
|
+ # 获取关键词序号、词根
|
|
|
+ index = m.group(1)
|
|
|
+ key_root = m.group(3)
|
|
|
+
|
|
|
+ # 转换成真正的list对象
|
|
|
+ for item in ast.literal_eval(key_root):
|
|
|
+ # 排除停用词
|
|
|
+ if item in stop_word_cache:
|
|
|
+ continue
|
|
|
+ # 构建倒排表和统计数据量
|
|
|
+ val = key_reverse.get(item)
|
|
|
+ if val:
|
|
|
+ count = key_reverse[item]["count"]
|
|
|
+ key_reverse[item]["count"] = count + 1
|
|
|
+ key_reverse[item]["indexs"].append(index)
|
|
|
+ else:
|
|
|
+ key_reverse[item]= {
|
|
|
+ "count":1,
|
|
|
+ "indexs":[]
|
|
|
+ }
|
|
|
+
|
|
|
+ print("进程-%d 任务执行结束" % pid)
|
|
|
+
|
|
|
+ return key_reverse
|
|
|
+
|
|
|
+def multi_process():
|
|
|
+ """
|
|
|
+ 构建待排表(多进程)
|
|
|
+ """
|
|
|
+
|
|
|
+ tools.init_log()
|
|
|
+ tools.log_start_msg(TITLE)
|
|
|
+
|
|
|
+ logging.info("目前执行的是多进程版")
|
|
|
+
|
|
|
+ logging.info("开始执行初始化")
|
|
|
+ manager = Manager()
|
|
|
+ # 提取规则
|
|
|
+ share_pattern = manager.Value("pattern", re.compile(config.KEY_RE_PATTERAN, re.I))
|
|
|
+ # 停用表
|
|
|
+ stop_word_cache = manager.dict(stop_word.load_stop_word())
|
|
|
+ # 进程处理数
|
|
|
+ process_num = os.cpu_count() - 1
|
|
|
+ worker_num = process_num
|
|
|
+ pool = ProcessPoolExecutor(max_workers=process_num)
|
|
|
+ # 关键表索引
|
|
|
+ key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
|
|
|
+
|
|
|
+ logging.info("开始对数据进行分段计算")
|
|
|
+
|
|
|
+ # 对索引文件中的元素进行平分
|
|
|
+ # 转成列表,计算总长 和 平分后的处理区间
|
|
|
+ key_index_list = [key for key in key_index_cache.keys()]
|
|
|
+ total_len = len(key_index_list)
|
|
|
+ internal = math.ceil(total_len / worker_num )
|
|
|
+
|
|
|
+ # 利用 缓存索引文件 生成处理区间的位置信息
|
|
|
+ # 位置信息容器
|
|
|
+ pos_list = []
|
|
|
+ for i in range(worker_num + 1):
|
|
|
+ # 计算平分点在列表中的位置
|
|
|
+ l_pos = i * internal
|
|
|
+ # 如果大于等于列表大小需要额外处理
|
|
|
+ if l_pos >= total_len:
|
|
|
+ l_pos = total_len -1
|
|
|
+ # 获取列表中的词根
|
|
|
+ key_index = key_index_list[l_pos:l_pos+1]
|
|
|
+ # 根据词根获取位置信息
|
|
|
+ pos = key_index_cache[key_index[0]]
|
|
|
+ # 记录位置信息
|
|
|
+ pos_list.append(pos)
|
|
|
+
|
|
|
+ logging.info("把分段结果提交至多进程执行")
|
|
|
+ # 生成任务
|
|
|
+ process_futures = []
|
|
|
+ for i in range(0, len(pos_list)-1):
|
|
|
+ pos = pos_list[i: i+2]
|
|
|
+ process_futures.append(pool.submit(handle, config.KEY_FILE, pos[0], pos[1], share_pattern, stop_word_cache))
|
|
|
+
|
|
|
+ # 等待数据返回
|
|
|
+ logging.info("等待多进程执行结束")
|
|
|
+ wait(process_futures,return_when=FIRST_COMPLETED)
|
|
|
+ key_reverse_dict = {}
|
|
|
+ for future in as_completed(process_futures):
|
|
|
+ logging.info("有子进程执行结束,返回部分结果")
|
|
|
+ result = future.result()
|
|
|
+ for key, value in result.items():
|
|
|
+ # 进行数据合并
|
|
|
+ val_dict = key_reverse_dict.get(key)
|
|
|
+ if val_dict:
|
|
|
+ count = val_dict["count"]
|
|
|
+ key_reverse_dict[key]["count"] = count + value["count"]
|
|
|
+ key_reverse_dict[key]["indexs"].extend(value["indexs"])
|
|
|
+ else:
|
|
|
+ key_reverse_dict[key] = value
|
|
|
+
|
|
|
+ logging.info("已获取全部子进程返回部分结果")
|
|
|
+
|
|
|
+ # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
|
|
|
+ logging.info("根据关键词数量进行倒序排列")
|
|
|
+ sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
|
|
|
+
|
|
|
+ # 保存到本地文件
|
|
|
+ logging.info("保存到本地")
|
|
|
+ with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
|
|
|
+ for key, value in sorted_reverse_list:
|
|
|
+ f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
|
|
|
+
|
|
|
+ tools.log_end_msg(TITLE)
|
|
|
+
|
|
|
+def one_process():
|
|
|
+ """
|
|
|
+ 构建倒排表(单进程版)
|
|
|
+ """
|
|
|
+
|
|
|
+ tools.init_log()
|
|
|
+ tools.log_start_msg(TITLE)
|
|
|
+
|
|
|
+ logging.info("目前执行的是单进程版")
|
|
|
+
|
|
|
+ # 提取规则
|
|
|
+ pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
|
|
|
+
|
|
|
+ # 倒排表 容器
|
|
|
+ key_reverse = {}
|
|
|
+
|
|
|
+ # 停用表
|
|
|
+ stop_word_cache = stop_word.load_stop_word()
|
|
|
+
|
|
|
+ with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
|
|
|
+
|
|
|
+ # 获取文件总大小,获取后需要复原光标位置
|
|
|
+ fkey.seek(0, os.SEEK_END)
|
|
|
+ total_num = fkey.tell()
|
|
|
+ fkey.seek(0)
|
|
|
+
|
|
|
+ while True:
|
|
|
+ # 获取当前处理位置
|
|
|
+ cur_pos = fkey.tell()
|
|
|
+
|
|
|
+ # 进度提示
|
|
|
+ tools.tip_in_size(total_num, cur_pos)
|
|
|
+
|
|
|
+ # 读取关键词数据
|
|
|
+ line = fkey.readline()
|
|
|
+
|
|
|
+ # 如果到行尾则结束
|
|
|
+ if not line:
|
|
|
+ break
|
|
|
+
|
|
|
+ # 提取数据
|
|
|
+ m = pattern.match(line)
|
|
|
+ # 获取关键词序号
|
|
|
+ index = m.group(1)
|
|
|
+ # 获取词根
|
|
|
+ key_root = m.group(3)
|
|
|
+ # 转换成真正的list对象
|
|
|
+ for item in ast.literal_eval(key_root):
|
|
|
+
|
|
|
+ # 排除停用词
|
|
|
+ if item in stop_word_cache:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 构建倒排表
|
|
|
+ val = key_reverse.get(item)
|
|
|
+ if val:
|
|
|
+ count = key_reverse[item]["count"]
|
|
|
+ key_reverse[item]["count"] = count + 1
|
|
|
+ key_reverse[item]["indexs"].append(index)
|
|
|
+ else:
|
|
|
+ key_reverse[item]= {
|
|
|
+ "count":1,
|
|
|
+ "indexs":[]
|
|
|
+ }
|
|
|
+
|
|
|
+ logging.info("根据关键词数量进行倒序排列")
|
|
|
+ sorted_reverse_list = sorted(key_reverse.items(), key=lambda x: x[1]["count"], reverse=True)
|
|
|
+
|
|
|
+ # 保存到本地文件
|
|
|
+ logging.info("保存到本地")
|
|
|
+ with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
|
|
|
+ for key, value in sorted_reverse_list:
|
|
|
+ f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
|
|
|
+
|
|
|
+ tools.log_end_msg(TITLE)
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ multi_thread()
|