1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */

17 package org.apache.tomcat.util.net;
18
19 import java.io.EOFException;
20 import java.io.File;
21 import java.io.FileInputStream;
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.SocketTimeoutException;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.Channel;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.CompletionHandler;
31 import java.nio.channels.FileChannel;
32 import java.nio.channels.NetworkChannel;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.Selector;
35 import java.nio.channels.ServerSocketChannel;
36 import java.nio.channels.SocketChannel;
37 import java.nio.channels.WritableByteChannel;
38 import java.util.ConcurrentModificationException;
39 import java.util.Iterator;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.Semaphore;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
44
45 import javax.net.ssl.SSLEngine;
46 import javax.net.ssl.SSLSession;
47
48 import org.apache.juli.logging.Log;
49 import org.apache.juli.logging.LogFactory;
50 import org.apache.tomcat.util.ExceptionUtils;
51 import org.apache.tomcat.util.IntrospectionUtils;
52 import org.apache.tomcat.util.collections.SynchronizedQueue;
53 import org.apache.tomcat.util.collections.SynchronizedStack;
54 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
55 import org.apache.tomcat.util.net.NioChannel.ClosedNioChannel;
56 import org.apache.tomcat.util.net.jsse.JSSESupport;
57
58 /**
59  * NIO tailored thread pool, providing the following services:
60  * <ul>
61  * <li>Socket acceptor thread</li>
62  * <li>Socket poller thread</li>
63  * <li>Worker threads pool</li>
64  * </ul>
65  *
66  * When switching to Java 5, there's an opportunity to use the virtual
67  * machine's thread pool.
68  *
69  * @author Mladen Turk
70  * @author Remy Maucherat
71  */

72 public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
73
74
75     // -------------------------------------------------------------- Constants
76
77
78     private static final Log log = LogFactory.getLog(NioEndpoint.class);
79
80
81     public static final int OP_REGISTER = 0x100; //register interest op
82
83     // ----------------------------------------------------------------- Fields
84
85     private NioSelectorPool selectorPool = new NioSelectorPool();
86
87     /**
88      * Server socket "pointer".
89      */

90     private volatile ServerSocketChannel serverSock = null;
91
92     /**
93      * Stop latch used to wait for poller stop
94      */

95     private volatile CountDownLatch stopLatch = null;
96
97     /**
98      * Cache for poller events
99      */

100     private SynchronizedStack<PollerEvent> eventCache;
101
    /**
     * Cache of NioChannel instances. Each channel holds a set of byte buffers
     * (two, or four when SSL is used).
     */

105     private SynchronizedStack<NioChannel> nioChannels;
106
107
108     // ------------------------------------------------------------- Properties
109
110
111     /**
112      * Generic properties, introspected
113      */

114     @Override
115     public boolean setProperty(String name, String value) {
116         final String selectorPoolName = "selectorPool.";
117         try {
118             if (name.startsWith(selectorPoolName)) {
119                 return IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value);
120             } else {
121                 return super.setProperty(name, value);
122             }
123         } catch (Exception e) {
124             log.error(sm.getString("endpoint.setAttributeError", name, value), e);
125             return false;
126         }
127     }
128
129
130     /**
131      * Use System.inheritableChannel to obtain channel from stdin/stdout.
132      */

133     private boolean useInheritedChannel = false;
134     public void setUseInheritedChannel(boolean useInheritedChannel) { this.useInheritedChannel = useInheritedChannel; }
135     public boolean getUseInheritedChannel() { return useInheritedChannel; }
136
137     /**
138      * Priority of the poller threads.
139      */

140     private int pollerThreadPriority = Thread.NORM_PRIORITY;
141     public void setPollerThreadPriority(int pollerThreadPriority) { this.pollerThreadPriority = pollerThreadPriority; }
142     public int getPollerThreadPriority() { return pollerThreadPriority; }
143
144
145     /**
146      * NO-OP.
147      *
148      * @param pollerThreadCount Unused
149      *
150      * @deprecated Will be removed in Tomcat 10.
151      */

152     @Deprecated
153     public void setPollerThreadCount(int pollerThreadCount) { }
154     /**
155      * Always returns 1.
156      *
157      * @return Always 1.
158      *
159      * @deprecated Will be removed in Tomcat 10.
160      */

161     @Deprecated
162     public int getPollerThreadCount() { return 1; }
163
164     private long selectorTimeout = 1000;
165     public void setSelectorTimeout(long timeout) { this.selectorTimeout = timeout;}
166     public long getSelectorTimeout() { return this.selectorTimeout; }
167
168     /**
169      * The socket poller.
170      */

171     private Poller poller = null;
172
173
174     public void setSelectorPool(NioSelectorPool selectorPool) {
175         this.selectorPool = selectorPool;
176     }
177
178     /**
179      * Is deferAccept supported?
180      */

181     @Override
182     public boolean getDeferAccept() {
183         // Not supported
184         return false;
185     }
186
187
188     // --------------------------------------------------------- Public Methods
189
190     /**
191      * Number of keep-alive sockets.
192      *
193      * @return The number of sockets currently in the keep-alive state waiting
194      *         for the next request to be received on the socket
195      */

196     public int getKeepAliveCount() {
197         if (poller == null) {
198             return 0;
199         } else {
200             return poller.getKeyCount();
201         }
202     }
203
204
205     // ----------------------------------------------- Public Lifecycle Methods
206
207     /**
208      * Initialize the endpoint.
209      */

210     @Override
211     public void bind() throws Exception {
212         initServerSocket();
213
214         setStopLatch(new CountDownLatch(1));
215
216         // Initialize SSL if needed
217         initialiseSsl();
218
219         selectorPool.open(getName());
220     }
221
222     // Separated out to make it easier for folks that extend NioEndpoint to
223     // implement custom [server]sockets
224     protected void initServerSocket() throws Exception {
225         if (!getUseInheritedChannel()) {
226             serverSock = ServerSocketChannel.open();
227             socketProperties.setProperties(serverSock.socket());
228             InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
229             serverSock.socket().bind(addr,getAcceptCount());
230         } else {
231             // Retrieve the channel provided by the OS
232             Channel ic = System.inheritedChannel();
233             if (ic instanceof ServerSocketChannel) {
234                 serverSock = (ServerSocketChannel) ic;
235             }
236             if (serverSock == null) {
237                 throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
238             }
239         }
240         serverSock.configureBlocking(true); //mimic APR behavior
241     }
242
243
244     /**
245      * Start the NIO endpoint, creating acceptor, poller threads.
246      */

