在进行网络传输的过程之中,如何可以快速准确的传输所需要的数据内容,成为了所有网络开技术所面临的的实际问题,而在Netty里面,考虑到了这样的实际应用,所以其可以直接采用各种序列化的形式方便的进行数据传输。
现在利用序列化的概念,可以保证服务器端接收到了所有的二进制数据之后,可以依据传输的数据内容自动进行对象的反序列化处理。这样在业务中心调用的过程之中,只需要考虑对象的传输即可。
如果想要就那些进行序列化的操作,就应该考虑几个问题:
1、如何可以保证服务器端可以实现所有TCP客户端对应数据的传输要求
|- Java直接拥有序列化对象的处理能力,但这样的操作只能在java上使用,它的性能够好。
2、需要保证服务器端和客户端之间最快速简洁的数据传输
3、需要保证服务器端可以接收到的数据属于广大语言都支持的一种数据传输格式,如:XML、JSON
在Netty开发框架里面为了方便开发者进行更加方便的网络服务器的定义,提供有了方便的序列化支持能力,对于所有的数据传输都可以利用Netty实现自动的序列化与反序列化操作,常见的几种序列化工具:JDK原生序列化、MessagePack、Marshaling、JSON处理,如果要在性能和简洁上找平衡,那么基本使用JSON。
JDK序列化支持
不管是序列化还是反序列化,实际上客户端与服务器端之间都应该有一标准的对象存在,所以应该在公共包之中创建一个新类,该类一定要实现Serializable接口
客户端
public class Member implements Serializable {
private String name ;
private Integer age;
private Double salary;
//getter、setter、toString方法一定要写上,这里为了节约空间省略
}
修改客户端Handler类
//客户端发送500条数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int x = 0; x < REPEAT; x++) {
Member member = new Member();
member.setName("netty - " + x + "号");
member.setAge(18);
member.setSalary(9999.99);
ctx.writeAndFlush(member);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取返回内容
Member content = (Member) msg;
System.err.println(content);
}
由于在进行数据接收的时候直接实现了对象数据的接收处理,所以必须修改Client程序类,在处理链上追加新的处理操作
cliectBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); //对象解码器
ch.pipeline().addLast(new ObjectEncoder()); //对象编码器
ch.pipeline().addLast(new EchoClientHandlerBigDataTest());
}
});
服务器端
修改服务器端Handler类
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
//获取输入的内容
Member inputMember = (Member) msg;
System.err.println("{EchoServer端}接收到消息内容:" + inputMember);
//设置回应消息,将工资增加10000
Member echoMember = new Member();
echoMember.setName(inputMember.getName());
echoMember.setAge(inputMember.getAge());
echoMember.setSalary(inputMember.getSalary() + 10000);
//返回客户端数据
ctx.writeAndFlush(echoMember);
} catch (Exception e) {
e.printStackTrace();
} finally {
//执行完毕之后,有无异常都抛弃接收到的消息
ReferenceCountUtil.release(msg);
}
}
同样需要修改Server类,在处理链追加新的处理操作工具,与Client一致的配置
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); //对象解码器
ch.pipeline().addLast(new ObjectEncoder()); //对象编码器
ch.pipeline().addLast(new EchoServerHandlerBigDataTest());
}
});
执行结果
服务器端
客户端
MessagePack序列化
虽然JDK原生支持的序列化处理操作可以得到最佳的传输性能,但是其未必是所有项目开发之中的首选处理工具,因为其最大的问题在于,本身的程序实现是严重依赖编程语言的,而如果要进行服务器的操作实现肯定要适用于所有的编程语言,那么在实际的编写之中就可以引入一些第三方定义的数据传输结构,所以本次首先研究“MsgPack”组件,官方地址:“https://msgpack.org/”。
使用MessagePack最大的特点在于,其操作的结果类是于JSON,但是比JSON的传输速度更快,而且体积更小。比如:
|- 如果要在JSON里面进行布尔的传输使用的是true和false,而在MessagePack里面只用一个字节来描述;
|- 在进行一些字符串描述的时候也会适当的进行字符串数据的压缩处理。
在公共包中引入相关依赖即可
Gradle
compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12'
Maven
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
只需要在VO类上使用“@Message”注解即可使用MsgPack。如果要基于MassagePack进行数据的序列化与反序列化操作,那么在公共包中就要设置一个基于MessagePack序列化编码器类。
该编码器类继承“MessageToByteEncoder
public class MessagePackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
//准备好一个序列化工具
MessagePack msgPack = new MessagePack();
//序列化处理
byte[] raw = msgPack.write(msg);
//进行字节数据的传输
out.writeBytes(raw);
}
}
有了编码器还需要一个解码器,解码器需要继承“MessageToMessageDecoder
public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
//读取获取的字节数据的长度
int len = byteBuf.readableBytes();
//创建一个新的数组
byte[] raw = new byte[len];
//保存在raw中
byteBuf.getBytes(byteBuf.readerIndex(), raw, 0, len);
MessagePack msgPack = new MessagePack();
//反序列化操作
list.add(msgPack.read(raw));
}
}
随后在server和client处理节点上加入处理操作工具,两边的配置都是一致的
//解码器
ch.pipeline().addLast(new MessagePackDecoder());
ch.pipeline().addLast(new LengthFieldPrepender(3));
//编码器
ch.pipeline().addLast(new MessagePackEncoder());
ch.pipeline().addLast(new EchoServerHandlerBigDataTest());
一定要加上:ch.pipeline().addLast(new LengthFieldPrepender(3));表示处理对象的属性个数
但是这样配置在启动服务端的时候不会报错,一旦启动了客户端,就会报错,类型无法转换。服务器端接收到的 类型是“org.msgpack.type.IntValueImpl”,所有要在处理节点中加入ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,3,0,3));
注意:此时如果正在进行解码器定义的时候使用的是“msgPack.read(raw)”会有一些问题:
如果在server和client类中没有加上下面的处理器:
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,3,0,3));
同时在当前的MessagePack解码器中获得的只是一个原始的字节内容“msgPack.read(raw)”,那么此时能够接收到的消息内容就是IntValue,但是明显不能够使用IntValue,因为其所描述的都是数值类型的操作数据,而本身传递的是一个对象,所以才加入上面的处理节点,于是此时获得的类型就变成了“org.magpack.type.ArrayValueImpl”,它属于ArrayValue的子类。
此时修改解码器
public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
//读取获取的字节数据的长度
int len = byteBuf.readableBytes();
//创建一个新的数组
byte[] raw = new byte[len];
//保存在raw数组中
byteBuf.getBytes(byteBuf.readerIndex(), raw, 0, len);
MessagePack msgPack = new MessagePack();
//反序列化操作
list.add(msgPack.read(raw,msgPack.lookup(Member.class)));
}
}
上面的配置在进行MessagePack解码器的过程里面,将对应的字节数据直接解析为一个完整的“Member”对象,如果此时为“Member”对象则在传递“XxxChannelHandler”的时候所传递的一定是一个具体的对象,如果没有使用它,那么传递的就属于一个原生的数组操作。
完成以上的配置就可以进行数据的序列化与反序列化
Marshalling序列化
虽然MessagePack得到了众多语言的处理支持,并且可以进行高效的数据传输,但是其在使用的过程中需要引入相应的“@message”注解,实际上这样的操作本身就破坏了程序的结构,而在JBoos组织之中为了进一步与原生的Java序列化操作结合,所以提供了一个Marshalling序列化工具包。
需要使用该工具包,只需引入相应的依赖即可
Gradle
compile group: 'org.jboss.marshalling', name: 'jboss-marshalling', version: '2.0.9.Final'
testCompile group: 'org.jboss.marshalling', name: 'jboss-marshalling-serial', version: '2.0.9.Final'
Maven
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>2.0.9.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.9.Final</version>
<scope>test</scope>
</dependency>
然编写一个Marshalling序列化工厂类,里面提供有编码、解码操作
public class MarshallingCodeFactory {
/**
* 创建一个Marshalling的数据编码器
* @return 编码器的实例化对象
*/
public static MarshallingEncoder bulidMarshallingEncoder() {
//使用JDK中默认的序列化操作
MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
//进行编码器的相关配置
MarshallingConfiguration configuration = new MarshallingConfiguration();
//存放的是序列化的配置版本
//configuration.setVersion(1);
MarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
/**
* 创建一个Marshalling解码器
* @return 解码器实例对象
*/
public static MarshallingDecoder bulidMarshallingDecoder() {
//使用JDK中默认的序列化操作
MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
//进行解码器的相关配置
MarshallingConfiguration configuration = new MarshallingConfiguration();
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
//手工设置对象传输的最大长度
int maxSize = 1024 << 2;
MarshallingDecoder decoder = new MarshallingDecoder(provider, maxSize);
return decoder;
}
}
然后在处理节点中加入编码、解码器即可使用Marshalling。server和client都是一样的配置
cliectBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//编码器
ch.pipeline().addLast(MarshallingCodeFactory.bulidMarshallingEncoder());
//解码器
ch.pipeline().addLast(MarshallingCodeFactory.bulidMarshallingDecoder());
ch.pipeline().addLast(new EchoClientHandlerBigDataTest());
}
});
FastJSON序列化
fastJson是有阿里推出的一个快速进行JSON解析与生成的工具包,使用JSON进行数据传输一定不是最快的,但结构却是最清晰的。需要使用FastJson直接引入相关依赖即可
compile group: 'com.alibaba', name: 'fastjson', version: '1.2.62'
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
随后定义专属的编码、解码器
//编码器
public class JsonEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf byteBuf) throws Exception {
byte[] data = JSONObject.toJSONString(msg).getBytes();
byteBuf.writeBytes(data);
}
}
//解码器
public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
int len = byteBuf.readableBytes();
byte[] data = new byte[len];
byteBuf.getBytes(byteBuf.readerIndex(),data,0,len);
out.add(JSON.parseObject(new String(data), Member.class));
}
}
之后在处理节点中追加JSON编码解码器即可,该配置与MessagePack的配置类似
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 3, 0, 3));
ch.pipeline().addLast(new JsonDecoder());
ch.pipeline().addLast(new LengthFieldPrepender(3)); //设置属性个数
ch.pipeline().addLast(new JsonEncoder());