# -*- coding:utf-8 -*- import math import logging import os import config import logging.config import pickle import mmap TITLE = "工具类" tip_internal_cache = {} def init_log(): """ 日志初始化工具 """ # 读取日志配置文件内容 logging.config.fileConfig('./logging.conf') # 用一个没有在配置文件中定义的logger名称来创建一个日志器logger return logging.getLogger() def log_start_msg(msg): """ 执行开始时的简易日志输出 """ logging.info("-----------------%s 开始-----------------" % msg) def log_end_msg(msg): """ 执行结束时的简易日志输出 """ logging.info("-----------------%s 结束-----------------" % msg) def get_tip_internal(total_num): """ 计算进度提示间隔 """ # 尝试从缓存中获取 internal = tip_internal_cache.get(total_num) # 不存在则进行计算并放入缓存中 if not internal: internal = math.ceil(total_num * config.PRECENT_TIPS) tip_internal_cache[total_num] = internal return internal def tip(total_num, cur_num, is_zero_base=True): """ 简易进度提示 total_num 总数量 cur_num 当前进度(0基) internal 提示间隔 """ # TODO # 修改成百分比提示 internal = get_tip_internal(total_num) # cur_num + 1 是0基修正 if is_zero_base: cur_num = cur_num + 1 # 进度提示 if cur_num == total_num: logging.info("当前进度 %d / %d" % (total_num, total_num)) elif cur_num % internal == 0: logging.info("当前进度 %d / %d" % (cur_num, total_num)) def tip_in_size(total_size, cur_pos): """ 简易进度提示(用于不知道总行数的情形) total_size 总数量 cur_num 当前进度 """ # 尝试从缓存中获取 tip_internal = tip_internal_cache.get(total_size) if not tip_internal: # 不存在缓存,构建 提示检查点 和 提示间隔 信息 internal = math.ceil(total_size * config.PRECENT_TIPS) tip_internal= { "check_point": cur_pos, "internal": internal } # 放入缓存 tip_internal_cache[total_size] = tip_internal # 当前位置超过提示检查点则显示进度 if cur_pos >= tip_internal["check_point"]: logging.info("当前进度 %d / %d" % (cur_pos, total_size)) # 修改 提示检查点 check_point = tip_internal["check_point"] internal = tip_internal["internal"] while cur_pos >= check_point: check_point = check_point + internal # 如果 提示检查点大于总值,则置为总值 if check_point > total_size: check_point = total_size # 如果不手动中断会陷入循环 break # 更新 提示检查点 tip_internal["check_point"] = check_point def save_obj(path, obj): """ 保存对象至本地 """ with open(path, "wb") as f: pickle.dump(obj, f) def load_obj(path): """ 加载对象 """ with open(path, "rb") as f: return pickle.load(f) if __name__ == "__main__": init_log() log_start_msg(TITLE) # 测试普通提示 # total = 3 # for i in range(total): # tip(total, i) # 测试mmap的提示 # with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \ # mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap: # # 总大小 # total_num = fmmap.size() # while True: # # 读取光标位置 # cur_pos = fmmap.tell() # # 把光标移动到下一行 # line = fmmap.readline() # # 进度显示 # tip_in_size(total_num, cur_pos) # if not line: # break # 测试逐行读取的进度提示 with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey: fkey.seek(0, os.SEEK_END) total_num = fkey.tell() fkey.seek(0) while True: cur_pos = fkey.tell() line = fkey.readline() tip_in_size(total_num, cur_pos) if not line: break; log_end_msg(TITLE)