Reading Fixed-Length Files with MapReduce and Loading Them into a Hive Table in ORC Format

Author: swg321321  Updated: 2022-08-19  Category: Programming Languages

Contents

  • Reading Fixed-Length Files with MapReduce and Loading Them into a Hive Table in ORC Format
  • MapReduce driver program
  • FixedLengthMapper
  • OrcFixedReduce


Reading Fixed-Length Files with MapReduce and Loading Them into a Hive Table in ORC Format

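A fixed-length file is one in which every line (record) occupies the same number of bytes. It comes with a layout definition that states which field each part of the record holds and how many bytes that field occupies.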
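As a rough illustration of how such a layout definition can drive parsing, the sketch below cuts one record into named fields by byte offset. The class `FixedRecordSlicer`, the `Field` type, and the three-field layout (`id`, `name`, `amount`) are hypothetical and not part of the original article; real layouts and encodings will differ.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: slices one fixed-length record into named fields. */
final class FixedRecordSlicer {

    /** One entry of the layout definition: field name and width in bytes. */
    static final class Field {
        final String name;
        final int length;

        Field(String name, int length) {
            this.name = name;
            this.length = length;
        }
    }

    /** Record length implied by the layout: the sum of all field widths. */
    static int recordLength(Field[] layout) {
        int total = 0;
        for (Field f : layout) {
            total += f.length;
        }
        return total;
    }

    /** Cuts the record at consecutive byte offsets, decodes each slice and trims padding. */
    static Map<String, String> slice(byte[] record, Field[] layout, Charset charset) {
        if (record.length < recordLength(layout)) {
            throw new IllegalArgumentException("record is shorter than the layout requires");
        }
        Map<String, String> fields = new LinkedHashMap<>();
        int offset = 0;
        for (Field f : layout) {
            fields.put(f.name, new String(record, offset, f.length, charset).trim());
            offset += f.length;
        }
        return fields;
    }

    public static void main(String[] args) {
        // Assumed layout: 4-byte id, 6-byte name, 8-byte amount (18 bytes per record).
        Field[] layout = { new Field("id", 4), new Field("name", 6), new Field("amount", 8) };
        byte[] record = "0001Alice 00012.50".getBytes(StandardCharsets.US_ASCII);
        System.out.println(slice(record, layout, StandardCharsets.US_ASCII));
    }
}
```

Slicing on bytes rather than on characters matters when the file uses a multi-byte encoding, because field widths in a fixed-length layout are usually given in bytes.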

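To write into a specific Hive partition, create that partition and use the partition's location as the job's output path; nothing more is required.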
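One possible way to keep the partition location and the job output path in sync is to derive both from the same values. The sketch below only builds strings: the directory in Hive's usual `key=value` form and an `ALTER TABLE ... ADD PARTITION ... LOCATION` statement. The class name, table name, partition column and HDFS path are all made-up placeholders, and running the statement against Hive is left out.

```java
/** Hypothetical helper: derives a partition directory and the matching Hive DDL. */
final class HivePartitionPath {

    /** Builds "<tableLocation>/<key>=<value>", the directory layout Hive uses for a partition. */
    static String partitionLocation(String tableLocation, String key, String value) {
        String base = tableLocation.endsWith("/")
                ? tableLocation.substring(0, tableLocation.length() - 1)
                : tableLocation;
        return base + "/" + key + "=" + value;
    }

    /** Builds the statement that registers the partition at the given location. */
    static String addPartitionDdl(String table, String key, String value, String location) {
        return "ALTER TABLE " + table
                + " ADD IF NOT EXISTS PARTITION (" + key + "='" + value + "')"
                + " LOCATION '" + location + "'";
    }

    public static void main(String[] args) {
        // Placeholder table location, table name and partition value.
        String location = partitionLocation("hdfs://namenode/warehouse/demo.db/fixed_orc", "dt", "2022-08-19");
        System.out.println(location);
        System.out.println(addPartitionDdl("demo.fixed_orc", "dt", "2022-08-19", location));
    }
}
```

The same `location` string would then be passed to the job as its output path, so the ORC files land where Hive expects the partition's data.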

MapReduce driver program

Sample code: it configures the input file, the Map step, the Reduce step and the output file.

package com.study.spark.mr;

import com.study.spark.mr.mapper.FixedLengthMapper;
import com.study.spark.mr.reduce.OrcFixedReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;

public class FileOrcParquetExample {

    public static void main(String[] args) throws Exception {
        mr();
    }

    public static void mr() throws Exception {

        Configuration configuration = new Configuration();
        // Bytes per record in the fixed-length file; add 1 if every line ends with \n
        int recordLength = 200;
        configuration.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
        // Placeholder value: set the character encoding of the input file here
        configuration.set("encode", "file encoding");
        configuration.set("orc.mapred.output.schema", schema().toString());

        Job job = Job.getInstance(configuration);
        // Class used to locate the jar that contains the job
        job.setJarByClass(FileOrcParquetExample.class);
        job.setJobName("FileOrcParquetExample");
        // Input path of the fixed-length file
        Path path = new Path("hdfs:");
        FixedLengthInputFormat.setInputPaths(job, path);
        job.setInputFormatClass(FixedLengthInputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setMapperClass(FixedLengthMapper.class);

        job.setOutputFormatClass(OrcOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        job.setReducerClass(OrcFixedReduce.class);
        // Output location of the ORC files
        OrcOutputFormat.setOutputPath(job, new Path("hdfs://"));

        job.waitForCompletion(true);

    }

    public static TypeDescription schema() {
        // Reference for the OrcStruct data types: https://blog.csdn.net/swg321321/article/details/125879576
        TypeDescription description = new TypeDescription(TypeDescription.Category.STRUCT);
        description.addField("boolean", TypeDescription.createBoolean());
        // Precision and scale belong to the decimal type, so set them on it before adding the field
        description.addField("decimal", TypeDescription.createDecimal().withPrecision(22).withScale(2));
        return description;

    }

}
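The record length passed to the job has to match the file byte for byte, including any line terminator. A small pre-flight check along the following lines can catch a mismatch before the job is submitted. It is only a sketch: the class `RecordLengthCheck` and its methods are hypothetical, and the file size is passed in as a plain number rather than read from HDFS.

```java
/** Hypothetical pre-flight check for the record length of a fixed-length file. */
final class RecordLengthCheck {

    /** Record length = data bytes plus terminator bytes (0 for none, 1 for \n, 2 for \r\n). */
    static int recordLength(int dataBytes, int terminatorBytes) {
        if (dataBytes <= 0 || terminatorBytes < 0) {
            throw new IllegalArgumentException("dataBytes must be > 0 and terminatorBytes >= 0");
        }
        return dataBytes + terminatorBytes;
    }

    /** Returns the number of records, or fails if the file would end in a partial record. */
    static long recordCount(long fileSize, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be > 0");
        }
        if (fileSize % recordLength != 0) {
            throw new IllegalArgumentException(
                    "file size " + fileSize + " is not a multiple of record length " + recordLength);
        }
        return fileSize / recordLength;
    }

    public static void main(String[] args) {
        // Assumed example: 200 data bytes per line, each line ending with a single \n.
        int length = recordLength(200, 1);
        System.out.println(length + " bytes per record, " + recordCount(2010L, length) + " records");
    }
}
```

When the terminator is counted as part of the record, it also arrives in the record bytes and must be skipped or trimmed when the fields are split.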

FixedLengthMapper

Implementation: the records read from the fixed-length file arrive in this Mapper for processing.

package com.study.spark.mr.mapper;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FixedLengthMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
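    /**
     * Modify the data here. If no change is made in the Mapper, the data can be modified in the Reduce step instead.
     *
     * @param key     position of the record in the input file
     * @param value   raw bytes of one fixed-length record
     * @param context job context used to emit the output pair
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {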
        context.write(key, value);
    }
}

OrcFixedReduce

Implementation: converts the data received from the Map step into an ORC file.

package com.study.spark.mr.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

public class OrcFixedReduce extends Reducer<LongWritable, BytesWritable, NullWritable, OrcStruct> {

    private TypeDescription typeDescription;

    /**
     * Called once at the start of the task. Loads the ORC schema from the job configuration.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        Configuration config = context.getConfiguration();
        if (config.get("orc.mapred.output.schema") == null) {
            throw new RuntimeException("The ORC schema must be set in orc.mapred.output.schema");
        }
        typeDescription = TypeDescription.fromString(config.get("orc.mapred.output.schema"));
    }

    /**
     * Called once for each key. Converts every fixed-length record received for the key
     * into an OrcStruct and writes it out.
     */
    @Override
    protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            OrcStruct orcStruct = new OrcStruct(typeDescription);
            // copyBytes() returns exactly getLength() bytes; getBytes() may return a longer, padded backing array
            byte[] bs = value.copyBytes();
            // Split bs into fields here and set them on orcStruct
            // Reference for writing each data type into an OrcStruct: https://blog.csdn.net/swg321321/article/details/125879576

            context.write(NullWritable.get(), orcStruct);
        }
    }

}
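The field-splitting step that the reducer leaves open could look roughly like the sketch below, which turns byte ranges of one record into typed Java values matching the example schema (a boolean and a decimal with scale 2). Everything here is an assumption made for illustration: the class `FixedFieldDecoder`, the offsets and widths, and the rule that `Y`, `1` or `true` means true. Only the JDK is used; wrapping the results in ORC writable types and setting them on the `OrcStruct` is not shown.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoder: turns byte ranges of a fixed-length record into typed values. */
final class FixedFieldDecoder {

    /** Decodes length bytes starting at offset and strips the padding spaces. */
    static String text(byte[] record, int offset, int length, Charset charset) {
        return new String(record, offset, length, charset).trim();
    }

    /** Assumed flag convention: "Y", "1" or "true" (case-insensitive) mean true. */
    static boolean flag(byte[] record, int offset, int length, Charset charset) {
        String s = text(record, offset, length, charset);
        return s.equals("1") || s.equalsIgnoreCase("Y") || s.equalsIgnoreCase("true");
    }

    /** Parses a decimal field and fixes its scale; a blank field yields null. */
    static BigDecimal amount(byte[] record, int offset, int length, Charset charset, int scale) {
        String s = text(record, offset, length, charset);
        if (s.isEmpty()) {
            return null;
        }
        return new BigDecimal(s).setScale(scale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        // Assumed layout: 3-byte flag followed by a 9-byte amount.
        byte[] record = "Y  0000012.5".getBytes(StandardCharsets.US_ASCII);
        System.out.println(flag(record, 0, 3, StandardCharsets.US_ASCII));
        System.out.println(amount(record, 3, 9, StandardCharsets.US_ASCII, 2));
    }
}
```

In the reducer, the charset would presumably come from the `encode` entry in the job configuration, and each decoded value would be wrapped in the writable type that ORC expects for the corresponding schema field.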

Original link: https://blog.csdn.net/swg321321/article/details/126414003
