網站首頁 編程語言 正文
1. 摘要
本文演示了使用外部表集成 Vertica 和 Apache Hudi。 在演示中我們使用 Spark 上的 Apache Hudi 將數據攝取到 S3 中,并使用 Vertica 外部表訪問這些數據。
2. Apache Hudi介紹
Apache Hudi 是一種變更數據捕獲 (CDC) 工具,可在不同時間線將事務記錄在表中。 Hudi 代表 Hadoop Upserts Deletes and Incrementals,是一個開源框架。 Hudi 提供 ACID 事務、可擴展的元數據處理,并統一流和批處理數據處理。
以下流程圖說明了該過程。 使用安裝在 Apache Spark 上的 Hudi 將數據處理到 S3,并從 Vertica 外部表中讀取 S3 中的數據更改。
3. 環境準備
Apache Spark 環境。 使用具有 1 個 Master 和 3 個 Worker 的 4 節點集群進行了測試。 按照在多節點集群上設置 Apache Spark 中的說明安裝 Spark 集群環境。 啟動 Spark 多節點集群。
Vertica 分析數據庫。 使用 Vertica Enterprise 11.0.0 進行了測試。
AWS S3 或 S3 兼容對象存儲。 使用 MinIO 作為 S3 存儲桶進行了測試。
需要以下 jar 文件。將 jar 復制到 Spark 機器上任何需要的位置,將這些 jar 文件放在 /opt/spark/jars 中。
Hadoop - hadoop-aws-2.7.3.jar
AWS - aws-java-sdk-1.7.4.jar
在 Vertica 數據庫中運行以下命令來設置訪問存儲桶的 S3 參數:
SELECT SET_CONFIG_PARAMETER('AWSAuth', 'accesskey:secretkey'); SELECT SET_CONFIG_PARAMETER('AWSRegion','us-east-1'); SELECT SET_CONFIG_PARAMETER('AWSEndpoint','<S3_IP>:9000'); SELECT SET_CONFIG_PARAMETER('AWSEnableHttps','0');
endpoint可能會有所不同,具體取決于 S3 存儲桶位置選擇的 S3 對象存儲。
4. Vertica和Apache Hudi集成
要將 Vertica 與 Apache Hudi 集成,首先需要將 Apache Spark 與 Apache Hudi 集成,配置 jars,以及訪問 AWS S3 的連接。 其次,將 Vertica 連接到 Apache Hudi。 然后對 S3 存儲桶執行 Insert、Append、Update 等操作。
按照以下部分中的步驟將數據寫入 Vertica。
在 Apache Spark 上配置 Apache Hudi 和 AWS S3
配置 Vertica 和 Apache Hudi 集成
4.1 在 Apache Spark 上配置 Apache Hudi 和 AWS S3
在 Apache Spark 機器中運行以下命令。
這會下載 Apache Hudi 包,配置 jar 文件,以及 AWS S3
/opt/spark/bin/spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1
導入Hudi的讀、寫等所需的包:
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._
使用以下命令根據需要配置 Minio 訪問密鑰、Secret key、Endpoint 和其他 S3A 算法和路徑。
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "*****") spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "*****") spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://XXXX.9000") spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true") sc.hadoopConfiguration.set("fs.s3a.signing-algorithm","S3SignerType")
創建變量來存儲 MinIO 的表名和 S3 路徑。
val tableName = “Trips” val basepath = “s3a://apachehudi/vertica/”
準備數據,使用 Scala 在 Apache spark 中創建示例數據
val df = Seq( ("aaa","r1","d1",10,"US","20211001"), ("bbb","r2","d2",20,"Europe","20211002"), ("ccc","r3","d3",30,"India","20211003"), ("ddd","r4","d4",40,"Europe","20211004"), ("eee","r5","d5",50,"India","20211005"), ).toDF("uuid", "rider", "driver","fare","partitionpath","ts")
將數據寫入 AWS S3 并驗證此數據
df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
使用 Scala 運行以下命令以驗證是否從 S3 存儲桶中正確讀取數據。
spark.read.format("hudi").load(basePath).createOrReplaceTempView("dta") spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare,ts, partitionpath from dta order by uuid").show()
4.2 配置 Vertica 和 Apache HUDI 集成
在 vertica 中創建一個外部表,其中包含來自 S3 上 Hudi 表的數據。 我們創建了“旅行”表。
CREATE EXTERNAL TABLE Trips ( _hoodie_commit_time TimestampTz, uuid varchar, rider varchar, driver varchar, fare int, ts varchar, partitionpath varchar ) AS COPY FROM 's3a://apachehudi/parquet/vertica/*/*.parquet' PARQUET;
運行以下命令以驗證正在讀取外部表:
4.3 如何讓 Vertica 查看更改的數據
以下部分包含為查看 Vertica 中更改的數據而執行的一些操作的示例。
4.3.1 寫入數據
在這個例子中,我們使用 Scala 在 Apache spark 中運行了以下命令并附加了一些數據:
val df2 = Seq( ("fff","r6","d6",50,"India","20211005") ).toDF("uuid", "rider", "driver","fare","partitionpath","ts")
運行以下命令將此數據附加到 S3 上的 Hudi 表中:
df2.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
4.3.2 更新數據
在這個例子中,我們更新了一條 Hudi 表的記錄。 需要導入數據以觸發并更新數據:
val df3 = Seq( ("aaa","r1","d1",100,"US","20211001"), ("eee","r5","d5",500,"India","20211001") ).toDF("uuid", "rider", "driver","fare","partitionpath","ts")
運行以下命令將數據更新到 S3 上的 HUDI 表:
df3.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
以下是 spark.sql 的輸出:
以下是 Vertica 輸出:
4.3.3 創建和查看數據的歷史快照
執行以下指向特定時間戳的 spark 命令:
val dd = spark.read .format("hudi") .option("as.of.instant", "20211007092600") .load(basePath)
使用以下命令將數據寫入 S3 中的 parquet:
dd.write.parquet("s3a://apachehudi/parquet/p2")
在此示例中,我們正在讀取截至“20211007092600”日期的 Hudi 表快照。
dd.show
通過在 parquet 文件上創建外部表從 Vertica 執行命令。
原文鏈接:https://www.cnblogs.com/leesf456/p/16072233.html
相關推薦
- 2023-02-10 Python常見錯誤:IndexError:?list?index?out?of?range解決_p
- 2023-02-15 Python?PyWebIO提升團隊效率使用介紹_python
- 2022-06-10 利用Python實現RSA加密解密方法實例_python
- 2021-12-02 C語言中幾種常量的認識和理解_C 語言
- 2022-03-20 C語言初階之數組詳細介紹_C 語言
- 2023-05-17 一文速學Python+Pyecharts繪制樹形圖_python
- 2022-11-04 C++淺析內存分區模型概念與示例_C 語言
- 2022-12-12 Android?DataBinding類關系深入探究_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支