把普通的程序改成并行化放在spark上运行,把其中遇到的简单问题整理了一下。
author: @Huji
读取文件
读取linux文件系统上的文件地址要写作如下:
|
|
注意:这里pandas的read_csv不能读取hdfs上面的文件
sc读取hdfs的时候可以用
|
|
或者
|
|
map函数加入参数
我们都知道map可以把rdd中的每个元素进行一个转换操作,例如
|
|
但是更多的时候我们还需要加入别的参数,可以有如下的方法予以解决[1][2]:
- 通过在flatMap中使用匿名函数来实现
|
|
或者
|
|
- 还可以用下面这种方法生成processDataLine
|
|
toolz
库还提供了高效的curry
装饰器:
|
|
- 采用
functools.partial
的方法将参数传入函数中
|
|
并行化自定义对象
我自己是创建一个station的类,然后构建了一个包含数个station对象的list,对其进行parallelize,然后对这一rdd对象做map操作,这时候就会报错说python的damon中没有station这个属性。
出现这个问题的原因是我定义的station对象只是在driver,但是各个worker并不知道这一个对象的定义,因此我需要单独将其定义在一个文件中,然后确保它能够分布到各个worker上。
下面是一个简单的例子[3],这个例子当中,如果并行化的对象是内置的数据类型,如int之类的,是没有任何问题的,但如果涉及到自己定义的node对象,那么map操作(marker2那句)就会报错。
|
|
详细的解决步骤为:
(1) 首先创建一个单独的模块node.py
,其中包含node
的定义
(2) 在主程序文件中引入node类
|
|
(3) 确保该模块被分布至各个节点(其实这个不写这句话也行,但是在执行程序的命令里面要加上这个文件,后面还会提到)
|
|
关于print输出的问题
原始单机跑Python文件的时候,我经常在函数里面会写一些print之类的输出语句来方便调试,但是在改为并行化的程序之后,只有master上运行的代码段的输出会显示出来,很多并行化的任务会在worker上运行,所以我们并不能看到输出,需要输出的话,可能要对rdd进行take操作,然后取出其中的数值进行输出。
共享变量的问题
在map函数中往往会用到一些大的dataframe,那么到底是本地读取还是先读取好之后broadcast出去呢?目前我觉得还是选择broadcast比较好。
实测如果把文件放到每台机器上,然后在函数中每次读取的话,运行耗时是读取一次然后broadcast的两倍左右。
使用broadcast的时候,注意通过value取出其内容,如下面的例子:
|
|
spark-submit提交任务
可以参考如下语句提交任务
|
|