@@ -1,20 +1,11 @@
# -*- coding:utf-8 -*-
import math
import os
-import re
-import shutil
-import threading
-import time
-from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
+from concurrent.futures import ProcessPoolExecutor, as_completed

import jieba
-import redis
-from bitarray import bitarray
-from tqdm import tqdm

import utils
-import logging
-
from constant import FILE_LONG_TAIL_MERGE

# File: 长尾词_合并_分词.txt
@@ -47,150 +38,6 @@ CACHE_UNUSED_BITMAP = "unused_bitmap"
# Charset: UTF-8
CHARSET_UTF_8 = "UTF-8"

-# Redis connection pool
-redis_pool: redis.ConnectionPool = None
-
-# Thread pool
-thread_pool: ThreadPoolExecutor = None
-
-# Thread-local variables
-local_var = threading.local()
-
-
-def agg_word(file_path: str):
-    """
-    Aggregate long-tail keywords
-    :param file_path:
-    :return:
-    """
-
-    # Total number of long-tail keywords
-    # word_total_num = 0
-    word_total_num = 1000000
-
-    # Aggregation threshold
-    agg_threshold = 0.8
-
-    # Workload per task
-    task_cal_num = 10000
-
-    # Number of worker processes (minus 1 to leave one CPU for redis)
-    worker_num = os.cpu_count() - 1
-    # worker_num = 1
-
-    # Regex matching the per-worker aggregation output files
-    agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+_\d+.txt", re.I)
-
-    # Maximum number of threads
-    max_threads = 2
-
-    # Maximum number of redis connections (kept equal to the thread count to avoid waste)
-    redis_max_conns = max_threads
-
-    # Redis cache client
-    m_redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379)
-
-    # Check that the input files exist
-    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT,
-                      FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
-        input_file = os.path.join(file_path, file_name)
-        if os.path.exists(input_file) and not os.path.isfile(input_file):
-            raise Exception("文件不存在!文件路径:" + input_file)
-
-    # Archive historical result files
-    history_agg_file_list = [file for file in os.listdir(file_path) if agg_file_pattern.match(file)]
-    if len(history_agg_file_list) > 0:
-        archive_path = os.path.join(file_path, DIR_AGG_FILE_ARCHIVE % time.strftime('%Y%m%d%H%M%S'))
-        os.makedirs(archive_path)
-        for history_agg_file in history_agg_file_list:
-            shutil.move(os.path.join(file_path, history_agg_file), archive_path)
-
-    # Cache the keyword positions
-    # word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
-    # word_dict = {}
-    # with open(word_file, "r", encoding="utf-8") as f:
-    #     for position, word in enumerate(f, start=1):
-    #         word = utils.remove_line_break(word)
-    #         if not word:
-    #             continue
-    #         word_dict[position] = word
-    # m_redis_cache.hset(CACHE_WORD, mapping=word_dict)
-    # # Record the total keyword count
-    # word_total_num = len(word_dict)
-    # # Free memory
-    # del word_dict
-    #
-    # # Cache the word segments
-    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
-    # word_split_dict = {}
-    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
-    #     for position, word_split_line in enumerate(f, start=1):
-    #         word_split_line = utils.remove_line_break(word_split_line)
-    #         if not word_split_line:
-    #             continue
-    #         word_split_dict[position] = word_split_line
-    # m_redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
-    # # Free memory
-    # del word_split_dict
-    #
-    # # Cache the reverse index
-    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
-    # word_reverse_index_dict = {}
-    # # Segment (key) pattern
-    # key_pattern = re.compile(r"([^,]+),\[", re.I)
-    # # Index positions
-    # index_pattern = re.compile(r"\d+", re.I)
-    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
-    #     for word_split_line in f:
-    #         key_m = key_pattern.match(word_split_line)
-    #         key = key_m.group(1)
-    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
-    #         word_reverse_index_dict[key] = ",".join(val)
-    # m_redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
-    # # Free memory
-    # del word_reverse_index_dict
-
-    # Clear and then rebuild the long-tail keyword usage bitmap
-    m_redis_cache.delete(CACHE_UNUSED_BITMAP)
-    m_redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 2, 1)
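-    # Note: setting a bit just past the last keyword position makes redis zero-fill the whole bitmap up front; presumably that is what the "+ 2" margin is for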
-
-    # Submit tasks and write out the results
-    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
-
-    with ProcessPoolExecutor(max_workers=worker_num, initializer=init_process,
-                             initargs=(redis_max_conns, max_threads, file_path)) as process_pool:
-        # Compute task boundaries
-        task_list = utils.avg_split_task(word_total_num, task_cal_num, 1)
-
-        # Submit tasks
-        process_futures = []
-        for skip_line, pos in enumerate(task_list, start=1):
-            skip_line = (skip_line % worker_num) + 1
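-            # presumably maps each task to a tqdm display row (see position=skip_line in agg_word_process) so worker bars don't collide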
-            p_future = process_pool.submit(agg_word_process, agg_threshold, pos[0], pos[1], word_total_num,
-                                           skip_line)
-            process_futures.append(p_future)
-
-        # Show task progress
-        with tqdm(total=len(process_futures), desc='文本聚合进度', unit='份', unit_scale=True) as pbar:
-            for p_future in as_completed(process_futures):
-                p_result = p_future.result()
-                # Update the progress bar
-                pbar.update(1)
-
-        # Shut down the process pool
-        process_pool.shutdown()
-
-    # Collect the child-process result files and merge them
-    with open(word_agg_file, "w", encoding="UTF-8") as fo:
-        for file in os.listdir(file_path):
-            # Skip anything that is not a result file
-            if not agg_file_pattern.match(file):
-                continue
-
-            with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi:
-                for word in fi:
-                    fo.write(word)
-

def prepare_word_split_and_reverse_index(file_path: str):
    """
@@ -217,11 +64,11 @@ def prepare_word_split_and_reverse_index(file_path: str):
    # Split the workload into tasks
    task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))

-    # Per-task process results
-    p_result_list = []
-
    # Multi-process processing
    with ProcessPoolExecutor(os.cpu_count()) as process_pool:
+        # Per-task process results
+        p_result_list = []
+
        # Submit tasks
        process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in
                           task_list]
@@ -237,14 +84,14 @@ def prepare_word_split_and_reverse_index(file_path: str):
    # Write out the word-segmentation results
    split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
    with open(split_output_file, "w", encoding="UTF-8") as fo:
-        for start_pos, word_arr_list, reverse_index in p_result_list:
-            for word_arr in word_arr_list:
-                fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
+        for start_pos, word_list, reverse_index in p_result_list:
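+            # each word_list entry is already a formatted output line (built in word_split_reverse below)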
+            for word in word_list:
+                fo.write("%s\n" % word)

    # Keyword reverse index
    word_reverse_index_dict = dict()
    # Merge the per-process reverse indexes
-    for start_pos, word_arr, reverse_index_dict in p_result_list:
+    for start_pos, word_list, reverse_index_dict in p_result_list:
        for key, value in reverse_index_dict.items():
            reverse_index_arr = word_reverse_index_dict.get(key)
            if reverse_index_arr:
@@ -253,8 +100,8 @@ def prepare_word_split_and_reverse_index(file_path: str):
                word_reverse_index_dict[key] = value
    # Write out the reverse index
    with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
-        for key, value in word_reverse_index_dict.items():
-            fo.write("%s,%s\n" % (key, value))
+        for key, values in word_reverse_index_dict.items():
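+            # one line per segment: "stem,pos1,pos2,...", listing the positions of the keywords that contain it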
+            fo.write("%s,%s\n" % (key, ",".join(values)))

    # Shut down the process pool
    process_pool.shutdown()
@@ -271,13 +118,13 @@ def word_split_reverse(input_file: str, start_pos: int, end_pos: int):

    # Load the stop words
    stop_word_dict = utils.load_stop_word()
-    # Container for the keywords
-    word_arr_list = []
+    # Container for the keywords plus their segments
+    word_list = []
    # Reverse index
-    word_reverse_index = dict()
+    reverse_index = dict()

    with open(input_file, "r", encoding="utf-8") as fr:
-        for i, tmp_word in enumerate(fr):
+        for i, word in enumerate(fr):

            # start_pos is a 1-based line number, while i starts from 0
            if i + 1 < start_pos:
@@ -291,198 +138,30 @@ def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
                break

            # Word segmentation
-            word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
+            word = word.replace("\n", "").replace("\r", "")
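+            # strip CR as well as LF so input files with Windows line endings segment cleanly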
+            stem_list = jieba.cut_for_search(word)

            # Filtered segmentation result
-            word_filter_arr = []
+            stem_filter_arr = []

            # Filter out stop words
-            for word in word_list:
-                if word in stop_word_dict:
+            for stem in stem_list:
+                if stem in stop_word_dict:
                    continue
-
-                word_index_arr = word_reverse_index.get(word)
+                # Reverse index
+                word_index_arr = reverse_index.get(stem)
+                pos = str(cur_pos)
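+                # positions are kept as strings so the output stage can ",".join them directly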
                if word_index_arr:
-                    word_index_arr.append(cur_pos)
+                    word_index_arr.append(pos)
                else:
-                    word_reverse_index[word] = [cur_pos]
-
-                word_filter_arr.append(word)
+                    reverse_index[stem] = [pos]
+                # Valid segment
+                stem_filter_arr.append(stem)

-            if len(word_filter_arr) == 0:
-                word_arr_list.append([])
+            if len(stem_filter_arr) == 0:
+                word_list.append(word)
            else:
-                word_arr_list.append(word_filter_arr)
-
-    return start_pos, word_arr_list, word_reverse_index
-
-
-def init_process(max_conns: int, max_threads: int, file_path: str):
-    """
-    Initialize a worker process
-    :param max_conns: maximum number of redis connections
-    :param max_threads: maximum number of threads
-    :param file_path: output file path
-    :return:
-    """
-    # Initialize the redis connection pool
-    global redis_pool
-    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=max_conns)
-
-    global thread_pool
-    thread_pool = ThreadPoolExecutor(max_threads, initializer=init_thread, initargs=(file_path,))
-
-
-def agg_word_process(agg_threshold: float, start_pos: int, end_pos: int, final_pos: int,
-                     skip_line: int):
-    """
-    Long-tail keyword aggregation worker
-    :param agg_threshold: aggregation threshold
-    :param start_pos: task start boundary (inclusive)
-    :param end_pos: task end boundary (exclusive)
-    :param final_pos: overall task boundary
-    :param skip_line: progress-bar display row
-    :return:
-    """
+                stem_filter_arr.insert(0, word)
+                word_list.append(",".join(stem_filter_arr))
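+                # line format: "word,stem1,stem2,..."; a bare word means all of its segments were stop words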

-    # Progress length
-    process_len = 0
-    if end_pos == -1:
-        process_len = final_pos - start_pos
-    else:
-        process_len = end_pos - start_pos
-
-    with tqdm(total=process_len, desc='子进程-%s:文本聚合进度' % os.getpid(), unit='份', unit_scale=True,
-              position=skip_line) as pbar:
-
-        thread_futures = [thread_pool.submit(agg_word_thread, main_word_position, agg_threshold) for main_word_position
-                          in
-                          range(start_pos, end_pos)]
-
-        for t_future in as_completed(thread_futures):
-            t_result = t_future.result()
-            # Update the progress bar
-            pbar.update(1)
-
-    return
-
-
-def init_thread(file_path: str):
-    """
-    Initialize an aggregation thread
-    :param file_path: output file path
-    :return:
-    """
-    # Initialize the redis client
-    local_var.redis_cache = redis.StrictRedis(connection_pool=redis_pool)
-    # Initialize the redis pipeline
-    local_var.redis_pipeline = local_var.redis_cache.pipeline(transaction=False)
-
-    # Create the temporary result file
-    word_agg_file = os.path.join(file_path,
-                                 FILE_LONG_TAIL_MERGE_AGG_PID % (os.getpid(), threading.current_thread().ident))
-    local_var.file_writer = open(word_agg_file, "w", encoding=CHARSET_UTF_8)
-
-    # Local copy of the used bitmap
-    local_var.unused_bitmap = bitarray()
-
-    # Candidate positions fetched from the reverse index
-    local_var.candidate_position_set = set()
-
-    # Result list
-    local_var.result_list = []
-
-
-def agg_word_thread(main_word_position: int, agg_threshold: float):
-    try:
-        # Fetch a copy of the used bitmap
-        local_var.unused_bitmap.frombytes(local_var.redis_cache.get(CACHE_UNUSED_BITMAP))
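-        # note: bitarray.frombytes() appends to the existing array, hence the clear() in the finally block below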
-
-        # Skip the main word if it is already used; otherwise mark it as used
-        if local_var.unused_bitmap[main_word_position]:
-            return
-        else:
-            local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
-            local_var.unused_bitmap[main_word_position] = 1
-
-        # Fetch the main word and its stems
-        local_var.redis_pipeline.hget(CACHE_WORD, main_word_position)
-        local_var.redis_pipeline.hget(CACHE_WORD_STEM, main_word_position)
-        main_result = local_var.redis_pipeline.execute()
-        main_word = main_result[0]
-        main_word_stem = main_result[1]
-
-        # Return if either is missing
-        if not main_word or not main_word_stem:
-            return
-
-        main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
-
-        # Fetch long-tail keyword positions from the reverse index
-        temp_candidate_position_list = local_var.redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
-        for temp_candidate_position in temp_candidate_position_list:
-            if not temp_candidate_position:
-                continue
-            # Exclude already-aggregated words
-            for candidate_position in temp_candidate_position.decode(CHARSET_UTF_8).split(","):
-                if local_var.unused_bitmap[int(candidate_position)]:
-                    continue
-                local_var.candidate_position_set.add(candidate_position)

-        # Skip if there are no candidate words to evaluate
-        if not local_var.candidate_position_set:
-            return
-
-        # Fetch the keyword and segment lists from the cache; skip if empty
-        local_var.redis_pipeline.hmget(CACHE_WORD, local_var.candidate_position_set)
-        local_var.redis_pipeline.hmget(CACHE_WORD_STEM, local_var.candidate_position_set)
-        candidate_result = local_var.redis_pipeline.execute()
-        candidate_word_cache_list = candidate_result[0]
-        candidate_word_stem_cache_list = candidate_result[1]
-        if not candidate_word_cache_list or not candidate_word_stem_cache_list:
-            return
-
-        # Decode only now, in case one of the early returns above fired
-        main_word = main_word.decode(CHARSET_UTF_8)
-
-        # Compute similarity
-        for candidate_position in range(len(local_var.candidate_position_set)):
-
-            # Fetch the keyword and its segments; skip if either is missing
-            candidate_word = candidate_word_cache_list[int(candidate_position)]
-            if not candidate_word:
-                continue
-            candidate_word_stem = candidate_word_stem_cache_list[int(candidate_position)]
-            if not candidate_word_stem:
-                continue
-            candidate_word = candidate_word.decode(CHARSET_UTF_8)
-            candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",")
-
-            # Compute relevance
-            try:
-                val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
-                                        candidate_word_stem_list)
-                if val >= agg_threshold:
-                    local_var.redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
-                    local_var.result_list.append(candidate_word)
-            except Exception as e:
-                logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (
-                    main_word, candidate_word, candidate_word_stem_list), e)
-
-        # Save the results
-        if not local_var.result_list:
-            return
-        local_var.file_writer.write("%s\n" % main_word)
-        for candidate_word in local_var.result_list:
-            local_var.file_writer.write("%s\n" % candidate_word)
-        local_var.file_writer.write("\n")
-
-    except Exception as e:
-        logging.error("子进程发生异常", e)
-    finally:
-        # Clear the containers
-        local_var.candidate_position_set.clear()
-        local_var.result_list.clear()
-        local_var.unused_bitmap.clear()
-
-    return
+    return start_pos, word_list, reverse_index