247     @Override
248     public void startInternal() throws Exception {
249
250         if (!running) {
251             running = true;
252             paused = false;
253
254             if (socketProperties.getProcessorCache() != 0) {
255                 processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
256                         socketProperties.getProcessorCache());
257             }
258             if (socketProperties.getEventCache() != 0) {
259                 eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
260                         socketProperties.getEventCache());
261             }
262             if (socketProperties.getBufferPool() != 0) {
263                 nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
264                         socketProperties.getBufferPool());
265             }
266
267             // Create worker collection
268             if (getExecutor() == null) {
269                 createExecutor();
270             }
271
272             initializeConnectionLatch();
273
274             // Start poller thread
275             poller = new Poller();
276             Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
277             pollerThread.setPriority(threadPriority);
278             pollerThread.setDaemon(true);
279             pollerThread.start();
280
281             startAcceptorThread();
282         }
283     }
284
285
286     /**
287      * Stop the endpoint. This will cause all processing threads to stop.
288      */

289     @Override
290     public void stopInternal() {
291         if (!paused) {
292             pause();
293         }
294         if (running) {
295             running = false;
296             if (poller != null) {
297                 poller.destroy();
298                 poller = null;
299             }
300             try {
301                 if (!getStopLatch().await(selectorTimeout + 100, TimeUnit.MILLISECONDS)) {
302                     log.warn(sm.getString("endpoint.nio.stopLatchAwaitFail"));
303                 }
304             } catch (InterruptedException e) {
305                 log.warn(sm.getString("endpoint.nio.stopLatchAwaitInterrupted"), e);
306             }
307             shutdownExecutor();
308             if (eventCache != null) {
309                 eventCache.clear();
310                 eventCache = null;
311             }
312             if (nioChannels != null) {
313                 nioChannels.clear();
314                 nioChannels = null;
315             }
316             if (processorCache != null) {
317                 processorCache.clear();
318                 processorCache = null;
319             }
320         }
321     }
322
323
324     /**
325      * Deallocate NIO memory pools, and close server socket.
326      */

327     @Override
328     public void unbind() throws Exception {
329         if (log.isDebugEnabled()) {
330             log.debug("Destroy initiated for " +
331                     new InetSocketAddress(getAddress(),getPortWithOffset()));
332         }
333         if (running) {
334             stop();
335         }
336         doCloseServerSocket();
337         destroySsl();
338         super.unbind();
339         if (getHandler() != null ) {
340             getHandler().recycle();
341         }
342         selectorPool.close();
343         if (log.isDebugEnabled()) {
344             log.debug("Destroy completed for " +
345                     new InetSocketAddress(getAddress(), getPortWithOffset()));
346         }
347     }
348
349
350     @Override
351     protected void doCloseServerSocket() throws IOException {
352         if (!getUseInheritedChannel() && serverSock != null) {
353             // Close server socket
354             serverSock.close();
355         }
356         serverSock = null;
357     }
358
359     // ------------------------------------------------------ Protected Methods
360
361     protected NioSelectorPool getSelectorPool() {
362         return selectorPool;
363     }
364
365
366     protected SynchronizedStack<NioChannel> getNioChannels() {
367         return nioChannels;
368     }
369
370
371     protected Poller getPoller() {
372         return poller;
373     }
374
375
376     protected CountDownLatch getStopLatch() {
377         return stopLatch;
378     }
379
380
381     protected void setStopLatch(CountDownLatch stopLatch) {
382         this.stopLatch = stopLatch;
383     }
384
385
386     /**
387      * Process the specified connection.
388      * @param socket The socket channel
389      * @return <code>true</code> if the socket was correctly configured
390      *  and processing may continue, <code>false</code> if the socket needs to be
391      *  close immediately
392      */

393     @Override
394     protected boolean setSocketOptions(SocketChannel socket) {
395         NioSocketWrapper socketWrapper = null;
396         try {
397             // Allocate channel and wrapper
398             NioChannel channel = null;
399             if (nioChannels != null) {
400                 channel = nioChannels.pop();
401             }
402             if (channel == null) {
403                 SocketBufferHandler bufhandler = new SocketBufferHandler(
404                         socketProperties.getAppReadBufSize(),
405                         socketProperties.getAppWriteBufSize(),
406                         socketProperties.getDirectBuffer());
407                 if (isSSLEnabled()) {
408                     channel = new SecureNioChannel(bufhandler, selectorPool, this);
409                 } else {
410                     channel = new NioChannel(bufhandler);
411                 }
412             }
413             NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
414             channel.reset(socket, newWrapper);
415             connections.put(socket, newWrapper);
416             socketWrapper = newWrapper;
417
418             // Set socket properties
419             // Disable blocking, polling will be used
420             socket.configureBlocking(false);
421             socketProperties.setProperties(socket.socket());
422
423             socketWrapper.setReadTimeout(getConnectionTimeout());
424             socketWrapper.setWriteTimeout(getConnectionTimeout());
425             socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
426             socketWrapper.setSecure(isSSLEnabled());
427             poller.register(channel, socketWrapper);
428             return true;
429         } catch (Throwable t) {
430             ExceptionUtils.handleThrowable(t);
431             try {
432                 log.error(sm.getString("endpoint.socketOptionsError"), t);
433             } catch (Throwable tt) {
434                 ExceptionUtils.handleThrowable(tt);
435             }
436             if (socketWrapper == null) {
437                 destroySocket(socket);
438             }
439         }
440         // Tell to close the socket if needed
441         return false;
442     }
443
444
445     @Override
446     protected void destroySocket(SocketChannel socket) {
447         countDownConnection();
448         try {
449             socket.close();
450         } catch (IOException ioe) {
451             if (log.isDebugEnabled()) {
452                 log.debug(sm.getString("endpoint.err.close"), ioe);
453             }
454         }
455     }
456
457
458     @Override
459     protected NetworkChannel getServerSocket() {
460         return serverSock;
461     }
462
463
464     @Override
465     protected SocketChannel serverSocketAccept() throws Exception {
466         return serverSock.accept();
467     }
468
469
470     @Override
471     protected Log getLog() {
472         return log;
473     }
474
475
476     @Override
477     protected SocketProcessorBase<NioChannel> createSocketProcessor(
478             SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
479         return new SocketProcessor(socketWrapper, event);
480     }
481
482     // ----------------------------------------------------- Poller Inner Classes
483
484     /**
485      * PollerEvent, cacheable object for poller events to avoid GC
486      */

