线程池的应用

developS / 2023-07-31 / 原文

业务场景中存在需要推送数据的情况,步骤如下:

1.通过获取当前日期的最小id和最大id

2.分段获取数据,每次取4000条数据

3.使用消息队列推送数据

目前单线程直接推送数据,频率在15000条/分钟。考虑到业务高峰期的情况,数据量在500万以上,推送时间可能达到6h,效率较低,

因此引入多线程加快推送效率!附推送流程!

//设置线程池参数 允许的最长队列为Intger.max 这里不会oom,因为是循环分段,最多积压设置的条数
ExecutorService executorService = Executors.newFixedThreadPool(corePoolSize);

while (startId.compareTo(maxMinIdEntity.getMaxId()) <= 0) {
List<T> list = getDataFromDb(date, startId, endId);
Clog.infoByTags("推送数据到天平", String.format("获取数据:%s-%s;Count:%s", startId, endId, list.size()), mapLog);
//获取总的任务数和当前的任务数
AtomicInteger totalWorks = new AtomicInteger(list.size());
AtomicInteger currentWorks = new AtomicInteger(0);

if(CollectionUtils.isNotEmpty(list))
{
for (T t : list) {
//try catch
try {
//任务超过设置的线程数,可以sleep一会
while (currentWorks.get() >= corePoolSize) {
Thread.sleep(10);
}
currentWorks.incrementAndGet();

executorService.execute(() -> {
try {
//数据组装
TpEntity<R> tpEntity = new TpEntity<>();
tpEntity.setInterfaceName(getInterfaceName());
tpEntity.setData(Arrays.asList(convertToTpData(t)));
//消息队列发送
String messageId = QmqProducer.sendMessageToTp(getQmqSubject(), tpEntity);
if (null == messageId) {
Clog.infoByTags("返回messageId", "返回messageId为空", mapLog);
}
//更新数据库状态
int excuteCount = updatePushResult(t, messageId);
Clog.infoByTags("推送数据到天平", String.format("推送Qmq,更新DB结果:%s", excuteCount), mapLog);

} catch (Exception e) {
CLogger.error("pushDataException_item", e, mapLog);
} finally {
//执行完毕释放
totalWorks.decrementAndGet();
currentWorks.decrementAndGet();
}
});
} catch (Exception e) {
CLogger.error("pushDataException", e, mapLog);
}

}
while (totalWorks.get() > 0) {
Thread.sleep(10);
}
}
//统计数量
executeCount.addAndGet(list.size());
startId = endId;
endId = startId + QConfigUtil.getIntValue(Constants.QconfigKeys.PushDataToTpLimitCount, 1000);
// Thread.sleep(2000);
if (QConfigUtil.isSwitchOn(Constants.SwitchKeys.TestPushDataSwitch)) {
Clog.infoByTags("测试推送数据", "开关停止", mapLog);
break;
}
}
Clog.infoByTags(getDataType(), "处理完成", mapLog);
taskManager.taskFinish(getDataType(), date, executeCount.longValue());
//处理完成后关闭线程池
if (null != executorService && !executorService.isShutdown()) {
executorService.shutdownNow();
}
效果:目前开启30个线程,推送速度可达到15万/分钟,有效解决的了大数据量的推送