怎么减轻,拆包实施方案

图片 1image

Netty是眼下产业界最盛行的NIO框架之一,它的健壮性、高品质、可定制和可扩充性在同类框架中都以卓越。它早就收获了广大的商业项目标认证,比方Hadoop的RPC框架Avro就采纳了Netty作为底层通讯框架,其余的产业界主流RPC框架,举例:Dubbo、Google 开源的gRPC、微博博客园开源的Motan、Twitter 开源的finagle也采取Netty来构建高质量的异步通讯技能。别的,Alibaba开源的音讯中间件RocketMQ也采纳Netty作为底层通讯框架。

记得近些日子我们生育上的多少个网关出现了故障。

TCP黏包/拆包

TCP是三个“流”公约,所谓流,正是从未界限的一长串二进制数据。TCP作为传输层协议并不不打听上层业务数据的现实意思,它会基于TCP缓冲区的实际上意况进行数据包的剪切,所以在工作上认为是四个全体的包,可能会被TCP拆分成八个包实行发送,也会有希望把多个小的包封装成一个大的数码包发送,那正是所谓的TCP粘包和拆包难点。

本条网关逻辑特别简单,正是收到客户端的伸手然后剖析报文最后发送短信。

粘包难题的减轻政策

由于底层的TCP无法清楚上层的事体数据,所以在底层是无力回天保险数据包不被拆分和构成的,这些标题只好透过上层的行使契约栈设计来消除。业界的主流合同的减轻方案,能够归结如下:

  1. 消息定长,报文大小固定长度,举个例子每一种报文的尺寸固定为200字节,假设缺乏空位补空格;
  2. 包尾增加特殊分隔符,比方每条报文结束都抬高回车换行符(比如FTP公约)可能钦点特殊字符作为报文分隔符,接收方通过非正规分隔符切分报文区分;
  3. 将音信分为信息头和新闻体,新闻头中富含表示消息的总局长度(或然音信体长度)的字段;
  4. 更复杂的自定义应用层协议。

但以此伏乞并不是分布的 HTTP ,而是利用 Netty 自定义的磋商。

Netty粘包和拆包实施方案

Netty提供了几个解码器,可以扩充裕包的操作,分别是:

  • LineBasedFrameDecoder
  • DelimiterBasedFrameDecoder(加多特种分隔符报文来分包)
  • FixedLengthFrameDecoder(使用定长的报文来含有)
  • LengthFieldBasedFrameDecoder

有个前提是:网关是内需读取一段完整的报文技能扩充末端的逻辑。

LineBasedFrameDecoder解码器

LineBasedFrameDecoder是回车换行解码器,倘若客户发送的消息以回车换行符作为音讯结束的标记,则足以一向动用Netty的LineBasedFrameDecoder对音讯进行解码,只必要在初叶化Netty服务端恐怕客户端时将LineBasedFrameDecoder准确的充裕到ChannelPipeline中就可以,无需团结再也完毕一套换行解码器。

标题是有天陡然开掘网关分析报文出错,查看了客商端的出殡和埋葬日志也没开掘难点,最终通过日记开采收到了大多不完全的报文,有个别还多了。

Netty依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.9.Final</version>
        </dependency>

于是乎想会不会是 TCP 拆、粘包带来的主题素材,最终选择 Netty 自带的拆包工具化解了该难题。

1.1 Server端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author Ricky
 *
 */
public class LineBasedServer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LineBasedFrameDecoder(1024));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new LineServerHandler());
                }
            });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new LineBasedServer().bind(Constants.PORT);
    }
}

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineServerHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(getClass());

    private int count = 0;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {

        count++;
        String body = (String) msg;
        logger.info("server read msg:{}, count:{}", body, count);

        String response = "hello from server"+System.getProperty("line.separator");
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        logger.error("server caught exception", cause);
        ctx.close();
    }

}

那便有了此文。

1.2 Client

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class LineBasedClient {

    public void connect(String host, int port) throws InterruptedException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LineBasedFrameDecoder(1024));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new LineClientHandler());
                }
            });

            ChannelFuture future = b.connect(Constants.HOST, Constants.PORT).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new LineBasedClient().connect(Constants.HOST, Constants.PORT);
    }
}

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineClientHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(getClass());

    private int count =0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the message to Server
       for(int i=0; i<100; i++){

           String msg = "hello from client "+i;
           logger.info("client send message:{}", msg);

           ctx.writeAndFlush(msg+System.getProperty("line.separator"));
       }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        count++;
        logger.info("client read msg:{}, count:{}", body, count);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        logger.error("client caught exception", cause);
        ctx.close();
    }
}

