網(wǎng)站首頁 編程語言 正文
詳解Flink同步Kafka數(shù)據(jù)到ClickHouse分布式表_數(shù)據(jù)庫其它
作者:大數(shù)據(jù)技術(shù)派 ? 更新時間: 2022-12-28 編程語言引言
業(yè)務(wù)需要一種OLAP引擎,可以做到實時寫入存儲和查詢計算功能,提供高效、穩(wěn)健的實時數(shù)據(jù)服務(wù),最終決定ClickHouse
什么是ClickHouse?
ClickHouse是一個用于聯(lián)機(jī)分析(OLAP)的列式數(shù)據(jù)庫管理系統(tǒng)(DBMS)。
列式數(shù)據(jù)庫更適合于OLAP場景(對于大多數(shù)查詢而言,處理速度至少提高了100倍),下面詳細(xì)解釋了原因(通過圖片更有利于直觀理解),圖片來源于ClickHouse中文官方文檔。
行式
列式
我們使用Flink編寫程序,消費kafka里面的主題數(shù)據(jù),清洗、歸一,寫入到clickhouse里面去。
這里的關(guān)鍵點,由于第一次使用,無法分清應(yīng)該建立什么格式的clickhouse表,出現(xiàn)了一些問題,最大的問題就是程序?qū)?shù)據(jù)寫入了,查詢發(fā)現(xiàn)數(shù)據(jù)不完整,只有一部分。我也在網(wǎng)上查了一些原因,總結(jié)下來。
為什么有時看不到已經(jīng)創(chuàng)建好的表并且查詢結(jié)果一直抖動時多時少?
常見原因1:
建表流程存在問題。ClickHouse的分布式集群搭建并沒有原生的分布式DDL語義。如果您在自建ClickHouse集群時使用create table創(chuàng)建表,查詢雖然返回了成功,但實際這個表只在當(dāng)前連接的Server上創(chuàng)建了。下次連接重置換一個Server,您就看不到這個表了。
解決方案:
建表時,請使用create table <table_name> on cluster default語句,on cluster default聲明會把這條語句廣播給default集群的所有節(jié)點進(jìn)行執(zhí)行。示例代碼如下。 Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再創(chuàng)建一個分布式表引擎,建表語句如下。 Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));
常見原因2:
ReplicatedMergeTree存儲表配置有問題。ReplicatedMergeTree表引擎是對應(yīng)MergeTree表引擎的主備同步增強(qiáng)版,在單副本實例上限定只能創(chuàng)建MergeTree表引擎,在雙副本實例上只能創(chuàng)建ReplicatedMergeTree表引擎。
解決方案:
在雙副本實例上建表時,請使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)為固定配置,無需修改。
這里引出了復(fù)制表的概念,這里介紹一下,只有 MergeTree 系列里的表可支持副本:
ReplicatedMergeTree
ReplicatedSummingMergeTree
ReplicatedReplacingMergeTree
ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree
ReplicatedVersionedCollapsingMergeTree
ReplicatedGraphiteMergeTree
副本是表級別的,不是整個服務(wù)器級的。所以,服務(wù)器里可以同時有復(fù)制表和非復(fù)制表。副本不依賴分片。每個分片有它自己的獨立副本。
創(chuàng)建復(fù)制表
先做好準(zhǔn)備工作,該建表的建表,然后編寫程序。在表引擎名稱上加上 Replicated 前綴。例如:ReplicatedMergeTree。
- 首先創(chuàng)建一個分布式數(shù)據(jù)庫
create database test on cluster default_cluster;
- 創(chuàng)建本地表
由于clickhouse是分布式的,創(chuàng)建本地表本來應(yīng)該在每個節(jié)點上創(chuàng)建的,但是指定on cluster關(guān)鍵字可以直接完成,建表語句如下:
CREATE TABLE test.test_data_shade on cluster default_cluster ( `data` Map(String, String), `uid` String, `remote_addr` String, `time` Datetime64, `status` Int32, ...其它字段省略 `dt` String ) ENGINE = ReplicatedMergeTree() partition by dt order by (dt, sipHash64(uid));
這里表引擎為ReplicatedMergeTree,即有副本的表,根據(jù)dt按天分區(qū),提升查詢效率,sipHash64是一個hash函數(shù),根據(jù)uid散列使得相同uid數(shù)據(jù)在同一個分片上面,如果有去重需求,速度更快,因為可以計算每個分片去重,再匯總一下即可。
- 創(chuàng)建分布式表
CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));
在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表寫入或讀取數(shù)據(jù),Distributed 表引擎自身不存儲任何數(shù)據(jù),它能夠作為分布式表的一層透明代理,在集群內(nèi)部自動開展數(shù)據(jù)的寫入、分發(fā)、查詢、路由等工作。
通過jdbc寫入
這個我是看的官方文檔,里面有2種選擇,感興趣的同學(xué)可以都去嘗試一下。
這里貼一下我的Pom依賴
<dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.3.1-patch</version> <classifier>shaded</classifier> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
Flink主程序,消費kafka,做清洗,然后寫入clickhouse,這都是常規(guī)操作,這里貼一下關(guān)鍵代碼吧。
連接clickhouse有2種方式,8123端口的http方式,和基于9000端口的tcp方式。
這里官方推薦的是連接驅(qū)動是0.3.2:
<dependency> <!-- please stop using ru.yandex.clickhouse as it's been deprecated --> <groupId>com.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.3.2-patch11</version> <classifier>all</classifier> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.
官方推薦升級到0.3.2,上面表格給出了升級方法,文檔地址:
github.com/ClickHouse/…
原文鏈接:https://juejin.cn/post/7171767779484237838
相關(guān)推薦
- 2022-09-19 Python+LyScript實現(xiàn)自定義反匯編_python
- 2022-12-26 Python常用標(biāo)準(zhǔn)庫之os模塊功能_python
- 2022-11-12 python中validators庫的使用方法詳解_python
- 2022-10-12 pandas學(xué)習(xí)之df.set_index的具體使用_python
- 2022-04-07 C#中的串口通信SerialPort詳解_C#教程
- 2022-04-10 微信小程序與普通網(wǎng)頁的區(qū)別是什么
- 2022-06-22 Android中TextView動態(tài)設(shè)置縮進(jìn)距離的方法_Android
- 2022-07-11 spring boot + shiro 無需redis自定義token生成
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支