網站首頁 編程語言 正文
Spark中緩存和檢查點的區別???
一、緩存
cache和persist是RDD的兩個API,cache底層調用的就是persist,區別在于cache不能指定緩存方式,只能緩存在內存中,但是persist可以指定緩存方式,比如:緩存在內存中、內存和磁盤并序列化等。同時不會切斷RDD的血緣關系,當Executor發生宕機時,會重新根據依賴關系讀取數據。
二、checkpoint
本質上是將RDD(數據)長久地保存在磁盤文件,從而做檢查點(通常是checkpoint到HDFS上)。在生產環境中,往往在RDD上會執行各種操作,使得DAG圖會拉的非常長,為防止中間某個環節出問題而影響正常的計算,會使用checkpoint機制將重要的RDD存到HDFS上,這樣,出錯以后就不必要回到最初的RDD重新計算,而是可以直接從checkpoint位置重新運算。
三、緩存和檢查點的區別
1、都是lazy操作,只有action算子觸發以后才會真正的進行緩存或者checkpoint。lazy操作時Spark的一個重要特性,不僅適用于RDD還適用于Spark SQL。
2、cache只緩存數據,不改變lineage(血緣),但比較容易丟失數據。
3、checkpoint改變原有的lineage(血緣),生成新的checkpointRDD,通常存于HDFS中,因為可以利用 HDFS 多副本特性保證容錯
Examples:
1.緩存cache()的使用
object Test01 {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local"))
val rdd1 = sc.textFile("file:///D:\\IdeaProgram\\Bigdata\\file\\Spark\\wc\\input")
//在這里設置一個cache緩存,只緩存結果,建議緩存完之后執行action,再去執行轉換算子+執行算子
val rdd2 = rdd1.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).cache()
//這里如果不設置緩存,每次執行完一個action以后再去執行下一個action的時候都要從頭開始進行計算
//如果設置了緩存,每次執行完一個action以后再去執行下一個action的時候直接從緩存為止讀取數據
rdd2.collect()
rdd2.foreach(println(_))
rdd2.take(2)
}
}
2.persist()的使用
import org.apache.spark.storage._
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("DISK_ONLY")
val sc = new SparkContext(conf);
sc.setLogLevel("ERROR")
val a = sc.parallelize(1 to 9, 3).persist(StorageLevel.DISK_ONLY)
println(a.first())
}
一旦 driver program 執行結束,也就是 executor 所在進程 CoarseGrainedExecutorBackend stop,blockManager也會stop;被緩存到磁盤上的RDD也會被清空(整個blockManager使用的local文件夾被刪除 )。
3.checkpoint()的使用
checkpoint檢查點機制
檢查點(本質就是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點問題而丟失分區,從做查點的RDD開始重做lineage,就會減少開銷檢查點通過將數據寫入到HDFS文件系統實現了RDD的檢查點功能.
檢查點,類似于快照,chekpoint的作?就是將DAG中?較重要的數據做?個檢查點,將結果存儲到?個?可?的地?
//檢查點的使用
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkDemo").setMaster("local")
val sc = new SparkContext(conf)
//設置檢查點的路徑
sc.setCheckpointDir("hdfs://hadoop01:8020/ck")
val rdd = sc.textFile("hdfs://hadoop01:8020/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//檢查點的觸發?定要使?個action算?
rdd.checkpoint()
rdd.saveAsTextFile("hdfs://hadoop01:8020/out10")
println(rdd.getCheckpointFile) //查看存儲的位置
/**
查看是否可以設置檢查點 rdd.isCheckpointed 這個?法在shell中可以使? 但是代碼中不好?
*/
}
4、使用心得體會
緩存和檢查點的區別
緩存把RDD計算出來的然后放在內存,但是RDD的依賴鏈不能丟掉,當某個exexutor宕機時,上面cache的RDD就會丟掉,需要通過依賴鏈放入重新計算,不同的是,checkpoint是把RDD保存在HDFS上,是多副本可靠存儲,所以依賴鏈可以丟掉,就是斬斷了依賴鏈,是通過復制實現的高容錯
cache和persist的比較
1.cache底層調用的是persist
2.cache默認持久化等級是內存且不能修改,persist可以修改持久化的等級
什么時候使用cache或checkpoint
1.某步驟計算特別耗時
2.計算鏈條特別長
3.發生shuffle之后
一般情況建議使用cache或是persist模式,因為不需要創建存儲位置,默認存儲到內存中計算速度快,而checkpoint需要手動創建存儲位置和手動刪除數據,若數據量非常龐大建議使用checkpoint
? Task包括ResultTask(最后一個階段的任務) + ShuffleMapTask(非最后一個階段)
原文鏈接:https://blog.csdn.net/qq_43727170/article/details/125651393
相關推薦
- 2022-06-29 C語言實例梳理講解常用關鍵字的用法_C 語言
- 2022-11-14 Git暫存區的意義或git add的意義
- 2022-11-15 TypeScript數組實現棧與對象實現棧的區別詳解_其它
- 2022-07-13 Docker技術_Docker與傳統虛擬機以及傳統容器的差異
- 2023-01-05 Kotlin的空安全處理方式詳解_Android
- 2022-08-25 Python進階Matplotlib庫圖繪制_python
- 2022-06-26 Android?app啟動節點與上報啟動實例詳解_Android
- 2022-09-12 Python提取Word中圖片的實現步驟_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支