TCP 协议

标题固然缓和了,但要么得思量缘由,为何会这么?打破砂锅问到底才是三个可信赖的程序猿。

那就得从 TCP 那几个合同谈到了。

TCP 是二个面向字节流的商业事务,它是性质是流式的,所以它并不曾分支。就疑似水流同样,你没有办法知道怎么着时候初步,曾几何时结束。

之所以她会依赖当前的套接字缓冲区的意况展开拆包或是粘包。

下图展现了贰个 TCP 合同传输的进度:

图片 2image

出殡端的字节流都会先传出缓冲区,再经过互连网传播到接收端的缓冲区中,最后由接收端获取。

当大家发送多少个完整包到接收端的时候:

图片 3image

好端端情状会接到到多个总体的报文。

但也会有以下的情状:

图片 4image

收到到的是二个报文,它是由发送的多个报文组成的,那样对于应用程序来讲就很难管理了。

图片 5image

再有一点都不小可能出现下边这样的即使接受了七个包,可是在那之中的内容却是相互富含,对于使用来说依然力不从心解析。

对于如此的难题只好透过上层的行使来解决,常见的方法有:

  • 在报文末尾扩大换行符表Bellamy(Bellamy)条完整的消息,那样在接收端能够依附这一个换行符来决断新闻是或不是完好。
  • 将音信分为音信头、新闻体。能够在音讯头中注脚新闻的尺寸,根据这几个长度来收获报文(例如808 协议)。
  • 规定好报文长度,不足的空位补齐,取的时候根据长度截取就可以。

以上的那几个艺术大家在 Netty 的 pipline 中里投入相应的解码器都能够手动完毕。

但骨子里 Netty 已经帮大家搞好了,完全能够开箱即用。

比如:

  • LineBasedFrameDecoder 能够遵照换行符消除。
  • DelimiterBasedFrameDecoder可根据分隔符解决。
  • FixedLengthFrameDecoder可钦赐长度消除。

上面来效仿一下最简易的字符串传输。

大概在事先的

拓表自己要作为楷模坚守规则。

在 Netty 客商端中加了贰个入口能够循环境与发展送 100 条字符串报文到接收端:

 /** * 向服务端发消息 字符串 * @param stringReqVO * @return */ @ApiOperation("客户端发送消息,字符串") @RequestMapping(value = "sendStringMsg", method = RequestMethod.POST) @ResponseBody public BaseResponse<NULLBody> sendStringMsg(@RequestBody StringReqVO stringReqVO){ BaseResponse<NULLBody> res = new BaseResponse(); for (int i = 0; i < 100; i++) { heartbeatClient.sendStringMsg(stringReqVO.getMsg ; } // 利用 actuator 来自增 counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT); SendMsgResVO sendMsgResVO = new SendMsgResVO() ; sendMsgResVO.setMsg ; res.setCode(StatusEnum.SUCCESS.getCode ; res.setMessage(StatusEnum.SUCCESS.getMessage ; return res ; } /** * 发送消息字符串 * * @param msg */ public void sendStringMsg(String msg) { ByteBuf message = Unpooled.buffer(msg.getBytes ; message.writeBytes(msg.getBytes ; ChannelFuture future = channel.writeAndFlush; future.addListener((ChannelFutureListener) channelFuture -> LOGGER.info("客户端手动发消息成功={}", msg)); }

服务端直接打字与印刷就可以:

 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { LOGGER.info("收到msg={}", msg); }

附带提一下,这里加的有一个字符串的解码器:.addLast(new StringDecoder 其实正是把音信解析为字符串。

 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { out.add(msg.toString; }

在 Swagger 中调用了客商端的接口用于给服务端发送了 100 次音讯:

图片 6image

健康状态下接收端应该打字与印刷 100 次 hello 才对,可是查看日志会开掘:

图片 7image

摄取的开始和结果有全部的、多的、少的、拼接的;那也就对应了下面提到的拆包、粘包。

该怎么化解吧?那便可应用之前提到的 LineBasedFrameDecoder 利用换行符消除。

DelimiterBasedFrameDecoder解码器

DelimiterBasedFrameDecoder是相隔符解码器,顾客能够内定新闻甘休的分隔符,它能够自动完毕以分隔符作为码流结束标志的音讯的解码。回车换行解码器实际上是一种独特的DelimiterBasedFrameDecoder解码器。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Ricky
 *
 */
public class DelimiterServer {
    private Logger logger = LoggerFactory.getLogger(getClass());


    public void bind(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new DelimiterServerHandler());
                }
            });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new DelimiterServer().bind(Constants.PORT);
    }
}