487     public static class PollerEvent implements Runnable {
488
489         private NioChannel socket;
490         private int interestOps;
491
492         public PollerEvent(NioChannel ch, int intOps) {
493             reset(ch, intOps);
494         }
495
496         public void reset(NioChannel ch, int intOps) {
497             socket = ch;
498             interestOps = intOps;
499         }
500
501         public void reset() {
502             reset(null, 0);
503         }
504
505         @Override
506         public void run() {
507             if (interestOps == OP_REGISTER) {
508                 try {
509                     socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
510                 } catch (Exception x) {
511                     log.error(sm.getString("endpoint.nio.registerFail"), x);
512                 }
513             } else {
514                 final SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
515                 try {
516                     if (key == null) {
517                         // The key was cancelled (e.g. due to socket closure)
518                         // and removed from the selector while it was being
519                         // processed. Count down the connections at this point
520                         // since it won't have been counted down when the socket
521                         // closed.
522                         try {
523                             socket.socketWrapper.close();
524                         } catch (Exception ignore) {
525                         }
526                     } else {
527                         final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
528                         if (socketWrapper != null) {
529                             // We are registering the key to start with, reset the fairness counter.
530                             int ops = key.interestOps() | interestOps;
531                             socketWrapper.interestOps(ops);
532                             key.interestOps(ops);
533                         } else {
534                             socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
535                         }
536                     }
537                 } catch (CancelledKeyException ckx) {
538                     try {
539                         socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
540                     } catch (Exception ignore) {}
541                 }
542             }
543         }
544
545         @Override
546         public String toString() {
547             return "Poller event: socket [" + socket + "], socketWrapper [" + socket.getSocketWrapper() +
548                     "], interestOps [" + interestOps + "]";
549         }
550     }
551
552     /**
553      * Poller class.
554      */

555     public class Poller implements Runnable {
556
557         private Selector selector;
558         private final SynchronizedQueue<PollerEvent> events =
559                 new SynchronizedQueue<>();
560
561         private volatile boolean close = false;
562         // Optimize expiration handling
563         private long nextExpiration = 0;
564
565         private AtomicLong wakeupCounter = new AtomicLong(0);
566
567         private volatile int keyCount = 0;
568
569         public Poller() throws IOException {
570             this.selector = Selector.open();
571         }
572
573         public int getKeyCount() { return keyCount; }
574
575         public Selector getSelector() { return selector; }
576
577         /**
578          * Destroy the poller.
579          */

580         protected void destroy() {
581             // Wait for polltime before doing anything, so that the poller threads
582             // exit, otherwise parallel closure of sockets which are still
583             // in the poller can cause problems
584             close = true;
585             selector.wakeup();
586         }
587
588         private void addEvent(PollerEvent event) {
589             events.offer(event);
590             if (wakeupCounter.incrementAndGet() == 0) {
591                 selector.wakeup();
592             }
593         }
594
595         /**
596          * Add specified socket and associated pool to the poller. The socket will
597          * be added to a temporary array, and polled first after a maximum amount
598          * of time equal to pollTime (in most cases, latency will be much lower,
599          * however).
600          *
601          * @param socketWrapper to add to the poller
602          * @param interestOps Operations for which to register this socket with
603          *                    the Poller
604          */

605         public void add(NioSocketWrapper socketWrapper, int interestOps) {
606             PollerEvent r = null;
607             if (eventCache != null) {
608                 r = eventCache.pop();
609             }
610             if (r == null) {
611                 r = new PollerEvent(socketWrapper.getSocket(), interestOps);
612             } else {
613                 r.reset(socketWrapper.getSocket(), interestOps);
614             }
615             addEvent(r);
616             if (close) {
617                 processSocket(socketWrapper, SocketEvent.STOP, false);
618             }
619         }
620
621         /**
622          * Processes events in the event queue of the Poller.
623          *
624          * @return <code>true</code> if some events were processed,
625          *   <code>false</code> if queue was empty
626          */

627         public boolean events() {
628             boolean result = false;
629
630             PollerEvent pe = null;
631             for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
632                 result = true;
633                 try {
634                     pe.run();
635                     pe.reset();
636                     if (running && !paused && eventCache != null) {
637                         eventCache.push(pe);
638                     }
639                 } catch ( Throwable x ) {
640                     log.error(sm.getString("endpoint.nio.pollerEventError"), x);
641                 }
642             }
643
644             return result;
645         }
646
647         /**
648          * Registers a newly created socket with the poller.
649          *
650          * @param socket    The newly created socket
651          * @param socketWrapper The socket wrapper
652          */

653         public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
654             socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
655             PollerEvent r = null;
656             if (eventCache != null) {
657                 r = eventCache.pop();
658             }
659             if (r == null) {
660                 r = new PollerEvent(socket, OP_REGISTER);
661             } else {
662                 r.reset(socket, OP_REGISTER);
663             }
664             addEvent(r);
665         }
666
667         public void cancelledKey(SelectionKey sk, SocketWrapperBase<NioChannel> socketWrapper) {
668             try {
669                 // If is important to cancel the key first, otherwise a deadlock may occur between the
670                 // poller select and the socket channel close which would cancel the key
671                 if (sk != null) {
672                     sk.attach(null);
673                     if (sk.isValid()) {
674                         sk.cancel();
675                     }
676                 }
677             } catch (Throwable e) {
678                 ExceptionUtils.handleThrowable(e);
679                 if (log.isDebugEnabled()) {
680                     log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
681                 }
682             } finally {
683                 if (socketWrapper != null) {
684                     socketWrapper.close();
685                 }
686             }
687         }
688
689         /**
690          * The background thread that adds sockets to the Poller, checks the
691          * poller for triggered events and hands the associated socket off to an
692          * appropriate processor as events occur.
693          */

