1
17 package org.apache.tomcat.util.net;
18
19 import java.io.EOFException;
20 import java.io.File;
21 import java.io.FileInputStream;
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.SocketTimeoutException;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.Channel;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.CompletionHandler;
31 import java.nio.channels.FileChannel;
32 import java.nio.channels.NetworkChannel;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.Selector;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.nio.channels.WritableByteChannel;
38 import java.util.ConcurrentModificationException;
39 import java.util.Iterator;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.Semaphore;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
44
45 import javax.net.ssl.SSLEngine;
46 import javax.net.ssl.SSLSession;
47
48 import org.apache.juli.logging.Log;
49 import org.apache.juli.logging.LogFactory;
50 import org.apache.tomcat.util.ExceptionUtils;
51 import org.apache.tomcat.util.IntrospectionUtils;
52 import org.apache.tomcat.util.collections.SynchronizedQueue;
53 import org.apache.tomcat.util.collections.SynchronizedStack;
54 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
55 import org.apache.tomcat.util.net.NioChannel.ClosedNioChannel;
56 import org.apache.tomcat.util.net.jsse.JSSESupport;
57
58
72 public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
73
74
75
76
77
78 private static final Log log = LogFactory.getLog(NioEndpoint.class);
79
80
81 public static final int OP_REGISTER = 0x100;
82
83
84
85 private NioSelectorPool selectorPool = new NioSelectorPool();
86
87
90 private volatile ServerSocketChannel serverSock = null;
91
92
95 private volatile CountDownLatch stopLatch = null;
96
97
100 private SynchronizedStack<PollerEvent> eventCache;
101
102
105 private SynchronizedStack<NioChannel> nioChannels;
106
107
108
109
110
111
114 @Override
115 public boolean setProperty(String name, String value) {
116 final String selectorPoolName = "selectorPool.";
117 try {
118 if (name.startsWith(selectorPoolName)) {
119 return IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value);
120 } else {
121 return super.setProperty(name, value);
122 }
123 } catch (Exception e) {
124 log.error(sm.getString("endpoint.setAttributeError", name, value), e);
125 return false;
126 }
127 }
128
129
130
133 private boolean useInheritedChannel = false;
134 public void setUseInheritedChannel(boolean useInheritedChannel) { this.useInheritedChannel = useInheritedChannel; }
135 public boolean getUseInheritedChannel() { return useInheritedChannel; }
136
137
140 private int pollerThreadPriority = Thread.NORM_PRIORITY;
141 public void setPollerThreadPriority(int pollerThreadPriority) { this.pollerThreadPriority = pollerThreadPriority; }
142 public int getPollerThreadPriority() { return pollerThreadPriority; }
143
144
145
152 @Deprecated
153 public void setPollerThreadCount(int pollerThreadCount) { }
154
161 @Deprecated
162 public int getPollerThreadCount() { return 1; }
163
164 private long selectorTimeout = 1000;
165 public void setSelectorTimeout(long timeout) { this.selectorTimeout = timeout;}
166 public long getSelectorTimeout() { return this.selectorTimeout; }
167
168
171 private Poller poller = null;
172
173
174 public void setSelectorPool(NioSelectorPool selectorPool) {
175 this.selectorPool = selectorPool;
176 }
177
178
181 @Override
182 public boolean getDeferAccept() {
183
184 return false;
185 }
186
187
188
189
190
196 public int getKeepAliveCount() {
197 if (poller == null) {
198 return 0;
199 } else {
200 return poller.getKeyCount();
201 }
202 }
203
204
205
206
207
210 @Override
211 public void bind() throws Exception {
212 initServerSocket();
213
214 setStopLatch(new CountDownLatch(1));
215
216
217 initialiseSsl();
218
219 selectorPool.open(getName());
220 }
221
222
223
224 protected void initServerSocket() throws Exception {
225 if (!getUseInheritedChannel()) {
226 serverSock = ServerSocketChannel.open();
227 socketProperties.setProperties(serverSock.socket());
228 InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
229 serverSock.socket().bind(addr,getAcceptCount());
230 } else {
231
232 Channel ic = System.inheritedChannel();
233 if (ic instanceof ServerSocketChannel) {
234 serverSock = (ServerSocketChannel) ic;
235 }
236 if (serverSock == null) {
237 throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
238 }
239 }
240 serverSock.configureBlocking(true);
241 }
242
243
244
247 @Override
248 public void startInternal() throws Exception {
249
250 if (!running) {
251 running = true;
252 paused = false;
253
254 if (socketProperties.getProcessorCache() != 0) {
255 processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
256 socketProperties.getProcessorCache());
257 }
258 if (socketProperties.getEventCache() != 0) {
259 eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
260 socketProperties.getEventCache());
261 }
262 if (socketProperties.getBufferPool() != 0) {
263 nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
264 socketProperties.getBufferPool());
265 }
266
267
268 if (getExecutor() == null) {
269 createExecutor();
270 }
271
272 initializeConnectionLatch();
273
274
275 poller = new Poller();
276 Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
277 pollerThread.setPriority(threadPriority);
278 pollerThread.setDaemon(true);
279 pollerThread.start();
280
281 startAcceptorThread();
282 }
283 }
284
285
286
289 @Override
290 public void stopInternal() {
291 if (!paused) {
292 pause();
293 }
294 if (running) {
295 running = false;
296 if (poller != null) {
297 poller.destroy();
298 poller = null;
299 }
300 try {
301 if (!getStopLatch().await(selectorTimeout + 100, TimeUnit.MILLISECONDS)) {
302 log.warn(sm.getString("endpoint.nio.stopLatchAwaitFail"));
303 }
304 } catch (InterruptedException e) {
305 log.warn(sm.getString("endpoint.nio.stopLatchAwaitInterrupted"), e);
306 }
307 shutdownExecutor();
308 if (eventCache != null) {
309 eventCache.clear();
310 eventCache = null;
311 }
312 if (nioChannels != null) {
313 nioChannels.clear();
314 nioChannels = null;
315 }
316 if (processorCache != null) {
317 processorCache.clear();
318 processorCache = null;
319 }
320 }
321 }
322
323
324
327 @Override
328 public void unbind() throws Exception {
329 if (log.isDebugEnabled()) {
330 log.debug("Destroy initiated for " +
331 new InetSocketAddress(getAddress(),getPortWithOffset()));
332 }
333 if (running) {
334 stop();
335 }
336 doCloseServerSocket();
337 destroySsl();
338 super.unbind();
339 if (getHandler() != null ) {
340 getHandler().recycle();
341 }
342 selectorPool.close();
343 if (log.isDebugEnabled()) {
344 log.debug("Destroy completed for " +
345 new InetSocketAddress(getAddress(), getPortWithOffset()));
346 }
347 }
348
349
350 @Override
351 protected void doCloseServerSocket() throws IOException {
352 if (!getUseInheritedChannel() && serverSock != null) {
353
354 serverSock.close();
355 }
356 serverSock = null;
357 }
358
359
360
361 protected NioSelectorPool getSelectorPool() {
362 return selectorPool;
363 }
364
365
366 protected SynchronizedStack<NioChannel> getNioChannels() {
367 return nioChannels;
368 }
369
370
371 protected Poller getPoller() {
372 return poller;
373 }
374
375
376 protected CountDownLatch getStopLatch() {
377 return stopLatch;
378 }
379
380
381 protected void setStopLatch(CountDownLatch stopLatch) {
382 this.stopLatch = stopLatch;
383 }
384
385
386
393 @Override
394 protected boolean setSocketOptions(SocketChannel socket) {
395 NioSocketWrapper socketWrapper = null;
396 try {
397
398 NioChannel channel = null;
399 if (nioChannels != null) {
400 channel = nioChannels.pop();
401 }
402 if (channel == null) {
403 SocketBufferHandler bufhandler = new SocketBufferHandler(
404 socketProperties.getAppReadBufSize(),
405 socketProperties.getAppWriteBufSize(),
406 socketProperties.getDirectBuffer());
407 if (isSSLEnabled()) {
408 channel = new SecureNioChannel(bufhandler, selectorPool, this);
409 } else {
410 channel = new NioChannel(bufhandler);
411 }
412 }
413 NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
414 channel.reset(socket, newWrapper);
415 connections.put(socket, newWrapper);
416 socketWrapper = newWrapper;
417
418
419
420 socket.configureBlocking(false);
421 socketProperties.setProperties(socket.socket());
422
423 socketWrapper.setReadTimeout(getConnectionTimeout());
424 socketWrapper.setWriteTimeout(getConnectionTimeout());
425 socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
426 socketWrapper.setSecure(isSSLEnabled());
427 poller.register(channel, socketWrapper);
428 return true;
429 } catch (Throwable t) {
430 ExceptionUtils.handleThrowable(t);
431 try {
432 log.error(sm.getString("endpoint.socketOptionsError"), t);
433 } catch (Throwable tt) {
434 ExceptionUtils.handleThrowable(tt);
435 }
436 if (socketWrapper == null) {
437 destroySocket(socket);
438 }
439 }
440
441 return false;
442 }
443
444
445 @Override
446 protected void destroySocket(SocketChannel socket) {
447 countDownConnection();
448 try {
449 socket.close();
450 } catch (IOException ioe) {
451 if (log.isDebugEnabled()) {
452 log.debug(sm.getString("endpoint.err.close"), ioe);
453 }
454 }
455 }
456
457
458 @Override
459 protected NetworkChannel getServerSocket() {
460 return serverSock;
461 }
462
463
464 @Override
465 protected SocketChannel serverSocketAccept() throws Exception {
466 return serverSock.accept();
467 }
468
469
470 @Override
471 protected Log getLog() {
472 return log;
473 }
474
475
476 @Override
477 protected SocketProcessorBase<NioChannel> createSocketProcessor(
478 SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
479 return new SocketProcessor(socketWrapper, event);
480 }
481
482
483
484
487 public static class PollerEvent implements Runnable {
488
489 private NioChannel socket;
490 private int interestOps;
491
492 public PollerEvent(NioChannel ch, int intOps) {
493 reset(ch, intOps);
494 }
495
496 public void reset(NioChannel ch, int intOps) {
497 socket = ch;
498 interestOps = intOps;
499 }
500
501 public void reset() {
502 reset(null, 0);
503 }
504
505 @Override
506 public void run() {
507 if (interestOps == OP_REGISTER) {
508 try {
509 socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
510 } catch (Exception x) {
511 log.error(sm.getString("endpoint.nio.registerFail"), x);
512 }
513 } else {
514 final SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
515 try {
516 if (key == null) {
517
518
519
520
521
522 try {
523 socket.socketWrapper.close();
524 } catch (Exception ignore) {
525 }
526 } else {
527 final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
528 if (socketWrapper != null) {
529
530 int ops = key.interestOps() | interestOps;
531 socketWrapper.interestOps(ops);
532 key.interestOps(ops);
533 } else {
534 socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
535 }
536 }
537 } catch (CancelledKeyException ckx) {
538 try {
539 socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
540 } catch (Exception ignore) {}
541 }
542 }
543 }
544
545 @Override
546 public String toString() {
547 return "Poller event: socket [" + socket + "], socketWrapper [" + socket.getSocketWrapper() +
548 "], interestOps [" + interestOps + "]";
549 }
550 }
551
552
555 public class Poller implements Runnable {
556
557 private Selector selector;
558 private final SynchronizedQueue<PollerEvent> events =
559 new SynchronizedQueue<>();
560
561 private volatile boolean close = false;
562
563 private long nextExpiration = 0;
564
565 private AtomicLong wakeupCounter = new AtomicLong(0);
566
567 private volatile int keyCount = 0;
568
569 public Poller() throws IOException {
570 this.selector = Selector.open();
571 }
572
573 public int getKeyCount() { return keyCount; }
574
575 public Selector getSelector() { return selector; }
576
577
580 protected void destroy() {
581
582
583
584 close = true;
585 selector.wakeup();
586 }
587
588 private void addEvent(PollerEvent event) {
589 events.offer(event);
590 if (wakeupCounter.incrementAndGet() == 0) {
591 selector.wakeup();
592 }
593 }
594
595
605 public void add(NioSocketWrapper socketWrapper, int interestOps) {
606 PollerEvent r = null;
607 if (eventCache != null) {
608 r = eventCache.pop();
609 }
610 if (r == null) {
611 r = new PollerEvent(socketWrapper.getSocket(), interestOps);
612 } else {
613 r.reset(socketWrapper.getSocket(), interestOps);
614 }
615 addEvent(r);
616 if (close) {
617 processSocket(socketWrapper, SocketEvent.STOP, false);
618 }
619 }
620
621
627 public boolean events() {
628 boolean result = false;
629
630 PollerEvent pe = null;
631 for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
632 result = true;
633 try {
634 pe.run();
635 pe.reset();
636 if (running && !paused && eventCache != null) {
637 eventCache.push(pe);
638 }
639 } catch ( Throwable x ) {
640 log.error(sm.getString("endpoint.nio.pollerEventError"), x);
641 }
642 }
643
644 return result;
645 }
646
647
653 public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
654 socketWrapper.interestOps(SelectionKey.OP_READ);
655 PollerEvent r = null;
656 if (eventCache != null) {
657 r = eventCache.pop();
658 }
659 if (r == null) {
660 r = new PollerEvent(socket, OP_REGISTER);
661 } else {
662 r.reset(socket, OP_REGISTER);
663 }
664 addEvent(r);
665 }
666
667 public void cancelledKey(SelectionKey sk, SocketWrapperBase<NioChannel> socketWrapper) {
668 try {
669
670
671 if (sk != null) {
672 sk.attach(null);
673 if (sk.isValid()) {
674 sk.cancel();
675 }
676 }
677 } catch (Throwable e) {
678 ExceptionUtils.handleThrowable(e);
679 if (log.isDebugEnabled()) {
680 log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
681 }
682 } finally {
683 if (socketWrapper != null) {
684 socketWrapper.close();
685 }
686 }
687 }
688
689
694 @Override
695 public void run() {
696
697 while (true) {
698
699 boolean hasEvents = false;
700
701 try {
702 if (!close) {
703 hasEvents = events();
704 if (wakeupCounter.getAndSet(-1) > 0) {
705
706
707 keyCount = selector.selectNow();
708 } else {
709 keyCount = selector.select(selectorTimeout);
710 }
711 wakeupCounter.set(0);
712 }
713 if (close) {
714 events();
715 timeout(0, false);
716 try {
717 selector.close();
718 } catch (IOException ioe) {
719 log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
720 }
721 break;
722 }
723 } catch (Throwable x) {
724 ExceptionUtils.handleThrowable(x);
725 log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
726 continue;
727 }
728
729 if (keyCount == 0) {
730 hasEvents = (hasEvents | events());
731 }
732
733 Iterator<SelectionKey> iterator =
734 keyCount > 0 ? selector.selectedKeys().iterator() : null;
735
736
737 while (iterator != null && iterator.hasNext()) {
738 SelectionKey sk = iterator.next();
739 NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
740
741
742 if (socketWrapper == null) {
743 iterator.remove();
744 } else {
745 iterator.remove();
746 processKey(sk, socketWrapper);
747 }
748 }
749
750
751 timeout(keyCount,hasEvents);
752 }
753
754 getStopLatch().countDown();
755 }
756
757 protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
758 try {
759 if (close) {
760 cancelledKey(sk, socketWrapper);
761 } else if (sk.isValid() && socketWrapper != null) {
762 if (sk.isReadable() || sk.isWritable()) {
763 if (socketWrapper.getSendfileData() != null) {
764 processSendfile(sk, socketWrapper, false);
765 } else {
766 unreg(sk, socketWrapper, sk.readyOps());
767 boolean closeSocket = false;
768
769 if (sk.isReadable()) {
770 if (socketWrapper.readOperation != null) {
771 if (!socketWrapper.readOperation.process()) {
772 closeSocket = true;
773 }
774 } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
775 closeSocket = true;
776 }
777 }
778 if (!closeSocket && sk.isWritable()) {
779 if (socketWrapper.writeOperation != null) {
780 if (!socketWrapper.writeOperation.process()) {
781 closeSocket = true;
782 }
783 } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
784 closeSocket = true;
785 }
786 }
787 if (closeSocket) {
788 cancelledKey(sk, socketWrapper);
789 }
790 }
791 }
792 } else {
793
794 cancelledKey(sk, socketWrapper);
795 }
796 } catch (CancelledKeyException ckx) {
797 cancelledKey(sk, socketWrapper);
798 } catch (Throwable t) {
799 ExceptionUtils.handleThrowable(t);
800 log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
801 }
802 }
803
804 public SendfileState processSendfile(SelectionKey sk, NioSocketWrapper socketWrapper,
805 boolean calledByProcessor) {
806 NioChannel sc = null;
807 try {
808 unreg(sk, socketWrapper, sk.readyOps());
809 SendfileData sd = socketWrapper.getSendfileData();
810
811 if (log.isTraceEnabled()) {
812 log.trace("Processing send file for: " + sd.fileName);
813 }
814
815 if (sd.fchannel == null) {
816
817 File f = new File(sd.fileName);
818 @SuppressWarnings("resource")
819 FileInputStream fis = new FileInputStream(f);
820 sd.fchannel = fis.getChannel();
821 }
822
823
824 sc = socketWrapper.getSocket();
825
826 WritableByteChannel wc = ((sc instanceof SecureNioChannel) ? sc : sc.getIOChannel());
827
828
829 if (sc.getOutboundRemaining() > 0) {
830 if (sc.flushOutbound()) {
831 socketWrapper.updateLastWrite();
832 }
833 } else {
834 long written = sd.fchannel.transferTo(sd.pos, sd.length, wc);
835 if (written > 0) {
836 sd.pos += written;
837 sd.length -= written;
838 socketWrapper.updateLastWrite();
839 } else {
840
841
842 if (sd.fchannel.size() <= sd.pos) {
843 throw new IOException(sm.getString("endpoint.sendfile.tooMuchData"));
844 }
845 }
846 }
847 if (sd.length <= 0 && sc.getOutboundRemaining()<=0) {
848 if (log.isDebugEnabled()) {
849 log.debug("Send file complete for: " + sd.fileName);
850 }
851 socketWrapper.setSendfileData(null);
852 try {
853 sd.fchannel.close();
854 } catch (Exception ignore) {
855 }
856
857
858
859 if (!calledByProcessor) {
860 switch (sd.keepAliveState) {
861 case NONE: {
862 if (log.isDebugEnabled()) {
863 log.debug("Send file connection is being closed");
864 }
865 poller.cancelledKey(sk, socketWrapper);
866 break;
867 }
868 case PIPELINED: {
869 if (log.isDebugEnabled()) {
870 log.debug("Connection is keep alive, processing pipe-lined data");
871 }
872 if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
873 poller.cancelledKey(sk, socketWrapper);
874 }
875 break;
876 }
877 case OPEN: {
878 if (log.isDebugEnabled()) {
879 log.debug("Connection is keep alive, registering back for OP_READ");
880 }
881 reg(sk, socketWrapper, SelectionKey.OP_READ);
882 break;
883 }
884 }
885 }
886 return SendfileState.DONE;
887 } else {
888 if (log.isDebugEnabled()) {
889 log.debug("OP_WRITE for sendfile: " + sd.fileName);
890 }
891 if (calledByProcessor) {
892 add(socketWrapper, SelectionKey.OP_WRITE);
893 } else {
894 reg(sk, socketWrapper, SelectionKey.OP_WRITE);
895 }
896 return SendfileState.PENDING;
897 }
898 } catch (IOException e) {
899 if (log.isDebugEnabled()) {
900 log.debug("Unable to complete sendfile request:", e);
901 }
902 if (!calledByProcessor && sc != null) {
903 poller.cancelledKey(sk, socketWrapper);
904 }
905 return SendfileState.ERROR;
906 } catch (Throwable t) {
907 log.error(sm.getString("endpoint.sendfile.error"), t);
908 if (!calledByProcessor && sc != null) {
909 poller.cancelledKey(sk, socketWrapper);
910 }
911 return SendfileState.ERROR;
912 }
913 }
914
915 protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
916
917 reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
918 }
919
920 protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) {
921 sk.interestOps(intops);
922 socketWrapper.interestOps(intops);
923 }
924
925 protected void timeout(int keyCount, boolean hasEvents) {
926 long now = System.currentTimeMillis();
927
928
929
930
931
932
933
934 if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
935 return;
936 }
937 int keycount = 0;
938 try {
939 for (SelectionKey key : selector.keys()) {
940 keycount++;
941 try {
942 NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
943 if (socketWrapper == null) {
944
945 cancelledKey(key, null);
946 } else if (close) {
947 key.interestOps(0);
948
949 socketWrapper.interestOps(0);
950 cancelledKey(key, socketWrapper);
951 } else if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ||
952 (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
953 boolean readTimeout = false;
954 boolean writeTimeout = false;
955
956 if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
957 long delta = now - socketWrapper.getLastRead();
958 long timeout = socketWrapper.getReadTimeout();
959 if (timeout > 0 && delta > timeout) {
960 readTimeout = true;
961 }
962 }
963
964 if (!readTimeout && (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
965 long delta = now - socketWrapper.getLastWrite();
966 long timeout = socketWrapper.getWriteTimeout();
967 if (timeout > 0 && delta > timeout) {
968 writeTimeout = true;
969 }
970 }
971 if (readTimeout || writeTimeout) {
972 key.interestOps(0);
973
974 socketWrapper.interestOps(0);
975 socketWrapper.setError(new SocketTimeoutException());
976 if (readTimeout && socketWrapper.readOperation != null) {
977 if (!socketWrapper.readOperation.process()) {
978 cancelledKey(key, socketWrapper);
979 }
980 } else if (writeTimeout && socketWrapper.writeOperation != null) {
981 if (!socketWrapper.writeOperation.process()) {
982 cancelledKey(key, socketWrapper);
983 }
984 } else if (!processSocket(socketWrapper, SocketEvent.ERROR, true)) {
985 cancelledKey(key, socketWrapper);
986 }
987 }
988 }
989 } catch (CancelledKeyException ckx) {
990 cancelledKey(key, (NioSocketWrapper) key.attachment());
991 }
992 }
993 } catch (ConcurrentModificationException cme) {
994
995 log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
996 }
997
998 long prevExp = nextExpiration;
999 nextExpiration = System.currentTimeMillis() +
1000 socketProperties.getTimeoutInterval();
1001 if (log.isTraceEnabled()) {
1002 log.trace("timeout completed: keys processed=" + keycount +
1003 "; now=" + now + "; nextExpiration=" + prevExp +
1004 "; keyCount=" + keyCount + "; hasEvents=" + hasEvents +
1005 "; eval=" + ((now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
1006 }
1007
1008 }
1009 }
1010
1011
1012
1013 public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
1014
1015 private final NioSelectorPool pool;
1016 private final SynchronizedStack<NioChannel> nioChannels;
1017 private final Poller poller;
1018
1019 private int interestOps = 0;
1020 private CountDownLatch readLatch = null;
1021 private CountDownLatch writeLatch = null;
1022 private volatile SendfileData sendfileData = null;
1023 private volatile long lastRead = System.currentTimeMillis();
1024 private volatile long lastWrite = lastRead;
1025
1026 public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
1027 super(channel, endpoint);
1028 pool = endpoint.getSelectorPool();
1029 nioChannels = endpoint.getNioChannels();
1030 poller = endpoint.getPoller();
1031 socketBufferHandler = channel.getBufHandler();
1032 }
1033
1034 public Poller getPoller() { return poller; }
1035 public int interestOps() { return interestOps; }
1036 public int interestOps(int ops) { this.interestOps = ops; return ops; }
1037 public CountDownLatch getReadLatch() { return readLatch; }
1038 public CountDownLatch getWriteLatch() { return writeLatch; }
1039 protected CountDownLatch resetLatch(CountDownLatch latch) {
1040 if (latch == null || latch.getCount() == 0) {
1041 return null;
1042 } else {
1043 throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
1044 }
1045 }
1046 public void resetReadLatch() { readLatch = resetLatch(readLatch); }
1047 public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
1048
1049 protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
1050 if (latch == null || latch.getCount() == 0) {
1051 return new CountDownLatch(cnt);
1052 } else {
1053 throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
1054 }
1055 }
1056 public void startReadLatch(int cnt) { readLatch = startLatch(readLatch, cnt); }
1057 public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch, cnt); }
1058
1059 protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
1060 if (latch == null) {
1061 throw new IllegalStateException(sm.getString("endpoint.nio.nullLatch"));
1062 }
1063
1064
1065
1066 latch.await(timeout, unit);
1067 }
1068 public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch, timeout, unit); }
1069 public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch, timeout, unit); }
1070
1071 public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
1072 public SendfileData getSendfileData() { return this.sendfileData; }
1073
1074 public void updateLastWrite() { lastWrite = System.currentTimeMillis(); }
1075 public long getLastWrite() { return lastWrite; }
1076 public void updateLastRead() { lastRead = System.currentTimeMillis(); }
1077 public long getLastRead() { return lastRead; }
1078
1079 @Override
1080 public boolean isReadyForRead() throws IOException {
1081 socketBufferHandler.configureReadBufferForRead();
1082
1083 if (socketBufferHandler.getReadBuffer().remaining() > 0) {
1084 return true;
1085 }
1086
1087 fillReadBuffer(false);
1088
1089 boolean isReady = socketBufferHandler.getReadBuffer().position() > 0;
1090 return isReady;
1091 }
1092
1093
1094 @Override
1095 public int read(boolean block, byte[] b, int off, int len) throws IOException {
1096 int nRead = populateReadBuffer(b, off, len);
1097 if (nRead > 0) {
1098 return nRead;
1099
1106 }
1107
1108
1109 nRead = fillReadBuffer(block);
1110 updateLastRead();
1111
1112
1113
1114 if (nRead > 0) {
1115 socketBufferHandler.configureReadBufferForRead();
1116 nRead = Math.min(nRead, len);
1117 socketBufferHandler.getReadBuffer().get(b, off, nRead);
1118 }
1119 return nRead;
1120 }
1121
1122
1123 @Override
1124 public int read(boolean block, ByteBuffer to) throws IOException {
1125 int nRead = populateReadBuffer(to);
1126 if (nRead > 0) {
1127 return nRead;
1128
1135 }
1136
1137
1138 int limit = socketBufferHandler.getReadBuffer().capacity();
1139 if (to.remaining() >= limit) {
1140 to.limit(to.position() + limit);
1141 nRead = fillReadBuffer(block, to);
1142 if (log.isDebugEnabled()) {
1143 log.debug("Socket: [" + this + "], Read direct from socket: [" + nRead + "]");
1144 }
1145 updateLastRead();
1146 } else {
1147
1148 nRead = fillReadBuffer(block);
1149 if (log.isDebugEnabled()) {
1150 log.debug("Socket: [" + this + "], Read into buffer: [" + nRead + "]");
1151 }
1152 updateLastRead();
1153
1154
1155
1156 if (nRead > 0) {
1157 nRead = populateReadBuffer(to);
1158 }
1159 }
1160 return nRead;
1161 }
1162
1163
1164 @Override
1165 protected void doClose() {
1166 if (log.isDebugEnabled()) {
1167 log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
1168 }
1169 try {
1170 getEndpoint().connections.remove(getSocket().getIOChannel());
1171 if (getSocket().isOpen()) {
1172 getSocket().close(true);
1173 }
1174 if (getEndpoint().running && !getEndpoint().paused) {
1175 if (nioChannels == null || !nioChannels.push(getSocket())) {
1176 getSocket().free();
1177 }
1178 }
1179 } catch (Throwable e) {
1180 ExceptionUtils.handleThrowable(e);
1181 if (log.isDebugEnabled()) {
1182 log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
1183 }
1184 } finally {
1185 socketBufferHandler = SocketBufferHandler.EMPTY;
1186 nonBlockingWriteBuffer.clear();
1187 reset(NioChannel.CLOSED_NIO_CHANNEL);
1188 }
1189 try {
1190 SendfileData data = getSendfileData();
1191 if (data != null && data.fchannel != null && data.fchannel.isOpen()) {
1192 data.fchannel.close();
1193 }
1194 } catch (Throwable e) {
1195 ExceptionUtils.handleThrowable(e);
1196 if (log.isDebugEnabled()) {
1197 log.error(sm.getString("endpoint.sendfile.closeError"), e);
1198 }
1199 }
1200 }
1201
1202 private int fillReadBuffer(boolean block) throws IOException {
1203 socketBufferHandler.configureReadBufferForWrite();
1204 return fillReadBuffer(block, socketBufferHandler.getReadBuffer());
1205 }
1206
1207
1208 private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
1209 int nRead;
1210 NioChannel socket = getSocket();
1211 if (socket instanceof ClosedNioChannel) {
1212 throw new ClosedChannelException();
1213 }
1214 if (block) {
1215 Selector selector = null;
1216 try {
1217 selector = pool.get();
1218 } catch (IOException x) {
1219
1220 }
1221 try {
1222 nRead = pool.read(to, socket, selector, getReadTimeout());
1223 } finally {
1224 if (selector != null) {
1225 pool.put(selector);
1226 }
1227 }
1228 } else {
1229 nRead = socket.read(to);
1230 if (nRead == -1) {
1231 throw new EOFException();
1232 }
1233 }
1234 return nRead;
1235 }
1236
1237
1238 @Override
1239 protected void doWrite(boolean block, ByteBuffer from) throws IOException {
1240 NioChannel socket = getSocket();
1241 if (socket instanceof ClosedNioChannel) {
1242 throw new ClosedChannelException();
1243 }
1244 if (block) {
1245 long writeTimeout = getWriteTimeout();
1246 Selector selector = null;
1247 try {
1248 selector = pool.get();
1249 } catch (IOException x) {
1250
1251 }
1252 try {
1253 pool.write(from, socket, selector, writeTimeout);
1254
1255 do {
1256 } while (!socket.flush(true, selector, writeTimeout));
1257 } finally {
1258 if (selector != null) {
1259 pool.put(selector);
1260 }
1261 }
1262
1263
1264
1265
1266 } else {
1267 int n = 0;
1268 do {
1269 n = socket.write(from);
1270 if (n == -1) {
1271 throw new EOFException();
1272 }
1273 } while (n > 0 && from.hasRemaining());
1274 }
1275 updateLastWrite();
1276 }
1277
1278
1279 @Override
1280 public void registerReadInterest() {
1281 if (log.isDebugEnabled()) {
1282 log.debug(sm.getString("endpoint.debug.registerRead", this));
1283 }
1284 getPoller().add(this, SelectionKey.OP_READ);
1285 }
1286
1287
1288 @Override
1289 public void registerWriteInterest() {
1290 if (log.isDebugEnabled()) {
1291 log.debug(sm.getString("endpoint.debug.registerWrite", this));
1292 }
1293 getPoller().add(this, SelectionKey.OP_WRITE);
1294 }
1295
1296
1297 @Override
1298 public SendfileDataBase createSendfileData(String filename, long pos, long length) {
1299 return new SendfileData(filename, pos, length);
1300 }
1301
1302
1303 @Override
1304 public SendfileState processSendfile(SendfileDataBase sendfileData) {
1305 setSendfileData((SendfileData) sendfileData);
1306 SelectionKey key = getSocket().getIOChannel().keyFor(getPoller().getSelector());
1307
1308 return getPoller().processSendfile(key, this, true);
1309 }
1310
1311
1312 @Override
1313 protected void populateRemoteAddr() {
1314 SocketChannel sc = getSocket().getIOChannel();
1315 if (sc != null) {
1316 InetAddress inetAddr = sc.socket().getInetAddress();
1317 if (inetAddr != null) {
1318 remoteAddr = inetAddr.getHostAddress();
1319 }
1320 }
1321 }
1322
1323
1324 @Override
1325 protected void populateRemoteHost() {
1326 SocketChannel sc = getSocket().getIOChannel();
1327 if (sc != null) {
1328 InetAddress inetAddr = sc.socket().getInetAddress();
1329 if (inetAddr != null) {
1330 remoteHost = inetAddr.getHostName();
1331 if (remoteAddr == null) {
1332 remoteAddr = inetAddr.getHostAddress();
1333 }
1334 }
1335 }
1336 }
1337
1338
1339 @Override
1340 protected void populateRemotePort() {
1341 SocketChannel sc = getSocket().getIOChannel();
1342 if (sc != null) {
1343 remotePort = sc.socket().getPort();
1344 }
1345 }
1346
1347
1348 @Override
1349 protected void populateLocalName() {
1350 SocketChannel sc = getSocket().getIOChannel();
1351 if (sc != null) {
1352 InetAddress inetAddr = sc.socket().getLocalAddress();
1353 if (inetAddr != null) {
1354 localName = inetAddr.getHostName();
1355 }
1356 }
1357 }
1358
1359
1360 @Override
1361 protected void populateLocalAddr() {
1362 SocketChannel sc = getSocket().getIOChannel();
1363 if (sc != null) {
1364 InetAddress inetAddr = sc.socket().getLocalAddress();
1365 if (inetAddr != null) {
1366 localAddr = inetAddr.getHostAddress();
1367 }
1368 }
1369 }
1370
1371
1372 @Override
1373 protected void populateLocalPort() {
1374 SocketChannel sc = getSocket().getIOChannel();
1375 if (sc != null) {
1376 localPort = sc.socket().getLocalPort();
1377 }
1378 }
1379
1380
1381
1385 @Override
1386 public SSLSupport getSslSupport(String clientCertProvider) {
1387 if (getSocket() instanceof SecureNioChannel) {
1388 SecureNioChannel ch = (SecureNioChannel) getSocket();
1389 SSLEngine sslEngine = ch.getSslEngine();
1390 if (sslEngine != null) {
1391 SSLSession session = sslEngine.getSession();
1392 return ((NioEndpoint) getEndpoint()).getSslImplementation().getSSLSupport(session);
1393 }
1394 }
1395 return null;
1396 }
1397
1398
1399 @Override
1400 public void doClientAuth(SSLSupport sslSupport) throws IOException {
1401 SecureNioChannel sslChannel = (SecureNioChannel) getSocket();
1402 SSLEngine engine = sslChannel.getSslEngine();
1403 if (!engine.getNeedClientAuth()) {
1404
1405 engine.setNeedClientAuth(true);
1406 sslChannel.rehandshake(getEndpoint().getConnectionTimeout());
1407 ((JSSESupport) sslSupport).setSession(engine.getSession());
1408 }
1409 }
1410
1411
1412 @Override
1413 public void setAppReadBufHandler(ApplicationBufferHandler handler) {
1414 getSocket().setAppReadBufHandler(handler);
1415 }
1416
1417 @Override
1418 protected <A> OperationState<A> newOperationState(boolean read,
1419 ByteBuffer[] buffers, int offset, int length,
1420 BlockingMode block, long timeout, TimeUnit unit, A attachment,
1421 CompletionCheck check, CompletionHandler<Long, ? super A> handler,
1422 Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
1423 return new NioOperationState<>(read, buffers, offset, length, block,
1424 timeout, unit, attachment, check, handler, semaphore, completion);
1425 }
1426
1427 private class NioOperationState<A> extends OperationState<A> {
1428 private volatile boolean inline = true;
1429 private NioOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
1430 BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
1431 CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
1432 VectoredIOCompletionHandler<A> completion) {
1433 super(read, buffers, offset, length, block,
1434 timeout, unit, attachment, check, handler, semaphore, completion);
1435 }
1436
1437 @Override
1438 protected boolean isInline() {
1439 return inline;
1440 }
1441
1442 @Override
1443 public void run() {
1444
1445
1446 long nBytes = 0;
1447 if (getError() == null) {
1448 try {
1449 synchronized (this) {
1450 if (!completionDone) {
1451
1452
1453 if (log.isDebugEnabled()) {
1454 log.debug("Skip concurrent " + (read ? "read" : "write") + " notification");
1455 }
1456 return;
1457 }
1458 if (read) {
1459
1460 if (!socketBufferHandler.isReadBufferEmpty()) {
1461
1462 socketBufferHandler.configureReadBufferForRead();
1463 for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
1464 nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
1465 }
1466 }
1467 if (nBytes == 0) {
1468 nBytes = getSocket().read(buffers, offset, length);
1469 updateLastRead();
1470 }
1471 } else {
1472 boolean doWrite = true;
1473
1474 if (!socketBufferHandler.isWriteBufferEmpty()) {
1475
1476 socketBufferHandler.configureWriteBufferForRead();
1477 do {
1478 nBytes = getSocket().write(socketBufferHandler.getWriteBuffer());
1479 } while (!socketBufferHandler.isWriteBufferEmpty() && nBytes > 0);
1480 if (!socketBufferHandler.isWriteBufferEmpty()) {
1481 doWrite = false;
1482 }
1483
1484 if (nBytes > 0) {
1485 nBytes = 0;
1486 }
1487 }
1488 if (doWrite) {
1489 long n = 0;
1490 do {
1491 n = getSocket().write(buffers, offset, length);
1492 if (n == -1) {
1493 nBytes = n;
1494 } else {
1495 nBytes += n;
1496 }
1497 } while (n > 0);
1498 updateLastWrite();
1499 }
1500 }
1501 if (nBytes != 0 || !buffersArrayHasRemaining(buffers, offset, length)) {
1502 completionDone = false;
1503 }
1504 }
1505 } catch (IOException e) {
1506 setError(e);
1507 }
1508 }
1509 if (nBytes > 0 || (nBytes == 0 && !buffersArrayHasRemaining(buffers, offset, length))) {
1510
1511 completion.completed(Long.valueOf(nBytes), this);
1512 } else if (nBytes < 0 || getError() != null) {
1513 IOException error = getError();
1514 if (error == null) {
1515 error = new EOFException();
1516 }
1517 completion.failed(error, this);
1518 } else {
1519
1520 inline = false;
1521 if (read) {
1522 registerReadInterest();
1523 } else {
1524 registerWriteInterest();
1525 }
1526 }
1527 }
1528
1529 }
1530
1531 }
1532
1533
1534
1535
1536
1540 protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
1541
1542 public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
1543 super(socketWrapper, event);
1544 }
1545
1546 @Override
1547 protected void doRun() {
1548 NioChannel socket = socketWrapper.getSocket();
1549 SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
1550 Poller poller = NioEndpoint.this.poller;
1551 if (poller == null) {
1552 socketWrapper.close();
1553 return;
1554 }
1555
1556 try {
1557 int handshake = -1;
1558
1559 try {
1560 if (key != null) {
1561 if (socket.isHandshakeComplete()) {
1562
1563
1564 handshake = 0;
1565 } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
1566 event == SocketEvent.ERROR) {
1567
1568
1569 handshake = -1;
1570 } else {
1571 handshake = socket.handshake(key.isReadable(), key.isWritable());
1572
1573
1574
1575
1576
1577
1578
1579 event = SocketEvent.OPEN_READ;
1580 }
1581 }
1582 } catch (IOException x) {
1583 handshake = -1;
1584 if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
1585 } catch (CancelledKeyException ckx) {
1586 handshake = -1;
1587 }
1588 if (handshake == 0) {
1589 SocketState state = SocketState.OPEN;
1590
1591 if (event == null) {
1592 state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
1593 } else {
1594 state = getHandler().process(socketWrapper, event);
1595 }
1596 if (state == SocketState.CLOSED) {
1597 poller.cancelledKey(key, socketWrapper);
1598 }
1599 } else if (handshake == -1 ) {
1600 getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
1601 poller.cancelledKey(key, socketWrapper);
1602 } else if (handshake == SelectionKey.OP_READ){
1603 socketWrapper.registerReadInterest();
1604 } else if (handshake == SelectionKey.OP_WRITE){
1605 socketWrapper.registerWriteInterest();
1606 }
1607 } catch (CancelledKeyException cx) {
1608 poller.cancelledKey(key, socketWrapper);
1609 } catch (VirtualMachineError vme) {
1610 ExceptionUtils.handleThrowable(vme);
1611 } catch (Throwable t) {
1612 log.error(sm.getString("endpoint.processing.fail"), t);
1613 poller.cancelledKey(key, socketWrapper);
1614 } finally {
1615 socketWrapper = null;
1616 event = null;
1617
1618 if (running && !paused && processorCache != null) {
1619 processorCache.push(this);
1620 }
1621 }
1622 }
1623 }
1624
1625
1626
1627
1630 public static class SendfileData extends SendfileDataBase {
1631
1632 public SendfileData(String filename, long pos, long length) {
1633 super(filename, pos, length);
1634 }
1635
1636 protected volatile FileChannel fchannel;
1637 }
1638 }
1639