網(wǎng)站首頁 編程語言 正文
Spark算子 repartitionAndSortWithinPartitions
- 算子含義
- spark.java.api
- flatMapToPair + repartitionAndSortWithinPartitions
- spark.scala.api
- flatMapRdd + OrderedRDDFunctions + repartitionAndSortWithinPartitions
- 關于OrderedRDDFunctions
算子含義
根據(jù)給定的分區(qū)器對 RDD 進行重新分區(qū),并在每個生成的分區(qū)中,按記錄的key對記錄進行排序。這比調(diào)用 repartition 然后在每個分區(qū)內(nèi)排序更有效,因為它可以將排序向下推到 shuffle 機器中。
如果需要在repartition重分區(qū)之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子
必須是 <k,v> 類型的rdd才可以調(diào)用
建議使用javaApi,scala的優(yōu)點麻煩的
spark.java.api
flatMapToPair + repartitionAndSortWithinPartitions
JavaPairRDD<ImmutableBytesWritable, KeyValue> hFileRdd = ds.javaRDD()
.flatMapToPair(new PairFlatMapFunction<Row, ImmutableBytesWritable, KeyValue>() {
@Override
public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Row row) throws Exception {
String rowKey = row.getString(0); // 按照約定,第一個字段是rowKey
ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>> list = new ArrayList<>();
for (int i = 1; i < row.length(); i++) {
String fieldName = row.schema().fields()[i].name();
String columnFamily = fieldName.split(":")[0];
String qualifier = fieldName.split(":")[1];
String value = String.valueOf(row.get(i));
KeyValue keyValue = new KeyValue(
Bytes.toBytes(rowKey),
Bytes.toBytes(columnFamily),
Bytes.toBytes(qualifier),
Bytes.toBytes(value));
list.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), keyValue));
}
return list.iterator();
}
});
//可以直接調(diào)用 repartitionAndSortWithinPartitions
JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitioned =
hFileRdd.repartitionAndSortWithinPartitions(new RegionPartitioner(regionSplits.toArray(new String[regionSplits.size()])));
spark.scala.api
flatMapRdd + OrderedRDDFunctions + repartitionAndSortWithinPartitions
val flatMapRdd: RDD[(ImmutableBytesWritable, KeyValue)] = df.rdd.flatMap(row => {
val rowkey: Int = row.getInt(0)
val columnFamily: String = "i"
val list = new util.ArrayList[(ImmutableBytesWritable, KeyValue)]()
for (i <- 0 until row.length) {
val fieldName: String = row.schema.fields(i).name
val qualifier: String = fieldName.split(":")(0)
val value: String = String.valueOf(row.get(i))
val keyValue = new KeyValue(
Bytes.toBytes(rowkey),
Bytes.toBytes(columnFamily),
Bytes.toBytes(qualifier),
Bytes.toBytes(value)
)
list.add(new Tuple2[ImmutableBytesWritable, KeyValue](new ImmutableBytesWritable(Bytes.toBytes(rowkey)), keyValue))
}
list.asScala
})
//必須要加
implicit val caseInsensitiveOrdering = new Ordering[ImmutableBytesWritable] {
override def compare(x: ImmutableBytesWritable, y: ImmutableBytesWritable): Int = x.compareTo(y)
}
//此時才可以調(diào)用 repartitionAndSortWithinPartitions
flatMapRdd.repartitionAndSortWithinPartitions(new RegionPartitioner())
也可以不加
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
val sc: SparkContext = session.sparkContext
val rdd: RDD[String] = sc.makeRDD(List("spark", "flink", "hbase", "kafka"))
val pairRdd: RDD[(Long, String)] = rdd.zipWithIndex().map(f => (f._2, f._1))
val rsRdd: RDD[(Long, String)] = pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(2))
rsRdd.collect().foreach(println)
session.close()
-------------------------- 輸出-------------------------
(0,spark)
(2,hbase)
(1,flink)
(3,kafka)
關于OrderedRDDFunctions
/**
(key, value) 對的 RDD 上可用的額外函數(shù),其中鍵可通過隱式轉換進行排序。
它們將適用于在范圍內(nèi)具有隱式 Ordering[K] 的任何鍵類型 K。
所有標準基本類型都已存在排序對象。用戶還可以為自定義類型定義自己的排序,或覆蓋默認排序。將使用最近范圍內(nèi)的隱式排序
demo:
import org.apache.spark.SparkContext._
val rdd: RDD[(String, Int)] = ...
implicit val caseInsensitiveOrdering = new Ordering[String] {
override def compare(a: String, b: String) =
a.toLowerCase(Locale.ROOT).compare(b.toLowerCase(Locale.ROOT))
}
// Sort by key, using the above case insensitive ordering.
rdd.sortByKey()
**/
class OrderedRDDFunctions[K : Ordering : ClassTag,
V: ClassTag,
P <: Product2[K, V] : ClassTag] @DeveloperApi() (
self: RDD[P])
extends Logging with Serializable {
private val ordering = implicitly[Ordering[K]]
原文鏈接:https://blog.csdn.net/Lzx116/article/details/125127422
- 上一篇:淺談異常分類及異常處理機制
- 下一篇:Hbase 之KeyValue結構詳解
相關推薦
- 2022-04-28 C/C++的關鍵字之static你了解嗎_C 語言
- 2022-06-13 云計算openstack框架分類及發(fā)展階段概述_OpenStack
- 2022-09-23 深入了解C++的多態(tài)與虛函數(shù)_C 語言
- 2022-12-05 GPU狀態(tài)監(jiān)測?nvidia-smi?命令的用法詳解_python
- 2022-09-25 Clion配置STM32開發(fā)環(huán)境printf函數(shù)打印浮點數(shù)快速設置方法
- 2023-10-15 理解C/C++中的鏈接
- 2022-08-17 R語言繪制corrplot相關熱圖分析美化示例及詳細圖解_R語言
- 2022-07-22 如何處理SQL Server中附加數(shù)據(jù)庫時出現(xiàn)的錯誤
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學習環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結構-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支