
導讀
大報文問題,在京東物流內較少出現,但每次出現往往是大事故,甚至導致上下游多個系統故障。大報文的背後,是不同商家業務體量不同,特別是B端業務的採購及銷售出庫單,一些頭部商家對京東系統支持業務複雜度及容量能力的要求越來越高。因此我們有必要把這個問題重視起來,從組織上根本上解決。
1 認識大報文問題
大報文問題,是指不同的系統通過網絡進行數據交互時payload size過大導致的系統可用性下降問題。
對於大報文的產生方,過大的報文在序列化時消耗更多內存和CPU,在傳輸時(JSF/MQ)可能超過中間件的大小限制導致傳輸失敗;對於大報文的消費方,過大的報文在反序列化時會產生大對象,消耗更多的內存和CPU,容易觸發FullGC甚至OOM,而在處理過程中要遍歷的內容更多,造成響應變慢,如果涉及數據庫操作容易產生大事務、慢SQL,這些容易觸發超時,如果客戶端有重試機制,會進一步加重大報文消費方負載,嚴重時導致服務集群整體不可用。
此外,由於大報文與小報文是在一個接口上完成的,使用相同的UMP key,它會導致監控失真,報警閾值無效。如果日誌記錄了原始報文,也可能磁盤打滿和響應變慢。
在京東物流技術體系內,具體表現為:
大報文場景 | 後果 |
---|---|
MQ的producer發送了大的Message | 由於JMQ對消息大小的限制,導致producer發送失敗:消息未送達 |
MQ consumer反序列化Message並處理計算時產生大對象,頻繁FullGC,CPU使用率飆升 | |
JSF Consumer調用API時傳入大入參值 | 由於JSF Server對payload大小限制,導致服務端將報文拋棄:無法送達 |
JSF Provider響應變慢,產生大對象,頻繁FullGC,CPU使用率飆升,甚至OOM;請求處理超時 | |
JSF Provider返回值包含大對象 | 由於JSF Consumer對payload大小限制,導致consumer無法獲取響應 |
JSF Consumer產生大對象,頻繁FullGC,CPU使用率飆升,甚至OOM |
📌 JMQ/JSF對payload大小的限制都屬於防禦性保護措施,目前的值是科學的,它們都已經足夠大了。在緊急止血情況下可以調整配置參數來暫時提高payload大小限制,但長期看它會加重系統的風險,應該從設計入手避免超過payload大小限制。
1.1 背景知識
1.1.1 JMQ限制
根據JMQ的官方文檔,單條消息大小:JMQ4不要超過4M,JMQ2不要超過2M。
具體原理是發送消息時在生產端做主動校驗,如果消息大小超過閾值則拋出異常(代碼實現與官方文檔不一致):
class ClusterManager {
protected volatile int maxSize = 4194304; // 4MB
}
class MessageProducer implement Producer { // Producer接口的具体实现类
ClusterManager clusterManager;
// producer.send时做校验
int checkMessages(List<Message> messages) {
int size = 0;
for (Message message : messages) {
size += message.getSize() // 压缩后的大小
}
if (size > this.clusterManager.getMaxSize()) {
throw new IllegalArgumentException("the total bytes of message body must be less than " + this.clusterManager.getMaxSize());
}
}
}
📌 經與JMQ團隊確認,JMQ消息大小的限制,以代碼實現為準(官方文檔不准確):
1.1.2 JSF限制
根據JSF官方文檔,JSF可以在server和consumer端分別設置payload size,默認都是8MB。
📌 需要注意,觸發provider報文長度限制時,JSF consumer(老版本)並不會立即失敗,而是依靠客戶端超時後才返回(感覺是JSF的缺陷)。具體原因:JSF依靠底層netty來實現報文長度限制,當provider從請求報文頭里取得本次請求payload size發現超過限定值時,不會繼續讀取報文體,而是拋出netty定義的TooLongFrameException,而該異常的處理依賴netty的ChannelHandler.exceptionCaught方法,JSF裡沒有對TooLongFrameException做處理(吃掉異常),provider端不給consumer任何響應(請求被扔進黑洞),因此造成consumer一直等待響應直到超時,而這可能把consumer端的業務線程池拖死。
class LengthFieldBasedFrameDecoder { // 基于netty io.netty.handler.codec.LengthFieldBasedFrameDecoder的改动
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 从JSF协议的报文头里获取本次请求的payload size,此时还没有读取8MB的body
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength > maxFrameLength) { // maxFrameLength即8MB限制
throw new TooLongFrameException();
}
}
}
class ServerChannelHandler implements ChannelHandler {
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) {
if (cause instanceof IOException) {
// ...
} else if (cause instanceof RpcException) {
// 这里可以看到遇到这种异常,JSF是如何给consumer端响应的
ResponseMessage responseMessage = new ResponseMessage(); // 给consumer的响应
responseMessage.getMsgHeader().setMsgType(Constants.RESPONSE_MSG);
String causeMsg = cause.getMessage();
String channelInfo = BaseServerHandler.getKey(ctx.channel());
String causeMsg2 = "Remote Error Channel:" + channelInfo + " cause: " + causeMsg;
((RpcException) cause).setErrorMsg(causeMsg2);
responseMessage.setException(cause); // 异常传递给consumer
// socket.write回consumer
ChannelFuture channelFuture = ctx.writeAndFlush(responseMessage);
} else {
// TooLongFrameException会走到这里,它的继承关系如下:
// TooLongFrameException -> DecoderException -> CodecException -> RuntimeException
// 异常被吃掉了,不给consumer响应
logger.warn("catch " + cause.getClass().getName() + " at {} : {}",
NetUtils.channelToString(channel.remoteAddress(), channel.localAddress()),
cause.getMessage());
}
}
}
📌 經與JSF團隊確認,consumer端或provider端發出的消息過大(超過playload)時consumer端得不到正確的異常響應只提示請求超時的問題,已經在1.7.5版本修復:需要provider端升級。升級後,如果consumer端發送的消息過大,provider會立即響應RpcException。
此外,在JSF舊版本下,consumer使用了默認的5秒超時,但consumer拋出超時異常總用時是48秒,這是為什麼?
這是因為consumer配置的timeout不包括序列化時間,這48秒是把8MB的報文序列化的耗時:
class JSFClientTransport {
// consumer同步调用provider
ResponseMessage send(BaseMessage msg, int timeout) {
MsgFuture<ResponseMessage> future = doSendAsyn(msg, timeout);
return future.get(timeout, TimeUnit.MILLISECONDS);
}
MsgFuture doSendAsyn(final BaseMessage msg, int timeout) {
final MsgFuture resultFuture = new MsgFuture(getChannel(), msg.getMsgHeader(), timeout);
Protocol protocol = ProtocolFactory.getProtocol(msg.getProtocolType(), msg.getMsgHeader().getCodecType());
byteBuf = protocol.encode(request, byteBuf); // 发送报文前的序列化
RequestMessage request = (RequestMessage) msg;
request.setMsg(byteBuf);
channel.writeAndFlush(request, channel.voidPromise()); // socket.write,异步IO
resultFuture.setSentTime(JSFContext.systemClock.now());
}
}
class MsgFuture implements java.util.concurrent.Future {
final long genTime = JSFContext.systemClock.now(); // new的时候就赋值了
volatile long sentTime;
// 抛出超时异常逻辑
ClientTimeoutException clientTimeoutException() {
Date now = new Date();
String errorMsg = "[JSF-22110]Waiting provider return response timeout . Start time: " + DateUtils.dateToMillisStr(new Date(genTime))
+ ", End time: " + DateUtils.dateToMillisStr(now)
+ ", Client elapsed: " + (sentTime - genTime) // 它包括:序列化时间,由于异步IO因此不包括socket.write时间
+ "ms, Server elapsed: " + (now.getTime() - sentTime);
return new ClientTimeoutException(errorMsg);
}
}
1.1.3 物流網關限制
物流網關在nginx層通過client_max_body_size做了5MB限制。這意味著,JSF限制了8MB,但通過物流網關對外開放成HTTP JSON API時,調用者實際的限制是5MB。
1.1.4 MySQL限制
max_allowed_packet,net_buffer_length等參數在底層控制TCP層的報文長度,京東物流體系內該值足夠大,研發不必關注。
研發需要關注的是字段長度的定義,主要是varchar的長度。 MySQL通過sql_mode參數控製字段超過長度後的行為是字段截斷還是中斷事務。對於京東物流業務執行鏈路比較長的場景來講,同一個字段可能多處保存,例如訂單行里的skuName,就會在OFC/WMS等系統保存,sku_name varchar長度的不一致,特殊場景下可能造成上下游交互出現問題。
1.1.5 其他限制
DUCC value 的長度默認限制為4W 字符。
UMP Key的限制128。
JMQ的businessId長度限制100,Producer在發送是默認超時2秒,Producer發送失敗默認重試2次。
JMQ消費者拋出異常會導致重試(進入retry-db),首次重試10分鐘,如果重試還不成功會越來越慢推送直至過期。過期時間:JMQ2為3天,JMQ4為30天。
JSF如果不配置consumer timeout,則使用默認值:5秒。
Zookeeper ZNode限制長度1MB。雖然可以通過jute.maxbuffer這個Java系統屬性修改,但強烈不建議。
原則上,所有依賴的中間件都要確認其限制約束,提升健壯性,避免邊界條件被觸發而產生出乎意料的錯誤。
1.2 產生原因
1.2.1 集合類字段無約束
導致京東物流線上事故的大報文問題中,絕大部分都屬於該類問題。而這又可以細分為兩種場景:
interface JsfAPI {
// 场景1:批量接口,对批量的大小无限制
void foo(List<Request> requests);
}
class Request {
// 场景2:对一个类内部的集合类字段大小无限制
// JMQ产生大报文,绝大部分属于该场景
List<Item> items;
}
當數據量增大時,報文也會增大,造成幾MB到幾十MB的報文傳輸,系統為了處理這樣大數據量的報文,必然會產生大對象,並且這種對象會一直處於內存中,在數據保存處理時,會造成內存不能釋放,可能觸發頻繁FullGC,CPU使用率飆升。同時,處理集合數據,往往會有數據遍歷過程,如果無並發則時間複雜度是O(N),大的數據集必然帶來更慢的響應速度,而consumer端不會根據payload大小動態設置超時時間,它可能導致consumer端超時,超時可能帶來多次重試,進而加重服務端壓力。
例如:無印良品訂單sku品類過多,比如一個出庫單包含2萬個sku的極端情況。
例如:WMS出庫發貨後向ECLP回傳信息,之前都是通過一個JMQ Topic: eclp_delivery進行回傳,一份消息包含了(訂單主檔,箱明細,包裹明細)3部分信息。後來中石化場景下,一個訂單的包裹明細數量非常多,導致ECLP處理報文時CPU飆升,同時MQ Listener與對外服務共享CPU,導致接單功能可用率降低。後來,從源頭入手把一個訂單按照明細進行分頁式拆分(之前是整單回傳,之後是按明細分頁回傳),同時把eclp_delivery這一個topic拆分成3個topic:(訂單,箱明細,包裹明細),解決了大報文問題。
1.2.2 大字段無約束
它指的是某一個字段(不是集合大小),由於沒加長度限制,在特定場景下傳入了遠超預期大小的數據而造成的故障。
ECLP的商品主數據有個下發商品的接口,有個字段skuName,接口沒有對該字段長度進行約束。系統一直平穩運行,直到有個商家下發了某一個商品,它的skuName達到了10KB(事後發現,商家是把該商品詳情頁的整個HTML通過skuName傳過來了),插入數據庫時超過了字段長度限制varchar(200),導致插入失敗,但由於沒有考慮到這種場景,返回了誤導的錯誤提示。展開來看,如果ECLP為skuName定義了MySQL Text類型字段,還會有更嚴重問題:ECLP接收下商品,下發給WMS,但WMS裡的skuName是varchar(200),這個問題就只能人工處理了,甚至與商家溝通。
WMS6.0為了考慮多場景全滿足,在出庫單預留了擴展字段,在接單時技術BP自行決定寫入哪個擴展字段。京喜BP下發出庫單時在訂單明細維度傳入了handOverSlip(交接單,其實是團單信息,裡面有多層明細嵌套),該字段其實是一個大JSON,單個長度10KB上下,接單環節沒問題。但組建集合單會把多個出庫單組建成一個集合單,共產生3000多個明細,僅handOverSlip就佔30MB,造成組建集合單後下發(JSF調用)揀貨時遇到了JSF 8MB限制問題,下發失敗,單據卡在那裡,現場生產無法繼續。
WMS6.0的用戶中心系統,為其他系統提供了發送咚咚通知的服務,具體實現是調用集團的咚咚發送接口:xxx生產系統-> 用戶中心-> 咚咚系統。鏈路上每一個環節都未對通知內容content字段長度做限制。一次xxx生產系統調用用戶中心傳入了超8MB的content字段,觸發了咚咚系統的JSF底層的報文限制,最終在用戶中心產生了ClientTimeoutException,它導致用戶中心的JSF業務線程池打滿;而由於用戶中心為所有業務生產系統服務,現場操作會依賴它,進而導致生產卡頓,現場多環節無法正常生產。
Amazon FBA的SP-API(Sell Partner API),對可能出現風險的字段都做了長度限制,例如:
String displayableOrderComment; // maxLength: 1000
String sellerSku; // maxLength: 50
String giftMessage; // maxLength: 512
String displayableComment; // maxLength: 250
1.2.3 查詢接口返回大量數據
ECLP主數據有個接口:導出所有warehouse list,調用方很多,訪問頻率不高,每次響應長度3MB。該接口在線上出現過多次事故(2019年)。這個接口顯然是不該存在的,但把它下線需要推動所有的調用方改動,這個週期很長阻力也很大。
最開始,直接查數據庫,出現事故後加入JimDB,再次出現事故後配置了JimDB的local cache,後又加入JSF限流等措施。
出現故障時,ECLP CPU飆升,導致服務超時,京東零售調用方配置的超時設置很短,這導致越來越多的請求打過來,加重了ECLP負擔。
1.2.4 導出問題
這個問題與【1.2.3 查詢接口返回大量數據】看上去類似,但有很大不同:一個同步調用,返回的數據量相對少,另一個異步執行,返回數據量巨大。
WMS6.0的報表都有導出的需求,例如導出最近3個月的明細數據。貼近商家的OFC(如ECLP),也有類似需求,商家要求導出明細數據。系統執行過程大致是:根據用戶指定的條件異步執行SQL,把數據庫返回的數據集寫入Excel,並存放到blob storage(指定TTL),用戶在規定時間(TTL)內根據storage key去blob storage下載,完成整個導出過程。
這裡的關鍵問題是如何查詢數據庫,而數據庫作為共享資源往往是整個系統的瓶頸(增加複本數量意味著成本上升),它變慢會拖垮整個系統。如何查詢數據庫,有8個可選項:
導出問題的本質,是大範圍table scan,很難設計精細的複合索引。 WMS6.0最初使用的是方案1,它會產生深分頁limit offset問題:越往後的頁面越慢,對數據庫的壓力越大。舉例:要導出100萬行記錄,每頁1萬,那麼到50萬記錄時,每次分頁查詢相當於數據庫要掃描50萬+行記錄後拋棄絕大部分並返回1萬行,這還要繼續執行50次,此外分頁組件還要額外執行count語句以計算總行數。
如果每頁是1千呢?因此,數據庫的壓力被放大了,可以簡單理解為“全表掃描”了【50 + 100(count計算)=150】次,遠不如不分頁(不分頁還要解決OOM問題)。目前,WMS6.0改用了方案8,根本上解決了數據庫慢查詢問題。思路是不再盲目靜態分頁,而是根據時間條件切分成多個SQL,分別查詢,保證每個SQL返回數據量不大從而避免慢SQL。例如,某個倉要導出最近3個月的出庫單數據,那麼把這1個date range拆分(explode)成N個date range,分別執行:
condition = DateRange(from = "2022-01-01 00:00:00", to = "2022-04-01 00:00:00") // 用户指定的时间范围:3个月
// sql = select * from ob_shipment_order where xxx and update_time between condition.from and condition.to
List<DateRange> chunks = explode(condition)
for (DateRange chunk : chunks) {
// 该chunk的时间范围已经变成了1天,甚至是1小时,具体值是根据SQL执行计划估算得来的:数据量越大则拆分越细
sql = select * from ob_shipment_order where xxx and update_time between chunk.from and chunk.to
mysql.query(sql)
}
1.2.5 payload約束不一致產生的問題
鏈路上經過不同的系統,不同系統對payload size的約束不同,也可能產生問題,因為決定是否可以正常處理的是最小的那個,但鏈路長時相關方可能不知道,在異步場景下這個問題尤為明顯。
例如,aws的API Gateway與Lambda對payload size有不同的約束,最終用戶必須知道限制最嚴格的那一個環節。
對於京東物流,JSF與JMQ的限制不同,理論上可能產生這樣的問題:JSF調用者發送8MB的請求,JSF提供者處理時採用同步轉異步機制,異步把該請求8MB發送MQ,它會導致MQ發送永遠無法成功,而JSF的調用方卻渾然不覺。
如果通過物流網關對外開放,網關nginx限制是5MB,而JSF是8MB,設計上沒問題(fail fast),但可能造成服務方承諾與調用者感知端到端的不一致。
JSF對provider(jsf:server)和consumer可以分別設置不同的報文大小限制,理論上也可能出現問題,但在京東物流尚未出現,可不必關注。
1.2.6 其他非入口場景
它發生在系統執行過程內部。典型場景是DAO層查詢數據庫返回大結果集,Redis大key問題等。這要根據具體中間件機制來識別,例如,MyBatis支持插件來識別DAO查詢出大結果集:
public class ListResultInterceptor implements org.apache.ibatis.plugin.Interceptor {
private static final int RESULTSET_SIZE_THRESHOLD = 10000;
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object result = invocation.proceed();
if (result != null && result instanceof List) {
int resultSetSize = ((List) result).size();
if (resultSetSize > RESULTSET_SIZE_THRESHOLD) {
// 报警
}
}
return result;
}
}
2 設計原則
2.1 主動顯式強約束
即,主動防禦式自我保護,而不是依靠使用者的“自覺”:外部用戶不可信賴。
對於JSF,可以通過JSR303向API Consumer顯式傳遞約束,並且該約束可以通過框架對業務代碼無侵入地自動執行。對於MQ,由於生產者與消費者解耦,無法直接傳遞約束,只能靠主動監控、人工協調。
它的前提條件,是研發有能力去主動識別出大報文風險。
2.2 Fail Fast
如果有前端,那麼前端加約束,避免大報文傳遞給後端。
對於後端,鍊式的上下游關係中,上游要把好關。
這個原則並不是說下游不用關心大報文問題,恰恰相反,鏈路的每個環節都要關心,但Fail Fast可以降低整體的不必要的損耗成本,也可以緩解某個環節保護機制缺失帶來的人工介入和修數成本。
2.3 上下游對齊隱式約束
同一個業務字段在上下游傳遞時,字段長度約束要一致,否則可能會出現上游成功落庫下游無法落庫的情況。
2.4 大報文產生方負責拆分
解決大報文的根本思路是拆分報文:大-> 小。
對應MQ來講,應該是Producer負責拆分大報文為小報文。
對於JSF來講,有兩種情況:
- consumer產生的大報文:應該provider加約束,強迫consumer端分頁拆分請求。參考AJAX機制
典型場景:揀貨下架調用庫存預佔接口,一次性傳入1萬個sku
- provider產生的大報文:應該變成分頁返回結果
典型場景:一次性返回所有warehouse列表
📌 需要注意的是,拆分報文,會增加生產方和消費方的複雜度,尤其是消費方:冪等,集齊,(並發和異步調用時產生的)亂序,業務的原子性保證等。例如,一個出庫單明細行過多時,整單預佔庫存(大報文) -> 按訂單明細分頁預佔(小報文)。
揀貨下架按明細維度分頁調用庫存預佔接口場景下,如果訂單不允許缺量:整單預佔時,該訂單預佔庫存的原子性(要么全成功預佔,要么一個sku都不預佔)是由庫存系統(provider)保證的;而在按訂單明細維度分頁預佔時,原子性需要在揀貨系統(consumer)保證,即如果後面頁碼的預佔失敗則需要把前面頁碼的預佔釋放。這增加consumer端複雜度,但為了系統的性能和可用性,這是值得的。當然,也有另外一個可選方案,仍舊讓庫存保證原子性,但庫存接口需要增加類似(currentPage, totalPages)的參數,那樣就是庫存更複雜了。無論如何,都增加了整體複雜度。
3 具體辦法
3.1 報文分頁
適用場景:MQ,以及JSF返回大報文響應。
為了保持報文的完整性,也便於消費方實現冪等、集齊等邏輯,需要在報文裡額外增加分頁信息:currentPage/totalPages。
class Payload {
List<Item> items;
int currentPage, totalPages;
}
void sendPayload(Payload payload) {
int currentPage = 1;
int totalPages = payload.getItems().size() / batchSize;
Lists.partition(payload.getItems, batchSize).forEach(subItems -> {
Payload subPayload = new Payload(subItems)
subPayload.setPageInfo(currentPage, totalPages)
producer.send(subPayload)
currentPage++;
});
}
📌 在極端複雜場景下,也可以考慮分拆topic,但不推薦,因為它可能額外引入亂序問題。
📌 MQ報文編解碼除了目前的JSON外,也可以考慮Protobuf等更高效格式。例如京東零售訂單快照orderver就由xml升級到了PB。
3.2 報文轉存
適用場景:MQ/JSF。
這種方案,也被稱為Claim Check Pattern。
把大的明細List,按照固定batch size轉存到JFS/OSS/JimKV/S3等外部blob storage,在報文裡存放指針(blob地址)列表。
class BigPayload {
List<Item> items;
}
class SmallPayload {
List<String> itemBlobKeys;
}
void sendPayload(BigPayload bigPayload) {
SmallPayload smallPayload = new SmallPayload();
Lists.partition(bigPayload.getItems(), batchSize).forEach(subItems -> {
List<String> itemBlobKeys = blogStore.putObjects(subItems)
smallPayload.addItemBlobKeys(itemBlobKeys);
});
producer.send(JSON.encode(smallPayload);
}
目前上游系統(eclp、序列號、OMC等)、DTC、下游系統(各版本WMS)的信息傳遞使用了該辦法,共用一個JFS集群。
📌 Side effects:1)引入額外依賴,而且消費方被迫引入依賴2)需要Blob存儲的TTL機製或定期清理,否則加大存儲成本3)為消費方帶來了不確定性,從blob拿回的數據可能超大,在反序列化和處理過程中有OOM/FullGC等風險(雖然一些json庫提供了底層的基於詞法token的Streaming Parsing API,但如果要讀取全部內容仍然耗費大量內存)
3.3 報文截斷
適用場景:大字段。
在確定用戶體驗可以接受的情況下,上層進行字段內容截斷(truncate)。及早截斷,不要依賴下層數據庫的截斷機制。
3.4 分頁調用
適用場景:JSF。
兩種場景:一種是批量接口,即入參是集合,另一種是入參對象裡有集合字段。
class FooRequest {
@javax.validation.constraints.Size(min = 1, max = 200)
private List<Bar> barItems;
}
interface JsfAPI {
// 场景1:批量接口
void foo(@javax.validation.constraints.Size(min = 1, max = 200) List<FooRequest> requests)
// 场景2:请求对象里有集合字段
void bar(FooRequest request);
}
對於JSF Consumer,可以通過JSF異步調用,它相當於redis pipeline模式,也可以通過客戶端線程池並發調用方式實現分頁調用,二者耗時相同,推薦使用前者:1)代碼實現簡單2)節省了額外線程池成本。
int maxJsfRetries = 3; // JSF async下的自动重试只能应用层自己做了
int retried = 0;
do {
List<ResponseFuture<Result<ObLocatingResultDto>>> futures = new LinkedList();
Lists.partition(voList, batchSize).forEach(subVoList -> {
ObLocatingOrderDto dto = mapper.INSTANCE.toDTO(subVoList);
locatingAppService.outboundOrderLocate(dto); // async JSF call
ResponseFuture<Result<ObLocatingResultDto>> future = RpcContext.getContext().getFuture();
futures.add(future);
});
for (ResponseFuture<Result<ObLocatingResultDto>> future : futures) {
try {
Result<ObLocatingResultDto> result = future.get();
} catch (RpcException jsfException) {
retried++;
} catch (Throwable e) {
// 额外的业务逻辑:与JSF并发同步调用相同的处理逻辑
}
}
} while (retried <= maxJsfRetries);
📌 JSF異步調用時,jsf:consumer配置的retries無效,這是因為異步發送後如果出現網絡超時,只能由業務代碼通過future.get()才能拿到結果,JSF底層沒有機會進行自動重試。而同步調用時,JSF底層可以判斷出超時,它有機會根據配置進行自動重試。更多細節可以查看JSF的FailoverClient.doSendMsg方法。
3.5 MQ替代JSF
適用場景:單向通知類請求,相當於AsyncAPI。
大的報文往往意味著更長的處理時長,JSF同步調用下consumer必須同步等待provider端的返回,這會同時佔用consumer和provider雙方的線程池資源,極端情況下可能導致雙方線程池用盡。 JSF下可能耗盡線程池,進而拖死被強依賴的上游,產生雪崩效應;而MQ下,只會消費積壓。
異步交互,使得上游對下游響應時間的依賴轉換為吞吐率的依賴。 JMQ實現了消費者和生產者在時間和空間上的解耦,消息的消費者可以承受更大範圍的處理速度範圍。
3.6 總結
4 最佳實踐
4.1 單個接口與批量接口分離
根據sku編號查詢商品資料,往往伴隨著多個sku一起查詢的需求,如何設計接口?
有的這樣:
interface JsfAPI {
Result<SkuInfo> getSkuInfo(String sku);
Result<List<SkuInfo>> listSkuInfo(List<String> skus);
}
由於批量接口在技術上已經滿足了單個查詢的功能,有的團隊乾脆去掉了單個查詢接口,造成使用者查詢單個sku時:
Result<SkuInfo> result = jsfAPI.listSkuInfo(Lists.newArrayList("EMG1800752592"));
應該這樣:
interface JsfAPI {
Result<SkuInfo> getSkuInfo(String sku);
}
interface JsfBulkAPI {
Result<List<SkuInfo>> listSkuInfo(List<String> skus);
}
4.2 線程池隔離
JsfAPI與JsfBulkAPI把批量與單一接口進行分離後,可以分配到不同的線程池,盡可能互不干擾,這同理於Bulkhead Pattern。
單一接口 | 批量接口 |
---|---|
處理關鍵業務,SLA要求更高 | 風險高,性能差 |
JSF可以通過jsf:server定義線程池,並為jsf:provider分配不同的server。
4.3 大報文與小報文分離
如果大報文實在無法拆分(例如,上游團隊不配合),為了降低極端請求對絕大部分正常請求的影響,可以採用大小報文分離的辦法。
對於JMQ,為了防止某一個大報文的消費長耗時或異常導緻小報文的消費積壓,可以把大報文轉發到“慢隊列”進行消費。
此外,也要考慮如何緩解UMP監控失真問題。
4.4 JMQ設置合理的批量大小
該值決定了MessageListener.onMessage入參messages的size。
interface MessageListener {
void onMessage(List<Message> messages) throws Exception;
}
JMQ Consumer的ACK是以批為單位的,例如設置為10,則10條消息裡任意一條產生異常都會導致10條全部重新消費。大報文場景下,如果發現問題,可以把該值調整為1,避免大小報文相互影響。
大批量消費主要有兩個好處:1)壓縮效果好(JMQ在發現報文超過100B時就進行壓縮),TCP I/O性能高2)降低獲取消息的等待耗時,因為它相當於prefetch(具體原理是LinkedBlockingDeque的capacity,如果拉取的消息數超過它,則IO阻塞以防止拉取新消息)。同時它也有兩大負面效應:1)ACK以批為單位,一個錯誤導致整批錯誤,整批重試2)消息大小限制取決於整批所有消息大小,可能觸發大報文問題。
對於京東物流絕大部分業務系統來講,這點提升與繁重的業務處理來比不值一提,例如:I/O節省了5ms,但單個消息處理需要200ms(因為要通過接口查詢,處理,然後寫庫),反倒是side effect成為主要矛盾。因此,絕大部分場景下該值應該設置為1。如果業務邏輯類似於集齊:把N個消息拿下來,本地緩衝暫不處理,等滿足條件了再merge並一次性處理,那麼可以調整批量大小為非1。
JMQ Producer提供了批量發送方法:
interface Producer {
void send(List<Message> messages) throws JMQException;
}
我們的業務代碼也在使用,例如:
/**
* 发送分播结果消息
*/
public void send(List<CheckResultDto> checkResultDtos) {
List<Message> messageList = Lists.newArrayList();
for (CheckResultDto checkResultDto : checkResultDtos) {
String messageText = JmqMessage.createReportBody(checkResultDto.getUuid(), Lists.newArrayList(checkResultDto));
messageList.add(JmqMessage.create(topic, messageText, checkResultDto.getUuid(), checkResultDto.getWarehouseNo()));
}
producer.send(messageList);
}
這裡要注意,分批發送時,1)發送的超時(默認2s)作用於整批消息,而不是單個消息2)消息大小限制(4MB)作用於整批消息之和,因此批包含的消息越多越可能失敗。
4.5 避免大日誌
尤其是AOP/Interceptor/Filter等統一處理的代碼,因為對報文的打印往往需要先json序列化。
if (logger.isInfoEnabled()) {
log.info(JsonUtil.toJson(request); // CPU intensive and disk I/O intensive(虽然日志是顺序写)
}
如果確實要記錄,也可以考慮採樣率方式記錄大報文日誌。
4.6 顯式約束由嚴開始
開放API由於消費方多而且不確定性高,客觀上造成了“只有一次做對的機會”。
List size limit, property max length limit等,要在開放API的第一時間公佈出去。如果開始不約束,後期加約束可能遭遇大的阻力和溝通成本。此外,遵循從嚴開始的規律,為自己爭取主動:你把限制放開,沒人找你岔,反之則阻力大。例如:order.items max size limit由100變成200,你可以放心地做;但由200變成100,你要徵得現有使用者的全部確認。
例如,Amazon FBA的SP-API對集合的條數限制絕大部分是50。
5 治理機制
5.1 識別大報文場景
無論採用哪種大報文問題解決辦法,識別出大報文場景是前提。
技術上,可以通過JSF Filter分析報文長度,把尚未觸發8MB但有潛在風險的自動識別出來。但JMQ無相關機制,業務系統要自行實現相關攔截機制。
5.1.1 JSF自動識別
provider端自動識別即可。
@Slf4j
public final class PayloadSizeFilter extends AbstractFilter {
private static final int PAYLOAD_SIZE_THRESHOLD = 4 << 20; // 4MB = 8MB(JSF限制) * 50%
private static final int BATCH_SIZE_THRESHOLD = 1000;
@Override
public ResponseMessage invoke(RequestMessage requestMessage) {
if (!RpcContext.getContext().isProviderSide()) {
// 只在provider端检查大报文:它才是我们要保护的对象
return getNext().invoke(requestMessage);
}
// 自动识别潜在的大报文场景:针对报文大小
Integer payloadSize = requestMessage.getMsgHeader().getLength();
if (payloadSize != null && payloadSize > PAYLOAD_SIZE_THRESHOLD) {
// 这里使用最简单的日志把潜在大报文暴露出来,各团队可以做更细化的机制
// 由于logbook限制只有error level日志才能配置"关键字报警",这里使用log.error
// 如果不想自动报警,只是人工巡检,可以log.warn
String methodName = requestMessage.getMethodName();
String className = requestMessage.getClassName();
log.error("Suspected BIG payload: {}.{}, {}>{}", className, methodName, payloadSize, PAYLOAD_SIZE_THRESHOLD);
}
// 自动识别潜在的大报文场景:报文字节小,但仍会导致处理慢,例如 List<String> orderNos,如果发来1万个单号?
// 这里只能识别出入参是List的场景,对于字段类型是List的场景无效
Invocation invocation = requestMessage.getInvocationBody();
Class[] argClasses = invocation.getArgClasses();
Object[] args = invocation.getArgs();
for (int i = 0; i < argClasses.length; i++) {
Class argClass = argClasses[i];
if (Collection.class.isAssignableFrom(argClass)) {
// 入参类型是Collection
Collection collection = (Collection) args[i];
if (collection.size() > BATCH_SIZE_THRESHOLD) {
log.error("Too BIG Collection argument: {}>{}", collection.size(), BATCH_SIZE_THRESHOLD);
}
}
}
return getNext().invoke(requestMessage);
}
}
5.1.2 JMQ自動識別
在consumer端加自動識別,如果發現,協同producer方確認風險判斷是否需要改造。
public interface BigPayloadTrait extends MessageListener {
int THRESHOLD_BIG_PAYLOAD = 2 << 20; // 2MB = 4MB(JMQ限制) * 50%
default boolean suspectedBigPayload(List<Message> messages) {
for (Message message : messages) {
if (message.getSize() > THRESHOLD_BIG_PAYLOAD) {
return true;
}
}
return false;
}
}
5.2 有效的監控
人工識別會有遺漏場景,關注監控全局指標,尤其是分析一些跳點,可能補充發現大報文場景。
5.3 設計應急預案
有些大報文問題,可能暫時無法通過技術手段解決,例如,已經有商家接入的對外接口,開放時沒有對List size限制,加限制後需要商家配合修改做客戶端分頁,而商家不配合。這時候,可以採用大促期降級,限流,加開關,加強監控,設計應急預案,為此接口提供獨立的線程池來隔離正常請求等手段解決。
5.4 常態化的大報文搗亂演練
以第三方視角幫助識別出尚未識別的大報文場景,不要自己給自己搗亂。
5.5 團隊執行
推進大報文治理工作時,為了便於項目追踪管理,可以採用如下流程。
5.5.1 新的API和MQ
這裡也包括現有API/MQ上加字段場景。
設計和評審時,檢查:
-
字段長度,在上下游上長度對齊
-
JSF接口對List等集合類型加@Size顯式約束和校驗,對List性批量接口入參也加@Size
-
MQ Producer確保不發出大報文
5.5.2 現有系統治理
為所有JSF和MQ加入大報文預先監控機制(具體可參考【5.1 識別大報文場景】,根據是否改得動做相應的治理動作。
作者:京東物流高鵬
來源:京東雲開發者社區 自猿其說Tech
#萬字好文大報文問題實戰 #京東雲技術團隊 #京東雲開發者的個人空間 #科技資訊
You may also like
相关贴文:
近期文章
- 訂單狀態控制插件用於WooCommerce
- 客戶無法停止狂歡的WooCommerce插件! | ⭐5.0評級WooCommerce
- Curcy – WooCommerce多貨幣 – 貨幣切換器安裝和使用
- 如何將WooCommerce產品發佈到eBay和Etsy |教程2025
- 最好的WooCommerce產品搜索插件,以改善用戶體驗
- Shopify vs WooCommerce(WordPress) – 哪個更好?
- 免費在WordPress中添加產品品牌|在品牌下出售產品
- WooCommerce還是Shopify?初學者的全面比較|| 2025年最佳電子商務平台
- 2024年的12個最佳打印件WooCommerce插件