網站首頁 編程語言 正文
需求
- 將Hbase數據,解析后推送到RocketMQ。
- redis使用list數據類型,存儲了需要推送的數據的RowKey及表名。
簡單畫個流程圖就是:
分析及確定方案
Redis
- 明確list中元素結構
{"rowkey":rowkey,"table":table}
解析出rowkey; - 一次取多個元素加快效率;取了之后放入重試隊列,并刪除原來的元素;
- 處理數據永遠是重試隊列里的,成功之后刪除,失敗就加上重試次數并重新放回;
- 明確從list中取值所使用的redis命令;
- 范圍獲取
LRANGE
; - 范圍刪除(留下指定范圍的數據)
LTRIM
; - 判斷list長度
LLEN
; - 加入list
RPUSH
;刪除LREM
等等; - 從Hbase獲取數據失敗和發送到mq失敗都令重試次數加一;
- 每次碰到重試次數不為0的數據都休眠1s;
- 設置最大重試次數,達到限制后丟棄;
- 考慮客戶redis部署方式,單機、主從、集群、哨兵等;
- 選擇合適的客戶端,Jedis、Redisson、Lettuce等;
- 編寫不同的操作代碼,也可以利用配置文件、環境變量、工廠模式等適配各種部署模式;
Hbase
- 基本理論知識學習(原來沒接觸過),rowkey是沒條數據的主鍵,限定符是字段名,列族是多個限定名的集合等;
- 當時看這個覺得不錯https://www.jb51.net/article/230731.htm因為是不停讀取數據、鏈接、Table不用close,可以緩存起來,沒必要每次都創建;
- 確定批量獲取數據方式為批量
Get
,沒用scan
; - 了解解析方式,一些網上的解析試了之后會亂碼,這邊用的是它自帶的
CellUtil.clone
相關方法; - 考慮所有都沒數據時休眠10s;
RocketMQ
- 有現成的發送代碼,公司封裝好的;
- 調整發送的速度、太快了服務端會吃不消(獲取Hbase數據速度太快了,最開始沒限制一會兒就入了百萬數據),設置超時時間(默認3s);
- 調整服務端的內存、線程數等參數;
實現
配置
#server configuration server.port=8896 #log config logging.file.path=./logs #redis-standalone redis.standalone.host= redis.standalone.port=6379 redis.standalone.password= redis.standalone.enable=true #redis-cluster redis.cluster.nodes= redis.cluster.password= redis.cluster.timeout=30000 redis.cluster.enable=false # Zookeeper 集群地址,逗號分隔 hbase.zookeeper.quorum= # Zookeeper 端口 hbase.zookeeper.property.clientPort=2181 # 消息目的rocketmq地址 rocketmq.server.host= # 發送消息間隔時間,防止發送過快mq受不了 rocketmq.send.interval.millisec=10 # 每次從redis讀取數據量限制。 data.access.redisDataSize=100 # 失敗數據重試次數,超過的直接丟棄 data.access.retryNum=10 # 需要接入的表,需要發送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back
部分代碼
獲取配置,其余的直接@Value("${}")
:
@Setter @Getter @Configuration @ConfigurationProperties(prefix = "data.access") public class AccessRedisMqConfig { /** * key:topic; value:redis的key */ private Map<String, String> topicKeyMap = new HashMap<>(); /** * 一次從redis中讀取數據量限制 */ private long redisDataSize = 50; /** * 失敗數據重試次數 */ private int retryNum = 10; }
開啟接入:
@Component public class AdapterRunner implements ApplicationRunner { @Resource private DataAccessService dataAccessService; @Override public void run(ApplicationArguments args) { System.out.println("項目已啟動,開始接入數據到RocketMQ……"); dataAccessService.accessData2Mq(); } }
其他代碼其實也在分析里了。
踩坑
mq發送問題
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523) at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610) at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167) at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572) at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Wo
上面分析也說了,注意發送速度,有多少資源就接入多快。還有注意相關三個端口是否開放。
總結
原文鏈接:https://www.cnblogs.com/letscrazy/p/case.html
相關推薦
- 2022-07-22 C語言輸出所有水仙花數
- 2022-12-27 一文帶你了解Go語言標準庫strings的常用函數和方法_Golang
- 2022-10-30 在C#程序中注入惡意DLL的方法詳解_C#教程
- 2023-06-05 Python?time時間格式化和設置時區實現代碼詳解_python
- 2022-04-30 C/C++編程語言中的指針(pointer)你了解嗎_C 語言
- 2022-05-02 C語言實現簡單回聲服務器_C 語言
- 2023-06-18 Go語言實現關閉http請求的方式總結_Golang
- 2022-08-21 golang數組內存分配原理_Golang
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支