| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- # -*- coding:utf-8 -*-
- from asyncio import FIRST_COMPLETED
- from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
- from multiprocessing import Manager
- import multiprocessing
- from queue import Queue
- import sys
- from time import time
- import os
- from unittest import result
- import config
- import tools
- import ast
- import re
- import math
- import stop_word
- import logging
- import mmap
- import threading
# Task title handed to tools.log_start_msg / tools.log_end_msg by every build variant below.
TITLE = "生成关键词倒排和统计信息"
def thread_handle(path, start_pos, end_pos):
    """Collect the raw lines of one byte range of a file.

    The file is memory-mapped and read line by line starting at
    ``start_pos``. A line is collected when it *starts* at an offset that is
    less than or equal to ``end_pos`` (the bound is inclusive), and reading
    also stops as soon as the end of the file is reached.

    Args:
        path: Path of the file to read.
        start_pos: Byte offset at which reading begins.
        end_pos: Highest byte offset at which a collected line may start.

    Returns:
        A dict with two entries: ``"tid"`` is the ident of the thread that
        ran the call, ``"key_reverse"`` is the list of raw ``bytes`` lines
        (line terminators included) in file order.
    """
    t = threading.current_thread()
    print("线城-%d 开始任务,开始位置:%d,结束位置:%d" % (t.ident, start_pos, end_pos))
    key_reverse = []
    # Binary mode is enough: the handle is only used for its descriptor and
    # mmap always yields bytes, so no text decoding layer is needed here.
    with open(path, "rb") as fkey, \
            mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        fmmap.seek(start_pos)
        # Stop once the next line would start beyond the inclusive end offset.
        while fmmap.tell() <= end_pos:
            line = fmmap.readline()
            # readline() returns b"" at EOF without advancing the position;
            # without this guard an end_pos at/after EOF would loop forever.
            if not line:
                break
            key_reverse.append(line)
    return {
        "tid": t.ident,
        "key_reverse": key_reverse,
    }
def process_handle(stop_word_cache, lines):
    """Build a partial inverted index from a batch of raw keyword lines.

    Each line is decoded as UTF-8 and matched against
    ``config.KEY_RE_PATTERAN``; group 1 is used as the keyword index and
    group 3 as the literal text of the root list (parsed with
    ``ast.literal_eval``). Roots found in ``stop_word_cache`` are ignored.

    Args:
        stop_word_cache: Container supporting ``in``; roots it contains are
            skipped.
        lines: Iterable of ``bytes`` lines.

    Returns:
        A dict with ``"pid"`` (id of the executing process) and
        ``"key_reverse_dict"``, which maps each root to
        ``{"count": <occurrences>, "indexs": [<keyword index>, ...]}``.
    """
    pid = os.getpid()
    # The original format string had a %d placeholder but no argument.
    print("进程-%d 接收到数据,开始计算" % pid)
    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
    key_reverse_dict = {}
    for raw_line in lines:
        m = pattern.match(raw_line.decode("UTF-8"))
        # Blank or malformed lines carry no data; skip them instead of
        # failing on ``None.group``.
        if m is None:
            continue
        index = m.group(1)
        # Group 3 holds the textual list of roots; turn it into a real list.
        for item in ast.literal_eval(m.group(3)):
            if item in stop_word_cache:
                continue
            entry = key_reverse_dict.get(item)
            if entry is None:
                # Record the index of the first occurrence too, so that
                # len(indexs) always equals count.
                key_reverse_dict[item] = {"count": 1, "indexs": [index]}
            else:
                entry["count"] += 1
                entry["indexs"].append(index)
    return {
        "pid": pid,
        "key_reverse_dict": key_reverse_dict,
    }
