Browse Source

优化代码

ChenGanBin cách đây 3 năm
mục cha
commit
7895701c0c
10 tập tin đã thay đổi với 361 bổ sung và 808 xóa
  1. 21 15
      cut.py
  2. 125 30
      key.py
  3. 17 26
      key_index.py
  4. 103 371
      key_reverse.py
  5. 0 6
      key_reverse_index.py
  6. 0 218
      key_reverse_statistics.py
  7. 1 1
      logging.conf
  8. 48 35
      merge.py
  9. 0 65
      stop_word.py
  10. 46 41
      tools.py

+ 21 - 15
cut.py

@@ -9,10 +9,12 @@ import logging.config
 
 from stop_word import load_stop_word
 
-TITLE = "分词处理"
 
 # 待处理的数据文件
-DATA_FILE = "E:\Download\怎么长尾词_1655561719.csv"
+INPUT_FILE = "E:\Download\怎么长尾词_1655561719.csv"
+
+# 处理输出文件
+OUTPUT_FILE = "E:\Download\长尾关键词\怎么长尾词_分词统计.csv"
 
 def cut_word_and_statistics(data):
 
@@ -63,35 +65,39 @@ def cut_word_and_statistics(data):
 
     return sorted_key_list
 
-def main():
-
-    # 日志初始化
-    tools.init_log()
-
-    tools.log_start_msg(TITLE)
+def main(orig_file, dest_file):
 
-    if not os.path.exists(DATA_FILE):
-        logging.warning("待处理的数据文件不存在:%s" % DATA_FILE)
+    if not os.path.exists(orig_file):
+        logging.warning("待处理的数据文件不存在:%s" % orig_file)
         return
 
     # 读取数据
-    logging.info("正在读取待处理的数据文件:%s" % DATA_FILE)
+    logging.info("正在读取待处理的数据文件:%s" % orig_file)
     lines = None
-    with open(DATA_FILE, "r", encoding=config.ENCODING_CHARSET) as f:
+    with open(orig_file, "r", encoding=config.ENCODING_CHARSET) as f:
         lines = f.readlines()
     
     # 执行分词和词频统计(跳过前两行)
     word_root_list = cut_word_and_statistics(lines[2:])
 
     # 导出数据
