# money.py — long-tail keyword extraction, tokenization, inverted indexing and aggregation.
# -*- coding:utf-8 -*-
import math
import mmap
import os
import re
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from bitmap import BitMap
import utils
import jieba
import zipfile
import logging
# File suffix for per-site long-tail word files: "<prefix>_长尾词.txt"
FILE_SUFFIX_LONG_TAIL = "_长尾词.txt"
# File name of the merged (de-duplicated) long-tail words
FILE_LONG_TAIL_MERGE = "长尾词_合并.txt"
# File name of the merged long-tail words after tokenization
FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
# File name of the inverted index built over the tokenized words
FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
# File name of the aggregated long-tail words
FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
  23. def extract_word_from_5118(file_path: str):
  24. """
  25. 从5118关键词压缩文件中提取数据
  26. :param file_path: 待处理文件夹路径
  27. :return: None
  28. """
  29. file_list = []
  30. for file in os.listdir(file_path):
  31. file_list.append(os.path.join(file_path, file))
  32. for i, file in enumerate(file_list):
  33. zfile = zipfile.ZipFile(file)
  34. filenames = zfile.namelist()
  35. for filename in filenames:
  36. # 重新编码文件名为正确形式
  37. real_name = filename.encode('cp437').decode('gbk')
  38. # 排除无效文件
  39. if real_name in ['打开乱码如何处理?.txt']:
  40. continue
  41. # 关键词存放容器
  42. word_container = set()
  43. # 读取压缩文件中的文件
  44. with zfile.open(filename) as file_content:
  45. lines = file_content.readlines()
  46. # 跳过开头两行
  47. for line in lines[2:]:
  48. split = line.decode("gbk").split(",")
  49. # 只需要第一列的数据
  50. word_container.add(split[0])
  51. output_file_name = real_name[0:real_name.index("--")]
  52. output_file_path = os.path.join(file_path, output_file_name + FILE_SUFFIX_LONG_TAIL)
  53. with open(output_file_path, "w", encoding="utf-8") as f:
  54. for item in word_container:
  55. f.write(item)
  56. f.write("\n")
  57. def merge_word(file_path: str):
  58. """
  59. 合并长尾词(带去重)
  60. :param file_path: 待处理文件夹路径
  61. :return: None
  62. """
  63. # 获取文件列表
  64. file_list = []
  65. for file in os.listdir(file_path):
  66. if file.endswith(FILE_SUFFIX_LONG_TAIL):
  67. file_list.append(os.path.join(file_path, file))
  68. # 长尾词集合容器
  69. word_set = set()
  70. # 读取数据并排重
  71. for i, file in enumerate(file_list):
  72. with open(file, "r", encoding="utf-8") as f:
  73. for word in f:
  74. word_set.add(word.replace("\n", ""))
  75. # 保存合并结果
  76. with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE), "w", encoding="utf-8") as f:
  77. for item in word_set:
  78. f.write(item)
  79. f.write("\n")
  80. def word_split_statistics(file_path: str):
  81. """
  82. 分词统计
  83. :param file_path: 待处理文件夹路径
  84. :return: None
  85. """
  86. file_list = []
  87. for file in os.listdir(file_path):
  88. file_list.append(os.path.join(file_path, file))
  89. stop_word_dict = utils.load_stop_word()
  90. for i, file in enumerate(file_list):
  91. if not file.endswith(FILE_SUFFIX_LONG_TAIL):
  92. continue
  93. # 分词结果容器
  94. key_dict = {}
  95. with open(file, "r", encoding="utf-8") as f:
  96. for tmp_word in f:
  97. # 分词
  98. word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
  99. # 统计
  100. for word in word_list:
  101. # 过滤停用词
  102. if word in stop_word_dict:
  103. continue
  104. if word in key_dict:
  105. key_dict[word] = key_dict[word] + 1
  106. else:
  107. key_dict[word] = 1
  108. # 根据词频进行倒序排列
  109. sorted_key_list = sorted(key_dict.items(), key=lambda x: x[1], reverse=True)
  110. output_file_name = file[file.rindex("\\") + 1:file.index(FILE_SUFFIX_LONG_TAIL)]
  111. output_file_path = os.path.join(file_path, output_file_name + "_长尾词_分词统计.csv")
  112. with open(output_file_path, "w", encoding="UTF-8") as f:
  113. for key, count in sorted_key_list:
  114. f.write("%s,%d\n" % (key, count))
  115. def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
  116. """
  117. 分词和建立倒排索引
  118. :param input_file: 待处理的文件
  119. :param start_pos: 处理的开始位置
  120. :param end_pos: 处理的结束位置
  121. :return: (开始位置,分词结果,倒排索引)
  122. """
  123. # 加载停用词
  124. stop_word_dict = utils.load_stop_word()
  125. # 关键词存放容器
  126. word_arr_list = []
  127. # 倒排索引
  128. word_reverse_index = dict()
  129. with open(input_file, "r", encoding="utf-8") as fr:
  130. for i, tmp_word in enumerate(fr):
  131. # start_pos是行数,而i要从0开始
  132. if i + 1 < start_pos:
  133. continue
  134. # 当前位置
  135. cur_pos = i + 1
  136. # 到达任务边界,结束
  137. if cur_pos == end_pos:
  138. break
  139. # 分词
  140. word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
  141. # 分词过滤结果
  142. word_filter_arr = []
  143. # 过滤停用词
  144. for word in word_list:
  145. if word in stop_word_dict:
  146. continue
  147. word_index_arr = word_reverse_index.get(word)
  148. if word_index_arr:
  149. word_index_arr.append(cur_pos)
  150. else:
  151. word_reverse_index[word] = [cur_pos]
  152. word_filter_arr.append(word)
  153. if len(word_filter_arr) == 0:
  154. word_arr_list.append([])
  155. else:
  156. word_arr_list.append(word_filter_arr)
  157. return start_pos, word_arr_list, word_reverse_index
  158. def prepare_word_split_and_reverse_index(file_path: str):
  159. """
  160. 预处理:长尾词分词、建立倒排索引
  161. :param file_path: 待处理文件夹路径
  162. :return:
  163. """
  164. # 判断文件是否存在
  165. word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
  166. if os.path.exists(word_input_file) and not os.path.isfile(word_input_file):
  167. print("文件不存在! " + word_input_file)
  168. return
  169. # 总文本数量
  170. total_line_num = 0
  171. with open(word_input_file, "r", encoding="utf-8") as fi:
  172. total_line_num = sum(1 for line in fi)
  173. if total_line_num == 0:
  174. print("没有待处理的数据,文本量为0")
  175. return
  176. # 分割任务数量
  177. task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))
  178. # 任务进程处理结果
  179. p_result_list = []
  180. # 多进程处理
  181. with ProcessPoolExecutor(os.cpu_count()) as process_pool:
  182. # 提交任务
  183. process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in
  184. task_list]
  185. # 处理返回结果
  186. for p_future in as_completed(process_futures):
  187. p_result = p_future.result()
  188. if p_result:
  189. p_result_list.append(p_result)
  190. # 分词结果排序
  191. p_result_list = sorted(p_result_list, key=lambda v: v[0])
  192. # 输出分词结果
  193. split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
  194. with open(split_output_file, "w", encoding="UTF-8") as fo:
  195. for start_pos, word_arr_list, reverse_index in p_result_list:
  196. for word_arr in word_arr_list:
  197. fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
  198. # 关键词倒排索引
  199. word_reverse_index_dict = dict()
  200. # 合并倒排索引
  201. for start_pos, word_arr, reverse_index_dict in p_result_list:
  202. for key, value in reverse_index_dict.items():
  203. reverse_index_arr = word_reverse_index_dict.get(key)
  204. if reverse_index_arr:
  205. reverse_index_arr.extend(value)
  206. else:
  207. word_reverse_index_dict[key] = value
  208. # 输出倒排索引
  209. with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
  210. for key, value in word_reverse_index_dict.items():
  211. fo.write("%s,%s\n" % (key, value))
  212. # 关闭进程池
  213. process_pool.shutdown()
  214. def agg_word_cal(word_file: str, word_split_file: str, word_position_list: list, word_split_position_list: list,
  215. agg_threshold: float, main_word: str, main_key_list: list, candidate_position_list: list):
  216. """
  217. 长尾词聚合计算
  218. :param word_file: 长尾词文件路径
  219. :param word_split_file: 长尾词分词文件路径
  220. :param word_position_list: 长尾词位置索引
  221. :param word_split_position_list 词根位置索引
  222. :param agg_threshold: 聚合阈值
  223. :param main_word: 主词
  224. :param main_key_list: 主词词根
  225. :param candidate_position_list: 候选词位置
  226. :return:
  227. """
  228. # 结果容器
  229. result_list = []
  230. with (open(word_file, "r", encoding="UTF-8") as f_key,
  231. mmap.mmap(f_key.fileno(), 0, access=mmap.ACCESS_READ) as f_key_mmap,
  232. open(word_split_file, "r", encoding="UTF-8") as f_key_split,
  233. mmap.mmap(f_key_split.fileno(), 0, access=mmap.ACCESS_READ) as f_key_split_mmap):
  234. if not candidate_position_list:
  235. logging.info("子进程:候选词列表为空,结束执行")
  236. return
  237. for candidate_position in candidate_position_list:
  238. try:
  239. # 获取关键词
  240. word_position = word_position_list[candidate_position]
  241. f_key_mmap.seek(word_position)
  242. candidate_word = f_key_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
  243. if candidate_word == "小孩在肚子里怎么呼吸":
  244. print("子:出现拉拉")
  245. # 获取分词结果
  246. key_position = word_split_position_list[candidate_position]
  247. f_key_split_mmap.seek(key_position)
  248. temp_candidate_word_stem = f_key_split_mmap.readline().decode("UTF-8").replace("\r", "").replace("\n", "")
  249. # 为空则跳过
  250. if len(temp_candidate_word_stem) == 0:
  251. continue
  252. candidate_word_key_list = temp_candidate_word_stem.split(",")
  253. # 计算相关性
  254. try:
  255. val = utils.cal_cos_sim(main_word, main_key_list, candidate_word, candidate_word_key_list)
  256. if val >= agg_threshold:
  257. result_list.append((candidate_position, candidate_word))
  258. except Exception as e:
  259. logging.error("主关键词:%s 发生异常,涉及的副关键词信息-关键词:%s,分词:%s" % (
  260. main_word, candidate_word, candidate_word_key_list), e)
  261. except Exception as e:
  262. logging.error("子进程发生异常", e)
  263. return result_list
  264. def agg_word(file_path: str):
  265. """
  266. 长尾词聚合
  267. :param file_path:
  268. :return:
  269. """
  270. # 判断文件是否存在
  271. for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
  272. input_file = os.path.join(file_path, file_name)
  273. if os.path.exists(input_file) and not os.path.isfile(input_file):
  274. raise Exception("文件不存在!文件路径:" + input_file)
  275. # 总长尾词数量
  276. word_total_num = 0
  277. # 记录关键词位置
  278. word_position_list = [0]
  279. word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
  280. with (open(word_file, "r", encoding="utf-8") as f,
  281. mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
  282. while True:
  283. # 获取当前位置
  284. cur_pos = fmmap.tell()
  285. # 移动到一下行
  286. line = fmmap.readline()
  287. # 结束检测
  288. if not line:
  289. break
  290. # 记录
  291. word_position_list.append(cur_pos)
  292. word_total_num = word_total_num + 1
  293. # 记录分词位置
  294. word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
  295. word_split_position_list = [0]
  296. with (open(word_split_file, "r", encoding="utf-8") as f,
  297. mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as fmmap):
  298. while True:
  299. # 获取当前位置
  300. cur_pos = fmmap.tell()
  301. # 移动到一下行
  302. line = fmmap.readline()
  303. # 结束检测
  304. if not line:
  305. break
  306. # 记录
  307. word_split_position_list.append(cur_pos)
  308. # 获取倒排索引
  309. word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
  310. word_reverse_index_dict = {}
  311. # 分词
  312. key_re = r"([^,]+),\["
  313. key_pattern = re.compile(key_re, re.I)
  314. # 索引
  315. index_pattern = re.compile(r"\d+", re.I)
  316. with open(word_reverse_index_file, "r", encoding="utf-8") as f:
  317. for line in f:
  318. key_m = key_pattern.match(line)
  319. key = key_m.group(1)
  320. val = index_pattern.findall(line[line.index(","):])
  321. word_reverse_index_dict[key] = [int(v) for v in val]
  322. # 已使用长尾词位图
  323. unused_bitmap = BitMap(word_total_num + 1)
  324. # 聚合阈值
  325. agg_threshold = 0.8
  326. # 提交子进程阈值
  327. process_threshold = os.cpu_count() * 30
  328. # 提交任务 并输出结果
  329. word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
  330. with (ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool,
  331. open(word_agg_file, "w", encoding="UTF-8") as fo,
  332. open(word_file, "r", encoding="UTF-8") as f_word,
  333. open(word_split_file, "r", encoding="UTF-8") as f_word_split,
  334. mmap.mmap(f_word_split.fileno(), 0, access=mmap.ACCESS_READ) as f_split_mmap):
  335. # 准备数据
  336. for position_index, main_word in enumerate(f_word, 1):
  337. # 判断是否已聚合,否则置为已使用
  338. if unused_bitmap.test(position_index):
  339. continue
  340. else:
  341. unused_bitmap.set(position_index)
  342. # 移除换行符
  343. main_word = utils.remove_line_break(main_word)
  344. if main_word == "小孩在肚子里怎么呼吸":
  345. print("出现拉拉")
  346. # 获取分词结果
  347. key_position = word_split_position_list[position_index]
  348. f_split_mmap.seek(key_position)
  349. temp_main_word_stem = utils.remove_line_break(f_split_mmap.readline().decode("UTF-8"))
  350. # 为空则跳过
  351. if len(temp_main_word_stem) == 0:
  352. continue
  353. main_word_stem_list = temp_main_word_stem.split(",")
  354. # 从倒排索引中获取候选词的位置索引
  355. candidate_position_set = set()
  356. for main_word_stem in main_word_stem_list:
  357. index_list = word_reverse_index_dict.get(main_word_stem)
  358. if index_list:
  359. candidate_position_set.update(index_list)
  360. # 排除已使用的长尾词
  361. candidate_position_list = []
  362. for candidate_position in candidate_position_set:
  363. # 跳过已使用
  364. if unused_bitmap.test(candidate_position):
  365. continue
  366. candidate_position_list.append(candidate_position)
  367. # 分割计算任务,没有计算任务则跳过
  368. candidate_position_len = len(candidate_position_list)
  369. if candidate_position_len == 0:
  370. continue
  371. # 暂存分析结果
  372. p_result_list = []
  373. # 任务量不足直接在主进程上进行计算,否则提交子进程计算
  374. if candidate_position_len < process_threshold:
  375. p_result = agg_word_cal(word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, candidate_position_list)
  376. if len(p_result) > 0:
  377. p_result_list.extend(p_result)
  378. else:
  379. task_split_list = utils.avg_split_task(candidate_position_len, math.ceil(candidate_position_len / os.cpu_count()))
  380. all_task_list = [candidate_position_list[start_pos:end_pos] for start_pos, end_pos in task_split_list]
  381. # 提交任务
  382. process_futures = []
  383. for task_list in all_task_list:
  384. if not task_list:
  385. continue
  386. p_future = process_pool.submit(agg_word_cal, word_file, word_split_file, word_position_list, word_split_position_list, agg_threshold, main_word, main_word_stem_list, task_list)
  387. process_futures.append(p_future)
  388. for p_future in as_completed(process_futures):
  389. p_result = p_future.result()
  390. if p_result and len(p_result) > 0:
  391. p_result_list.extend(p_result)
  392. # 处理分析结果,并标记已处理数据
  393. if len(p_result_list) > 0:
  394. fo.write("%s\n" % main_word)
  395. for candidate_position, candidate_word in p_result_list:
  396. unused_bitmap.set(candidate_position)
  397. fo.write("%s\n" % candidate_word)
  398. fo.write("\n")
  399. # 清除上一轮的数据
  400. p_result_list.clear()
  401. # 关闭线程
  402. process_pool.shutdown()
  403. if __name__ == "__main__":
  404. print("开始时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
  405. # filePath = "../data"
  406. filePath = "../data/test"
  407. # extract_word_from_5118(filePath)
  408. # merge_word(filePath)
  409. # prepare_word_split_and_reverse_index(filePath)
  410. agg_word(filePath)
  411. # word_split_statistics(file_path)
  412. # tasks = utils.avg_split_task(100, 12)
  413. # 两者计算余弦值等于:0.8
  414. # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写",
  415. # ["QQ", "邮箱", "格式", "如何", "写"])
  416. print("结束时间" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))