在使用某RPC框架过程中,遇到两个因使用 ByteToMessageDecoder
不当而导致的两个小问题,如下列示例代码所示:
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List\
问题一 未对报文做长度校验,存在OOM隐患
如上面代码片段中:
byteBuf.markReaderIndex(); int dataLength = byteBuf.readInt(); // byteBuf中可读信息不足,等待后续报文 if (byteBuf.readableBytes() < dataLength) { // 未校验最大长度,存在安全隐患 byteBuf.resetReaderIndex(); return; }
当收到的报文长度标识位过长,则 ByteToMessageDecoder
继续将把后续报文存入 ByteBuf
中,当遇到恶意报文时将导致应用OOM。顺便提一下 ByteToMessageDecoder
中提供两种 ByteBuf
的叠加机制。
- MERGE_CUMULATOR 通过拷贝来实现叠加,不足则申请新
ByteBuf
再拷贝进去 - COMPOSITE_CUMULATOR 通过组合来实现叠加,直接使用
CompositeByteBuf
不需要拷贝
默认的是使用MERGE_CUMULATOR,在累计报文大小到达 DIRECT_MEMORY_LIMIT
的一半或是堆内存的一半时,应用就将OOM报错
问题二 关闭Channel前未释放ByteBuf,导致decode()被多次调用
如上面代码片段中:
int rpcMagicVal = byteBuf.readShort(); if (rpcMagicVal != Rpc.MAGIC\_VALUE) { // 此处直接close将导致decode()被重复调用 ctx.close(); }
或是如
int rpcMagicVal = byteBuf.readShort(); if (rpcMagicVal != Rpc.MAGIC\_VALUE) { // 此处直接抛异常并在别的Handler做channel.close()将导致decode()被重复调用 throw new RuntimeException("INVALID MSG"); }
当解析报文遇到不合法的报文时,上面代码选择关闭此 channel
,并为对 byteBuf
进行释放,这将导致此 ByteToMessageDecoder
的实现类的 decode() 方法被多次调用。造成此问题的原因有两
多次调用原因一(直接在decode调用close)
Netty4
默认每个 Channel
上所有操作都归一个线程排队操作,所以在调用了 ctx.close()
并不会立即触发 channelInactive
等。线程将继续走完本次 decode()
,而我们看父类调用此 decode()
的地方 ByteToMessageDecoder.callDecode()
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List\
如上所示在我们未解码出东西放入 out
且移动了读索引 byteBuf.readShort()
情况下是满足循环条件的,因此不可避免地就对一条错误的报文多次调用decode()
多次调用原因二(在decode中throw Exception)
如上面代码所示,直接在其中抛出异常貌似能跳出 while (in.isReadable())
的循环,为什么还是会调用 decode()
呢?
这是因为设计者为了防止 channel.close()
后而丢失暂存在 ByteToMessageDecoder
的信息,在 ByteToMessageDecoder.channelInactive()
中还进行了如下操作:
void channelInputClosed(ChannelHandlerContext ctx, List\
两种原因通用的解决方案
在调用 ctx.close()
或 抛出异常前将所给参数 byteBuf
变成不可读,如下:
int rpcMagicVal = byteBuf.readShort(); if (rpcMagicVal != Rpc.MAGIC_VALUE) { // 或byteBuf.readerIndex(byteBuf.writerIndex())均可 byteBuf.skipBytes(byteBuf.readableBytes()); ctx.close(); }
此方案能解决第二种起因的道理是 父类的调用解码的入口方法 ByteToMessageDecoder.channelRead()
finally 代码块中会对不为空但不可读的叠加 ByteBuf
做释放并置空。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { // ignore some code callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { // 对不为空但不可读的叠加ByteBuf做释放并置空 if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } // ignore some code } } else { ctx.fireChannelRead(msg); } }