網(wǎng)站首頁 編程語言 正文
自從計(jì)算機(jī)出現(xiàn)以來,我們一直在嘗試尋找計(jì)算機(jī)存儲一些信息的方法,存儲在計(jì)算機(jī)上的信息(也稱為數(shù)據(jù))有多種形式,數(shù)據(jù)變得如此重要,以至于信息現(xiàn)在已成為觸手可及的商品。多年來數(shù)據(jù)以多種方式存儲在計(jì)算機(jī)中,包括數(shù)據(jù)庫、blob存儲和其他方法,為了進(jìn)行有效的業(yè)務(wù)分析,必須對現(xiàn)代應(yīng)用程序創(chuàng)建的數(shù)據(jù)進(jìn)行處理和分析,并且產(chǎn)生的數(shù)據(jù)量非常巨大!有效地存儲數(shù)PB數(shù)據(jù)并擁有必要的工具來查詢它以便使用它至關(guān)重要,只有這樣對該數(shù)據(jù)的分析才能產(chǎn)生有意義的結(jié)果。
大數(shù)據(jù)是一門處理分析方法、有條不紊地從中提取信息或以其他方式處理對于典型數(shù)據(jù)處理應(yīng)用程序軟件而言過于龐大或復(fù)雜的數(shù)據(jù)量的方法的學(xué)科。為了處理現(xiàn)代應(yīng)用程序產(chǎn)生的數(shù)據(jù),大數(shù)據(jù)的應(yīng)用是非常必要的,考慮到這一點(diǎn),本博客旨在提供一個關(guān)于如何創(chuàng)建數(shù)據(jù)湖的小教程,該數(shù)據(jù)湖從應(yīng)用程序的數(shù)據(jù)庫中讀取任何更改并將其寫入數(shù)據(jù)湖中的相關(guān)位置,我們將為此使用的工具如下:
- Debezium
- MySQL
- Apache Kafka
- Apache Hudi
- Apache Spark
我們將要構(gòu)建的數(shù)據(jù)湖架構(gòu)如下:
第一步是使用 Debezium 讀取關(guān)系數(shù)據(jù)庫中發(fā)生的所有更改,并將所有更改推送到 Kafka 集群。
Debezium 是一個用于變更數(shù)據(jù)捕獲的開源分布式平臺,Debezium 可以指向任何關(guān)系數(shù)據(jù)庫,并且它可以開始實(shí)時捕獲任何數(shù)據(jù)更改,它非常快速且實(shí)用,由紅帽維護(hù)。
首先,我們將使用 docker-compose 在我們的機(jī)器上設(shè)置 Debezium、MySQL 和 Kafka,您也可以使用這些的獨(dú)立安裝,我們將使用 Debezium 提供給我們的 mysql 鏡像,因?yàn)槠渲幸呀?jīng)包含數(shù)據(jù),在任何生產(chǎn)環(huán)境中都可以使用適當(dāng)?shù)?Kafka、MySQL 和 Debezium 集群,docker compose 文件如下:
version: '2' services: zookeeper: image: debezium/zookeeper:${DEBEZIUM_VERSION} ports: - 2181:2181 - 2888:2888 - 3888:3888 kafka: image: debezium/kafka:${DEBEZIUM_VERSION} ports: - 9092:9092 links: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 mysql: image: debezium/example-mysql:${DEBEZIUM_VERSION} ports: - 3307:3306 environment: - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASS} - MYSQL_USER=${MYSQL_USER} - MYSQL_PASSWORD=${MYSQL_USER_PASS} schema-registry: image: confluentinc/cp-schema-registry ports: - 8181:8181 - 8081:8081 environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 - SCHEMA_REGISTRY_HOST_NAME=schema-registry - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 links: - zookeeper connect: image: debezium/connect:${DEBEZIUM_VERSION} ports: - 8083:8083 links: - kafka - mysql - schema-registry environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
DEBEZIUM_VERSION 可以設(shè)置為 1.8。 此外請確保設(shè)置 MYSQL_ROOT_PASS、MYSQL_USER 和 MYSQL_PASSWORD。
在我們繼續(xù)之前,我們將查看 debezium 鏡像提供給我們的數(shù)據(jù)庫 inventory 的結(jié)構(gòu),進(jìn)入數(shù)據(jù)庫的命令行:
docker-compose -f docker-compose-avro-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
在 shell 內(nèi)部,我們可以使用 show tables 命令。 輸出應(yīng)該是這樣的:
我們可以通過 select * from customers 命令來查看客戶表的內(nèi)容。 輸出應(yīng)該是這樣的:
現(xiàn)在在創(chuàng)建容器后,我們將能夠?yàn)?Kafka Connect 激活 Debezium 源連接器,我們將使用的數(shù)據(jù)格式是 Avro 數(shù)據(jù)格式,Avro 是在 Apache 的 Hadoop 項(xiàng)目中開發(fā)的面向行的遠(yuǎn)程過程調(diào)用和數(shù)據(jù)序列化框架。它使用 JSON 來定義數(shù)據(jù)類型和協(xié)議,并以緊湊的二進(jìn)制格式序列化數(shù)據(jù)。
讓我們用我們的 Debezium 連接器的配置創(chuàng)建另一個文件。
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "MYSQL_USER", "database.password": "MYSQL_PASSWORD", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081" } }
正如我們所看到的,我們已經(jīng)在其中配置了數(shù)據(jù)庫的詳細(xì)信息以及要從中讀取更改的數(shù)據(jù)庫,確保將 MYSQL_USER 和 MYSQL_PASSWORD 的值更改為您之前配置的值,現(xiàn)在我們將運(yùn)行一個命令在 Kafka Connect 中注冊它,命令如下:
curl -i -X POST -H "Accept:application/json" -H "Content-type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
現(xiàn)在,Debezium 應(yīng)該能夠從 Kafka 讀取數(shù)據(jù)庫更改。
下一步涉及使用 Spark 和 Hudi 從 Kafka 讀取數(shù)據(jù),并將它們以 Hudi 文件格式放入 Google Cloud Storage Bucket。 在我們開始使用它們之前,讓我們了解一下 Hudi 和 Spark 是什么。
Apache Hudi 是一個開源數(shù)據(jù)管理框架,用于簡化增量數(shù)據(jù)處理和數(shù)據(jù)管道開發(fā)。 該框架更有效地管理數(shù)據(jù)生命周期等業(yè)務(wù)需求并提高數(shù)據(jù)質(zhì)量。 Hudi 使您能夠在基于云的數(shù)據(jù)湖上管理記錄級別的數(shù)據(jù),以簡化更改數(shù)據(jù)捕獲 (CDC) 和流式數(shù)據(jù)攝取,并幫助處理需要記錄級別更新和刪除的數(shù)據(jù)隱私用例。 Hudi 管理的數(shù)據(jù)集使用開放存儲格式存儲在云存儲桶中,而與 Presto、Apache Hive 和/或 Apache Spark 的集成使用熟悉的工具提供近乎實(shí)時的更新數(shù)據(jù)訪問
Apache Spark 是用于大規(guī)模數(shù)據(jù)處理的開源統(tǒng)一分析引擎。 Spark 為具有隱式數(shù)據(jù)并行性和容錯性的集群編程提供了一個接口。 Spark 代碼庫最初是在加州大學(xué)伯克利分校的 AMPLab 開發(fā)的,后來被捐贈給了 Apache 軟件基金會,該基金會一直在維護(hù)它。
現(xiàn)在,由于我們正在 Google Cloud 上構(gòu)建解決方案,因此最好的方法是使用 Google Cloud Dataproc。 Google Cloud Dataproc 是一種托管服務(wù),用于處理大型數(shù)據(jù)集,例如大數(shù)據(jù)計(jì)劃中使用的數(shù)據(jù)集。 Dataproc 是 Google 的公共云產(chǎn)品 Google Cloud Platform 的一部分。 Dataproc 幫助用戶處理、轉(zhuǎn)換和理解大量數(shù)據(jù)。
在 Google Dataproc 實(shí)例中,預(yù)裝了 Spark 和所有必需的庫。 創(chuàng)建實(shí)例后,我們可以在其中運(yùn)行以下 Spark 作業(yè)來完成我們的管道:
spark-submit \ --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 \ --master yarn --deploy-mode client \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.1.jar \ --table-type COPY_ON_WRITE --op UPSERT \ --target-base-path gs://your-data-lake-bucket/hudi/customers \ --target-table hudi_customers --continuous \ --min-sync-interval-seconds 60 \ --source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \ --source-ordering-field _event_origin_ts_ms \ --hoodie-conf schema.registry.url=http://localhost:8081 \ --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest \ --hoodie-conf hoodie.deltastreamer.source.kafka.topic=dbserver1.inventory.customers \ --hoodie-conf bootstrap.servers=localhost:9092 \ --hoodie-conf auto.offset.reset=earliest \ --hoodie-conf hoodie.datasource.write.recordkey.field=id \ --hoodie-conf hoodie.datasource.write.partitionpath.field=id \
這將運(yùn)行一個 spark 作業(yè),該作業(yè)從我們之前推送到的 Kafka 中獲取數(shù)據(jù)并將其寫入 Google Cloud Storage Bucket。 我們必須指定 Kafka 主題、Schema Registry URL 和其他相關(guān)配置。
結(jié)論
可以通過多種方式構(gòu)建數(shù)據(jù)湖。 我試圖展示如何使用 Debezium、Kafka、Hudi、Spark 和 Google Cloud 構(gòu)建數(shù)據(jù)湖。 使用這樣的設(shè)置,可以輕松擴(kuò)展管道以管理大量數(shù)據(jù)工作負(fù)載! 有關(guān)每種技術(shù)的更多詳細(xì)信息,可以訪問文檔。 可以自定義 Spark 作業(yè)以獲得更細(xì)粒度的控制。 這里顯示的 Hudi 也可以與 Presto、Hive 或 Trino 集成。 定制的數(shù)量是無窮無盡的。 本文提供了有關(guān)如何使用上述工具構(gòu)建基本數(shù)據(jù)管道的基本介紹!
原文鏈接:https://www.cnblogs.com/leesf456/p/16110660.html
相關(guān)推薦
- 2022-04-25 python?實(shí)現(xiàn)兩個字符串乘法小練習(xí)_python
- 2022-12-24 Docker網(wǎng)絡(luò)模型以及容器通信詳解續(xù)篇_docker
- 2022-07-19 tomcat升級 遇到的坑 運(yùn)行tomcat的時候出現(xiàn)NosuchMethodError
- 2022-06-11 python?針對在子文件夾中的md文檔實(shí)現(xiàn)批量md轉(zhuǎn)word_python
- 2023-04-24 一文掌握python中的__init__的意思及使用場景分析_python
- 2021-11-08 深入解析golang中的標(biāo)準(zhǔn)庫flag_Golang
- 2022-09-04 python實(shí)現(xiàn)自動生成C++代碼的代碼生成器_python
- 2022-07-01 Python查詢?nèi)笔е档?種方法總結(jié)_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支