Contents
- MapReduce: Reading Fixed-Length Files into a Hive Table in ORC Format
- MapReduce Driver
- FixedLengthMapper
- OrcFixedReduce
MapReduce: Reading Fixed-Length Files into a Hive Table in ORC Format
A fixed-length file is one in which every record occupies the same number of bytes, accompanied by a layout definition stating which field each part of the record holds, how long it is, and so on.
To load into a specific Hive partition, create the partition in advance and use its HDFS location as the job's output path, as sketched below; nothing more is needed.
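For illustration, the partition could be created ahead of time with a statement like the one below. The database, table, partition column, and location are hypothetical placeholders; the LOCATION must match the output path passed to OrcOutputFormat.setOutputPath in the driver.

-- Hypothetical table and partition; adjust the names and location to your setup
ALTER TABLE demo_db.demo_table ADD IF NOT EXISTS PARTITION (dt = '20220801')
LOCATION 'hdfs://namenode:8020/warehouse/demo_db.db/demo_table/dt=20220801';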
MapReduce Driver
Example code: it configures the input file, the map step, the reduce step, and the output location.
package com.study.spark.mr;

import com.study.spark.mr.mapper.FixedLengthMapper;
import com.study.spark.mr.reduce.OrcFixedReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;

public class FileOrcParquetExample {

    public static void main(String[] args) throws Exception {
        mr();
    }

    public static void mr() throws Exception {
        Configuration configuration = new Configuration();
        // Record length of the fixed-length file; add 1 if each line carries a trailing \n
        int recordLength = 200;
        configuration.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
        // Custom key holding the file encoding, for use in the mapper/reducer
        configuration.set("encode", "<file encoding>");
        configuration.set("orc.mapred.output.schema", schema().toString());

        Job job = Job.getInstance(configuration);
        // Set the jar to run and the job name
        job.setJarByClass(FileOrcParquetExample.class);
        job.setJobName("FileOrcParquetExample");

        // Input file location
        Path path = new Path("hdfs:");
        FileInputFormat.setInputPaths(job, path);
        job.setInputFormatClass(FixedLengthInputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setMapperClass(FixedLengthMapper.class);

        job.setOutputFormatClass(OrcOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        job.setReducerClass(OrcFixedReduce.class);
        // Output location, e.g. the HDFS path of the target Hive partition
        OrcOutputFormat.setOutputPath(job, new Path("hdfs://"));

        job.waitForCompletion(true);
    }

    public static TypeDescription schema() {
        // For the various OrcStruct data types, see:
        // https://blog.csdn.net/swg321321/article/details/125879576
        TypeDescription description = TypeDescription.createStruct();
        description.addField("boolean", TypeDescription.createBoolean());
        // withPrecision/withScale must be chained on the decimal type itself;
        // addField returns the enclosing struct, not the new field
        description.addField("decimal",
                TypeDescription.createDecimal().withPrecision(22).withScale(2));
        return description;
    }
}
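For reference, with the two fields above, schema().toString() should render as struct<boolean:boolean,decimal:decimal(22,2)>, which is the same string form that TypeDescription.fromString() parses back in the reducer's setup() below.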
FixedLengthMapper
Implementation: the records read from the fixed-length file arrive here for Mapper processing.
package com.study.spark.mr.mapper;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FixedLengthMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
    /**
     * Modify the record data here if needed. If no changes are made here,
     * the modification can also be done in the Reducer instead.
     * @param key byte offset of the record within the input file
     * @param value one fixed-length record
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
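If the cleanup is done in the map phase, a minimal sketch could look like the following. The class name UppercaseFixedLengthMapper, the uppercasing rule, and the UTF-8 fallback are assumptions for illustration only; the "encode" key is the custom one set by the driver above.

package com.study.spark.mr.mapper;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class UppercaseFixedLengthMapper
        extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {

    private String encoding;

    @Override
    protected void setup(Context context) {
        // Read the file encoding stored by the driver under the custom "encode" key;
        // UTF-8 is only a fallback for this sketch
        encoding = context.getConfiguration().get("encode", "UTF-8");
    }

    @Override
    protected void map(LongWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // copyBytes() returns exactly getLength() bytes, unlike getBytes(),
        // which may hand back a longer backing buffer
        String record = new String(value.copyBytes(), encoding);
        // Hypothetical cleanup: uppercase the record; with a single-byte encoding
        // this preserves the record length, so offset-based parsing downstream
        // still works
        byte[] cleaned = record.toUpperCase().getBytes(encoding);
        context.write(key, new BytesWritable(cleaned));
    }
}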
OrcFixedReduce
Implementation: the data received from the map phase is converted into ORC records.
package com.study.spark.mr.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

public class OrcFixedReduce extends Reducer<LongWritable, BytesWritable, NullWritable, OrcStruct> {

    private TypeDescription typeDescription;

    /**
     * Called once at the start of the task.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        if (config.get("orc.mapred.output.schema") == null) {
            throw new RuntimeException("The ORC schema must be set via orc.mapred.output.schema");
        }
        typeDescription = TypeDescription.fromString(config.get("orc.mapred.output.schema"));
    }

    /**
     * This method is called once for each key. Most applications will define
     * their reduce class by overriding this method. The default implementation
     * is an identity function.
     */
    @SuppressWarnings("unchecked")
    @Override
    protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            OrcStruct orcStruct = new OrcStruct(typeDescription);
            // copyBytes() returns exactly getLength() bytes; getBytes() may return
            // a longer backing buffer padded with stale data
            byte[] bs = value.copyBytes();
            // Split the fixed-length record into fields here and populate orcStruct.
            // For writing the various OrcStruct data types, see:
            // https://blog.csdn.net/swg321321/article/details/125879576
            context.write(NullWritable.get(), orcStruct);
        }
    }
}
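To make the splitting step concrete, here is a minimal standalone sketch that fills the two fields of the schema above. The record layout is invented for illustration (byte 0 holds a 'Y'/'N' flag, bytes 1-24 hold the decimal as padded text, UTF-8 encoding); replace the offsets with your real record definition.

package com.study.spark.mr.reduce;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

public class ParsingOrcFixedReduce extends Reducer<LongWritable, BytesWritable, NullWritable, OrcStruct> {

    private TypeDescription typeDescription;

    @Override
    protected void setup(Context context) {
        typeDescription = TypeDescription.fromString(
                context.getConfiguration().get("orc.mapred.output.schema"));
    }

    @Override
    protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            OrcStruct orcStruct = new OrcStruct(typeDescription);
            String record = new String(value.copyBytes(), StandardCharsets.UTF_8);
            // Byte 0: a 'Y'/'N' flag mapped to the "boolean" field (hypothetical layout)
            orcStruct.setFieldValue("boolean", new BooleanWritable(record.charAt(0) == 'Y'));
            // Bytes 1-24: the decimal(22,2) value as padded text (hypothetical layout)
            String dec = record.substring(1, 25).trim();
            orcStruct.setFieldValue("decimal",
                    new HiveDecimalWritable(HiveDecimal.create(new BigDecimal(dec))));
            context.write(NullWritable.get(), orcStruct);
        }
    }
}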
Original article: https://blog.csdn.net/swg321321/article/details/126414003