Implementing Timers and Side Outputs in Flink with a Custom ProcessFunction

Author: Leo Han  Updated: 2022-01-21  Category: Programming Languages

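In Flink, when you need access to an operator's processing time or watermark, or need to register timers, you can implement a ProcessFunction.
The main variants are KeyedProcessFunction, ProcessFunction, and CoProcessFunction. Their core capabilities are: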

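  • Stateful computation: the operator can access keyed state (when applied to a keyed stream).
  • Timers can be registered.
  • Side outputs: part of the data can be sent to a separate stream, and the output streams may carry different data types.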
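
To make the timer behaviour concrete, here is a minimal, dependency-free sketch of how per-key event-time timers behave. It is a toy model, not Flink's API: the class `ToyTimerService` and its `advanceWatermark` method are hypothetical names introduced only for illustration. It mirrors two properties of Flink's event-time timers: registering the same timestamp twice for the same key yields a single timer, and a timer fires once the watermark reaches or passes its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model of per-key event-time timers. Hypothetical class, NOT Flink's TimerService. */
class ToyTimerService {
    // Pending timer timestamps per key; a TreeSet de-duplicates and keeps them sorted.
    private final Map<String, TreeSet<Long>> timers = new TreeMap<>();

    void registerEventTimeTimer(String key, long timestamp) {
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    void deleteEventTimeTimer(String key, long timestamp) {
        TreeSet<Long> pending = timers.get(key);
        if (pending != null) {
            pending.remove(Long.valueOf(timestamp));
        }
    }

    /** Advances the watermark and returns "key@timestamp" for every timer that fires. */
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, TreeSet<Long>> entry : timers.entrySet()) {
            TreeSet<Long> pending = entry.getValue();
            // A timer fires once the watermark reaches or passes its timestamp.
            while (!pending.isEmpty() && pending.first() <= watermark) {
                fired.add(entry.getKey() + "@" + pending.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.registerEventTimeTimer("AAPL", 1500L);
        service.registerEventTimeTimer("AAPL", 1500L); // duplicate, ignored
        service.registerEventTimeTimer("MSFT", 2000L);
        service.deleteEventTimeTimer("MSFT", 2000L);   // cancelled before it can fire
        System.out.println(service.advanceWatermark(1499L)); // prints []
        System.out.println(service.advanceWatermark(5000L)); // prints [AAPL@1500]
    }
}
```

In a real job the watermark is advanced by Flink itself, so event-time timers only make progress if the source assigns timestamps and watermarks.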

The following is a custom KeyedProcessFunction:


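import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.Properties;

// StockPrice and MyKafkaDeserializationSchema are the author's own classes and are not shown here.
public class MyKeyedProcessFunctionJava extends KeyedProcessFunction<String, StockPrice, String> {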

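    // Timestamp of the timer currently registered for this key.
    private ValueState<Long> currentTime;
    // Most recent price seen for this key.
    private ValueState<Double> lastPrice;
    // Length of the observation interval, in milliseconds.
    private static final long intervalMs = 500;

    // Tags identifying the three side outputs.
    private final OutputTag<StockPrice> highLevel;
    private final OutputTag<StockPrice> middleLevel;
    private final OutputTag<StockPrice> lowLevel;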

    public MyKeyedProcessFunctionJava(OutputTag<StockPrice> highLevel,
                                      OutputTag<StockPrice> middleLevel,
                                      OutputTag<StockPrice> lowLevel){
        this.lowLevel = lowLevel;
        this.middleLevel= middleLevel;
        this.highLevel = highLevel;

    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        currentTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("currentTime",Long.class));
        lastPrice = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastPrice",Double.class));
    }

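    @Override
    public void processElement(StockPrice stockPrice, Context context, Collector<String> collector) throws Exception {
        // Previous price for this key; 0 if this is the first element.
        double previousPrice = lastPrice.value() == null ? 0 : lastPrice.value();
        // Timestamp of the pending timer for this key; 0 means no timer is registered.
        long pendingTimer = currentTime.value() == null ? 0 : currentTime.value();
        double currentPrice = stockPrice.getPrice();

        if (currentPrice < previousPrice) {
            // The price dropped, so the rising streak is broken: cancel the pending timer.
            if (pendingTimer != 0) {
                context.timerService().deleteEventTimeTimer(pendingTimer);
                currentTime.clear();
            }
        } else if (pendingTimer == 0) {
            // The price did not drop and no timer is pending: start a new interval.
            // context.timestamp() is null unless the records carry event-time timestamps.
            long time = context.timestamp() + intervalMs;
            context.timerService().registerEventTimeTimer(time);
            currentTime.update(time);
        }
        lastPrice.update(currentPrice);

        // Route the element to a side output according to its current price.
        if (currentPrice > 10000) {
            context.output(highLevel, stockPrice);
        } else if (currentPrice > 3000) {
            context.output(middleLevel, stockPrice);
        } else {
            context.output(lowLevel, stockPrice);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The timer survived the whole interval, so the price never dropped during it.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timeStr = format.format(timestamp);
        String warnings = String.format("warnings: time=%s, key=%s increased", timeStr, ctx.getCurrentKey());
        out.collect(warnings);
        // Clear the timer state so the next element can start a new interval.
        currentTime.clear();
    }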

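    public static void main(String[] args) throws Exception {
        String topic = "test001";
        // Kafka connection settings (for example bootstrap.servers and group.id) must be set here.
        Properties kafkaProps = new Properties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event-time timers only fire as watermarks advance, so a real job also needs
        // timestamps and watermarks assigned on this source.
        FlinkKafkaConsumer<StockPrice> consumer =
                new FlinkKafkaConsumer<StockPrice>(topic, new MyKafkaDeserializationSchema(), kafkaProps);

        // The trailing {} creates an anonymous subclass so Flink can capture the generic type.
        OutputTag<StockPrice> highLevel = new OutputTag<StockPrice>("highLevel") {};
        OutputTag<StockPrice> middleLevel = new OutputTag<StockPrice>("middleLevel") {};
        OutputTag<StockPrice> lowLevel = new OutputTag<StockPrice>("lowLevel") {};

        SingleOutputStreamOperator<String> warning = env.addSource(consumer)
                .keyBy(stockPrice -> stockPrice.getId())
                .process(new MyKeyedProcessFunctionJava(highLevel, middleLevel, lowLevel));

        // Side outputs are read from the operator that emitted them.
        DataStream<StockPrice> highLevelStream = warning.getSideOutput(highLevel);
        DataStream<StockPrice> middleLevelStream = warning.getSideOutput(middleLevel);
        DataStream<StockPrice> lowLevelStream = warning.getSideOutput(lowLevel);

        // Print every stream so the results are visible when the job runs.
        warning.print("warning");
        highLevelStream.print("high");
        middleLevelStream.print("middle");
        lowLevelStream.print("low");

        // Nothing runs until execute() is called.
        env.execute("MyKeyedProcessFunctionJava");
    }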
}
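
The side-output routing in the listing above depends on two thresholds, 10000 and 3000. As a quick way to check the boundary cases, the following dependency-free sketch maps a price to the name of the tag it would be sent to. The class `PriceLevels` and the method `levelOf` are hypothetical helpers introduced for illustration; only the thresholds and tag names come from the listing.

```java
/** Hypothetical helper mirroring the threshold checks used for the side outputs. */
final class PriceLevels {
    static final double HIGH_THRESHOLD = 10000;
    static final double MIDDLE_THRESHOLD = 3000;

    /** Returns the name of the side-output tag for the given price. */
    static String levelOf(double price) {
        if (price > HIGH_THRESHOLD) {
            return "highLevel";
        }
        if (price > MIDDLE_THRESHOLD) {
            return "middleLevel";
        }
        // Everything at or below 3000 falls through to the low level.
        return "lowLevel";
    }

    public static void main(String[] args) {
        for (double price : new double[] {12000, 10000, 3000, 0}) {
            System.out.println(price + " -> " + levelOf(price));
        }
    }
}
```

Both comparisons are strict, so a price of exactly 10000 lands in middleLevel and a price of exactly 3000 lands in lowLevel.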

Original article: https://blog.csdn.net/LeoHan163/article/details/122400891
