| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- # -*- coding:utf-8 -*-
- import math
- import logging
- import os
- import config
- import logging.config
- import pickle
- import mmap
- TITLE = "工具类"
- tip_internal_cache = {}
- def init_log():
- """
- 日志初始化工具
- """
- # 读取日志配置文件内容
- logging.config.fileConfig('./logging.conf')
- # 用一个没有在配置文件中定义的logger名称来创建一个日志器logger
- return logging.getLogger()
- def log_start_msg(msg):
- """
- 执行开始时的简易日志输出
- """
- logging.info("-----------------%s 开始-----------------" % msg)
- def log_end_msg(msg):
- """
- 执行结束时的简易日志输出
- """
- logging.info("-----------------%s 结束-----------------" % msg)
- def get_tip_internal(total_num):
- """
- 计算进度提示间隔
- """
- # 尝试从缓存中获取
- internal = tip_internal_cache.get(total_num)
- # 不存在则进行计算并放入缓存中
- if not internal:
- internal = math.ceil(total_num * config.PRECENT_TIPS)
- tip_internal_cache[total_num] = internal
- return internal
-
- def tip(total_num, cur_num, is_zero_base=True):
- """
- 简易进度提示
- total_num 总数量
- cur_num 当前进度(0基)
- internal 提示间隔
- """
- # TODO
- # 修改成百分比提示
- internal = get_tip_internal(total_num)
- # cur_num + 1 是0基修正
- if is_zero_base:
- cur_num = cur_num + 1
- # 进度提示
- if cur_num == total_num:
- logging.info("当前进度 %d / %d" % (total_num, total_num))
- elif cur_num % internal == 0:
- logging.info("当前进度 %d / %d" % (cur_num, total_num))
- def tip_in_size(total_size, cur_pos):
- """
- 简易进度提示(用于不知道总行数的情形)
- total_size 总数量
- cur_num 当前进度
- """
- # 尝试从缓存中获取
- tip_internal = tip_internal_cache.get(total_size)
- if not tip_internal:
- # 不存在缓存,构建 提示检查点 和 提示间隔 信息
- internal = math.ceil(total_size * config.PRECENT_TIPS)
- tip_internal= {
- "check_point": cur_pos,
- "internal": internal
- }
- # 放入缓存
- tip_internal_cache[total_size] = tip_internal
-
- # 当前位置超过提示检查点则显示进度
- if cur_pos >= tip_internal["check_point"]:
- logging.info("当前进度 %d / %d" % (cur_pos, total_size))
- # 修改 提示检查点
- check_point = tip_internal["check_point"]
- internal = tip_internal["internal"]
- while cur_pos >= check_point:
- check_point = check_point + internal
- # 如果 提示检查点大于总值,则置为总值
- if check_point > total_size:
- check_point = total_size
- # 如果不手动中断会陷入循环
- break
-
- # 更新 提示检查点
- tip_internal["check_point"] = check_point
-
- def save_obj(path, obj):
- """
- 保存对象至本地
- """
- with open(path, "wb") as f:
- pickle.dump(obj, f)
- def load_obj(path):
- """
- 加载对象
- """
- with open(path, "rb") as f:
- return pickle.load(f)
- if __name__ == "__main__":
- init_log()
- log_start_msg(TITLE)
- # 测试普通提示
- # total = 3
- # for i in range(total):
- # tip(total, i)
- # 测试mmap的提示
- # with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
- # mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
-
- # # 总大小
- # total_num = fmmap.size()
- # while True:
- # # 读取光标位置
- # cur_pos = fmmap.tell()
- # # 把光标移动到下一行
- # line = fmmap.readline()
- # # 进度显示
- # tip_in_size(total_num, cur_pos)
- # if not line:
- # break
- # 测试逐行读取的进度提示
- with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
- fkey.seek(0, os.SEEK_END)
- total_num = fkey.tell()
- fkey.seek(0)
- while True:
- cur_pos = fkey.tell()
- line = fkey.readline()
- tip_in_size(total_num, cur_pos)
- if not line:
- break;
-
- log_end_msg(TITLE)
|