南宁上林网站建设,网站排名方法,站内搜索工具,慈溪企业排名网站目录 一、Filter方法 功能 语法 代码 总结 filter算子 二、distinct方法 功能 语法 代码 总结 distinct算子 三、SortBy方法 功能 语法 代码 总结 sortBy算子 四、数据计算练习 需求#xff1a; 解答 总结 去重函数#xff1a; 过滤函数#xff1a; 转换函数#xff1a; 排… 目录 一、Filter方法 功能 语法 代码 总结 filter算子 二、distinct方法 功能 语法 代码 总结 distinct算子 三、SortBy方法 功能 语法 代码 总结 sortBy算子 四、数据计算练习 需求 解答 总结 去重函数 过滤函数 转换函数 排序函数 于是我驻足享受无法复刻的一些瞬间 —— 24.11.9 一、Filter方法
功能
过滤想要的数据进行保留
语法
基于filter中我们传入的函数决定rdd对象中哪个保留哪个丢弃
代码
from pyspark import SparkConf,SparkContext# 设置spark中的python解释器对象
import os
os.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 准备一个RDD
rdd sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 对RDD的数据进行过滤保留奇数去除偶数# 方法1
def Retain(data):if data % 2 1:return Trueelse:return False# 对RDD数据进行过滤留下奇数
rdd1 rdd.filter(Retain)
print(rdd1.collect())# 方法2
rdd2 rdd.filter(lambda num:num % 2 1)
print(rdd2.collect()) 总结
filter算子
接受一个处理函数可用lambda匿名函数快速编写
函数对RDD数据逐个处理得到True的保留到返回值的RDD中 二、distinct方法
功能
对RDD数据进行去重返回新RDD
语法
rdd.distinct() # 无需传参
代码
from pyspark import SparkConf,SparkContext# 设置spark中的python解释器对象
import os
os.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 准备一个RDD
rdd sc.parallelize([1,3,3,4,4,4,7,8,9,9])
rdd rdd.distinct()
print(rdd.collect()) 总结
distinct算子
完成对Rdd内数据的去重操作 三、SortBy方法
功能
对RDD数据进行排序基于指定的排序依据
语法
rdd.sortBy()
rdd.sortBy(func, ascending False, numPartitions 1)
# func:(T) - U: 告知按照rdd中的哪个数据进行排序比如 lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending: True升序 False 降序
# numPartitions: 用多少分区排序
代码
from pyspark import SparkConf,SparkContext# 设置spark中的python解释器对象
import os
os.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 读取数据文件
rdd sc.textFile(D:/2LFE\Desktop\WordCount.txt)
# 取出全部单词
word_rdd rdd.flatMap(lambda x:x.split( ))
print(word_rdd.collect())# 将所有单词都转换成二元元组单词为keyvalue设置为1
word_with_one_rdd word_rdd.map(lambda word:(word,1))
# 分组并求和
result_rdd word_with_one_rdd.reduceByKey(lambda a,b:ab)
# 对结果进行排序
result_rdd result_rdd.sortBy(lambda x:x[1],ascending False,numPartitions 1)
# 打印并输出结果
print(result_rdd.collect()) 总结
sortBy算子
接收一个处理函数可用lambda快速编写
函数表示用来决定排序的依据
可以控制升序或降序
全局排序需要设置分区数为1 四、数据计算练习
需求
复制以上内容到文件中使用Spark读取文件进行计算
① 各个城市销售额排名从大到小
② 全部城市有哪些商品类别在售卖
③ 北京市有哪些商品类别在售卖
解答
from pyspark import SparkConf,SparkContext
import json# 设置spark中的python解释器对象
import os
os.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 读取文件得到RDD
file_rdd sc.textFile(E:\python.learning\pyspark\sortBy.txt)# 取出一个个JSON字符串
json_str_rdd file_rdd.flatMap(lambda x:x.split(|))# 将一个JSON字符串转换为字典 json模块
dict_rdd json_str_rdd.map(lambda x:json.loads(x))# 取出城市和销售额数据城市销售额
city_with_money_rdd dict_rdd.map(lambda x:(x[areaName],int(x[money])))# 按销售额对结果进行聚合然后根据销售额降序排序
city_result_rdd city_with_money_rdd.reduceByKey(lambda x,y:xy)
res1 city_result_rdd.sortBy(lambda x:x[1],ascending False,numPartitions 1)
print(需求1结果 , res1.collect())# 需求2 对全部商品进行去重
category_rdd dict_rdd.map(lambda x: x[category]).distinct()
print(需求2结果,category_rdd.collect())# 需求3 过滤北京市的数据
BJ_data_rdd dict_rdd.filter(lambda x:x[areaName] 北京)
print(需求3结果,BJ_data_rdd.collect())# 需求4 对北京市的商品类别进行商品类别去重
res2 BJ_data_rdd.map(lambda x:x[category]).distinct()
print(需求4结果,res2.collect()) 总结
去重函数
在 PySpark 框架下distinct函数用于返回一个新的 RDD其中包含原始 RDD 中的不同元素。
过滤函数
filter函数用于从弹性分布式数据集RDD中筛选出满足特定条件的元素返回一个新的 RDD 只包含满足条件的元素。
转换函数
在 PySpark 中map函数是对弹性分布式数据集RDD进行转换操作的一种重要方法。map函数对 RDD 中的每个元素应用一个函数返回一个新的 RDD其中包含应用函数后的结果。
排序函数
sortBy 函数用于对RDD 中的元素进行排序它接受一个函数或者一个字段名作为参数根据这个参数来确定排序的依据。