
Spark repartitionAndSortWithinPartitions

Author: 南風知我意丿   Updated: 2022-09-05

Spark operator: repartitionAndSortWithinPartitions

  • What the operator does
  • spark.java.api
    • flatMapToPair + repartitionAndSortWithinPartitions
  • spark.scala.api
    • flatMapRdd + OrderedRDDFunctions + repartitionAndSortWithinPartitions
    • About OrderedRDDFunctions

What the operator does

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition, because it can push the sort down into the shuffle machinery.
If you need the data sorted after repartitioning, use repartitionAndSortWithinPartitions directly instead of a separate repartition-then-sort step.
The operator is only available on RDDs of <K, V> pairs.

The Java API is recommended; the Scala version is slightly more cumbersome (it needs an implicit Ordering for the key type in scope, as shown below).
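A rough sketch of the difference (the name pairRdd and HashPartitioner(4) are arbitrary illustrations, and partitionBy stands in for repartition so that both versions use the same partitioner):

  import org.apache.spark.HashPartitioner
  import org.apache.spark.rdd.RDD

  // Two passes: shuffle first, then explicitly sort every partition in memory.
  def twoStep(pairRdd: RDD[(String, Int)]): RDD[(String, Int)] =
    pairRdd
      .partitionBy(new HashPartitioner(4))
      .mapPartitions(iter => iter.toArray.sortBy(_._1).iterator, preservesPartitioning = true)

  // One pass: the sort is performed as part of the shuffle itself.
  def fused(pairRdd: RDD[(String, Int)]): RDD[(String, Int)] =
    pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(4))

The twoStep version has to buffer each whole partition in order to sort it, which is exactly the work the fused operator avoids by sorting the records while the shuffle is performed.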


spark.java.api

flatMapToPair + repartitionAndSortWithinPartitions

  JavaPairRDD<ImmutableBytesWritable, KeyValue> hFileRdd = ds.javaRDD()
                .flatMapToPair(new PairFlatMapFunction<Row, ImmutableBytesWritable, KeyValue>() {
                    @Override
                    public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Row row) throws Exception {
                        String rowKey = row.getString(0); // by convention, the first field is the rowKey

                        ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>> list = new ArrayList<>();

                        // the remaining fields are expected to be named "<columnFamily>:<qualifier>"
                        for (int i = 1; i < row.length(); i++) {
                            String fieldName = row.schema().fields()[i].name();
                            String columnFamily = fieldName.split(":")[0];
                            String qualifier = fieldName.split(":")[1];
                            String value = String.valueOf(row.get(i));
                            KeyValue keyValue = new KeyValue(
                                    Bytes.toBytes(rowKey),
                                    Bytes.toBytes(columnFamily),
                                    Bytes.toBytes(qualifier),
                                    Bytes.toBytes(value));
                            list.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), keyValue));
                        }

                        return list.iterator();
                    }
                });

//With the Java API, repartitionAndSortWithinPartitions can be called directly; without an explicit Comparator it falls back to the key's natural ordering (ImmutableBytesWritable implements Comparable)
JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitioned =
                    hFileRdd.repartitionAndSortWithinPartitions(new RegionPartitioner(regionSplits.toArray(new String[regionSplits.size()])));
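The post never shows RegionPartitioner. For an HBase bulk load it is typically a custom Partitioner that routes each row key to the partition of the region that will host it, so that the in-partition sort produces data in the order the regions expect. A minimal Scala sketch under that assumption (the constructor argument is taken to be the sorted region boundary keys; a Partitioner written in Scala can be used from the Java code above as-is):

  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.Partitioner

  // Hypothetical sketch: one partition per region, picked by comparing the row key
  // against the sorted region boundary keys (the start keys of regions 2..n).
  class RegionPartitioner(splits: Array[String]) extends Partitioner {
    private val splitKeys: Array[Array[Byte]] = splits.map(s => Bytes.toBytes(s))

    override def numPartitions: Int = splitKeys.length + 1

    override def getPartition(key: Any): Int = {
      val rowKey = key.asInstanceOf[ImmutableBytesWritable].copyBytes()
      // The first boundary greater than the row key marks the end of its region;
      // if no boundary is greater, the key belongs to the last region.
      val idx = splitKeys.indexWhere(split => Bytes.compareTo(split, rowKey) > 0)
      if (idx < 0) splitKeys.length else idx
    }
  }

Keeping partitions aligned with regions, combined with the in-partition sort by key, yields output in the order HBase bulk loading expects.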

spark.scala.api

flatMapRdd + OrderedRDDFunctions + repartitionAndSortWithinPartitions

 import scala.collection.JavaConverters._ // needed for list.asScala below

 val flatMapRdd: RDD[(ImmutableBytesWritable, KeyValue)] = df.rdd.flatMap(row => {

      val rowkey: Int = row.getInt(0)
      val columnFamily: String = "i"

      val list = new util.ArrayList[(ImmutableBytesWritable, KeyValue)]()

      for (i <- 0 until row.length) {
        val fieldName: String = row.schema.fields(i).name
        val qualifier: String = fieldName.split(":")(0)
        val value: String = String.valueOf(row.get(i))
        val keyValue = new KeyValue(
          Bytes.toBytes(rowkey),
          Bytes.toBytes(columnFamily),
          Bytes.toBytes(qualifier),
          Bytes.toBytes(value)
        )
        list.add(new Tuple2[ImmutableBytesWritable, KeyValue](new ImmutableBytesWritable(Bytes.toBytes(rowkey)), keyValue))
      }
      list.asScala
    })

 //Must be added: repartitionAndSortWithinPartitions is only available when an implicit Ordering for the key type is in scope
    implicit val caseInsensitiveOrdering = new Ordering[ImmutableBytesWritable] {
      override def compare(x: ImmutableBytesWritable, y: ImmutableBytesWritable): Int = x.compareTo(y)
    }

//Only now, with the implicit Ordering in scope, can repartitionAndSortWithinPartitions be called
flatMapRdd.repartitionAndSortWithinPartitions(new RegionPartitioner())

The implicit can be omitted when the key type already has a standard Ordering, as with the Long keys below:

 def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
    val sc: SparkContext = session.sparkContext
    val rdd: RDD[String] = sc.makeRDD(List("spark", "flink", "hbase", "kafka"))

    val pairRdd: RDD[(Long, String)] = rdd.zipWithIndex().map(f => (f._2, f._1))

    val rsRdd: RDD[(Long, String)] = pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(2))

    rsRdd.collect().foreach(println)

    session.close()
  }
	-------------------------- Output -------------------------
		(0,spark)
		(2,hbase)
		(1,flink)
		(3,kafka)

With HashPartitioner(2), the even keys 0 and 2 go to partition 0 and the odd keys 1 and 3 to partition 1; each partition is sorted by key, and collect() returns partition 0 before partition 1, which gives the order above.
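To make the grouping explicit, each record can be tagged with the index of the partition it landed in; a small sketch reusing rsRdd from the example above:

    rsRdd
      .mapPartitionsWithIndex((idx, iter) => iter.map(kv => (idx, kv)))
      .collect()
      .foreach(println)

    // prints, matching the collect() result above:
    // (0,(0,spark))
    // (0,(2,hbase))
    // (1,(1,flink))
    // (1,(3,kafka))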

About OrderedRDDFunctions

/**
 * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
 * an implicit conversion. They will work with any key type K that has an implicit Ordering[K]
 * in scope. Ordering objects already exist for all of the standard primitive types. Users can
 * also define their own orderings for custom types, or to override the default ordering. The
 * implicit ordering that is in the closest scope will be used.
 *
 * demo:
 *   import org.apache.spark.SparkContext._
 *
 *   val rdd: RDD[(String, Int)] = ...
 *   implicit val caseInsensitiveOrdering = new Ordering[String] {
 *     override def compare(a: String, b: String) =
 *       a.toLowerCase(Locale.ROOT).compare(b.toLowerCase(Locale.ROOT))
 *   }
 *
 *   // Sort by key, using the above case insensitive ordering.
 *   rdd.sortByKey()
 */
class OrderedRDDFunctions[K : Ordering : ClassTag,
                          V: ClassTag,
                          P <: Product2[K, V] : ClassTag] @DeveloperApi() (
    self: RDD[P])
  extends Logging with Serializable {
  private val ordering = implicitly[Ordering[K]]
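These extra methods (sortByKey, filterByRange, repartitionAndSortWithinPartitions) are attached to a pair RDD through the implicit conversion RDD.rddToOrderedRDDFunctions, so they become callable whenever an Ordering for the key type is in scope. A minimal, self-contained sketch with a made-up key type (Event and its ordering are illustrative, not from the original post):

  import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD

  // Illustrative key type; not part of the original article.
  case class Event(user: String, ts: Long)

  object CustomOrderingDemo {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("ordering-demo").setMaster("local"))

      // The Ordering in the closest scope wins: sort by user, then by timestamp.
      implicit val eventOrdering: Ordering[Event] = Ordering.by((e: Event) => (e.user, e.ts))

      val rdd: RDD[(Event, Int)] = sc.makeRDD(Seq(
        (Event("b", 2L), 1),
        (Event("a", 9L), 2),
        (Event("a", 1L), 3)
      ))

      // Compiles only because an implicit Ordering[Event] is in scope.
      rdd.repartitionAndSortWithinPartitions(new HashPartitioner(2))
        .collect()
        .foreach(println)

      sc.stop()
    }
  }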

Original article: https://blog.csdn.net/Lzx116/article/details/125127422
