日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

采用python開發sparkstreming全流程

作者:qq_32457341 更新時間: 2022-08-13 編程語言

最近開發了sparkstreaming的程序,且開發語言是采用python的,下述記錄了開發的具體代碼和過程,方便今后重復使用;

使用場景

需要從kafka的topic上消費數據,最終寫入到hadoop集群中,這里面有幾個方案;
(1)采用kudu作為存儲系統,直接將消費到的數據寫入到kudu存儲中,之后利用該數據;
(2)消費寫入到文件中,放在hdfs上,采用hive-load的方式寫入到hive表中,之后利用該數據;
(3)sparkstreaming直接寫入到hive的分區表中,后續利用該數據;

注:方案(2)如果是流式處理的話,只能寫到一個文件或文件夾當中,不好截取處理,我們最終采用了(3)方案,關于(3)方案會有小文件的問題,后續也會介紹該問題的解決方法;

代碼開發

我們采用python的pyspark開發了相應的代碼,這里介紹一下具體的環境情況;
spark版本:2.4.0.7
pyspark的版本:2.4.6

代碼如下:

#coding:utf-8

from pyspark.sql import SparkSession
 
# basic info
app_name = 'xxx'
 
# kafka info
kafka_broker_list = 'xxxx'
kafka_topic = 'xxx'
kafka_groupid = 'xxx'
kafka_username = 'xxx'
kafka_password = 'xxx'
 
def create_spark_session(app_name,log_level):
    '''
    create sparkSession,setting log level
    :param app_name:
    :return:
    '''
    spark_session = SparkSession \
        .builder \
        .appName(app_name) \
        .enableHiveSupport() \
        .getOrCreate()
    spark_session.sparkContext.setLogLevel('ERROR')
    return spark_session
          
def foreach_batch_function(batch_df, batch_id):
    '''
    '''
    # write no partition table
    #batch_df.write.format('hive').mode('append').saveAsTable('xxxx')
    # write partition table
    batch_df.write.format('hive').mode('append').partitionBy('partitionxxx').saveAsTable('xxx')
     
sqlstr1='''
create table xxx(xxx) partitioned by 'xxx' stored as parquet location 'hdfs://tmp/xxx'
'''       
def main():   
    # 1.create sparkSession
   spark_session = create_spark_session(app_name, 'ERROR')
 
    # 2.create table,this can not created by hive user,privilege permission
    #spark_session.sql(sqlstr1)
 
    # 3.read kafka data, using structstreaming
    df = spark_session.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker_list) \
        .option("subscribe", kafka_topic) \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .option("kafka.sasl.jaas.config",
                'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";') \
        .option("startingOffsets", "earliest") \
        .load()
     
    # 4. write partition table, you must set this config   
    spark_session.sql("set hive.exec.dynamic.partition.mode=nonstrict")
     
    # 5.get the data you want to,using sql
    words = df.selectExpr("cast(value as STRING) AS value")
    job_info_df = words.where("get_json_object(value,'$.xxx')='xxx'") \
                       .selectExpr("get_json_object(value,'$.xxx1') as xxx1",
                                   "get_json_object(value,'$.xxx2') as xxx2"
                       )
 
    # 5.using streaming, write table, split streaming to batch, write batch to table
    query1 = job_info_df. \
        writeStream. \
        option('checkpointLocation', 'xxx'). \
        foreachBatch(foreach_batch_function). \
        trigger(processingTime='1 minute'). \
        start()
    query1.awaitTermination()
    query1.stop()
 
     
if __name__ == '__main__':
    main()

注:
(1)我們在spark集群上運行python代碼時,發現如果存在中文字符,會導致執行不通過,故建議不要使用中文注釋,防止執行時報錯;
(2)本代碼是使用structstreaming的方式消費kafka的數據,經過sql處理,選取想要的數據,最終將數據寫入到hive分區表中,在寫入hive分區表時,我們通過foreachBatch的函數,將讀取的流分成多個dataFrame,然后用對應的foreach_batch_function這個函數分別對dataFrame進行處理,這里使用了saveAsTable的方式寫入到最終表中,表可以是分區表和非分區表;
(3)不知道是何原因,我們用hive直接建立最終表時,總是會報寫入權限的問題,因此我們在sparksql中建立了最終表,且建立的是外表,目錄是當前執行用戶可以訪問的目錄;
(4)spark通過流式寫入數據時,需要設置checkpointLocation的目錄,如果不設置,spark會配置checkpoint的默認目錄,在當前集群下,默認目錄是/,spark無法寫入,會報錯;

代碼執行

代碼執行采用spark-submit的方式執行,具體如下:

 #client 模式
spark-submit --master yarn --keytab /home/xxx/xxx.keytab --principal xxx xxx.py
# cluster模式
 spark-submit --master yarn --deploy-mode cluster --keytab /home/xxx/xxx.keytab --principal xxx xxx.py

注:我們的集群采用kerberos認證的方式,因此提交時需要使用配置–kertab和–principal這兩個參數;

spark寫入hive小文件的問題

在用spark寫入hive時,會出現很多的小文件,我們提供如下的解決方案:
(1)寫入的表最好是分區表;
(2)對歷史分區(非當前寫入的分區)采用如下的命令解決小文件的問題;

INSERT OVERWRITE TABLE tablename PARTITION (partition)
SELECT * FROM tablename
DISTRIBUTE BY partition,cast(rand() * N as int)

原文鏈接:https://blog.csdn.net/qq_32457341/article/details/125315068

欄目分類
最近更新