日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

SparkStreaming寫入Hive慢

作者:南風知我意丿 更新時間: 2022-09-05 編程語言

文章目錄

  • 項目場景:
  • 問題描述
  • 原因分析:
    • 分析
      • hive的MV策略如下:
      • hdfs mv原理
  • 解決方案:
    • 方案一:修改臨時目錄
    • 方案二:

項目場景:

spark streaming從 Kafka 消費數據,寫到 Hive 表。


問題描述

數據量級上億,SparkStreaming 的 bath time 為 1 min, 在某一個時刻開始出現任務堆積,即大量任務處于 Queued 狀態,卡在了某個 job,最長延遲時間為 1.7 h。

查看 job 狀態一直處于 processing, 但是發現該 job 寫 hive 的時間也就花費了 30 秒左右,但是該 job 最終執行完的時間遠遠大于這個時間。

慢慢的,每一批次都要慢幾分鐘,出現堆積,最終造成數據大面積延遲。


原因分析:

  • Spark源碼:
    在這里插入圖片描述
  • hive源碼
 // If source path is a subdirectory of the destination path (or the other way around):
    //   ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
    //   where the staging directory is a subdirectory of the destination directory
    // (1) Do not delete the dest dir before doing the move operation.
    // (2) It is assumed that subdir and dir are in same encryption zone.
    // (3) Move individual files from scr dir to dest dir.
    boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
        destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
    final String msg = "Unable to move source " + srcf + " to destination " + destf;
    try {
      if (replace) {
        try{
          //if destf is an existing directory:
          //if replace is true, delete followed by rename(mv) is equivalent to replace
          //if replace is false, rename (mv) actually move the src under dest dir
          //if destf is an existing file, rename is actually a replace, and do not need
          // to delete the file first
          if (replace && !srcIsSubDirOfDest) {
            destFs.delete(destf, true);
            LOG.debug("The path " + destf.toString() + " is deleted");
          }
        } catch (FileNotFoundException ignore) {
        }
      }
      final SessionState parentSession = SessionState.get();
      if (isSrcLocal) {
        // For local src file, copy to hdfs
        destFs.copyFromLocalFile(srcf, destf);
        return true;
      } else {

分析

閱讀上面的源碼,可以發現往 Hive 中寫數據的時候會在目標表中(1.1 版本之后是默認位置目標表的文件夾)生成一個以.hive-staging 開頭的lin時文件夾,結果會在臨時文件夾存放。執行完成后會,將臨時文件夾 rename,放到對應的目標表文件下。

這里的 rename 并不是直接修改 hive 元數據那么簡單。是在特定條件下才會執行 mv file 的,否則還是會 copy file 的形式。

臨時文件就直接放在目標表對應的目錄下面了,所以最后執行的 copy 操作,如果文件多或者數據量大的情況下,會很慢。

hive的MV策略如下:

 1.原文件是非hdfs文件,copyFromLocal

   2.原文件是hdfs文件

          2.1   Encrypted模式

                copy操作,如果文件大于默認值(32MB),則會進行distcp操作。

         2.2  非Encrypted模式

            (1)原目錄是目標目錄的子目錄,原目錄下的每個文件進行copy操作,如果文件大于默認值(32MB),則會進行distcp操作。

            (2)其他情況,進行mv操作。

hdfs mv原理

當用戶調用hdfs dfs -mv時,HDFS保證重命名操作的原子性.運行此命令時,客戶端對NameNode進行RPC調用.此RPC的NameNode實現在修改inode樹時保持鎖定,并且只有在重命名完成后才會成功鎖定或成功鎖定. (由于許可或配額違規等原因,它可能會失敗.)

由于實現完全在NameNode內執行并且僅操縱文件系統元數據,因此不涉及實際的數據移動.實際上,在hdfs dfs -mv命令期間,沒有與DataNode的交互.所有文件的塊保持不變,與inode關聯的阻止列表保持不變. NameNode只是從一個位置獲取該文件的inode,并將其移動到文件系統樹中的另一個位置.不會破壞塊數據


解決方案:

方案一:修改臨時目錄

臨時文件不要放在目標表對應的目錄下面了,此時會執行 mv 操作,不涉及文件的移動,這樣就會很快。

<property>  
    <name>hive.exec.stagingdir</name>    
    <value>/tmp/hive/.hive-staging</value>  
    <description>hive任務生成臨時文件夾地址</description>
</property>

<property>          
    <name>hive.insert.into.multilevel.dirs</name> 
     <value>true</value>  
     <description>hive.insert.into.mulltilevel.dirs設置成false的時候,insert 目標目錄的上級目錄必須存在;trued的時候允許不存在</description>
 </property>

方案二:

spark 直接落文件到 HDFS的對應分區中 ,hive 表見外部表與數據進行關聯。這種就不依賴與 hive 了,減少中間環節。這是,盡可能的規避小文件,需要盡可能減少文件個數。

參考:
http://t.csdn.cn/0Rmyv

原文鏈接:https://blog.csdn.net/Lzx116/article/details/126499665

欄目分類
最近更新