日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Spark中緩存和檢查點的區別

作者:阿興呢!!! 更新時間: 2022-08-30 編程語言

Spark中緩存和檢查點的區別???

一、緩存
cache和persist是RDD的兩個API,cache底層調用的就是persist,區別在于cache不能指定緩存方式,只能緩存在內存中,但是persist可以指定緩存方式,比如:緩存在內存中、內存和磁盤并序列化等。同時不會切斷RDD的血緣關系,當Executor發生宕機時,會重新根據依賴關系讀取數據。

二、checkpoint
本質上是將RDD(數據)長久地保存在磁盤文件,從而做檢查點(通常是checkpoint到HDFS上)。在生產環境中,往往在RDD上會執行各種操作,使得DAG圖會拉的非常長,為防止中間某個環節出問題而影響正常的計算,會使用checkpoint機制將重要的RDD存到HDFS上,這樣,出錯以后就不必要回到最初的RDD重新計算,而是可以直接從checkpoint位置重新運算。

三、緩存和檢查點的區別
1、都是lazy操作,只有action算子觸發以后才會真正的進行緩存或者checkpoint。lazy操作時Spark的一個重要特性,不僅適用于RDD還適用于Spark SQL。
2、cache只緩存數據,不改變lineage(血緣),但比較容易丟失數據。
3、checkpoint改變原有的lineage(血緣),生成新的checkpointRDD,通常存于HDFS中,因為可以利用 HDFS 多副本特性保證容錯

Examples:

1.緩存cache()的使用

object Test01 {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local"))
        val rdd1 = sc.textFile("file:///D:\\IdeaProgram\\Bigdata\\file\\Spark\\wc\\input")
        //在這里設置一個cache緩存,只緩存結果,建議緩存完之后執行action,再去執行轉換算子+執行算子
        val rdd2 = rdd1.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).cache()
        //這里如果不設置緩存,每次執行完一個action以后再去執行下一個action的時候都要從頭開始進行計算
        //如果設置了緩存,每次執行完一個action以后再去執行下一個action的時候直接從緩存為止讀取數據
        rdd2.collect()
        rdd2.foreach(println(_))
        rdd2.take(2)
    }
}

2.persist()的使用

import org.apache.spark.storage._
 
 def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("DISK_ONLY")
        val sc = new  SparkContext(conf);
        sc.setLogLevel("ERROR")
        val a = sc.parallelize(1 to 9, 3).persist(StorageLevel.DISK_ONLY)
        println(a.first()) 
    }

一旦 driver program 執行結束,也就是 executor 所在進程 CoarseGrainedExecutorBackend stop,blockManager也會stop;被緩存到磁盤上的RDD也會被清空(整個blockManager使用的local文件夾被刪除 )。

3.checkpoint()的使用

checkpoint檢查點機制 
檢查點(本質就是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點問題而丟失分區,從做查點的RDD開始重做lineage,就會減少開銷檢查點通過將數據寫入到HDFS文件系統實現了RDD的檢查點功能.
 
檢查點,類似于快照,chekpoint的作?就是將DAG中?較重要的數據做?個檢查點,將結果存儲到?個?可?的地?
//檢查點的使用
def main(args: Array[String]): Unit = {
 val conf = new SparkConf().setAppName("SparkDemo").setMaster("local")
 val sc = new SparkContext(conf)
 //設置檢查點的路徑
 sc.setCheckpointDir("hdfs://hadoop01:8020/ck")
val rdd = sc.textFile("hdfs://hadoop01:8020/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//檢查點的觸發?定要使?個action算?
 rdd.checkpoint()
 rdd.saveAsTextFile("hdfs://hadoop01:8020/out10")
 println(rdd.getCheckpointFile) //查看存儲的位置
 /**
 查看是否可以設置檢查點 rdd.isCheckpointed 這個?法在shell中可以使? 但是代碼中不好?
 */
}

4、使用心得體會

緩存和檢查點的區別
緩存把RDD計算出來的然后放在內存,但是RDD的依賴鏈不能丟掉,當某個exexutor宕機時,上面cache的RDD就會丟掉,需要通過依賴鏈放入重新計算,不同的是,checkpoint是把RDD保存在HDFS上,是多副本可靠存儲,所以依賴鏈可以丟掉,就是斬斷了依賴鏈,是通過復制實現的高容錯

cache和persist的比較
1.cache底層調用的是persist
2.cache默認持久化等級是內存且不能修改,persist可以修改持久化的等級
什么時候使用cache或checkpoint
1.某步驟計算特別耗時
2.計算鏈條特別長
3.發生shuffle之后
一般情況建議使用cache或是persist模式,因為不需要創建存儲位置,默認存儲到內存中計算速度快,而checkpoint需要手動創建存儲位置和手動刪除數據,若數據量非常龐大建議使用checkpoint

? Task包括ResultTask(最后一個階段的任務) + ShuffleMapTask(非最后一個階段)

原文鏈接:https://blog.csdn.net/qq_43727170/article/details/125651393

欄目分類
最近更新