def multi_thread():
    """Build the inverted index with reader threads feeding worker processes.

    The keys of the index loaded from ``config.KEY_INDEX_CACHE`` are split
    into at most ``worker_num`` segments. For each segment a thread runs
    ``thread_handle`` on ``config.KEY_FILE``; when it finishes, its lines are
    submitted to a process pool running ``process_handle``, and every partial
    result is merged into one dict by a done-callback.

    NOTE(review): the values of the key index are assumed to be byte offsets
    of line starts in ``config.KEY_FILE``, increasing in key order — confirm
    against the code that builds the index.
    """
    key_reverse_dict = {}

    def process_handle_result(future):
        """Merge one partial result returned by ``process_handle``."""
        # The result must be fetched before it is used in the message; the
        # original order raised UnboundLocalError inside the callback, so no
        # partial result was ever merged.
        result = future.result()
        print("进程-%d 执行结束,返回部分数据 合并数据" % result["pid"])
        for key, value in result["key_reverse_dict"].items():
            merged = key_reverse_dict.get(key)
            if merged is None:
                key_reverse_dict[key] = value
            else:
                merged["count"] += value["count"]
                # extend keeps "indexs" a flat list (append would nest lists).
                merged["indexs"].extend(value["indexs"])
        print("进程-%d 合并数据结束" % result["pid"])

    def thread_handle_result(future):
        """Forward the lines read by one thread to the process pool."""
        result = future.result()
        logging.info("线程-%d 执行结束,返回部分数据,提交至计算进程" % result["tid"])
        process_future = process_pool.submit(process_handle, stop_word_cache, result["key_reverse"])
        process_future.add_done_callback(process_handle_result)

    tools.init_log()
    tools.log_start_msg(TITLE)
    logging.info("目前执行的是多线程版")
    logging.info("开始执行初始化")
    # Stop words
    stop_word_cache = stop_word.load_stop_word()
    # Pool sizes and the number of segments the input is cut into
    thread_num = 1
    process_num = 2
    worker_num = 100
    # Keyword index: key -> position in the keyword file
    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
    logging.info("开始对数据进行分段计算")
    key_index_list = list(key_index_cache)
    total_len = len(key_index_list)
    if total_len == 0:
        # Nothing to segment; the original failed with IndexError here.
        logging.warning("关键表索引为空,没有可处理的数据")
        return
    internal = math.ceil(total_len / worker_num)
    # Start offset of every segment: one per `internal` keys.
    starts = [key_index_cache[key_index_list[i]] for i in range(0, total_len, internal)]
    # thread_handle treats its end offset as inclusive, so every segment ends
    # one byte before the next one starts and the last segment ends on the
    # start of the last indexed line. This keeps boundary lines from being
    # read by two segments and avoids repeated tail segments.
    ends = [next_start - 1 for next_start in starts[1:]]
    ends.append(key_index_cache[key_index_list[-1]])

    logging.info("把分段结果提交至多线程执行")
    with ThreadPoolExecutor(thread_num) as thread_pool, \
            ProcessPoolExecutor(process_num) as process_pool:
        for start_pos, end_pos in zip(starts, ends):
            thread_future = thread_pool.submit(thread_handle, config.KEY_FILE, start_pos, end_pos)
            thread_future.add_done_callback(thread_handle_result)
        logging.info("等待多线程执行结束")
        # Order matters: the thread callbacks submit to the process pool, so
        # the thread pool has to drain before the process pool is shut down.
        thread_pool.shutdown(wait=True)
        process_pool.shutdown(wait=True)

    logging.info("已获取全部子进程返回部分结果,总数据量:%d" % len(key_reverse_dict))
    # NOTE(review): this early return makes the sort-and-save code below
    # unreachable. It looks like a debugging leftover; it is kept so that the
    # function's side effects stay as they were — remove it once writing
    # config.KEY_REVERSE_FILE from this variant is intended.
    return
    # Sort by occurrence count, descending; items() yields sortable tuples.
    logging.info("根据关键词数量进行倒序排列")
    sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
    # Save to the local file
    logging.info("保存到本地")
    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
        for key, value in sorted_reverse_list:
            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
    tools.log_end_msg(TITLE)
