網站首頁 編程語言 正文
最近開發了sparkstreaming的程序,且開發語言是采用python的,下述記錄了開發的具體代碼和過程,方便今后重復使用;
使用場景
需要從kafka的topic上消費數據,最終寫入到hadoop集群中,這里面有幾個方案;
(1)采用kudu作為存儲系統,直接將消費到的數據寫入到kudu存儲中,之后利用該數據;
(2)消費寫入到文件中,放在hdfs上,采用hive-load的方式寫入到hive表中,之后利用該數據;
(3)sparkstreaming直接寫入到hive的分區表中,后續利用該數據;
注:方案(2)如果是流式處理的話,只能寫到一個文件或文件夾當中,不好截取處理,我們最終采用了(3)方案,關于(3)方案會有小文件的問題,后續也會介紹該問題的解決方法;
代碼開發
我們采用python的pyspark開發了相應的代碼,這里介紹一下具體的環境情況;
spark版本:2.4.0.7
pyspark的版本:2.4.6
代碼如下:
#coding:utf-8
from pyspark.sql import SparkSession
# basic info
app_name = 'xxx'
# kafka info
kafka_broker_list = 'xxxx'
kafka_topic = 'xxx'
kafka_groupid = 'xxx'
kafka_username = 'xxx'
kafka_password = 'xxx'
def create_spark_session(app_name,log_level):
'''
create sparkSession,setting log level
:param app_name:
:return:
'''
spark_session = SparkSession \
.builder \
.appName(app_name) \
.enableHiveSupport() \
.getOrCreate()
spark_session.sparkContext.setLogLevel('ERROR')
return spark_session
def foreach_batch_function(batch_df, batch_id):
'''
'''
# write no partition table
#batch_df.write.format('hive').mode('append').saveAsTable('xxxx')
# write partition table
batch_df.write.format('hive').mode('append').partitionBy('partitionxxx').saveAsTable('xxx')
sqlstr1='''
create table xxx(xxx) partitioned by 'xxx' stored as parquet location 'hdfs://tmp/xxx'
'''
def main():
# 1.create sparkSession
spark_session = create_spark_session(app_name, 'ERROR')
# 2.create table,this can not created by hive user,privilege permission
#spark_session.sql(sqlstr1)
# 3.read kafka data, using structstreaming
df = spark_session.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_broker_list) \
.option("subscribe", kafka_topic) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_PLAINTEXT") \
.option("kafka.sasl.jaas.config",
'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";') \
.option("startingOffsets", "earliest") \
.load()
# 4. write partition table, you must set this config
spark_session.sql("set hive.exec.dynamic.partition.mode=nonstrict")
# 5.get the data you want to,using sql
words = df.selectExpr("cast(value as STRING) AS value")
job_info_df = words.where("get_json_object(value,'$.xxx')='xxx'") \
.selectExpr("get_json_object(value,'$.xxx1') as xxx1",
"get_json_object(value,'$.xxx2') as xxx2"
)
# 5.using streaming, write table, split streaming to batch, write batch to table
query1 = job_info_df. \
writeStream. \
option('checkpointLocation', 'xxx'). \
foreachBatch(foreach_batch_function). \
trigger(processingTime='1 minute'). \
start()
query1.awaitTermination()
query1.stop()
if __name__ == '__main__':
main()
注:
(1)我們在spark集群上運行python代碼時,發現如果存在中文字符,會導致執行不通過,故建議不要使用中文注釋,防止執行時報錯;
(2)本代碼是使用structstreaming的方式消費kafka的數據,經過sql處理,選取想要的數據,最終將數據寫入到hive分區表中,在寫入hive分區表時,我們通過foreachBatch的函數,將讀取的流分成多個dataFrame,然后用對應的foreach_batch_function這個函數分別對dataFrame進行處理,這里使用了saveAsTable的方式寫入到最終表中,表可以是分區表和非分區表;
(3)不知道是何原因,我們用hive直接建立最終表時,總是會報寫入權限的問題,因此我們在sparksql中建立了最終表,且建立的是外表,目錄是當前執行用戶可以訪問的目錄;
(4)spark通過流式寫入數據時,需要設置checkpointLocation的目錄,如果不設置,spark會配置checkpoint的默認目錄,在當前集群下,默認目錄是/,spark無法寫入,會報錯;
代碼執行
代碼執行采用spark-submit的方式執行,具體如下:
#client 模式
spark-submit --master yarn --keytab /home/xxx/xxx.keytab --principal xxx xxx.py
# cluster模式
spark-submit --master yarn --deploy-mode cluster --keytab /home/xxx/xxx.keytab --principal xxx xxx.py
注:我們的集群采用kerberos認證的方式,因此提交時需要使用配置–kertab和–principal這兩個參數;
spark寫入hive小文件的問題
在用spark寫入hive時,會出現很多的小文件,我們提供如下的解決方案:
(1)寫入的表最好是分區表;
(2)對歷史分區(非當前寫入的分區)采用如下的命令解決小文件的問題;
INSERT OVERWRITE TABLE tablename PARTITION (partition)
SELECT * FROM tablename
DISTRIBUTE BY partition,cast(rand() * N as int)
原文鏈接:https://blog.csdn.net/qq_32457341/article/details/125315068
相關推薦
- 2022-07-24 C++算法學習之分支限界法的應用_C 語言
- 2022-05-04 Django點贊的實現示例_python
- 2022-07-21 SpringBoot整合SpringCache詳解
- 2024-02-29 UNI-APP項目在引用官方提供的Uni-App-Demo實例中的組件時應該注意的問題
- 2023-01-03 Nginx?Gunicorn?flask項目部署思路分析詳解_nginx
- 2022-01-18 django中ajax發送post請求報403錯誤csrf禁止,解決只需三步
- 2022-05-03 基于docker部署skywalking實現全鏈路監控功能_docker
- 2022-11-27 Python+decimal完成精度計算的示例詳解_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支