瀏覽代碼

feat:优化调用方式为链式调用

ChenYL 2 年之前
父節點
當前提交
68de1b2aa1

+ 1 - 0
.gitignore

@@ -165,3 +165,4 @@ cython_debug/
 tmp/
 !src/tmp
 data/
+*.jar

+ 30 - 37
src/agg.py

@@ -1,6 +1,7 @@
 # -*- coding:utf-8 -*-
 import math
 import os
+import subprocess
 import time
 import zipfile
 from concurrent.futures import ProcessPoolExecutor, as_completed
@@ -31,7 +32,9 @@ WORD_AGG_RESULT_TEMP_FILE = "长尾词_聚合结果_临时.txt"
 WORD_AGG_RESULT_FILE = "长尾词_聚合结果.txt"
 
 # 文件夹:历史聚合数据归档文件夹
-AGG_ARCHIVE_DIR = "长尾词聚合分析_%s"
+WORD_AGG_DIR = "长尾词聚合分析_%s"
+
+jieba.setLogLevel(jieba.logging.INFO)
 
 
 def agg_word(path: str):
@@ -50,10 +53,11 @@ def agg_word(path: str):
     if os.path.isdir(path):
         files = os.listdir(path)
         for file in files:
+            file_path = os.path.join(path, file)
             if file.endswith(COMPRESS_FILE_SUFFIX):
-                zip_files.append(file)
+                zip_files.append(file_path)
             elif file.endswith(WORD_FILE_SUFFIX):
-                txt_files.append(file)
+                txt_files.append(file_path)
     elif path.endswith(COMPRESS_FILE_SUFFIX):
         zip_files.append(path)
     elif path.endswith(WORD_FILE_SUFFIX):
@@ -64,10 +68,8 @@ def agg_word(path: str):
         return
 
     # 创建分析结果文件夹路径
-    data_path = path
-    if os.path.isfile(data_path):
-        data_path = os.path.join(data_path[0:data_path.rindex("\\") + 1],
-                                 AGG_ARCHIVE_DIR % time.strftime('%Y%m%d%H%M%S'))
+    data_path = os.path.join(path[0:path.rindex("\\") + 1] if os.path.isfile(path) else path,
+                             WORD_AGG_DIR % time.strftime('%Y%m%d%H%M%S'))
     os.makedirs(data_path)
     print("创建聚合分析结果文件夹,路径:" + data_path)
 
@@ -77,36 +79,32 @@ def agg_word(path: str):
         start_pos = 1
 
     chains = [
-        ("5118关键词压缩文件提取数据", extract_word_from_5118),
-        ("合并长尾词", merge_word),
-        ("长尾词分词和建立倒排索引", word_split_and_reverse_index),
-        ("调用java聚合处理程序", agg_process),
-        ("对聚合后的文件内容进行排序重写", sort_file_content)
+        ("5118关键词压缩文件提取数据", extract_word_from_5118, True, zip_files),
+        ("合并长尾词", merge_word, True, txt_files),
+        ("长尾词分词和建立倒排索引", word_split_and_reverse_index, False),
+        ("调用java聚合处理程序", agg_process, False),
+        ("对聚合后的文件内容进行排序重写", sort_file_content, False)
     ]
 
-    chains = chains[start_pos:-1]
+    chains = chains[start_pos:]
     chains_len = len(chains)
     for i, chain in enumerate(chains, start=1):
         print("步骤(%s/%s):%s 开始..." % (i, chains_len, chain[0]))
-        is_success = chain[1](data_path)
+        is_success = chain[1](data_path, chain[3]) if chain[2] else chain[1](data_path)
         if not is_success:
             print("执行异常结束执行!")
             return
 
+    print("长尾词聚合程序执行完成!")
+
 
-def extract_word_from_5118(data_path: str, **params):
+def extract_word_from_5118(data_path: str, zip_files: list):
     """
     从5118关键词压缩文件中提取数据
     :param data_path: 分析结果文件夹路径
+    :param zip_files: 待解压缩列表
     :return: None
     """