def handle(path, start_pos, end_pos, share_pattern, stop_word_cache):
    """Build a partial inverted index from one byte range of a file.

    The file is memory-mapped and read line by line starting at
    ``start_pos``; a line is processed when it starts at an offset less than
    or equal to ``end_pos`` (inclusive bound) and reading stops at EOF. Each
    line is decoded as UTF-8 and matched with the shared pattern: group 1 is
    the keyword index, group 3 the literal text of the root list (parsed with
    ``ast.literal_eval``).

    Args:
        path: Path of the keyword file.
        start_pos: Byte offset at which reading begins.
        end_pos: Highest byte offset at which a processed line may start.
        share_pattern: Object whose ``value`` attribute is the compiled
            regular expression used to split a line.
        stop_word_cache: Container supporting ``in``; roots it contains are
            skipped.

    Returns:
        Dict mapping each root to
        ``{"count": <occurrences>, "indexs": [<keyword index>, ...]}``.
    """
    pid = os.getpid()
    print("进程-%d 开始任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
    key_reverse = {}
    pattern = share_pattern.value
    # Binary mode is enough: the handle is only used for its descriptor and
    # mmap always yields bytes.
    with open(path, "rb") as fkey, \
            mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
        fmmap.seek(start_pos)
        while fmmap.tell() <= end_pos:
            raw_line = fmmap.readline()
            # An empty read means EOF.
            if not raw_line:
                break
            m = pattern.match(raw_line.decode("UTF-8"))
            # Blank or malformed lines carry no data; skip them instead of
            # failing on ``None.group``.
            if m is None:
                continue
            index = m.group(1)
            for item in ast.literal_eval(m.group(3)):
                if item in stop_word_cache:
                    continue
                entry = key_reverse.get(item)
                if entry is None:
                    # Record the index of the first occurrence too, so that
                    # len(indexs) always equals count.
                    key_reverse[item] = {"count": 1, "indexs": [index]}
                else:
                    entry["count"] += 1
                    entry["indexs"].append(index)

    print("进程-%d 任务执行结束" % pid)
    return key_reverse
def multi_process():
    """Build the inverted index with a pool of worker processes.

    The keys of the index loaded from ``config.KEY_INDEX_CACHE`` are split
    into one segment per worker. Each segment of ``config.KEY_FILE`` is
    processed by ``handle`` in a separate process; the partial results are
    merged, sorted by occurrence count (descending) and written to
    ``config.KEY_REVERSE_FILE`` as ``root,count,indexs`` lines.

    NOTE(review): the values of the key index are assumed to be byte offsets
    of line starts in ``config.KEY_FILE``, increasing in key order — confirm
    against the code that builds the index.
    """
    tools.init_log()
    tools.log_start_msg(TITLE)
    logging.info("目前执行的是多进程版")
    logging.info("开始执行初始化")
    # Leave one core free, but always keep at least one worker:
    # os.cpu_count() may be 1 (giving 0 workers) or None.
    process_num = max(1, (os.cpu_count() or 1) - 1)
    worker_num = process_num
    # Keyword index: key -> position in the keyword file
    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
    logging.info("开始对数据进行分段计算")
    key_index_list = list(key_index_cache)
    total_len = len(key_index_list)
    if total_len == 0:
        # Nothing to segment; the original failed with IndexError here.
        logging.warning("关键表索引为空,没有可处理的数据")
        return
    internal = math.ceil(total_len / worker_num)
    # Start offset of every segment: one per `internal` keys.
    starts = [key_index_cache[key_index_list[i]] for i in range(0, total_len, internal)]
    # handle treats its end offset as inclusive, so every segment ends one
    # byte before the next one starts and the last segment ends on the start
    # of the last indexed line. This keeps boundary lines from being counted
    # by two workers.
    ends = [next_start - 1 for next_start in starts[1:]]
    ends.append(key_index_cache[key_index_list[-1]])

    key_reverse_dict = {}
    # Context managers make sure the manager process and the pool are shut
    # down even when a worker fails. The pool is exited first, so the manager
    # outlives every task that uses its proxies.
    with Manager() as manager, \
            ProcessPoolExecutor(max_workers=process_num) as pool:
        # Extraction pattern shared with the workers
        share_pattern = manager.Value("pattern", re.compile(config.KEY_RE_PATTERAN, re.I))
        # Stop words shared with the workers
        stop_word_cache = manager.dict(stop_word.load_stop_word())

        logging.info("把分段结果提交至多进程执行")
        process_futures = [
            pool.submit(handle, config.KEY_FILE, start_pos, end_pos, share_pattern, stop_word_cache)
            for start_pos, end_pos in zip(starts, ends)
        ]
        logging.info("等待多进程执行结束")
        # as_completed already blocks until each future finishes, so no
        # separate wait() call is needed.
        for future in as_completed(process_futures):
            logging.info("有子进程执行结束,返回部分结果")
            result = future.result()
            for key, value in result.items():
                merged = key_reverse_dict.get(key)
                if merged is None:
                    key_reverse_dict[key] = value
                else:
                    merged["count"] += value["count"]
                    merged["indexs"].extend(value["indexs"])
        logging.info("已获取全部子进程返回部分结果")

    # Sort by occurrence count, descending; items() yields sortable tuples.
    logging.info("根据关键词数量进行倒序排列")
    sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
    # Save to the local file
    logging.info("保存到本地")
    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
        for key, value in sorted_reverse_list:
            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
    tools.log_end_msg(TITLE)
def one_process():
    """Build the inverted index in a single process.

    Reads ``config.KEY_FILE`` line by line, matches every line against
    ``config.KEY_RE_PATTERAN`` (group 1: keyword index, group 3: literal text
    of the root list), skips stop words, then writes the roots sorted by
    occurrence count (descending) to ``config.KEY_REVERSE_FILE`` as
    ``root,count,indexs`` lines.
    """
    tools.init_log()
    tools.log_start_msg(TITLE)
    logging.info("目前执行的是单进程版")
    # Extraction pattern
    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
    # Inverted index: root -> {"count": ..., "indexs": [...]}
    key_reverse = {}
    # Stop words
    stop_word_cache = stop_word.load_stop_word()
    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
        # Measure the file by seeking to its end, then rewind for reading.
        fkey.seek(0, os.SEEK_END)
        total_num = fkey.tell()
        fkey.seek(0)
        while True:
            # Report progress from the current position before each read.
            cur_pos = fkey.tell()
            tools.tip_in_size(total_num, cur_pos)
            line = fkey.readline()
            # An empty read means EOF.
            if not line:
                break
            m = pattern.match(line)
            # Blank or malformed lines carry no data; skip them instead of
            # failing on ``None.group``.
            if m is None:
                continue
            index = m.group(1)
            # Group 3 holds the textual list of roots; turn it into a real list.
            for item in ast.literal_eval(m.group(3)):
                if item in stop_word_cache:
                    continue
                entry = key_reverse.get(item)
                if entry is None:
                    # Record the index of the first occurrence too, so that
                    # len(indexs) always equals count.
                    key_reverse[item] = {"count": 1, "indexs": [index]}
                else:
                    entry["count"] += 1
                    entry["indexs"].append(index)
    logging.info("根据关键词数量进行倒序排列")
    sorted_reverse_list = sorted(key_reverse.items(), key=lambda x: x[1]["count"], reverse=True)
    # Save to the local file
    logging.info("保存到本地")
    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
        for key, value in sorted_reverse_list:
            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
    tools.log_end_msg(TITLE)
# Script entry point: run the variant that pairs reader threads with worker processes.
if __name__ == "__main__":
    multi_thread()
|