網站首頁 編程語言 正文
數據讀取與保存
Text文件
對于 Text文件的讀取和保存 ,其語法和實現是最簡單的,因此我只是簡單敘述一下這部分相關知識點,大家可以結合demo具體分析記憶。
1)基本語法
(1)數據讀取:textFile(String)
(2)數據保存:saveAsTextFile(String)
2)實現代碼demo如下:
object Operate_Text { def main(args: Array[String]): Unit = { //1.創建SparkConf并設置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創建SparkContext,該對象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 讀取輸入文件 val inputRDD: RDD[String] = sc.textFile("input/demo.txt") //3.2 保存數據 inputRDD.saveAsTextFile("textFile") //4.關閉連接 sc.stop() } }
Sequence文件
SequenceFile文件 是Hadoop中用來存儲二進制形式的 key-value對 的一種平面文件(Flat File)。在SparkContext中,可以通過調用 sequenceFile[ keyClass,valueClass ] (path) 來調用。
1)基本語法
- (1)數據讀取:sequenceFile[ keyClass, valueClass ] (path)
- (2)數據保存:saveAsSequenceFile(String)
2)實現代碼demo如下:
object Operate_Sequence { def main(args: Array[String]): Unit = { //1.創建SparkConf并設置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創建SparkContext,該對象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 創建rdd val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9))) //3.2 保存數據為SequenceFile dataRDD.saveAsSequenceFile("seqFile") //3.3 讀取SequenceFile文件 sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println) //4.關閉連接 sc.stop() } }
Object對象文件
對象文件是將對象序列化后保存的文件,采用Hadoop的序列化機制。可以通過 objectFile[ k , v ] (path) 函數接收一個路徑,讀取對象文件,返回對應的RDD,也可以通過調用 saveAsObjectFile() 實現對對象文件的輸出。因為要序列化所以要指定類型。
1)基本語法
- (1)數據讀取:objectFile[ k , v ] (path)
- (2)數據保存:saveAsObjectFile(String)
2)實現代碼demo如下:
object Operate_Object { def main(args: Array[String]): Unit = { //1.創建SparkConf并設置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創建SparkContext,該對象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 創建RDD val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2) //3.2 保存數據 dataRDD.saveAsObjectFile("objFile") //3.3 讀取數據 sc.objectFile[Int]("objFile").collect().foreach(println) //4.關閉連接 sc.stop() } }
累加器
累加器概念
累加器,是一種變量---分布式共享只寫變量。僅支持“add”,支持并發,但Executor和Executor之間不能讀數據,可實現所有分片處理時更新共享變量的功能。
累加器用來把Executor端變量信息聚合到Driver端。在Driver中定義的一個變量,在Executor端的每個task都會得到這個變量的一份新的副本,每個task更新這些副本的值后,傳回Driver端進行合并計算。
系統累加器
1)累加器定義(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
2)累加器添加數據(累加器.add方法)
sum.add(count)
3)累加器獲取數據(累加器.value)
sum.value
注意:Executor端的任務不能讀取累加器的值(例如:在Executor端調用sum.value,獲取的值不是累加器最終的值)。因此我們說,累加器是一個分布式共享只寫變量。
4)累加器要放在行動算子中
因為轉換算子執行的次數取決于job的數量,如果一個 spark應用 有多個行動算子,那么轉換算子中的累加器可能會發生不止一次更新,導致結果錯誤。所以,如果想要一個無論在失敗還是重復計算時都絕對可靠的累加器,必須把它放在foreach()這樣的行動算子中。
5) 代碼實現:
object accumulator_system { package com.atguigu.cache import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object accumulator_system { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc = new SparkContext(conf) val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4))) //需求:統計a出現的所有次數 ("a",10) //普通算子實現 reduceByKey 代碼會走shuffle 效率低 val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _) //累加器實現 //1 聲明累加器 val accSum: LongAccumulator = sc.longAccumulator("sum") dataRDD.foreach{ case (a,count) => { //2 使用累加器累加 累加器.add() accSum.add(count) // 4 不在executor端獲取累加器的值,因為得到的值不準確,所以累加器叫分布式共享只寫變量 //println("sum = " + accSum.value) } } //3 獲取累加器的值 累加器.value println(("a",accSum.value)) sc.stop() } }
原文鏈接:https://juejin.cn/post/7159579713738899486
相關推薦
- 2022-12-03 Android開發數據結構算法ArrayList源碼詳解_Android
- 2023-02-02 redis中的配置以及密碼設置方式_Redis
- 2022-06-17 Ruby操作CSV格式數據方法詳解_ruby專題
- 2022-03-01 格式化日期‘年月日-時分秒’方法
- 2022-04-22 Mac環境下使用CLion調試redis 6.X源碼
- 2022-12-23 react數據管理機制React.Context源碼解析_React
- 2023-02-12 Jupyter?notebook如何實現打開數據集_python
- 2022-09-08 pandas時間序列之如何將int轉換成datetime格式_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支