推荐回答
netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueueJava代码publicvoideventSunkChannelPipelinepipeline,ChannelEventethrowsException{ifeinstanceofChannelStateEvent{……}elseifeinstanceofMessageEvent{MessageEventevent=MessageEvente;NioSocketChannelchannel=NioSocketChannelevent.getChannel;booleanoffered=channel.writeBufferQueue.offerevent;//写到channel的writeBufferQueueassertoffered;channel.worker.writeFromUserCodechannel;}}WriteRequestQueue的offer方法中会根据缓存消息的总大小判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。Java代码publicbooleanofferMessageEvente{booleansuccess=queue.offere;assertsuccess;intmessageSize=getMessageSizee;intnewWriteBufferSize=writeBufferSize.addAndGetmessageSize;inthighWaterMark=getConfig.getWriteBufferHighWaterMark;ifnewWriteBufferSize>=highWaterMark{ifnewWriteBufferSize-messageSize=lowWaterMark{//本次拉取,是的缓存数据大小掉到了低水位之下highWaterMarkCounter.decrementAndGet;ifisConnected&&!notifying.get{notifying.setBoolean.TRUE;fireChannelInterestChangedAbstractNioChannel.this;notifying.setBoolean.FALSE;}}}}returne;}超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel.isWritable,如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的Java代码publicbooleanisWritable{returngetInterestOps&OP_WRITE==0;}publicintgetInterestOps{if!isOpen{returnChannel.OP_WRITE;}intinterestOps=getRawInterestOps;intwriteBufferSize=this.writeBufferSize.get;ifwriteBufferSize!=0{ifhighWaterMarkCounter.get>0{//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1intlowWaterMark=getConfig.getWriteBufferLowWaterMark;ifwriteBufferSize>=lowWaterMark{//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作interestOps|=Channel.OP_WRITE;}else{interestOps&=~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作}}else{//超过高水位counter=highWaterMark{//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作interestOps|=Channel.OP_WRITE;}else{interestOps&=~Channel.OP_WRITE;}}}else{interestOps&=~Channel.OP_WRITE;//写队列没数据,没有注册写操作}returninterestOps;}即,如果超过高水位isWritable==false,低于低水位isWritable==true,低水位优先级高于高水位,即如果当前水位>低水位则不可写,否则可写如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致fullgc/cmsgc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
赵驰北2019-12-21 15:11:33
提示您:回答为网友贡献,仅供参考。