694         @Override
695         public void run() {
696             // Loop until destroy() is called
697             while (true) {
698
699                 boolean hasEvents = false;
700
701                 try {
702                     if (!close) {
703                         hasEvents = events();
704                         if (wakeupCounter.getAndSet(-1) > 0) {
705                             // If we are here, means we have other stuff to do
706                             // Do a non blocking select
707                             keyCount = selector.selectNow();
708                         } else {
709                             keyCount = selector.select(selectorTimeout);
710                         }
711                         wakeupCounter.set(0);
712                     }
713                     if (close) {
714                         events();
715                         timeout(0, false);
716                         try {
717                             selector.close();
718                         } catch (IOException ioe) {
719                             log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
720                         }
721                         break;
722                     }
723                 } catch (Throwable x) {
724                     ExceptionUtils.handleThrowable(x);
725                     log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
726                     continue;
727                 }
728                 // Either we timed out or we woke up, process events first
729                 if (keyCount == 0) {
730                     hasEvents = (hasEvents | events());
731                 }
732
733                 Iterator<SelectionKey> iterator =
734                     keyCount > 0 ? selector.selectedKeys().iterator() : null;
735                 // Walk through the collection of ready keys and dispatch
736                 // any active event.
737                 while (iterator != null && iterator.hasNext()) {
738                     SelectionKey sk = iterator.next();
739                     NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
740                     // Attachment may be null if another thread has called
741                     // cancelledKey()
742                     if (socketWrapper == null) {
743                         iterator.remove();
744                     } else {
745                         iterator.remove();
746                         processKey(sk, socketWrapper);
747                     }
748                 }
749
750                 // Process timeouts
751                 timeout(keyCount,hasEvents);
752             }
753
754             getStopLatch().countDown();
755         }
756
757         protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
758             try {
759                 if (close) {
760                     cancelledKey(sk, socketWrapper);
761                 } else if (sk.isValid() && socketWrapper != null) {
762                     if (sk.isReadable() || sk.isWritable()) {
763                         if (socketWrapper.getSendfileData() != null) {
764                             processSendfile(sk, socketWrapper, false);
765                         } else {
766                             unreg(sk, socketWrapper, sk.readyOps());
767                             boolean closeSocket = false;
768                             // Read goes before write
769                             if (sk.isReadable()) {
770                                 if (socketWrapper.readOperation != null) {
771                                     if (!socketWrapper.readOperation.process()) {
772                                         closeSocket = true;
773                                     }
774                                 } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
775                                     closeSocket = true;
776                                 }
777                             }
778                             if (!closeSocket && sk.isWritable()) {
779                                 if (socketWrapper.writeOperation != null) {
780                                     if (!socketWrapper.writeOperation.process()) {
781                                         closeSocket = true;
782                                     }
783                                 } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
784                                     closeSocket = true;
785                                 }
786                             }
787                             if (closeSocket) {
788                                 cancelledKey(sk, socketWrapper);
789                             }
790                         }
791                     }
792                 } else {
793                     // Invalid key
794                     cancelledKey(sk, socketWrapper);
795                 }
796             } catch (CancelledKeyException ckx) {
797                 cancelledKey(sk, socketWrapper);
798             } catch (Throwable t) {
799                 ExceptionUtils.handleThrowable(t);
800                 log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
801             }
802         }
803
804         public SendfileState processSendfile(SelectionKey sk, NioSocketWrapper socketWrapper,
805                 boolean calledByProcessor) {
806             NioChannel sc = null;
807             try {
808                 unreg(sk, socketWrapper, sk.readyOps());
809                 SendfileData sd = socketWrapper.getSendfileData();
810
811                 if (log.isTraceEnabled()) {
812                     log.trace("Processing send file for: " + sd.fileName);
813                 }
814
815                 if (sd.fchannel == null) {
816                     // Setup the file channel
817                     File f = new File(sd.fileName);
818                     @SuppressWarnings("resource"// Closed when channel is closed
819                     FileInputStream fis = new FileInputStream(f);
820                     sd.fchannel = fis.getChannel();
821                 }
822
823                 // Configure output channel
824                 sc = socketWrapper.getSocket();
825                 // TLS/SSL channel is slightly different
826                 WritableByteChannel wc = ((sc instanceof SecureNioChannel) ? sc : sc.getIOChannel());
827
828                 // We still have data in the buffer
829                 if (sc.getOutboundRemaining() > 0) {
830                     if (sc.flushOutbound()) {
831                         socketWrapper.updateLastWrite();
832                     }
833                 } else {
834                     long written = sd.fchannel.transferTo(sd.pos, sd.length, wc);
835                     if (written > 0) {
836                         sd.pos += written;
837                         sd.length -= written;
838                         socketWrapper.updateLastWrite();
839                     } else {
840                         // Unusual not to be able to transfer any bytes
841                         // Check the length was set correctly
842                         if (sd.fchannel.size() <= sd.pos) {
843                             throw new IOException(sm.getString("endpoint.sendfile.tooMuchData"));
844                         }
845                     }
846                 }
847                 if (sd.length <= 0 && sc.getOutboundRemaining()<=0) {
848                     if (log.isDebugEnabled()) {
849                         log.debug("Send file complete for: " + sd.fileName);
850                     }
851                     socketWrapper.setSendfileData(null);
852                     try {
853                         sd.fchannel.close();
854                     } catch (Exception ignore) {
855                     }
856                     // For calls from outside the Poller, the caller is
857                     // responsible for registering the socket for the
858                     // appropriate event(s) if sendfile completes.
859                     if (!calledByProcessor) {
860                         switch (sd.keepAliveState) {
861                         case NONE: {
862                             if (log.isDebugEnabled()) {
863                                 log.debug("Send file connection is being closed");
864                             }
865                             poller.cancelledKey(sk, socketWrapper);
866                             break;
867                         }
868                         case PIPELINED: {
869                             if (log.isDebugEnabled()) {
870                                 log.debug("Connection is keep alive, processing pipe-lined data");
871                             }
872                             if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
873                                 poller.cancelledKey(sk, socketWrapper);
874                             }
875                             break;
876                         }
877                         case OPEN: {
878                             if (log.isDebugEnabled()) {
879                                 log.debug("Connection is keep alive, registering back for OP_READ");
880                             }
881                             reg(sk, socketWrapper, SelectionKey.OP_READ);
882                             break;
883                         }
884                         }
885                     }
886                     return SendfileState.DONE;
887                 } else {
888                     if (log.isDebugEnabled()) {
889                         log.debug("OP_WRITE for sendfile: " + sd.fileName);
890                     }
891                     if (calledByProcessor) {
892                         add(socketWrapper, SelectionKey.OP_WRITE);
893                     } else {
894                         reg(sk, socketWrapper, SelectionKey.OP_WRITE);
895                     }
896                     return SendfileState.PENDING;
897                 }
898             } catch (IOException e) {
899                 if (log.isDebugEnabled()) {
900                     log.debug("Unable to complete sendfile request:", e);
901                 }
902                 if (!calledByProcessor && sc != null) {
903                     poller.cancelledKey(sk, socketWrapper);
904                 }
905                 return SendfileState.ERROR;
906             } catch (Throwable t) {
907                 log.error(sm.getString("endpoint.sendfile.error"), t);
908                 if (!calledByProcessor && sc != null) {
909                     poller.cancelledKey(sk, socketWrapper);
910                 }
911                 return SendfileState.ERROR;
912             }
913         }
914
915         protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
916             // This is a must, so that we don't have multiple threads messing with the socket
917             reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
918         }
919
        /**
         * Sets the interest operations on both the selection key and the
         * socket wrapper so that the two stay in step.
         *
         * @param sk            The selection key to update
         * @param socketWrapper The wrapper associated with the key
         * @param intops        The new interest set
         */
        protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) {
            // The key is updated first; if that call throws, the value recorded
            // on the wrapper is left untouched
            sk.interestOps(intops);
            socketWrapper.interestOps(intops);
        }
