# -*- coding:utf-8 -*- import math import os import re import shutil import threading import time from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor import jieba import redis from bitarray import bitarray from tqdm import tqdm import utils import logging from constant import FILE_LONG_TAIL_MERGE # 文件:长尾词_合并_分词.txt FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt" # 文件:长尾词_合并_聚合.txt FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt" # 文件夹:历史聚合数据归档文件夹 DIR_AGG_FILE_ARCHIVE = "长尾词_聚合_归档_%s" # 文件:长尾词_合并_分词倒排索引.txt FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt" # 子文件:长尾词_合并_聚合_%s.txt FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s_%s.txt" # 缓存前缀:分词词根 CACHE_WORD_STEM = "word:stem" # 缓存前缀:倒排索引 CACHE_WORD_REVERSE_INDEX = "word:reverse_index" # 缓存:长尾词缓存 CACHE_WORD = "word" # 缓存:聚合位图 CACHE_UNUSED_BITMAP = "unused_bitmap" # 字符集:UTF-8 CHARSET_UTF_8 = "UTF-8" # redis缓存池 redis_pool: redis.ConnectionPool = None # 线程池 thread_pool: ThreadPoolExecutor = None # 线程本地变量 local_var = threading.local() def agg_word(file_path: str): """ 长尾词聚合 :param file_path: :return: """ # 总长尾词数量 # word_total_num = 0 word_total_num = 1000000 # 聚合阈值 agg_threshold = 0.8 # 每份任务计算量 task_cal_num = 10000 # 工作现成(减1是为了留一个处理器给redis) worker_num = os.cpu_count() - 1 # worker_num = 1 # 正则表达式:聚合文件分文件 agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+_\d+.txt", re.I) # 最大线程数 max_threads = 2 # redis最大连接数(和工作线程数保持一致,免得浪费) redis_max_conns = max_threads # redis缓存 m_redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379) # 判断文件是否存在 for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]: input_file = os.path.join(file_path, file_name) if os.path.exists(input_file) and not os.path.isfile(input_file): raise Exception("文件不存在!文件路径:" + input_file) # 归档历史数据文件 history_agg_file_list = [file for file in os.listdir(file_path) if agg_file_pattern.match(file)] if len(history_agg_file_list) > 0: archive_path = os.path.join(file_path, DIR_AGG_FILE_ARCHIVE % time.strftime('%Y%m%d%H%M%S')) os.makedirs(archive_path) for history_agg_file in history_agg_file_list: shutil.move(os.path.join(file_path, history_agg_file), archive_path) # 缓存关键词位置 # word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE) # word_dict = {} # with open(word_file, "r", encoding="utf-8") as f: # for position, word in enumerate(f, start=1): # word = utils.remove_line_break(word) # if not word: # continue # word_dict[position] = word # m_redis_cache.hset(CACHE_WORD, mapping=word_dict) # # 记录总关键词数 # word_total_num = len(word_dict) # # 释放内存 # del word_dict # # # 缓存分词 # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT) # word_split_dict = {} # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f: # for position, word_split_line in enumerate(f, start=1): # word_split_line = utils.remove_line_break(word_split_line) # if not word_split_line: # continue # word_split_dict[position] = word_split_line # m_redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict) # # 释放内存 # del word_split_dict # # # 缓存倒排索引 # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX) # word_reverse_index_dict = {} # # 分词 # key_pattern = re.compile(r"([^,]+),\[", re.I) # # 索引 # index_pattern = re.compile(r"\d+", re.I) # with open(word_reverse_index_file, "r", encoding="utf-8") as f: # for word_split_line in f: # key_m = key_pattern.match(word_split_line) # key = key_m.group(1) # val = index_pattern.findall(word_split_line[word_split_line.index(","):]) # word_reverse_index_dict[key] = ",".join(val) # m_redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict) # # 释放内存 # del word_reverse_index_dict # 先清除,然后重新构建长尾词使用位图 m_redis_cache.delete(CACHE_UNUSED_BITMAP) m_redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 2, 1) # 提交任务 并输出结果 word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG) with ProcessPoolExecutor(max_workers=worker_num, initializer=init_process, initargs=(redis_max_conns, max_threads, file_path)) as process_pool: # 计算任务边界 task_list = utils.avg_split_task(word_total_num, task_cal_num, 1) # 提交任务 process_futures = [] for skip_line, pos in enumerate(task_list, start=1): skip_line = (skip_line % worker_num) + 1 p_future = process_pool.submit(agg_word_process, agg_threshold, pos[0], pos[1], word_total_num, skip_line) process_futures.append(p_future) # 显示任务进度 with tqdm(total=len(process_futures), desc='文本聚合进度', unit='份', unit_scale=True) as pbar: for p_future in as_completed(process_futures): p_result = p_future.result() # 更新发呆进度 pbar.update(1) # 关闭线程 process_pool.shutdown() # 获取子进程结果文件列表,并合并 with open(word_agg_file, "w", encoding="UTF-8") as fo: for file in os.listdir(file_path): # 不是处理结果部分跳过 if not agg_file_pattern.match(file): continue with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi: for word in fi: fo.write(word) def prepare_word_split_and_reverse_index(file_path: str): """ 预处理:长尾词分词、建立倒排索引 :param file_path: 待处理文件夹路径 :return: """ # 判断文件是否存在 word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE) if os.path.exists(word_input_file) and not os.path.isfile(word_input_file): print("文件不存在! " + word_input_file) return # 总文本数量 total_line_num = 0 with open(word_input_file, "r", encoding="utf-8") as fi: total_line_num = sum(1 for line in fi) if total_line_num == 0: print("没有待处理的数据,文本量为0") return # 分割任务数量 task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count())) # 任务进程处理结果 p_result_list = [] # 多进程处理 with ProcessPoolExecutor(os.cpu_count()) as process_pool: # 提交任务 process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in task_list] # 处理返回结果 for p_future in as_completed(process_futures): p_result = p_future.result() if p_result: p_result_list.append(p_result) # 分词结果排序 p_result_list = sorted(p_result_list, key=lambda v: v[0]) # 输出分词结果 split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT) with open(split_output_file, "w", encoding="UTF-8") as fo: for start_pos, word_arr_list, reverse_index in p_result_list: for word_arr in word_arr_list: fo.write("%s\n" % ",".join([str(i) for i in word_arr])) # 关键词倒排索引 word_reverse_index_dict = dict() # 合并倒排索引 for start_pos, word_arr, reverse_index_dict in p_result_list: for key, value in reverse_index_dict.items(): reverse_index_arr = word_reverse_index_dict.get(key) if reverse_index_arr: reverse_index_arr.extend(value) else: word_reverse_index_dict[key] = value # 输出倒排索引 with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo: for key, value in word_reverse_index_dict.items(): fo.write("%s,%s\n" % (key, value)) # 关闭进程池 process_pool.shutdown() def word_split_reverse(input_file: str, start_pos: int, end_pos: int): """ 分词和建立倒排索引 :param input_file: 待处理的文件 :param start_pos: 处理的开始位置 :param end_pos: 处理的结束位置 :return: (开始位置,分词结果,倒排索引) """ # 加载停用词 stop_word_dict = utils.load_stop_word() # 关键词存放容器 word_arr_list = [] # 倒排索引 word_reverse_index = dict() with open(input_file, "r", encoding="utf-8") as fr: for i, tmp_word in enumerate(fr): # start_pos是行数,而i要从0开始 if i + 1 < start_pos: continue # 当前位置 cur_pos = i + 1 # 到达任务边界,结束 if cur_pos == end_pos: break # 分词 word_list = jieba.cut_for_search(tmp_word.replace("\n", "")) # 分词过滤结果 word_filter_arr = [] # 过滤停用词 for word in word_list: if word in stop_word_dict: continue word_index_arr = word_reverse_index.get(word) if word_index_arr: word_index_arr.append(cur_pos) else: word_reverse_index[word] = [cur_pos] word_filter_arr.append(word) if len(word_filter_arr) == 0: word_arr_list.append([]) else: word_arr_list.append(word_filter_arr) return start_pos, word_arr_list, word_reverse_index def init_process(max_conns: int, max_threads: int, file_path: str): """ 初始化进程 :param max_conns: redis最大连接数量 :param max_threads: 线程最大数量 :param file_path: 输出文件路径 :return: """ # redis缓存池 初始化 global redis_pool redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=max_conns) global thread_pool thread_pool = ThreadPoolExecutor(max_threads, initializer=init_thread, initargs=(file_path,)) def agg_word_process(agg_threshold: float, start_pos: int, end_pos: int, final_pos: int, skip_line: int): """ 长尾词聚合处理 :param agg_threshold: 聚合阈值 :param start_pos: 任务处理开始边界(包含) :param end_pos: 任务处理结束边界(不包含) :param final_pos: 总任务边界 :param skip_line: 进度条显示位置 :return: """ # 进度长度 process_len = 0 if end_pos == -1: process_len = final_pos - start_pos else: process_len = end_pos - start_pos with tqdm(total=process_len, desc='子进程-%s:文本聚合进度' % os.getpid(), unit='份', unit_scale=True, position=skip_line) as pbar: thread_futures = [thread_pool.submit(agg_word_thread, main_word_position, agg_threshold) for main_word_position in range(start_pos, end_pos)] for t_future in as_completed(thread_futures): t_result = t_future.result() # 更新发呆进度 pbar.update(1) return def init_thread(file_path: str): """ 聚合线程初始化 :param file_path: 输出文件路径 :return: """ # 初始化redis客户端 local_var.redis_cache = redis.StrictRedis(connection_pool=redis_pool) # 初始化redis管道 local_var.redis_pipeline = local_var.redis_cache.pipeline(transaction=False) # 生成临时结果文件 word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG_PID % (os.getpid(), threading.current_thread().ident)) local_var.file_writer = open(word_agg_file, "w", encoding=CHARSET_UTF_8) # 已使用位图副本 local_var.unused_bitmap = bitarray() # 从倒排索引中获取候选词的位置索引 local_var.candidate_position_set = set() # 结果列表 local_var.result_list = [] def agg_word_thread(main_word_position: int, agg_threshold: float): try: # 获取已使用位图副本 local_var.unused_bitmap.frombytes(local_var.redis_cache.get(CACHE_UNUSED_BITMAP)) # 判断主词是否为已使用,是则跳过,否则设置为已使用 if local_var.unused_bitmap[main_word_position]: return else: local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1) local_var.unused_bitmap[main_word_position] = 1 # 获取主词和对应的词根 local_var.redis_pipeline.hget(CACHE_WORD, main_word_position) local_var.redis_pipeline.hget(CACHE_WORD_STEM, main_word_position) main_result = local_var.redis_pipeline.execute() main_word = main_result[0] main_word_stem = main_result[1] # 如果存在为空则返回 if not main_word or not main_word_stem: return main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",") # 从倒排索引获取长尾词位置 temp_candidate_position_list = local_var.redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list) for temp_candidate_position in temp_candidate_position_list: if not temp_candidate_position: continue # 排除已聚合 for candidate_position in temp_candidate_position.decode(CHARSET_UTF_8).split(","): if local_var.unused_bitmap[int(candidate_position)]: continue local_var.candidate_position_set.add(candidate_position) # 没有找到需要计算的候选词则跳过 if not local_var.candidate_position_set: return # 从缓存获取关键词列表、分词列表,如果为空则跳过 local_var.redis_pipeline.hmget(CACHE_WORD, local_var.candidate_position_set) local_var.redis_pipeline.hmget(CACHE_WORD_STEM, local_var.candidate_position_set) candidate_result = local_var.redis_pipeline.execute() candidate_word_cache_list = candidate_result[0] candidate_word_stem_cache_list = candidate_result[1] if not candidate_word_cache_list or not candidate_word_stem_cache_list: return # 延后编码成字符,以防前面直接返回 main_word = main_word.decode(CHARSET_UTF_8) # 计算相似度 for candidate_position in range(len(local_var.candidate_position_set)): # 获取关键词、分词,如果存在为空则跳过 candidate_word = candidate_word_cache_list[int(candidate_position)] if not candidate_word: continue candidate_word_stem = candidate_word_stem_cache_list[int(candidate_position)] if not candidate_word_stem: continue candidate_word = candidate_word.decode(CHARSET_UTF_8) candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",") # 计算相关性 try: val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word, candidate_word_stem_list) if val >= agg_threshold: local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1) local_var.result_list.append(candidate_word) except Exception as e: logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % ( main_word, candidate_word, candidate_word_stem_list), e) # 保存结果 if not local_var.result_list: return local_var.file_writer.write("%s\n" % main_word) for candidate_word in local_var.result_list: local_var.file_writer.write("%s\n" % candidate_word) local_var.file_writer.write("\n") except Exception as e: logging.error("子进程发生异常", e) finally: # 清除容器数据 local_var.candidate_position_set.clear() local_var.result_list.clear() local_var.unused_bitmap.clear() return