[讨论] Linux 下使用NIO长连接,内存泄漏问题

yhz04 2012-10-22
应用场景:我们做的是mysql数据库中间层,对外提供标准的mysql服务。

底层的socket通信上选用了NIO,因为是数据库服务,用户通常采用连接池的方式与我们保持socket长连接,并定期有validate。

另外用户的请求时常会有大数据集的发生,在处理写数据的时候采用了一次写不完,注册写事件的方式处理。

boolean finished = doWrite();

if (!finished) {

     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

  }

public boolean doWrite() throws IOException {

      synchronized (writeLock) {

            ByteBuffer buffer = null;

            int message = 0;

            while ((buffer = outQueue.getNonBlocking()) != null) {

                  channel.write(buffer);

                  if (buffer.remaining() > 0) {

                      outQueue.prepend(buffer);

                      return false;

                  } else {
                      message++;
                  }
           }
           return true;
     }
}

dump日志中对象信息:

查看源码发现:Linux版本下SelectionKey.interestOps(int ops);最终指向:
EPollSelectorImpl.putEventOps(SelectionKeyImpl sk, int ops);方法。
  184       void putEventOps(SelectionKeyImpl sk, int ops) {
  185           if (closed)
  186               throw new ClosedSelectorException();
  187           pollWrapper.setInterest(sk.channel, ops);
  188       }


而EPollArrayWrapper会在每次setInterest(SelChImpl channel, int mask);时候重新创建一个Updator对象。
  169       /**
  170        * Update the events for a given channel.
  171        */
  172       void setInterest(SelChImpl channel, int mask) {
  173           synchronized (updateList) {
  174               // if the previous pending operation is to add this file descriptor
  175               // to epoll then update its event set
  176               if (updateList.size() > 0) {
  177                   Updator last = updateList.getLast();
  178                   if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
  179                       last.events = mask;
  180                       return;
  181                   }
  182               }
  183   
  184               // update existing registration
  185               updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
  186           }
  187       }


Updator对象只有在channel被cancel时才会从linkList中移除。如果使用长连接,连接一直被复用,就会造成了内存泄漏。


阿男bluedash 2012-10-22
这个代码不完整,方便把相关的代码贴出来吗?看上去像是使用问题。
yhz04 2012-10-23
入口方法是:
private void writeMessage() {
		if (this.socketClosed) {
			return;
		}
		try {
			SelectionKey key = getSelectionKey();
			if (key != null && !key.isValid()) {
				handleFailureToClose(new java.nio.channels.CancelledKeyException());
				return;
			}

			if (key != null && (key.interestOps() & SelectionKey.OP_WRITE) == 0) {
				boolean finished = doWrite();
				if (!finished)
					key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
			}
		} catch (CancelledKeyException ce) {
			logger.error(ce);
			handleFailureToClose(ce);
		} catch (IOException e) {
			logger.error(e);
			handleFailureToClose(e);
		}
	}

对于一次不能写完数据的情况采用了注册写事件的方式。下面这段是selector对于事件的轮询处理代码:
private void flush() {
		if (flushingClientConnections.isEmpty())
			return;
		do {
			AbstractClientConnection clientConnection = flushingClientConnections
					.poll();

			if (clientConnection == null)
				break;

			try {
				boolean flushedAll = flushNow(clientConnection);

				if (!flushedAll) {
					flushingClientConnections.add(clientConnection);
				}
			} catch (Exception e) {
				logger.error("flush error : {}", e);
			}
		} while (!flushingClientConnections.isEmpty());
	}

	private boolean flushNow(AbstractClientConnection clientConnection) {
		try {
			// Clear OP_WRITE
			setInterestedInWrite(clientConnection, false);
			boolean finished = clientConnection.doWrite();

			if (!finished) {
				setInterestedInWrite(clientConnection, true);
				return false;
			}
		} catch (Exception e) {
			logger.error("flushNow error : {}", e);
			return false;
		}
		return true;
	}

protected void setInterestedInWrite(
			AbstractClientConnection clientConnection, boolean isInterested)
			throws Exception {
		SelectionKey key = clientConnection.getSelectionKey();

		if (key == null) {
			return;
		}

		int oldInterestOps = key.interestOps();
		int newInterestOps = oldInterestOps;

		if (isInterested) {
			newInterestOps |= SelectionKey.OP_WRITE;
		} else {
			newInterestOps &= ~SelectionKey.OP_WRITE;
		}

		if (oldInterestOps != newInterestOps) {
			key.interestOps(newInterestOps);
		}
	}

注释Clear OP_WRITE的地方已经对于写操作完成的事件,反注册了写事件。
阿男bluedash 2012-10-23
private void writeMessage() {  
        try {  
            SelectionKey key = getSelectionKey();  
...
                boolean finished = doWrite();  
...


这里,key如果writable,则呼叫doWrite(),进入doWrite():

public boolean doWrite() throws IOException {  
...
}


在进入doWrite()之前,一定要将此key的SelectionKey.OP_WRITE清掉,就像你在setInterestedInWrite中做的那样:

&= ~SelectionKey.OP_WRITE


目前来看是这处BUG造成的,试试看。
yhz04 2012-10-23
doWrite()之前有一行判断额。
 (key.interestOps() & SelectionKey.OP_WRITE) == 0

这个说明当前的注册事件不是写事件。
昨天半夜找到问题的原因了,做下简单的分享。问题主要出现在flush()方法中。
try {  
      boolean flushedAll = flushNow(clientConnection);  
  
      if (!flushedAll) {  
                    flushingClientConnections.add(clientConnection);  
      }  
 } catch (Exception e) {  
      logger.error("flush error : {}", e);  
 }  

当flushNow方法一次没有写完的时候不能继续直接注册写事件,循环写入。原因:当系统的资源十分匮乏的时候,内核BUFF有可能没有写入空间,这个时候channel.write()就会返回0字节,这个时候循环写入就会无限的注册写事件,导致updator对象内存溢出。需要在注册写事件以后跳出循环,等待selector.select()方法唤醒后再进行处理(理由是selector.doSelect()方法会对updator对象做一次清理)。
感谢楼主、阿男帮忙分析问题。
阿男bluedash 2012-10-23
Cool! :-)
stepinto 2013-01-14
delete
stepinto 2013-01-14
delete
stepinto 2013-01-14
delete
Global site tag (gtag.js) - Google Analytics