| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- # -*- coding:utf-8 -*-
- import math
- import logging
- import os
- import config
- import logging.config
- import pickle
- import mmap
# Human-readable title of this module (the literal means "utility helpers").
TITLE = "工具类"
# Module-level cache used by the progress helpers to remember the prompt
# interval data computed for a given total.
tip_internal_cache = dict()
def init_log():
    """Configure logging from ``./logging.conf`` and return the root logger.

    The configuration file path is relative to the current working
    directory of the process.

    Returns:
        The root logger, i.e. ``logging.getLogger()`` called without a name.
    """
    conf_path = './logging.conf'
    # Load the logger / handler / formatter definitions from the file.
    logging.config.fileConfig(conf_path)
    # No name is passed, so the logger handed back is the root logger.
    root_logger = logging.getLogger()
    return root_logger
def log_start_msg(msg):
    """Log a simple banner marking the start of a step.

    Args:
        msg: Label of the step; interpolated into the banner with ``%``.
    """
    banner = "-----------------%s 开始-----------------" % msg
    logging.info(banner)
def log_end_msg(msg):
    """Log a simple banner marking the end of a step.

    Args:
        msg: Label of the step; interpolated into the banner with ``%``.
    """
    banner = "-----------------%s 结束-----------------" % msg
    logging.info(banner)
def get_tip_internal(total_num):
    """Return the progress-prompt interval for a task of ``total_num`` items.

    The interval is ``ceil(total_num * config.PRECENT_TIPS)``, clamped to a
    minimum of 1, and is memoized in the module-level ``tip_internal_cache``.

    Args:
        total_num: Total number of items in the task.

    Returns:
        int: Number of items between two progress prompts (always >= 1).
    """
    # tip_in_size() keeps dict entries in the same cache; a namespaced tuple
    # key guarantees the two kinds of entry never collide when a count and a
    # size happen to be the same number.
    cache_key = ("count", total_num)
    internal = tip_internal_cache.get(cache_key)
    if internal is None:
        # Clamp to 1: callers use the interval as a modulo divisor, so a
        # zero interval (empty task or zero ratio) would raise
        # ZeroDivisionError.
        internal = max(1, math.ceil(total_num * config.PRECENT_TIPS))
        tip_internal_cache[cache_key] = internal
    return internal
-
def tip(total_num, cur_num, is_zero_base=True):
    """Log a simple progress message at regular intervals.

    A message is logged when the (1-based) progress reaches ``total_num`` or
    is a multiple of the interval returned by ``get_tip_internal``.

    Args:
        total_num: Total number of items.
        cur_num: Current progress.
        is_zero_base: When true, ``cur_num`` is treated as a 0-based index
            and shifted by one before it is compared and reported.
    """
    # TODO: switch to percentage-based prompts.
    # The interval is used as a modulo divisor below; clamp it so an empty
    # task or a zero ratio cannot raise ZeroDivisionError.
    internal = max(1, get_tip_internal(total_num))
    if is_zero_base:
        cur_num += 1
    # At completion cur_num equals total_num, so a single message format
    # covers both the "finished" and the "interval reached" cases.
    if cur_num == total_num or cur_num % internal == 0:
        logging.info("当前进度 %d / %d", cur_num, total_num)
def tip_in_size(total_size, cur_pos):
    """Log progress by position, for cases where the item count is unknown.

    Per-``total_size`` state (next checkpoint and interval) is kept in the
    module-level ``tip_internal_cache``. A message is logged whenever
    ``cur_pos`` reaches the current checkpoint, after which the checkpoint
    is advanced past ``cur_pos`` in whole intervals, capped at
    ``total_size``.

    Args:
        total_size: Total size of the work being tracked.
        cur_pos: Current position within ``total_size``.
    """
    # get_tip_internal() stores plain ints in the same cache; a namespaced
    # tuple key guarantees the two kinds of entry never collide.
    cache_key = ("size", total_size)
    state = tip_internal_cache.get(cache_key)
    if state is None:
        state = {
            "check_point": cur_pos,
            # Clamp to 1 so the checkpoint always moves forward; a zero
            # interval would otherwise never advance it.
            "internal": max(1, math.ceil(total_size * config.PRECENT_TIPS)),
        }
        tip_internal_cache[cache_key] = state

    check_point = state["check_point"]
    if cur_pos < check_point:
        return

    logging.info("当前进度 %d / %d", cur_pos, total_size)

    if cur_pos >= total_size:
        # Finished: drop the state so a later run with the same total size
        # starts from a fresh checkpoint instead of staying silent until
        # the very end.
        tip_internal_cache.pop(cache_key, None)
        return

    # Advance the checkpoint to the first interval boundary beyond cur_pos,
    # never past the total.
    internal = state["internal"]
    steps = (cur_pos - check_point) // internal + 1
    state["check_point"] = min(check_point + steps * internal, total_size)
