Просмотр исходного кода

feat:优化内存使用,增加支持大文本下的计算

ChenYL 2 лет назад
Родитель
Сommit
003b1ea4c6

+ 38 - 94
src/main/java/top/zhixinghe1/money/agg/Agg.java

@@ -1,30 +1,19 @@
 package top.zhixinghe1.money.agg;
 package top.zhixinghe1.money.agg;
 
 
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import me.tongfei.progressbar.ProgressBar;
 import me.tongfei.progressbar.ProgressBar;
 import me.tongfei.progressbar.ProgressBarBuilder;
 import me.tongfei.progressbar.ProgressBarBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.roaringbitmap.RoaringBitmap;
-import top.zhixinghe1.money.agg.cache.ReverseIndexLoader;
-import top.zhixinghe1.money.agg.cache.WordLoader;
-import top.zhixinghe1.money.agg.entity.BufferedRandomAccessFile;
+import top.zhixinghe1.money.agg.entity.CalInfo;
+import top.zhixinghe1.money.agg.entity.CalResource;
 import top.zhixinghe1.money.agg.entity.CalResult;
 import top.zhixinghe1.money.agg.entity.CalResult;
 import top.zhixinghe1.money.agg.entity.CalTask;
 import top.zhixinghe1.money.agg.entity.CalTask;
