From Pandas to PySpark

Features of PySpark

Immutable

  • Changes create new object references
  • Old versions are unchanged

Lazy

  • Compute does not happen until output is requested (see the sketch below)
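
A minimal sketch of both properties (the DataFrame df and its column col1 are hypothetical):

# Transformations return a new DataFrame; df itself is unchanged
filtered = df.filter(df.col1 < 20)
# Still lazy: no computation has happened yet
selected = filtered.select('col1')
# An action such as count() or show() finally triggers the work
selected.count()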

Pandas vs. PySpark

Load CSV

# Pandas
df = pd.read_csv("datafile.csv")

# PySpark
df = spark.read.options(header=True, inferSchema=True).csv("datafile.csv")

View DataFrame

# Pandas
df

# PySpark
df.show()

Columns & Data Types

# Pandas
df.columns
df.dtypes

# PySpark
df.columns
df.dtypes

Rename Columns

# Pandas
df.columns = ['a', 'b', 'c']
df.rename(columns = {'old': 'new'})

# PySpark
df.toDF('a', 'b', 'c')
df.withColumnRenamed('old', 'new')

Drop Columns

# Pandas
df.drop('colx', axis=1)

# PySpark
df.drop('colx')

Filtering

# Pandas
df[df.col1 < 20]
df[(df.col1 < 20) & (df.col2 == 6)] # note the parentheses

# PySpark
df[df.col1 < 20]
df[(df.col1 < 20) & (df.col2 == 6)] # note the parentheses

Add Column

# Pandas
df['col3'] = 1 / df.col1 # division by zero gives inf

# PySpark
df.withColumn('col3', 1 / df.col1) # division by zero gives null

Fill Nulls

# Pandas
df.fillna(0) # Pandas offers more options, e.g. method='ffill', limit=...

# PySpark
df.fillna(0)

Aggregation

# Pandas
df.groupby(['col1', 'col2']).agg({'col3': 'mean', 'col4': 'min'})

# PySpark
df.groupby(['col1', 'col2']).agg({'col3': 'mean', 'col4': 'min'})

Standard Transformations

# Pandas
import numpy as np
df['logCol'] = np.log(df.col)

# PySpark
import pyspark.sql.functions as F
df.withColumn('logCol', F.log(df.col))

Functions from pyspark.sql.functions are executed directly by the JVM (the Java engine) rather than by the slower Python interpreter.

Example: Row Conditional Statements

# Pandas
df['status'] = df.apply(lambda r: 1 if r.col1 > 20 else 2 if r.col2 == 6 else 3, axis=1)

# PySpark
import pyspark.sql.functions as F
df.withColumn('status',
    F.when(df.col1 > 20, 1)
     .when(df.col2 == 6, 2)
     .otherwise(3))

More Complex Transformations (Python only when necessary)

# Pandas
df['col5'] = df.col1.apply(lambda x : x + 1)

# PySpark
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType
# UDFs must be deterministic: the PySpark optimizer assumes identical inputs produce identical results
fn = F.udf(lambda x : x + 1, DoubleType())
df.withColumn('col5', fn(df.col1))
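
A UDF that is genuinely nondeterministic (e.g. one that adds random noise) can be flagged so the optimizer will not assume identical inputs produce identical results. A minimal sketch, assuming Spark 2.3+; the column name col6 is illustrative:

# PySpark
import random
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

# asNondeterministic() tells the optimizer not to re-use results across identical inputs
noisy_fn = F.udf(lambda x: x + random.random(), DoubleType()).asNondeterministic()
df.withColumn('col6', noisy_fn(df.col1))  # 'col6' is illustrative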

Join

# Pandas
leftDF.merge(rightDF, on='key')
leftDF.merge(rightDF, left_on='a', right_on='b')

# PySpark
leftDF.join(rightDF, on='key')
leftDF.join(rightDF, leftDF.a == rightDF.b)

The default is an inner join.
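
Other join types can be requested in both libraries via the how argument; a minimal sketch:

# Pandas
leftDF.merge(rightDF, on='key', how='left')

# PySpark
leftDF.join(rightDF, on='key', how='left')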

Statistics

# Pandas
df.describe()

# PySpark
df.describe().show()
df.selectExpr("percentile_approx(col1, array(.25, .5, .75)) as col1").show()

Plots

# Pandas
df.hist()

# PySpark
df.sample(False, 0.1).toPandas().hist()

SQL

# Pandas
NA

# PySpark
df.createOrReplaceTempView('foo')
df2 = spark.sql('select * from foo')

Best Practices

  • Use the built-in functions in pyspark.sql.functions.
  • Keep the same versions and libraries on the cluster as on the driver.
  • Monitor job status at http://localhost:4040/ (the Spark web UI)
  • Learn about SSH port forwarding

Don't:

  • Try to iterate through rows (see the sketch after this list)
  • Hard-code a master in your driver
    • Use spark-submit for that
  • df.toPandas().head()
    • instead do: df.limit(5).toPandas()
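
As a sketch of the first point, iterating over rows pulls every row back to the driver and runs the logic in slow Python, while an equivalent column expression stays inside the JVM (the column names are hypothetical):

# Avoid: row-by-row iteration on the driver
# for row in df.collect():
#     results.append(row.col1 * 2)

# Prefer: express the same logic as a column operation
df.withColumn('col1_doubled', df.col1 * 2)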

References

Spark Summit 2017