日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

詳解Flink同步Kafka數據到ClickHouse分布式表_數據庫其它

作者:大數據技術派 ? 更新時間: 2022-12-28 編程語言

引言

業務需要一種OLAP引擎,可以做到實時寫入存儲和查詢計算功能,提供高效、穩健的實時數據服務,最終決定ClickHouse

什么是ClickHouse?

ClickHouse是一個用于聯機分析(OLAP)的列式數據庫管理系統(DBMS)。

列式數據庫更適合于OLAP場景(對于大多數查詢而言,處理速度至少提高了100倍),下面詳細解釋了原因(通過圖片更有利于直觀理解),圖片來源于ClickHouse中文官方文檔。

行式

列式

我們使用Flink編寫程序,消費kafka里面的主題數據,清洗、歸一,寫入到clickhouse里面去。

這里的關鍵點,由于第一次使用,無法分清應該建立什么格式的clickhouse表,出現了一些問題,最大的問題就是程序將數據寫入了,查詢發現數據不完整,只有一部分。我也在網上查了一些原因,總結下來。

為什么有時看不到已經創建好的表并且查詢結果一直抖動時多時少?

常見原因1:

建表流程存在問題。ClickHouse的分布式集群搭建并沒有原生的分布式DDL語義。如果您在自建ClickHouse集群時使用create table創建表,查詢雖然返回了成功,但實際這個表只在當前連接的Server上創建了。下次連接重置換一個Server,您就看不到這個表了。

解決方案:

建表時,請使用create table <table_name> on cluster default語句,on cluster default聲明會把這條語句廣播給default集群的所有節點進行執行。示例代碼如下。 Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再創建一個分布式表引擎,建表語句如下。 Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));

常見原因2:

ReplicatedMergeTree存儲表配置有問題。ReplicatedMergeTree表引擎是對應MergeTree表引擎的主備同步增強版,在單副本實例上限定只能創建MergeTree表引擎,在雙副本實例上只能創建ReplicatedMergeTree表引擎。

解決方案:

在雙副本實例上建表時,請使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)為固定配置,無需修改。

這里引出了復制表的概念,這里介紹一下,只有 MergeTree 系列里的表可支持副本:

ReplicatedMergeTree

ReplicatedSummingMergeTree

ReplicatedReplacingMergeTree

ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree

ReplicatedVersionedCollapsingMergeTree

ReplicatedGraphiteMergeTree

副本是表級別的,不是整個服務器級的。所以,服務器里可以同時有復制表和非復制表。副本不依賴分片。每個分片有它自己的獨立副本。

創建復制表

先做好準備工作,該建表的建表,然后編寫程序。在表引擎名稱上加上 Replicated 前綴。例如:ReplicatedMergeTree。

  • 首先創建一個分布式數據庫
create database test on cluster default_cluster;
  • 創建本地表

由于clickhouse是分布式的,創建本地表本來應該在每個節點上創建的,但是指定on cluster關鍵字可以直接完成,建表語句如下:

CREATE TABLE test.test_data_shade on cluster default_cluster
(
    `data` Map(String, String),
    `uid` String,
    `remote_addr` String,
    `time` Datetime64,
    `status` Int32,
    ...其它字段省略
    `dt` String
)
ENGINE = ReplicatedMergeTree()
partition by dt
order by (dt, sipHash64(uid));

這里表引擎為ReplicatedMergeTree,即有副本的表,根據dt按天分區,提升查詢效率,sipHash64是一個hash函數,根據uid散列使得相同uid數據在同一個分片上面,如果有去重需求,速度更快,因為可以計算每個分片去重,再匯總一下即可。

  • 創建分布式表
CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));

在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表寫入或讀取數據,Distributed 表引擎自身不存儲任何數據,它能夠作為分布式表的一層透明代理,在集群內部自動開展數據的寫入、分發、查詢、路由等工作。

通過jdbc寫入

這個我是看的官方文檔,里面有2種選擇,感興趣的同學可以都去嘗試一下。

這里貼一下我的Pom依賴

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
    <classifier>shaded</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Flink主程序,消費kafka,做清洗,然后寫入clickhouse,這都是常規操作,這里貼一下關鍵代碼吧。

連接clickhouse有2種方式,8123端口的http方式,和基于9000端口的tcp方式。

這里官方推薦的是連接驅動是0.3.2:

<dependency>
    <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch11</version>
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.

官方推薦升級到0.3.2,上面表格給出了升級方法,文檔地址:

github.com/ClickHouse/…

原文鏈接:https://juejin.cn/post/7171767779484237838

欄目分類
最近更新