Client:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class DelimiterClient {

    public void connect(String host, int port) throws InterruptedException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new DelimiterClientHandler());
                }
            });

            ChannelFuture future = b.connect(Constants.HOST, Constants.PORT).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new DelimiterClient().connect(Constants.HOST, Constants.PORT);
    }
}

使用 LineBasedFrameDecoder 化解难点

LineBasedFrameDecoder 解码器使用特简单,只供给在 pipline 链条上加上就能够。

//字符串解析,换行防拆包.addLast(new LineBasedFrameDecoder.addLast(new StringDecoder

构造函数中盛传了 1024 是指报的长度最大不超越那个值,具体能够看下文的源码解析。

接下来大家再进行三次测量检验看看结果:

在意,由于 LineBasedFrameDecoder 解码器是透过换行符来推断的,所以在出殡和埋葬时,一条完整的音信供给加多 n

图片 8image

最后的结果:

图片 9image

全面察看日志,发掘确实未有一条被拆、粘包。

FixedLengthFrameDecoder解码器

FixedLengthFrameDecoder是原则性长度解码器,它能够依照钦命的长短对消息实行自动解码,开辟者没有供给思考TCP的粘包/拆包等主题材料,特别实用。

对于定长音讯,要是新闻其实尺寸小于定长,则一再会进展补位操作,它在明确水平上产生了半空淑节财富的浪费。不过它的独到之处也是不行猛烈的,编解码相比较轻易,由此在实际项目中照旧有必然的运用场景。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyServer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //配置服务器启动类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))//配置日志输出
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(1<<5));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());

                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });

            ChannelFuture f = b.bind(port).sync();
            //等待服务器退出
            f.channel().closeFuture().sync();
        } finally {
            //释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ServerHandler extends ChannelInboundHandlerAdapter {
        private int counter = 0;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            logger.info("接收客户端msg:{}", msg);

            ByteBuf echo = Unpooled.copiedBuffer(String.format("Hello from server:", counter).getBytes());
            ctx.writeAndFlush(echo);
        }
    }

    public static void main(String[] args) throws InterruptedException {

        new NettyServer().bind(Constants.PORT);
    }
}

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyClient {

    private Logger logger = LoggerFactory.getLogger(getClass());

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new FixedLengthFrameDecoder(1<<5));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());

                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(host, port).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class ClientHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            for(int i=0; i<100; i++){

                String msg = "hello from client "+i;
                logger.info("client send message:{}", msg);

                ctx.writeAndFlush(msg);
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {

            logger.info("接收服务端msg:{}", msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }

    public static void main(String[] args) throws InterruptedException {

        new NettyClient().connect(Constants.HOST, Constants.PORT);
    }
}

LineBasedFrameDecoder 的原理

指标到达了,来拜望它的达成原理:

图片 10image

  1. 首先步关键正是 findEndOfLine 方法去找到当前报文中是不是存在分隔符,存在就能够回到分隔符所在的职位。
  2. 判定是不是要求放任,默以为 false ,第一回走这些逻辑(下文种推断是还是不是要求改为 true)。
  3. 只要报文中存在换行符,就能够将数据截取到丰富地点。
  4. 例如不设有换行符(有相当的大希望是拆包、粘包),就看脚下报文的长短是或不是超越预设的长度。大于则供给缓存那么些报文长度,并将 discarding 设为 true。
  5. 一旦是需求丢掉时,推断是或不是找到了换行符,存在则供给甩掉掉此前记录的尺寸然后截取数据。
  6. 假若未有找到换行符,则将事先缓存的报文长度进行增多,用于下一次扬弃。

从这一个逻辑中能够看看正是探究报文中是不是富含换行符,并开展对应的截取。

是因为是通过缓冲区读取的,所以即使此次未有换行符的数额,只要下一遍的报文存在换行符,上一轮的数据也不会丢。

地点提到的实际便是在解码中展开操作,咱们也能够自定义本人的拆、粘包工具。

编解码的严重性指标便是为了能够编码成字节流用于在网络中传输、悠久化存款和储蓄。

Java 中也得以兑现 Serializable 接口来兑现种类化,但由于它质量等原因在一部分 RPC 调用中用的少之甚少。

Google Protocol 则是一个迅速的体系化框架,下边来演示在 Netty 中什么行使。

LengthFieldBasedFrameDecoder解码器

非常多的交涉(私有可能国有),协议头中会指点长度字段,用于标识音信体或然整包音讯的尺寸,比如SMPP、HTTP左券等。由于基于长度解码须求的通用性,以及为了缩小顾客的情商开拓难度,Netty提供了LengthFieldBasedFrameDecoder,自动屏蔽TCP底层的拆包和粘包难点,只供给传入正确的参数,就能够轻易化解“读半包“难题。

Message.java

import java.nio.charset.Charset;

/**
 * @author Ricky Fung
 */
public class Message {

    private final Charset charset = Charset.forName("utf-8");

    private byte magicType;
    private byte type;//消息类型  0xAF 表示心跳包    0xBF 表示超时包  0xCF 业务信息包
    private long requestId; //请求id
    private int length;
    private String body;

    public Message(){

    }

    public Message(byte magicType, byte type, long requestId, byte[] data) {
        this.magicType = magicType;
        this.type = type;
        this.requestId = requestId;
        this.length = data.length;
        this.body = new String(data, charset);
    }

    public Message(byte magicType, byte type, long requestId, String body) {
        this.magicType = magicType;
        this.type = type;
        this.requestId = requestId;
        this.length = body.getBytes(charset).length;
        this.body = body;
    }
    ...setter/getter
}

MessageDecoder.java

import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class MessageDecoder extends LengthFieldBasedFrameDecoder {
    private Logger logger = LoggerFactory.getLogger(getClass());

    //头部信息的大小应该是 byte+byte+int = 1+1+8+4 = 14
    private static final int HEADER_SIZE = 14;

    public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null) {
            return null;
        }

        if (in.readableBytes() <= HEADER_SIZE) {
            return null;
        }

        in.markReaderIndex();

        byte magic = in.readByte();
        byte type = in.readByte();
        long requestId = in.readLong();
        int dataLength = in.readInt();

        // FIXME 如果dataLength过大,可能导致问题
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return null;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);

        String body = new String(data, "UTF-8");
        Message msg = new Message(magic, type, requestId, body);
        return msg;
    }
}

