Merge branch 'newbranch2'

# Conflicts:
#	.gitignore
ChenYL · 2 years ago · commit 311ae0ab1a
18 changed files with 721 additions and 687 deletions
  1. .gitignore (+1 -0)
  2. REMEAD.md (+4 -0)
  3. agg_word.py (+2 -3)
  4. analyse.py (+139 -0)
  5. cal.py (+0 -73)
  6. config.py (+0 -1)
  7. cut.py (+22 -18)
  8. filter.py (+135 -0)
  9. key.py (+125 -30)
  10. key_index.py (+22 -27)
  11. key_reverse.py (+103 -371)
  12. key_reverse_index.py (+8 -9)
  13. logging.conf (+1 -1)
  14. merge.py (+48 -35)
  15. split.py (+58 -0)
  16. statistics.py (+1 -13)
  17. stop_word.py (+0 -65)
  18. tools.py (+52 -41)

+ 1 - 0
.gitignore

@@ -1,4 +1,5 @@
 __pycache__/
+data/
 data/tmp/*.pkl
 data/tmp/*.txt
 data/tmp/*.csv

+ 4 - 0
REMEAD.md

@@ -1,3 +1,7 @@
+# TODO log
+1. Chained calls
+2. Analysis of aggregated results
+
 # Processing steps
 
 1. Download broad keywords from 5118 (CSV file)

+ 2 - 3
agg_word.py

@@ -6,12 +6,11 @@ from itertools import combinations
 import math
 import mmap
 import os
-from time import sleep, time
+from time import time
 from cal import cal_cos_sim
 
 import config
 import tools
-import stop_word
 import re
 import logging
 
@@ -36,7 +35,7 @@ def intesect(x, y):
 
 if __name__ != "__main__":
     # Stop words
-    stop_word_index = stop_word.load_stop_word()
+    stop_word_index = tools.load_stop_word()
 
     # KEY table index
     key_index = tools.load_obj(config.KEY_INDEX_CACHE)

+ 139 - 0
analyse.py

@@ -0,0 +1,139 @@
+# -*- coding:utf-8 -*-
+
+import re
+import mmap
+import tools
+import jieba
+
+def transfer_str(num):
+    # Format large numbers with the Chinese "万" (10,000) unit
+    if num >= 10000:
+        return "%d万%d" % (num // 10000, num % 10000)
+    return str(num)
+
+def cal(values):
+    values_len = len(values)
+    values_sum = sum(values)
+    # Guard against empty buckets to avoid ZeroDivisionError
+    avg = values_sum // values_len if values_len else 0
+    return transfer_str(values_len), transfer_str(values_sum), transfer_str(avg)
+
+def tip(condition, values):
+    print("Condition: %s - stems involved: %s, words involved: %s, average: ~%s words/stem" % ((condition,) + cal(values)))
+
+def keyStat(fmap: mmap.mmap, keyword: str):
+    fmap.seek(0)
+    pattern = re.compile(keyword)
+    stopWord = tools.load_stop_word()
+    totalSize = fmap.size()
+    
+    statDict = {}
+    while True:
+        curPos = fmap.tell()
+        if curPos >= totalSize:
+            break
+
+        lineContent = fmap.readline().decode("UTF-8")
+        tmpList = pattern.findall(lineContent)
+        if tmpList:
+            cutList = list(jieba.cut_for_search(lineContent.replace("\r", "").replace("\n", "")))
+            for cutKeyword in cutList:
+                if cutKeyword in stopWord:
+                    continue
+
+                # Tally occurrences of each segmented word
+                statDict[cutKeyword] = statDict.get(cutKeyword, 0) + 1
+    
+    sorted_key_list = sorted(statDict.items(), key=lambda x: x[1], reverse=True)
+
+    print("Words related to keyword %s: %d in total" % (keyword, len(sorted_key_list)))
+
+    count_list = list(statDict.values())
+
+    tip("== 1", [val for val in count_list if val == 1])
+    tip("> 1 and < 100", [val for val in count_list if val > 1 and val < 100])
+    tip(">= 100 and < 200", [val for val in count_list if val >= 100 and val < 200])
+    tip(">= 200 and < 300", [val for val in count_list if val >= 200 and val < 300])
+    tip(">= 300 and < 400", [val for val in count_list if val >= 300 and val < 400])
+    tip(">= 400 and < 500", [val for val in count_list if val >= 400 and val < 500])
+    tip(">= 500 and < 1000", [val for val in count_list if val >= 500 and val < 1000])
+    tip(">= 1000 and < 5000", [val for val in count_list if val >= 1000 and val < 5000])
+    tip(">= 5000 and < 10000", [val for val in count_list if val >= 5000 and val < 10000])
+    tip(">= 10000 and < 50000", [val for val in count_list if val >= 10000 and val < 50000])
+    tip(">= 50000 and < 100000", [val for val in count_list if val >= 50000 and val < 100000])
+    tip(">= 100000", [val for val in count_list if val >= 100000])
+
+    with open("./data/test/stat_%s.csv" % keyword, "w", encoding="UTF-8") as fw:
+        for key, count in sorted_key_list:
+            if count > 1:
+                fw.write("%s,%d\n" % (key, count))
+
+def keyFilter(fmap: mmap.mmap, keyword: str):
+    fmap.seek(0)
+    pattern = re.compile(keyword)
+    
+    totalSize = fmap.size()
+    
+    with open("./data/test/filter_%s.csv" % keyword, "w", encoding="UTF-8") as fw:
+        while True:
+            curPos = fmap.tell()
+            if curPos >= totalSize:
+                break
+
+            lineContent = fmap.readline().decode("UTF-8")
+            tmpList = pattern.findall(lineContent)
+            if tmpList:
+                fw.write("%s\n" % lineContent.replace("\r", "").replace("\n", ""))
+            
+
+def countKeyword(fmap: mmap.mmap, keywords: set):
+    for keyword in keywords:
+        fmap.seek(0)
+        pattern = re.compile(keyword)
+    
+        count = 0
+    
+        while True:
+            lineContent = fmap.readline().decode("UTF-8")
+            if not lineContent:
+                break
+            
+            tmpList = pattern.findall(lineContent)
+            if tmpList:
+                count += 1
+        
+        print("Keyword: %s, total occurrences: %d" % (keyword, count))
+
+
+INPUT_FILE = "./data/tmp/merge.csv"
+
+with open(INPUT_FILE, "r", encoding="UTF-8") as f, \
+    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as f_mmap:
+
+    # 过滤名单.txt ("filter list") holds one filter keyword per line
+    filterSet = set()
+    with open("./data/过滤名单.txt", "r", encoding="UTF-8") as f_filter:
+        while True:
+            lineContent = f_filter.readline().replace("\n", "").replace("\r", "")
+            if not lineContent:
+                break
+            
+            filterSet.add(lineContent)
+    
+    countKeyword(f_mmap, filterSet)
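
The helpers in analyse.py all follow the same pattern: scan the mmap'd merge file line by line and act on lines matching a regex. A minimal usage sketch (the keyword "怎么" is only an example; keyStat and keyFilter are defined above but not called by the module itself):

```python
import mmap

with open("./data/tmp/merge.csv", "r", encoding="UTF-8") as f, \
    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fm:
    keyStat(fm, "怎么")    # bucketed word-frequency report for matching lines
    keyFilter(fm, "怎么")  # writes matching lines to ./data/test/filter_怎么.csv
```

transfer_str formats counts with the 万 (10,000) unit, e.g. transfer_str(123456) returns "12万3456".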

+ 0 - 73
cal.py

@@ -41,76 +41,3 @@ def cal_cos_sim(a_word:str, a_stem:list, b_word:str, b_stem:list):
     val = col_sim(np.array(a_vec), np.array(b_vec))
     return val
 
-
-if __name__ == "__main__":
-
-    # a_word= "QQ邮箱格式怎么写"
-    # b_word= "QQ邮箱格式如何写"
-    # a_word_root = ['QQ', '邮箱', '格式', '怎么', '写']
-    # b_word_root = ['QQ', '邮箱', '格式', '如何', '写']
-    # print(cal_cos_sim(a_word, a_word_root, b_word, b_word_root))
-
-     # Merge the stems, used to generate word vectors
-    # union_word_root = merge_stem(a_word_root, b_word_root)
-    # print(union_word_root)
-
-    # # Generate the word vectors
-    # a_vec, b_vec = gen_word_vec(a_word, b_word, union_word_root)
-    # print(a_vec)
-    # print(b_vec)
-    # # a_vec = [1,1,1,1,0,1]
-    # # b_vec = [1,1,1,0,1,1]
-    # print(col_sim(np.array(a_vec), np.array(b_vec)))
-
-    # s = "0,腋下长了一个小疙瘩是什么东西,['腋下', '长', '了', '一个', '小', '疙瘩', '是', '什么', '东西']"
-    # s_r = r"'([^,]*)'"
-    # pattern = re.compile(s_r, re.I)
-    # for i in pattern.findall(s):
-    #     print(i)
-    
-    # s_r = r"([\d]*),(.*),\["
-    # pattern = re.compile(s_r, re.I)
-    # m = pattern.match(s)
-    # for i in m.groups():
-    #     print(i)
-
-    # import mmap
-    # with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
-    #     mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap:
-
-    #     key_info_re = r"([\d]*),(.*),\["
-    #     key_info_pattern = re.compile(key_info_re, re.I)
-
-    #     s_r = r"'([^,]*)'"
-    #     s_pattern = re.compile(s_r, re.I)
-
-    #     a_line = f_key_mmap.readline().decode("UTF-8")
-    #     b_line = f_key_mmap.readline().decode("UTF-8")
-
-    #     a_m = key_info_pattern.match(a_line)
-    #     a_key = a_m.group(2)
-    #     a_stem = s_pattern.findall(a_line)
-    #     print(a_stem)
-
-    #     b_m = key_info_pattern.match(b_line)
-    #     b_key = b_m.group(2)
-    #     b_stem = s_pattern.findall(b_line)
-    #     print(b_stem)
-
-    #     print(cal_cos_sim(a_key, a_stem, b_key, b_stem))
-
-    # a_key = "吃什么东西减肥最快"
-    # a_stem = ['吃', '什么', '东西', '减肥', '最快']
-
-    # b_key="vc++读写什么文件最快"
-    # b_stem =['v', 'c++', '读写', '什么', '文件', '最快']
-    # print(cal_cos_sim(a_key, a_stem, b_key, b_stem))
-
-    # print(re.findall("c\\+\\+", "vc++读写什么文件最快"))
-    
-    
-    print("".join([".", "?", "^", "$", "*", "+", "\\", "[", "]", "|", "{", "}", "(", ")"]))
-    # s = r"([.?^$*+\[]|{}()])"
-    s=r"([\\])"
-    re.findall(s, "vc++读写什么文件最快")
-    print(re.findall(s, "vc++读写什么文件\最快"))

+ 0 - 1
config.py

@@ -52,7 +52,6 @@ ANALYSE_BITMAP_CACHE = "./data/cache/analyse_bitmap.pkl"
 ANALYSE_PROCESS_CACHE = "./data/cache/analyse_process.pkl"
 
 # Special characters that need extra handling in regular expressions
-# RE_SPECIAL_SIMBOL = "'.', '?', '^', '$', '*', '+', '\\', '[', ']', '|', '{', '}', '(', ')"
 RE_SPECIAL_SIMBOL = [".", "?", "^", "$", "*", "+", "\\", "[", "]", "|", "{", "}", "(", ")"]
 
 # Percentage progress indicator

+ 22 - 18
cut.py

@@ -7,12 +7,12 @@ import jieba
 import logging
 import logging.config
 
-from stop_word import load_stop_word
-
-TITLE = "分词处理"
 
 # Input data file to process
-DATA_FILE = "E:\Download\怎么长尾词_1655561719.csv"
+INPUT_FILE = r"E:\Download\怎么长尾词_1655561719.csv"
+
+# Output file for the processed result
+OUTPUT_FILE = r"E:\Download\长尾关键词\怎么长尾词_分词统计.csv"
 
 def cut_word_and_statistics(data):
 
@@ -25,7 +25,7 @@ def cut_word_and_statistics(data):
     # Container for segmentation results
     key_dict = {}
     # Stop words
-    stop_word = load_stop_word()
+    stop_word = tools.load_stop_word()
     # Total number of entries to process
     total_num = len(data)
 
@@ -63,35 +63,39 @@ def cut_word_and_statistics(data):
 
     return sorted_key_list
 
-def main():
-
-    # Initialize logging
-    tools.init_log()
+def main(orig_file, dest_file):
 
-    tools.log_start_msg(TITLE)
-
-    if not os.path.exists(DATA_FILE):
-        logging.warning("Input data file does not exist: %s" % DATA_FILE)
+    if not os.path.exists(orig_file):
+        logging.warning("Input data file does not exist: %s" % orig_file)
         return
 
     # Read the data
-    logging.info("Reading input data file: %s" % DATA_FILE)
+    logging.info("Reading input data file: %s" % orig_file)
     lines = None
-    with open(DATA_FILE, "r", encoding=config.ENCODING_CHARSET) as f:
+    with open(orig_file, "r", encoding=config.ENCODING_CHARSET) as f:
         lines = f.readlines()
     
     # Run segmentation and word-frequency statistics (skip the first two lines)
     word_root_list = cut_word_and_statistics(lines[2:])
 
     # Export the data
-    logging.info("Exporting segmentation data to: %s" % config.CUT_FILE)
-    with open(config.CUT_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
+    logging.info("Exporting segmentation data to: %s" % dest_file)
+    with open(dest_file, "w", encoding=config.ENCODING_CHARSET) as f:
         for key, count in word_root_list:
             f.write("%s,%d\n" % (key, count))
 
-    tools.log_end_msg(TITLE)
+    
 
 
 if __name__ == '__main__':
+    TITLE = "Word segmentation"
+
+    # Initialize logging
+    tools.init_log()
+
+    tools.log_start_msg(TITLE)
+
-    main()
+    main(INPUT_FILE, OUTPUT_FILE)
 
+    tools.log_end_msg(TITLE)
+

+ 135 - 0
filter.py

@@ -0,0 +1,135 @@
+# -*- coding:utf-8 -*-
+
+import csv
+import re
+
+def filter3():
+    INPUT_DATA = r"./data/agg_filter3.csv"
+    OUTPUT_TEMP = "./data/agg_filter4.csv"
+
+    # Section-start marker ("######开始######" = "###### start ######")
+    startPattern = re.compile("######开始######")
+    # Keyword filter ("赚钱" = "make money")
+    keyPattern = re.compile("赚钱")
+
+    total = []
+    sub = None
+    with open(INPUT_DATA, "r", encoding="GBK") as fr,\
+        open(OUTPUT_TEMP, "w", encoding="GBK") as fw:
+
+        for line in fr.readlines():
+            
+            tl = startPattern.findall(line)
+            if len(tl) > 0:
+                sub = []
+                sub.append(line)
+                total.append(sub)
+            elif line.startswith("\n"):
+                continue
+            else:
+                kl = keyPattern.findall(line)
+                if len(kl)>0:
+                    sub.append(line)
+    
+        sortedList = sorted(total, key=lambda x:len(x), reverse=True)
+
+        fw.write("统计信息")
+        fw.write("%s%d\n" % ("总数:", len(sortedList)))
+        fw.write("%s%d\n" %("大于等于1000:", len([subList for subList in sortedList if len(subList)>=1000])))
+        fw.write("%s%d\n" %("大于等于500小于1000:", len([subList for subList in sortedList if len(subList)>=500 and len(subList) < 1000])))
+        fw.write("%s%d\n" %("大于等于100小于500:", len([subList for subList in sortedList if len(subList)>=100 and len(subList) < 500])))
+        fw.write("%s%d\n" %("大于等于50小于100:", len([subList for subList in sortedList if len(subList)>=50 and len(subList)<100])))
+        fw.write("%s%d\n" %("大于等于10小于50:", len([subList for subList in sortedList if len(subList)>=10 and len(subList)<50])))
+        fw.write("%s%d\n" %("大于等于5小于10:", len([subList for subList in sortedList if len(subList)>=5 and len(subList)<10])))
+        fw.write("%s%d\n" %("大于等于3小于5:", len([subList for subList in sortedList if len(subList)>=3 and len(subList)<5])))
+        fw.write("%s%d\n" %("等于2:", len([subList for subList in sortedList if len(subList)==2])))
+        fw.write("%s%d\n" %("等于1:", len([subList for subList in sortedList if len(subList)==1])))
+       
+        for subList in sortedList:
+            if len(subList) == 1:
+                continue
+
+            fw.write("\n")
+            for line in subList:
+                fw.write(line)
+
+def filter2():
+    INPUT_DATA = r"./data/agg_filter.csv"
+    OUTPUT_TEMP = "./data/agg_filter3.csv"
+
+    startPattern = re.compile("######开始######")
+
+    total = []
+    sub = None
+    with open(INPUT_DATA, "r", encoding="GBK") as fr,\
+        open(OUTPUT_TEMP, "w", encoding="GBK") as fw:
+
+        for line in fr.readlines():
+            
+            tl = startPattern.findall(line)
+            if len(tl) > 0:
+                sub = []
+                sub.append(line)
+                total.append(sub)
+            elif line.startswith("\n"):
+                continue
+            else:
+                sub.append(line)
+    
+        sortedList = sorted(total, key=lambda x:len(x), reverse=True)
+
+        fw.write("统计信息")
+        fw.write("%s%d\n" % ("总数:", len(sortedList)))
+        fw.write("%s%d\n" %("大于等于1000:", len([subList for subList in sortedList if len(subList)>=1000])))
+        fw.write("%s%d\n" %("大于等于500小于1000:", len([subList for subList in sortedList if len(subList)>=500 and len(subList) < 1000])))
+        fw.write("%s%d\n" %("大于等于100小于500:", len([subList for subList in sortedList if len(subList)>=100 and len(subList) < 500])))
+        fw.write("%s%d\n" %("大于等于50小于100:", len([subList for subList in sortedList if len(subList)>=50 and len(subList)<100])))
+        fw.write("%s%d\n" %("大于等于10小于50:", len([subList for subList in sortedList if len(subList)>=10 and len(subList)<50])))
+        fw.write("%s%d\n" %("大于等于5小于10:", len([subList for subList in sortedList if len(subList)>=5 and len(subList)<10])))
+        fw.write("%s%d\n" %("大于等于3小于5:", len([subList for subList in sortedList if len(subList)>=3 and len(subList)<5])))
+        fw.write("%s%d\n" %("等于2:", len([subList for subList in sortedList if len(subList)==2])))
+        fw.write("%s%d\n" %("等于1:", len([subList for subList in sortedList if len(subList)==1])))
+
+        for subList in sortedList:
+            if len(subList) == 1:
+                continue
+
+            fw.write("\n")
+            for line in subList:
+                fw.write(line)
+
+        
+
+def filter1():
+    # INPUT_DATA = r"E:\Documents\Code\LongTailKeyDataMining\agg.csv"
+    INPUT_DATA = r"./data/agg_filter.csv"
+    OUTPUT_TEMP = "./data/agg_filter2.csv"
+
+    filterPattern = []
+    with open("./data/过滤名单.txt", "r", encoding="UTF-8") as f_filter:
+        filterSet = set();
+        while True:
+            lineContent = f_filter.readline().replace("\n","").replace("\r","")
+            if not lineContent:
+                break
+                
+            filterSet.add(lineContent)
+
+        for r in filterSet:
+            filterPattern.append(re.compile(r))
+
+    with open(INPUT_DATA, "r", encoding="GBK") as fr,\
+        open(OUTPUT_TEMP, "w", encoding="GBK") as fw:
+
+        for line in fr.readlines():
+            writeFlag = True
+            for p in filterPattern:
+                l = p.findall(line)
+                if len(l) > 0:
+                    writeFlag = False
+                    break
+            
+            if writeFlag:
+                fw.write(line)
+
+if __name__ == '__main__':
+    filter3()
+
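
filter2() and filter3() differ only in whether lines must also match a keyword before being kept inside a section. A possible consolidation, offered as a sketch (group_sections and its parameter names are hypothetical, not part of this commit):

```python
import re

def group_sections(path, key_pattern=None, encoding="GBK"):
    """Group lines into sections delimited by the "######开始######" marker.

    If key_pattern is given, only lines matching it are kept within a section.
    """
    start_pattern = re.compile("######开始######")
    sections, current = [], None
    with open(path, "r", encoding=encoding) as fr:
        for line in fr:
            if start_pattern.search(line):
                current = [line]
                sections.append(current)
            elif line.startswith("\n") or current is None:
                continue
            elif key_pattern is None or key_pattern.search(line):
                current.append(line)
    return sorted(sections, key=len, reverse=True)
```

With this helper, filter3 would reduce to group_sections(INPUT_DATA, re.compile("赚钱")) and filter2 to group_sections(INPUT_DATA).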

+ 125 - 30
key.py

@@ -1,52 +1,147 @@
 # -*- coding:utf-8 -*-
 
+from concurrent.futures import ProcessPoolExecutor, as_completed
 import logging
+import os
+from time import time
 import config
 import tools
 import jieba
-import datetime
 import mmap
 
-TITLE = "关键词表 生成"
+# 优化
+# 1. 更改为使用多进程
 
+# 日志配置初始化
+tools.init_log()
 
-def main():
+def sub_process(start_pos, end_pos, stop_word):
+    """
+    Worker process: segment the keywords in [start_pos, end_pos) of the merge file.
+    """
+    pid = os.getpid()
+
+    logging.debug("Worker-%d starting segmentation task, start: %d, end: %d" % (pid, start_pos, end_pos))
+
+    # Temporary result container
+    tmp_list = []
+
+    # Start time
+    start_time = time()
     
-    # Initialize logging configuration
-    tools.init_log()
-    tools.log_start_msg(TITLE)
+    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
+        mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
 
-    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as fmerge, \
-        open(config.KEY_FILE, "w", encoding=config.ENCODING_CHARSET) as fw, \
-        mmap.mmap(fmerge.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
+        fmmap.seek(start_pos)
 
-        # TODO 
-        # There may be room for I/O optimization here
-        # mmap could be dropped in favor of reading line by line with readline()
-        # The progress indicator is also incomplete
+        while True:
+            # Bounds check
+            cur_pos = fmmap.tell()
+            if cur_pos >= end_pos:
+                break
 
-            count = -1
-            total_num = fmmap.size()
+            # Read a keyword
+            key = fmmap.readline().decode("UTF-8").replace("\r","").replace("\n","")
 
-            while True:
-                count = count + 1
-                # Read a keyword
-                word = fmmap.readline().decode("UTF-8").replace("\r","").replace("\n","")
+            # Skip empty lines
+            if not key:
+                continue
+                
+            # Segment the keyword
+            tmp_stems = list(jieba.cut_for_search(key))
 
-                # Stop when nothing more can be read
-                if not word :
-                    break
+            # Exclude stop words
+            stems = set()
+            for stem in tmp_stems:
+                if stem in stop_word:
+                    continue
+                stems.add(stem)
                 
-                # Segment the keyword
-                word_root = list(jieba.cut_for_search(word))
+            # Skip keywords whose stem set ends up empty
+            if len(stems) == 0:
+                continue
+        
+            tmp_list.append((key, list(stems)))
+    
+    logging.debug("Worker-%d finished segmentation task, elapsed: %f" % (pid, (time() - start_time)))
+    
+    return tmp_list
+
+def main_process():
+    """
+    Main process
+    """
+
+    # Process pool size
+    process_num = 4
+
+    # Task split size
+    split_num = 500000
+
+    # Byte-position index
+    pos_index = []
+
+    # Total number of keywords
+    total_num = 0
 
-                # Write to the file
-                fw.write("%d,%s,%s\n"%(count,word,word_root))
+    # Load stop words
+    stop_word = tools.load_stop_word()
 
-                # Progress indicator
-                tools.tip(total_num, fmmap.tell(), False)
+    start_time = time()
 
-    tools.log_end_msg(TITLE)
+    # Record position info
+    logging.info("Main: building the position index")
+    with open(config.MERGE_FILE, "r", encoding=config.ENCODING_CHARSET) as f, \
+        mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
+
+        while True:
+            # Current position
+            cur_pos = fmmap.tell()
+            # Advance to the next line
+            line = fmmap.readline()
+            # EOF check
+            if not line:
+                break
+            # Record the line's start offset
+            pos_index.append(cur_pos)
+
+        # Total keyword count
+        total_num = len(pos_index)
+
+        # File size, used as the end bound of the final task
+        file_size = fmmap.size()
+
+    # Split into subtasks
+    logging.info("Main: splitting subtasks")
+    tasks = tools.avg_split_task(total_num, split_num)
+
+    with ProcessPoolExecutor(process_num) as process_pool, \
+        open(config.KEY_FILE, "w", encoding=config.ENCODING_CHARSET) as f_key:
+
+        logging.info("Main: submitting tasks to worker processes")
+        # avg_split_task marks the final task's end with -1, meaning "to the end of the file"
+        process_futures = [process_pool.submit(sub_process, pos_index[start], pos_index[end] if end != -1 else file_size, stop_word) for start, end in tasks]
+
+        # Drop references that are no longer needed to free memory
+        del pos_index
+        del tasks
+
+        # Sequence numbering
+        count = -1
+        for p_future in as_completed(process_futures):
+            result = p_future.result()
+            if result:
+                for key, stems in result:
+                    count = count + 1
+                    # Write to the KEY file
+                    f_key.write("%d,%s,%s\n" % (count, key, list(stems)))
+            
+            # Drop the completed future to free memory
+            process_futures.remove(p_future)
+    
+    logging.info("Main: KEY table built, elapsed: %f" % (time() - start_time))
 
 if __name__ == '__main__':
-    main()
+
+    TITLE = "关键词表 生成"
+    tools.log_start_msg(TITLE)
+
+    main_process()
+
+    tools.log_end_msg(TITLE)
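
A toy illustration of how the worker byte ranges are derived (all values are made up; see avg_split_task in tools.py below):

```python
# Five keyword lines starting at these byte offsets, in a 60-byte file
pos_index = [0, 10, 25, 31, 48]
file_size = 60
tasks = [[0, 2], [2, 4], [4, -1]]  # avg_split_task(5, 2); -1 = "to the end"

ranges = [(pos_index[s], pos_index[e] if e != -1 else file_size) for s, e in tasks]
assert ranges == [(0, 25), (25, 48), (48, 60)]
```

Note that because results are consumed via as_completed, the ordinals written to the KEY file follow completion order, not the original line order of merge.csv.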

+ 22 - 27
key_index.py

@@ -3,17 +3,12 @@
 import config
 import tools
 import mmap
-import logging
 
-TITLE = "关键词索引"
 
 def main():
-    # Initialize logging configuration
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    # Keyword index container
-    key_index_cache = {}
+    
+    # Keyword index container
+    key_index = []
 
     with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
         mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
@@ -25,34 +20,34 @@ def main():
             # Record the cursor position
             cur_pos = fmmap.tell()
             # Advance the cursor to the next line
-            line = fmmap.readline().decode(config.ENCODING_CHARSET)
+            line = fmmap.readline()
             # Stop when there is no more data
             if not line :
                 break
-            
-            # Get the keyword ordinal
-            index = line.index(",")
 
-            # Map the keyword ordinal to its position
-            key_index_cache[line[:index]]=cur_pos
-            
+            # Map ordinal to byte position; the list index serves as the line number (0-based)
+            key_index.append(cur_pos)
+
             # Progress display
             tools.tip_in_size(total_num, cur_pos)
         
-        # Save the index
-        tools.save_obj(config.KEY_INDEX_CACHE, key_index_cache)
+    with open("./data/tmp/key_index_test.csv", "w", encoding=config.ENCODING_CHARSET) as f:
+        f.write(",".join([str(i) for i in key_index]))
+
+        
+    # Save the index
+    # tools.save_obj(config.KEY_INDEX_CACHE, key_index)
 
-    tools.log_end_msg(TITLE)
-    
 
 if __name__ == '__main__':
-    main()
 
-    # key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
+    TITLE = "关键词索引"
 
-    # with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-    #     mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
-    #         for key,value in key_index_cache.items():
-    #             fmmap.seek(value)
-    #             line = fmmap.readline().decode(config.ENCODING_CHARSET)
-    #             logging.debug("key: %s, value: %d, content: %s" % (key, value, line))
+    # Initialize logging configuration
+    tools.init_log()
+    tools.log_start_msg(TITLE)
+
+    main()
+    
+    tools.log_end_msg(TITLE)
+    
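
The byte-offset list enables O(1) random access to any line of the KEY file by its ordinal. A minimal lookup sketch (the function and variable names are illustrative, assuming the index has been persisted and reloaded):

```python
import mmap

def read_key_line(f_mmap: mmap.mmap, key_index: list, line_no: int) -> str:
    # Jump directly to the recorded start offset of the requested line
    f_mmap.seek(key_index[line_no])
    return f_mmap.readline().decode("UTF-8").rstrip("\r\n")
```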

+ 103 - 371
key_reverse.py

@@ -1,427 +1,159 @@
 # -*- coding:utf-8 -*-
 
-from asyncio import FIRST_COMPLETED
-from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed, wait
-from multiprocessing import Manager
-import multiprocessing
-from queue import Queue
-import sys
+from concurrent.futures import ProcessPoolExecutor, as_completed
+import math
 from time import time
 import os
-from unittest import result
 import config
 import tools
-import ast
 import re
-import math
-import stop_word
 import logging
 import mmap
-import threading
-
-TITLE = "生成关键词倒排和统计信息"
-
-def thread_handle(path, start_pos, end_pos):
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
-    t = threading.current_thread()
-    print("Thread-%d starting task, start: %d, end: %d" % (t.ident, start_pos, end_pos))
-    # Temporary container
-    key_reverse = []
-    with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-        mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
-        # Move to the start position
-        fmmap.seek(start_pos)
-
-        while True:
-            # Get the current position
-            cur_pos = fmmap.tell()
-
-            # Bounds check
-            if cur_pos > end_pos:
-                break
-
-            line = fmmap.readline()
-            key_reverse.append(line)
-            # Kept for now
-            # # Read keyword data
-            # line = fmmap.readline().decode("UTF-8")
 
-            # # Stop at the end of the file
-            # if not line:
-            #     break
 
-            # # Extract the data
-            # m = pattern.match(line)
-            # # Get the keyword ordinal and stems
-            # index = m.group(1)
-            # key_root = m.group(3)
+tools.init_log()
 
-            # # Convert into a real list object
-            # key_root = ast.literal_eval(key_root)
+if __name__ != "__main__":
 
-            # key_reverse.append((index, key_root))
-    # print("Thread-%d finished task" % t.ident)
-    # return key_reverse
-    return {
-        "tid": t.ident,
-        "key_reverse": key_reverse
-    }
+    # Regex extraction patterns
+    # Inverted table: ordinal at the start of a KEY line
+    index_re = r"(\d+),"
+    index_pattern = re.compile(index_re, re.I)
 
+    # KEY table: stems
+    stem_re = r"'([^,]*)'"
+    stem_pattern = re.compile(stem_re, re.I)
 
-
-def process_handle(stop_word_cache, lines):
-    pid = os.getpid()
-    print("Process-%d received data, starting computation")
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
-
-    key_reverse_dict={}
-
-    for line in lines:
-        line = line.decode("UTF-8")
-        # Extract the data
-        m = pattern.match(line)
-        # Get the keyword ordinal and stems
-        index = m.group(1)
-        key_root = m.group(3)
-
-        # Convert into a real list object
-        for item in ast.literal_eval(key_root):
-            # Exclude stop words
-            if item in stop_word_cache:
-                continue
-            # Build the inverted table and statistics
-            val = key_reverse_dict.get(item)
-            if val:
-                key_reverse_dict[item]["count"] = key_reverse_dict[item]["count"] + 1
-                key_reverse_dict[item]["indexs"].append(index) 
-            else:
-                key_reverse_dict[item]= {
-                    "count":1,
-                    "indexs":[]
-                }
-    return {
-        "pid": pid,
-        "key_reverse_dict":key_reverse_dict
-    }
-
-def multi_thread():
+def sub_process(start_pos, end_pos):
     """
-    Build the inverted table (multi-threaded)
+    Worker process
     """
+    pid = os.getpid()
 
-    key_reverse_dict = {}
-
-    def process_handle_result(future):
-        print("Process-%d finished, merging returned partial data" % result["pid"])
-        result = future.result()
-        for key,value in result["key_reverse_dict"].items():
-            val = key_reverse_dict.get(key)
-            if val:
-                key_reverse_dict[key]["count"] = key_reverse_dict[key]["count"] + value["count"]
-                key_reverse_dict[key]["indexs"].append(value["indexs"]) 
-            else:
-                key_reverse_dict[key]= value
-        print("Process-%d data merge finished" % result["pid"])
-        
-
-    def thread_handle_result(future):
-        result = future.result()
-        logging.info("Thread-%d finished, returned partial data, submitting to a compute process" % result["tid"])
-        process_future = process_pool.submit(process_handle, stop_word_cache, result["key_reverse"])
-        process_future.add_done_callback(process_handle_result)
-
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    logging.info("Currently running the multi-threaded version")
-
-    logging.info("Starting initialization")
-    # Extraction rule
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
-    # Stop-word table
-    stop_word_cache = stop_word.load_stop_word()
-    # Number of workers
-    thread_num = 1
-    process_num = 2
-    worker_num = 100
-    # KEY table index
-    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
-
-    logging.info("Starting to partition the data")
-
-    # Evenly split the elements of the index file
-    # Convert to a list, compute the total length and the partition interval
-    key_index_list = [key for key in key_index_cache.keys()]
-    total_len = len(key_index_list)
-    internal = math.ceil(total_len / worker_num )
-
-    # Use the cached index file to generate positions for each partition
-    # Position container
-    pos_list = []
-    for i in range(worker_num + 1):
-        # Compute the split point's position in the list
-        l_pos = i * internal
-        # Needs extra handling when it reaches the list size
-        if l_pos >= total_len:
-            l_pos = total_len -1
-        # Get the stem from the list
-        key_index = key_index_list[l_pos:l_pos+1]
-        # Look up the position by stem
-        pos = key_index_cache[key_index[0]]
-        # Record the position
-        pos_list.append(pos)
-   
-    logging.info("Submitting partitions for multi-threaded execution")
-    # Generate tasks
-    with ThreadPoolExecutor(thread_num) as thread_pool, \
-        ProcessPoolExecutor(process_num) as process_pool:
-
-        # thread_futures = []
-
-        for i in range(0, len(pos_list)-1):
-            pos = pos_list[i: i+2]
-            # thread_futures.append()
-            thread_future = thread_pool.submit(thread_handle, config.KEY_FILE, pos[0], pos[1])
-            thread_future.add_done_callback(thread_handle_result)
-
-        # Wait for the results
-        logging.info("Waiting for the threads to finish")
-        thread_pool.shutdown(wait=True)
-        process_pool.shutdown(wait=True)
-        
-
+    logging.debug("进程-%d 开始执行任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
 
-            
-                
-    
-    logging.info("已获取全部子进程返回部分结果,总数据量:%d" % len(key_reverse_dict))
-    return 
-    # 根据关键词数量进行排序,这里通过items()方法转成元组列表,才能进行排序
-    logging.info("根据关键词数量进行倒序排列")
-    sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
+    # Start time
+    start_time = time()
 
-    # Save to a local file
-    logging.info("Saving locally")
-    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
-        for key, value in sorted_reverse_list:
-            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
+    # Container for the inverted table and statistics
+    reverse_dict = {}
 
-    tools.log_end_msg(TITLE)
-
-def handle(path, start_pos, end_pos, share_pattern, stop_word_cache):
-    pid = os.getpid()
-    print("进程-%d 开始任务,开始位置:%d,结束位置:%d" % (pid, start_pos, end_pos))
-    # 临时容器
-    key_reverse = {}
-    # 提取数据用的正则
-    pattern = share_pattern.value
-    with open(path, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-        mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
+    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as f_key, \
+        mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_mmap:
         # Move to the start position
-        fmmap.seek(start_pos)
+        f_mmap.seek(start_pos)
 
         while True:
             # Get the current position
-            cur_pos = fmmap.tell()
+            cur_pos = f_mmap.tell()
 
             # Bounds check
-            if cur_pos > end_pos:
-                break
-
-            # Read keyword data
-            line = fmmap.readline().decode("UTF-8")
-
-            # Stop at the end of the file
-            if not line:
+            if cur_pos >= end_pos:
                 break
 
             # Extract the data
-            m = pattern.match(line)
+            line = f_mmap.readline().decode(config.ENCODING_CHARSET)
+            m = index_pattern.match(line)
             # Get the keyword ordinal and stems
             index = m.group(1)
-            key_root = m.group(3)
-
-            # Convert into a real list object
-            for item in ast.literal_eval(key_root):
-                # Exclude stop words
-                if item in stop_word_cache:
-                    continue
-                # Build the inverted table and statistics
-                val = key_reverse.get(item)
-                if val:
-                    count = key_reverse[item]["count"]
-                    key_reverse[item]["count"] = count + 1
-                    key_reverse[item]["indexs"].append(index) 
+            stems = stem_pattern.findall(line)
+
+            # Build the inverted table and statistics
+            for stem in stems:
+            for stem in stems:
+                obj = reverse_dict.get(stem)
+                if obj:
+                    obj["count"] = obj["count"] + 1
+                    obj["indexs"].add(index) 
                 else:
-                    key_reverse[item]= {
-                        "count":1,
-                        "indexs":[]
+                    tmp_indexs = set()
+                    tmp_indexs.add(index)
+                    reverse_dict[stem]= {
+                        "count": 1,
+                        "indexs": tmp_indexs
                     }
     
-    print("进程-%d 任务执行结束" % pid)
+    logging.debug("子进程-%d 任务结束,耗时:%f" % (pid, (time() - start_time)))
 
-    return key_reverse
-
-def multi_process():
-    """
-    Build the inverted table (multiprocessing)
-    """
-
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    logging.info("Currently running the multiprocessing version")
-
-    logging.info("Starting initialization")
-    manager = Manager()
-    # Extraction rule
-    share_pattern = manager.Value("pattern", re.compile(config.KEY_RE_PATTERAN, re.I))
-    # Stop-word table
-    stop_word_cache = manager.dict(stop_word.load_stop_word())
-    # Number of worker processes
-    process_num = os.cpu_count() - 1
-    worker_num = process_num
-    pool = ProcessPoolExecutor(max_workers=process_num)
-    # KEY table index
-    key_index_cache = tools.load_obj(config.KEY_INDEX_CACHE)
-
-    logging.info("Starting to partition the data")
-
-    # Evenly split the elements of the index file
-    # Convert to a list, compute the total length and the partition interval
-    key_index_list = [key for key in key_index_cache.keys()]
-    total_len = len(key_index_list)
-    internal = math.ceil(total_len / worker_num )
-
-    # Use the cached index file to generate positions for each partition
-    # Position container
-    pos_list = []
-    for i in range(worker_num + 1):
-        # Compute the split point's position in the list
-        l_pos = i * internal
-        # Needs extra handling when it reaches the list size
-        if l_pos >= total_len:
-            l_pos = total_len -1
-        # Get the stem from the list
-        key_index = key_index_list[l_pos:l_pos+1]
-        # Look up the position by stem
-        pos = key_index_cache[key_index[0]]
-        # Record the position
-        pos_list.append(pos)
-   
-    logging.info("把分段结果提交至多进程执行")
-    # 生成任务
-    process_futures = []
-    for i in range(0, len(pos_list)-1):
-        pos = pos_list[i: i+2]
-        process_futures.append(pool.submit(handle, config.KEY_FILE, pos[0], pos[1], share_pattern, stop_word_cache))
-
-    # 等待数据返回
-    logging.info("等待多进程执行结束")
-    wait(process_futures,return_when=FIRST_COMPLETED)
-    key_reverse_dict = {}
-    for future in as_completed(process_futures):
-        logging.info("有子进程执行结束,返回部分结果")
-        result = future.result()
-        for key, value in result.items():
-            # 进行数据合并
-            val_dict = key_reverse_dict.get(key)
-            if val_dict:
-                count = val_dict["count"]
-                key_reverse_dict[key]["count"] = count + value["count"]
-                key_reverse_dict[key]["indexs"].extend(value["indexs"])
-            else:
-                key_reverse_dict[key] = value
-
-    logging.info("已获取全部子进程返回部分结果")
+    return reverse_dict
     
-    # Sort by keyword count; items() converts to a list of tuples so it can be sorted
-    logging.info("Sorting in descending order of keyword count")
-    sorted_reverse_list = sorted(key_reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
 
-    # Save to a local file
-    logging.info("Saving locally")
-    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
-        for key, value in sorted_reverse_list:
-            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
+def main_process():
+    
+    logging.info("Main: starting initialization")
+    
+    # Number of worker processes
+    process_num = 4
 
-    tools.log_end_msg(TITLE)
+    # KEY table index
+    key_index = tools.load_obj(config.KEY_INDEX_CACHE)
 
-def one_process():
-    """
-    Build the inverted table (single-process version)
-    """
+    # Start time
+    start_time = time()
 
-    tools.init_log()
-    tools.log_start_msg(TITLE)
+    # Total number of keywords
+    total_num = len(key_index)
 
-    logging.info("Currently running the single-process version")
+    # Task split size
+    split_num = math.ceil(total_num/process_num)
 
-    # Extraction rule
-    pattern = re.compile(config.KEY_RE_PATTERAN, re.I)
+    logging.info("Main: splitting subtasks")
+    tasks = tools.avg_split_task(total_num, split_num)
+    
+    with ProcessPoolExecutor(process_num) as process_pool:
 
-    # Inverted table container
-    key_reverse = {}
+        logging.info("Main: submitting tasks to worker processes")
+        # avg_split_task marks the final task's end with -1, meaning "to the end of the file"
+        key_file_size = os.path.getsize(config.KEY_FILE)
+        process_futures = [process_pool.submit(sub_process, key_index[start], key_index[end] if end != -1 else key_file_size) for start, end in tasks]
 
-    # Stop-word table
-    stop_word_cache = stop_word.load_stop_word()
+        # Drop references that are no longer needed to free memory
+        del tasks
+        del key_index
 
-    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
+        # Container for the inverted table and statistics
+        reverse_dict = {}
         
-        # Get the total file size; the cursor must be restored afterwards
-        fkey.seek(0, os.SEEK_END)
-        total_num = fkey.tell()
-        fkey.seek(0)
-
-        while True:
-            # Get the current position
-            cur_pos = fkey.tell()
+        # Merge the data
+        for p_future in as_completed(process_futures):
+            result = p_future.result()
+            for key, val_obj in result.items():
+                reverse_obj = reverse_dict.get(key)
+                if reverse_obj:
+                    reverse_obj["count"] = reverse_obj["count"] + val_obj["count"]
+                    reverse_obj["indexs"] = reverse_obj["indexs"] | val_obj["indexs"]
+                else:
+                    reverse_dict[key] = val_obj
             
-            # Progress indicator
-            tools.tip_in_size(total_num, cur_pos)
+            # Drop the completed future to free memory
+            process_futures.remove(p_future)
+        
+        logging.info("Main: all worker results received, total entries: %d" % len(reverse_dict))
+    
+    logging.info("Main: sorting and converting each stem's index set")
+    for val_obj in reverse_dict.values():
+        val_obj["indexs"] = list(val_obj["indexs"])
+        val_obj["indexs"].sort()
 
-            # Read keyword data
-            line = fkey.readline()
+    # Sort by keyword count; items() converts to a list of tuples so it can be sorted
+    logging.info("Main: sorting by keyword count")
+    sorted_reverse_list = sorted(reverse_dict.items(), key=lambda x: x[1]["count"], reverse=True)
 
-            # Stop at the end of the file
-            if not line:
-                break
+    # Save to local files
+    logging.info("Main: saving locally")
+    with open("./data/tmp/reverse_test.csv", "w", encoding=config.ENCODING_CHARSET) as f_reverse, \
+        open(config.KEY_REVERSE_STATISTICS_FILE, "w", encoding=config.ENCODING_CHARSET) as f_statistics:
+        for key, val_obj in sorted_reverse_list:
+            f_reverse.write("%s,%s\n" % (key, val_obj["indexs"]))
+            f_statistics.write("%s,%d\n" % (key, val_obj["count"]))
+    
+    logging.info("Main: inverted index built, elapsed: %f" % (time() - start_time))
 
-            # Extract the data
-            m = pattern.match(line)
-            # Get the keyword ordinal
-            index = m.group(1)
-            # Get the stems
-            key_root = m.group(3)
-            # Convert into a real list object
-            for item in ast.literal_eval(key_root):
-                
-                # Exclude stop words
-                if item in stop_word_cache:
-                    continue
-
-                # Build the inverted table
-                val = key_reverse.get(item)
-                if val:
-                    count = key_reverse[item]["count"]
-                    key_reverse[item]["count"] = count + 1
-                    key_reverse[item]["indexs"].append(index) 
-                else:
-                    key_reverse[item]= {
-                        "count":1,
-                        "indexs":[]
-                    }
 
-    logging.info("Sorting in descending order of keyword count")
-    sorted_reverse_list = sorted(key_reverse.items(), key=lambda x: x[1]["count"], reverse=True)
+if __name__ == "__main__":
+    
+    TITLE = "生成关键词倒排和统计信息"
+    tools.log_start_msg(TITLE)
 
-    # Save to a local file
-    logging.info("Saving locally")
-    with open(config.KEY_REVERSE_FILE, "w", encoding=config.ENCODING_CHARSET) as f:
-        for key, value in sorted_reverse_list:
-            f.write("%s,%d,%s\n" % (key, value["count"], value["indexs"]))
+    main_process()
 
     tools.log_end_msg(TITLE)
 
-if __name__ == "__main__":
-    multi_thread()
+   
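
Together, the inverted table (stem → keyword ordinals) and the byte-offset index from key_index.py can answer "which keywords contain stem X". A sketch under those assumptions (the function and variable names are illustrative, not part of this commit):

```python
import ast
import mmap

def keywords_for_stem(stem, reverse_path, key_mmap, key_index):
    # Find the stem's row in the inverted table: "stem,[ordinal, ...]"
    with open(reverse_path, "r", encoding="UTF-8") as f:
        for line in f:
            name, _, rest = line.partition(",")
            if name == stem:
                ordinals = ast.literal_eval(rest.strip())
                break
        else:
            return []
    # Resolve each ordinal to its KEY-file line via the byte-offset index
    keys = []
    for i in ordinals:
        key_mmap.seek(key_index[int(i)])
        keys.append(key_mmap.readline().decode("UTF-8").rstrip("\r\n"))
    return keys
```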

+ 8 - 9
key_reverse_index.py

@@ -1,13 +1,7 @@
 # -*- coding:utf-8 -*-
 
-import sys
-from time import time
-import os
 import config
 import tools
-import ast
-import re
-import stop_word
 import mmap
 
 TITLE = "关键词倒排索引"
@@ -18,7 +12,7 @@ def main():
     tools.log_start_msg(TITLE)
 
     # Container for the keyword inverted index
-    key_reverse_index_cache = {}
+    reverse_index = []
 
     with open(config.KEY_REVERSE_FILE, "r", encoding=config.ENCODING_CHARSET) as freverse, \
         mmap.mmap(freverse.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
@@ -38,13 +32,18 @@ def main():
             
             # Get the stem's position and map the stem to it
             index = line.index(",")
-            key_reverse_index_cache[line[:index]]=cur_pos
+            key = line[:index]
+            next_pos = fmmap.tell()
+            reverse_index.append((key, cur_pos, next_pos))
             
             # Progress display
             tools.tip_in_size(total_num, cur_pos)
         
         # Save the index
-        tools.save_obj(config.KEY_REVERSE_INDEX_CACHE, key_reverse_index_cache)
+        with open("./data/tmp/reverse_index_test.csv", "w", encoding=config.ENCODING_CHARSET) as f:
+            for key, cur_pos, next_pos in reverse_index:
+                f.write("%s,%d,%d\n" % (key, cur_pos, next_pos))
+        # tools.save_obj(config.KEY_REVERSE_INDEX_CACHE, key_reverse_index_cache)
 
     tools.log_end_msg(TITLE)
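
Storing next_pos alongside cur_pos means a stem's whole inverted-table row can be sliced straight out of the mmap, with no readline scan. A one-line sketch (fmmap here is assumed to be the mapped inverted-table file):

```python
row = fmmap[cur_pos:next_pos].decode(config.ENCODING_CHARSET).rstrip("\r\n")
```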
 

+ 1 - 1
logging.conf

@@ -8,7 +8,7 @@ keys=fileHandler,consoleHandler
 keys=simpleFormatter
 
 [logger_root]
-level=INFO
+level=DEBUG
 handlers=fileHandler,consoleHandler
 
 [handler_consoleHandler]

+ 48 - 35
merge.py

@@ -6,10 +6,9 @@ import tools
 import logging
 import zipfile
 
-TITLE= "拓展词合并"
 
-# 合并的文件目录
-DATA_DIR = "E:\Download\长尾关键词\普通-p"
+# 合并的文件目录
+DATA_DIR = "E:\Download\长尾关键词\长尾关键词-什么\普通-p"
 
 def get_files(path):
     '''
@@ -36,12 +35,6 @@ def merge_file_content():
         Skip the files inside the compressed archives
     ----------
     """
-
-    # Initialize logging
-    tools.init_log()
-
-    tools.log_start_msg(TITLE)
-
     # Get the file list
     files = get_files(DATA_DIR)
 
@@ -49,36 +42,56 @@ def merge_file_content():
     total_num = len(files)
     logging.info("待处理文件数:%d" % total_num)
 
-    with open(config.MERGE_FILE, "w", encoding="utf-8") as f:
+    # Deduplication set
+    repeat_set = set()
 
-        for i, file in enumerate(files):
-            zfile = zipfile.ZipFile(file)
-            filenames = zfile.namelist()
-            for filename in filenames:
+    # Total keyword count before deduplication
+    total_count=0
 
-                # Re-encode the file name into its correct form
-                realname = filename.encode('cp437').decode('gbk')
-                
-                # Skip excluded files
-                if realname in config.MERGE_EXCLUDE_FILES:
-                    continue
-
-                logging.info("Processing file: %s" % realname)
-
-                # Read the file inside the archive
-                with zfile.open(filename) as file_content:
-                    lines = file_content.readlines()
-                    # Skip the first two lines
-                    for line in lines[2:]:
-                        split = line.decode("gbk").split(",")
-                        # Only the first column is needed
-                        f.write(split[0])
-                        f.write("\n")
-                
-            tools.tip(total_num, i)
+    # Read the data and deduplicate
+    for i, file in enumerate(files):
+        zfile = zipfile.ZipFile(file)
+        filenames = zfile.namelist()
+        for filename in filenames:
 
-    tools.log_end_msg(TITLE)
+            # Re-encode the file name into its correct form
+            # (zipfile decodes non-UTF-8 names as cp437; these names are really GBK)
+            realname = filename.encode('cp437').decode('gbk')
+                
+            # Skip excluded files
+            if realname in config.MERGE_EXCLUDE_FILES:
+                continue
+
+            logging.info("Processing file: %s" % realname)
+
+            # Read the file inside the archive
+            with zfile.open(filename) as file_content:
+                lines = file_content.readlines()
+                # Skip the first two lines
+                for line in lines[2:]:
+                    split = line.decode("gbk").split(",")
+                    # Only the first column is needed
+                    repeat_set.add(split[0])
+                    # Count the total
+                    total_count = total_count + 1
+        
+        tools.tip(total_num, i)
+    
+    logging.info("Saving merged result to %s; before dedup: %d, after dedup: %d" % (config.MERGE_FILE, total_count, len(repeat_set)))
+    with open(config.MERGE_FILE, "w", encoding="utf-8") as f:
+        for item in repeat_set:
+            f.write(item)
+            f.write("\n")
+    
 
 if __name__ == '__main__':
+
+    TITLE= "拓展词合并"
+
+    # 日志初始化
+    tools.init_log()
+    tools.log_start_msg(TITLE)
+
     merge_file_content()
+
+    tools.log_end_msg(TITLE)
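
The cp437-to-GBK round-trip above deals with ZIP's name encoding: unless a flag marks an entry name as UTF-8, zipfile decodes it as cp437, so Chinese names arrive garbled. A standalone illustration (the sample name is made up):

```python
# A GBK-encoded name, as it comes back from zipfile.namelist()
garbled = "怎么长尾词.csv".encode("gbk").decode("cp437")
# Undo the mis-decoding to recover the real name
assert garbled.encode("cp437").decode("gbk") == "怎么长尾词.csv"
```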
         

+ 58 - 0
split.py

@@ -0,0 +1,58 @@
+# -*- coding:utf-8 -*-
+
+import csv
+import re
+
+def split():
+    INPUT_DATA = r"./data/agg_filter.csv"
+    OUTPUT_TEMP = "./data/split/agg_split_%d.txt"
+    OUTPUT_TEMP2 = "./data/split/agg_split_%d_%d.txt"
+
+    startPattern = re.compile("######开始######")
+    con_l = []
+    sub = None
+    with open(INPUT_DATA, "r", encoding="GBK") as fr:
+        for line in fr.readlines():
+            tl = startPattern.findall(line)
+            if len(tl) > 0:
+                sub = []
+                sub.append(line)
+                con_l.append(sub)
+            elif line.startswith("\n"):
+                continue
+            else:
+                sub.append(line)
+    
+    # step = 71500
+    # for i, v in enumerate(range(0, len(con_l), step)):
+    #     with open(OUTPUT_TEMP % (i+1), "w", encoding="GBK") as fw:
+    #         for ele in con_l[v:v+step]:
+    #             if len(ele) == 1:
+    #                 continue
+
+    #             fw.write("\n")
+    #             for content in ele:
+    #                 fw.write(content)
+    filter_l = [
+        (1000, 1000, [subList for subList in con_l if len(subList)>=1000]),
+        (500, 1000, [subList for subList in con_l if len(subList)>=500 and len(subList) < 1000]),
+        (100,500,[subList for subList in con_l if len(subList)>=100 and len(subList) < 500]),
+        (50,100,[subList for subList in con_l if len(subList)>=50 and len(subList)<100]),
+        (10,50,[subList for subList in con_l if len(subList)>=10 and len(subList)<50]),
+        (5,10,[subList for subList in con_l if len(subList)>=5 and len(subList)<10]),
+        (3,5,[subList for subList in con_l if len(subList)>=3 and len(subList)<5]),
+        (2,2,[subList for subList in con_l if len(subList)==2])
+        # (1,1,[subList for subList in con_l if len(subList)==1])
+    ]
+
+    for start, end, sublist in filter_l:
+        with open(OUTPUT_TEMP2 % (start, end), "w", encoding="GBK") as fw:
+            for ele in sublist:
+                fw.write("\n")
+                for content in ele:
+                    fw.write(content)
+    
+    
+   
+if __name__ == '__main__':
+    split()

+ 1 - 13
statistics.py

@@ -131,11 +131,6 @@ def memory_statistics():
 
 def main():
 
-    # num = 459789
-    # print(num%10000)
-    # print(num//10000)
-    # return
-
     tools.init_log()
     tools.log_start_msg(TASK_TITLE)
 
@@ -146,11 +141,4 @@ def main():
 
 
 if __name__ == "__main__":
-    # print("加载开始")
-    # cache = tools.load_obj(config.KEY_REVERSE_INDEX_HOT_CACHE+".bak")
-    # print("加载结束")
-    # time.sleep(20)
-
-    Shape = namedtuple('Shape', ['x', 'y', 'z'])
-    exm = Shape(1, 2, 3)
-    print(exm.index(2))
+    main()

+ 0 - 65
stop_word.py

@@ -1,65 +0,0 @@
-# -*- coding:utf-8 -*-
-
-from datetime import datetime
-import os
-import time
-import tools
-import config
-import pickle
-import logging
-
-TITLE = "停用词"
-
-def load_stop_word():
-    """
-    加载停用词
-    """
-
-    # 判断是否存在缓存
-    if os.path.exists(config.STOP_WORD_CACHE):
-        logging.debug("存在停用词缓存")
-        return tools.load_obj(config.STOP_WORD_CACHE)
-
-    logging.debug("正在构建停用词缓存")
-
-    # 停用词容器
-    stop_word = []
-
-    # 构建停用词列表
-    stop_word_files = os.listdir(config.STOP_WORD_DIR)
-    for file in stop_word_files:
-        stop_word_file = os.path.join(config.STOP_WORD_DIR, file)
-        with open(stop_word_file, encoding=config.ENCODING_CHARSET) as f:
-            for item in f:
-                # 移除换行符
-                stop_word.append(item.replace("\n",""))
-    # 去重
-    stop_word = list(set(stop_word))
-
-    # 把list改成dict提升检索速度
-    stop_word_dict = {}
-    for item in stop_word:
-        stop_word_dict[item]=None
-    
-    logging.debug("把停用词缓存保存到本地")
-
-    # 保存本地作为缓存
-    tools.save_obj(config.STOP_WORD_CACHE, stop_word_dict)
-    
-    return stop_word_dict
-
-if __name__ == '__main__':
-
-    tools.init_log()
-    tools.log_start_msg(TITLE)
-
-    stop_word = load_stop_word()
-
-    start = time.time()
-    for i in range(1400*10000):
-        for item in ["总之", "风雨无阻","千"]:
-            item in stop_word
-    end = time.time()
-    print("耗时:", end - start)
-
-    tools.log_end_msg(TITLE)

+ 52 - 41
tools.py

@@ -116,9 +116,6 @@ def tip_in_size(total_size, cur_pos):
         # 更新 提示检查点
         tip_internal["check_point"] = check_point
 
-
-    
-
 def save_obj(path, obj):
     """
     Save an object locally
@@ -133,52 +130,66 @@ def load_obj(path):
     with open(path, "rb") as f:
         return pickle.load(f)
 
-if __name__ == "__main__":
-
-    init_log()
-
-    log_start_msg(TITLE)
-
-    # Test the plain progress indicator
-    # total = 3
-    # for i in range(total):
-    #     tip(total, i)
-
-    # Test the mmap progress indicator
-    # with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey, \
-    #     mmap.mmap(fkey.fileno(), 0, access=mmap.ACCESS_READ) as fmmap:
-        
-    #     # Total size
-    #     total_num = fmmap.size()
+def load_stop_word():
+    """
+    Load the stop words
+    """
 
-    #     while True:
-    #         # Record the cursor position
-    #         cur_pos = fmmap.tell()
-    #         # Advance the cursor to the next line
-    #         line = fmmap.readline()
+    # Check whether a cache exists
+    if os.path.exists(config.STOP_WORD_CACHE):
+        logging.debug("Stop-word cache exists")
+        return load_obj(config.STOP_WORD_CACHE)
 
-    #         # Progress display
-    #         tip_in_size(total_num, cur_pos)
+    logging.debug("Building the stop-word cache")
 
-    #         if not line:
-    #             break
+    # Stop-word container
+    stop_word = set()
 
-    # Test the line-by-line read progress indicator
-    with open(config.KEY_FILE, "r", encoding=config.ENCODING_CHARSET) as fkey:
+    # Build the stop-word list
+    stop_word_files = os.listdir(config.STOP_WORD_DIR)
+    for file in stop_word_files:
+        stop_word_file = os.path.join(config.STOP_WORD_DIR, file)
+        with open(stop_word_file, encoding=config.ENCODING_CHARSET) as f:
+            for item in f:
+                # Remove line-break characters
+                stop_word.add(item.replace("\n","").replace("\r", ""))
 
-        fkey.seek(0, os.SEEK_END)
-        total_num = fkey.tell()
-        fkey.seek(0)
+    # Convert to a dict to speed up lookups
+    stop_word_dict = {}
+    for item in stop_word:
+        stop_word_dict[item]=None
+    
+    logging.debug("Saving the stop-word cache locally")
 
-        while True:
+    # Save locally as a cache
+    save_obj(config.STOP_WORD_CACHE, stop_word_dict)
+    
+    return stop_word_dict
 
-            cur_pos = fkey.tell()
 
-            line = fkey.readline()
+def avg_split_task(total:int, split_internal:int):
+    """
+    Split a task evenly
+    """
 
-            tip_in_size(total_num, cur_pos)
+    # Number of task slices
+    split_num = math.ceil(total / split_internal)
 
-            if not line:
-                break;
+    # Split evenly
+    tasks = []
+    for i in range(split_num):
+        # Compute the slice boundaries in the list
+        start_pos = i * split_internal
+        end_pos = i * split_internal + split_internal
+        # The final slice is marked with -1, meaning "to the end"
+        if end_pos >= total:
+            end_pos = -1
+        tasks.append([start_pos,end_pos])
     
-    log_end_msg(TITLE)
+    return tasks
+
+if __name__ == "__main__":
+    stop_word = load_stop_word()
+    with open("./data/stopword.txt","w",encoding="UTF-8") as f:
+        for stopWord in stop_word.keys():
+            f.write("%s\n" % stopWord)
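
A quick sanity check of avg_split_task's contract (illustrative values):

```python
# 10 items in slices of 4: the last slice's end is the -1 sentinel
assert avg_split_task(10, 4) == [[0, 4], [4, 8], [8, -1]]
# An exact multiple still marks the final slice with -1
assert avg_split_task(8, 4) == [[0, 4], [4, -1]]
```

Callers such as key.py and key_reverse.py must therefore treat -1 as "up to the end of the file" when turning these index pairs into byte ranges.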