網站首頁 編程語言 正文
在Flink中,當我們需要獲取到算子的Processing Time或者Water Mark以及定時器時,可以實現ProcessFunction函數。
目前該函數主要有:K惡業的ProcessFuntion,ProcessFunction,CoPropcessFunction等,核心功能主要如下:
- 可以使用狀態計算,能夠在算子中訪問Keyed State
- 可以設置定時器
- 側輸出,可以將一部分數據發送到另外一個數據流中,而且輸出的兩個數據流數據類型可以不一樣。
如下自定義實現一個KeyedProcessFunction
:
public class MyKeyedProcessFunctionJava extends KeyedProcessFunction<String, StockPrice,String> {
private ValueState<Long> currentTime;
private ValueState<Double> lastPrice ;
private static long intervalMs = 500 ;
OutputTag<StockPrice> highLevel ;
OutputTag<StockPrice> middleLevel;
OutputTag<StockPrice> lowLevel;
public MyKeyedProcessFunctionJava(OutputTag<StockPrice> highLevel,
OutputTag<StockPrice> middleLevel,
OutputTag<StockPrice> lowLevel){
this.lowLevel = lowLevel;
this.middleLevel= middleLevel;
this.highLevel = highLevel;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
currentTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("currentTime",Long.class));
lastPrice = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastPrice",Double.class));
}
@Override
public void processElement(StockPrice stockPrice, Context context, Collector<String> collector) throws Exception {
double price = 0;
if(lastPrice.value() != null) {
price = lastPrice.value() ;
}
long currentTimeStamp = 0;
if(currentTime.value() != null){
currentTimeStamp = currentTime.value();
}
if(price < stockPrice.getPrice()){
context.timerService().deleteEventTimeTimer(currentTimeStamp);
}else{
long time = context.timestamp()+intervalMs;
context.timerService().registerEventTimeTimer(time);
currentTime.update(time);
}
lastPrice.update(stockPrice.getPrice());
if(price>10000){
context.output(highLevel,stockPrice);
}else if(price<= 10000 && price > 3000){
context.output(middleLevel,stockPrice);
}else{
context.output(lowLevel,stockPrice);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyy-MM-dd HH:mm:ss");
String timeStr = format.format(timestamp);
String warnings = String.format("warnings: time=%s ,key=%s increased",timeStr,ctx.getCurrentKey());
out.collect(warnings);
}
public static void main(String[] args) {
String topic = "test001";
Properties kafkaProps = new Properties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<StockPrice> consumer = new FlinkKafkaConsumer<StockPrice>(topic, new MyKafkaDeserializationSchema(), kafkaProps);
OutputTag<StockPrice> highLevel = new OutputTag<StockPrice>("highLevel");
OutputTag<StockPrice> middleLevel = new OutputTag<StockPrice>("middleLevel");
OutputTag<StockPrice> lowLevel = new OutputTag<StockPrice>("lowLevel");
SingleOutputStreamOperator<String> warning = env.addSource(consumer)
.keyBy(stockPrice -> stockPrice.getId())
.process(new MyKeyedProcessFunctionJava(highLevel,middleLevel,lowLevel));
DataStream<StockPrice> highLevelStream = warning.getSideOutput(highLevel);
DataStream<StockPrice> middleLevelStream = warning.getSideOutput(middleLevel);
DataStream<StockPrice> lowLevelStream = warning.getSideOutput(lowLevel);
}
}
原文鏈接:https://blog.csdn.net/LeoHan163/article/details/122400891
相關推薦
- 2022-07-04 C#實現中文日歷Calendar_C#教程
- 2022-06-09 詳解Python中*args和**kwargs的使用_python
- 2022-05-08 C#使用Unity實現IOC_實用技巧
- 2022-08-03 Redis生成全局唯一ID的實現方法_Redis
- 2022-11-07 React?Hook父組件如何獲取子組件的數據/函數_React
- 2022-12-04 go高并發時append方法偶現錯誤解決分析_Golang
- 2022-09-21 Golang運行報錯找不到包:package?xxx?is?not?in?GOROOT的解決過程_G
- 2022-12-29 React中的Hooks路由跳轉問題_React
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支