924
925         protected void timeout(int keyCount, boolean hasEvents) {
926             long now = System.currentTimeMillis();
927             // This method is called on every loop of the Poller. Don't process
928             // timeouts on every loop of the Poller since that would create too
929             // much load and timeouts can afford to wait a few seconds.
930             // However, do process timeouts if any of the following are true:
931             // - the selector simply timed out (suggests there isn't much load)
932             // - the nextExpiration time has passed
933             // - the server socket is being closed
934             if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
935                 return;
936             }
937             int keycount = 0;
938             try {
939                 for (SelectionKey key : selector.keys()) {
940                     keycount++;
941                     try {
942                         NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
943                         if (socketWrapper == null) {
944                             // We don't support any keys without attachments
945                             cancelledKey(key, null);
946                         } else if (close) {
947                             key.interestOps(0);
948                             // Avoid duplicate stop calls
949                             socketWrapper.interestOps(0);
950                             cancelledKey(key, socketWrapper);
951                         } else if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ||
952                                   (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
953                             boolean readTimeout = false;
954                             boolean writeTimeout = false;
955                             // Check for read timeout
956                             if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
957                                 long delta = now - socketWrapper.getLastRead();
958                                 long timeout = socketWrapper.getReadTimeout();
959                                 if (timeout > 0 && delta > timeout) {
960                                     readTimeout = true;
961                                 }
962                             }
963                             // Check for write timeout
964                             if (!readTimeout && (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
965                                 long delta = now - socketWrapper.getLastWrite();
966                                 long timeout = socketWrapper.getWriteTimeout();
967                                 if (timeout > 0 && delta > timeout) {
968                                     writeTimeout = true;
969                                 }
970                             }
971                             if (readTimeout || writeTimeout) {
972                                 key.interestOps(0);
973                                 // Avoid duplicate timeout calls
974                                 socketWrapper.interestOps(0);
975                                 socketWrapper.setError(new SocketTimeoutException());
976                                 if (readTimeout && socketWrapper.readOperation != null) {
977                                     if (!socketWrapper.readOperation.process()) {
978                                         cancelledKey(key, socketWrapper);
979                                     }
980                                 } else if (writeTimeout && socketWrapper.writeOperation != null) {
981                                     if (!socketWrapper.writeOperation.process()) {
982                                         cancelledKey(key, socketWrapper);
983                                     }
984                                 } else if (!processSocket(socketWrapper, SocketEvent.ERROR, true)) {
985                                     cancelledKey(key, socketWrapper);
986                                 }
987                             }
988                         }
989                     } catch (CancelledKeyException ckx) {
990                         cancelledKey(key, (NioSocketWrapper) key.attachment());
991                     }
992                 }
993             } catch (ConcurrentModificationException cme) {
994                 // See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
995                 log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
996             }
997             // For logging purposes only
998             long prevExp = nextExpiration;
999             nextExpiration = System.currentTimeMillis() +
1000                     socketProperties.getTimeoutInterval();
1001             if (log.isTraceEnabled()) {
1002                 log.trace("timeout completed: keys processed=" + keycount +
1003                         "; now=" + now + "; nextExpiration=" + prevExp +
1004                         "; keyCount=" + keyCount + "; hasEvents=" + hasEvents +
1005                         "; eval=" + ((now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
1006             }
1007
1008         }
1009     }
1010
1011     // --------------------------------------------------- Socket Wrapper Class
1012
1013     public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
1014
        // Selector pool taken from the endpoint; used by the blocking read and
        // write paths of this wrapper
        private final NioSelectorPool pool;
        // Channel stack taken from the endpoint; doClose() pushes the closed
        // channel onto it when the endpoint is running and not paused
        private final SynchronizedStack<NioChannel> nioChannels;
        // Poller taken from the endpoint; used to register read/write interest
        private final Poller poller;

        // Interest operations last recorded for this socket
        private int interestOps = 0;
        // Latches managed through the start/reset/await latch methods
        private CountDownLatch readLatch = null;
        private CountDownLatch writeLatch = null;
        // Sendfile transfer currently attached to this socket, if any
        private volatile SendfileData sendfileData = null;
        // Timestamps (milliseconds) compared against the read/write timeouts
        private volatile long lastRead = System.currentTimeMillis();
        private volatile long lastWrite = lastRead;
1025
1026         public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
1027             super(channel, endpoint);
1028             pool = endpoint.getSelectorPool();
1029             nioChannels = endpoint.getNioChannels();
1030             poller = endpoint.getPoller();
1031             socketBufferHandler = channel.getBufHandler();
1032         }
1033
1034         public Poller getPoller() { return poller; }
1035         public int interestOps() { return interestOps; }
1036         public int interestOps(int ops) { this.interestOps  = ops; return ops; }
1037         public CountDownLatch getReadLatch() { return readLatch; }
1038         public CountDownLatch getWriteLatch() { return writeLatch; }
1039         protected CountDownLatch resetLatch(CountDownLatch latch) {
1040             if (latch == null || latch.getCount() == 0) {
1041                 return null;
1042             } else {
1043                 throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
1044             }
1045         }
1046         public void resetReadLatch() { readLatch = resetLatch(readLatch); }
1047         public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
1048
1049         protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
1050             if (latch == null || latch.getCount() == 0) {
1051                 return new CountDownLatch(cnt);
1052             } else {
1053                 throw new IllegalStateException(sm.getString("endpoint.nio.latchMustBeZero"));
1054             }
1055         }
1056         public void startReadLatch(int cnt) { readLatch = startLatch(readLatch, cnt); }
1057         public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch, cnt); }
1058
1059         protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
1060             if (latch == null) {
1061                 throw new IllegalStateException(sm.getString("endpoint.nio.nullLatch"));
1062             }
1063             // Note: While the return value is ignored if the latch does time
1064             //       out, logic further up the call stack will trigger a
1065             //       SocketTimeoutException
1066             latch.await(timeout, unit);
1067         }
1068         public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch, timeout, unit); }
1069         public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch, timeout, unit); }
1070
1071         public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
1072         public SendfileData getSendfileData() { return this.sendfileData; }
1073
1074         public void updateLastWrite() { lastWrite = System.currentTimeMillis(); }
1075         public long getLastWrite() { return lastWrite; }
1076         public void updateLastRead() { lastRead = System.currentTimeMillis(); }
1077         public long getLastRead() { return lastRead; }
1078
1079         @Override
1080         public boolean isReadyForRead() throws IOException {
1081             socketBufferHandler.configureReadBufferForRead();
1082
1083             if (socketBufferHandler.getReadBuffer().remaining() > 0) {
1084                 return true;
1085             }
1086
1087             fillReadBuffer(false);
1088
1089             boolean isReady = socketBufferHandler.getReadBuffer().position() > 0;
1090             return isReady;
1091         }
1092
1093
1094         @Override
1095         public int read(boolean block, byte[] b, int off, int len) throws IOException {
1096             int nRead = populateReadBuffer(b, off, len);
1097             if (nRead > 0) {
1098                 return nRead;
1099                 /*
1100                  * Since more bytes may have arrived since the buffer was last
1101                  * filled, it is an option at this point to perform a
1102                  * non-blocking read. However correctly handling the case if
1103                  * that read returns end of stream adds complexity. Therefore,
1104                  * at the moment, the preference is for simplicity.
1105                  */

1106             }
1107
1108             // Fill the read buffer as best we can.
1109             nRead = fillReadBuffer(block);
1110             updateLastRead();
1111
1112             // Fill as much of the remaining byte array as possible with the
1113             // data that was just read
1114             if (nRead > 0) {
1115                 socketBufferHandler.configureReadBufferForRead();
1116                 nRead = Math.min(nRead, len);
1117                 socketBufferHandler.getReadBuffer().get(b, off, nRead);
1118             }
1119             return nRead;
1120         }
1121
1122
1123         @Override
1124         public int read(boolean block, ByteBuffer to) throws IOException {
1125             int nRead = populateReadBuffer(to);
1126             if (nRead > 0) {
1127                 return nRead;
1128                 /*
1129                  * Since more bytes may have arrived since the buffer was last
1130                  * filled, it is an option at this point to perform a
1131                  * non-blocking read. However correctly handling the case if
1132                  * that read returns end of stream adds complexity. Therefore,
1133                  * at the moment, the preference is for simplicity.
1134                  */

1135             }
1136
1137             // The socket read buffer capacity is socket.appReadBufSize
1138             int limit = socketBufferHandler.getReadBuffer().capacity();
1139             if (to.remaining() >= limit) {
1140                 to.limit(to.position() + limit);
1141                 nRead = fillReadBuffer(block, to);
1142                 if (log.isDebugEnabled()) {
1143                     log.debug("Socket: [" + this + "], Read direct from socket: [" + nRead + "]");
1144                 }
1145                 updateLastRead();
1146             } else {
1147                 // Fill the read buffer as best we can.
1148                 nRead = fillReadBuffer(block);
1149                 if (log.isDebugEnabled()) {
1150                     log.debug("Socket: [" + this + "], Read into buffer: [" + nRead + "]");
1151                 }
1152                 updateLastRead();
1153
1154                 // Fill as much of the remaining byte array as possible with the
1155                 // data that was just read
1156                 if (nRead > 0) {
1157                     nRead = populateReadBuffer(to);
1158                 }
1159             }
1160             return nRead;
1161         }
1162
1163
1164         @Override
1165         protected void doClose() {
1166             if (log.isDebugEnabled()) {
1167                 log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
1168             }
1169             try {
1170                 getEndpoint().connections.remove(getSocket().getIOChannel());
1171                 if (getSocket().isOpen()) {
1172                     getSocket().close(true);
1173                 }
1174                 if (getEndpoint().running && !getEndpoint().paused) {
1175                     if (nioChannels == null || !nioChannels.push(getSocket())) {
1176                         getSocket().free();
1177                     }
1178                 }
1179             } catch (Throwable e) {
1180                 ExceptionUtils.handleThrowable(e);
1181                 if (log.isDebugEnabled()) {
1182                     log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
1183                 }
1184             } finally {
1185                 socketBufferHandler = SocketBufferHandler.EMPTY;
1186                 nonBlockingWriteBuffer.clear();
1187                 reset(NioChannel.CLOSED_NIO_CHANNEL);
1188             }
1189             try {
1190                 SendfileData data = getSendfileData();
1191                 if (data != null && data.fchannel != null && data.fchannel.isOpen()) {
1192                     data.fchannel.close();
1193                 }
1194             } catch (Throwable e) {
1195                 ExceptionUtils.handleThrowable(e);
1196                 if (log.isDebugEnabled()) {
1197                     log.error(sm.getString("endpoint.sendfile.closeError"), e);
1198                 }
1199             }
1200         }
1201
1202         private int fillReadBuffer(boolean block) throws IOException {
1203             socketBufferHandler.configureReadBufferForWrite();
1204             return fillReadBuffer(block, socketBufferHandler.getReadBuffer());
1205         }
1206
1207
1208         private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
1209             int nRead;
1210             NioChannel socket = getSocket();
1211             if (socket instanceof ClosedNioChannel) {
1212                 throw new ClosedChannelException();
1213             }
1214             if (block) {
1215                 Selector selector = null;
1216                 try {
1217                     selector = pool.get();
1218                 } catch (IOException x) {
1219                     // Ignore
1220                 }
1221                 try {
1222                     nRead = pool.read(to, socket, selector, getReadTimeout());
1223                 } finally {
1224                     if (selector != null) {
1225                         pool.put(selector);
1226                     }
1227                 }
1228             } else {
1229                 nRead = socket.read(to);
1230                 if (nRead == -1) {
1231                     throw new EOFException();
1232                 }
1233             }
1234             return nRead;
1235         }
1236
1237
1238         @Override
1239         protected void doWrite(boolean block, ByteBuffer from) throws IOException {
1240             NioChannel socket = getSocket();
1241             if (socket instanceof ClosedNioChannel) {
1242                 throw new ClosedChannelException();
1243             }
1244             if (block) {
1245                 long writeTimeout = getWriteTimeout();
1246                 Selector selector = null;
1247                 try {
1248                     selector = pool.get();
1249                 } catch (IOException x) {
1250                     // Ignore
1251                 }
1252                 try {
1253                     pool.write(from, socket, selector, writeTimeout);
1254                     // Make sure we are flushed
1255                     do {
1256                     } while (!socket.flush(true, selector, writeTimeout));
1257                 } finally {
1258                     if (selector != null) {
1259                         pool.put(selector);
1260                     }
1261                 }
1262                 // If there is data left in the buffer the socket will be registered for
1263                 // write further up the stack. This is to ensure the socket is only
1264                 // registered for write once as both container and user code can trigger
1265                 // write registration.
1266             } else {
1267                 int n = 0;
1268                 do {
1269                     n = socket.write(from);
1270                     if (n == -1) {
1271                         throw new EOFException();
1272                     }
1273                 } while (n > 0 && from.hasRemaining());
1274             }
1275             updateLastWrite();
1276         }
1277
1278
1279         @Override
1280         public void registerReadInterest() {
1281             if (log.isDebugEnabled()) {
1282                 log.debug(sm.getString("endpoint.debug.registerRead"this));
1283             }
1284             getPoller().add(this, SelectionKey.OP_READ);
1285         }
1286
1287
1288         @Override
1289         public void registerWriteInterest() {
1290             if (log.isDebugEnabled()) {
1291                 log.debug(sm.getString("endpoint.debug.registerWrite"this));
1292             }
1293             getPoller().add(this, SelectionKey.OP_WRITE);
1294         }
1295
1296
1297         @Override
1298         public SendfileDataBase createSendfileData(String filename, long pos, long length) {
1299             return new SendfileData(filename, pos, length);
1300         }
1301
1302
1303         @Override
1304         public SendfileState processSendfile(SendfileDataBase sendfileData) {
1305             setSendfileData((SendfileData) sendfileData);
1306             SelectionKey key = getSocket().getIOChannel().keyFor(getPoller().getSelector());
1307             // Might as well do the first write on this thread
1308             return getPoller().processSendfile(key, thistrue);
1309         }
1310
1311
1312         @Override
1313         protected void populateRemoteAddr() {
1314             SocketChannel sc = getSocket().getIOChannel();
1315             if (sc != null) {
1316                 InetAddress inetAddr = sc.socket().getInetAddress();
1317                 if (inetAddr != null) {
1318                     remoteAddr = inetAddr.getHostAddress();
1319                 }
1320             }
1321         }
1322
1323
1324         @Override
1325         protected void populateRemoteHost() {
1326             SocketChannel sc = getSocket().getIOChannel();
1327             if (sc != null) {
1328                 InetAddress inetAddr = sc.socket().getInetAddress();
1329                 if (inetAddr != null) {
1330                     remoteHost = inetAddr.getHostName();
1331                     if (remoteAddr == null) {
1332                         remoteAddr = inetAddr.getHostAddress();
1333                     }
1334                 }
1335             }
1336         }
1337
1338
1339         @Override
1340         protected void populateRemotePort() {
1341             SocketChannel sc = getSocket().getIOChannel();
1342             if (sc != null) {
1343                 remotePort = sc.socket().getPort();
1344             }
1345         }
1346
1347
1348         @Override
1349         protected void populateLocalName() {
1350             SocketChannel sc = getSocket().getIOChannel();
1351             if (sc != null) {
1352                 InetAddress inetAddr = sc.socket().getLocalAddress();
1353                 if (inetAddr != null) {
1354                     localName = inetAddr.getHostName();
1355                 }
1356             }
1357         }
1358
1359
1360         @Override
1361         protected void populateLocalAddr() {
1362             SocketChannel sc = getSocket().getIOChannel();
1363             if (sc != null) {
1364                 InetAddress inetAddr = sc.socket().getLocalAddress();
1365                 if (inetAddr != null) {
1366                     localAddr = inetAddr.getHostAddress();
1367                 }
1368             }
1369         }
1370
1371
1372         @Override
1373         protected void populateLocalPort() {
1374             SocketChannel sc = getSocket().getIOChannel();
1375             if (sc != null) {
1376                 localPort = sc.socket().getLocalPort();
1377             }
1378         }
1379
1380
1381         /**
1382          * {@inheritDoc}
1383          * @param clientCertProvider Ignored for this implementation
1384          */

