Преглед на файлове

feat:聚合计算初版

ChenYL преди 2 години
родител
ревизия
bc5a2582d4

+ 17 - 12
pom.xml

@@ -9,8 +9,8 @@
     <version>1.0-SNAPSHOT</version>
 
     <properties>
-        <maven.compiler.source>21</maven.compiler.source>
-        <maven.compiler.target>21</maven.compiler.target>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
@@ -27,11 +27,6 @@
             <artifactId>progressbar</artifactId>
             <version>0.10.0</version>
         </dependency>
-        <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-            <version>5.1.0</version>
-        </dependency>
     </dependencies>
 
     <!-- 配置阿里云仓库 -->
@@ -62,22 +57,32 @@
 
     <build>
         <plugins>
-            <!-- 4、指定启动类,指定配置文件,将依赖打成外部jar包 -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
+                <artifactId>maven-assembly-plugin</artifactId>
                 <configuration>
+                    <descriptorRefs>
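+                        <!-- Predefined assembly descriptor: bundles the project classes and all dependencies into a single jar; "jar-with-dependencies" is appended to the jar file name -->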
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
                     <archive>
                         <manifest>
-                            <!-- 是否要把第三方jar加入到类构建路径 -->
                             <addClasspath>true</addClasspath>
-                            <!-- 外部依赖jar包的最终位置 -->
                             <classpathPrefix>lib/</classpathPrefix>
-                            <!-- 项目启动类 -->
+                            <!--添加项目中主类-->
                             <mainClass>top.zhixinghe1.money.AggApplication</mainClass>
                         </manifest>
                     </archive>
                 </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
         </plugins>
     </build>

+ 190 - 65
src/main/java/top/zhixinghe1/money/AggApplication.java

@@ -1,19 +1,31 @@
 package top.zhixinghe1.money;
 
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
+import org.apache.commons.lang3.StringUtils;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileNotFoundException;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.Reader;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -23,66 +35,179 @@ import java.util.stream.Collectors;
  */
 public class AggApplication {
 
-//    public static void main(String[] args) {
-////        String dataDirPath = "D:\\Documents\\ChenYL\\CodeRepository\\money-mining-python\\data";
-//        String dataDirPath = "D:\\Documents\\ChenYL\\CodeRepository\\money-mining-python\\data\\test";
-//
-//        // 判断传入路径是否有效
-//        File dataDir = new File(dataDirPath);
-//        if (!dataDir.exists() || !dataDir.isDirectory()) {
-//            System.out.println(String.format("数据目录路径不存在,%s", dataDirPath));
-//            return;
-//        }
-//
-//        // 判断关键资源文件是否存在
-//        List<String> fileNameList = Arrays.asList("长尾词_合并_分词.txt", "长尾词_合并_聚合.txt", "长尾词_合并.txt", "长尾词_合并_倒排索引.txt");
-//        for (String fileName : fileNameList) {
-//            String resFilePath = String.join(File.separator, dataDirPath, fileName);
-//            File resfile = new File(resFilePath);
-//            if (!resfile.exists() || !resfile.isFile()) {
-//                System.out.println(String.format("文件不存在!文件路径:%s", resFilePath));
-//                return;
-//            }
-//        }
-//
-//        // 归档历史数据文件
-//        File[] files = dataDir.listFiles();
-//        Pattern aggFilePattern = Pattern.compile("长尾词_合并_聚合_\\d+_\\d+.txt");
-//        List<File> historyAggFile = Arrays.stream(files).filter(file -> {
-//            Matcher matcher = aggFilePattern.matcher(file.getName());
-//            return matcher.find();
-//        }).collect(Collectors.toList());
-//        if (Objects.nonNull(historyAggFile) || historyAggFile.size() > 0) {
-//            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
-//            String archivePath = String.join(File.separator, dataDirPath, String.format("长尾词_聚合_归档_%s", sdf.format(new Date())));
-//            File archiveDir = new File(archivePath);
-//            archiveDir.mkdirs();
-//            for (File historyFile : historyAggFile) {
-//                String destPath = String.join(File.separator, archivePath, historyFile.getName());
-//                System.out.println(destPath);
-//                historyFile.renameTo(new File(destPath));
-//            }
-//        }
-//
-//        JedisPool pool = new JedisPool("127.0.0.1", 6379);
-//        try (Jedis jedis = pool.getResource()) {
-//            try (FileReader reader = new FileReader(String.join(File.separator, dataDirPath, "长尾词_合并_分词.txt"));
-//                 BufferedReader br = new BufferedReader(reader)) {
-//                String line = null;
-//                while ((line = br.readLine()) != null) {
-//                    System.out.println(line);
-//                    System.out.println("暂停");
-//                }
-//
-//            } catch (IOException e) {
-//                throw new RuntimeException(e);
-//            }
-//        }
-//    }
-
-    public static void main(String[] args) throws InterruptedException {
-        System.out.println("hello python start");
-        Thread.sleep(5000);
-        System.out.println("hello python end");
+    /** Maximum number of word positions handed to a single calculation task. */
+    private static final int perTaskNum = 10000;
+
+    /** Words loaded from the segmented-word file, keyed by their 1-based position among the non-blank lines. */
+    private static final Map<Integer, Word> wordCache = new HashMap<>();
+
+    /** Inverted index: stem -> positions read from the inverted-index file. */
+    private static final Map<String, Set<Integer>> indexCache = new HashMap<>();
+
+    /** Bitmap handed to every CalRunable; allocated in main once the word count is known. */
+    private static BitSet bitmap = null;
+
+    /**
+     * Matches partial result files. CalRunable names its output {@code 长尾词_合并_聚合_<threadId>.txt}
+     * (a single numeric group), so the second numeric group is optional; it is still accepted so files
+     * with two numeric groups keep matching. The dot is escaped so only a literal ".txt" is accepted.
+     * The final merged file {@code 长尾词_合并_聚合.txt} has no numeric group and never matches.
+     */
+    private static final Pattern aggFilePattern = Pattern.compile("长尾词_合并_聚合_\\d+(_\\d+)?\\.txt");
+
+    public static void main(String[] args) throws IOException, InterruptedException {
+
+//        String dataDirPath = args[0];
+        String dataDirPath = "E:\\ChenYL\\CodeRepository\\money-mining-python\\data\\test";
+        if (StringUtils.isBlank(dataDirPath)) {
+            System.out.println("没有输入目标数据路径");
+            return;
+        }
+
+        // 判断传入路径是否有效
+        File dataDir = new File(dataDirPath);
+        if (!dataDir.exists() || !dataDir.isDirectory()) {
+            System.out.println(String.format("数据目录路径不存在,%s", dataDirPath));
+            return;
+        }
+
+        // 判断关键资源文件是否存在
+        List<String> fileNameList = Arrays.asList("长尾词_合并_分词.txt", "长尾词_合并.txt", "长尾词_合并_倒排索引.txt");
+        for (String fileName : fileNameList) {
+            String resFilePath = String.join(File.separator, dataDirPath, fileName);
+            File resfile = new File(resFilePath);
+            if (!resfile.exists() || !resfile.isFile()) {
+                System.out.println(String.format("文件不存在!文件路径:%s", resFilePath));
+                return;
+            }
+        }
+
+        // 归档历史数据文件
+        File[] files = dataDir.listFiles();
+        List<File> historyAggFile = Arrays.stream(files).filter(file -> {
+            Matcher matcher = aggFilePattern.matcher(file.getName());
+            return matcher.find();
+        }).collect(Collectors.toList());
+        if (Objects.nonNull(historyAggFile) || historyAggFile.size() > 0) {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
+            String archivePath = String.join(File.separator, dataDirPath, String.format("长尾词_聚合_归档_%s", sdf.format(new Date())));
+            File archiveDir = new File(archivePath);
+            archiveDir.mkdirs();
+            for (File historyFile : historyAggFile) {
+                String destPath = String.join(File.separator, archivePath, historyFile.getName());
+                historyFile.renameTo(new File(destPath));
+            }
+        }
+
+        int totalWord = 0;
+
+        Pattern pattern = Pattern.compile("([^,]+)");
+
+        // 构造关键词缓存
+        try (FileReader reader = new FileReader(String.join(File.separator, dataDirPath, "长尾词_合并_分词.txt"));
+             BufferedReader br = new BufferedReader(reader)) {
+            String line = null;
+            while ((line = br.readLine()) != null) {
+                if (StringUtils.isBlank(line)) {
+                    continue;
+                }
+
+                // 记录总文本数
+                totalWord ++;
+
+                // 提取关键词和分词
+                Matcher matcher = pattern.matcher(line);
+                if (!matcher.find()) {
+                    continue;
+                }
+                String key = matcher.group();
+                if (StringUtils.isBlank(key)) {
+                    continue;
+                }
+
+                List<String> stems = new ArrayList<>();
+                while (matcher.find()) {
+                    String stem = matcher.group();
+                    if (StringUtils.isBlank(stem)) {
+                        continue;
+                    }
+                    stems.add(stem);
+                }
+                Map<CharSequence, Integer> stemMap = stems.stream().collect(Collectors.toMap(Function.identity(), v -> 1, Integer::sum));
+
+                wordCache.put(totalWord, new Word(key, stemMap));
+            }
+        }
+
+        // 构建倒排索引缓存
+        try (FileReader reader = new FileReader(String.join(File.separator, dataDirPath, "长尾词_合并_倒排索引.txt"));
+             BufferedReader br = new BufferedReader(reader)) {
+            String line = null;
+            while ((line = br.readLine()) != null) {
+                if (StringUtils.isBlank(line)) {
+                    continue;
+                }
+
+                // 提取关键词和分词
+                Matcher matcher = pattern.matcher(line);
+                if (!matcher.find()) {
+                    continue;
+                }
+                String stem = matcher.group();
+                if (StringUtils.isBlank(stem)) {
+                    continue;
+                }
+
+                Set<Integer> positions = new CopyOnWriteArraySet<>();
+                while (matcher.find()) {
+                    String position = matcher.group();
+                    if (StringUtils.isBlank(position)) {
+                        continue;
+                    }
+                    positions.add(Integer.valueOf(position));
+                }
+
+                indexCache.put(stem, positions);
+            }
+        }
+
+        // 初始化已处理位图
+        bitmap = new BitSet(totalWord+1);
+
+        // 分割计算任务
+        List<CalTask> calTasks = avgSplitTask(totalWord, perTaskNum);
+
+        // 提交任务
+        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+//        ExecutorService executorService = Executors.newFixedThreadPool(1);
+        for (CalTask calTask : calTasks) {
+            executorService.submit(new CalRunable(calTask.getStartPos(), calTask.getEndPos(), dataDirPath, wordCache, indexCache, bitmap));
+        }
+
+        // 等待任务执行完成
+        executorService.awaitTermination(12, TimeUnit.HOURS);
+
+        // 合并计算结果
+        List<File> aggResultFiles = Arrays.stream(dataDir.listFiles()).filter(file -> aggFilePattern.matcher(file.getName()).find()).collect(Collectors.toList());
+        if (aggResultFiles.size() == 0) {
+            System.out.println("没有找到任何计算分结果,任务结束");
+            return;
+        }
+        String aggFilePath = String.join(File.separator, dataDirPath, "长尾词_合并_聚合.txt");
+        try (FileOutputStream fileOutputStream = new FileOutputStream(aggFilePath);
+             BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream)) {
+            for (File aggResultFile : aggResultFiles) {
+                try (FileInputStream fileInputStream = new FileInputStream(aggResultFile);
+                     BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) {
+                    bufferedOutputStream.write(bufferedInputStream.readAllBytes());
+                }
+            }
+        }
+    }
+
+    /**
+     * Splits the 1-based position range {@code [1, total]} into consecutive tasks of at most
+     * {@code internal} positions each; the last task is shortened so it ends at {@code total}.
+     *
+     * @param total    number of positions to cover; a value below 1 yields an empty list
+     * @param internal maximum number of positions per task, must be positive
+     * @return the tasks in ascending order, each with inclusive start and end positions
+     * @throws IllegalArgumentException if {@code internal} is not positive
+     */
+    private static List<CalTask> avgSplitTask(int total, int internal) {
+        if (internal <= 0) {
+            // A zero interval used to turn the task count into Integer.MAX_VALUE via a division by zero.
+            throw new IllegalArgumentException("internal must be positive: " + internal);
+        }
+        List<CalTask> calTasks = new ArrayList<>();
+        // long arithmetic keeps start + internal - 1 from overflowing when total is close to Integer.MAX_VALUE
+        for (long start = 1; start <= total; start += internal) {
+            int end = (int) Math.min(start + internal - 1, total);
+            calTasks.add(new CalTask((int) start, end));
+        }
+        return calTasks;
     }
 }

+ 0 - 11
src/main/java/top/zhixinghe1/money/AggCalConsumer.java

@@ -1,11 +0,0 @@
-package top.zhixinghe1.money;
-
-/**
- * 文本聚合 计算
- */
-public class AggCalConsumer implements Runnable{
-    @Override
-    public void run() {
-
-    }
-}

+ 0 - 11
src/main/java/top/zhixinghe1/money/AggCalProducer.java

@@ -1,11 +0,0 @@
-package top.zhixinghe1.money;
-
-/**
- * 文本聚合 发起
- */
-public class AggCalProducer implements Runnable{
-    @Override
-    public void run() {
-
-    }
-}

+ 0 - 11
src/main/java/top/zhixinghe1/money/AggResultWriter.java

@@ -1,11 +0,0 @@
-package top.zhixinghe1.money;
-
-/**
- * 文本聚合 结果存储
- */
-public class AggResultWriter implements Runnable{
-    @Override
-    public void run() {
-
-    }
-}

+ 123 - 0
src/main/java/top/zhixinghe1/money/CalRunable.java

@@ -0,0 +1,123 @@
+package top.zhixinghe1.money;
+
+import me.tongfei.progressbar.ProgressBar;
+import org.apache.commons.text.similarity.CosineSimilarity;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class CalRunable implements Runnable {
+
+    private int start;
+
+    private int end;
+
+    private String dataDirPath;
+
+    private BufferedWriter writer;
+
+    private Map<Integer, Word> wordCache = new HashMap();
+
+    private Map<String, Set<Integer>> indexCache = new HashMap();
+
+    private BitSet bitmap = null;
+
+    private static final ThreadLocal<BufferedWriter> threadLocal = new ThreadLocal();
+
+    private CosineSimilarity cosineSimilarity = new CosineSimilarity();
+
+    private Double aggThreshold = 0.8;
+
+    public CalRunable(int start, int end, String dataDirPath, Map<Integer, Word> wordCache, Map<String, Set<Integer>> indexCache, BitSet bitmap) {
+        this.start = start;
+        this.end = end;
+        this.dataDirPath = dataDirPath;
+        this.wordCache = wordCache;
+        this.indexCache = indexCache;
+        this.bitmap = bitmap;
+    }
+
+    @Override
+    public void run() {
+        try {
+            BufferedWriter bufferedWriter = threadLocal.get();
+            if (Objects.isNull(bufferedWriter)) {
+                String aggFilePath = String.join(File.separator, dataDirPath, String.format("长尾词_合并_聚合_%s.txt", Thread.currentThread().getId()));
+                try {
+                    FileWriter fileWriter = new FileWriter(new File(aggFilePath));
+                    bufferedWriter = new BufferedWriter(fileWriter);
+                    threadLocal.set(bufferedWriter);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            Set<Integer> indexSet = new HashSet<>();
+            List<String> result = new ArrayList<>();
+            try (ProgressBar pb = new ProgressBar(String.format("线程-%s 文本聚合计算", Thread.currentThread().getId()), end-start+1+1)) {
+                for (int i = start; i <= end; i++) {
+                    // 更新发呆
+                    pb.step();
+
+                    if (bitmap.get(i)) {
+                        continue;
+                    }
+
+                    Word word = wordCache.get(i);
+                    if (Objects.isNull(word.getStemMap()) || word.getStemMap().size() == 0) {
+                        continue;
+                    }
+                    bitmap.set(i, true);
+                    result.add(word.getKey());
+
+                    for (CharSequence stem : word.getStemMap().keySet()) {
+                        Set<Integer> positions = indexCache.get(stem);
+                        for (Integer position : positions) {
+                            if (bitmap.get(position)) {
+                                positions.remove(position);
+                            } else {
+                                indexSet.add(position);
+                            }
+                        }
+                    }
+
+                    for (Integer index : indexSet) {
+                        Word candicateWord = wordCache.get(index);
+                        if (Objects.isNull(candicateWord.getStemMap())) {
+                            continue;
+                        }
+
+                        Double v = cosineSimilarity.cosineSimilarity(word.getStemMap(), candicateWord.getStemMap());
+                        if (v < aggThreshold) {
+                            continue;
+                        }
+                        result.add(candicateWord.getKey());
+                    }
+
+                    // 输出计算结果
+                    if (result.size() == 1) {
+                        continue;
+                    }
+                    for (String s : result) {
+                        bufferedWriter.write(s);
+                        bufferedWriter.write("\n");
+                    }
+                    bufferedWriter.write("\n");
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 34 - 0
src/main/java/top/zhixinghe1/money/CalTask.java

@@ -0,0 +1,34 @@
+package top.zhixinghe1.money;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+/**
+ * A slice of word positions assigned to one calculation task, described by its first and last position.
+ */
+public class CalTask implements Serializable {
+    @Serial
+    private static final long serialVersionUID = 6711062995204035815L;
+
+    /** First position of the slice. */
+    private int startPos;
+
+    /** Last position of the slice. */
+    private int endPos;
+
+    /**
+     * @param startPos first position of the slice
+     * @param endPos   last position of the slice
+     */
+    public CalTask(int startPos, int endPos) {
+        this.startPos = startPos;
+        this.endPos = endPos;
+    }
+
+    // ---- accessors ----
+
+    public int getStartPos() {
+        return this.startPos;
+    }
+
+    public int getEndPos() {
+        return this.endPos;
+    }
+
+    // ---- mutators ----
+
+    public void setStartPos(int startPos) {
+        this.startPos = startPos;
+    }
+
+    public void setEndPos(int endPos) {
+        this.endPos = endPos;
+    }
+}

+ 26 - 47
src/main/java/top/zhixinghe1/money/Test.java

@@ -1,31 +1,10 @@
 package top.zhixinghe1.money;
 
 import me.tongfei.progressbar.ProgressBar;
-import org.apache.commons.text.similarity.CosineSimilarity;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.Pipeline;
-import redis.clients.jedis.Response;
 
-import java.io.File;
-import java.io.Reader;
-import java.text.SimpleDateFormat;
-import java.time.LocalTime;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.logging.SimpleFormatter;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 public class Test {
     public static void main(String[] args) throws InterruptedException {
@@ -47,8 +26,8 @@ public class Test {
 //            }
 //        }
 
-        JedisPool pool = new JedisPool("127.0.0.1", 6379);
-        try (Jedis jedis = pool.getResource()) {
+//        JedisPool pool = new JedisPool("127.0.0.1", 6379);
+//        try (Jedis jedis = pool.getResource()) {
 //            // Store & Retrieve a simple string
 //            jedis.set("foo", "bar");
 //            String foo = jedis.get("foo");
@@ -108,30 +87,30 @@ public class Test {
 //                System.out.println("暂停");
 //            }
 //            TODO bitmap使用
-            jedis.setbit("testBit", 32, true);
-            jedis.setbit("testBit", 1, true);
-            jedis.setbit("testBit", 15, true);
-            jedis.setbit("testBit", 27, true);
-            List<Integer> list = Arrays.asList(1, 15, 27, 32);
-            for (Integer p : list) {
-                boolean testBit = jedis.getbit("testBit", p);
-                if (testBit) {
-                    System.out.println(String.format("redis testBit 设置成功"));
-                } else {
-                    System.out.println(String.format("redis testBit 设置失败"));
-                }
-            }
-            BitSet bitSet = BitSet.valueOf(jedis.get("testBit").getBytes());
-            for (Integer p : list) {
-                boolean testBit = bitSet.get(p);
-                if (testBit) {
-                    System.out.println(String.format("bitset testBit 设置成功"));
-                } else {
-                    System.out.println(String.format("bitset testBit 设置失败"));
-                }
-            }
-            System.out.println("暂停");
-        }
+//            jedis.setbit("testBit", 32, true);
+//            jedis.setbit("testBit", 1, true);
+//            jedis.setbit("testBit", 15, true);
+//            jedis.setbit("testBit", 27, true);
+//            List<Integer> list = Arrays.asList(1, 15, 27, 32);
+//            for (Integer p : list) {
+//                boolean testBit = jedis.getbit("testBit", p);
+//                if (testBit) {
+//                    System.out.println(String.format("redis testBit 设置成功"));
+//                } else {
+//                    System.out.println(String.format("redis testBit 设置失败"));
+//                }
+//            }
+//            BitSet bitSet = BitSet.valueOf(jedis.get("testBit").getBytes());
+//            for (Integer p : list) {
+//                boolean testBit = bitSet.get(p);
+//                if (testBit) {
+//                    System.out.println(String.format("bitset testBit 设置成功"));
+//                } else {
+//                    System.out.println(String.format("bitset testBit 设置失败"));
+//                }
+//            }
+//            System.out.println("暂停");
+//        }
 
         System.out.println("暂停");
     }

+ 40 - 0
src/main/java/top/zhixinghe1/money/Word.java

@@ -0,0 +1,40 @@
+package top.zhixinghe1.money;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A keyword together with the occurrence count of each of its stems.
+ *
+ * @author tyuio
+ */
+public class Word implements Serializable {
+
+    @Serial
+    private static final long serialVersionUID = 888376712090774661L;
+
+    /** The keyword text. */
+    private String key;
+
+    /** Stem -> number of occurrences of that stem. */
+    private Map<CharSequence, Integer> stemMap;
+
+    /**
+     * @param key     the keyword text
+     * @param stemMap stem -> occurrence count; stored as given, not copied
+     */
+    public Word(String key, Map<CharSequence, Integer> stemMap) {
+        this.key = key;
+        this.stemMap = stemMap;
+    }
+
+    // ---- accessors ----
+
+    public String getKey() {
+        return this.key;
+    }
+
+    public Map<CharSequence, Integer> getStemMap() {
+        return this.stemMap;
+    }
+
+    // ---- mutators ----
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public void setStemMap(Map<CharSequence, Integer> stemMap) {
+        this.stemMap = stemMap;
+    }
+}

+ 0 - 4
src/main/java/top/zhixinghe1/money/constant.java

@@ -1,4 +0,0 @@
-package top.zhixinghe1.money;
-
-public class constant {
-}