Bläddra i källkod

feat:聚合优化,改为使用生产和消费者模型,分离保存和计算过程

ChenYL 2 år sedan
förälder
incheckning
15c751b082

+ 34 - 17
src/main/java/top/zhixinghe1/money/AggApplication.java

@@ -1,14 +1,17 @@
 package top.zhixinghe1.money;
 
+import me.tongfei.progressbar.ProgressBar;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -24,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.regex.Matcher;
@@ -167,32 +171,45 @@ public class AggApplication {
         // 分割计算任务
         List<CalTask> calTasks = avgSplitTask(totalWord, perTaskNum);
 
+        LinkedBlockingQueue<CalResult> queue = new LinkedBlockingQueue();
+
         // 提交任务
         ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-//        ExecutorService executorService = Executors.newFixedThreadPool(1);
         for (CalTask calTask : calTasks) {
-            executorService.submit(new CalRunable(calTask.getStartPos(), calTask.getEndPos(), dataDirPath, wordCache, indexCache, bitmap));
+            executorService.submit(new CalRunable(calTask.getStartPos(), calTask.getEndPos(), dataDirPath, wordCache, indexCache, bitmap, queue));
         }
 
-        // 等待任务执行完成
-        executorService.awaitTermination(12, TimeUnit.HOURS);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
+        String aggFilePath = String.join(File.separator, dataDirPath, String.format("长尾词_合并_聚合_%s.txt", sdf.format(new Date())));
+        try (ProgressBar pb = new ProgressBar("文本聚合计算", totalWord);
+             FileWriter fileWriter = new FileWriter(aggFilePath);
+             BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);) {
+            int taskNum = calTasks.size();
+            int currentTaskProgress = 0;
+            while (true) {
+                CalResult take = queue.take();
+                if (take.isAggStatus()) {
+                    for (String word : take.getSimilarWords()) {
+                        bufferedWriter.write(word);
+                        bufferedWriter.write("\n");
+                    }
 
-        // 合并计算结果
-        List<File> aggResultFiles = Arrays.stream(dataDir.listFiles()).filter(file -> aggFilePattern.matcher(file.getName()).find()).collect(Collectors.toList());
-        if (aggResultFiles.size() == 0) {
-            System.out.println("没有找到任何计算分结果,任务结束");
-            return;
-        }
-        String aggFilePath = String.join(File.separator, dataDirPath, "长尾词_合并_聚合.txt");
-        try (FileOutputStream fileOutputStream = new FileOutputStream(aggFilePath);
-             BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream)) {
-            for (File aggResultFile : aggResultFiles) {
-                try (FileInputStream fileInputStream = new FileInputStream(aggResultFile);
-                     BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) {
-                    bufferedOutputStream.write(bufferedInputStream.readAllBytes());
+                    bufferedWriter.write("\n");
+                }
+                if (take.isEndStatus()) {
+                    currentTaskProgress ++ ;
+                }
+                // 更新发呆进度
+                pb.step();
+
+                if (taskNum == currentTaskProgress) {
+                   break;
                 }
             }
         }
+
+        executorService.awaitTermination(1, TimeUnit.MINUTES);
+        System.out.println("聚合任务执行完成");
     }
 
     private static List<CalTask> avgSplitTask(int total, int internal) {

+ 41 - 0
src/main/java/top/zhixinghe1/money/CalResult.java

@@ -0,0 +1,41 @@
+package top.zhixinghe1.money;
+
+import java.util.List;
+
+public class CalResult {
+
+    private boolean endStatus;
+
+    private boolean aggStatus;
+
+    private List<String> similarWords;
+
+    public CalResult(boolean aggStatus, List<String> similarWords) {
+        this.aggStatus = aggStatus;
+        this.similarWords = similarWords;
+    }
+
+    public boolean isAggStatus() {
+        return aggStatus;
+    }
+
+    public void setAggStatus(boolean aggStatus) {
+        this.aggStatus = aggStatus;
+    }
+
+    public List<String> getSimilarWords() {
+        return similarWords;
+    }
+
+    public void setSimilarWords(List<String> similarWords) {
+        this.similarWords = similarWords;
+    }
+
+    public boolean isEndStatus() {
+        return endStatus;
+    }
+
+    public void setEndStatus(boolean endStatus) {
+        this.endStatus = endStatus;
+    }
+}

+ 56 - 65
src/main/java/top/zhixinghe1/money/CalRunable.java

@@ -16,6 +16,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class CalRunable implements Runnable {
 
@@ -33,91 +34,81 @@ public class CalRunable implements Runnable {
 
     private BitSet bitmap = null;
 
-    private static final ThreadLocal<BufferedWriter> threadLocal = new ThreadLocal();
-
     private CosineSimilarity cosineSimilarity = new CosineSimilarity();
 
     private Double aggThreshold = 0.8;
 
-    public CalRunable(int start, int end, String dataDirPath, Map<Integer, Word> wordCache, Map<String, Set<Integer>> indexCache, BitSet bitmap) {
+    private LinkedBlockingQueue<CalResult> queue;
+
+    private Set<Integer> indexSet = new HashSet<>();
+    private List<String> result = new ArrayList<>();
+
+    public CalRunable(int start, int end, String dataDirPath, Map<Integer, Word> wordCache, Map<String, Set<Integer>> indexCache, BitSet bitmap, LinkedBlockingQueue<CalResult> queue) {
         this.start = start;
         this.end = end;
         this.dataDirPath = dataDirPath;
         this.wordCache = wordCache;
         this.indexCache = indexCache;
         this.bitmap = bitmap;
+        this.queue = queue;
     }
 
     @Override
     public void run() {
         try {
-            BufferedWriter bufferedWriter = threadLocal.get();
-            if (Objects.isNull(bufferedWriter)) {
-                String aggFilePath = String.join(File.separator, dataDirPath, String.format("长尾词_合并_聚合_%s.txt", Thread.currentThread().getId()));
-                try {
-                    FileWriter fileWriter = new FileWriter(new File(aggFilePath));
-                    bufferedWriter = new BufferedWriter(fileWriter);
-                    threadLocal.set(bufferedWriter);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-            Set<Integer> indexSet = new HashSet<>();
-            List<String> result = new ArrayList<>();
-            try (ProgressBar pb = new ProgressBar(String.format("线程-%s 文本聚合计算", Thread.currentThread().getId()), end-start+1+1)) {
-                for (int i = start; i <= end; i++) {
-                    // 更新发呆
-                    pb.step();
-
-                    if (bitmap.get(i)) {
-                        continue;
-                    }
-
-                    Word word = wordCache.get(i);
-                    if (Objects.isNull(word.getStemMap()) || word.getStemMap().size() == 0) {
-                        continue;
-                    }
-                    bitmap.set(i, true);
-                    result.add(word.getKey());
-
-                    for (CharSequence stem : word.getStemMap().keySet()) {
-                        Set<Integer> positions = indexCache.get(stem);
-                        for (Integer position : positions) {
-                            if (bitmap.get(position)) {
-                                positions.remove(position);
-                            } else {
-                                indexSet.add(position);
-                            }
-                        }
-                    }
-
-                    for (Integer index : indexSet) {
-                        Word candicateWord = wordCache.get(index);
-                        if (Objects.isNull(candicateWord.getStemMap())) {
-                            continue;
-                        }
-
-                        Double v = cosineSimilarity.cosineSimilarity(word.getStemMap(), candicateWord.getStemMap());
-                        if (v < aggThreshold) {
-                            continue;
-                        }
-                        result.add(candicateWord.getKey());
-                    }
-
-                    // 输出计算结果
-                    if (result.size() == 1) {
-                        continue;
-                    }
-                    for (String s : result) {
-                        bufferedWriter.write(s);
-                        bufferedWriter.write("\n");
-                    }
-                    bufferedWriter.write("\n");
+            for (int i = start; i <= end; i++) {
+                CalResult calResult = null;
+                if (cal(i)) {
+                    calResult = new CalResult(true, new ArrayList<>(result));
+                } else {
+                    calResult = new CalResult(false, null);
                 }
+                calResult.setEndStatus(i == end);
+                queue.put(calResult);
             }
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
 
+    private boolean cal(int i) {
+        // 判断是否已进行计算
+        if (bitmap.get(i)) {
+            return false;
+        }
+
+        // 清除上一轮的数据
+        indexSet.clear();
+        result.clear();
+
+        Word word = wordCache.get(i);
+        if (Objects.isNull(word.getStemMap()) || word.getStemMap().size() == 0) {
+            return false;
+        }
+        bitmap.set(i, true);
+        result.add(word.getKey());
+        for (CharSequence stem : word.getStemMap().keySet()) {
+            Set<Integer> positions = indexCache.get(stem);
+            for (Integer position : positions) {
+                if (bitmap.get(position)) {
+                    positions.remove(position);
+                } else {
+                    indexSet.add(position);
+                }
+            }
+        }
+        for (Integer index : indexSet) {
+            Word candicateWord = wordCache.get(index);
+            if (Objects.isNull(candicateWord.getStemMap())) {
+                continue;
+            }
+            Double v = cosineSimilarity.cosineSimilarity(word.getStemMap(), candicateWord.getStemMap());
+            if (v < aggThreshold) {
+                continue;
+            }
+            result.add(candicateWord.getKey());
+        }
+        // 输出计算结果
+        return result.size() > 1;
+    }
 }