1385         @Override
1386         public SSLSupport getSslSupport(String clientCertProvider) {
1387             if (getSocket() instanceof SecureNioChannel) {
1388                 SecureNioChannel ch = (SecureNioChannel) getSocket();
1389                 SSLEngine sslEngine = ch.getSslEngine();
1390                 if (sslEngine != null) {
1391                     SSLSession session = sslEngine.getSession();
1392                     return ((NioEndpoint) getEndpoint()).getSslImplementation().getSSLSupport(session);
1393                 }
1394             }
1395             return null;
1396         }
1397
1398
1399         @Override
1400         public void doClientAuth(SSLSupport sslSupport) throws IOException {
1401             SecureNioChannel sslChannel = (SecureNioChannel) getSocket();
1402             SSLEngine engine = sslChannel.getSslEngine();
1403             if (!engine.getNeedClientAuth()) {
1404                 // Need to re-negotiate SSL connection
1405                 engine.setNeedClientAuth(true);
1406                 sslChannel.rehandshake(getEndpoint().getConnectionTimeout());
1407                 ((JSSESupport) sslSupport).setSession(engine.getSession());
1408             }
1409         }
1410
1411
1412         @Override
1413         public void setAppReadBufHandler(ApplicationBufferHandler handler) {
1414             getSocket().setAppReadBufHandler(handler);
1415         }
1416
1417         @Override
1418         protected <A> OperationState<A> newOperationState(boolean read,
1419                 ByteBuffer[] buffers, int offset, int length,
1420                 BlockingMode block, long timeout, TimeUnit unit, A attachment,
1421                 CompletionCheck check, CompletionHandler<Long, ? super A> handler,
1422                 Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
1423             return new NioOperationState<>(read, buffers, offset, length, block,
1424                     timeout, unit, attachment, check, handler, semaphore, completion);
1425         }
1426
1427         private class NioOperationState<A> extends OperationState<A> {
            // Starts out true; run() sets it to false as soon as the
            // operation has to go through the poller
            private volatile boolean inline = true;
            /**
             * Creates the operation state; every argument is passed straight
             * to the superclass constructor.
             */
            private NioOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
                    BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
                    CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
                    VectoredIOCompletionHandler<A> completion) {
                super(read, buffers, offset, length, block,
                        timeout, unit, attachment, check, handler, semaphore, completion);
            }
