老大甩给我 30G 文件,让我几天内全部导入到数据库( 三 )


虽然这种方式解决上述问题,但是这种方式,每次都需要积累一定任务数才能开始异步执行所有任务 。
另外每次都需要等待所有任务执行结束之后,才能开始下一批任务,批量执行消耗的时间等于最慢的异步任务消耗的时间 。
这种方式线程池中线程存在一定的闲置时间,那有没有办法一直压榨线程池,让它一直在干活呢?
扩展线程池
回到最开始的问题,文件读取导入,其实就是一个「生产者-消费者」消费模型 。
主线程作为生产者不断读取文件,然后将其放置到队列中 。
异步线程作为消费者不断从队列中读取内容,导入到数据库中 。
「一旦队列满载,生产者应该阻塞,直到消费者消费任务 。」
其实我们使用线程池的也是一个「生产者-消费者」消费模型,其也使用阻塞队列 。
那为什么线程池在队列满载的时候,不发生阻塞?
这是因为线程池内部使用 offer 方法,这个方法在队列满载的时候「不会发生阻塞」,而是直接返回。
那我们有没有办法在线程池队列满载的时候,阻塞主线程添加任务?
其实是可以的,我们自定义线程池拒绝策略,当队列满时改为调用 .put 来实现生产者的阻塞 。
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}}};
这样一旦线程池满载,主线程将会被阻塞 。
使用这种方式之后,我们可以直接使用上面提到的多线程导入的代码 。
ExecutorService executorService = new ThreadPoolExecutor(5,10,60,TimeUnit.MINUTES,new ArrayBlockingQueue<>(100),new ThreadFactoryBuilder().setNameFormat("test-%d").build(),(r, executor) -> {if (!executor.isShutdown()) {try {// 主线程将会被阻塞executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}});File file = new File("文件路径");try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {while (iterator.hasNext()) {String line = iterator.nextLine();executorService.submit(() -> convertToDB(line));}}
小结
一个超大的文件,我们可以采用拆分文件的方式,将其拆分成多份文件,然后部署多个应用程序提高读取速度 。
另外读取过程我们还可以使用多线程的方式并发导入,不过我们需要注意线程池满载之后,将会拒绝后续任务 。
我们可以通过扩展线程池,自定义拒绝策略,使读取主线程阻塞 。
好了,今天文章内容就到这里,不知道各位有没有其他更好的解决办法,欢迎留言讨论 。