自己动手用 python 写一个流式数据预处理框架

in with 0 comment view 157 times

背景

公司的接口机上面没有安装 spark ,为了充分利用计算机的多个核,加快数据预处理,便自己写了一份多进程数据预处理代码,

功能

这个框架基本功能是

  1. 将任务拆分为若干个子任务并行运行
  2. 可以指定用几个进程跑,将运算后的结果做 reduce
  3. 如果进程结束了将会扫描数据文件夹中是否还有新的文件需要处理,如果有的话会新建一个新的进程处理新添加的文件(处理流数据比较有用)

使用

这个框架的使用比较简单,最主要的做法就是:

  1. 继承一个并行类 (parallel)
    这个类会自动帮你部署好你处理文件所需要的上下文,并且提供 reduce 的功能,将并行处理好的结果合并到一个文件中去。
  2. 实现这个并行类的 run 方法
  3. 调用 reduce 方法合并你的处理结果
    如果你想将结果合并到几个文件中去也可以,你可以将这几个文件的文件名存为 result_dict 的键值,然后调用方法 reduce 就可以自动按照键值合并,并且合并后的文件名是以键值.来命名的

企业微信截图_20190801165533.png

如上图所示,在具体使用的时候只需要重写这个run函数就可以了,可以看到这 run 方法已经可以拿到运算时所需要的上下文了
企业微信截图_20190801182225.png

如何处理大文件

以上讲述的是如何处理多个文件,对多个文件做并行处理,但是如果输入是一个大文件的话,怎么处理呢?
这里的做法是对大文件做分割,将大文件分割成小文件,一般来说分割文件的速度一般会比处理文件的速度快,所以可以一边分割文件,一边并行处理小文件。因为这个框架支持动态添加进程,所以文件数量的变化不会影响原先程序的运行状态,并且也会被处理。可以用于对流式数据的预处理。
具体指令:

python partition.py --file=./游戏.txt --outd=in  --suffix=.txt --len=50000

第一个参数是要处理的大文件名
第二个参数是输出文件夹名
第三个参数是一个文件放多少行数据

实测

python run.py preprocess.py --input=in --n=10

第一参数是 继承 parallel run 方法的py 文件
第二个 --input 是放数据的文件夹地址 每隔1秒钟会扫描文件夹如果有新的文件进来,程序会新建进程处理这份数据
第三个 --n 是使用多少个进程运行

企业微信截图_20190801171649.png

可以看出在运行的时候后台启动了 10 个进程并行处理数据,预处理速度提升了 10 倍

不足之处

现在每一个进程的 cpu 的利用率还不是百分之百,这个和 重写的 run 方法有关,也和一个进程中的线程数量有关,有待改进

Responses