網(wǎng)站首頁 編程語言 正文
文章目錄
- 創(chuàng)建工程
- 在 pom 文件中添加 Flink 相關(guān)依賴
- 示例代碼
- 代碼推送到 gitee
- 配置項(xiàng)目
- 編譯項(xiàng)目
- 提交應(yīng)用
- Yarn 平臺(tái)確認(rèn)執(zhí)行結(jié)果
創(chuàng)建工程
筆者工程名叫Flink-StreamX
在 pom 文件中添加 Flink 相關(guān)依賴
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF
folder.
Otherwise, this might cause SecurityExceptions when
using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
示例代碼
package com.apache.bigdata;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class UnboundedWC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("hadoop102", 9999)
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(word);
}
}
})
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String word) throws Exception {
return Tuple2.of(word, 1l);
}
})
.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> t) throws Exception {
return t.f0; // t._1
}
})
.sum(1)
.print();
env.execute();
}
}
代碼推送到 gitee
在 streamx 平臺(tái)部署應(yīng)用的時(shí)候要求代碼最好部署在 git 平臺(tái),比如 github 或 gitee。作為國(guó)內(nèi)用戶我們選擇比較穩(wěn)定的 gitee。
如果不會(huì)git/gitee的小伙伴,可以從基礎(chǔ)學(xué)習(xí)一下,時(shí)間不長(zhǎng),一天足夠。
我的項(xiàng)目推送地址:https://gitee.com/luan_hao/Flink-StreamX/
配置項(xiàng)目
編譯項(xiàng)目
編譯前:
第一次編譯需要的時(shí)間比較久, 因?yàn)樾枰螺d許多的依賴。
編譯成功后:
提交應(yīng)用
1)創(chuàng)建應(yīng)用
2)配置應(yīng)用
3)上線應(yīng)用
4)啟動(dòng)應(yīng)用(注意先啟動(dòng) socket: nc -lk 9999)
啟動(dòng)成功的表現(xiàn):
開始運(yùn)行:
在彈出的界面里點(diǎn)擊Task Managers,然后點(diǎn)擊正在運(yùn)行的任務(wù)
[root@hadoop102 local]# nc -lk 9999
spark kafka
flink hadoop
再點(diǎn)擊stdout即可查看輸出:
Yarn 平臺(tái)確認(rèn)執(zhí)行結(jié)果
成功。
原文鏈接:https://blog.csdn.net/weixin_45417821/article/details/125896898
相關(guān)推薦
- 2023-05-22 pytorch的Backward過程用時(shí)太長(zhǎng)問題及解決_python
- 2022-09-13 c++實(shí)現(xiàn)排序算法之希爾排序方式_C 語言
- 2022-05-31 如何使用yolov5輸出檢測(cè)到的目標(biāo)坐標(biāo)信息_python
- 2022-02-14 flutter封裝自定義打印信息
- 2022-12-04 WxPython界面利用pubsub如何實(shí)現(xiàn)多線程控制_python
- 2022-11-22 python模塊導(dǎo)入方式淺析步驟_python
- 2021-12-12 C++多線程之互斥鎖與死鎖_C 語言
- 2022-03-26 c#使用listbox的詳細(xì)方法和常見問題解決_C#教程
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支