# money.py
# -*- coding:utf-8 -*-
import math
import mmap
import os
import re
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

import redis
from bitmap import BitMap
from tqdm import tqdm

import utils
import jieba
import zipfile
import logging

# File suffix of a per-source long-tail keyword file: 长尾词.txt
FILE_SUFFIX_LONG_TAIL = "_长尾词.txt"
# Merged long-tail keyword file: 长尾词_合并.txt
FILE_LONG_TAIL_MERGE = "长尾词_合并.txt"
# Tokenized merged keywords: 长尾词_合并_分词.txt
FILE_LONG_TAIL_MERGE_SPLIT = "长尾词_合并_分词.txt"
# Inverted index of the tokenized keywords: 长尾词_合并_倒排索引.txt
FILE_LONG_TAIL_MERGE_REVERSE_INDEX = "长尾词_合并_倒排索引.txt"
# Aggregation result: 长尾词_合并_聚合.txt
FILE_LONG_TAIL_MERGE_AGG = "长尾词_合并_聚合.txt"
# Per-worker aggregation result: 长尾词_合并_聚合_%s.txt (%s = worker PID)
FILE_LONG_TAIL_MERGE_AGG_PID = "长尾词_合并_聚合_%s.txt"
# Cache key: tokenization (word stems) per keyword
CACHE_WORD_STEM = "word:stem"
# Cache key: inverted index
CACHE_WORD_REVERSE_INDEX = "word:reverse_index"
# Cache key: long-tail keywords
CACHE_WORD = "word"
# Cache key: bitmap of keywords already consumed by aggregation
CACHE_UNUSED_BITMAP = "unused_bitmap"
# Charset: UTF-8
CHARSET_UTF_8 = "UTF-8"
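
# Redis layout used by the aggregation step (derived from agg_word / agg_word_process below):
#   CACHE_WORD                -- hash: line position (1-based) -> keyword
#   CACHE_WORD_STEM           -- hash: line position -> comma-joined jieba tokens of that keyword
#   CACHE_WORD_REVERSE_INDEX  -- hash: token -> comma-joined positions of keywords containing it
#   CACHE_UNUSED_BITMAP       -- bitmap: bit N is set once keyword N has been consumed by a cluster
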
def extract_word_from_5118(file_path: str):
    """
    Extract keywords from 5118 keyword zip archives.
    :param file_path: directory to process
    :return: None
    """
    file_list = []
    for file in os.listdir(file_path):
        file_list.append(os.path.join(file_path, file))
    for file in file_list:
        with zipfile.ZipFile(file) as zfile:
            filenames = zfile.namelist()
            for filename in filenames:
                # Re-decode the archive member name into its real (GBK) form
                real_name = filename.encode('cp437').decode('gbk')
                # Skip members that carry no data
                if real_name in ['打开乱码如何处理?.txt']:
                    continue
                # Container for the extracted keywords
                word_container = set()
                # Read the member inside the archive
                with zfile.open(filename) as file_content:
                    lines = file_content.readlines()
                    # Skip the two header lines
                    for line in lines[2:]:
                        split = line.decode("gbk").split(",")
                        # Only the first column (the keyword) is needed
                        word_container.add(split[0])
                output_file_name = real_name[0:real_name.index("--")]
                output_file_path = os.path.join(file_path, output_file_name + FILE_SUFFIX_LONG_TAIL)
                with open(output_file_path, "w", encoding="utf-8") as f:
                    for item in word_container:
                        f.write(item)
                        f.write("\n")

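
# Archive layout assumed by extract_word_from_5118 (inferred from the parsing above, not an
# official 5118 spec): every member name contains "--" and decodes from cp437 to GBK, the
# member itself is a GBK-encoded CSV with two header lines, and the first CSV column holds
# the keyword.
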
def merge_word(file_path: str):
    """
    Merge the long-tail keyword files (with de-duplication).
    :param file_path: directory to process
    :return: None
    """
    # Collect the per-source keyword files
    file_list = []
    for file in os.listdir(file_path):
        if file.endswith(FILE_SUFFIX_LONG_TAIL):
            file_list.append(os.path.join(file_path, file))
    # Set container for the de-duplicated keywords
    word_set = set()
    # Read and de-duplicate
    for file in file_list:
        with open(file, "r", encoding="utf-8") as f:
            for word in f:
                word_set.add(word.replace("\n", ""))
    # Save the merged result
    with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE), "w", encoding="utf-8") as f:
        for item in word_set:
            f.write(item)
            f.write("\n")

def word_split_statistics(file_path: str):
    """
    Tokenize the keywords and count token frequencies.
    :param file_path: directory to process
    :return: None
    """
    file_list = []
    for file in os.listdir(file_path):
        file_list.append(os.path.join(file_path, file))
    stop_word_dict = utils.load_stop_word()
    for file in file_list:
        if not file.endswith(FILE_SUFFIX_LONG_TAIL):
            continue
        # Token -> count container
        key_dict = {}
        with open(file, "r", encoding="utf-8") as f:
            for tmp_word in f:
                # Tokenize
                word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
                # Count
                for word in word_list:
                    # Filter stop words
                    if word in stop_word_dict:
                        continue
                    if word in key_dict:
                        key_dict[word] = key_dict[word] + 1
                    else:
                        key_dict[word] = 1
        # Sort by frequency, descending
        sorted_key_list = sorted(key_dict.items(), key=lambda x: x[1], reverse=True)
        base_name = os.path.basename(file)
        output_file_name = base_name[0:base_name.index(FILE_SUFFIX_LONG_TAIL)]
        output_file_path = os.path.join(file_path, output_file_name + "_长尾词_分词统计.csv")
        with open(output_file_path, "w", encoding="UTF-8") as f:
            for key, count in sorted_key_list:
                f.write("%s,%d\n" % (key, count))

def word_split_reverse(input_file: str, start_pos: int, end_pos: int):
    """
    Tokenize keywords and build an inverted index for one slice of the file.
    :param input_file: file to process
    :param start_pos: first line to process (1-based, inclusive)
    :param end_pos: line to stop at (exclusive)
    :return: (start_pos, token lists, inverted index)
    """
    # Load the stop words
    stop_word_dict = utils.load_stop_word()
    # Token lists, one entry per processed keyword
    word_arr_list = []
    # Inverted index: token -> list of line positions
    word_reverse_index = dict()
    with open(input_file, "r", encoding="utf-8") as fr:
        for i, tmp_word in enumerate(fr):
            # start_pos is a 1-based line number while i starts at 0
            if i + 1 < start_pos:
                continue
            # Current position (1-based)
            cur_pos = i + 1
            # Stop at the task boundary
            if cur_pos == end_pos:
                break
            # Tokenize
            word_list = jieba.cut_for_search(tmp_word.replace("\n", ""))
            # Tokens that survive the stop-word filter
            word_filter_arr = []
            # Filter stop words and update the inverted index
            for word in word_list:
                if word in stop_word_dict:
                    continue
                word_index_arr = word_reverse_index.get(word)
                if word_index_arr:
                    word_index_arr.append(cur_pos)
                else:
                    word_reverse_index[word] = [cur_pos]
                word_filter_arr.append(word)
            # Keep one entry per input line (possibly empty) so positions stay aligned
            word_arr_list.append(word_filter_arr)
    return start_pos, word_arr_list, word_reverse_index

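
# Illustration of the return value (actual tokens depend on the jieba dictionary and the
# stop-word list): for a merged file whose lines 1-2 are "QQ邮箱格式怎么写" and "QQ邮箱格式如何写",
# word_split_reverse(path, 1, 3) would return something like
#   (1,
#    [["QQ", "邮箱", "格式", "怎么", "写"], ["QQ", "邮箱", "格式", "如何", "写"]],
#    {"QQ": [1, 2], "邮箱": [1, 2], "格式": [1, 2], "怎么": [1], "如何": [2], "写": [1, 2]})
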
def prepare_word_split_and_reverse_index(file_path: str):
    """
    Preprocessing: tokenize the merged keywords and build the inverted index.
    :param file_path: directory to process
    :return:
    """
    # Make sure the input file exists
    word_input_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
    if not os.path.isfile(word_input_file):
        print("File does not exist! " + word_input_file)
        return
    # Total number of lines to process
    total_line_num = 0
    with open(word_input_file, "r", encoding="utf-8") as fi:
        total_line_num = sum(1 for line in fi)
    if total_line_num == 0:
        print("Nothing to process: the merged keyword file is empty")
        return
    # Split the work into one slice per CPU
    task_list = utils.avg_split_task(total_line_num, math.ceil(total_line_num / os.cpu_count()))
    # Per-slice results
    p_result_list = []
    # Process the slices in parallel
    with ProcessPoolExecutor(os.cpu_count()) as process_pool:
        # Submit the tasks
        process_futures = [process_pool.submit(word_split_reverse, word_input_file, task[0], task[1]) for task in
                           task_list]
        # Collect the results
        for p_future in as_completed(process_futures):
            p_result = p_future.result()
            if p_result:
                p_result_list.append(p_result)
        # Sort the results back into file order
        p_result_list = sorted(p_result_list, key=lambda v: v[0])
        # Write the tokenization result
        split_output_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
        with open(split_output_file, "w", encoding="UTF-8") as fo:
            for start_pos, word_arr_list, reverse_index in p_result_list:
                for word_arr in word_arr_list:
                    fo.write("%s\n" % ",".join([str(i) for i in word_arr]))
        # Merged inverted index
        word_reverse_index_dict = dict()
        # Merge the per-slice inverted indexes
        for start_pos, word_arr, reverse_index_dict in p_result_list:
            for key, value in reverse_index_dict.items():
                reverse_index_arr = word_reverse_index_dict.get(key)
                if reverse_index_arr:
                    reverse_index_arr.extend(value)
                else:
                    word_reverse_index_dict[key] = value
        # Write the inverted index
        with open(os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX), "w", encoding="UTF-8") as fo:
            for key, value in word_reverse_index_dict.items():
                fo.write("%s,%s\n" % (key, value))
        # Shut down the process pool (the with-block would do this as well)
        process_pool.shutdown()

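
# File formats written by prepare_word_split_and_reverse_index:
#   长尾词_合并_分词.txt     -- line N holds the comma-joined tokens of keyword N, e.g. "QQ,邮箱,格式,怎么,写"
#   长尾词_合并_倒排索引.txt -- one "<token>,<position list>" line per token, e.g. "邮箱,[1, 2]"
#     (the bracketed part is the repr() of the Python list produced by the %s formatting above)
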
def agg_word_process(file_path: str, agg_threshold: float, start_pos: int, end_pos: int, final_pos: int, skip_line: int):
    """
    Aggregate (cluster) long-tail keywords for one slice of positions.
    :param file_path: directory to process
    :param agg_threshold: similarity threshold for putting two keywords into the same cluster
    :param start_pos: first position of the slice (inclusive)
    :param end_pos: end position of the slice (exclusive), -1 meaning "up to the last keyword"
    :param final_pos: position of the last keyword overall
    :param skip_line: row on which this worker's progress bar is drawn
    :return:
    """
    # Temporary result file of this worker process
    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG_PID % os.getpid())
    # Redis connection pool
    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=1)
    # Redis client
    redis_cache = redis.StrictRedis(connection_pool=redis_pool)
    # Number of keywords handled by this slice; end_pos == -1 means "to the end"
    if end_pos == -1:
        process_len = final_pos - start_pos
        loop_end = final_pos + 1
    else:
        process_len = end_pos - start_pos
        loop_end = end_pos
    with (open(word_agg_file, "a", encoding="UTF-8") as fo,
          tqdm(total=process_len, desc='worker-%s: aggregation progress' % os.getpid(), unit='word',
               unit_scale=True, position=skip_line) as pbar):
        for main_word_position in range(start_pos, loop_end):
            try:
                # Skip the main keyword if it was already consumed, otherwise mark it as used
                if redis_cache.getbit(CACHE_UNUSED_BITMAP, main_word_position):
                    continue
                else:
                    redis_cache.setbit(CACHE_UNUSED_BITMAP, main_word_position, 1)
                # Fetch the main keyword
                main_word = redis_cache.hget(CACHE_WORD, main_word_position)
                if not main_word:
                    continue
                main_word = main_word.decode(CHARSET_UTF_8)
                # Fetch the tokenization of the main keyword
                main_word_stem = redis_cache.hget(CACHE_WORD_STEM, main_word_position)
                # Skip if it has no tokens
                if not main_word_stem:
                    continue
                main_word_stem_list = main_word_stem.decode(CHARSET_UTF_8).split(",")
                # Collect candidate positions from the inverted index
                candidate_position_set = set()
                temp_candidate_position_list = redis_cache.hmget(CACHE_WORD_REVERSE_INDEX, main_word_stem_list)
                for temp_candidate_position in temp_candidate_position_list:
                    if not temp_candidate_position:
                        continue
                    candidate_position_set.update(temp_candidate_position.decode(CHARSET_UTF_8).split(","))
                # No candidates to compare against, skip
                if not candidate_position_set:
                    continue
                # Keywords clustered under the main keyword
                result_list = []
                # Compute the similarity against every candidate
                for candidate_position in candidate_position_set:
                    # Skip the main keyword itself (positions from the index are strings)
                    if str(main_word_position) == candidate_position:
                        continue
                    # Fetch the candidate keyword
                    candidate_word = redis_cache.hget(CACHE_WORD, candidate_position)
                    if not candidate_word:
                        continue
                    candidate_word = candidate_word.decode(CHARSET_UTF_8)
                    # Fetch the candidate tokenization
                    candidate_word_stem = redis_cache.hget(CACHE_WORD_STEM, candidate_position)
                    # Skip if it has no tokens
                    if not candidate_word_stem:
                        continue
                    candidate_word_stem_list = candidate_word_stem.decode(CHARSET_UTF_8).split(",")
                    # Cosine similarity between the main keyword and the candidate
                    try:
                        val = utils.cal_cos_sim(main_word, main_word_stem_list, candidate_word,
                                                candidate_word_stem_list)
                        if val >= agg_threshold:
                            redis_cache.setbit(CACHE_UNUSED_BITMAP, candidate_position, 1)
                            result_list.append(candidate_word)
                    except Exception as e:
                        logging.error("main keyword %s raised an exception; candidate keyword: %s, tokens: %s",
                                      main_word, candidate_word, candidate_word_stem_list, exc_info=e)
                # Write out the cluster
                if len(result_list) > 0:
                    fo.write("%s\n" % main_word)
                    for candidate_word in result_list:
                        fo.write("%s\n" % candidate_word)
                    fo.write("\n")
                # Clear the per-keyword containers
                candidate_position_set.clear()
                result_list.clear()
                # Advance the progress bar
                pbar.update(1)
            except Exception as e:
                logging.error("worker process raised an exception", exc_info=e)
    return

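
# Each worker appends its clusters to 长尾词_合并_聚合_<pid>.txt as blocks of the form
#   <main keyword>
#   <clustered keyword>
#   ...
#   <blank line>
# agg_word() below concatenates these per-PID files into 长尾词_合并_聚合.txt.
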
def agg_word(file_path: str):
    """
    Aggregate (cluster) the long-tail keywords.
    :param file_path: directory to process
    :return:
    """
    # Total number of long-tail keywords
    word_total_num = 0
    # Similarity threshold for aggregation
    agg_threshold = 0.8
    # Number of keywords per task slice
    task_cal_num = 10000
    # Worker processes (one core is left free for redis)
    worker_num = os.cpu_count() - 1
    # worker_num = 1
    # Pattern of the per-worker aggregation files
    agg_file_pattern = re.compile(r"长尾词_合并_聚合_\d+\.txt", re.I)
    # Redis client
    redis_cache = redis.StrictRedis(host='127.0.0.1', port=6379)
    # Make sure the input files exist
    for file_name in [FILE_LONG_TAIL_MERGE, FILE_LONG_TAIL_MERGE_SPLIT, FILE_LONG_TAIL_MERGE_REVERSE_INDEX]:
        input_file = os.path.join(file_path, file_name)
        if not os.path.isfile(input_file):
            raise Exception("File does not exist! Path: " + input_file)
    # Delete the result files of previous runs
    for file in os.listdir(file_path):
        if agg_file_pattern.match(file):
            os.remove(os.path.join(file_path, file))
    # Cache the keywords by position
    word_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE)
    word_dict = {}
    with open(word_file, "r", encoding="utf-8") as f:
        for position, word in enumerate(f, start=1):
            word = utils.remove_line_break(word)
            if not word:
                continue
            word_dict[position] = word
    redis_cache.hset(CACHE_WORD, mapping=word_dict)
    # Record the total number of keywords
    word_total_num = len(word_dict)
    # Free memory
    del word_dict

    # The two blocks below repopulate the tokenization cache and the inverted-index cache in Redis;
    # enable them when those caches have not been loaded yet.
    # # Cache the tokenization per position
    # word_split_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_SPLIT)
    # word_split_dict = {}
    # with open(word_split_file, "r", encoding=CHARSET_UTF_8) as f:
    #     for position, word_split_line in enumerate(f, start=1):
    #         word_split_line = utils.remove_line_break(word_split_line)
    #         if not word_split_line:
    #             continue
    #         word_split_dict[position] = word_split_line
    # redis_cache.hset(CACHE_WORD_STEM, mapping=word_split_dict)
    # # Free memory
    # del word_split_dict
    #
    # # Cache the inverted index
    # word_reverse_index_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_REVERSE_INDEX)
    # word_reverse_index_dict = {}
    # # Token part of an index line
    # key_pattern = re.compile(r"([^,]+),\[", re.I)
    # # Position part of an index line
    # index_pattern = re.compile(r"\d+", re.I)
    # with open(word_reverse_index_file, "r", encoding="utf-8") as f:
    #     for word_split_line in f:
    #         key_m = key_pattern.match(word_split_line)
    #         key = key_m.group(1)
    #         val = index_pattern.findall(word_split_line[word_split_line.index(","):])
    #         word_reverse_index_dict[key] = ",".join(val)
    # redis_cache.hset(CACHE_WORD_REVERSE_INDEX, mapping=word_reverse_index_dict)
    # # Free memory
    # del word_reverse_index_dict

    # Pre-size the "used" bitmap to cover every keyword position
    redis_cache.setbit(CACHE_UNUSED_BITMAP, word_total_num + 1, 1)
    # Submit the tasks and collect the results
    word_agg_file = os.path.join(file_path, FILE_LONG_TAIL_MERGE_AGG)
    with ProcessPoolExecutor(max_workers=worker_num) as process_pool:
        # Compute the task boundaries
        task_list = utils.avg_split_task(word_total_num, task_cal_num, 1)
        # Submit the tasks
        process_futures = []
        for skip_line, pos in enumerate(task_list, start=1):
            # skip_line = (skip_line % 4) + 1
            # Draw all worker progress bars on the same row
            skip_line = 1
            p_future = process_pool.submit(agg_word_process, file_path, agg_threshold, pos[0], pos[1], word_total_num,
                                           skip_line)
            process_futures.append(p_future)
        # Show the overall progress
        with tqdm(total=len(process_futures), desc='aggregation progress', unit='task', unit_scale=True,
                  position=0) as pbar:
            for p_future in as_completed(process_futures):
                p_result = p_future.result()
                # Advance the progress bar
                pbar.update(1)
        # Shut down the process pool (the with-block would do this as well)
        process_pool.shutdown()
    # Collect the per-worker result files and merge them
    with open(word_agg_file, "w", encoding="UTF-8") as fo:
        for file in os.listdir(file_path):
            # Skip files that are not worker results
            if not agg_file_pattern.match(file):
                continue
            with open(os.path.join(file_path, file), "r", encoding="UTF-8") as fi:
                for word in fi:
                    fo.write(word)

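
# Illustrative only: utils.cal_cos_sim is defined in the external utils module. The sketch
# below shows one bag-of-words cosine similarity that reproduces the 0.8 value in the
# commented example inside __main__; it is an assumption about the real helper, which also
# receives the raw keyword strings (ignored here) and may weight tokens differently.
def _cos_sim_sketch(word_a: str, stems_a: list, word_b: str, stems_b: list) -> float:
    # Build term-frequency vectors from the token lists
    vec_a, vec_b = {}, {}
    for token in stems_a:
        vec_a[token] = vec_a.get(token, 0) + 1
    for token in stems_b:
        vec_b[token] = vec_b.get(token, 0) + 1
    # Dot product over the shared vocabulary
    dot = sum(count * vec_b[token] for token, count in vec_a.items() if token in vec_b)
    # Vector norms
    norm_a = math.sqrt(sum(v * v for v in vec_a.values()))
    norm_b = math.sqrt(sum(v * v for v in vec_b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Example: _cos_sim_sketch("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"],
#                          "QQ邮箱格式如何写", ["QQ", "邮箱", "格式", "如何", "写"]) == 0.8
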
if __name__ == "__main__":
    print("Start time: " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    # filePath = "../data"
    filePath = "../data/test"
    # extract_word_from_5118(filePath)
    # merge_word(filePath)
    # prepare_word_split_and_reverse_index(filePath)
    agg_word(filePath)
    # word_split_statistics(filePath)
    # tasks = utils.avg_split_task(100, 12, 1)
    # The cosine similarity of these two keywords equals 0.8:
    # val = utils.cal_cos_sim("QQ邮箱格式怎么写", ["QQ", "邮箱", "格式", "怎么", "写"], "QQ邮箱格式如何写",
    #                         ["QQ", "邮箱", "格式", "如何", "写"])
    print("End time: " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))