应用sqlite和python多进程模块解决大数据文件的处理

摘要

为处理计算包含54万患者,共8个单个大小超过2G的CSV数据,应用了本地sqlite数据库来实现分批处理,减小内存压力。并应用了multiprocess模块来实现多进程处理,充分利用多核CPU来减少运算时间。

直接运算的问题

  • 使用pandas读取大于2G的数据,需要占用远远超过2G的内存,就算主机有32G,在读取多个文件,在加上多个复杂计算的代码后,内存也很快被挤爆。而且运算需要的时间过长,不做多进程设计的情况下,16核的CPU只有可怜的一核在做运算,其他核处于围观状态。就算内存没被撑爆,运算时间也需要10个小时以上。直接不加处理运算在面对此情况下的需求时,变得一筹莫展。怎么破?

破解思路

首先要解决数据太大占用内存过高的问题。我想到的解决方案是把数据导入一个本地的sqlite数据库中,然后按照批次的调用患者数据进行计算,这样不用把所有数据一次导入内存,实现批次流水作业。但是还没有解决54万患者数据运算时间过长的问题。解决这个问题,就要把所有的cpu资源都用上。就我具体主机而言,CPU是16核的,那我们就一次运行16个进程,让cpu使用率达到100%。把两种改进的方法结合起来,在一轮处理中,调出16批各自独立的患者,没批患者量适中不能撑爆内存,比如我选择2000名,那么我们一轮在占用100%cpu资源的情况下,就能同时处理32000名患者。实际情况下,运用了这种策略,原来预计要10个多小时的数据运算,只用了不到1小时就完成,成就感满满😄。

具体实现方法

python sqlite数据库操作使用sqlalchemy包。建立相应的表,实现数据导入。

数据的直接操作和计算使用pandas包。先建立唯一患者标识列表,然后每次16组,每组2000名患者从sqlite中调出该批患者所有表中数据。数据调出方式也是异步并行完成,充分利用cpu。然后把比组数据丢入一个cpu运算进程进行运算。所有16个进程的计算都完成后。把结果合并到一个统一的数据集中。然后开始下一轮数据调出和运算,直至所有患者的数据都抽取完,并把结果都累计到一个统一的结果数据集中。

多进程操作使用了multiprocess模组,使用其中的pool对象建立一个报告16个进程的进程池。然后使用pool对象的async_apply执行方法,异步的把核心计算启动函数在同时间启动,达到并行计算,全部利用cpu资源的效果。

总结

  • 我之前也有利用multiprocess模块并行处理大数据文件的经验,但之前还是比较依赖大内存,解决的方式不够系统和优雅。这次的方案有点是资源调用充分,并且利用数据库,可以合理的控制内存的消耗。不足之处就是数据库的建立需要一些时间,同时需要占用磁盘空间。并