背景
公司的接口机上面没有安装 spark ,为了充分利用计算机的多个核,加快数据预处理,便自己写了一份多进程数据预处理代码,
功能
这个框架基本功能是
- 将任务拆分为若干个子任务并行运行
- 可以指定用几个进程跑,将运算后的结果做 reduce
- 如果进程结束了将会扫描数据文件夹中是否还有新的文件需要处理,如果有的话会新建一个新的进程处理新添加的文件(处理流数据比较有用)
使用
这个框架的使用比较简单,最主要的做法就是:
- 继承一个并行类 (parallel)
这个类会自动帮你部署好你处理文件所需要的上下文,并且提供 reduce 的功能,将并行处理好的结果合并到一个文件中去。 - 实现这个并行类的 run 方法
- 调用 reduce 方法合并你的处理结果
如果你想将结果合并到几个文件中去也可以,你可以将这几个文件的文件名存为 result_dict 的键值,然后调用方法 reduce 就可以自动按照键值合并,并且合并后的文件名是以键值.来命名的
如上图所示,在具体使用的时候只需要重写这个run函数就可以了,可以看到这 run 方法已经可以拿到运算时所需要的上下文了
如何处理大文件
以上讲述的是如何处理多个文件,对多个文件做并行处理,但是如果输入是一个大文件的话,怎么处理呢?
这里的做法是对大文件做分割,将大文件分割成小文件,一般来说分割文件的速度一般会比处理文件的速度快,所以可以一边分割文件,一边并行处理小文件。因为这个框架支持动态添加进程,所以文件数量的变化不会影响原先程序的运行状态,并且也会被处理。可以用于对流式数据的预处理。
具体指令:
python partition.py --file=./游戏.txt --outd=in --suffix=.txt --len=50000
第一个参数是要处理的大文件名
第二个参数是输出文件夹名
第三个参数是一个文件放多少行数据
实测
python run.py preprocess.py --input=in --n=10
第一参数是 继承 parallel run 方法的py 文件
第二个 --input 是放数据的文件夹地址 每隔1秒钟会扫描文件夹如果有新的文件进来,程序会新建进程处理这份数据
第三个 --n 是使用多少个进程运行
可以看出在运行的时候后台启动了 10 个进程并行处理数据,预处理速度提升了 10 倍
不足之处
现在每一个进程的 cpu 的利用率还不是百分之百,这个和 重写的 run 方法有关,也和一个进程中的线程数量有关,有待改进
本文由 admin 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Aug 7, 2019 at 10:30 am