上班第一天,老板就甩来30G文件是种什么体验...( 三 )


异步线程作为消费者不断从队列中读取内容,导入到数据库中 。
「一旦队列满载,生产者应该阻塞,直到消费者消费任务 。」
其实我们使用线程池的也是一个**「生产者-消费者」**消费模型,其也使用阻塞队列 。
那为什么线程池在队列满载的时候,不发生阻塞?
这是因为线程池内部使用 offer 方法,这个方法在队列满载的时候**「不会发生阻塞」**,而是直接返回。
那我们有没有办法在线程池队列满载的时候,阻塞主线程添加任务?
其实是可以的,我们自定义线程池拒绝策略,当队列满时改为调用 .put 来实现生产者的阻塞 。
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}}};
这样一旦线程池满载,主线程将会被阻塞 。
使用这种方式之后,我们可以直接使用上面提到的多线程导入的代码 。
ExecutorService executorService = new ThreadPoolExecutor(5,10,60,TimeUnit.MINUTES,new ArrayBlockingQueue<>(100),new ThreadFactoryBuilder().setNameFormat("test-%d").build(),(r, executor) -> {if (!executor.isShutdown()) {try {// 主线程将会被阻塞executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}});File file = new File("文件路径");try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {while (iterator.hasNext()) {String line = iterator.nextLine();executorService.submit(() -> convertToDB(line));}}
小 结
一个超大的文件,我们可以采用拆分文件的方式,将其拆分成多份文件,然后部署多个应用程序提高读取速度 。
另外读取过程我们还可以使用多线程的方式并发导入,不过我们需要注意线程池满载之后,将会拒绝后续任务 。
我们可以通过扩展线程池,自定义拒绝策略,使读取主线程阻塞 。
好了,今天文章内容就到这里,不知道各位有没有其他更好的解决办法,欢迎留言讨论 。