網站首頁 編程語言 正文
Flink 側流輸出源碼解析
Flink 的 side output 為我們提供了側流(分流)輸出的功能,根據條件可以把一條流分為多個不同的流,之后做不同的處理邏輯,下面就來看下側流輸出相關的源碼。
先來看下面的一個 Demo,一個流被分成了 3 個流,一個主流,兩個側流輸出。
SingleOutputStreamOperator<JasonLeePOJO> process =
kafka_source1.process(
new ProcessFunction<JasonLeePOJO, JasonLeePOJO>() {
@Override
public void processElement(
JasonLeePOJO value,
ProcessFunction<JasonLeePOJO, JasonLeePOJO>.Context ctx,
Collector<JasonLeePOJO> out)
throws Exception {
// 這個是主流輸出
if (value.getName().equals("flink")) {
out.collect(value);
// 下面兩個是測流輸出
} else if (value.getName().equals("spark")) {
ctx.output(test, value);
// 測流
} else if (value.getName().equals("hadoop")) {
ctx.output(test1, value);
}
}
});
為了更加清楚的查看每一個算子,我禁用了 operator chain,任務的 DAG 圖如下所示:
這樣就比較清晰了,很明顯從 process 算子開始,1 個數據流分為了 3 個數據流,當然,在默認情況下沒有禁止
operator chain 所有的算子都是 chain 在一起的。
源碼解析
我們先來看第一個主流輸出也就是 out.collect(value) 的源碼,這里的 out 實際上是 TimestampedCollector 對象。
TimestampedCollector#collect
@Override
public void collect(T record) {
output.collect(reuse.replace(record));
}
在 collect 方法中持有一個 output 對象,用來輸出數據,在這里實際上是一個 CountingOutput 它是一個包裝了 Output 的對象,主要用于更新發送數據的 metric,并輸出數據。
CountingOutput#collect
@Override
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
在 CountingOutput 中也持有一個 output 對象,但是這里的 output 是 BroadcastingOutputCollector 對象,從名字就可以看出它是往下游廣播數據的,這里就有一個疑問?把數據廣播到下游,那豈不是下游的每個數據流都有這條數據嗎?這樣的話是怎么實現分流的呢?帶著這個疑問,我們來看 BroadcastingOutputCollector 的 collect 方法是怎么實現的。
BroadcastingOutputCollector#collect
@Override
public void collect(StreamRecord<T> record) {
// 這里的 outputs 數組有三個 output 分別對應上面的三個輸出流
for (Output<StreamRecord<T>> output : outputs) {
output.collect(record);
}
}
在 BroadcastingOutputCollector 對象里也持有一個 output 對象,其實他們都實現了 Output 接口,用來往下游發送數據,這里的 outputs 是一個 Output 數組,代表了下游的所有 Output,因為上面有三個輸出流,所以數組里面就包含了 3 個 Output 對象。
循環的調用 output 的 collect 方法往下游發送數據,因為我打斷了 operator chain,所以 process 算子和下游的 Print 算子不在同一個 operatorChain 內,那么上下游算子之間數據傳輸用的就是 RecordWriterOutput,否則用的是 CopyingChainingOutput 或者 ChainingOutput,具體使用的是哪個 Output 這里就不多介紹了,后面有時間的話會單獨介紹。
RecordWriterOutput#collect
@Override
public void collect(StreamRecord<OUT> record) {
// 主流是沒有 outputTag 的,只有測流有 outputTag
if (this.outputTag != null) {
// we are not responsible for emitting to the main output.
return;
}
pushToRecordWriter(record);
}
接著來看 RecordWriterOutput 的 collect 方法,在 collect 方法里面會先判斷 outputTag 是否為空,如果不為空不做任何處理,直接返回,否則就把數據推送到下游算子,只有側流輸出才需要定義 outputTag,主流(正常流)是沒有 outputTag 的,所以這里會走 pushToRecordWriter 方法把數據寫入到下游,也就是說雖然會以廣播的形式把數據廣播到所有下游,但其實另外兩個側流是直接返回的,只有主流才會把數據推送到下游,這也就解釋了上面的疑問。
然后再來看第二個側流輸出 ctx.output(test, value) 的源碼,這里的 ctx 實際上是 ProcessOperator#ContextImpl 對象。
ProcessOperator#ContextImpl#output
@Override
public <X> void output(OutputTag<X> outputTag, X value) {
if (outputTag == null) {
throw new IllegalArgumentException("OutputTag must not be null.");
}
output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}
如果 outputTag 是空,直接拋出異常,因為這個是側流,所以必須要定義 OutputTag。這里的 output 實際上是父類 AbstractStreamOperator 所持有的變量,如果 outputTag 不為空,就調用 output 的 collect 方法把數據發送到下游,這里的 output 和上面的一樣是 CountingOutput 但是 collect 方法是另外一個重載的方法。
CountingOutput#collect
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
numRecordsOut.inc();
output.collect(outputTag, record);
}
可以發現,這個 collect 方法比上面那個多了一個 OutputTag 參數,也就是使用側流輸出的時候定義的 OutputTag 對象,然后調用 output 的 collect 方法發送數據,這個也和上面的一樣,同樣是 BroadcastingOutputCollector 對象的另外一個重載方法,多了一個 OutputTag 參數。
BroadcastingOutputCollector#collect
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
for (Output<StreamRecord<T>> output : outputs) {
output.collect(outputTag, record);
}
}
這里的邏輯和上面是一樣的,同樣的循環調用 collect 方法發送數據。
RecordWriterOutput#collect
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
// 先要判斷兩個 OutputTag 是否一樣
if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
pushToRecordWriter(record);
}
}
在這個 collect 方法中會先判斷傳入的 OutputTag 對象和成員變量 this.outputTag 是不是相等,如果是的話,就發送數據,否則不做任何處理,所以這里每次只會選擇一個下游側流輸出數據,這樣就實現了所謂的分流。
OutputTag#isResponsibleFor
public static boolean isResponsibleFor(
@Nullable OutputTag<?> owner, @Nonnull OutputTag<?> other) {
return other.equals(owner);
}
可以看到在 isResponsibleFor 方法內是直接調用 OutputTag 的 equals 方法判斷兩個對象是否相等的。
第三個側流 test1 ctx.output(test1, value) 和第二個側流 test 是完全一樣的情況,這里就不在看代碼了。
上面是完成了分流操作,那怎么獲取到分流后結果呢(數據流)?我們可以通過 getSideOutput 方法獲取。
DataStream<JasonLeePOJO> sideOutput = process.getSideOutput(test);
DataStream<JasonLeePOJO> sideOutput1 = process.getSideOutput(test1);
getSideOutput 源碼
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
sideOutputTag = clean(requireNonNull(sideOutputTag));
// make a defensive copy
sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());
TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
throw new UnsupportedOperationException(
"A side output with a matching id was "
+ "already requested with a different type. This is not allowed, side output "
+ "ids need to be unique.");
}
requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());
SideOutputTransformation<X> sideOutputTransformation =
new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
}
getSideOutput 方法里先是構建了一個 SideOutputTransformation 對象,然后又構建了 DataStream 對象,這樣我們就可以基于分流后的 DataStream 做不同的處理邏輯了,從而實現了把一個 DataStream 分流成多個 DataStream 功能。
總結
通過對側流輸出的源碼進行解析,在分流的時候,數據是通過廣播的方式發送到下游算子的,對于主流的數據來說,只有 OutputTag 為空的才會處理,側流因為 OutputTag 不為空,所以直接返回,不做任何處理,那對于側流的數據來說,是通過判斷兩個 OutputTag 是否相等,所以每次只會把數據發送到下游對應的那一個側流上去,這樣即可實現分流邏輯。
原文鏈接:https://juejin.cn/post/7143077414153748488
相關推薦
- 2022-10-15 python?實現syslog?服務器的詳細過程_python
- 2022-04-06 React中使用react-player?播放視頻或直播的方法_React
- 2024-04-03 objectMapper(字符串轉對象)
- 2022-04-23 uni-app之項目首頁實現步驟
- 2022-10-17 android中px、sp與dp之間進行轉換詳解_Android
- 2022-11-23 解決vant組件van-list 首屏加載兩次的情況
- 2023-01-03 Android序列化實現接口Serializable與Parcelable詳解_Android
- 2022-11-02 react父組件更改props子組件無法刷新原因及解決方法_React
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支