網站首頁 編程語言 正文
文章目錄
- 創建工程
- 在 pom 文件中添加 Flink 相關依賴
- 示例代碼
- 代碼推送到 gitee
- 配置項目
- 編譯項目
- 提交應用
- Yarn 平臺確認執行結果
創建工程
筆者工程名叫Flink-StreamX
在 pom 文件中添加 Flink 相關依賴
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF
folder.
Otherwise, this might cause SecurityExceptions when
using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
示例代碼
package com.apache.bigdata;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class UnboundedWC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("hadoop102", 9999)
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(word);
}
}
})
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String word) throws Exception {
return Tuple2.of(word, 1l);
}
})
.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> t) throws Exception {
return t.f0; // t._1
}
})
.sum(1)
.print();
env.execute();
}
}
代碼推送到 gitee
在 streamx 平臺部署應用的時候要求代碼最好部署在 git 平臺,比如 github 或 gitee。作為國內用戶我們選擇比較穩定的 gitee。
如果不會git/gitee的小伙伴,可以從基礎學習一下,時間不長,一天足夠。
我的項目推送地址:https://gitee.com/luan_hao/Flink-StreamX/
配置項目
編譯項目
編譯前:
第一次編譯需要的時間比較久, 因為需要下載許多的依賴。
編譯成功后:
提交應用
1)創建應用
2)配置應用
3)上線應用
4)啟動應用(注意先啟動 socket: nc -lk 9999)
啟動成功的表現:
開始運行:
在彈出的界面里點擊Task Managers,然后點擊正在運行的任務
[root@hadoop102 local]# nc -lk 9999
spark kafka
flink hadoop
再點擊stdout即可查看輸出:
Yarn 平臺確認執行結果
成功。
原文鏈接:https://blog.csdn.net/weixin_45417821/article/details/125896898
相關推薦
- 2023-11-20 如何設置樹莓派4B的頻率?
- 2022-06-04 python實現對doc,txt,xls文檔的讀寫操作_python
- 2022-06-13 golang下grpc框架的使用編寫示例_Golang
- 2021-12-03 前端異常502?bad?gateway的原因和解決辦法_nginx
- 2022-09-05 SparkStreaming寫入Hive慢
- 2022-10-13 Golang執行cmd命令行的方法_Golang
- 2022-05-02 Numpy中創建數組的9種方式小結_python
- 2022-07-18 spring boot 中解決 Invalid character found in the req
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支