MessageEncoder.java

import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.Charset;

/**
 * @author Ricky Fung
 */
public class MessageEncoder extends MessageToByteEncoder<Message> {
    private final Charset charset = Charset.forName("utf-8");

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {

        //
        out.writeByte(msg.getMagicType());
        out.writeByte(msg.getType());
        out.writeLong(msg.getRequestId());

        byte[] data = msg.getBody().getBytes(charset);
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}

服务端:

import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void bind(int port) throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MessageDecoder(1<<20, 10, 4));
                            p.addLast(new MessageEncoder());
                            p.addLast(new ServerHandler());
                        }
                    });

            // Bind and start to accept incoming connections.
            ChannelFuture future = b.bind(port).sync(); // (7)

            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            future.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ServerHandler extends SimpleChannelInboundHandler<Message> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {

            logger.info("server read msg:{}", msg);

            Message resp = new Message(msg.getMagicType(), msg.getType(), msg.getRequestId(), "Hello world from server");
            ctx.writeAndFlush(resp);
        }
    }

    public static void main(String[] args) throws Exception {

        new NettyServer().bind(Constants.PORT);
    }
}

客户端:

import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author Ricky Fung
 */
public class NettyClient {

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MessageDecoder(1<<20, 10, 4));
                            p.addLast(new MessageEncoder());

                            p.addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(host, port).sync();

            future.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS);
            if(future.channel().isActive()){

                for(int i=0; i<100; i++) {

                    String body = "Hello world from client:"+ i;
                    Message msg = new Message((byte) 0XAF, (byte) 0XBF, i, body);

                    future.channel().writeAndFlush(msg);
                }
            }

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class ClientHandler extends ChannelInboundHandlerAdapter {
        private Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {

            logger.info("client read msg:{}, ", msg);

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            logger.error("client caught exception", cause);
            ctx.close();
        }
    }

    public static void main(String[] args) throws Exception {

        new NettyClient().connect(Constants.HOST, Constants.PORT);
    }
}

安装

率先第一步自然是安装:

在官方网址下载对应的包。

本地配置遇到变量:

图片 11image

当执行 protoc --version 现身以下结果申明安装成功:

图片 12image

参谋资料

Netty API Reference

Netty连串之Netty编解码框架分析

概念自个儿的说道格式

随着是急需服从合法供给的语法定义自个儿的研商格式。

例如小编那边须求定义二个输入输出的报文格式:

BaseRequestProto.proto:

syntax = "proto2";package protocol;option java_package = "com.crossoverjie.netty.action.protocol";option java_outer_classname = "BaseRequestProto";message RequestProtocol { required int32 requestId = 2; required string reqMsg = 1; }

BaseResponseProto.proto:

syntax = "proto2";package protocol;option java_package = "com.crossoverjie.netty.action.protocol";option java_outer_classname = "BaseResponseProto";message ResponseProtocol { required int32 responseId = 2; required string resMsg = 1; }

