介绍
Apache Spark 的 Python API 称为PySpark。要在 Python 中开发 Spark 应用程序,我们将使用 PySpark。它还提供用于实时数据分析的 Pyspark shell。
PySpark 支持大部分 Apache Spark功能,包括 Spark Core、SparkSQL、DataFrame、Streaming、MLlib(机器学习)和 MLlib(机器学习)。
本文将通过基于场景的示例探索有用的 PySpark 函数,以更好地理解它们。
expr() 函数
它是 PySpark 中的一个 SQL 函数,用于执行 SQL 表达式。它将接受 SQL 表达式作为字符串参数并执行语句中编写的命令。它支持使用 PySpark Column 类型和 pyspark.sql.functions API 中不存在的类似 SQL 的函数。
例如:𝐂𝐀𝐒𝐄𝐖𝐇𝐄𝐍。我们可以在表达式中使用𝐃𝐚𝐭𝐚𝐅𝐫𝐚𝐦𝐞 𝐜𝐨𝐥𝐮𝐦𝐧𝐬。这个函数的语法是𝐞𝐱𝐩𝐫(𝐬𝐭𝐫)。
# importing necessary libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# creating session
spark = SparkSession.builder.appName("practice").getOrCreate()
# create data
data = [("Prashant","Banglore",25, 58, "2022-08-01", 1),
("Ankit","Banglore",26,54,"2021-05-02", 2),
("Ramakant","Gurugram",24, 60, "2022-06-02", 3),
("Brijesh","Gazipur", 26,75,"2022-07-04", 4),
("Devendra","Gurugram", 27, 62, "2022-04-03", 5),
("Ajay","Chandigarh", 25,72,"2022-02-01", 6)]
columns= ["friends_name","location", "age", "weight", "meetup_date", "offset"]
df_friends = spark.createDataFrame(data = data, schema = columns)
df_friends.show()
![](http://qiniu.aihubs.net/38928Screenshot (95).png)
让我们看看实际的实现:
示例: A) 使用 expr() 连接一列或多列
# concate friend's name, age, and location columns using expr()
df_concat = df_friends.withColumn("name-age-location",expr("friends_name|| '-'|| age || '-' || location"))
df_concat.show()
我们加入了姓名、年龄和位置列,并将结果存储在一个名为“name-age-location”的新列中。
示例:B) 使用 expr() 根据条件 (CASE WHEN) 添加新列
# check if exercise needed based on weight
# if weight is more or equal to 60 -- Yes
# if weight is less than 55 -- No
# else -- Enjoy
df_condition = df_friends.withColumn("Exercise_Need", expr("CASE WHEN weight >= 60 THEN 'Yes' " + "WHEN weight < 55 THEN 'No' ELSE 'Enjoy' END"))
df_condition.show()
根据 CASE WHEN 中给出的条件,我们的“Exercise_Need”列收到了三个值(Enjoy、No 和 Yes)。权重列的第一个值为 58,因此小于 60 且大于 55,因此结果为“Enjoy”。
示例:C) 使用表达式中的当前列值创建一个新列。
# let increment the meetup month by the number of offset
df_meetup = df_friends.withColumn("new_meetup_date", expr("add_months(meetup_date,offset)"))
df_meetup.show()
“ meetup_date”月份值增加偏移值,新生成的结果存储在“new_meetup_date”列中。
填充函数
A.) lpad():
该函数为列的左侧提供填充,该函数的输入是列名、长度和填充字符串。
B.) rpad ():
此函数用于在列的右侧添加填充。列名、长度和填充字符串是此函数的附加输入。
注意:
- 如果列值长于指定长度,则返回值将缩短为长度字符或字节。
- 如果未指定填充值,则列值将根据你使用的函数向左或向右填充,如果是字符串则使用空格字符,如果是字节序列则使用零。
让我们首先创建一个数据框:
# importing necessary libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lpad, rpad
# creating session
spark = SparkSession.builder.appName("practice").getOrCreate()
# creating data
data = [("Delhi",30000),("Mumbai",50000),("Gujrat",80000)]
columns= ["state_name","state_population"]
df_states = spark.createDataFrame(data = data, schema = columns)
df_states.show()
示例:01 – 使用左填充
# left padding
df_states = df_states.withColumn('states_name_leftpad', lpad(col("state_name"), 10, '#'))
df_states.show(truncate =False)
我们 在“ state_name”列值的左侧添加了“#”符号,填充后列值的总长度变为“ 10”。
示例:02 – 右填充
# right padding
df_states = df_states.withColumn('states_name_rightpad', rpad(col("state_name"), 10, '#'))
df_states.show(truncate =False)
我们在“state_name”列值的右侧添加了 “#”符号,右侧填充后总长度变为 10。
示例:03 – 当列字符串长度大于填充字符串长度时
df_states = df_states.withColumn('states_name_condition', lpad(col("state_name"), 3, '#'))
df_states.show(truncate =False)
在这种情况下,返回的列值将被缩短为填充字符串长度的长度。你可以看到“state_name_condition”列只有长度为 3 的值,这是我们在函数中给出的填充长度。
repeat()函数
在 PySpark 中,我们使用 repeat() 函数来复制列值。repeat(str,n) 函数返回包含重复 n 次的指定字符串值的字符串。
示例: 01
# importing necessary libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, repeat
# creating session
spark = SparkSession.builder.appName("practice").getOrCreate()
# # create data
data = [("Prashant",25, 80), ("Ankit",26, 90),("Ramakant", 24, 85)]
columns= ["student_name", "student_age", "student_score"]
df_students = spark.createDataFrame(data = data, schema = columns)
df_students.show()
# repeating the column (student_name) twice and saving results in new column
df_repeated = df_students.withColumn("student_name_repeated",(expr("repeat(student_name, 2)")))
df_repeated.show()
我们重复了上述示例中的“student_name”列值两次。
我们还可以将此函数与 Concat 函数一起使用,在该函数中,我们可以在列值之前重复某些字符串值 n 次,就像填充一样,其中 n 可能是某些值的长度。
startwith () 和endswith () 函数
startswith()
它将产生 True 或 False 的布尔结果。当 Dataframe 列值以作为此方法的参数提供的字符串结尾时,它返回 True。如果未找到匹配项,则返回 False。
endswith()
将返回布尔值 (True/False)。当 DataFrame 列值以作为此方法输入的字符串结尾时,它返回 True。如果不匹配,则返回 False。
注意:
- 如果列值或输入字符串中的任何一个是 𝐍𝐔𝐋𝐋,则返回 𝐍𝐔𝐋𝐋。
- 如果输入检查字符串为空,则返回 𝗧𝗿𝘂𝗲。
- 这些方法区分大小写。
创建一个数据框:
# importing necessary libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# creating session
spark = SparkSession.builder.appName("practice").getOrCreate()
# # create dataframe
data = [("Prashant",25, 80), ("Ankit",26, 90),("Ramakant", 24, 85), (None, 23, 87)]
columns= ["student_name", "student_age", "student_score"]
df_students = spark.createDataFrame(data = data, schema = columns)
df_students.show()
示例 01 首先,检查输出类型。
df_internal_res = df_students.select(col("student_name").endswith("it").alias("internal_bool_val"))
df_internal_res.show()
- 输出是一个布尔值。
- 最后一行值的输出值为 null,因为“students_name”列的对应值为 NULL。
示例 02
- 现在我们使用 filter() 方法来获取对应于 True 值的行。
df_check_start = df_students.filter(col("student_name").startswith("Pra"))
df_check_start.show()
在这里,我们得到了第一行作为输出,因为“student_name”列的值以函数内部提到的值开头。
示例 03
df_check_end = df_students.filter(col("student_name").endswith("ant"))
df_check_end.show()
在这里,我们得到了两行作为输出,因为“student_name”列的值以函数内部提到的值结尾。
示例 04 如果函数中的参数为空怎么办?
df_check_empty = df_students.filter(col("student_name").endswith(""))
df_check_empty.show()
在这种情况下,我们得到了每一行对应的 True 值,并且没有返回 False 值。
结论
在本文中,我们从定义 PySpark 及其特性开始讨论。然后我们讨论函数、它们的定义和它们的语法。在讨论了每个函数之后,我们创建了一个数据框并使用它练习了一些示例。我们在本文中介绍了六个函数。
本文的主要内容是:
- 我在 PySpark 中使用“ expr ”函数将列与类似 SQL 的表达式连接起来。
- 我们在上述函数中将列的名称作为字符串传递。
- 使用表达式中的列值创建一个新列。
- 向列值添加填充。
- 使用重复函数多次重复列值。
- 我们还检查了列值是否以特定单词开头或结尾。