# -*- coding:utf-8 -*-
import logging
import math
import mmap
import os
import re
import time
import zipfile
from concurrent.futures import ProcessPoolExecutor, as_completed

import jieba
from bitmap import BitMap

import utils

# File suffix: <site>_长尾词.txt (per-site long-tail keywords)
FILE_SUFFIX_LONG_TAIL = "_长尾词.txt"
# File: 长尾词_合并.txt (merged, de-duplicated long-tail keywords)
FILE_LONG_TAIL_MERGE = "长尾词_合并.txt"
# File: 长尾词_合并_分词.txt (segmentation of the merged keywords)
FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
# File: 长尾词_合并_倒排索引.txt (inverted index over the segmented keywords)
FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
# File: 长尾词_合并_聚合.txt (aggregated long-tail keyword clusters)
FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
def extract_word_from_5118(file_path: str):
    """
    Extract keywords from 5118 keyword zip archives.
    :param file_path: directory to process
    :return: None
    """
    file_list = [os.path.join(file_path, file) for file in os.listdir(file_path)]
    for file in file_list:
        # Skip non-zip files; the output .txt files land in this same directory
        if not file.endswith(".zip"):
            continue
        with zipfile.ZipFile(file) as zfile:
            for filename in zfile.namelist():
                # zipfile decodes non-UTF-8 member names as cp437; re-encode
                # to recover the original GBK file name
                real_name = filename.encode('cp437').decode('gbk')
                # Skip the archive's readme-style junk file
                if real_name in ['打开乱码如何处理?.txt']:
                    continue
                # Keyword container (set, for de-duplication)
                word_container = set()
                # Read the member file inside the archive
                with zfile.open(filename) as file_content:
                    lines = file_content.readlines()
                    # Skip the two header lines
                    for line in lines[2:]:
                        split = line.decode("gbk").split(",")
                        # Only the first column (the keyword) is needed
                        word_container.add(split[0])
                output_file_name = real_name[0:real_name.index("--")]
                output_file_path = os.path.join(file_path, output_file_name + FILE_SUFFIX_LONG_TAIL)
                with open(output_file_path, "w", encoding="utf-8") as f:
                    for item in word_container:
                        f.write(item)
                        f.write("\n")
def merge_word(file_path: str):
    """
    Merge the per-site long-tail keyword files (with de-duplication).
    :param file_path: directory to process
    :return: None
    """
    # Collect the per-site long-tail keyword files
    file_list = []
    for file in os.listdir(file_path):
        if file.endswith(FILE_SUFFIX_LONG_TAIL):
            file_list.append(os.path.join(file_path, file))
    # A set de-duplicates keywords across files
    word_set = set()
    for file in file_list:
        with open(file, "r", encoding="utf-8") as f:
            for word in f:
                word_set.add(word.replace("\n", ""))
    # Save the merged result
    with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE), "w", encoding="utf-8") as f:
        for item in word_set:
            f.write(item)
            f.write("\n")
def word_split_statistics(file_path: str):
    """
    Per-site word segmentation statistics.
    :param file_path: directory to process
    :return: None
    """
    file_list = [os.path.join(file_path, file) for file in os.listdir(file_path)]
    stop_word_dict = utils.load_stop_word()
    for file in file_list:
        if not file.endswith(FILE_SUFFIX_LONG_TAIL):
            continue
        # Stem -> frequency
        key_dict = {}
        with open(file, "r", encoding="utf-8") as f:
            for tmp_word in f:
                # Segment the keyword
                word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
                for word in word_list:
                    # Filter stop words
                    if word in stop_word_dict:
                        continue
                    key_dict[word] = key_dict.get(word, 0) + 1
        # Sort by frequency, descending
        sorted_key_list = sorted(key_dict.items(), key=lambda x: x[1], reverse=True)
        # os.path.basename keeps this portable (the original sliced on "\\")
        output_file_name = os.path.basename(file)[:-len(FILE_SUFFIX_LONG_TAIL)]
        output_file_path = os.path.join(file_path, output_file_name + "_长尾词_分词统计.csv")
        with open(output_file_path, "w", encoding="UTF-8") as f:
            for key, count in sorted_key_list:
                f.write("%s,%d\n" % (key, count))
def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
    """
    Segment words and build a partial inverted index for one line range.
    :param input_file: file to process
    :param start_pos: first line of this task (1-based, inclusive)
    :param end_pos: end line of this task (exclusive)
    :return: (start position, segmentation results, inverted index)
    """
    # Load stop words
    stop_word_dict = utils.load_stop_word()
    # Segmentation result, one entry per line
    word_arr_list = []
    # Partial inverted index: stem -> list of line positions
    word_reverse_index = dict()
    with open(input_file, "r", encoding="utf-8") as fr:
        for i, tmp_word in enumerate(fr):
            # start_pos is a 1-based line number, while i starts at 0
            if i + 1 < start_pos:
                continue
            # Current 1-based position
            cur_pos = i + 1
            # Stop at the task boundary (end_pos is exclusive)
            if cur_pos == end_pos:
                break
            # Segment the keyword
            word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
            # Stems that survive stop-word filtering
            word_filter_arr = []
            for word in word_list:
                if word in stop_word_dict:
                    continue
                word_index_arr = word_reverse_index.get(word)
                if word_index_arr:
                    word_index_arr.append(cur_pos)
                else:
                    word_reverse_index[word] = [cur_pos]
                word_filter_arr.append(word)
            # Append even when empty, so entries stay aligned with line positions
            word_arr_list.append(word_filter_arr)
    return start_pos, word_arr_list, word_reverse_index
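# utils.avg_split_task is not shown in this file. A minimal sketch of the
# contract word_split_reverse above relies on: split `total` items into
# 1-based, half-open (start, end) ranges of at most `step` items, jointly
# covering lines 1..total. This is an assumption about the helper, not its
# original implementation.
def avg_split_task_sketch(total: int, step: int) -> list:
    return [(start, min(start + step, total + 1)) for start in range(1, total + 1, step)]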
def prepare_word_split_and_reverse_index(file_path: str):
    """
    Preprocessing: segment the merged long-tail keywords and build the inverted index.
    :param file_path: directory to process
    :return: None
    """
    # The merged keyword file must exist
    word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
    if not os.path.isfile(word_input_file):
        print("File does not exist! " + word_input_file)
        return
    # Total number of lines to process
    with open(word_input_file, "r", encoding="utf-8") as fi:
        total_line_num = sum(1 for line in fi)
    if total_line_num == 0:
        print("Nothing to process: the file is empty")
        return
    # Split the work into one line range per CPU core
    task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))
    # Per-task results
    p_result_list = []
    # Process in parallel; the with-block shuts the pool down on exit
    with ProcessPoolExecutor(os.cpu_count()) as process_pool:
        # Submit tasks
        process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1])
                           for task in task_list]
        # Collect results
        for p_future in as_completed(process_futures):
            p_result = p_future.result()
            if p_result:
                p_result_list.append(p_result)
    # Restore input order by each task's start position
    p_result_list = sorted(p_result_list, key=lambda v: v[0])
    # Write the segmentation results
    split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
    with open(split_output_file, "w", encoding="UTF-8") as fo:
        for start_pos, word_arr_list, reverse_index in p_result_list:
            for word_arr in word_arr_list:
                fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
    # Merge the partial inverted indexes
    word_reverse_index_dict = dict()
    for start_pos, word_arr, reverse_index_dict in p_result_list:
        for key, value in reverse_index_dict.items():
            reverse_index_arr = word_reverse_index_dict.get(key)
            if reverse_index_arr:
                reverse_index_arr.extend(value)
            else:
                word_reverse_index_dict[key] = value
    # Write the inverted index
    with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
        for key, value in word_reverse_index_dict.items():
            fo.write("%s,%s\n" % (key, value))
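# The aggregation step below avoids loading every keyword into memory: it
# records the byte offset of each line once, then re-reads single lines on
# demand through mmap. A standalone demo of that access pattern (the file
# name and helper name are placeholders):
def read_line_at_offset(path: str, offset: int) -> str:
    with open(path, "r", encoding="utf-8") as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        mm.seek(offset)
        return mm.readline().decode("utf-8").rstrip("\r\n")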
def agg_word_cal(word_file: str, word_split_file: str, word_position_list: list, word_split_position_list: list,
                 agg_threshold: float, main_word: str, main_key_list: list, candidate_position_list: list):
    """
    Long-tail keyword aggregation: score candidates against one main word.
    :param word_file: path to the merged long-tail keyword file
    :param word_split_file: path to the segmented keyword file
    :param word_position_list: byte offset of each keyword line
    :param word_split_position_list: byte offset of each segmentation line
    :param agg_threshold: aggregation (similarity) threshold
    :param main_word: the main word
    :param main_key_list: stems of the main word
    :param candidate_position_list: candidate word positions
    :return: list of (position, word) pairs above the threshold
    """
    # Result container
    result_list = []
    if not candidate_position_list:
        logging.info("Worker: empty candidate list, nothing to do")
        return result_list
    with (open(word_file, "r", encoding="UTF-8") as f_key,
          mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap,
          open(word_split_file, "r", encoding="UTF-8") as f_key_split,
          mmap.mmap(f_key_split.fileno(), 0, access=mmap.ACCESS_READ) as f_key_split_mmap):
        for candidate_position in candidate_position_list:
            try:
                # Read the candidate word at its recorded byte offset
                word_position = word_position_list[candidate_position]
                f_key_mmap.seek(word_position)
                candidate_word = f_key_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
                # Read its segmentation result the same way
                key_position = word_split_position_list[candidate_position]
                f_key_split_mmap.seek(key_position)
                temp_candidate_word_stem = f_key_split_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
                # Skip candidates whose stems were all filtered out
                if len(temp_candidate_word_stem) == 0:
                    continue
                candidate_word_key_list = temp_candidate_word_stem.split(",")
                # Compute the similarity; keep candidates above the threshold
                try:
                    val = utils.cal_cos_sim(main_word, main_key_list, candidate_word, candidate_word_key_list)
                    if val >= agg_threshold:
                        result_list.append((candidate_position, candidate_word))
                except Exception:
                    logging.exception("Main word %s failed on candidate word %s, stems: %s",
                                      main_word, candidate_word, candidate_word_key_list)
            except Exception:
                logging.exception("Worker process raised an exception")
    return result_list
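# utils.cal_cos_sim is not shown in this file. A minimal sketch of what it is
# assumed to compute: bag-of-words cosine similarity over the two stem lists
# (the raw words are taken only for context here). The sketch reproduces the
# documented example in __main__ below, where two five-stem lists sharing
# four stems score 4 / (sqrt(5) * sqrt(5)) = 0.8.
def cal_cos_sim_sketch(word_a: str, stems_a: list, word_b: str, stems_b: list) -> float:
    from collections import Counter
    vec_a, vec_b = Counter(stems_a), Counter(stems_b)
    dot = sum(cnt * vec_b[stem] for stem, cnt in vec_a.items())
    norm = math.sqrt(sum(c * c for c in vec_a.values())) * math.sqrt(sum(c * c for c in vec_b.values()))
    return dot / norm if norm else 0.0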
def agg_word(file_path: str):
    """
    Aggregate (cluster) long-tail keywords by similarity.
    :param file_path: directory to process
    :return: None
    """
    # All three intermediate files must exist
    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
        input_file = os.path.join(file_path, file_name)
        if not os.path.isfile(input_file):
            raise Exception("File does not exist! Path: " + input_file)
    # Total number of long-tail keywords
    word_total_num = 0
    # Byte offset of each keyword line; index 0 is a dummy so positions are 1-based
    word_position_list = [0]
    word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
    with (open(word_file, "r", encoding="utf-8") as f,
          mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
        while True:
            # Offset of the line about to be read
            cur_pos = fmmap.tell()
            line = fmmap.readline()
            # End of file
            if not line:
                break
            word_position_list.append(cur_pos)
            word_total_num = word_total_num + 1
    # Byte offset of each segmentation line, same 1-based convention
    word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
    word_split_position_list = [0]
    with (open(word_split_file, "r", encoding="utf-8") as f,
          mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
        while True:
            cur_pos = fmmap.tell()
            line = fmmap.readline()
            if not line:
                break
            word_split_position_list.append(cur_pos)
    # Load the inverted index; each line looks like: stem,[1, 5, 9]
    word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
    word_reverse_index_dict = {}
    # Stem part before the first comma
    key_pattern = re.compile(r"([^,]+),\[")
    # Position numbers
    index_pattern = re.compile(r"\d+")
    with open(word_reverse_index_file, "r", encoding="utf-8") as f:
        for line in f:
            key_m = key_pattern.match(line)
            key = key_m.group(1)
            val = index_pattern.findall(line[line.index(","):])
            word_reverse_index_dict[key] = [int(v) for v in val]
    # Bitmap marking keywords that were already aggregated (1-based positions)
    used_bitmap = BitMap(word_total_num + 1)
    # Aggregation (similarity) threshold
    agg_threshold = 0.8
    # Below this many candidates, compute in the main process instead of forking
    process_threshold = os.cpu_count() * 30
    # Submit tasks and write results
    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
    with (ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool,
          open(word_agg_file, "w", encoding="UTF-8") as fo,
          open(word_file, "r", encoding="UTF-8") as f_word,
          open(word_split_file, "r", encoding="UTF-8") as f_word_split,
          mmap.mmap(f_word_split.fileno(), 0, access=mmap.ACCESS_READ) as f_split_mmap):
        for position_index, main_word in enumerate(f_word, 1):
            # Skip words already aggregated; otherwise mark this one as used
            if used_bitmap.test(position_index):
                continue
            used_bitmap.set(position_index)
            # Strip the line break
            main_word = utils.remove_line_break(main_word)
            # Read the segmentation result for this word
            key_position = word_split_position_list[position_index]
            f_split_mmap.seek(key_position)
            temp_main_word_stem = utils.remove_line_break(f_split_mmap.readline().decode("UTF-8"))
            # Skip words whose stems were all filtered out
            if len(temp_main_word_stem) == 0:
                continue
            main_word_stem_list = temp_main_word_stem.split(",")
            # Collect candidate positions from the inverted index
            candidate_position_set = set()
            for main_word_stem in main_word_stem_list:
                index_list = word_reverse_index_dict.get(main_word_stem)
                if index_list:
                    candidate_position_set.update(index_list)
            # Drop candidates that were already aggregated
            candidate_position_list = []
            for candidate_position in candidate_position_set:
                if used_bitmap.test(candidate_position):
                    continue
                candidate_position_list.append(candidate_position)
            # Nothing to compare against
            candidate_position_len = len(candidate_position_list)
            if candidate_position_len == 0:
                continue
            # Results for this main word
            p_result_list = []
            # Small workloads run in the main process; larger ones fan out to workers
            if candidate_position_len < process_threshold:
                p_result = agg_word_cal(word_file, word_split_file, word_position_list,
                                        word_split_position_list, agg_threshold, main_word,
                                        main_word_stem_list, candidate_position_list)
                if len(p_result) > 0:
                    p_result_list.extend(p_result)
            else:
                task_split_list = utils.avg_split_task(candidate_position_len,
                                                       math.ceil(candidate_position_len / os.cpu_count()))
                # avg_split_task is assumed to yield 1-based half-open ranges
                # (see the sketch above), hence the -1 when slicing this
                # 0-based list; the original sliced with the raw positions,
                # which would drop the first candidate
                all_task_list = [candidate_position_list[start_pos - 1:end_pos - 1]
                                 for start_pos, end_pos in task_split_list]
                # Submit tasks
                process_futures = []
                for task_list in all_task_list:
                    if not task_list:
                        continue
                    p_future = process_pool.submit(agg_word_cal, word_file, word_split_file,
                                                   word_position_list, word_split_position_list,
                                                   agg_threshold, main_word, main_word_stem_list, task_list)
                    process_futures.append(p_future)
                for p_future in as_completed(process_futures):
                    p_result = p_future.result()
                    if p_result:
                        p_result_list.extend(p_result)
            # Write one cluster: the main word, its matches, then a blank line,
            # marking every matched candidate as used
            if len(p_result_list) > 0:
                fo.write("%s\n" % main_word)
                for candidate_position, candidate_word in p_result_list:
                    used_bitmap.set(candidate_position)
                    fo.write("%s\n" % candidate_word)
                fo.write("\n")
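# utils.remove_line_break is assumed to strip CR/LF characters, matching what
# the mmap read paths above do by hand; a one-line sketch of that assumption:
def remove_line_break_sketch(s: str) -> str:
    return s.replace("\r", "").replace("\n", "")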
if __name__ == "__main__":
    print("Start time " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    # filePath = "../data"
    filePath = "../data/test"
    # extract_word_from_5118(filePath)
    # merge_word(filePath)
    # prepare_word_split_and_reverse_index(filePath)
    agg_word(filePath)
    # word_split_statistics(filePath)
    # tasks = utils.avg_split_task(100, 12)
    # The cosine similarity of these two equals 0.8:
    # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写",
    #                         ["QQ", "邮箱", "格式", "如何", "写"])
    print("End time " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))