AbstractDataTask.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package com.dataeasy.server.task;
  2. import java.sql.Timestamp;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. import java.util.Objects;
  6. import java.util.Set;
  7. import java.util.stream.Collectors;
  8. import com.binarywang.spring.starter.wxjava.miniapp.properties.WxMaProperties;
  9. import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.cache.CacheManager;
  12. import org.springframework.util.CollectionUtils;
  13. import org.springframework.util.StringUtils;
  14. import com.dataeasy.server.atomic.entity.SubscriptionTaskConfig;
  15. import com.dataeasy.server.atomic.entity.SubscriptionUserConfig;
  16. import com.dataeasy.server.atomic.entity.User;
  17. import com.dataeasy.server.atomic.service.ISubscriptionTaskConfigService;
  18. import com.dataeasy.server.atomic.service.ISubscriptionUserConfigService;
  19. import com.dataeasy.server.atomic.service.ISysScheduleTaskLogService;
  20. import com.dataeasy.server.atomic.service.IUserService;
  21. import com.dataeasy.server.constant.PushOptionEnum;
  22. import com.dataeasy.server.constant.ScheduleTaskEnum;
  23. import com.dataeasy.server.core.aop.ScheduleTaskLogAspect;
  24. import com.dataeasy.server.pojo.subscription.SubscriptionUserConfigQuery;
  25. import com.dataeasy.server.pojo.task.ScheduleTaskContext;
  26. import com.dataeasy.server.service.manager.IWxManager;
  27. import lombok.extern.slf4j.Slf4j;
  28. import me.chanjar.weixin.mp.bean.template.WxMpTemplateData;
  29. /**
  30. * @author tyuio
  31. * @version 1.0.0
  32. * @description 数据定时任务 抽象类
  33. * @date 2025/3/7 10:44
  34. */
  35. @Slf4j
  36. public abstract class AbstractDataTask {
  37. @Autowired
  38. protected ISysScheduleTaskLogService taskLogService;
  39. @Autowired
  40. protected ISubscriptionTaskConfigService taskConfigService;
  41. @Autowired
  42. protected ISubscriptionUserConfigService userConfigService;
  43. @Autowired
  44. protected IUserService userService;
  45. @Autowired
  46. protected IWxManager wxMpManager;
  47. @Autowired
  48. protected CacheManager cacheManager;
  49. @Autowired
  50. private WxMaProperties wxMaProperties;
  51. /**
  52. * 定时任务执行逻辑
  53. */
  54. public void execute() {
  55. // 获取定时任务上下文
  56. ScheduleTaskContext scheduleTaskContext = ScheduleTaskLogAspect.scheduleTaskContextThreadLocal.get();
  57. ScheduleTaskEnum task = scheduleTaskContext.getTask();
  58. SubscriptionTaskConfig subscriptionTaskConfig = scheduleTaskContext.getSubscriptionTaskConfig();
  59. // 拉取数据并入库
  60. boolean fetchResult = fetchData();
  61. if (!fetchResult) {
  62. log.warn("任务:{},任务编码:{},拉取数据失败,结束执行", task.getName(), task);
  63. return;
  64. }
  65. // 寻找要推送数据的用户
  66. SubscriptionUserConfigQuery userConfigQuery = new SubscriptionUserConfigQuery();
  67. userConfigQuery.setSubscriptionSourceIds(Arrays.asList(subscriptionTaskConfig.getId()));
  68. userConfigQuery.setPushOption(PushOptionEnum.ENABLED);
  69. userConfigQuery.setCurrentTime(new Timestamp(System.currentTimeMillis()));
  70. List<SubscriptionUserConfig> userConfigList = userConfigService.getByCondition(userConfigQuery);
  71. if (CollectionUtils.isEmpty(userConfigList)) {
  72. log.warn("任务:{},任务编码:{},没有找到推送状态为启用的用户配置,结束执行", task.getName(), task);
  73. return;
  74. }
  75. Set<Long> userIds = userConfigList.stream().filter(v -> Objects.nonNull(v.getUserId()))
  76. .map(SubscriptionUserConfig::getUserId)
  77. .collect(Collectors.toSet());
  78. if (CollectionUtils.isEmpty(userIds)) {
  79. log.warn("任务:{},任务编码:{},没有找到推送状态为启用的用户,结束执行", task.getName(), task);
  80. return;
  81. }
  82. List<User> userList = userService.getByIds(userIds);
  83. if (CollectionUtils.isEmpty(userList)) {
  84. log.warn("任务:{},任务编码:{},没有找到待推送的用户列表,结束执行", task.getName(), task);
  85. return;
  86. }
  87. Set<String> mpOpenIds = userList.stream().filter(v -> StringUtils.hasText(v.getMpOpenId()))
  88. .map(User::getMpOpenId)
  89. .collect(Collectors.toSet());
  90. if (CollectionUtils.isEmpty(mpOpenIds)) {
  91. log.warn("任务:{},任务编码:{},待推送的用户列表中没有找到服务号openid,结束执行", task.getName(), task);
  92. return;
  93. }
  94. // 获取消息模板所需数据
  95. List<WxMpTemplateData> templateDataList = getTemplateMessage();
  96. // 设置小程序
  97. WxMpTemplateMessage.MiniProgram miniProgram = new WxMpTemplateMessage.MiniProgram();
  98. miniProgram.setAppid(wxMaProperties.getAppid());
  99. miniProgram.setPagePath(subscriptionTaskConfig.getPagePath());
  100. // 推送
  101. for (String mpOpenId : mpOpenIds) {
  102. // 生成模板信息
  103. WxMpTemplateMessage mpTemplateMessage = WxMpTemplateMessage.builder()
  104. .toUser(mpOpenId)
  105. .templateId(subscriptionTaskConfig.getTemplateId())
  106. .miniProgram(miniProgram)
  107. .data(templateDataList)
  108. .build();
  109. wxMpManager.sendTemplateMessage(mpTemplateMessage);
  110. }
  111. }
  112. /**
  113. * 拉取所需数据并入库
  114. * @return false-拉取数据入库失败 true-拉取数据入库成功
  115. */
  116. public abstract boolean fetchData();
  117. /**
  118. * 获取消息模板所需数据
  119. * @return
  120. */
  121. public abstract List<WxMpTemplateData> getTemplateMessage();
  122. }