再通过

protoc --java_out=/dev BaseRequestProto.proto BaseResponseProto.proto

protoc 命令将刚刚定义的说Doug式调换为 Java 代码,并扭转在 /dev 目录。

只须要将转移的代码拷贝到我们的品种中,同一时候引入信赖:

<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.4.0</version></dependency>

运用 Protocol 的编解码也特别轻松:

public class ProtocolUtil { public static void main(String[] args) throws InvalidProtocolBufferException { BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder() .setRequestId .setReqMsg .build(); byte[] encode = encode; BaseRequestProto.RequestProtocol parseFrom = decode; System.out.println(protocol.toString; System.out.println(protocol.toString().equals(parseFrom.toString; } /** * 编码 * @param protocol * @return */ public static byte[] encode(BaseRequestProto.RequestProtocol protocol){ return protocol.toByteArray() ; } /** * 解码 * @param bytes * @return * @throws InvalidProtocolBufferException */ public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException { return BaseRequestProto.RequestProtocol.parseFrom; }}

利用 BaseRequestProto 来做三个示范,先编码再解码最后相比较最后的结果是不是同样。答案断定是同样的。

接纳 protoc 命令生成的 Java 文件里曾经帮大家把编解码全部都打包好了,只需求简单调用就行了。

能够见到 Protocol 创立对象使用的是营造者格局,对使用者来讲清晰易读,更加的多关于创设器的剧情能够参照这里。

越多关于 Google Protocol 内容请查看官方开采文书档案。

源码下载

https://github.com/TiFG/netty4-in-action/tree/master/netty4-unpack-demo

结合 Netty

Netty 已经自带了对 Google protobuf 的编解码器,也是只要求在 pipline 中加上就可以。

server 端:

// google Protobuf 编解码.addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance.addLast(new ProtobufEncoder

客户端:

// google Protobuf 编解码.addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance.addLast(new ProtobufEncoder

些微注意的是,在构建 ProtobufDecoder 时索要显式内定解码器必要解码成什么样品种。

本人那边服务端接收的是 BaseRequestProto,顾客端收到的是服务端响应的 BaseResponseProto 所以就安装了相应的实例。

同等的提供了一个接口向服务端发送新闻,当服务端收到了二个特别指令时也会向客商端重返内容:

 @Override protected void channelRead0(ChannelHandlerContext ctx, BaseRequestProto.RequestProtocol msg) throws Exception { LOGGER.info("收到msg={}", msg.getReqMsg; if (999 == msg.getRequestId{ BaseResponseProto.ResponseProtocol responseProtocol = BaseResponseProto.ResponseProtocol.newBuilder() .setResponseId .setResMsg .build(); ctx.writeAndFlush(responseProtocol) ; } }

在 swagger 中调用相关接口:

图片 13image

在日记能够看看服务端收到了新闻,同偶尔候客商端也接受了回来:

图片 14image图片 15image

尽管 Netty 封装了 Google Protobuf 相关的编解码工具,其实查看它的编码工具就能意识也是接纳上文提到的 api 落成的。

图片 16image

Protocol 拆、粘包

Google Protocol 的选取确实很简单,但照旧有值的让人瞩指标地点,举例它依旧会有拆、粘包问题。

无妨模拟一下:

图片 17image

三番五次发送 100 次音讯看服务端收到的什么样:

图片 18image

会发掘服务端在解码的时候报错,其实正是被拆、粘包了。

这一点 Netty 自然也挂念到了,所以已经提供了相关的工具。

//拆包解码.addLast(new ProtobufVarint32FrameDecoder.addLast(new ProtobufVarint32LengthFieldPrepender

只须要在服务端和客商端加上那三个编解码工具就可以,再来发送九十六遍尝试。

查阅日志发掘并未有出现贰遍特别,100 条消息全体都接到到了。

图片 19image

其一编解码工具得以总结领悟为是在音信体中加了三个 31位长度的整形字段,用于表明当前新闻长度。

互连网那块同样是计算机的功底,由于这几天在做连锁的工作之所以接触的可比多,也终归给大学补课了。

末端会随之更新 Netty 相关的内容,最终会现出叁个高质量的 HTTP 以及 RPC 框架,敬请期望。

上文相关的代码:

近来在总计一些 Java 相关的知识点,感兴趣的心上人能够共同尊崇。

地址:

接待关怀大伙儿号一齐调换:

图片 20image

本文由365bet体育在线官网发布于网络编程,转载请注明出处:怎么减轻,拆包实施方案

TAG标签:
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。