
Spark 101: DataFrame Basics

Example 1. Five basic DataFrame operations

1. filter rows, 2. select columns, 3. add columns, 4. group and aggregate, 5. sort

A simple salary dataset:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql import types

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1, 'sales', 4200),
    (2, 'admin', 3100),
    (3, 'sales', 4000),
    (4, 'sales', 4000),
    (5, 'admin', 2700),
    (6, 'dev', 3400),
    (7, 'dev', 5200),
    (8, 'dev', 3700),
    (9, 'dev', 4400),
    (10, 'dev', 4400)
], schema=['id', 'dept', 'salary'])

df.show()

# +---+-----+------+
# | id| dept|salary|
# +---+-----+------+
# |  1|sales|  4200|
# |  2|admin|  3100|
# |  3|sales|  4000|
# |  4|sales|  4000|
# |  5|admin|  2700|
# |  6|  dev|  3400|
# |  7|  dev|  5200|
# |  8|  dev|  3700|
# |  9|  dev|  4400|
# | 10|  dev|  4400|
# +---+-----+------+
Q1: List the ids of everyone in the dev department with salary greater than 4000. (filter, select, show)
df.filter((F.col('dept') == 'dev') & (F.col('salary') > 4000)).select(['id', 'salary']).show()

# +---+------+
# | id|salary|
# +---+------+
# |  7|  5200|
# |  9|  4400|
# | 10|  4400|
# +---+------+
Q2: Add a bonus column equal to 10% of base salary, and rename the column dept -> department. (withColumn, withColumnRenamed)
df.withColumn('bonus', F.col('salary')*0.1).withColumnRenamed('dept', 'department').show()

# +---+----------+------+-----+
# | id|department|salary|bonus|
# +---+----------+------+-----+
# |  1|     sales|  4200|420.0|
# |  2|     admin|  3100|310.0|
# |  3|     sales|  4000|400.0|
# |  4|     sales|  4000|400.0|
# |  5|     admin|  2700|270.0|
# |  6|       dev|  3400|340.0|
# |  7|       dev|  5200|520.0|
# |  8|       dev|  3700|370.0|
# |  9|       dev|  4400|440.0|
# | 10|       dev|  4400|440.0|
# +---+----------+------+-----+
Q3: Compute each department's minimum, mean, and maximum salary, sorted by mean salary in descending order. (groupby, agg, alias, sort)
df.groupby('dept').agg(F.min(F.col('salary')).alias('minimum'), \
                       F.avg(F.col('salary')).alias('mean'), \
                       F.max(F.col('salary')).alias('maximum'))\
                .sort('mean', ascending=False).show()

# +-----+-------+------------------+-------+
# | dept|minimum|              mean|maximum|
# +-----+-------+------------------+-------+
# |  dev|   3400|            4220.0|   5200|
# |sales|   4000|4066.6666666666665|   4200|
# |admin|   2700|            2900.0|   3100|
# +-----+-------+------------------+-------+

Example 2. groupby and window

Reference: YouTube, "How do spark window functions work?"

Q1: Collect each department's salaries into an array, and compute the department's average and total salary. (groupby, agg, collect_list)
df.groupby('dept').agg(
    F.expr('collect_list(salary)').alias('list_salary'),
    F.expr('avg(salary)').alias('average_salary'),
    F.expr('sum(salary)').alias('total_salary')
).show()

# +-----+--------------------+------------------+------------+
# | dept|         list_salary|    average_salary|total_salary|
# +-----+--------------------+------------------+------------+
# |  dev|[3400, 5200, 3700...|            4220.0|       21100|
# |sales|  [4200, 4000, 4000]|4066.6666666666665|       12200|
# |admin|        [3100, 2700]|            2900.0|        5800|
# +-----+--------------------+------------------+------------+
Q2: Attach each row's department-level salary statistics (the answer to Q1) alongside the row. (Window, partitionBy, over)
windowSpec = Window.partitionBy('dept')

df.withColumn('list_salary', F.collect_list(F.col('salary')).over(windowSpec))\
  .withColumn('average_salary', F.avg(F.col('salary')).over(windowSpec))\
  .withColumn('total_salary', F.sum(F.col('salary')).over(windowSpec)).show()

# +---+-----+------+--------------------+------------------+------------+
# | id| dept|salary|         list_salary|    average_salary|total_salary|
# +---+-----+------+--------------------+------------------+------------+
# |  6|  dev|  3400|[3400, 5200, 3700...|            4220.0|       21100|
# |  7|  dev|  5200|[3400, 5200, 3700...|            4220.0|       21100|
# |  8|  dev|  3700|[3400, 5200, 3700...|            4220.0|       21100|
# |  9|  dev|  4400|[3400, 5200, 3700...|            4220.0|       21100|
# | 10|  dev|  4400|[3400, 5200, 3700...|            4220.0|       21100|
# |  1|sales|  4200|  [4200, 4000, 4000]|4066.6666666666665|       12200|
# |  3|sales|  4000|  [4200, 4000, 4000]|4066.6666666666665|       12200|
# |  4|sales|  4000|  [4200, 4000, 4000]|4066.6666666666665|       12200|
# |  2|admin|  3100|        [3100, 2700]|            2900.0|        5800|
# |  5|admin|  2700|        [3100, 2700]|            2900.0|        5800|
# +---+-----+------+--------------------+------------------+------------+
Q3: Order salaries within each department and scan to build a running prefix aggregate; equal salaries are scanned together. (orderBy)
windowSpec = Window.partitionBy('dept').orderBy(F.asc('salary'))

df.withColumn('list_salary', F.collect_list(F.col('salary')).over(windowSpec))\
  .withColumn('average_salary', F.avg(F.col('salary')).over(windowSpec))\
  .withColumn('total_salary', F.sum(F.col('salary')).over(windowSpec)).show()

# +---+-----+------+--------------------+------------------+------------+
# | id| dept|salary|         list_salary|    average_salary|total_salary|
# +---+-----+------+--------------------+------------------+------------+
# |  6|  dev|  3400|              [3400]|            3400.0|        3400|
# |  8|  dev|  3700|        [3400, 3700]|            3550.0|        7100|
# |  9|  dev|  4400|[3400, 3700, 4400...|            3975.0|       15900|
# | 10|  dev|  4400|[3400, 3700, 4400...|            3975.0|       15900|
# |  7|  dev|  5200|[3400, 3700, 4400...|            4220.0|       21100|
# |  3|sales|  4000|        [4000, 4000]|            4000.0|        8000|
# |  4|sales|  4000|        [4000, 4000]|            4000.0|        8000|
# |  1|sales|  4200|  [4000, 4000, 4200]|4066.6666666666665|       12200|
# |  5|admin|  2700|              [2700]|            2700.0|        2700|
# |  2|admin|  3100|        [2700, 3100]|            2900.0|        5800|
# +---+-----+------+--------------------+------------------+------------+

Note: observe the duplicate salary of ids 9 & 10 — rows with equal salaries enter the prefix aggregate at the same time. The next question shows a prefix sum closer to what you usually want in practice.
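Why ties move together: when a window spec has an orderBy but no explicit frame, Spark defaults to a RANGE frame (unbounded preceding through current row), and a RANGE frame includes every peer row with the same ordering value. Q3's spec above is therefore equivalent to this explicit form:

windowSpec = Window.partitionBy('dept').orderBy(F.asc('salary'))\
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)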

Q4: Same as Q3, but equal salaries are scanned one after another. (rowsBetween)
windowSpec = Window.partitionBy('dept').orderBy(F.asc('salary'))\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn('list_salary', F.collect_list(F.col('salary')).over(windowSpec))\
  .withColumn('average_salary', F.avg(F.col('salary')).over(windowSpec))\
  .withColumn('total_salary', F.sum(F.col('salary')).over(windowSpec)).show()

# +---+-----+------+--------------------+------------------+------------+
# | id| dept|salary|         list_salary|    average_salary|total_salary|
# +---+-----+------+--------------------+------------------+------------+
# |  6|  dev|  3400|              [3400]|            3400.0|        3400|
# |  8|  dev|  3700|        [3400, 3700]|            3550.0|        7100|
# |  9|  dev|  4400|  [3400, 3700, 4400]|3833.3333333333335|       11500|
# | 10|  dev|  4400|[3400, 3700, 4400...|            3975.0|       15900|
# |  7|  dev|  5200|[3400, 3700, 4400...|            4220.0|       21100|
# |  3|sales|  4000|              [4000]|            4000.0|        4000|
# |  4|sales|  4000|        [4000, 4000]|            4000.0|        8000|
# |  1|sales|  4200|  [4000, 4000, 4200]|4066.6666666666665|       12200|
# |  5|admin|  2700|              [2700]|            2700.0|        2700|
# |  2|admin|  3100|        [2700, 3100]|            2900.0|        5800|
# +---+-----+------+--------------------+------------------+------------+
Q5: Replace the prefix sums of Q3 & Q4 with a sliding-window aggregate over (previous row, current row). (rowsBetween)
windowSpec = Window.partitionBy('dept').orderBy(F.asc('salary'))\
    .rowsBetween(-1, Window.currentRow)  # frame: previous row through current row

df.withColumn('list_salary', F.collect_list(F.col('salary')).over(windowSpec))\
  .withColumn('average_salary', F.avg(F.col('salary')).over(windowSpec))\
  .withColumn('total_salary', F.sum(F.col('salary')).over(windowSpec)).show()

# +---+-----+------+------------+--------------+------------+
# | id| dept|salary| list_salary|average_salary|total_salary|
# +---+-----+------+------------+--------------+------------+
# |  6|  dev|  3400|      [3400]|        3400.0|        3400|
# |  8|  dev|  3700|[3400, 3700]|        3550.0|        7100|
# |  9|  dev|  4400|[3700, 4400]|        4050.0|        8100|
# | 10|  dev|  4400|[4400, 4400]|        4400.0|        8800|
# |  7|  dev|  5200|[4400, 5200]|        4800.0|        9600|
# |  3|sales|  4000|      [4000]|        4000.0|        4000|
# |  4|sales|  4000|[4000, 4000]|        4000.0|        8000|
# |  1|sales|  4200|[4000, 4200]|        4100.0|        8200|
# |  5|admin|  2700|      [2700]|        2700.0|        2700|
# |  2|admin|  3100|[2700, 3100]|        2900.0|        5800|
# +---+-----+------+------------+--------------+------------+
Q6: Compute each salary's rank within its department, and compare the three built-in ranking rules. (rank, dense_rank, percent_rank)
windowSpec = Window.partitionBy('dept').orderBy(F.asc('salary'))

df.withColumn('list_salary', F.collect_list(F.col('salary')).over(windowSpec))\
  .withColumn('rank', F.rank().over(windowSpec))\
  .withColumn('dense_rank', F.dense_rank().over(windowSpec))\
  .withColumn('perc_rank', F.percent_rank().over(windowSpec)).show()

# +---+-----+------+--------------------+----+----------+---------+
# | id| dept|salary|         list_salary|rank|dense_rank|perc_rank|
# +---+-----+------+--------------------+----+----------+---------+
# |  6|  dev|  3400|              [3400]|   1|         1|      0.0|
# |  8|  dev|  3700|        [3400, 3700]|   2|         2|     0.25|
# |  9|  dev|  4400|[3400, 3700, 4400...|   3|         3|      0.5|
# | 10|  dev|  4400|[3400, 3700, 4400...|   3|         3|      0.5|
# |  7|  dev|  5200|[3400, 3700, 4400...|   5|         4|      1.0|
# |  3|sales|  4000|        [4000, 4000]|   1|         1|      0.0|
# |  4|sales|  4000|        [4000, 4000]|   1|         1|      0.0|
# |  1|sales|  4200|  [4000, 4000, 4200]|   3|         2|      1.0|
# |  5|admin|  2700|              [2700]|   1|         1|      0.0|
# |  2|admin|  3100|        [2700, 3100]|   2|         2|      1.0|
# +---+-----+------+--------------------+----+----------+---------+

Example 3. Join and Skew Join

A join takes the rows from two tables whose keys match and emits their Cartesian product into the result table.
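A minimal sketch with toy tables (names hypothetical): a key that appears m times in the left table and n times in the right contributes m×n rows to the result.

left = spark.createDataFrame([(1, 'a'), (1, 'b')], ['k', 'lval'])
right = spark.createDataFrame([(1, 'x'), (1, 'y')], ['k', 'rval'])
left.join(right, on='k').count()  # key 1 contributes 2 x 2 = 4 rows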

Reference: Medium, "6 spark exercises to rule them all"

Dataset: three tables — sales, products, sellers — about 6 GB in total, run in local mode.

Q1: How many records are in each of the three tables? (parquet, count)
DATASET_PATH = './DatasetToCompleteTheSixSparkExercises'
products = spark.read.parquet(f'{DATASET_PATH}/products_parquet')
sales = spark.read.parquet(f'{DATASET_PATH}/sales_parquet')
sellers = spark.read.parquet(f'{DATASET_PATH}/sellers_parquet')

sales.count(), products.count(), sellers.count()

# (20000040, 75000000, 10)
Q2: How many distinct products were sold at least once? (distinct)
sales.select(['product_id']).distinct().count()

# 993429
Q3: Which product appears in the most orders? (sort, limit)
sales.groupby(['product_id']).count().sort('count', ascending=False).limit(1).show()

# +----------+--------+
# |product_id|   count|
# +----------+--------+
# |         0|19000000|
# +----------+--------+
Q4: Examine the distribution of product_id in sales. (groupby, groupby)
import numpy as np
import matplotlib.pyplot as plt

key_stats = sales.groupby('product_id').count().withColumnRenamed('count', 'pid_count')\
        .groupby('pid_count').count().toPandas()

key_stats = key_stats.sort_values('pid_count')
key_stats['frequency'] = key_stats['count'] / key_stats['count'].sum()
key_stats['cum_freq'] = np.cumsum(key_stats['frequency'])
key_stats

# First groupby: count rows per product_id (the key size); second groupby: count how many keys share each size; then sort by key size
#    pid_count   count  frequency  cum_freq
# 0          1  986847   0.993374  0.993374
# 2          2    6550   0.006593  0.999968
# 1          3      31   0.000031  0.999999
# 3   19000000       1   0.000001  1.000000

plt.plot(np.log10(key_stats['pid_count']), key_stats['cum_freq'])
Q5: How many distinct products are sold each day? (countDistinct)
sales.groupby(['date']).agg(F.countDistinct("product_id").alias("cnt")).sort("cnt", ascending=False).show()

# +----------+------+
# |      date|   cnt|
# +----------+------+
# |2020-07-06|100765|
# |2020-07-09|100501|
# |2020-07-01|100337|
# |2020-07-03|100017|
# |2020-07-02| 99807|
# |2020-07-05| 99796|
# |2020-07-04| 99791|
# |2020-07-07| 99756|
# |2020-07-08| 99662|
# |2020-07-10| 98973|
# +----------+------+
Q6: Compute the average order revenue (quantity × price). (join, skewed data)

Because orders are concentrated on a handful of product IDs, joining on product_id produces skewed data and hurts parallel efficiency.

df = sales.join(products, on='product_id')
df = df.withColumn('revenue', sales['num_pieces_sold'] * products['price'])
df.select(F.avg('revenue')).show()

# +------------------+
# |      avg(revenue)|
# +------------------+
# |1246.1338560822878|
# +------------------+

We can salt the popular product keys: join the salted (popular) halves of the two tables and the unsalted (regular) halves separately, then union the results. Salting spreads the concentrated keys across partitions and improves the cluster's parallel efficiency.

def skew_join(skew_df, other_df, on, topK, how='inner', SALT_RANGE=100):
    # STEP1: find topK popular keys in skew_df
    topKkey = skew_df.groupby(on).agg({'*': 'count'}).withColumnRenamed('count(1)', '_skew_key_cnt')\
                    .sort('_skew_key_cnt', ascending=False).limit(topK)
    
    # STEP2: split skew_df, other_df into pop and unpop halves
    skew_df_pop = skew_df.join(F.broadcast(topKkey), on=on).drop('_skew_key_cnt')
    skew_df_unpop = skew_df.join(F.broadcast(topKkey), on=on, how='left_anti').drop('_skew_key_cnt')
    
    other_df_pop = other_df.join(F.broadcast(topKkey), on=on).drop('_skew_key_cnt')
    other_df_unpop = other_df.join(F.broadcast(topKkey), on=on, how='left_anti').drop('_skew_key_cnt')
    
    # STEP3: salt skew_df_pop, replicate other_df_pop
    skew_df_pop = skew_df_pop.withColumn('_skew_salt', (F.rand() * SALT_RANGE).cast(types.IntegerType()))
    # salts 0..SALT_RANGE-1, matching the int cast of rand()*SALT_RANGE above
    other_df_pop = other_df_pop.withColumn('_skew_salts', F.array([F.lit(i) for i in range(SALT_RANGE)]))
    other_df_pop = other_df_pop.select('*', F.explode('_skew_salts').alias('_skew_salt')).drop('_skew_salts')
    
    # STEP4: join separately and union
    res_pop = skew_df_pop.join(other_df_pop, on=on+['_skew_salt'], how=how).drop('_skew_salt')
    res_unpop = skew_df_unpop.join(other_df_unpop, on=on, how=how)
    
    return res_pop.unionByName(res_unpop)
    
df = skew_join(sales, products, on=['product_id'], topK=10)
df = df.withColumn('revenue', F.col('num_pieces_sold') * F.col('price'))
df.select(F.avg('revenue')).show()

# +------------------+
# |      avg(revenue)|
# +------------------+
# |1246.1338560822878|
# +------------------+
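As an aside, on Spark 3+ adaptive query execution can split skewed partitions automatically, which is often simpler than hand-rolled salting; a sketch of the relevant settings:

spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')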
Q7: For each seller, compute the average contribution of a single order toward that seller's target. (broadcast)

Note: e.g., seller 0 has a target of 250 units and three orders, selling 10, 8, and 7 units. The orders' contributions are (10/250, 8/250, 7/250), so the average contribution is avg(10/250, 8/250, 7/250) = (25/250)/3 ≈ 0.033.
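A quick sanity check of that arithmetic in plain Python:

contributions = [10/250, 8/250, 7/250]
sum(contributions) / len(contributions)  # (25/250)/3 ≈ 0.0333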

Since the sellers table is small, it can be broadcast to every worker and joined locally within each worker, avoiding a shuffle and improving efficiency.

df = sales.join(F.broadcast(sellers), on='seller_id')\
    .withColumn('contribution', F.col('num_pieces_sold')/F.col('daily_target'))
df.groupby('seller_id').agg({'contribution':'avg'}).show()

# +---------+--------------------+
# |seller_id|   avg(contribution)|
# +---------+--------------------+
# |        7|2.595228787788170...|
# |        3| 1.62888537056594E-4|
# |        8|9.213030375408861E-5|
# |        0|2.019885898946922...|
# |        5|4.211073965904022E-5|
# |        6|4.782147194369122E-5|
# |        9|3.837913136180238E-5|
# |        1|1.964233366461014...|
# |        4|3.296428039825817E-5|
# |        2|6.690408001060484E-5|
# +---------+--------------------+
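To confirm the broadcast actually happened, inspect the physical plan; it should contain a BroadcastHashJoin instead of a SortMergeJoin:

df.explain()  # look for BroadcastHashJoin in the printed plan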

Example 4. UDF

Q1: Add a column to sales that hashes each bill from order_id and bill_raw_text, then check whether there are any hash collisions. (udf)

Hash rule: if order_id is odd, compute sha256; if order_id is even, compute md5 iteratively, with the number of iterations equal to the count of uppercase 'A' characters.

import hashlib
from pyspark.sql.types import StringType

def fancy_hash(order_id, bill_raw_text):
    res = bill_raw_text.encode('utf-8')
    if int(order_id) % 2 == 0:
        # even order_id: apply md5 iteratively, once per uppercase 'A' in the text
        cnt_A = bill_raw_text.count('A')
        for _ in range(cnt_A):
            res = hashlib.md5(res).hexdigest().encode('utf-8')
        res = res.decode('utf-8')
    else:
        # odd order_id: a single sha256
        res = hashlib.sha256(res).hexdigest()
    return res


# Wrap the plain Python function as a Spark UDF returning a string
fancy_hash_udf = F.udf(fancy_hash, StringType())

sales.withColumn('hashed_bill', fancy_hash_udf(sales['order_id'], sales['bill_raw_text']))\
        .groupby('hashed_bill').agg(F.count('*').alias('cnt')).filter(F.col('cnt') > 1).show()

# Empty result, i.e. no hash collisions
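A possible speedup, sketched on the assumption that PyArrow is installed: a pandas_udf ships data to Python in Arrow batches rather than row by row, cutting serialization overhead (the per-row hashing cost itself remains):

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(StringType())
def fancy_hash_pudf(order_id: pd.Series, bill_raw_text: pd.Series) -> pd.Series:
    # apply the plain-Python fancy_hash to every pair in the Arrow batch
    return pd.Series([fancy_hash(i, t) for i, t in zip(order_id, bill_raw_text)])

sales.withColumn('hashed_bill', fancy_hash_pudf('order_id', 'bill_raw_text'))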

Example 5. UDF

Dataset: DataFrame [startDate: timestamp, binaryIds: array<bigint>]

Find the 100 most recent binaryIds.
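One possible approach, assuming df holds this dataset and "most recent" means the latest startDate at which an id appears: explode the array, keep each id's latest date, and take the top 100.

df.select(F.explode('binaryIds').alias('binaryId'), 'startDate')\
  .groupby('binaryId').agg(F.max('startDate').alias('latest'))\
  .sort('latest', ascending=False)\
  .limit(100).show()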