1436
1437             @Override
1438             protected boolean isInline() {
1439                 return inline;
1440             }
1441
1442             @Override
1443             public void run() {
1444                 // Perform the IO operation
1445                 // Called from the poller to continue the IO operation
1446                 long nBytes = 0;
1447                 if (getError() == null) {
1448                     try {
1449                         synchronized (this) {
1450                             if (!completionDone) {
1451                                 // This filters out same notification until processing
1452                                 // of the current one is done
1453                                 if (log.isDebugEnabled()) {
1454                                     log.debug("Skip concurrent " + (read ? "read" : "write") + " notification");
1455                                 }
1456                                 return;
1457                             }
1458                             if (read) {
1459                                 // Read from main buffer first
1460                                 if (!socketBufferHandler.isReadBufferEmpty()) {
1461                                     // There is still data inside the main read buffer, it needs to be read first
1462                                     socketBufferHandler.configureReadBufferForRead();
1463                                     for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
1464                                         nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
1465                                     }
1466                                 }
1467                                 if (nBytes == 0) {
1468                                     nBytes = getSocket().read(buffers, offset, length);
1469                                     updateLastRead();
1470                                 }
1471                             } else {
1472                                 boolean doWrite = true;
1473                                 // Write from main buffer first
1474                                 if (!socketBufferHandler.isWriteBufferEmpty()) {
1475                                     // There is still data inside the main write buffer, it needs to be written first
1476                                     socketBufferHandler.configureWriteBufferForRead();
1477                                     do {
1478                                         nBytes = getSocket().write(socketBufferHandler.getWriteBuffer());
1479                                     } while (!socketBufferHandler.isWriteBufferEmpty() && nBytes > 0);
1480                                     if (!socketBufferHandler.isWriteBufferEmpty()) {
1481                                         doWrite = false;
1482                                     }
1483                                     // Preserve a negative value since it is an error
1484                                     if (nBytes > 0) {
1485                                         nBytes = 0;
1486                                     }
1487                                 }
1488                                 if (doWrite) {
1489                                     long n = 0;
1490                                     do {
1491                                         n = getSocket().write(buffers, offset, length);
1492                                         if (n == -1) {
1493                                             nBytes = n;
1494                                         } else {
1495                                             nBytes += n;
1496                                         }
1497                                     } while (n > 0);
1498                                     updateLastWrite();
1499                                 }
1500                             }
1501                             if (nBytes != 0 || !buffersArrayHasRemaining(buffers, offset, length)) {
1502                                 completionDone = false;
1503                             }
1504                         }
1505                     } catch (IOException e) {
1506                         setError(e);
1507                     }
1508                 }
1509                 if (nBytes > 0 || (nBytes == 0 && !buffersArrayHasRemaining(buffers, offset, length))) {
1510                     // The bytes processed are only updated in the completion handler
1511                     completion.completed(Long.valueOf(nBytes), this);
1512                 } else if (nBytes < 0 || getError() != null) {
1513                     IOException error = getError();
1514                     if (error == null) {
1515                         error = new EOFException();
1516                     }
1517                     completion.failed(error, this);
1518                 } else {
1519                     // As soon as the operation uses the poller, it is no longer inline
1520                     inline = false;
1521                     if (read) {
1522                         registerReadInterest();
1523                     } else {
1524                         registerWriteInterest();
1525                     }
1526                 }
1527             }
1528
1529         }
1530
1531     }
1532
1533
1534     // ---------------------------------------------- SocketProcessor Inner Class
1535
1536     /**
1537      * This class is the equivalent of the Worker, but will simply use in an
1538      * external Executor thread pool.
1539      */