-
-    # 获取压缩文件列表
-    zip_files = []
-    for file in os.listdir(data_path):
-        if file.endswith(COMPRESS_FILE_SUFFIX):
-            zip_files.append(os.path.join(data_path, file))
-
     for i, file in enumerate(zip_files):
         z_file = zipfile.ZipFile(file)
         filenames = z_file.namelist()
@@ -140,26 +138,21 @@ def extract_word_from_5118(data_path: str, **params):
     return True
 
 
-def merge_word(data_path: str, **params):
+def merge_word(data_path: str, txt_files: list):
     """
     合并长尾词(带去重)
     :param data_path: 分析结果文件夹路径
+    :param txt_files: 待合并文件列表
     :return:
     """
-    # 获取文件列表
-    file_list = []
-    for file in os.listdir(data_path):
-        if file.endswith(WORD_FILE_SUFFIX):
-            file_list.append(os.path.join(data_path, file))
-
     # 长尾词集合容器
     word_set = set()
 
     # 读取数据并排重
-    for i, file in enumerate(file_list):
+    for i, file in enumerate(txt_files):
         with open(file, "r", encoding="utf-8") as f:
-            for word in f:
-                word_set.add(word.replace("\n", ""))
+            for line in f:
+                word_set.add(utils.remove_line_break(line))
 
     # 保存合并结果
     with open(os.path.join(data_path, WORD_FILE), "w", encoding="utf-8") as f:
@@ -176,7 +169,6 @@ def word_split_and_reverse_index(data_path: str):
     :param data_path: 数据存放路径
     :return:
     """
-
     # 判断文件是否存在
     file = os.path.join(data_path, WORD_FILE)
     if os.path.exists(file) and not os.path.isfile(file):
@@ -304,9 +296,11 @@ def agg_process(data_path: str):
     """
     调用java聚合处理程序
     :param data_path: 分析结果文件夹路径
-    :return:
+    :return: True-运行正常 False-运行失败
     """
-    return True
+    cmds = ["java", "-jar", "./resources/money-mining-1.0-jar-with-dependencies.jar", "agg", data_path]
+    return_code = subprocess.run(cmds).returncode
+    return 0 == return_code
 
 
 def sort_file_content(data_path: str):
@@ -315,10 +309,9 @@ def sort_file_content(data_path: str):
     :param data_path: 分析结果文件夹路径
     :return:
     """
-
     # 构造源文件路径
     src_path = os.path.join(data_path, WORD_AGG_RESULT_TEMP_FILE)
-    if os.path.exists(src_path) and not os.path.isfile(src_path):
+    if not os.path.exists(src_path) or not os.path.isfile(src_path):
         print("文件不存在! " + src_path)
         return False
 

+ 1 - 1
src/mining.py

@@ -1,7 +1,7 @@
 # -*- coding:utf-8 -*-
 import sys
 
-from agg import agg_word
+from agg import agg_word, agg_process
 
 
 def main(args: list):

+ 0 - 0
src/conf/stopwords/baidu_stopwords.txt → src/resources/stopwords/baidu_stopwords.txt


+ 0 - 0
src/conf/stopwords/cn_stopwords.txt → src/resources/stopwords/cn_stopwords.txt


+ 0 - 0
src/conf/stopwords/hit_stopwords.txt → src/resources/stopwords/hit_stopwords.txt


+ 0 - 0
src/conf/stopwords/scu_stopwords.txt → src/resources/stopwords/scu_stopwords.txt


+ 0 - 0
src/conf/stopwords/停用词.txt → src/resources/stopwords/停用词.txt


+ 1 - 1
src/utils.py

@@ -4,7 +4,7 @@ import os
 import pickle
 
 # 停用词存放文件夹
-STOP_WORD_DIR = "./conf/stopwords"
+STOP_WORD_DIR = "./resources/stopwords"
 
 # 临时文件路径
 TEMP_PATH = "../tmp"