def save_obj(path, obj):
    """Pickle ``obj`` to the file at ``path``, overwriting any existing file.

    Args:
        path: Destination file path.
        obj: Object to serialize with ``pickle``.
    """
    with open(path, "wb") as sink:
        pickle.dump(obj, sink)
def load_obj(path):
    """Read a pickled object back from the file at ``path``.

    Args:
        path: Path of a file containing pickled data.

    Returns:
        The object produced by ``pickle.load``.
    """
    # NOTE(review): unpickling can execute arbitrary code; only point this
    # at files the program itself produced.
    with open(path, "rb") as source:
        loaded = pickle.load(source)
    return loaded
def load_stop_word():
    """Load the stop-word table, building and caching it on first use.

    If ``config.STOP_WORD_CACHE`` exists it is unpickled and returned.
    Otherwise every regular file directly inside ``config.STOP_WORD_DIR`` is
    read line by line (decoded with ``config.ENCODING_CHARSET``), each line
    with its ``\\n`` / ``\\r`` characters removed becomes a key, and the
    result is pickled to ``config.STOP_WORD_CACHE`` before being returned.

    Returns:
        dict: Maps each stop word to ``None``; a dict is used for fast
        membership checks.
    """
    # Reuse the cached table when it is available.
    if os.path.exists(config.STOP_WORD_CACHE):
        logging.debug("存在停用词缓存")
        return load_obj(config.STOP_WORD_CACHE)

    logging.debug("正在构建停用词缓存")
    stop_word_dict = {}
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # resulting table (and the cache written from it) deterministic.
    for name in sorted(os.listdir(config.STOP_WORD_DIR)):
        stop_word_file = os.path.join(config.STOP_WORD_DIR, name)
        # Sub-directories and other non-file entries cannot be opened as
        # text; skip them instead of crashing.
        if not os.path.isfile(stop_word_file):
            continue
        with open(stop_word_file, encoding=config.ENCODING_CHARSET) as f:
            for line in f:
                # Remove line-break characters; duplicates collapse
                # naturally because the words are dict keys.
                word = line.replace("\n", "").replace("\r", "")
                stop_word_dict[word] = None

    logging.debug("把停用词缓存保存到本地")
    # Persist the table so later runs can skip the rebuild.
    save_obj(config.STOP_WORD_CACHE, stop_word_dict)

    return stop_word_dict
def avg_split_task(total: int, split_internal: int):
    """Split ``total`` items into consecutive chunks of ``split_internal``.

    Args:
        total: Total number of items to distribute.
        split_internal: Number of items per chunk.

    Returns:
        list: ``[start_pos, end_pos]`` pairs, one per chunk. ``end_pos`` is
        ``-1`` for any chunk whose computed end reaches or passes ``total``.
    """
    # Number of chunks needed to cover every item.
    chunk_count = math.ceil(total / split_internal)
    chunk_starts = (idx * split_internal for idx in range(chunk_count))
    return [
        [
            begin,
            # -1 marks a chunk that runs up to (or beyond) the end.
            begin + split_internal if begin + split_internal < total else -1,
        ]
        for begin in chunk_starts
    ]
if __name__ == "__main__":
    # Script entry point: dump every loaded stop word, one per line, into
    # ./data/stopword.txt.
    stop_word = load_stop_word()
    with open("./data/stopword.txt", "w", encoding="UTF-8") as out_file:
        out_file.writelines("%s\n" % word for word in stop_word)
|