-import top.zhixinghe1.money.agg.entity.Word;
 
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.FileWriter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.time.temporal.ChronoUnit;
 import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.List;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -34,66 +23,37 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
  */
 public class Agg {
 public class Agg {
 
 
-    private static final int perTaskNum = 10000;
 
 
-    private static RoaringBitmap bitmap = null;
 
 
     public void process(String dataDirPath) throws Exception {
     public void process(String dataDirPath) throws Exception {
-        if (StringUtils.isBlank(dataDirPath)) {
-            throw new Exception("没有输入目标数据路径");
-        }
-
-        // 判断传入路径是否有效
-        File dataDir = new File(dataDirPath);
-        if (!dataDir.exists() || !dataDir.isDirectory()) {
-            throw new Exception(String.format("数据目录路径不存在,%s", dataDirPath));
-        }
-
-        // 判断关键资源文件是否存在
-        List<String> fileNameList = Arrays.asList(Constant.WORD_STEM_FILE, Constant.WORD_REVERSE_INDEX_FILE);
-        for (String fileName : fileNameList) {
-            String resFilePath = String.join(File.separator, dataDirPath, fileName);
-            File resfile = new File(resFilePath);
-            if (!resfile.exists() || !resfile.isFile()) {
-                throw new Exception(String.format("文件不存在!文件路径:%s", resFilePath));
-            }
-        }
 
 
-        // 读取统计信息
-        int wordTotalNum = 499995;
-        int reverseIndexTotalNum = 88090;
+        // 校验资源
+        System.out.println("校验资源有效性...");
+        checkResource(dataDirPath);
 
 
-        // 创建关键词位置缓存
-        String wordFilePath = String.join(File.separator, dataDirPath, Constant.WORD_STEM_FILE);
-        long[] lineContentIndex = getLineContentIndex(wordTotalNum, wordFilePath);
-        LoadingCache<Integer, Word> wordCache = Caffeine.newBuilder()
-                .maximumSize(1000000)
-                .build(new WordLoader(dataDirPath, lineContentIndex));
+        // 计算任务公共信息
+        System.out.println("加载计算任务基本信息...");
+        CalInfo calInfo = new CalInfo(dataDirPath);
 
 
-        // 创建倒排索引缓存
-        String stemFilePath = String.join(File.separator, dataDirPath, Constant.WORD_REVERSE_INDEX_FILE);
-        Map<String, long[]> lienContentIndexMap = getLienContentIndexMap(reverseIndexTotalNum, stemFilePath);
-        LoadingCache<String, RoaringBitmap> indexCache = Caffeine.newBuilder()
-                .maximumSize(1000000)
-                .build(new ReverseIndexLoader(dataDirPath, lienContentIndexMap));
-
-        // 初始化已处理位图
-        bitmap = new RoaringBitmap();
+        // 计算资源
+        System.out.println("加载计算资源...");
+        CalResource calResource = new CalResource(calInfo);
 
 
         // 分割计算任务
         // 分割计算任务
-        List<CalTask> calTasks = avgSplitTask(wordTotalNum, perTaskNum);
-
-        LinkedBlockingQueue<CalResult> queue = new LinkedBlockingQueue();
+        System.out.println("生成计算任务...");
+        List<CalTask> calTasks = CalTask.avgSplitTask(calInfo.getTaskTotalNum(), calInfo.getPerTaskNum());
 
 
         // 提交任务
         // 提交任务
+        System.out.println("提交计算任务...");
         ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
         ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
         for (CalTask calTask : calTasks) {
         for (CalTask calTask : calTasks) {
-            executorService.submit(new CalRunnable(calTask.getStartPos(), calTask.getEndPos(), wordCache, indexCache, bitmap, queue));
+            executorService.submit(new CalRunnable(calInfo, calResource, calTask));
         }
         }
 
 
+        // 显示任务进度
         String aggFilePath = String.join(File.separator, dataDirPath, Constant.WORD_AGG_RESULT_TEMP_FILE);
         String aggFilePath = String.join(File.separator, dataDirPath, Constant.WORD_AGG_RESULT_TEMP_FILE);
         ProgressBarBuilder progressBarBuilder = new ProgressBarBuilder().setTaskName("文本聚合计算")
         ProgressBarBuilder progressBarBuilder = new ProgressBarBuilder().setTaskName("文本聚合计算")
-                .setInitialMax(wordTotalNum)
+                .setInitialMax(calInfo.getTaskTotalNum())
                 .setUnit("个", 1)
                 .setUnit("个", 1)
                 .setSpeedUnit(ChronoUnit.SECONDS)
                 .setSpeedUnit(ChronoUnit.SECONDS)
                 .showSpeed();
                 .showSpeed();
@@ -102,6 +62,7 @@ public class Agg {
              BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);) {
              BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);) {
             int taskNum = calTasks.size();
             int taskNum = calTasks.size();
             int currentTaskProgress = 0;
             int currentTaskProgress = 0;
+            LinkedBlockingQueue<CalResult> queue = calResource.getQueue();
             while (true) {
             while (true) {
                 CalResult take = queue.take();
                 CalResult take = queue.take();
                 if (take.isAggStatus()) {
                 if (take.isAggStatus()) {
@@ -128,47 +89,30 @@ public class Agg {
         System.out.println("聚合任务执行完成");
         System.out.println("聚合任务执行完成");
     }
     }
 
 
-    private static List<CalTask> avgSplitTask(int total, int internal) {
-        // 分割的任务份数
-        int taskNum = (int) Math.ceil((double) total / internal);
-        // 平分
-        List<CalTask> calTasks = new ArrayList<>();
-        for (int i = 0; i < taskNum; i++) {
-            int start = i * internal + 1;
-            int end = i * internal + internal;
-            if (end > total) {
-                end = total;
-            }
-            calTasks.add(new CalTask(start, end));
+    /**
+     * 校验资源
+     * @param dataDirPath
+     * @throws Exception
+     */
+    private void checkResource(String dataDirPath) throws Exception {
+        if (StringUtils.isBlank(dataDirPath)) {
+            throw new Exception("没有输入目标数据路径");
         }
         }
-        return calTasks;
-    }
 
 
-    private long[] getLineContentIndex(int wordTotalNum, String filePath) throws IOException {
-        BufferedRandomAccessFile randomAccessFile = new BufferedRandomAccessFile(filePath, "r");
-        long[] lineContentIndex = new long[wordTotalNum+1];
-        for (int i = 1; i <= wordTotalNum; i++, randomAccessFile.readLine()) {
-            lineContentIndex[i] = randomAccessFile.getFilePointer();
+        // 判断传入路径是否有效
+        File dataDir = new File(dataDirPath);
+        if (!dataDir.exists() || !dataDir.isDirectory()) {
+            throw new Exception(String.format("数据目录路径不存在,%s", dataDirPath));
         }
         }
-        return lineContentIndex;
-    }
 
 
-    private ConcurrentHashMap<String, long[]> getLienContentIndexMap(int stemTotalNum, String filePath) throws IOException {
-//        BufferedRandomAccessFile randomAccessFile = new BufferedRandomAccessFile(filePath, "r");
-        // TODO 这里太慢了
-        RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "r");
-        ConcurrentHashMap<String, long[]> lineContentIndexMap = new ConcurrentHashMap<>(stemTotalNum);
-        String line = null;
-        int cnt = 0;
-        while ((line = randomAccessFile.readLine()) != null) {
-            line = new String(line.getBytes("8859_1"), "UTF-8");
-            String stem = line.substring(0, line.indexOf(","));
-            long[] basicInfo = new long[2];
-            basicInfo[0] = randomAccessFile.getFilePointer();
-            basicInfo[1] = line.getBytes().length;
-            lineContentIndexMap.put(stem, basicInfo);
-            cnt++;
+        // 判断关键资源文件是否存在
+        List<String> fileNameList = Arrays.asList(Constant.WORD_STEM_FILE, Constant.WORD_REVERSE_INDEX_FILE, Constant.WORD_BASIC_INFO_FILE);
+        for (String fileName : fileNameList) {
+            String resFilePath = String.join(File.separator, dataDirPath, fileName);
+            File resfile = new File(resFilePath);
+            if (!resfile.exists() || !resfile.isFile()) {
+                throw new Exception(String.format("文件不存在!文件路径:%s", resFilePath));
+            }
         }
         }
-        return lineContentIndexMap;
     }
     }
 }
 }

+ 62 - 51
src/main/java/top/zhixinghe1/money/agg/CalRunnable.java

@@ -1,16 +1,18 @@
 package top.zhixinghe1.money.agg;
 package top.zhixinghe1.money.agg;
 
 
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.commons.text.similarity.CosineSimilarity;
 import org.apache.commons.text.similarity.CosineSimilarity;
 import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.RoaringBitmap;
+import top.zhixinghe1.money.agg.entity.CalInfo;
+import top.zhixinghe1.money.agg.entity.CalResource;
 import top.zhixinghe1.money.agg.entity.CalResult;
 import top.zhixinghe1.money.agg.entity.CalResult;
+import top.zhixinghe1.money.agg.entity.CalTask;
 import top.zhixinghe1.money.agg.entity.Word;
 import top.zhixinghe1.money.agg.entity.Word;
 
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Objects;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 
 /**
 /**
@@ -18,44 +20,43 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
  */
 public class CalRunnable implements Runnable {
 public class CalRunnable implements Runnable {
 
 
-    private int start;
+    /**
+     * 计算任务基础信息
+     */
+    private CalInfo calInfo;
 
 
-    private int end;
+    /**
+     * 计算资源
+     */
+    private CalResource calResource;
 
 
-    private LoadingCache<Integer, Word> wordCache;
-
-    private LoadingCache<String, RoaringBitmap> indexCache;
-
-    private RoaringBitmap bitmap;
+    /**
+     * 计算任务边界
+     */
+    private CalTask calTask;
 
 
+    /**
+     * 相似度计算器
+     */
     private CosineSimilarity cosineSimilarity = new CosineSimilarity();
     private CosineSimilarity cosineSimilarity = new CosineSimilarity();
 
 
-    private Double aggThreshold = 0.8;
-
-    private LinkedBlockingQueue<CalResult> queue;
-
-    private List<String> result = new ArrayList<>();
-
-    public CalRunnable(int start, int end, LoadingCache<Integer, Word> wordCache, LoadingCache<String, RoaringBitmap> indexCache, RoaringBitmap bitmap, LinkedBlockingQueue<CalResult> queue) {
-        this.start = start;
-        this.end = end;
-        this.wordCache = wordCache;
-        this.indexCache = indexCache;
-        this.bitmap = bitmap;
-        this.queue = queue;
+    public CalRunnable(CalInfo calInfo, CalResource calResource, CalTask calTask) {
+        this.calInfo = calInfo;
+        this.calResource = calResource;
+        this.calTask = calTask;
     }
     }
 
 
     @Override
     @Override
     public void run() {
     public void run() {
         try {
         try {
-            for (int i = start; i <= end; i++) {
-                CalResult calResult;
-                if (cal(i)) {
-                    calResult = new CalResult(true, new ArrayList<>(result));
-                } else {
-                    calResult = new CalResult(false, null);
-                }
-                calResult.setEndStatus(i == end);
+            int end = calTask.getEndPos();
+            LinkedBlockingQueue<CalResult> queue = calResource.getQueue();
+            for (int pos = calTask.getStartPos(); pos <= end; pos++) {
+                // 计算聚合结果
+                List<String> result = cal(pos);
+                // 返回计算结果
+                CalResult calResult = Objects.nonNull(result) ? new CalResult(true, result) : new CalResult(false, null);
+                calResult.setEndStatus(pos == end);
                 queue.put(calResult);
                 queue.put(calResult);
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
@@ -63,42 +64,52 @@ public class CalRunnable implements Runnable {
         }
         }
     }
     }
 
 
-    private boolean cal(int i) {
+    /**
+     * 聚合计算
+     * @param pos
+     * @return
+     */
+    private List<String> cal(int pos) {
         // 判断是否已进行计算
         // 判断是否已进行计算
-        if (bitmap.contains(i)) {
-            return false;
+        if (!calResource.checkAndSetCalStatus(pos)) {
+            return null;
         }
         }
 
 
-        // 清除上一轮的数据
-        result.clear();
-
-        Word word = wordCache.get(i);
+        // 获取主词
+        Word word = calResource.getWord(pos);
         if (Objects.isNull(word.getStemMap()) || word.getStemMap().size() == 0) {
         if (Objects.isNull(word.getStemMap()) || word.getStemMap().size() == 0) {
-            return false;
+            return null;
         }
         }
-        bitmap.add(i);
-        result.add(word.getKey());
 
 
+        // 计算候选词位图
         RoaringBitmap finalBitmap = new RoaringBitmap();
         RoaringBitmap finalBitmap = new RoaringBitmap();
         for (CharSequence stem : word.getStemMap().keySet()) {
         for (CharSequence stem : word.getStemMap().keySet()) {
-            RoaringBitmap stemBitmap = indexCache.get((String) stem);
+            RoaringBitmap stemBitmap = calResource.getWordBitmap((String) stem);
             finalBitmap.or(stemBitmap);
             finalBitmap.or(stemBitmap);
         }
         }
-        finalBitmap.andNot(bitmap);
+        finalBitmap.andNot(calResource.getUsedBitmap());
 
 
-        for (Integer index : finalBitmap) {
-            Word candicateWord = wordCache.get(index);
-            if (Objects.isNull(candicateWord.getStemMap())) {
+        // 设置主词
+        List<String> result = new ArrayList<>();
+        result.add(word.getKey());
+
+        // 计算候选词
+        for (Integer candidatePos : finalBitmap) {
+            Word candidateWord = calResource.getWord(candidatePos);
+            if (Objects.isNull(candidateWord.getStemMap())) {
                 continue;
                 continue;
             }
             }
-            Double v = cosineSimilarity.cosineSimilarity(word.getStemMap(), candicateWord.getStemMap());
-            if (v < aggThreshold) {
+            // 计算相似度,小于聚合阈值跳过
+            Double similarity = cosineSimilarity.cosineSimilarity(word.getStemMap(), candidateWord.getStemMap());
+            if (BigDecimal.valueOf(similarity).setScale(2, RoundingMode.HALF_UP).compareTo(calInfo.getAggThreshold()) < 0) {
                 continue;
                 continue;
             }
             }
-            bitmap.add(index);
-            result.add(candicateWord.getKey());
+            // 设置长尾词使用状态
+            calResource.setCalStatus(candidatePos);
+            result.add(candidateWord.getKey());
         }
         }
+
         // 输出计算结果
         // 输出计算结果
-        return result.size() > 1;
+        return result.size() >= calInfo.getAggResultThreshold() ? result : null;
     }
     }
 }
 }

+ 5 - 0
src/main/java/top/zhixinghe1/money/agg/Constant.java

@@ -19,4 +19,9 @@ public class Constant {
      * 长尾词聚合结果临时文件
      * 长尾词聚合结果临时文件
      */
      */
     public static final String WORD_AGG_RESULT_TEMP_FILE = "长尾词_聚合结果_临时.txt";
     public static final String WORD_AGG_RESULT_TEMP_FILE = "长尾词_聚合结果_临时.txt";
+
+    /**
+     * 长尾词基本信息文件
+     */
+    public static final String WORD_BASIC_INFO_FILE = "长尾词_基本信息.txt";
 }
 }

+ 0 - 18
src/main/java/top/zhixinghe1/money/agg/Test.java

@@ -1,18 +0,0 @@
-package top.zhixinghe1.money.agg;
-
-import org.roaringbitmap.ImmutableBitmapDataProvider;
-import org.roaringbitmap.IntConsumer;
-import org.roaringbitmap.RoaringBitmap;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.roaringbitmap.longlong.IntegerUtil;
-
-import java.util.HashMap;
-
-public class Test {
-    public static void main(String[] args) {
-        RoaringBitmap integers = RoaringBitmap.bitmapOf(1, 3, 7, 10);
-        for (Integer integer : integers) {
-            System.out.println(integer);
-        }
-    }
-}

+ 0 - 68
src/main/java/top/zhixinghe1/money/agg/cache/ReverseIndexLoader.java

@@ -1,68 +0,0 @@
-package top.zhixinghe1.money.agg.cache;
-
-import com.github.benmanes.caffeine.cache.CacheLoader;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.roaringbitmap.RoaringBitmap;
-import top.zhixinghe1.money.agg.Constant;
-import top.zhixinghe1.money.agg.entity.BufferedRandomAccessFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Map;
-
-public class ReverseIndexLoader implements CacheLoader<String, RoaringBitmap> {
-
-    /**
-     * 数据目录路径
-     */
-    private String dataPath;
-
-    /**
-     * 行索引
-     */
-    private Map<String, long[]> lineIndex;
-
-    /**
-     * 文件读写器
-     */
-//    private BufferedRandomAccessFile randomAccessFile;
-    private RandomAccessFile randomAccessFile;
-
-    private MappedByteBuffer mappedByteBuffer;
-
-    public ReverseIndexLoader(String dataPath, Map<String, long[]> lineIndex) throws IOException {
-        this.dataPath = dataPath;
-        this.lineIndex = lineIndex;
-        String filePath = String.join(File.separator, dataPath, Constant.WORD_REVERSE_INDEX_FILE);
-//        randomAccessFile = new BufferedRandomAccessFile(filePath, "r", 10*1024*1024);
-        randomAccessFile = new RandomAccessFile(filePath, "r");
-        MappedByteBuffer mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, Integer.MAX_VALUE);
-    }
-
-    @Override
-    public @Nullable RoaringBitmap load(String s) throws Exception {
-        long[] basicInfo = lineIndex.get(s);
-//        randomAccessFile.seek();
-//        String lineContent = new String(randomAccessFile.readLine().getBytes("8859_1"), "UTF-8");
-//        String lineContent = randomAccessFile.readUTF();
-//        mappedByteBuffer.position((int) basicInfo[0]);
-        byte[] buff = new byte[(int) basicInfo[2]];
-        mappedByteBuffer.put(buff, (int)basicInfo[0], (int)basicInfo[2]);
-        String lineContent = new String(buff);
-
-        String[] split = lineContent.substring(lineContent.indexOf(",") + 1).split(",");
-        int[] positionIntegers = new int[split.length];
-        for (int i = 0; i < split.length; i++) {
-            try {
-                positionIntegers[i] = Integer.parseInt(split[i]);
-            } catch (Exception e) {
-                System.out.println("暂停");
-            }
-        }
-
-        return RoaringBitmap.bitmapOf(positionIntegers);
-    }
-}

+ 0 - 72
src/main/java/top/zhixinghe1/money/agg/cache/WordLoader.java

@@ -1,72 +0,0 @@
-package top.zhixinghe1.money.agg.cache;
-
-import com.github.benmanes.caffeine.cache.CacheLoader;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import top.zhixinghe1.money.agg.Constant;
-import top.zhixinghe1.money.agg.entity.BufferedRandomAccessFile;
-import top.zhixinghe1.money.agg.entity.Word;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public class WordLoader implements CacheLoader<Integer, Word> {
-
-    /**
-     * 数据目录路径
-     */
-    private String dataPath;
-
-    /**
-     * 行索引
-     */
-    private long[] lineIndex;
-
-    /**
-     * 文件读写器
-     */
-    private BufferedRandomAccessFile randomAccessFile;
-
-    public WordLoader(String dataPath, long[] lineIndex) throws IOException {
-        this.dataPath = dataPath;
-        this.lineIndex = lineIndex;
-        String filePath = String.join(File.separator, dataPath, Constant.WORD_STEM_FILE);
-        randomAccessFile = new BufferedRandomAccessFile(filePath, "r", 10*1024*1024);
-    }
-
-    @Override
-    public @Nullable Word load(Integer integer) throws Exception {
-        randomAccessFile.seek(lineIndex[integer]);
-        String lineContent = new String(randomAccessFile.readLine().getBytes("8859_1"), "UTF-8");
-
-        // 提取关键词和分词
-        int keyPosition = lineContent.indexOf(",");
-        String key = keyPosition == -1 ? lineContent : lineContent.substring(0, keyPosition);
-        Map<CharSequence, Integer> stemMap = Collections.EMPTY_MAP;
-        if (keyPosition != -1 && lineContent.length() != keyPosition + 1) {
-            stemMap = Arrays.asList(lineContent.substring(keyPosition+1).split(","))
-                    .stream().collect(Collectors.toMap(Function.identity(), v -> 1, Integer::sum));
-        }
-
-        return new Word(key, stemMap);
-    }
-
-    public static void main(String[] args) throws Exception {
-        int wordTotalNum = 500000;
-        String dataPath = "E:\\ChenYL\\CodeRepository\\money-mining-python\\data\\test\\长尾词聚合分析_20240131224320";
-        String filePath = String.join(File.separator, dataPath, Constant.WORD_STEM_FILE);
-        BufferedRandomAccessFile randomAccessFile = new BufferedRandomAccessFile(filePath, "r");
-        long[] lineContentIndex = new long[wordTotalNum+1];
-        for (int i = 1; i <= wordTotalNum; i++, randomAccessFile.readLine()) {
-            lineContentIndex[i] = randomAccessFile.getFilePointer();
-        }
-
-        WordLoader wordLoader = new WordLoader(dataPath, lineContentIndex);
-        Word load = wordLoader.load(1000);
-        System.out.println("暂停");
-    }
-}

+ 69 - 101
src/main/java/top/zhixinghe1/money/agg/entity/BufferedRandomAccessFile.java

@@ -5,33 +5,20 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.RandomAccessFile;
 import java.util.Arrays;
 import java.util.Arrays;
-import java.util.logging.Logger;
 
 
-/**
- * A <code>BufferedRandomAccessFile</code> is like a
- * <code>RandomAccessFile</code>, but it uses a private buffer so that most
- * operations do not require a disk access.
- * <P>
- *
- * Note: The operations on this class are unmonitored. Also, the correct
- * functioning of the <code>RandomAccessFile</code> methods that are not
- * overridden here relies on the implementation of those methods in the
- * superclass.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public final class BufferedRandomAccessFile extends RandomAccessFile
-{
+public class BufferedRandomAccessFile extends RandomAccessFile {
     static final int LogBuffSz_ = 16; // 64K buffer
     static final int LogBuffSz_ = 16; // 64K buffer
     public static final int BuffSz_ = (1 << LogBuffSz_);
     public static final int BuffSz_ = (1 << LogBuffSz_);
     static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
     static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
 
 
+    private String path_;
+
     /*
     /*
      * This implementation is based on the buffer implementation in Modula-3's
      * This implementation is based on the buffer implementation in Modula-3's
      * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
      * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
      */
      */
     private boolean dirty_; // true iff unflushed bytes exist
     private boolean dirty_; // true iff unflushed bytes exist
-    private boolean closed_; // true iff the file is closed
+    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
     private long curr_; // current position in file
     private long curr_; // current position in file
     private long lo_, hi_; // bounds on characters in "buff"
     private long lo_, hi_; // bounds on characters in "buff"
     private byte[] buff_; // local buffer
     private byte[] buff_; // local buffer
@@ -97,15 +84,13 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
      * in mode <code>mode</code>, which should be "r" for reading only, or
      * in mode <code>mode</code>, which should be "r" for reading only, or
      * "rw" for reading and writing.
      * "rw" for reading and writing.
      */
      */
-    public BufferedRandomAccessFile(File file, String mode) throws IOException
-    {
-        super(file, mode);
-        this.init(0);
+    public BufferedRandomAccessFile(File file, String mode) throws IOException {
+        this(file, mode, 0);
     }
     }
 
 
-    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
-    {
+    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
         super(file, mode);
         super(file, mode);
+        path_ = file.getAbsolutePath();
         this.init(size);
         this.init(size);
     }
     }
 
 
@@ -114,21 +99,18 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
      * <code>name</code> in mode <code>mode</code>, which should be "r" for
      * <code>name</code> in mode <code>mode</code>, which should be "r" for
      * reading only, or "rw" for reading and writing.
      * reading only, or "rw" for reading and writing.
      */
      */
-    public BufferedRandomAccessFile(String name, String mode) throws IOException
-    {
-        super(name, mode);
-        this.init(0);
+    public BufferedRandomAccessFile(String name, String mode) throws IOException {
+        this(name, mode, 0);
     }
     }
 
 
-    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
-    {
+    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
         super(name, mode);
         super(name, mode);
+        path_ = name;
         this.init(size);
         this.init(size);
     }
     }
 
 
-    private void init(int size)
-    {
-        this.dirty_ = this.closed_ = false;
+    private void init(int size) {
+        this.dirty_ = false;
         this.lo_ = this.curr_ = this.hi_ = 0;
         this.lo_ = this.curr_ = this.hi_ = 0;
         this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
         this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
         this.maxHi_ = (long) BuffSz_;
         this.maxHi_ = (long) BuffSz_;
@@ -136,10 +118,27 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
         this.diskPos_ = 0L;
         this.diskPos_ = 0L;
     }
     }
 
 
-    public void close() throws IOException
-    {
+    public String getPath() {
+        return path_;
+    }
+
+    public void sync() throws IOException {
+        if (syncNeeded_) {
+            flush();
+            getChannel().force(true);
+            syncNeeded_ = false;
+        }
+    }
+
+//      public boolean isEOF() throws IOException
+//      {
+//          assert getFilePointer() <= length();
+//          return getFilePointer() == length();
+//      }
+
+    public void close() throws IOException {
         this.flush();
         this.flush();
-        this.closed_ = true;
+        this.buff_ = null;
         super.close();
         super.close();
     }
     }
 
 
@@ -147,16 +146,13 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
      * Flush any bytes in the file's buffer that have not yet been written to
      * Flush any bytes in the file's buffer that have not yet been written to
      * disk. If the file was created read-only, this method is a no-op.
      * disk. If the file was created read-only, this method is a no-op.
      */
      */
-    public void flush() throws IOException
-    {
+    public void flush() throws IOException {
         this.flushBuffer();
         this.flushBuffer();
     }
     }
 
 
     /* Flush any dirty bytes in the buffer to disk. */
     /* Flush any dirty bytes in the buffer to disk. */
-    private void flushBuffer() throws IOException
-    {
-        if (this.dirty_)
-        {
+    private void flushBuffer() throws IOException {
+        if (this.dirty_) {
             if (this.diskPos_ != this.lo_)
             if (this.diskPos_ != this.lo_)
                 super.seek(this.lo_);
                 super.seek(this.lo_);
             int len = (int) (this.curr_ - this.lo_);
             int len = (int) (this.curr_ - this.lo_);
@@ -171,20 +167,17 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
      * number of bytes read. If the return result is less than
      * number of bytes read. If the return result is less than
      * "this.buff.length", then EOF was read.
      * "this.buff.length", then EOF was read.
      */
      */
-    private int fillBuffer() throws IOException
-    {
+    private int fillBuffer() throws IOException {
         int cnt = 0;
         int cnt = 0;
         int rem = this.buff_.length;
         int rem = this.buff_.length;
-        while (rem > 0)
-        {
+        while (rem > 0) {
             int n = super.read(this.buff_, cnt, rem);
             int n = super.read(this.buff_, cnt, rem);
             if (n < 0)
             if (n < 0)
                 break;
                 break;
             cnt += n;
             cnt += n;
             rem -= n;
             rem -= n;
         }
         }
-        if ( (cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length)) )
-        {
+        if ((cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length))) {
             // make sure buffer that wasn't read is initialized with -1
             // make sure buffer that wasn't read is initialized with -1
             Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
             Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
         }
         }
@@ -201,27 +194,21 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
      * is at or past the end-of-file, which can only happen if the file was
      * is at or past the end-of-file, which can only happen if the file was
      * opened in read-only mode.
      * opened in read-only mode.
      */
      */
-    public void seek(long pos) throws IOException
-    {
-        if (pos >= this.hi_ || pos < this.lo_)
-        {
+    public void seek(long pos) throws IOException {
+        if (pos >= this.hi_ || pos < this.lo_) {
             // seeking outside of current buffer -- flush and read
             // seeking outside of current buffer -- flush and read
             this.flushBuffer();
             this.flushBuffer();
             this.lo_ = pos & BuffMask_; // start at BuffSz boundary
             this.lo_ = pos & BuffMask_; // start at BuffSz boundary
             this.maxHi_ = this.lo_ + (long) this.buff_.length;
             this.maxHi_ = this.lo_ + (long) this.buff_.length;
-            if (this.diskPos_ != this.lo_)
-            {
+            if (this.diskPos_ != this.lo_) {
                 super.seek(this.lo_);
                 super.seek(this.lo_);
                 this.diskPos_ = this.lo_;
                 this.diskPos_ = this.lo_;
             }
             }
             int n = this.fillBuffer();
             int n = this.fillBuffer();
             this.hi_ = this.lo_ + (long) n;
             this.hi_ = this.lo_ + (long) n;
-        }
-        else
-        {
+        } else {
             // seeking inside current buffer -- no read required
             // seeking inside current buffer -- no read required
-            if (pos < this.curr_)
-            {
+            if (pos < this.curr_) {
                 // if seeking backwards, we must flush to maintain V4
                 // if seeking backwards, we must flush to maintain V4
                 this.flushBuffer();
                 this.flushBuffer();
             }
             }
@@ -229,20 +216,17 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
         this.curr_ = pos;
         this.curr_ = pos;
     }
     }
 
 
-    public long getFilePointer()
-    {
+    public long getFilePointer() {
         return this.curr_;
         return this.curr_;
     }
     }
 
 
-    public long length() throws IOException
-    {
+    public long length() throws IOException {
+        // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
         return Math.max(this.curr_, super.length());
         return Math.max(this.curr_, super.length());
     }
     }
 
 
-    public int read() throws IOException
-    {
-        if (this.curr_ >= this.hi_)
-        {
+    public int read() throws IOException {
+        if (this.curr_ >= this.hi_) {
             // test for EOF
             // test for EOF
             // if (this.hi < this.maxHi) return -1;
             // if (this.hi < this.maxHi) return -1;
             if (this.hitEOF_)
             if (this.hitEOF_)
@@ -258,15 +242,12 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
         return ((int) res) & 0xFF; // convert byte -> int
         return ((int) res) & 0xFF; // convert byte -> int
     }
     }
 
 
-    public int read(byte[] b) throws IOException
-    {
+    public int read(byte[] b) throws IOException {
         return this.read(b, 0, b.length);
         return this.read(b, 0, b.length);
     }
     }
 
 
-    public int read(byte[] b, int off, int len) throws IOException
-    {
-        if (this.curr_ >= this.hi_)
-        {
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
             // test for EOF
             // test for EOF
             // if (this.hi < this.maxHi) return -1;
             // if (this.hi < this.maxHi) return -1;
             if (this.hitEOF_)
             if (this.hitEOF_)
@@ -284,21 +265,15 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
         return len;
         return len;
     }
     }
 
 
-    public void write(int b) throws IOException
-    {
-        if (this.curr_ >= this.hi_)
-        {
-            if (this.hitEOF_ && this.hi_ < this.maxHi_)
-            {
+    public void write(int b) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                 // at EOF -- bump "hi"
                 // at EOF -- bump "hi"
                 this.hi_++;
                 this.hi_++;
-            }
-            else
-            {
+            } else {
                 // slow path -- write current buffer; read next one
                 // slow path -- write current buffer; read next one
                 this.seek(this.curr_);
                 this.seek(this.curr_);
-                if (this.curr_ == this.hi_)
-                {
+                if (this.curr_ == this.hi_) {
                     // appending to EOF -- bump "hi"
                     // appending to EOF -- bump "hi"
                     this.hi_++;
                     this.hi_++;
                 }
                 }
@@ -307,21 +282,20 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
         this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
         this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
         this.curr_++;
         this.curr_++;
         this.dirty_ = true;
         this.dirty_ = true;
+        syncNeeded_ = true;
     }
     }
 
 
-    public void write(byte[] b) throws IOException
-    {
+    public void write(byte[] b) throws IOException {
         this.write(b, 0, b.length);
         this.write(b, 0, b.length);
     }
     }
 
 
-    public void write(byte[] b, int off, int len) throws IOException
-    {
-        while (len > 0)
-        {
+    public void write(byte[] b, int off, int len) throws IOException {
+        while (len > 0) {
             int n = this.writeAtMost(b, off, len);
             int n = this.writeAtMost(b, off, len);
             off += n;
             off += n;
             len -= n;
             len -= n;
             this.dirty_ = true;
             this.dirty_ = true;
+            syncNeeded_ = true;
         }
         }
     }
     }
 
 
@@ -329,21 +303,15 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
      * Write at most "len" bytes to "b" starting at position "off", and return
      * Write at most "len" bytes to "b" starting at position "off", and return
      * the number of bytes written.
      * the number of bytes written.
      */
      */
-    private int writeAtMost(byte[] b, int off, int len) throws IOException
-    {
-        if (this.curr_ >= this.hi_)
-        {
-            if (this.hitEOF_ && this.hi_ < this.maxHi_)
-            {
+    private int writeAtMost(byte[] b, int off, int len) throws IOException {
+        if (this.curr_ >= this.hi_) {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                 // at EOF -- bump "hi"
                 // at EOF -- bump "hi"
                 this.hi_ = this.maxHi_;
                 this.hi_ = this.maxHi_;
-            }
-            else
-            {
+            } else {
                 // slow path -- write current buffer; read next one
                 // slow path -- write current buffer; read next one
                 this.seek(this.curr_);
                 this.seek(this.curr_);
-                if (this.curr_ == this.hi_)
-                {
+                if (this.curr_ == this.hi_) {
                     // appending to EOF -- bump "hi"
                     // appending to EOF -- bump "hi"
                     this.hi_ = this.maxHi_;
                     this.hi_ = this.maxHi_;
                 }
                 }
@@ -355,4 +323,4 @@ public final class BufferedRandomAccessFile extends RandomAccessFile
         this.curr_ += len;
         this.curr_ += len;
         return len;
         return len;
     }
     }
-}
+}

+ 127 - 0
src/main/java/top/zhixinghe1/money/agg/entity/CalInfo.java

@@ -0,0 +1,127 @@
+package top.zhixinghe1.money.agg.entity;
+
+import top.zhixinghe1.money.agg.Constant;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+
+/**
+ * 计算任务基础信息
+ */
+public class CalInfo {
+
+    /**
+     * 数据路径
+     */
+    private String dataDirPath;
+
+    /**
+     * 长尾词+分词 文件位置
+     */
+    private String wordStemFilePath;
+
+    /**
+     * 分词找长尾词倒排索引 文件位置
+     */
+    private String reverseIndexFilePath;
+
+    /**
+     * 任务总数
+     */
+    private Integer taskTotalNum;
+
+    /**
+     * 长尾词总数
+     */
+    private Integer wordTotalNum;
+
+    /**
+     * 倒排索引总数
+     */
+    private Integer reverseIndexTotalNum;
+
+    /**
+     * 缓存最大数量
+     */
+    public Integer cacheMaximumSize = 50 * 10000;
+
+    /**
+     * 聚合阈值
+     */
+    public BigDecimal aggThreshold = new BigDecimal("0.8");
+
+    /**
+     * 聚合结果阈值
+     */
+    public Integer aggResultThreshold = 10;
+
+    /**
+     * 每个线程计算任务数量
+     */
+    private Integer perTaskNum = 10000;
+
+    public CalInfo(String dataDirPath) throws IOException {
+        this.dataDirPath = dataDirPath;
+        this.wordStemFilePath = String.join(File.separator, dataDirPath, Constant.WORD_STEM_FILE);
+        this.reverseIndexFilePath = String.join(File.separator, dataDirPath, Constant.WORD_REVERSE_INDEX_FILE);
+        // 加载基本信息
+        String basicInfoFilePath = String.join(File.separator, dataDirPath, Constant.WORD_BASIC_INFO_FILE);
+        try (BufferedRandomAccessFile bufferedRandomAccessFile = new BufferedRandomAccessFile(basicInfoFilePath, "r");) {
+            String line;
+            while ((line = bufferedRandomAccessFile.readLine()) != null) {
+                line = new String(line.getBytes("8859_1"), "UTF-8");
+                if (line.startsWith("长尾词总数")) {
+                    this.wordTotalNum = Integer.parseInt(line.split(":")[1]);
+                    continue;
+                }
+                if (line.startsWith("倒排索引总数")) {
+                    this.reverseIndexTotalNum = Integer.parseInt(line.split(":")[1]);
+                    continue;
+                }
+            }
+
+        }
+        this.taskTotalNum = this.wordTotalNum;
+    }
+
+    public Integer getTaskTotalNum() {
+        return taskTotalNum;
+    }
+
+    public Integer getWordTotalNum() {
+        return wordTotalNum;
+    }
+
+    public Integer getReverseIndexTotalNum() {
+        return reverseIndexTotalNum;
+    }
+
+    public String getDataDirPath() {
+        return dataDirPath;
+    }
+
+    public Integer getCacheMaximumSize() {
+        return cacheMaximumSize;
+    }
+
+    public BigDecimal getAggThreshold() {
+        return aggThreshold;
+    }
+
+    public Integer getAggResultThreshold() {
+        return aggResultThreshold;
+    }
+
+    public Integer getPerTaskNum() {
+        return perTaskNum;
+    }
+
+    public String getWordStemFilePath() {
+        return wordStemFilePath;
+    }
+
+    public String getReverseIndexFilePath() {
+        return reverseIndexFilePath;
+    }
+}

+ 257 - 0
src/main/java/top/zhixinghe1/money/agg/entity/CalResource.java

@@ -0,0 +1,257 @@
+package top.zhixinghe1.money.agg.entity;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * 计算资源
+ */
+public class CalResource {
+
+    /**
+     * 计算任务基础信息
+     */
+    private CalInfo calInfo;
+
+    /**
+     * 记录已计算长尾词位图
+     */
+    private RoaringBitmap usedBitmap = new RoaringBitmap();
+
+    /**
+     * 长尾词+分词 位置索引
+     */
+    private long[] wordIndexArr;
+
+    /**
+     * 长尾词+分词 缓存
+     */
+    private Cache<Integer, Word> wordCache;
+
+    /**
+     * 分词找长尾词 倒排索引
+     */
+    private ConcurrentHashMap<String, Long> reverseIndexMap;
+
+    /**
+     * 分词找长尾词 缓存
+     */
+    private Cache<String, RoaringBitmap> reverseIndexCache;
+
+    /**
+     * 计算结果 队列
+     */
+    private LinkedBlockingQueue<CalResult> queue = new LinkedBlockingQueue();
+
+    /**
+     * 读写锁
+     */
+    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
+
+    /**
+     * 写锁
+     */
+    private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
+
+    /**
+     * 长尾词+分词 随机读写
+     */
+    private ThreadLocal<BufferedRandomAccessFile> localWordRandomAccessFile = new ThreadLocal();
+
+    /**
+     * 分词找长尾词 随机读写
+     */
+    private ThreadLocal<BufferedRandomAccessFile> localReverseIndexRandomAccessFile = new ThreadLocal();
+
+
+
+    public CalResource(CalInfo calInfo) throws IOException {
+        this.calInfo = calInfo;
+        loadIndex(calInfo);
+    }
+
+    /**
+     * 检测长尾词计算状态
+     * @param pos
+     * @return
+     */
+    public boolean checkAndSetCalStatus(int pos) {
+        try {
+            writeLock.lock();
+            return usedBitmap.checkedAdd(pos);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * 设置长尾词计算状态
+     * @param pos
+     * @return
+     */
+    public void setCalStatus(int pos) {
+        try {
+            writeLock.lock();
+            usedBitmap.add(pos);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * 获取长尾词
+     * @param pos
+     * @return
+     */
+    public Word getWord(int pos) {
+        return wordCache.get(pos, integer -> this.loadWord(integer));
+    }
+
+    /**
+     * 获取分词倒排索引
+     * @param stem
+     * @return
+     */
+    public RoaringBitmap getWordBitmap(String stem) {
+        return reverseIndexCache.get(stem, s -> this.loadReverseIndex(s));
+    }
+
+    /**
+     * 构造计算任务公共信息
+     * @return
+     */
+    private void loadIndex(CalInfo calInfo) throws IOException {
+        // 创建关键词位置缓存
+        wordIndexArr = buildWordIndexArr(calInfo.getWordTotalNum(), calInfo.getWordStemFilePath());
+        wordCache = Caffeine.newBuilder().maximumSize(calInfo.cacheMaximumSize).build();
+
+        // 创建倒排索引缓存
+        reverseIndexMap = buildReverseIndexMap(calInfo.getReverseIndexTotalNum(), calInfo.getReverseIndexFilePath());
+        reverseIndexCache = Caffeine.newBuilder().maximumSize(calInfo.cacheMaximumSize).build();
+    }
+
+    /**
+     * 构建关键词位置索引
+     * @param wordTotalNum
+     * @param filePath
+     * @return
+     * @throws IOException
+     */
+    private long[] buildWordIndexArr(int wordTotalNum, String filePath) throws IOException {
+        BufferedRandomAccessFile randomAccessFile = new BufferedRandomAccessFile(filePath, "r");
+        long[] lineContentIndex = new long[wordTotalNum+1];
+        for (int i = 1; i <= wordTotalNum; i++, randomAccessFile.readLine()) {
+            lineContentIndex[i] = randomAccessFile.getFilePointer();
+        }
+        return lineContentIndex;
+    }
+
+    /**
+     * 构建倒排索引位置索引
+     * @param reverseIndexTotalNum
+     * @param filePath
+     * @return
+     * @throws IOException
+     */
+    private ConcurrentHashMap<String, Long> buildReverseIndexMap(int reverseIndexTotalNum, String filePath) throws IOException {
+        BufferedRandomAccessFile randomAccessFile = new BufferedRandomAccessFile(filePath, "r");
+        ConcurrentHashMap<String, Long> lineContentIndexMap = new ConcurrentHashMap<>(reverseIndexTotalNum);
+        String line = null;
+        long position = 0;
+        for (int i = 1; i <= reverseIndexTotalNum; i++) {
+            try {
+                position = randomAccessFile.getFilePointer();
+                line = randomAccessFile.readLine();
+                line = new String(line.getBytes("8859_1"), "UTF-8");
+                String stem = line.substring(0, line.indexOf(","));
+                lineContentIndexMap.put(stem, position);
+            } catch (Exception e ) {
+                System.out.println("异常:"+ i);
+            }
+
+        }
+        return lineContentIndexMap;
+    }
+
+    private Word loadWord(Integer integer) {
+        try {
+            BufferedRandomAccessFile bufferedRandomAccessFile = localWordRandomAccessFile.get();
+            if (Objects.isNull(bufferedRandomAccessFile)) {
+                bufferedRandomAccessFile = new BufferedRandomAccessFile(calInfo.getWordStemFilePath(), "r");
+                localWordRandomAccessFile.set(bufferedRandomAccessFile);
+            }
+            bufferedRandomAccessFile.seek(wordIndexArr[integer]);
+            String content = bufferedRandomAccessFile.readLine();
+            if (content == null) {
+                System.out.println("暂停");
+            }
+            String lineContent = new String(content.getBytes("8859_1"), "UTF-8");
+
+            // 提取关键词和分词
+            int keyPosition = lineContent.indexOf(",");
+            String key = keyPosition == -1 ? lineContent : lineContent.substring(0, keyPosition);
+            Map<CharSequence, Integer> stemMap = Collections.EMPTY_MAP;
+            if (keyPosition != -1 && lineContent.length() != keyPosition + 1) {
+                stemMap = Arrays.asList(lineContent.substring(keyPosition+1).split(","))
+                        .stream().collect(Collectors.toMap(Function.identity(), v -> 1, Integer::sum));
+            }
+
+            return new Word(key, stemMap);
+        } catch (IOException e) {
+            throw new RuntimeException("关键词:"+integer, e);
+        }
+    }
+
+    public RoaringBitmap loadReverseIndex(String s) {
+        try {
+            BufferedRandomAccessFile bufferedRandomAccessFile = localReverseIndexRandomAccessFile.get();
+            if (Objects.isNull(bufferedRandomAccessFile)) {
+                bufferedRandomAccessFile = new BufferedRandomAccessFile(calInfo.getReverseIndexFilePath(), "r");
+                localReverseIndexRandomAccessFile.set(bufferedRandomAccessFile);
+            }
+            bufferedRandomAccessFile.seek(reverseIndexMap.get(s));
+            String content = bufferedRandomAccessFile.readLine();
+            if (content == null) {
+                System.out.println("暂停");
+            }
+            String lineContent = new String(content.getBytes("8859_1"), "UTF-8");
+
+            String[] split = lineContent.substring(lineContent.indexOf(",") + 1).split(",");
+            int[] positionIntegers = new int[split.length];
+            for (int i = 0; i < split.length; i++) {
+                positionIntegers[i] = Integer.parseInt(split[i]);
+            }
+            return RoaringBitmap.bitmapOf(positionIntegers);
+        } catch (IOException e) {
+            throw new RuntimeException("关键词:"+s, e);
+        }
+    }
+
+    public LinkedBlockingQueue<CalResult> getQueue() {
+        return queue;
+    }
+
+    public RoaringBitmap getUsedBitmap() {
+        return usedBitmap;
+    }
+
+    public ThreadLocal<BufferedRandomAccessFile> getLocalWordRandomAccessFile() {
+        return localWordRandomAccessFile;
+    }
+
+    public ThreadLocal<BufferedRandomAccessFile> getLocalReverseIndexRandomAccessFile() {
+        return localReverseIndexRandomAccessFile;
+    }
+}

+ 12 - 0
src/main/java/top/zhixinghe1/money/agg/entity/CalResult.java

@@ -2,12 +2,24 @@ package top.zhixinghe1.money.agg.entity;
 
 
 import java.util.List;
 import java.util.List;
 
 
+/**
+ * 计算结果
+ */
 public class CalResult {
 public class CalResult {
 
 
+    /**
+     * 结束标志位
+     */
     private boolean endStatus;
     private boolean endStatus;
 
 
+    /**
+     * 聚合结果标志位
+     */
     private boolean aggStatus;
     private boolean aggStatus;
 
 
+    /**
+     * 聚合结果
+     */
     private List<String> similarWords;
     private List<String> similarWords;
 
 
     public CalResult(boolean aggStatus, List<String> similarWords) {
     public CalResult(boolean aggStatus, List<String> similarWords) {

+ 24 - 0
src/main/java/top/zhixinghe1/money/agg/entity/CalTask.java

@@ -2,6 +2,8 @@ package top.zhixinghe1.money.agg.entity;
 
 
 import java.io.Serial;
 import java.io.Serial;
 import java.io.Serializable;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
 
 /**
 /**
  * 计算任务边界(左右闭区间)
  * 计算任务边界(左右闭区间)
@@ -19,6 +21,28 @@ public class CalTask implements Serializable {
         this.endPos = endPos;
         this.endPos = endPos;
     }
     }
 
 
+    /**
+     * 划分计算任务
+     * @param total
+     * @param internal
+     * @return
+     */
+    public static List<CalTask> avgSplitTask(int total, int internal) {
+        // 分割的任务份数
+        int taskNum = (int) Math.ceil((double) total / internal);
+        // 平分
+        List<CalTask> calTasks = new ArrayList<>();
+        for (int i = 0; i < taskNum; i++) {
+            int start = i * internal + 1;
+            int end = i * internal + internal;
+            if (end > total) {
+                end = total;
+            }
+            calTasks.add(new CalTask(start, end));
+        }
+        return calTasks;
+    }
+
     public int getStartPos() {
     public int getStartPos() {
         return startPos;
         return startPos;
     }
     }