-    logging.info("正在导出分词数据,位置:%s" % config.CUT_FILE)
-    with open(config.CUT_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
+    logging.info("正在导出分词数据,位置:%s" % dest_file)
+    with open(dest_file, "w", encoding=config.ENCODING_CHARSET) as f:
         for key, count in word_root_list:
             f.write("%s,%d\n" % (key, count))
 
-    tools.log_end_msg(TITLE)
+    
 
 
 if __name__ == '__main__':
+    TITLE = "分词处理"
+
+    # 日志初始化
+    tools.init_log()
+
+    tools.log_start_msg(TITLE)
+
     main()
 
+    tools.log_end_msg(TITLE)
+

+ 125 - 30
key.py

@@ -1,52 +1,147 @@
 # -*- coding:utf-8 -*-
 
+from concurrent.futures import ProcessPoolExecutor, as_completed
 import logging
+import os
+from time import time
 import config
 import tools
 import jieba
-import datetime
 import mmap
 
-TITLE = "关键词表 生成"
+# 优化
+# 1. 更改为使用多进程
 
+# 日志配置初始化
+tools.init_log()
 
-def main():
+def sub_process(start_pos, end_pos, stop_word):
+    """
+    子进程
+    """
+    pid = os.getpid()
+
+    logging.debug("子进程-%d 开始执行分词任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
+
+    # 临时容器
+    tmp_list = []
+
+    # 开始时间
+    start_time = time()
     
-    # 日志配置初始化
-    tools.init_log()
-    tools.log_start_msg(TITLE)
+    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
+        mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
 
-    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as fmerge, \
-        open(config.KEY_FILE, "w", encoding=config.ENCODING_CHARSET) as fw, \
-        mmap.mmap(fmerge.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
+        fmmap.seek(start_pos)
 
-        # TODO 
-        # 这里可能有IO优化的余地
-        # 这里可以不用mmap,改用一条一条readline()进行读取
-        # 进度提示也不完整
+        while True:
+            # 越界检测
+            cur_pos = fmmap.tell()
+            if cur_pos >= end_pos:
+                break
 
-            count = -1
-            total_num = fmmap.size()
+            # 读取关键词
+            key = fmmap.readline().decode("UTF-8").replace("\r","").replace("\n","")
 
-            while True:
-                count = count + 1
-                # 读取关键词
-                word = fmmap.readline().decode("UTF-8").replace("\r","").replace("\n","")
+            # 读取不到任何内容结束执行
+            if not key :
+                continue
+                
+            # 分词
+            tmp_stems = list(jieba.cut_for_search(key))
 
-                # 读取不到任何内容结束执行
-                if not word :
-                    break
+            # 排除停用词
+            stems = set()
+            for stem in tmp_stems:
+                if stem in stop_word:
+                    continue
+                stems.add(stem)
                 
-                # 分词
-                word_root = list(jieba.cut_for_search(word))
+            # 以防止词根数为0
+            if len(stems) == 0:
+                continue
+        
+            tmp_list.append((key , list(stems)))
+    
+    logging.debug("子进程-%d 执行分词任务结束,耗时:%f" % (pid, (time() - start_time)))
+    
+    return tmp_list
+
+def main_process():
+    """
+    主进程
+    """
+
+    # 进程池数
+    process_num = 4
+
+    # 任务分割大小
+    split_num = 500000
+
+    # 位置信息索引
+    pos_index = []
+
+    # 总关键词数量
+    total_num = 0
 
-                # 写入文件中
-                fw.write("%d,%s,%s\n"%(count,word,word_root))
+    # 加载停用词
+    stop_word = tools.load_stop_word()
 
-                # 进度提示
-                tools.tip(total_num, fmmap.tell(), False)
+    start_time = time()
 
-    tools.log_end_msg(TITLE)
+    # 记录位置信息
+    logging.info("主进程 开始构建位置索引信息")
+    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
+        mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
+
+        while True:
+            # 获取当前位置
+            cur_pos = fmmap.tell()
+            # 移动到一下行
+            line = fmmap.readline()
+            # 结束检测
+            if not line:
+                break
+            # 记录
+            pos_index.append(cur_pos)
+
+        # 计算总关键词数量
+        total_num = len(pos_index)
+
+    # 划分子任务
+    logging.info("主进程 开始划分子任务")
+    tasks = tools.avg_split_task(total_num, split_num)
+
+    with ProcessPoolExecutor(process_num) as process_pool, \
+        open(config.KEY_FILE, "w", encoding=config.ENCODING_CHARSET) as f_key:
+
+        logging.info("主进程 提交任务到子进程")
+        process_futures = [process_pool.submit(sub_process, pos_index[task[0]], pos_index[task[1]], stop_word) for task in tasks]
+
+        # 移除无效变量 以防占用内存
+        del pos_index
+        del tasks
+
+        # 序号计算
+        count = -1 
+        for p_future in as_completed(process_futures):
+            result = p_future.result()
+            if result:
+                for key, stems in result:
+                    count = count + 1
+                    # 写入文件中
+                    f_key.write("%d,%s,%s\n"%(count, key, list(stems)))
+            
+            # 移除无效变量 以防占用内存
+            process_futures.remove(p_future)
+    
+    logging.info("主进程 构建KEY表耗时:%f" % (time() - start_time))
 
 if __name__ == '__main__':
-    main()
+
+    TITLE = "关键词表 生成"
+    tools.log_start_msg(TITLE)
+
+    main_process()
+
+    tools.log_end_msg(TITLE)

+ 17 - 26
key_index.py

@@ -3,17 +3,12 @@
 import config
 import tools
 import mmap
-import logging
 
-TITLE = "关键词索引"
 
 def main():
-    # 日志配置初始化
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    # 关键词索引容器
-    key_index_cache = {}
+    
+    # 关键词索引容器,
+    key_index = []
 
     with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
         mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
@@ -25,34 +20,30 @@ def main():
             # 读取光标位置
             cur_pos = fmmap.tell()
             # 把光标移动到下一行
-            line = fmmap.readline().decode(config.ENCODING_CHARSET)
+            line = fmmap.readline()
             # 如果没有数据则结束
             if not line :
                 break
-            
-            # 获取关键词序号
-            index = line.index(",")
 
-            # 建立关键词序号和位置的关系
-            key_index_cache[line[:index]]=cur_pos
-            
+            # 建立关键词序号和位置的关系,以索引当行号(0基)
+            key_index.append(cur_pos)
+
             # 进度显示
             tools.tip_in_size(total_num, cur_pos)
         
         # 保存索引
-        tools.save_obj(config.KEY_INDEX_CACHE, key_index_cache)
+        tools.save_obj(config.KEY_INDEX_CACHE, key_index)
 
-    tools.log_end_msg(TITLE)
-    
 
 if __name__ == '__main__':
-    main()
 
-    # key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
+    TITLE = "关键词索引"
+
+    # 日志配置初始化
+    tools.init_log()
+    tools.log_start_msg(TITLE)
 
-    # with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-    #     mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
-    #         for key,value in key_index_cache.items():
-    #             fmmap.seek(value)
-    #             line = fmmap.readline().decode(config.ENCODING_CHARSET)
-    #             logging.debug("key: %s, value: %d, 内容:%s" % (key, value, line))
+    main()
+    
+    tools.log_end_msg(TITLE)
+    

+ 103 - 371
key_reverse.py

@@ -1,427 +1,159 @@
 # -*- coding:utf-8 -*-
 
-from asyncio import FIRST_COMPLETED
-from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
-from multiprocessing import Manager
-import multiprocessing
-from queue import Queue
-import sys
+from concurrent.futures import ProcessPoolExecutor, as_completed
+import math
 from time import time
 import os
-from unittest import result
 import config
 import tools
-import ast
 import re
-import math
-import stop_word
 import logging
 import mmap
-import threading
-
-TITLE = "生成关键词倒排和统计信息"
-
-def thread_handle(path, start_pos, end_pos):
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
-    t = threading.current_thread()
-    print("线城-%d 开始任务,开始位置:%d,结束位置:%d" % (t.ident, start_pos, end_pos))
-    # 临时容器
-    key_reverse = []
-    with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-        mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
-        # 移动到开始位置
-        fmmap.seek(start_pos)
-
-        while True:
-            # 获取当前处理位置
-            cur_pos = fmmap.tell()
-
-            # 越界检查
-            if cur_pos > end_pos:
-                break
-
-            line = fmmap.readline()
-            key_reverse.append(line)
-            # 暂留
-            # # 读取关键词数据
-            # line = fmmap.readline().decode("UTF-8")
 
-            # # 如果到行尾则结束
-            # if not line:
-            #     break
 
-            # # 提取数据
-            # m = pattern.match(line)
-            # # 获取关键词序号、词根
-            # index = m.group(1)
-            # key_root = m.group(3)
+tools.init_log()
 
-            # # 转换成真正的list对象
-            # key_root = ast.literal_eval(key_root)
+if __name__ != "__main__":
 
-            # key_reverse.append((index, key_root))
-    # print("线城-%d 结束任务" % t.ident)
-    # return key_reverse
-    return {
-        "tid": t.ident,
-        "key_reverse": key_reverse
-    }
+    # 正则提取
+    # 倒排表 索引
+    index_re = r"(\d+),"
+    index_pattern = re.compile(index_re, re.I)
 
+    # KEY表 词根
+    stem_re = r"'([^,]*)'"
+    stem_pattern = re.compile(stem_re, re.I)
 
-
-def process_handle(stop_word_cache, lines):
-    pid = os.getpid()
-    print("进程-%d 接收到数据,开始计算")
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
-
-    key_reverse_dict={}
-
-    for line in lines:
-        line = line.decode("UTF-8")
-        # 提取数据
-        m = pattern.match(line)
-        # 获取关键词序号、词根
-        index = m.group(1)
-        key_root = m.group(3)
-
-        # 转换成真正的list对象
-        for item in ast.literal_eval(key_root):
-            # 排除停用词
-            if item in stop_word_cache:
-                continue
-            # 构建倒排表和统计数据量
-            val = key_reverse_dict.get(item)
-            if val:
-                key_reverse_dict[item]["count"] = key_reverse_dict[item]["count"] + 1
-                key_reverse_dict[item]["indexs"].append(index) 
-            else:
-                key_reverse_dict[item]= {
-                    "count":1,
-                    "indexs":[]
-                }
-    return {
-        "pid": pid,
-        "key_reverse_dict":key_reverse_dict
-    }
-
-def multi_thread():
+def sub_process(start_pos, end_pos):
     """
-    构建待排表(多线程)
+    子进程
     """
+    pid = os.getpid()
 
-    key_reverse_dict = {}
-
-    def process_handle_result(future):
-        print("进程-%d 执行结束,返回部分数据 合并数据" % result["pid"])
-        result = future.result()
-        for key,value in result["key_reverse_dict"].items():
-            val = key_reverse_dict.get(key)
-            if val:
-                key_reverse_dict[key]["count"] = key_reverse_dict[key]["count"] + value["count"]
-                key_reverse_dict[key]["indexs"].append(value["indexs"]) 
-            else:
-                key_reverse_dict[key]= value
-        print("进程-%d 合并数据结束" % result["pid"])
-        
-
-    def thread_handle_result(future):
-        result = future.result()
-        logging.info("线程-%d 执行结束,返回部分数据,提交至计算进程" % result["tid"])
-        process_future = process_pool.submit(process_handle, stop_word_cache, result["key_reverse"])
-        process_future.add_done_callback(process_handle_result)
-
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    logging.info("目前执行的是多线程版")
-
-    logging.info("开始执行初始化")
-    # 提取规则
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
-    # 停用表
-    stop_word_cache = stop_word.load_stop_word()
-    # 进程处理数
-    thread_num = 1
-    process_num = 2
-    worker_num = 100
-    # 关键表索引
-    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
-
-    logging.info("开始对数据进行分段计算")
-
-    # 对索引文件中的元素进行平分
-    # 转成列表,计算总长 和 平分后的处理区间
-    key_index_list = [key for key in key_index_cache.keys()]
-    total_len = len(key_index_list)
-    internal = math.ceil(total_len / worker_num )
-
-    # 利用 缓存索引文件 生成处理区间的位置信息
-    # 位置信息容器
-    pos_list = []
-    for i in range(worker_num + 1):
-        # 计算平分点在列表中的位置
-        l_pos = i * internal
-        # 如果大于等于列表大小需要额外处理
-        if l_pos >= total_len:
-            l_pos = total_len -1
-        # 获取列表中的词根
-        key_index = key_index_list[l_pos:l_pos+1]
-        # 根据词根获取位置信息
-        pos = key_index_cache[key_index[0]]
-        # 记录位置信息
-        pos_list.append(pos)
-   
-    logging.info("把分段结果提交至多线程执行")
-    # 生成任务
-    with ThreadPoolExecutor(thread_num) as thread_pool, \
-        ProcessPoolExecutor(process_num) as process_pool:
-
-        # thread_futures = []
-
-        for i in range(0, len(pos_list)-1):
-            pos = pos_list[i: i+2]
-            # thread_futures.append()
-            thread_future = thread_pool.submit(thread_handle, config.KEY_FILE, pos[0], pos[1])
-            thread_future.add_done_callback(thread_handle_result)
-
-        # 等待数据返回
-        logging.info("等待多线程执行结束")
-        thread_pool.shutdown(wait=True)
-        process_pool.shutdown(wait=True)
-        
-
+    logging.debug("进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
 
-            
-                
-    
-    logging.info("已获取全部子进程返回部分结果,总数据量:%d" % len(key_reverse_dict))
-    return 
-    # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
-    logging.info("根据关键词数量进行倒序排列")
-    sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
+    # 开始时间
+    start_time = time()
 
-    # 保存到本地文件
-    logging.info("保存到本地")
-    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
-        for key, value in sorted_reverse_list:
-            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
+    # 倒排表和统计信息容器
+    reverse_dict = {}
 
-    tools.log_end_msg(TITLE)
-
-def handle(path, start_pos, end_pos, share_pattern, stop_word_cache):
-    pid = os.getpid()
-    print("进程-%d 开始任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
-    # 临时容器
-    key_reverse = {}
-    # 提取数据用的正则
-    pattern = share_pattern.value
-    with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-        mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
+    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
+        mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_mmap:
         # 移动到开始位置
-        fmmap.seek(start_pos)
+        f_mmap.seek(start_pos)
 
         while True:
             # 获取当前处理位置
-            cur_pos = fmmap.tell()
+            cur_pos = f_mmap.tell()
 
             # 越界检查
-            if cur_pos > end_pos:
-                break
-
-            # 读取关键词数据
-            line = fmmap.readline().decode("UTF-8")
-
-            # 如果到行尾则结束
-            if not line:
+            if cur_pos >= end_pos:
                 break
 
             # 提取数据
-            m = pattern.match(line)
+            line = f_mmap.readline().decode(config.ENCODING_CHARSET)
+            m = index_pattern.match(line)
             # 获取关键词序号、词根
             index = m.group(1)
-            key_root = m.group(3)
-
-            # 转换成真正的list对象
-            for item in ast.literal_eval(key_root):
-                # 排除停用词
-                if item in stop_word_cache:
-                    continue
-                # 构建倒排表和统计数据量
-                val = key_reverse.get(item)
-                if val:
-                    count = key_reverse[item]["count"]
-                    key_reverse[item]["count"] = count + 1
-                    key_reverse[item]["indexs"].append(index) 
+            stems = stem_pattern.findall(line)
+
+            # 构建倒排表和统计数据量
+            for stem in stems:
+                obj = reverse_dict.get(stem)
+                if obj:
+                    obj["count"] = obj["count"] + 1
+                    obj["indexs"].add(index) 
                 else:
-                    key_reverse[item]= {
-                        "count":1,
-                        "indexs":[]
+                    tmp_indexs = set()
+                    tmp_indexs.add(index)
+                    reverse_dict[stem]= {
+                        "count": 1,
+                        "indexs": tmp_indexs
                     }
     
-    print("进程-%d 任务执行结束" % pid)
+    logging.debug("子进程-%d 任务结束,耗时:%f" % (pid, (time() - start_time)))
 
-    return key_reverse
-
-def multi_process():
-    """
-    构建待排表(多进程)
-    """
-
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    logging.info("目前执行的是多进程版")
-
-    logging.info("开始执行初始化")
-    manager = Manager()
-    # 提取规则
-    share_pattern = manager.Value("pattern", re.compile(config.KEY_RE_PATTERAN, re.I))
-    # 停用表
-    stop_word_cache = manager.dict(stop_word.load_stop_word())
-    # 进程处理数
-    process_num = os.cpu_count() - 1
-    worker_num = process_num
-    pool = ProcessPoolExecutor(max_workers=process_num)
-    # 关键表索引
-    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
-
-    logging.info("开始对数据进行分段计算")
-
-    # 对索引文件中的元素进行平分
-    # 转成列表,计算总长 和 平分后的处理区间
-    key_index_list = [key for key in key_index_cache.keys()]
-    total_len = len(key_index_list)
-    internal = math.ceil(total_len / worker_num )
-
-    # 利用 缓存索引文件 生成处理区间的位置信息
-    # 位置信息容器
-    pos_list = []
-    for i in range(worker_num + 1):
-        # 计算平分点在列表中的位置
-        l_pos = i * internal
-        # 如果大于等于列表大小需要额外处理
-        if l_pos >= total_len:
-            l_pos = total_len -1
-        # 获取列表中的词根
-        key_index = key_index_list[l_pos:l_pos+1]
-        # 根据词根获取位置信息
-        pos = key_index_cache[key_index[0]]
-        # 记录位置信息
-        pos_list.append(pos)
-   
-    logging.info("把分段结果提交至多进程执行")
-    # 生成任务
-    process_futures = []
-    for i in range(0, len(pos_list)-1):
-        pos = pos_list[i: i+2]
-        process_futures.append(pool.submit(handle, config.KEY_FILE, pos[0], pos[1], share_pattern, stop_word_cache))
-
-    # 等待数据返回
-    logging.info("等待多进程执行结束")
-    wait(process_futures,return_when=FIRST_COMPLETED)
-    key_reverse_dict = {}
-    for future in as_completed(process_futures):
-        logging.info("有子进程执行结束,返回部分结果")
-        result = future.result()
-        for key, value in result.items():
-            # 进行数据合并
-            val_dict = key_reverse_dict.get(key)
-            if val_dict:
-                count = val_dict["count"]
-                key_reverse_dict[key]["count"] = count + value["count"]
-                key_reverse_dict[key]["indexs"].extend(value["indexs"])
-            else:
-                key_reverse_dict[key] = value
-
-    logging.info("已获取全部子进程返回部分结果")
+    return reverse_dict
     
-    # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
-    logging.info("根据关键词数量进行倒序排列")
-    sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
 
-    # 保存到本地文件
-    logging.info("保存到本地")
-    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
-        for key, value in sorted_reverse_list:
-            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
+def main_process():
+    
+    logging.info("主进程 开始执行初始化")
+    
+    # 进程处理数
+    process_num = 4
 
-    tools.log_end_msg(TITLE)
+    # 关键表索引
+    key_index = tools.load_obj(config.KEY_INDEX_CACHE)
 
-def one_process():
-    """
-    构建倒排表(单进程版)
-    """
+    # 开始时间
+    start_time = time()
 
-    tools.init_log()
-    tools.log_start_msg(TITLE)
+    # 关键词总数
+    total_num = len(key_index)
 
-    logging.info("目前执行的是单进程版")
+    # 任务分割大小
+    split_num = math.ceil(total_num/process_num)
 
-    # 提取规则
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
+    logging.info("主进程 开始划分子任务")
+    tasks = tools.avg_split_task(total_num, split_num)
+    
+    with ProcessPoolExecutor(process_num) as process_pool:
 
-    # 倒排表 容器
-    key_reverse = {}
+        logging.info("主进程 提交任务到子进程")
+        process_futures = [process_pool.submit(sub_process, key_index[task[0]], key_index[task[1]]) for task in tasks]
 
-    # 停用表
-    stop_word_cache = stop_word.load_stop_word()
+        # 移除无效变量 以防占用内存
+        del tasks
+        del key_index
 
-    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
+         # 倒排表和统计信息容器
+        reverse_dict = {}
         
-        # 获取文件总大小,获取后需要复原光标位置
-        fkey.seek(0, os.SEEK_END)
-        total_num = fkey.tell()
-        fkey.seek(0)
-
-        while True:
-            # 获取当前处理位置
-            cur_pos = fkey.tell()
+        # 进行数据合并
+        for p_future in as_completed(process_futures):
+            result = p_future.result()
+            for key, val_obj in result.items():
+                reverse_obj = reverse_dict.get(key)
+                if reverse_obj:
+                    reverse_obj["count"] = reverse_obj["count"] + val_obj["count"]
+                    reverse_obj["indexs"] = reverse_obj["indexs"] | val_obj["indexs"]
+                else:
+                    reverse_dict[key] = val_obj
             
-            # 进度提示
-            tools.tip_in_size(total_num, cur_pos)
+            # 移除无效变量 以防占用内存
+            process_futures.remove(p_future)
+        
+        logging.info("主进程 已获取全部子进程返回结果,总数据量:%d" % len(reverse_dict))
+    
+    logging.info("主进程 对词根关联的索引进行排序和转换")
+    for val_obj in reverse_dict.values():
+        val_obj["indexs"] = list(val_obj["indexs"])
+        val_obj["indexs"].sort()
 
-            # 读取关键词数据
-            line = fkey.readline()
+    # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
+    logging.info("主进程 根据关键词数量进行排列")
+    sorted_reverse_list = sorted(reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
 
-            # 如果到行尾则结束
-            if not line:
-                break
+    # 保存到本地文件
+    logging.info("主进程 保存到本地")
+    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f_reverse, \
+        open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as f_statistics:
+        for key, val_obj in sorted_reverse_list:
+            f_reverse.write("%s,%s\n" % (key, val_obj["indexs"]))
+            f_statistics.write("%s,%d\n" % (key, val_obj["count"]))
+    
+    logging.info("主进程 构建倒排索引耗时:%f" % (time() - start_time))
 
-            # 提取数据
-            m = pattern.match(line)
-            # 获取关键词序号
-            index = m.group(1)
-            # 获取词根
-            key_root = m.group(3)
-            # 转换成真正的list对象
-            for item in ast.literal_eval(key_root):
-                
-                # 排除停用词
-                if item in stop_word_cache:
-                    continue
-
-                # 构建倒排表
-                val = key_reverse.get(item)
-                if val:
-                    count = key_reverse[item]["count"]
-                    key_reverse[item]["count"] = count + 1
-                    key_reverse[item]["indexs"].append(index) 
-                else:
-                    key_reverse[item]= {
-                        "count":1,
-                        "indexs":[]
-                    }
 
-    logging.info("根据关键词数量进行倒序排列")
-    sorted_reverse_list = sorted(key_reverse.items(), key=lambda x: x[1]["count"], reverse=True)
+if __name__ == "__main__":
+    
+    TITLE = "生成关键词倒排和统计信息"
+    tools.log_start_msg(TITLE)
 
-    # 保存到本地文件
-    logging.info("保存到本地")
-    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
-        for key, value in sorted_reverse_list:
-            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
+    main_process()
 
     tools.log_end_msg(TITLE)
 
-if __name__ == "__main__":
-    multi_thread()
+   

+ 0 - 6
key_reverse_index.py

@@ -1,13 +1,7 @@
 # -*- coding:utf-8 -*-
 
-import sys
-from time import time
-import os
 import config
 import tools
-import ast
-import re
-import stop_word
 import mmap
 
 TITLE = "关键词倒排索引"

+ 0 - 218
key_reverse_statistics.py

@@ -1,218 +0,0 @@
-# -*- coding:utf-8 -*-
-
-from concurrent.futures import ProcessPoolExecutor, as_completed
-import mmap
-from multiprocessing.connection import wait
-import random
-import sys
-from time import sleep, time
-import os
-import config
-import tools
-import ast
-import re
-import stop_word
-import logging
-import math
-from multiprocessing import Process, Pool
-
-TITLE = "关键词倒排文件 统计"
-
-# def reverse_statistics(start_pos, end_pos):
-
-def handle(start_pos, end_pos):
-
-    print("进程:%d, 统计开始,开始位置:%d,结束位置:%d" % (os.getpid(), start_pos, end_pos))
-
-    # 统计信息容器
-    reverse_statistics = {}
-    
-    with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr, \
-        mmap.mmap(fr.fileno(), 0 , access=mmap.ACCESS_READ) as fmmap:
-        # 调整开始位置
-        fmmap.seek(start_pos)
-
-        while True:
-            cur_pos = fmmap.tell()
-            # 越界检测
-            if cur_pos >= end_pos:
-                break
-            
-            line = fmmap.readline().decode(config.ENCODING_CHARSET)
-            index=line.index(",")
-            key = line[:index]
-            word_root = line[index+1:]
-            word_root = ast.literal_eval(word_root)
-            l = len(word_root)
-            
-            reverse_statistics[key]=l
-            
-    logging.info("进程:%d, 统计结束" % os.getpid())
-
-    return {
-        "pid":os.getpid(),
-        "statistics":reverse_statistics
-    }
-    
-
-def main2():
-    # 日志信息配置
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    # 进程数
-    process_num = os.cpu_count()
-
-    # 加载缓存索引文件
-    key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
-
-    # 对索引文件中的元素进行平分
-
-    # 转成列表,计算总长 和 平分后的处理区间
-    key_list = [key for key in key_reverse_index.keys()]
-    key_list_len = len(key_list)
-    internal = math.ceil(key_list_len / process_num )
-
-    # 利用 缓存索引文件 生成处理区间的位置信息
-    # 位置信息容器
-    pos_list = []
-    for i in range(process_num + 1):
-        # 计算平分点在列表中的位置
-        l_pos = i * internal
-        # 如果超过列表大小需要额外处理
-        if l_pos > key_list_len:
-            l_pos = key_list_len -1
-        # 获取列表中的词根
-        key = key_list[l_pos:l_pos+1]
-        # 根据词根获取位置信息
-        pos = key_reverse_index[key[0]]
-        # 记录位置信息
-        pos_list.append(pos)
-
-     
-    # 使用用进程池
-    pool = ProcessPoolExecutor(process_num)
-    # 生成任务
-    process_futures = []
-    for i in range(0, len(pos_list)-1):
-        pos = pos_list[i: i+2]
-        process_futures.append(pool.submit(handle, pos[0], pos[1]))
-    
-    # with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
-    #     for future in as_completed(process_futures):
-    #         logging.info("部分子任务统计结束,保存至本地 - 开始")
-    #         for key, value in future.result().items():
-    #             fw.write("%s,%s\n"%(key,value))
-    #         logging.info("部分子任务统计结束,保存至本地 - 结束")
-
-    
-    results = []
-    for future in as_completed(process_futures):
-        result = future.result()
-        logging.info("进程:%d, 统计结束" % result["pid"])
-        results.append(result)
-
-    logging.info("统计结束,保存至本地 - 开始")
-    with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
-        for r in results:
-            for key, value in r["statistics"].items():
-                fw.write("%s,%s\n"%(key,value))
-    logging.info("部分子任务统计结束,保存至本地 - 结束")
-
-    pool.shutdown(wait=True)
-
-    tools.log_end_msg(TITLE)
-
-    # 测试代码3
-    # pool = ProcessPoolExecutor(3)
-    # for i in range(1,5):
-    #     pool.submit(handle, "测试进程-%d"%i, i, i*10)
-
-    # pool.shutdown(wait=True)
-
-    # 测试代码2
-    # pool = Pool(3)
-    # for i in range(1,5):
-    #     pool.apply_async(handle, ("测试进程-%d"%i, i, i*10))
-    # pool.close()
-    # pool.join()
-    # print("结束")
-
-    # 测试代码1
-    # p = Process(target=handle, args=('测试进程', 1, 10))
-    # p.start()
-    # p.join()
-    
-    # tools.init_log()
-    # tools.log_start_msg(TITLE)
-
-    # key_reverse_index = tools.load_obj(config.KEY_REVERSE_INDEX_CACHE)
-
-    # tmp = [key for key in key_reverse_index.keys()]
-    
-    # l = len(tmp)
-    # print("总长:", l)
-    # internal = math.ceil(l / 4)
-    # print("间隔:", internal)
-    # pos = []
-    # for i in range(5):
-    #     t = i*internal
-    #     if t > l:
-    #         t = l-1
-    #     pos.append(t)
-    # print(pos)
-
-    # for item in pos:
-    #     key = tmp[item:item+1]
-    #     print(key)
-    #     pos = key_reverse_index[key[0]]
-    #     print(key, pos)
-
-    
-    # reverse_statistics = {}
-    # logging.info("统计开始")
-    # with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr, \
-    #     mmap.mmap(fr.fileno(), 0 , access=mmap.ACCESS_READ) as fmmap:
-    #     for line in fr:
-    #         index=line.index(",")
-    #         key = line[:index]
-    #         word_root = line[index+1:]
-    #         word_root = ast.literal_eval(word_root)
-    #         l = len(word_root)
-            
-    #         reverse_statistics[key]=l
-
-    # logging.info("统计结束,保存至本地")
-    # with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
-    #     for key, value in reverse_statistics:
-    #         fw.write("%s,%s\n"%(key,value))
-
-    # tools.log_end_msg(TITLE)
-
-
-def main():
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-    
-    reverse_statistics = {}
-    logging.info("统计开始")
-    with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as fr, \
-        mmap.mmap(fr.fileno(), 0 , access=mmap.ACCESS_READ) as fmmap:
-        for line in fr:
-            index=line.index(",")
-            key = line[:index]
-            word_root = line[index+1:]
-            word_root = ast.literal_eval(word_root)
-            l = len(word_root)
-            
-            reverse_statistics[key]=l
-
-    logging.info("统计结束,保存至本地")
-    with open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as fw:
-        for key, value in reverse_statistics:
-            fw.write("%s,%s\n"%(key,value))
-
-    tools.log_end_msg(TITLE)
-
-if __name__ == "__main__":
-    main2()

+ 1 - 1
logging.conf

@@ -8,7 +8,7 @@ keys=fileHandler,consoleHandler
 keys=simpleFormatter
 
 [logger_root]
-level=INFO
+level=DEBUG
 handlers=fileHandler,consoleHandler
 
 [handler_consoleHandler]

+ 48 - 35
merge.py

@@ -6,10 +6,9 @@ import tools
 import logging
 import zipfile
 
-TITLE= "拓展词合并"
 
-# 合并的文件目录
-DATA_DIR = "E:\Download\长尾关键词\普通-p"
+# 合并的文件目录
+DATA_DIR = "E:\Download\长尾关键词\长尾关键词-什么\普通-p"
 
 def get_files(path):
     '''
@@ -36,12 +35,6 @@ def merge_file_content():
         跳过压缩文件中的文件
     ----------
     """
-
-    # 日志初始化
-    tools.init_log()
-
-    tools.log_start_msg(TITLE)
-
     # 获取文件列表
     files = get_files(DATA_DIR)
 
@@ -49,36 +42,56 @@ def merge_file_content():
     total_num = len(files)
     logging.info("待处理文件数:%d" % total_num)
 
-    with open(config.MERGE_FILE, "w", encoding="utf-8") as f:
+    # 排重过滤
+    repeat_set = set()
 
-        for i, file in enumerate(files):
-            zfile = zipfile.ZipFile(file)
-            filenames = zfile.namelist()
-            for filename in filenames:
+    # 关键词排重前总数
+    total_count=0
 
-                # 重新编码文件名为正确形式
-                realname = filename.encode('cp437').decode('gbk')
-                
-                # 排除无效文件
-                if realname in config.MERGE_EXCLUDE_FILES:
-                    continue
-
-                logging.info("正在处理文件: %s" % realname)
-
-                # 读取压缩文件中的文件
-                with zfile.open(filename) as file_content:
-                    lines = file_content.readlines()
-                    # 跳过开头两行
-                    for line in lines[2:]:
-                        split = line.decode("gbk").split(",")
-                        # 只需要第一列的数据
-                        f.write(split[0])
-                        f.write("\n")
-                
-            tools.tip(total_num, i)
+    # 读取数据并进行排重
+    for i, file in enumerate(files):
+        zfile = zipfile.ZipFile(file)
+        filenames = zfile.namelist()
+        for filename in filenames:
 
-    tools.log_end_msg(TITLE)
+            # 重新编码文件名为正确形式
+            realname = filename.encode('cp437').decode('gbk')
+                
+            # 排除无效文件
+            if realname in config.MERGE_EXCLUDE_FILES:
+                continue
+
+            logging.info("正在处理文件: %s" % realname)
+
+            # 读取压缩文件中的文件
+            with zfile.open(filename) as file_content:
+                lines = file_content.readlines()
+                # 跳过开头两行
+                for line in lines[2:]:
+                    split = line.decode("gbk").split(",")
+                    # 只需要第一列的数据
+                    repeat_set.add(split[0])
+                    # 记录次数
+                    total_count = total_count + 1
+        
+        tools.tip(total_num, i)
+    
+    logging.info("正在保存合并结果,文件位置:%s,排重前数据量:%d,排重后数据量:%d" % (config.MERGE_FILE, total_count, len(repeat_set)))
+    with open(config.MERGE_FILE, "w", encoding="utf-8") as f:
+        for item in repeat_set:
+            f.write(item)
+            f.write("\n")
+    
 
 if __name__ == '__main__':
+
+    # Title passed to the start/end log messages
+    TITLE= "拓展词合并"
+
+    # Initialise logging, then log the start message
+    tools.init_log()
+    tools.log_start_msg(TITLE)
+
     merge_file_content()
+
+    # Log the end message once the merge has finished
+    tools.log_end_msg(TITLE)
         

+ 0 - 65
stop_word.py

@@ -1,65 +0,0 @@
-# -*- coding:utf-8 -*-
-
-from datetime import datetime
-import os
-import time
-import tools
-import config
-import pickle
-import logging
-
-TITLE = "停用词"
-
-def load_stop_word():
-    """
-    加载停用词
-    """
-
-    # 判断是否存在缓存
-    if os.path.exists(config.STOP_WORD_CACHE):
-        logging.debug("存在停用词缓存")
-        return tools.load_obj(config.STOP_WORD_CACHE)
-
-    logging.debug("正在构建停用词缓存")
-
-    # 停用词容器
-    stop_word = []
-
-    # 构建停用词列表
-    stop_word_files = os.listdir(config.STOP_WORD_DIR)
-    for file in stop_word_files:
-        stop_word_file = os.path.join(config.STOP_WORD_DIR, file)
-        with open(stop_word_file, encoding=config.ENCODING_CHARSET) as f:
-            for item in f:
-                # 移除换行符
-                stop_word.append(item.replace("\n",""))
-    # 去重
-    stop_word = list(set(stop_word))
-
-    # 把list改成dict提升检索速度
-    stop_word_dict = {}
-    for item in stop_word:
-        stop_word_dict[item]=None
-    
-    logging.debug("把停用词缓存保存到本地")
-
-    # 保存本地作为缓存
-    tools.save_obj(config.STOP_WORD_CACHE, stop_word_dict)
-    
-    return stop_word_dict
-
-if __name__ == '__main__':
-
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    stop_word = load_stop_word()
-
-    start = time.time()
-    for i in range(1400*10000):
-        for item in ["总之", "风雨无阻","千"]:
-            item in stop_word
-    end = time.time()
-    print("耗时:", end - start)
-
-    tools.log_end_msg(TITLE)

+ 46 - 41
tools.py

@@ -116,9 +116,6 @@ def tip_in_size(total_size, cur_pos):
         # 更新 提示检查点
         tip_internal["check_point"] = check_point
 
-
-    
-
 def save_obj(path, obj):
     """
     保存对象至本地
@@ -133,52 +130,60 @@ def load_obj(path):
     with open(path, "rb") as f:
         return pickle.load(f)
 
-if __name__ == "__main__":
-
-    init_log()
-
-    log_start_msg(TITLE)
-
-    # 测试普通提示
-    # total = 3
-    # for i in range(total):
-    #     tip(total, i)
-
-    # 测试mmap的提示
-    # with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-    #     mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
-        
-    #     # 总大小
-    #     total_num = fmmap.size()
+def load_stop_word():
+    """
+    Load the stop words.
+
+    Returns the object stored at ``config.STOP_WORD_CACHE`` when that path
+    exists. Otherwise every entry listed in ``config.STOP_WORD_DIR`` is read
+    line by line, the words are de-duplicated, stored as the keys of a dict
+    (all values ``None``), saved to ``config.STOP_WORD_CACHE`` and returned.
+    """
 
-    #     while True:
-    #         # 读取光标位置
-    #         cur_pos = fmmap.tell()
-    #         # 把光标移动到下一行
-    #         line = fmmap.readline()
+    # Reuse the cached stop words if a cache file already exists
+    if os.path.exists(config.STOP_WORD_CACHE):
+        logging.debug("存在停用词缓存")
+        return load_obj(config.STOP_WORD_CACHE)
 
-    #         # 进度显示
-    #         tip_in_size(total_num, cur_pos)
+    logging.debug("正在构建停用词缓存")
 
-    #         if not line:
-    #             break
+    # Container for the stop words (a set, so duplicates collapse)
+    stop_word = set()
 
-    # 测试逐行读取的进度提示
-    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
+    # Collect the stop words from every entry of the stop-word directory
+    stop_word_files = os.listdir(config.STOP_WORD_DIR)
+    for file in stop_word_files:
+        stop_word_file = os.path.join(config.STOP_WORD_DIR, file)
+        with open(stop_word_file, encoding=config.ENCODING_CHARSET) as f:
+            for item in f:
+                # Strip line-break characters ("\n" and "\r")
+                stop_word.add(item.replace("\n","").replace("\r", ""))
 
-        fkey.seek(0, os.SEEK_END)
-        total_num = fkey.tell()
-        fkey.seek(0)
+    # Convert to a dict (word -> None); intended to speed up lookups
+    stop_word_dict = {}
+    for item in stop_word:
+        stop_word_dict[item]=None
+    
+    logging.debug("把停用词缓存保存到本地")
 
-        while True:
+    # Persist the dict locally so later calls can use it as the cache
+    save_obj(config.STOP_WORD_CACHE, stop_word_dict)
+    
+    return stop_word_dict
 
-            cur_pos = fkey.tell()
 
-            line = fkey.readline()
+def avg_split_task(total:int, split_internal:int):
+    """
+    Split ``total`` items into consecutive ranges of ``split_internal`` items.
+
+    Args:
+        total: Number of items to divide.
+        split_internal: Number of items per range.
+
+    Returns:
+        A list of ``[start_pos, end_pos]`` pairs. ``start_pos`` is
+        ``i * split_internal`` and ``end_pos`` is ``start_pos + split_internal``,
+        except that an ``end_pos`` reaching or passing ``total`` is replaced
+        by the sentinel ``-1``. The list is empty when ``total`` is 0.
+
+    Raises:
+        ZeroDivisionError: If ``split_internal`` is 0.
+    """
 
-            tip_in_size(total_num, cur_pos)
+    # Number of ranges: ceiling division in pure integer arithmetic, so the
+    # result stays exact for totals beyond float precision and the function
+    # does not depend on the ``math`` module being imported.
+    split_num = -(-total // split_internal)
 
-            if not line:
-                break;
+    # Build the ranges
+    tasks = []
+    for i in range(split_num):
+        # Boundaries of the i-th range
+        start_pos = i * split_internal
+        end_pos = start_pos + split_internal
+        # The final range is marked with -1 instead of an explicit end.
+        # NOTE(review): presumably -1 means "to the end"; a caller slicing
+        # with seq[start_pos:-1] would drop the last element -- confirm.
+        if end_pos >= total:
+            end_pos = -1
+        tasks.append([start_pos,end_pos])
     
-    log_end_msg(TITLE)
+    return tasks