網站首頁 編程語言 正文
正文
作為一個算法工程師,日常學習和工作中,不光要 訓練模型關注效果 ,更多的 時間 是在 準備樣本數據與分析數據 等,而這些過程 都與 大數據 spark和hadoop生態 的若干工具息息相關。
今天我們就不在更新 機器學習 和 算法模型 相關的內容,分享兩個 spark函數 吧,以前也在某種場景中使用過但沒有保存收藏,哎!! 事前不搜藏,臨時抱佛腳 的感覺 真是 痛苦,太耽誤干活了 。
so,把這 兩個函數 記在這里 以備不時 之需~
(1) 得到 spark dataframe 全局排序ID
這個函數的 應用場景 就是:根據某一列的數值對 spark 的 dataframe 進行排序, 得到全局多分區排序的全局有序ID,新增一列保存這個rank id ,并且保留別的列的數據無變化 。
有用戶會說,這不是很容易嗎 ,直接用 orderBy 不就可以了嗎,但是難點是:orderBy完記錄下全局ID 并且 保持原來全部列的DF數據 。
多說無益,遇到這個場景 直接copy 用起來 就知道 有多爽 了,同類問題 我們可以 用下面 這個函數 解決 ~
scala 寫的 spark 版本代碼:
def dfZipWithIndex( df: DataFrame, offset: Int = 1, colName: String ="rank_id", inFront: Boolean = true ) : DataFrame = { df.sqlContext.createDataFrame( df.rdd.zipWithIndex.map(ln => Row.fromSeq( (if (inFront) Seq(ln._2 + offset) else Seq()) ++ ln._1.toSeq ++ (if (inFront) Seq() else Seq(ln._2 + offset)) ) ), StructType( (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) ++ df.schema.fields ++ (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false))) ) ) }
函數調用我們可以用這行代碼調用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc))
, 直接復制過去就可以~
python寫的 pyspark 版本代碼:
from pyspark.sql.types import LongType, StructField, StructType def dfZipWithIndex (df, offset=1, colName="rank_id"): new_schema = StructType( [StructField(colName,LongType(),True)] # new added field in front + df.schema.fields # previous schema ) zipped_rdd = df.rdd.zipWithIndex() new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row))) return spark.createDataFrame(new_rdd, new_schema)
調用 同理 , 這里我就不在進行贅述了。
(2)分組后保留最大值行
這個函數的 應用場景 就是: 當我們使用 spark 或則 sparkSQL 查找某個 dataframe 數據的時候,在某一天里,任意一個用戶可能有多條記錄,我們需要 對每一個用戶,保留dataframe 中 某列值最大 的那行數據 。
其中的 關鍵點 在于:一次性求出對每個用戶分組后,求得每個用戶的多行記錄中,某個值最大的行進行數據保留 。
當然,經過 簡單修改代碼,不一定是最大,最小也是可以的,平均都ok 。
scala 寫的 spark 版本代碼:
// 得到一天內一個用戶多個記錄里面時間最大的那行用戶的記錄 import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions val w = Window.partitionBy("user_id") val result_df = raw_df .withColumn("max_time",functions.max("time").over(w)) .where($"time" === $"max_time") .drop($"max_time")
python寫的 pyspark 版本代碼:
# pyspark dataframe 某列值最大的元素所在的那一行 # GroupBy 列并過濾 Pyspark 中某列值最大的行 # 創建一個Window 以按A列進行分區,并使用它來計算每個組的最大值。然后過濾出行,使 B 列中的值等于最大值 from pyspark.sql import Window w = Window.partitionBy('user_id') result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\ .where(fun.col('time') == fun.col('time')) .drop('max_time')
我們可以看到: 這個函數的關鍵就是運用了 spark 的 window 函數 ,靈活運用 威力無窮 哦 !
原文鏈接:https://juejin.cn/post/7197736603719254076
相關推薦
- 2022-10-01 python實現圖像增強算法_python
- 2022-10-25 C++構建函數使用介紹_C 語言
- 2022-03-19 使用Docker部署Spring?Boot項目的實現步驟_docker
- 2022-02-11 安裝element UI (全局引入與按需引入)
- 2022-10-19 React封裝彈出框組件的方法_React
- 2022-05-10 FactoryBean配置文件定義的 類型 調用時返回 不同的類型
- 2023-04-18 go實現服務優雅關閉的示例_Golang
- 2023-07-24 vxe-grid實現 二維數據聯動
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支