[讨论] Linux 下使用NIO长连接,内存泄漏问题
yhz04
2012-10-22
应用场景:我们做的是mysql数据库中间层,对外提供标准的mysql服务。
底层的socket通信上选用了NIO,因为是数据库服务,用户通常采用连接池的方式与我们保持socket长连接,并定期有validate。 另外用户的请求时常会有大数据集的发生,在处理写数据的时候采用了一次写不完,注册写事件的方式处理。 boolean finished = doWrite(); if (!finished) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } public boolean doWrite() throws IOException { synchronized (writeLock) { ByteBuffer buffer = null; int message = 0; while ((buffer = outQueue.getNonBlocking()) != null) { channel.write(buffer); if (buffer.remaining() > 0) { outQueue.prepend(buffer); return false; } else { message++; } } return true; } } dump日志中对象信息: ![]() 查看源码发现:Linux版本下SelectionKey.interestOps(int ops);最终指向: EPollSelectorImpl.putEventOps(SelectionKeyImpl sk, int ops);方法。 184 void putEventOps(SelectionKeyImpl sk, int ops) { 185 if (closed) 186 throw new ClosedSelectorException(); 187 pollWrapper.setInterest(sk.channel, ops); 188 } 而EPollArrayWrapper会在每次setInterest(SelChImpl channel, int mask);时候重新创建一个Updator对象。 169 /** 170 * Update the events for a given channel. 171 */ 172 void setInterest(SelChImpl channel, int mask) { 173 synchronized (updateList) { 174 // if the previous pending operation is to add this file descriptor 175 // to epoll then update its event set 176 if (updateList.size() > 0) { 177 Updator last = updateList.getLast(); 178 if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) { 179 last.events = mask; 180 return; 181 } 182 } 183 184 // update existing registration 185 updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask)); 186 } 187 } Updator对象只有在channel被cancel时才会从linkList中移除。如果使用长连接,连接一直被复用,就会造成了内存泄漏。 |
|
阿男bluedash
2012-10-22
这个代码不完整,方便把相关的代码贴出来吗?看上去像是使用问题。
|
|
yhz04
2012-10-23
入口方法是:
private void writeMessage() { if (this.socketClosed) { return; } try { SelectionKey key = getSelectionKey(); if (key != null && !key.isValid()) { handleFailureToClose(new java.nio.channels.CancelledKeyException()); return; } if (key != null && (key.interestOps() & SelectionKey.OP_WRITE) == 0) { boolean finished = doWrite(); if (!finished) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } } catch (CancelledKeyException ce) { logger.error(ce); handleFailureToClose(ce); } catch (IOException e) { logger.error(e); handleFailureToClose(e); } } 对于一次不能写完数据的情况采用了注册写事件的方式。下面这段是selector对于事件的轮询处理代码: private void flush() { if (flushingClientConnections.isEmpty()) return; do { AbstractClientConnection clientConnection = flushingClientConnections .poll(); if (clientConnection == null) break; try { boolean flushedAll = flushNow(clientConnection); if (!flushedAll) { flushingClientConnections.add(clientConnection); } } catch (Exception e) { logger.error("flush error : {}", e); } } while (!flushingClientConnections.isEmpty()); } private boolean flushNow(AbstractClientConnection clientConnection) { try { // Clear OP_WRITE setInterestedInWrite(clientConnection, false); boolean finished = clientConnection.doWrite(); if (!finished) { setInterestedInWrite(clientConnection, true); return false; } } catch (Exception e) { logger.error("flushNow error : {}", e); return false; } return true; } protected void setInterestedInWrite( AbstractClientConnection clientConnection, boolean isInterested) throws Exception { SelectionKey key = clientConnection.getSelectionKey(); if (key == null) { return; } int oldInterestOps = key.interestOps(); int newInterestOps = oldInterestOps; if (isInterested) { newInterestOps |= SelectionKey.OP_WRITE; } else { newInterestOps &= ~SelectionKey.OP_WRITE; } if (oldInterestOps != newInterestOps) { key.interestOps(newInterestOps); } } 注释Clear OP_WRITE的地方已经对于写操作完成的事件,反注册了写事件。 |
|
阿男bluedash
2012-10-23
private void writeMessage() { try { SelectionKey key = getSelectionKey(); ... boolean finished = doWrite(); ... 这里,key如果writable,则呼叫doWrite(),进入doWrite(): public boolean doWrite() throws IOException { ... } 在进入doWrite()之前,一定要将此key的SelectionKey.OP_WRITE清掉,就像你在setInterestedInWrite中做的那样: &= ~SelectionKey.OP_WRITE 目前来看是这处BUG造成的,试试看。 |
|
yhz04
2012-10-23
doWrite()之前有一行判断额。
(key.interestOps() & SelectionKey.OP_WRITE) == 0 这个说明当前的注册事件不是写事件。 昨天半夜找到问题的原因了,做下简单的分享。问题主要出现在flush()方法中。 try { boolean flushedAll = flushNow(clientConnection); if (!flushedAll) { flushingClientConnections.add(clientConnection); } } catch (Exception e) { logger.error("flush error : {}", e); } 当flushNow方法一次没有写完的时候不能继续直接注册写事件,循环写入。原因:当系统的资源十分匮乏的时候,内核BUFF有可能没有写入空间,这个时候channel.write()就会返回0字节,这个时候循环写入就会无限的注册写事件,导致updator对象内存溢出。需要在注册写事件以后跳出循环,等待selector.select()方法唤醒后再进行处理(理由是selector.doSelect()方法会对updator对象做一次清理)。 感谢楼主、阿男帮忙分析问题。 |
|
阿男bluedash
2012-10-23
Cool! :-)
|
|
stepinto
2013-01-14
delete
|
|
stepinto
2013-01-14
delete
|
|
stepinto
2013-01-14
delete
|