|
|
@@ -64,7 +64,7 @@ def sub_process(start_pos, end_pos):
|
|
|
"""
|
|
|
pid = os.getpid()
|
|
|
|
|
|
- logging.debug("子进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid,start_pos, end_pos))
|
|
|
+ logging.info("子进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid,start_pos, end_pos))
|
|
|
|
|
|
# 聚合结果
|
|
|
agg_result = []
|
|
|
@@ -175,7 +175,7 @@ def sub_process(start_pos, end_pos):
|
|
|
logging.debug("子进程-%d 主关键词:%s 计算相关性结束,相关的关键词数据量:%d" % (pid, a_key, (len(correlation_key)-1)))
|
|
|
|
|
|
|
|
|
- logging.debug("子进程-%d 执行任务结束,耗时:%f" % (pid, (time() - start_time)))
|
|
|
+ logging.info("子进程-%d 执行任务结束,耗时:%f" % (pid, (time() - start_time)))
|
|
|
|
|
|
return {
|
|
|
"agg_result": agg_result,
|
|
|
@@ -196,17 +196,12 @@ def main_process():
|
|
|
# 任务数量
|
|
|
per_task_num = 100
|
|
|
|
|
|
- # 处理进度保存间隔(单位:秒)
|
|
|
- save_process_internal = 300
|
|
|
-
|
|
|
# 划分子任务:任务进度记录、任务列表
|
|
|
process_record, tasks = avg_split_task(total_task, per_task_num)
|
|
|
|
|
|
with ProcessPoolExecutor(max_workers=process_num) as process_pool, \
|
|
|
open(config.AGG_FILE, "a", encoding=config.ENCODING_CHARSET) as f:
|
|
|
|
|
|
- save_start_time = time()
|
|
|
-
|
|
|
logging.info("主进程:提交任务到子进程")
|
|
|
process_futures = [process_pool.submit(sub_process, task[0], task[1]) for task in tasks]
|
|
|
|
|
|
@@ -216,7 +211,7 @@ def main_process():
|
|
|
|
|
|
# 记录处理进度
|
|
|
cur_pos = result["start_pos"]
|
|
|
- process_record[cur_pos]=1
|
|
|
+ process_record[cur_pos//per_task_num]=1
|
|
|
|
|
|
# 保存分析结果
|
|
|
if result:
|
|
|
@@ -227,11 +222,7 @@ def main_process():
|
|
|
f.write("%s\n" % key)
|
|
|
|
|
|
# 保存处理进度
|
|
|
- if (time() - save_start_time) > save_process_internal:
|
|
|
- logging.debug("保存处理进度")
|
|
|
- # 更新开始时间
|
|
|
- save_start_time = time()
|
|
|
- tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
|
|
|
+ tools.save_obj(config.ANALYSE_PROCESS_CACHE, process_record)
|
|
|
|
|
|
tools.tip(total_task, cur_pos)
|
|
|
|