《跟闪电侠学Netty》阅读笔记 - 数据载体 ByteBuf
引言
API设计更建议实战过程中逐渐了解熟悉掌握,本文记录基础设计和相关API,只需要大致了解ByteBuf设计思想即可。
思维导图
[https://www.mubucm.com/doc/58ehM7v4PP5]()
基础结构
整个ByteBuf的数据结构组成如下,整个设计思想有点类似计算机如何实现从北京到上海,那就是一段足够长的铁轨,不断“拆掉”后面的铁轨放到前面的铁轨上,这样实现火车一直在铁轨上跑的错觉。
- 容量上限:maxCapacity
- 容量:capacity
- 数组:
- 废弃字节 :被丢弃的字节数据无效
- 可读字节(writerIndex -readerIndex)
- 可写字节(capacity - writerIndex)
- 读指针 readerIndex :每读取(read)一个字节,readerIndex 自增 1
- 写指针 writerIndex :每写入(write)一个字节,writeIndex 自增 1
- 剩余可用空间
结构解析
-
- 字节容器:分为三部分
- 废弃空间 :被丢弃的字节,数据无效
- 可读空间 :从ByteBuf读取出来的数据都属于这部分
- 可写空间 :未来所有的写入都会写入到此处
-
- 划分依据:两个指针加一个变量
-
- 最大容量和和容量限制
- 容量可以在写满的时候扩容
- 如果扩容到最大容量就报错
容量API
实践容量API之前,我们先构建ByteBuf。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
// allocate ByteBuf(9,100) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 9/100)
capacity()
表示 ByteBuf 底层占用了多少字节的内存(包括丢弃的字节、可读字节、可写字节),不同的底层实现机制有不同的计算方式,后面我们讲 ByteBuf 的分类的时候会讲到。
从案例看出,默认创建的方式容量为初始化指定的容量。
print("allocate ByteBuf(9,100) capacity => {} \n", buffer.capacity());
// allocate ByteBuf(9,100) capacity => 9
maxCapacity()
表示 ByteBuf 底层最大能够占用多少字节的内存,当向 ByteBuf 中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到 maxCapacity,超过这个数,就抛异常。
从案例可以得知,如果扩容到 100 就会报错
print("allocate ByteBuf(9,100) maxCapacity => {} \n", buffer.maxCapacity());
// allocate ByteBuf(9,100) maxCapacity => 100
readableBytes() 与 isReadable()
readableBytes() 表示 ByteBuf 当前可读的字节数,它的值等于 writerIndex-readerIndex,如果两者相等,则不可读,isReadable() 方法返回 false。
// readableBytes() 与 isReadable()
print("allocate ByteBuf(9,100) isReadable => {} \n", buffer.isReadable());
print("allocate ByteBuf(9,100) readableBytes => {} \n", buffer.readableBytes());
// allocate ByteBuf(9,100) isReadable => false
// allocate ByteBuf(9,100) readableBytes => 0
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)
// 写入数据之后,重新执行readableBytes() 与 isReadable()
print("allocate ByteBuf(9,100) isReadable => {} \n", buffer.isReadable());
print("allocate ByteBuf(9,100) readableBytes => {} \n", buffer.readableBytes());
//allocate ByteBuf(9,100) isReadable => true
//allocate ByteBuf(9,100) readableBytes => 4
writableBytes()、 isWritable() 与 maxWritableBytes()
writableBytes() 表示 ByteBuf 当前可写的字节数,它的值等于 capacity-writerIndex,如果两者相等,则表示不可写,isWritable() 返回 false。
注意这个时候,并不代表不能往 ByteBuf 中写数据了, 如果发现往 ByteBuf 中写数据写不进去的话,Netty 会自动扩容 ByteBuf,直到扩容到底层的内存大小为 maxCapacity。
maxWritableBytes() 就表示可写的最大字节数,它的值等于 maxCapacity-writerIndex。
在初始化构建过程中,由于没有读写任何数据,可以看到他们的值基本和前面计算的容量是一致的。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
// writableBytes()、 isWritable() 与 maxWritableBytes()
print("allocate ByteBuf(9,100) writableBytes => {} \n", buffer.writableBytes());
print("allocate ByteBuf(9,100) isWritable => {} \n", buffer.isWritable());
print("allocate ByteBuf(9,100) maxWritableBytes => {} \n", buffer.maxWritableBytes());
// allocate ByteBuf(9,100) writableBytes => 9
// allocate ByteBuf(9,100) isWritable => true
// allocate ByteBuf(9,100) maxWritableBytes => 100
读写指针相关的 API
实践读写指针相关的 API之前,我们先构建初始化ByteBuf。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
readerIndex() 与 readerIndex(int)
- readerIndex():返回当前的读指针 readerIndex
- readerIndex(int):表示设置读指针
在没有写入任何数据的时候,读指针为0。
// readerIndex() 返回当前读指针
print("allocate ByteBuf(9,100) readerIndex => {} \n", buffer.readerIndex());
// allocate ByteBuf(9,100) readerIndex => 0
下面的案例说明读指针不能越过写指针的界限。
// 尝试手动重定向渎指针位置
// print("allocate ByteBuf(9,100) readerIndex(int) => {} \n", buffer.readerIndex(2));
// readerIndex: 2, writerIndex: 0 (expected: 0 <= readerIndex <= writerIndex <= capacity(9))
我们写入一些数据之后,再进行读指针重定向。
buffer.writeBytes(new byte[]{1, 2, 3, 4});
// 重定向读指针
print("allocate ByteBuf(9,100) readerIndex(int) => {} \n", buffer.readerIndex(2));
print("重定向读指针 之后 (new byte[]{1,2,3,4}) => {} \n", buffer);
// allocate ByteBuf(9,100) readerIndex(int) => PooledUnsafeDirectByteBuf(ridx: 2, widx: 4, cap: 9/100)
// 重定向读指针 之后 (new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 2, widx: 4, cap: 9/100)
writerIndex() 与 writerIndex(int)
writeIndex()
表示返回当前的写指针 writerIndex。
writeIndex(int)
表示设置写指针。
案例以初始化写入四个字节之后作为开始。
// writeIndex() 与 writeIndex(int)
print("allocate ByteBuf(9,100) writerIndex => {} \n", buffer.writerIndex());
print("allocate ByteBuf(9,100) writerIndex(int) => {} \n", buffer.writerIndex(2));
// allocate ByteBuf(9,100) writerIndex => 4
// allocate ByteBuf(9,100) writerIndex(int) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 9/100)
markReaderIndex() 与 resetReaderIndex()
区别:
markReaderIndex()
:表示把当前的读指针保存起来,
resetReaderIndex()
:表示把当前的读指针恢复到之前保存的值。
下面两段代码是等价的。
// 代码片段1
int readerIndex = buffer.readerIndex();
// … 其他操作
buffer.readerIndex(readerIndex);
// 代码片段二
// (不需要自己定义变量,推荐使用)
buffer.markReaderIndex();
// … 其他操作
// resetReaderIndex() 可以恢复到之前状态
// (解析自定义协议的数据包常用)
buffer.resetReaderIndex();
希望大家多多使用代码片段二这种方式,不需要自己定义变量,无论 buffer 当作参数传递到哪里,调用 resetReaderIndex() 都可以恢复到之前的状态,在解析自定义协议的数据包的时候非常常见,推荐大家使用这一对 API markWriterIndex() 与 resetWriterIndex() 这一对 API 的作用与上述一对 API 类似
读写API
实践读写API之前,我们先构建ByteBuf。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
上面的代码
writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst)
writeBytes() 表示把字节数组 src 里面的数据全部写到 ByteBuf。注意此方法执行之后,会移动前面介绍的 writeIndex 写指针。
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)
readBytes()
指的是把 ByteBuf 里面的数据全部读取到 dst。
//只有read方法改变指针
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
print("bytes 内容 => {}", bytes);
// bytes 内容 =>
这里 dst 字节数组的大小通常等于 readableBytes(),而 src 字节数组大小的长度通常小于等于writableBytes()。
writeByte(byte b) 与 buffer.readByte()
writeByte() 表示往 ByteBuf 中写一个字节,而 buffer.readByte() 表示从 ByteBuf 中读取一个字节,类似的 API 还有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()
这里就不一一赘述。
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)buffer.writeByte(5);
print("改变写指针 buffer.writeByte(5) => {} \n", buffer);
// 改变写指针 buffer.writeByte(5) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 9/100)
getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针。
//只有read方法改变指针
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
print("buffer.readBytes(bytes) => {}\n", buffer);
// buffer.readBytes(bytes) => PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)
release() 与 retain()
由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。
Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,就需要回收底层内存。
默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。
Test
最后是简单测试程序整合前面的API案例。
public class ByteBufTest {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9, 100)", buffer);
// write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("writeBytes(1,2,3,4)", buffer);
// write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写, 写完 int 类型之后,写指针增加4
buffer.writeInt(12);
print("writeInt(12)", buffer);
// write 方法改变写指针, 写完之后写指针等于 capacity 的时候,buffer 不可写
buffer.writeBytes(new byte[]{5});
print("writeBytes(5)", buffer);
// write 方法改变写指针,写的时候发现 buffer 不可写则开始扩容,扩容之后 capacity 随即改变
buffer.writeBytes(new byte[]{6});
print("writeBytes(6)", buffer);
// get 方法不改变读写指针
System.out.println("getByte(3) return: " + buffer.getByte(3));
System.out.println("getShort(3) return: " + buffer.getShort(3));
System.out.println("getInt(3) return: " + buffer.getInt(3));
print("getByte()", buffer);
// set 方法不改变读写指针
buffer.setByte(buffer.readableBytes() + 1, 0);
print("setByte()", buffer);
// read 方法改变读指针
byte[] dst = new byte[buffer.readableBytes()];
buffer.readBytes(dst);
print("readBytes(" + dst.length + ")", buffer);
}
private static void print(String action, ByteBuf buffer) {
System.out.println("after ===========" + action + "============");
System.out.println("capacity(): " + buffer.capacity());
System.out.println("maxCapacity(): " + buffer.maxCapacity());
System.out.println("readerIndex(): " + buffer.readerIndex());
System.out.println("readableBytes(): " + buffer.readableBytes());
System.out.println("isReadable(): " + buffer.isReadable());
System.out.println("writerIndex(): " + buffer.writerIndex());
System.out.println("writableBytes(): " + buffer.writableBytes());
System.out.println("isWritable(): " + buffer.isWritable());
System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
System.out.println();
}
}
最后是个人的实验Test类。
/**
* byteBuf 的API测试
*/
public class ByteBufTest {
public static void main(String[] args) {
// 9 代表初始容量, 100代表最大容量
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
//allocate ByteBuf(9,100) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 9/100)
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)
// write 改变写指针,如果没有到达 capacity 依然可以写入,写入 int 之后写指针增加4
buffer.writeInt(12);
print("buffer.writeInt(12) => {}\n", buffer);
// buffer.writeInt(12) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 9/100)
// 继续改变写指针,当前写入等于 initialCapacity 这个初始值之后将不能继续写入
buffer.writeBytes(new byte[]{5});
print("writeBytes(new byte[]{5}) => {}\n", buffer);
// writeBytes(new byte[]{5}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 9, cap: 9/100)
// 继续写入指针,此时发现 已经超过 initialCapacity 的值,此时会进行扩容
buffer.writeBytes(new byte[]{6});
print("writeBytes(new byte[]{6}) => {}\n", buffer);
// writeBytes(new byte[]{6}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 16/100)
// get 方法调用之后不改变读指针
print("getByte(3) return => {}\n", buffer.getByte(3));
print("getShort(3) return => {}\n", buffer.getShort(3));
print("getInt(3) return => {}\n", buffer.getInt(3));
print("getChar(3) return => {}\n", buffer.getChar(3));
/*
getByte(3) return => 4 getShort(3) return => 1024 getInt(3) return => 67108864 getChar(3) return => Ѐ
* */
// set 方法不改变读写指针
buffer.setByte(buffer.readableBytes() + 1, 0);
print("setByte(buffer.readableBytes() + 1, 0) => {}\n", buffer);
// setByte(buffer.readableBytes() + 1, 0) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 16/100)
//只有read方法改变指针
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
print("buffer.readBytes(bytes) => {}\n", buffer);
// buffer.readBytes(bytes) => PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)
print("buffer.readBytes(readBuffer); => {}\n", buffer);
ByteBuf readBuffer = ByteBufAllocator.DEFAULT.buffer(6, 6);
// 原始writeIndex要有足够空间可读
// buffer.writeBytes(new byte[]{5,1,1,1,1,1,1,1});
// buffer.writeBytes(new byte[]{5});
// readerIndex(10) + length(6) exceeds writerIndex(11): PooledUnsafeDirectByteBuf(ridx: 10, widx: 11, cap: 16/100) buffer.readBytes(readBuffer);
System.err.println(readBuffer.readableBytes());
// buffer.readBytes(readBuffer);
// readerIndex(10) + length(9) exceeds writerIndex(10): PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)
}
}
写在最后
比较简单的一个章节,主要介绍ByteBuf在Java中的使用,整个使用过程简单易懂十分清晰。