網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
數(shù)據(jù)讀取與保存
Text文件
對(duì)于 Text文件的讀取和保存 ,其語(yǔ)法和實(shí)現(xiàn)是最簡(jiǎn)單的,因此我只是簡(jiǎn)單敘述一下這部分相關(guān)知識(shí)點(diǎn),大家可以結(jié)合demo具體分析記憶。
1)基本語(yǔ)法
(1)數(shù)據(jù)讀取:textFile(String)
(2)數(shù)據(jù)保存:saveAsTextFile(String)
2)實(shí)現(xiàn)代碼demo如下:
object Operate_Text { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkConf并設(shè)置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 讀取輸入文件 val inputRDD: RDD[String] = sc.textFile("input/demo.txt") //3.2 保存數(shù)據(jù) inputRDD.saveAsTextFile("textFile") //4.關(guān)閉連接 sc.stop() } }
Sequence文件
SequenceFile文件 是Hadoop中用來(lái)存儲(chǔ)二進(jìn)制形式的 key-value對(duì) 的一種平面文件(Flat File)。在SparkContext中,可以通過(guò)調(diào)用 sequenceFile[ keyClass,valueClass ] (path) 來(lái)調(diào)用。
1)基本語(yǔ)法
- (1)數(shù)據(jù)讀取:sequenceFile[ keyClass, valueClass ] (path)
- (2)數(shù)據(jù)保存:saveAsSequenceFile(String)
2)實(shí)現(xiàn)代碼demo如下:
object Operate_Sequence { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkConf并設(shè)置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 創(chuàng)建rdd val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9))) //3.2 保存數(shù)據(jù)為SequenceFile dataRDD.saveAsSequenceFile("seqFile") //3.3 讀取SequenceFile文件 sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println) //4.關(guān)閉連接 sc.stop() } }
Object對(duì)象文件
對(duì)象文件是將對(duì)象序列化后保存的文件,采用Hadoop的序列化機(jī)制。可以通過(guò) objectFile[ k , v ] (path) 函數(shù)接收一個(gè)路徑,讀取對(duì)象文件,返回對(duì)應(yīng)的RDD,也可以通過(guò)調(diào)用 saveAsObjectFile() 實(shí)現(xiàn)對(duì)對(duì)象文件的輸出。因?yàn)橐蛄谢砸付愋汀?/p>
1)基本語(yǔ)法
- (1)數(shù)據(jù)讀取:objectFile[ k , v ] (path)
- (2)數(shù)據(jù)保存:saveAsObjectFile(String)
2)實(shí)現(xiàn)代碼demo如下:
object Operate_Object { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkConf并設(shè)置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 創(chuàng)建RDD val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2) //3.2 保存數(shù)據(jù) dataRDD.saveAsObjectFile("objFile") //3.3 讀取數(shù)據(jù) sc.objectFile[Int]("objFile").collect().foreach(println) //4.關(guān)閉連接 sc.stop() } }
累加器
累加器概念
累加器,是一種變量---分布式共享只寫(xiě)變量。僅支持“add”,支持并發(fā),但Executor和Executor之間不能讀數(shù)據(jù),可實(shí)現(xiàn)所有分片處理時(shí)更新共享變量的功能。
累加器用來(lái)把Executor端變量信息聚合到Driver端。在Driver中定義的一個(gè)變量,在Executor端的每個(gè)task都會(huì)得到這個(gè)變量的一份新的副本,每個(gè)task更新這些副本的值后,傳回Driver端進(jìn)行合并計(jì)算。
系統(tǒng)累加器
1)累加器定義(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
2)累加器添加數(shù)據(jù)(累加器.add方法)
sum.add(count)
3)累加器獲取數(shù)據(jù)(累加器.value)
sum.value
注意:Executor端的任務(wù)不能讀取累加器的值(例如:在Executor端調(diào)用sum.value,獲取的值不是累加器最終的值)。因此我們說(shuō),累加器是一個(gè)分布式共享只寫(xiě)變量。
4)累加器要放在行動(dòng)算子中
因?yàn)檗D(zhuǎn)換算子執(zhí)行的次數(shù)取決于job的數(shù)量,如果一個(gè) spark應(yīng)用 有多個(gè)行動(dòng)算子,那么轉(zhuǎn)換算子中的累加器可能會(huì)發(fā)生不止一次更新,導(dǎo)致結(jié)果錯(cuò)誤。所以,如果想要一個(gè)無(wú)論在失敗還是重復(fù)計(jì)算時(shí)都絕對(duì)可靠的累加器,必須把它放在foreach()這樣的行動(dòng)算子中。
5) 代碼實(shí)現(xiàn):
object accumulator_system { package com.atguigu.cache import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object accumulator_system { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc = new SparkContext(conf) val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4))) //需求:統(tǒng)計(jì)a出現(xiàn)的所有次數(shù) ("a",10) //普通算子實(shí)現(xiàn) reduceByKey 代碼會(huì)走shuffle 效率低 val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _) //累加器實(shí)現(xiàn) //1 聲明累加器 val accSum: LongAccumulator = sc.longAccumulator("sum") dataRDD.foreach{ case (a,count) => { //2 使用累加器累加 累加器.add() accSum.add(count) // 4 不在executor端獲取累加器的值,因?yàn)榈玫降闹挡粶?zhǔn)確,所以累加器叫分布式共享只寫(xiě)變量 //println("sum = " + accSum.value) } } //3 獲取累加器的值 累加器.value println(("a",accSum.value)) sc.stop() } }
原文鏈接:https://juejin.cn/post/7159579713738899486
相關(guān)推薦
- 2022-06-25 Python+PuLP實(shí)現(xiàn)線性規(guī)劃的求解_python
- 2022-09-10 ELK收集Tomcat日志的實(shí)現(xiàn)_Tomcat
- 2022-08-06 python用pd.read_csv()方法來(lái)讀取csv文件的實(shí)現(xiàn)_python
- 2023-06-21 Android崩潰日志收集和保存解析_Android
- 2022-06-04 Dashboard管理Kubernetes集群與API訪問(wèn)配置_云和虛擬化
- 2022-10-21 Python?NumPy教程之?dāng)?shù)組的基本操作詳解_python
- 2022-06-22 React表單容器的通用解決方案_React
- 2022-12-08 Flask框架運(yùn)用Ajax實(shí)現(xiàn)數(shù)據(jù)交互的示例代碼_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門(mén)
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支