| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # -*- coding:utf-8 -*-
- from concurrent.futures import ProcessPoolExecutor, as_completed
- import math
- from time import time
- import os
- import config
- import tools
- import re
- import logging
- import mmap
tools.init_log()
# NOTE(review): this guard is True only when the module is *imported* — e.g. in
# worker processes created by ProcessPoolExecutor under the "spawn" start method
# (Windows default), which re-import this module with __name__ != "__main__".
# Under "fork" the children would inherit a state where these patterns were
# never compiled — confirm the intended platform/start method.
if __name__ != "__main__":
    # Regex helpers used by sub_process().
    # Inverted-index line: leading keyword id terminated by a comma.
    index_re = r"(\d+),"
    index_pattern = re.compile(index_re, re.I)
    # KEY table: stems are single-quoted tokens containing no comma.
    stem_re = r"'([^,]*)'"
    stem_pattern = re.compile(stem_re, re.I)
def sub_process(start_pos, end_pos):
    """
    Worker process: scan the byte range [start_pos, end_pos) of the KEY file
    and build a partial inverted index.

    :param start_pos: byte offset in config.KEY_FILE where this worker starts
    :param end_pos: byte offset (exclusive) where this worker stops reading
    :return: dict mapping stem -> {"count": number of occurrences,
                                   "indexs": set of keyword ids (numeric strings)}
    """
    pid = os.getpid()
    logging.debug("进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
    # Start time, for the duration log below.
    start_time = time()
    # Partial inverted index built by this worker.
    reverse_dict = {}
    # mmap gives cheap random access so each worker can seek straight to its range.
    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
            mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_mmap:
        # Jump to this worker's slice of the file.
        f_mmap.seek(start_pos)
        while True:
            # Stop once we have consumed our assigned range.
            cur_pos = f_mmap.tell()
            if cur_pos >= end_pos:
                break
            line = f_mmap.readline().decode(config.ENCODING_CHARSET)
            m = index_pattern.match(line)
            # BUGFIX: a blank or malformed line (e.g. a trailing newline at EOF)
            # does not match, and m.group(1) would raise AttributeError — skip it.
            if m is None:
                continue
            # Keyword id (kept as a string) and the stems found on the line.
            index = m.group(1)
            stems = stem_pattern.findall(line)
            # Fold each stem into the partial inverted index.
            for stem in stems:
                obj = reverse_dict.get(stem)
                if obj:
                    obj["count"] += 1
                    obj["indexs"].add(index)
                else:
                    reverse_dict[stem] = {"count": 1, "indexs": {index}}

    logging.debug("子进程-%d 任务结束,耗时:%f" % (pid, (time() - start_time)))
    return reverse_dict
-
def main_process():
    """
    Main process: split the KEY file into byte ranges, fan the ranges out to
    worker processes, merge the partial inverted indexes, sort them, and write
    the inverted table plus per-stem statistics to disk.
    """
    logging.info("主进程 开始执行初始化")

    # Number of worker processes.
    process_num = 4
    # Byte-offset index of the KEY file (presumably one offset per keyword
    # line — see tools.load_obj / the index-cache builder).
    key_index = tools.load_obj(config.KEY_INDEX_CACHE)
    # Start time, for the duration log below.
    start_time = time()
    # Total number of keywords.
    total_num = len(key_index)
    # Task size per worker (the last chunk may be smaller).
    split_num = math.ceil(total_num / process_num)
    logging.info("主进程 开始划分子任务")
    tasks = tools.avg_split_task(total_num, split_num)

    with ProcessPoolExecutor(process_num) as process_pool:
        logging.info("主进程 提交任务到子进程")
        process_futures = [process_pool.submit(sub_process, key_index[task[0]], key_index[task[1]])
                           for task in tasks]
        # Drop large references early to keep peak memory down.
        del tasks
        del key_index
        # Merged inverted index.
        reverse_dict = {}

        # Merge partial results as the workers finish.
        for p_future in as_completed(process_futures):
            result = p_future.result()
            for key, val_obj in result.items():
                reverse_obj = reverse_dict.get(key)
                if reverse_obj:
                    reverse_obj["count"] += val_obj["count"]
                    reverse_obj["indexs"] |= val_obj["indexs"]
                else:
                    reverse_dict[key] = val_obj
            # Release the finished future so its result can be garbage-collected.
            process_futures.remove(p_future)

    logging.info("主进程 已获取全部子进程返回结果,总数据量:%d" % len(reverse_dict))

    logging.info("主进程 对词根关联的索引进行排序和转换")
    for val_obj in reverse_dict.values():
        # BUGFIX: the ids are numeric *strings*; a plain sort() orders them
        # lexicographically ('10' < '2') — sort numerically instead.
        val_obj["indexs"] = sorted(val_obj["indexs"], key=int)
    # Sort stems by occurrence count (items() yields sortable pairs).
    logging.info("主进程 根据关键词数量进行排列")
    sorted_reverse_list = sorted(reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
    # Persist results.
    logging.info("主进程 保存到本地")
    # TODO(review): "./data/tmp/reverse_test.csv" looks like a leftover test
    # path — should this come from config, like KEY_REVERSE_STATISTICS_FILE?
    with open("./data/tmp/reverse_test.csv", "w", encoding=config.ENCODING_CHARSET) as f_reverse, \
            open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as f_statistics:
        for key, val_obj in sorted_reverse_list:
            f_reverse.write("%s,%s\n" % (key, val_obj["indexs"]))
            f_statistics.write("%s,%d\n" % (key, val_obj["count"]))

    logging.info("主进程 构建倒排索引耗时:%f" % (time() - start_time))
if __name__ == "__main__":
    # Script entry point: log a start banner, build the inverted index,
    # then log the matching end banner.
    job_title = "生成关键词倒排和统计信息"
    tools.log_start_msg(job_title)
    main_process()
    tools.log_end_msg(job_title)
-
|