ByteToMessageDecoder问题的解决

在使用某RPC框架过程中,遇到两个因使用 ByteToMessageDecoder 不当而导致的两个小问题,如下列示例代码所示:

    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List\ list) throws Exception {
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (byteBuf.readableBytes() < dataLength) {
            // 未校验最大长度,存在安全隐患
            byteBuf.resetReaderIndex();
            return;
        }
        int rpcMagicVal = byteBuf.readShort();
        if (rpcMagicVal != Rpc.MAGIC_VALUE) {
            // 此处直接close或抛异常后close将导致decode()被重复调用
            ctx.close();
        }
        // DO ACTUAL CODEC
    }

问题一 未对报文做长度校验,存在OOM隐患

如上面代码片段中:

        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        // byteBuf中可读信息不足,等待后续报文
        if (byteBuf.readableBytes() < dataLength) {
            // 未校验最大长度,存在安全隐患
            byteBuf.resetReaderIndex();
            return;
        }

当收到的报文长度标识位过长,则 ByteToMessageDecoder 继续将把后续报文存入 ByteBuf 中,当遇到恶意报文时将导致应用OOM。顺便提一下 ByteToMessageDecoder 中提供两种 ByteBuf 的叠加机制。

  1. MERGE_CUMULATOR 通过拷贝来实现叠加,不足则申请新 ByteBuf 再拷贝进去
  2. COMPOSITE_CUMULATOR 通过组合来实现叠加,直接使用 CompositeByteBuf不需要拷贝

默认的是使用MERGE_CUMULATOR,在累计报文大小到达 DIRECT_MEMORY_LIMIT的一半或是堆内存的一半时,应用就将OOM报错

问题二 关闭Channel前未释放ByteBuf,导致decode()被多次调用

如上面代码片段中:

        int rpcMagicVal = byteBuf.readShort();
        if (rpcMagicVal != Rpc.MAGIC\_VALUE) {
            // 此处直接close将导致decode()被重复调用
            ctx.close();
        }

或是如

        int rpcMagicVal = byteBuf.readShort();
        if (rpcMagicVal != Rpc.MAGIC\_VALUE) {
            // 此处直接抛异常并在别的Handler做channel.close()将导致decode()被重复调用
            throw new RuntimeException("INVALID MSG");
        }

当解析报文遇到不合法的报文时,上面代码选择关闭此 channel,并为对 byteBuf 进行释放,这将导致此 ByteToMessageDecoder 的实现类的 decode() 方法被多次调用。造成此问题的原因有两

多次调用原因一(直接在decode调用close)

Netty4 默认每个 Channel 上所有操作都归一个线程排队操作,所以在调用了 ctx.close() 并不会立即触发 channelInactive 等。线程将继续走完本次 decode() ,而我们看父类调用此 decode() 的地方 ByteToMessageDecoder.callDecode()

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List\ out) {
        try {
            // 只要是可读就循环
            while (in.isReadable()) {
                int outSize = out.size();
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    if (ctx.isRemoved()) {
                        //被Remove才跳出
                        break;
                    }
                    outSize = 0;
                }
                int oldInputLength = in.readableBytes();
                // 实际调用子类decode()方法处
                decodeRemovalReentryProtection(ctx, in, out);
                if (ctx.isRemoved()) {
                    //被Remove才跳出
                    break;
                }
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        // 未解码出东西且未移动读索引则跳出
                        break;
                    } else {
                        //否则继续
                        continue;
                    }
                }
                // ignore some code
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

如上所示在我们未解码出东西放入 out 且移动了读索引 byteBuf.readShort() 情况下是满足循环条件的,因此不可避免地就对一条错误的报文多次调用decode()

多次调用原因二(在decode中throw Exception)

如上面代码所示,直接在其中抛出异常貌似能跳出 while (in.isReadable()) 的循环,为什么还是会调用 decode() 呢?

这是因为设计者为了防止 channel.close() 后而丢失暂存在 ByteToMessageDecoder 的信息,在 ByteToMessageDecoder.channelInactive() 中还进行了如下操作:

    void channelInputClosed(ChannelHandlerContext ctx, List\ out) throws Exception {
        // 若累计暂存的ByteBuf不为空,则继续调用上段代码循环解码
        if (cumulation != null) {
            callDecode(ctx, cumulation, out);
            decodeLast(ctx, cumulation, out);
        } else {
            // 方法内容如下,其实不进行任何操作
            decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
        }
    }
        protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List\ out) throws Exception {
        if (in.isReadable()) {
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }

两种原因通用的解决方案

在调用 ctx.close() 或 抛出异常前将所给参数 byteBuf 变成不可读,如下:

        int rpcMagicVal = byteBuf.readShort();
        if (rpcMagicVal != Rpc.MAGIC_VALUE) {
            // 或byteBuf.readerIndex(byteBuf.writerIndex())均可
            byteBuf.skipBytes(byteBuf.readableBytes());
            ctx.close();
        }

此方案能解决第二种起因的道理是 父类的调用解码的入口方法 ByteToMessageDecoder.channelRead() finally 代码块中会对不为空但不可读的叠加 ByteBuf 做释放并置空。

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                // ignore some code
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                // 对不为空但不可读的叠加ByteBuf做释放并置空
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                }
                // ignore some code
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
觉得不错不妨打赏一笔