# -*- coding:utf-8 -*- import math import mmap import os import re import time from concurrent.futures import ProcessPoolExecutor, as_completed from bitmap import BitMap import utils import jieba import zipfile import logging # 文件后缀:长尾词.txt FILE_SUFFIX_LONG_TAIL = "_长尾词.txt" # 文件后缀:长尾词_合并.txt FILE_LONG_TAIL_MERGE = "长尾词_合并.txt" # 文件:长尾词_合并_分词.txt FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt" # 文件:长尾词_合并_分词倒排索引.txt FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt" # 文件:长尾词_合并_聚合.txt FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt" def extract_word_from_5118(file_path: str): """ 从5118关键词压缩文件中提取数据 :param file_path: 待处理文件夹路径 :return: None """ file_list = [] for file in os.listdir(file_path): file_list.append(os.path.join(file_path, file)) for i, file in enumerate(file_list): zfile = zipfile.ZipFile(file) filenames = zfile.namelist() for filename in filenames: # 重新编码文件名为正确形式 real_name = filename.encode('cp437').decode('gbk') # 排除无效文件 if real_name in ['打开乱码如何处理?.txt']: continue # 关键词存放容器 word_container = set() # 读取压缩文件中的文件 with zfile.open(filename) as file_content: lines = file_content.readlines() # 跳过开头两行 for line in lines[2:]: split = line.decode("gbk").split(",") # 只需要第一列的数据 word_container.add(split[0]) output_file_name = real_name[0:real_name.index("--")] output_file_path = os.path.join(file_path, output_file_name + FILE_SUFFIX_LONG_TAIL) with open(output_file_path, "w", encoding="utf-8") as f: for item in word_container: f.write(item) f.write("\n") def merge_word(file_path: str): """ 合并长尾词(带去重) :param file_path: 待处理文件夹路径 :return: None """ # 获取文件列表 file_list = [] for file in os.listdir(file_path): if file.endswith(FILE_SUFFIX_LONG_TAIL): file_list.append(os.path.join(file_path, file)) # 长尾词集合容器 word_set = set() # 读取数据并排重 for i, file in enumerate(file_list): with open(file, "r", encoding="utf-8") as f: for word in f: word_set.add(word.replace("\n", "")) # 保存合并结果 with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE), "w", encoding="utf-8") as f: for item in word_set: f.write(item) f.write("\n") def word_split_statistics(file_path: str): """ 分词统计 :param file_path: 待处理文件夹路径 :return: None """ file_list = [] for file in os.listdir(file_path): file_list.append(os.path.join(file_path, file)) stop_word_dict = utils.load_stop_word() for i, file in enumerate(file_list): if not file.endswith(FILE_SUFFIX_LONG_TAIL): continue # 分词结果容器 key_dict = {} with open(file, "r", encoding="utf-8") as f: for tmp_word in f: # 分词 word_list = jieba.cut_for_search(tmp_word.replace("\n", "")) # 统计 for word in word_list: # 过滤停用词 if word in stop_word_dict: continue if word in key_dict: key_dict[word] = key_dict[word] + 1 else: key_dict[word] = 1 # 根据词频进行倒序排列 sorted_key_list = sorted(key_dict.items(), key=lambda x: x[1], reverse=True) output_file_name = file[file.rindex("\\") + 1:file.index(FILE_SUFFIX_LONG_TAIL)] output_file_path = os.path.join(file_path, output_file_name + "_长尾词_分词统计.csv") with open(output_file_path, "w", encoding="UTF-8") as f: for key, count in sorted_key_list: f.write("%s,%d\n" % (key, count)) def word_split_reverse(input_file: str, start_pos: int, end_pos: int): """ 分词和建立倒排索引 :param input_file: 待处理的文件 :param start_pos: 处理的开始位置 :param end_pos: 处理的结束位置 :return: (开始位置,分词结果,倒排索引) """ # 加载停用词 stop_word_dict = utils.load_stop_word() # 关键词存放容器 word_arr_list = [] # 倒排索引 word_reverse_index = dict() with open(input_file, "r", encoding="utf-8") as fr: for i, tmp_word in enumerate(fr): # start_pos是行数,而i要从0开始 if i + 1 < start_pos: continue # 当前位置 cur_pos = i + 1 # 到达任务边界,结束 if cur_pos == end_pos: break # 分词 word_list = jieba.cut_for_search(tmp_word.replace("\n", "")) # 分词过滤结果 word_filter_arr = [] # 过滤停用词 for word in word_list: if word in stop_word_dict: continue word_index_arr = word_reverse_index.get(word) if word_index_arr: word_index_arr.append(cur_pos) else: word_reverse_index[word] = [cur_pos] word_filter_arr.append(word) if len(word_filter_arr) == 0: word_arr_list.append([]) else: word_arr_list.append(word_filter_arr) return start_pos, word_arr_list, word_reverse_index def prepare_word_split_and_reverse_index(file_path: str): """ 预处理:长尾词分词、建立倒排索引 :param file_path: 待处理文件夹路径 :return: """ # 判断文件是否存在 word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE) if os.path.exists(word_input_file) and not os.path.isfile(word_input_file): print("文件不存在! " + word_input_file) return # 总文本数量 total_line_num = 0 with open(word_input_file, "r", encoding="utf-8") as fi: total_line_num = sum(1 for line in fi) if total_line_num == 0: print("没有待处理的数据,文本量为0") return # 分割任务数量 task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count())) # 任务进程处理结果 p_result_list = [] # 多进程处理 with ProcessPoolExecutor(os.cpu_count()) as process_pool: # 提交任务 process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in task_list] # 处理返回结果 for p_future in as_completed(process_futures): p_result = p_future.result() if p_result: p_result_list.append(p_result) # 分词结果排序 p_result_list = sorted(p_result_list, key=lambda v: v[0]) # 输出分词结果 split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT) with open(split_output_file, "w", encoding="UTF-8") as fo: for start_pos, word_arr_list, reverse_index in p_result_list: for word_arr in word_arr_list: fo.write("%s\n" % ",".join([str(i) for i in word_arr])) # 关键词倒排索引 word_reverse_index_dict = dict() # 合并倒排索引 for start_pos, word_arr, reverse_index_dict in p_result_list: for key, value in reverse_index_dict.items(): reverse_index_arr = word_reverse_index_dict.get(key) if reverse_index_arr: reverse_index_arr.extend(value) else: word_reverse_index_dict[key] = value # 输出倒排索引 with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo: for key, value in word_reverse_index_dict.items(): fo.write("%s,%s\n" % (key, value)) # 关闭进程池 process_pool.shutdown() def agg_word_cal(word_file: str, word_split_file: str, word_position_list: list, word_split_position_list: list, agg_threshold: float, main_word: str, main_key_list: list, candidate_position_list: list): """ 长尾词聚合计算 :param word_file: 长尾词文件路径 :param word_split_file: 长尾词分词文件路径 :param word_position_list: 长尾词位置索引 :param word_split_position_list 词根位置索引 :param agg_threshold: 聚合阈值 :param main_word: 主词 :param main_key_list: 主词词根 :param candidate_position_list: 候选词位置 :return: """ # 结果容器 result_list = [] with (open(word_file, "r", encoding="UTF-8") as f_key, mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap, open(word_split_file, "r", encoding="UTF-8") as f_key_split, mmap.mmap(f_key_split.fileno(), 0, access=mmap.ACCESS_READ) as f_key_split_mmap): if not candidate_position_list: logging.info("子进程:候选词列表为空,结束执行") return for candidate_position in candidate_position_list: try: # 获取关键词 word_position = word_position_list[candidate_position] f_key_mmap.seek(word_position) candidate_word = f_key_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "") if candidate_word == "小孩在肚子里怎么呼吸": print("子:出现拉拉") # 获取分词结果 key_position = word_split_position_list[candidate_position] f_key_split_mmap.seek(key_position) temp_candidate_word_stem = f_key_split_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "") # 为空则跳过 if len(temp_candidate_word_stem) == 0: continue candidate_word_key_list = temp_candidate_word_stem.split(",") # 计算相关性 try: val = utils.cal_cos_sim(main_word, main_key_list, candidate_word, candidate_word_key_list) if val >= agg_threshold: result_list.append((candidate_position, candidate_word)) except Exception as e: logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % ( main_word, candidate_word, candidate_word_key_list), e) except Exception as e: logging.error("子进程发生异常", e) return result_list def agg_word(file_path: str): """ 长尾词聚合 :param file_path: :return: """ # 判断文件是否存在 for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]: input_file = os.path.join(file_path, file_name) if os.path.exists(input_file) and not os.path.isfile(input_file): raise Exception("文件不存在!文件路径:" + input_file) # 总长尾词数量 word_total_num = 0 # 记录关键词位置 word_position_list = [0] word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE) with (open(word_file, "r", encoding="utf-8") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap): while True: # 获取当前位置 cur_pos = fmmap.tell() # 移动到一下行 line = fmmap.readline() # 结束检测 if not line: break # 记录 word_position_list.append(cur_pos) word_total_num = word_total_num + 1 # 记录分词位置 word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT) word_split_position_list = [0] with (open(word_split_file, "r", encoding="utf-8") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap): while True: # 获取当前位置 cur_pos = fmmap.tell() # 移动到一下行 line = fmmap.readline() # 结束检测 if not line: break # 记录 word_split_position_list.append(cur_pos) # 获取倒排索引 word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX) word_reverse_index_dict = {} # 分词 key_re = r"([^,]+),\[" key_pattern = re.compile(key_re, re.I) # 索引 index_pattern = re.compile(r"\d+", re.I) with open(word_reverse_index_file, "r", encoding="utf-8") as f: for line in f: key_m = key_pattern.match(line) key = key_m.group(1) val = index_pattern.findall(line[line.index(","):]) word_reverse_index_dict[key] = [int(v) for v in val] # 已使用长尾词位图 unused_bitmap = BitMap(word_total_num + 1) # 聚合阈值 agg_threshold = 0.8 # 提交子进程阈值 process_threshold = os.cpu_count() * 30 # 提交任务 并输出结果 word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG) with (ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool, open(word_agg_file, "w", encoding="UTF-8") as fo, open(word_file, "r", encoding="UTF-8") as f_word, open(word_split_file, "r", encoding="UTF-8") as f_word_split, mmap.mmap(f_word_split.fileno(), 0, access=mmap.ACCESS_READ) as f_split_mmap): # 准备数据 for position_index, main_word in enumerate(f_word, 1): # 判断是否已聚合,否则置为已使用 if unused_bitmap.test(position_index): continue else: unused_bitmap.set(position_index) # 移除换行符 main_word = utils.remove_line_break(main_word) if main_word == "小孩在肚子里怎么呼吸": print("出现拉拉") # 获取分词结果 key_position = word_split_position_list[position_index] f_split_mmap.seek(key_position) temp_main_word_stem = utils.remove_line_break(f_split_mmap.readline().decode("UTF-8")) # 为空则跳过 if len(temp_main_word_stem) == 0: continue main_word_stem_list = temp_main_word_stem.split(",") # 从倒排索引中获取候选词的位置索引 candidate_position_set = set() for main_word_stem in main_word_stem_list: index_list = word_reverse_index_dict.get(main_word_stem) if index_list: candidate_position_set.update(index_list) # 排除已使用的长尾词 candidate_position_list = [] for candidate_position in candidate_position_set: # 跳过已使用 if unused_bitmap.test(candidate_position): continue candidate_position_list.append(candidate_position) # 分割计算任务,没有计算任务则跳过 candidate_position_len = len(candidate_position_list) if candidate_position_len == 0: continue # 暂存分析结果 p_result_list = [] # 任务量不足直接在主进程上进行计算,否则提交子进程计算 if candidate_position_len < process_threshold: p_result = agg_word_cal(word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, candidate_position_list) if len(p_result) > 0: p_result_list.extend(p_result) else: task_split_list = utils.avg_split_task(candidate_position_len, math.ceil(candidate_position_len / os.cpu_count())) all_task_list = [candidate_position_list[start_pos:end_pos] for start_pos, end_pos in task_split_list] # 提交任务 process_futures = [] for task_list in all_task_list: if not task_list: continue p_future = process_pool.submit(agg_word_cal, word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, task_list) process_futures.append(p_future) for p_future in as_completed(process_futures): p_result = p_future.result() if p_result and len(p_result) > 0: p_result_list.extend(p_result) # 处理分析结果,并标记已处理数据 if len(p_result_list) > 0: fo.write("%s\n" % main_word) for candidate_position, candidate_word in p_result_list: unused_bitmap.set(candidate_position) fo.write("%s\n" % candidate_word) fo.write("\n") # 清除上一轮的数据 p_result_list.clear() # 关闭线程 process_pool.shutdown() if __name__ == "__main__": print("开始时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) # filePath = "../data" filePath = "../data/test" # extract_word_from_5118(filePath) # merge_word(filePath) # prepare_word_split_and_reverse_index(filePath) agg_word(filePath) # word_split_statistics(file_path) # tasks = utils.avg_split_task(100, 12) # 两者计算余弦值等于:0.8 # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写", # ["QQ", "邮箱", "格式", "如何", "写"]) print("结束时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))