1540     protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
1541
1542         public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
1543             super(socketWrapper, event);
1544         }
1545
1546         @Override
1547         protected void doRun() {
1548             NioChannel socket = socketWrapper.getSocket();
1549             SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
1550             Poller poller = NioEndpoint.this.poller;
1551             if (poller == null) {
1552                 socketWrapper.close();
1553                 return;
1554             }
1555
1556             try {
1557                 int handshake = -1;
1558
1559                 try {
1560                     if (key != null) {
1561                         if (socket.isHandshakeComplete()) {
1562                             // No TLS handshaking required. Let the handler
1563                             // process this socket / event combination.
1564                             handshake = 0;
1565                         } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
1566                                 event == SocketEvent.ERROR) {
1567                             // Unable to complete the TLS handshake. Treat it as
1568                             // if the handshake failed.
1569                             handshake = -1;
1570                         } else {
1571                             handshake = socket.handshake(key.isReadable(), key.isWritable());
1572                             // The handshake process reads/writes from/to the
1573                             // socket. status may therefore be OPEN_WRITE once
1574                             // the handshake completes. However, the handshake
1575                             // happens when the socket is opened so the status
1576                             // must always be OPEN_READ after it completes. It
1577                             // is OK to always set this as it is only used if
1578                             // the handshake completes.
1579                             event = SocketEvent.OPEN_READ;
1580                         }
1581                     }
1582                 } catch (IOException x) {
1583                     handshake = -1;
1584                     if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
1585                 } catch (CancelledKeyException ckx) {
1586                     handshake = -1;
1587                 }
1588                 if (handshake == 0) {
1589                     SocketState state = SocketState.OPEN;
1590                     // Process the request from this socket
1591                     if (event == null) {
1592                         state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
1593                     } else {
1594                         state = getHandler().process(socketWrapper, event);
1595                     }
1596                     if (state == SocketState.CLOSED) {
1597                         poller.cancelledKey(key, socketWrapper);
1598                     }
1599                 } else if (handshake == -1 ) {
1600                     getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
1601                     poller.cancelledKey(key, socketWrapper);
1602                 } else if (handshake == SelectionKey.OP_READ){
1603                     socketWrapper.registerReadInterest();
1604                 } else if (handshake == SelectionKey.OP_WRITE){
1605                     socketWrapper.registerWriteInterest();
1606                 }
1607             } catch (CancelledKeyException cx) {
1608                 poller.cancelledKey(key, socketWrapper);
1609             } catch (VirtualMachineError vme) {
1610                 ExceptionUtils.handleThrowable(vme);
1611             } catch (Throwable t) {
1612                 log.error(sm.getString("endpoint.processing.fail"), t);
1613                 poller.cancelledKey(key, socketWrapper);
1614             } finally {
1615                 socketWrapper = null;
1616                 event = null;
1617                 //return to cache
1618                 if (running && !paused && processorCache != null) {
1619                     processorCache.push(this);
1620                 }
1621             }
1622         }
1623     }
1624
1625     // ----------------------------------------------- SendfileData Inner Class
1626
1627     /**
1628      * SendfileData class.
1629      */

1630     public static class SendfileData extends SendfileDataBase {
1631
1632         public SendfileData(String filename, long pos, long length) {
1633             super(filename, pos, length);
1634         }
1635
1636         protected volatile FileChannel fchannel;
1637     }
1638 }
1639