1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.tomcat.util.net;
18
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.net.SocketTimeoutException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.Iterator;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.juli.logging.Log;
34 import org.apache.juli.logging.LogFactory;
35 import org.apache.tomcat.util.ExceptionUtils;
36 import org.apache.tomcat.util.collections.SynchronizedQueue;
37 import org.apache.tomcat.util.collections.SynchronizedStack;
38 import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
39 import org.apache.tomcat.util.res.StringManager;
40
41 public class NioBlockingSelector {
42
43     private static final Log log = LogFactory.getLog(NioBlockingSelector.class);
44     protected static final StringManager sm = StringManager.getManager(NioBlockingSelector.class);
45
46     private final SynchronizedStack<KeyReference> keyReferenceStack =
47             new SynchronizedStack<>();
48
49     protected Selector sharedSelector;
50
51     protected BlockPoller poller;
52
53     public void open(String name, Selector selector) {
54         sharedSelector = selector;
55         poller = new BlockPoller();
56         poller.selector = sharedSelector;
57         poller.setDaemon(true);
58         poller.setName(name + "-BlockPoller");
59         poller.start();
60     }
61
62     public void close() {
63         if (poller != null) {
64             poller.disable();
65             poller.interrupt();
66             poller = null;
67         }
68     }
69
70     /**
71      * Performs a blocking write using the byte buffer for data to be written
72      * If the <code>selector</code> parameter is null, then it will perform a busy write that could
73      * take up a lot of CPU cycles.
74      *
75      * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
76      * @param socket SocketChannel - the socket to write data to
77      * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout
78      * @return the number of bytes written
79      * @throws EOFException if write returns -1
80      * @throws SocketTimeoutException if the write times out
81      * @throws IOException if an IO Exception occurs in the underlying socket logic
82      */

83     public int write(ByteBuffer buf, NioChannel socket, long writeTimeout)
84             throws IOException {
85         SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
86         if (key == null) {
87             throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
88         }
89         KeyReference reference = keyReferenceStack.pop();
90         if (reference == null) {
91             reference = new KeyReference();
92         }
93         NioSocketWrapper att = (NioSocketWrapper) key.attachment();
94         int written = 0;
95         boolean timedout = false;
96         int keycount = 1; //assume we can write
97         long time = System.currentTimeMillis(); //start the timeout timer
98         try {
99             while (!timedout && buf.hasRemaining()) {
100                 if (keycount > 0) { //only write if we were registered for a write
101                     int cnt = socket.write(buf); //write the data
102                     if (cnt == -1) {
103                         throw new EOFException();
104                     }
105                     written += cnt;
106                     if (cnt > 0) {
107                         time = System.currentTimeMillis(); //reset our timeout timer
108                         continue//we successfully wrote, try again without a selector
109                     }
110                 }
111                 try {
112                     if (att.getWriteLatch() == null || att.getWriteLatch().getCount() == 0) {
113                         att.startWriteLatch(1);
114                     }
115                     poller.add(att, SelectionKey.OP_WRITE, reference);
116                     att.awaitWriteLatch(AbstractEndpoint.toTimeout(writeTimeout), TimeUnit.MILLISECONDS);
117                 } catch (InterruptedException ignore) {
118                     // Ignore
119                 }
120                 if (att.getWriteLatch() != null && att.getWriteLatch().getCount() > 0) {
121                     //we got interrupted, but we haven't received notification from the poller.
122                     keycount = 0;
123                 } else {
124                     //latch countdown has happened
125                     keycount = 1;
126                     att.resetWriteLatch();
127                 }
128
129                 if (writeTimeout > 0 && (keycount == 0)) {
130                     timedout = (System.currentTimeMillis() - time) >= writeTimeout;
131                 }
132             }
133             if (timedout) {
134                 throw new SocketTimeoutException();
135             }
136         } finally {
137             poller.remove(att, SelectionKey.OP_WRITE);
138             if (timedout && reference.key != null) {
139                 poller.cancelKey(reference.key);
140             }
141             reference.key = null;
142             keyReferenceStack.push(reference);
143         }
144         return written;
145     }
146
147     /**
148      * Performs a blocking read using the bytebuffer for data to be read
149      * If the <code>selector</code> parameter is null, then it will perform a busy read that could
150      * take up a lot of CPU cycles.
151      *
152      * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
153      * @param socket SocketChannel - the socket to write data to
154      * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
155      * @return the number of bytes read
156      * @throws EOFException if read returns -1
157      * @throws SocketTimeoutException if the read times out
158      * @throws IOException if an IO Exception occurs in the underlying socket logic
159      */

160     public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
161         SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
162         if (key == null) {
163             throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
164         }
165         KeyReference reference = keyReferenceStack.pop();
166         if (reference == null) {
167             reference = new KeyReference();
168         }
169         NioSocketWrapper att = (NioSocketWrapper) key.attachment();
170         int read = 0;
171         boolean timedout = false;
172         int keycount = 1; //assume we can read
173         long time = System.currentTimeMillis(); //start the timeout timer
174         try {
175             while (!timedout) {
176                 if (keycount > 0) { //only read if we were registered for a read
177                     read = socket.read(buf);
178                     if (read != 0) {
179                         break;
180                     }
181                 }
182                 try {
183                     if (att.getReadLatch()==null || att.getReadLatch().getCount()==0) {
184                         att.startReadLatch(1);
185                     }
186                     poller.add(att,SelectionKey.OP_READ, reference);
187                     att.awaitReadLatch(AbstractEndpoint.toTimeout(readTimeout), TimeUnit.MILLISECONDS);
188                 } catch (InterruptedException ignore) {
189                     // Ignore
190                 }
191                 if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
192                     //we got interrupted, but we haven't received notification from the poller.
193                     keycount = 0;
194                 }else {
195                     //latch countdown has happened
196                     keycount = 1;
197                     att.resetReadLatch();
198                 }
199                 if (readTimeout >= 0 && (keycount == 0)) {
200                     timedout = (System.currentTimeMillis() - time) >= readTimeout;
201                 }
202             }
203             if (timedout) {
204                 throw new SocketTimeoutException();
205             }
206         } finally {
207             poller.remove(att,SelectionKey.OP_READ);
208             if (timedout && reference.key != null) {
209                 poller.cancelKey(reference.key);
210             }
211             reference.key = null;
212             keyReferenceStack.push(reference);
213         }
214         return read;
215     }
216
217
218     protected static class BlockPoller extends Thread {
219         protected volatile boolean run = true;
220         protected Selector selector = null;
221         protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>();
222         public void disable() {
223             run = false;
224             selector.wakeup();
225         }
226         protected final AtomicInteger wakeupCounter = new AtomicInteger(0);
227
228         public void cancelKey(final SelectionKey key) {
229             Runnable r = new RunnableCancel(key);
230             events.offer(r);
231             wakeup();
232         }
233
234         public void wakeup() {
235             if (wakeupCounter.addAndGet(1)==0) selector.wakeup();
236         }
237
238         public void cancel(SelectionKey sk, NioSocketWrapper key, int ops){
239             if (sk != null) {
240                 sk.cancel();
241                 sk.attach(null);
242                 if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) {
243                     countDown(key.getWriteLatch());
244                 }
245                 if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) {
246                     countDown(key.getReadLatch());
247                 }
248             }
249         }
250
251         public void add(final NioSocketWrapper key, final int ops, final KeyReference ref) {
252             if (key == null) {
253                 return;
254             }
255             NioChannel nch = key.getSocket();
256             final SocketChannel ch = nch.getIOChannel();
257             if (ch == null) {
258                 return;
259             }
260             Runnable r = new RunnableAdd(ch, key, ops, ref);
261             events.offer(r);
262             wakeup();
263         }
264
265         public void remove(final NioSocketWrapper key, final int ops) {
266             if (key == null) {
267                 return;
268             }
269             NioChannel nch = key.getSocket();
270             final SocketChannel ch = nch.getIOChannel();
271             if (ch == null) {
272                 return;
273             }
274             Runnable r = new RunnableRemove(ch, key, ops);
275             events.offer(r);
276             wakeup();
277         }
278
279         public boolean events() {
280             Runnable r = null;
281             /* We only poll and run the runnable events when we start this
282              * method. Further events added to the queue later will be delayed
283              * to the next execution of this method.
284              *
285              * We do in this way, because running event from the events queue
286              * may lead the working thread to add more events to the queue (for
287              * example, the worker thread may add another RunnableAdd event when
288              * waken up by a previous RunnableAdd event who got an invalid
289              * SelectionKey). Trying to consume all the events in an increasing
290              * queue till it's empty, will make the loop hard to be terminated,
291              * which will kill a lot of time, and greatly affect performance of
292              * the poller loop.
293              */

294             int size = events.size();
295             for (int i = 0; i < size && (r = events.poll()) != null; i++) {
296                 r.run();
297             }
298             return (size > 0);
299         }
300
301         @Override
302         public void run() {
303             while (run) {
304                 try {
305                     events();
306                     int keyCount = 0;
307                     try {
308                         int i = wakeupCounter.get();
309                         if (i > 0) {
310                             keyCount = selector.selectNow();
311                         } else {
312                             wakeupCounter.set(-1);
313                             keyCount = selector.select(1000);
314                         }
315                         wakeupCounter.set(0);
316                         if (!run) {
317                             break;
318                         }
319                     } catch (NullPointerException x) {
320                         // sun bug 5076772 on windows JDK 1.5
321                         if (selector == null) {
322                             throw x;
323                         }
324                         if (log.isDebugEnabled()) {
325                             log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
326                         }
327                         continue;
328                     } catch (CancelledKeyException x) {
329                         // sun bug 5076772 on windows JDK 1.5
330                         if (log.isDebugEnabled()) {
331                             log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
332                         }
333                         continue;
334                     } catch (Throwable x) {
335                         ExceptionUtils.handleThrowable(x);
336                         log.error(sm.getString("nioBlockingSelector.selectError"), x);
337                         continue;
338                     }
339
340                     Iterator<SelectionKey> iterator = keyCount > 0
341                             ? selector.selectedKeys().iterator()
342                             : null;
343
344                     // Walk through the collection of ready keys and dispatch
345                     // any active event.
346                     while (run && iterator != null && iterator.hasNext()) {
347                         SelectionKey sk = iterator.next();
348                         NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
349                         try {
350                             iterator.remove();
351                             sk.interestOps(sk.interestOps() & (~sk.readyOps()));
352                             if (sk.isReadable()) {
353                                 countDown(socketWrapper.getReadLatch());
354                             }
355                             if (sk.isWritable()) {
356                                 countDown(socketWrapper.getWriteLatch());
357                             }
358                         } catch (CancelledKeyException ckx) {
359                             sk.cancel();
360                             countDown(socketWrapper.getReadLatch());
361                             countDown(socketWrapper.getWriteLatch());
362                         }
363                     }
364                 } catch (Throwable t) {
365                     log.error(sm.getString("nioBlockingSelector.processingError"), t);
366                 }
367             }
368             events.clear();
369             // If using a shared selector, the NioSelectorPool will also try and
370             // close the selector. Try and avoid the ClosedSelectorException
371             // although because multiple threads are involved there is always
372             // the possibility of an Exception here.
373             if (selector.isOpen()) {
374                 try {
375                     // Cancels all remaining keys
376                     selector.selectNow();
377                 } catch (Exception ignore) {
378                     if (log.isDebugEnabled())
379                         log.debug("", ignore);
380                 }
381             }
382             try {
383                 selector.close();
384             } catch (Exception ignore) {
385                 if (log.isDebugEnabled())
386                     log.debug("", ignore);
387             }
388         }
389
390         public void countDown(CountDownLatch latch) {
391             if (latch == null) {
392                 return;
393             }
394             latch.countDown();
395         }
396
397
398         private class RunnableAdd implements Runnable {
399
400             private final SocketChannel ch;
401             private final NioSocketWrapper key;
402             private final int ops;
403             private final KeyReference ref;
404
405             public RunnableAdd(SocketChannel ch, NioSocketWrapper key, int ops, KeyReference ref) {
406                 this.ch = ch;
407                 this.key = key;
408                 this.ops = ops;
409                 this.ref = ref;
410             }
411
412             @Override
413             public void run() {
414                 SelectionKey sk = ch.keyFor(selector);
415                 try {
416                     if (sk == null) {
417                         sk = ch.register(selector, ops, key);
418                         ref.key = sk;
419                     } else if (!sk.isValid()) {
420                         cancel(sk, key, ops);
421                     } else {
422                         sk.interestOps(sk.interestOps() | ops);
423                     }
424                 } catch (CancelledKeyException cx) {
425                     cancel(sk, key, ops);
426                 } catch (ClosedChannelException cx) {
427                     cancel(null, key, ops);
428                 }
429             }
430         }
431
432
433         private class RunnableRemove implements Runnable {
434
435             private final SocketChannel ch;
436             private final NioSocketWrapper key;
437             private final int ops;
438
439             public RunnableRemove(SocketChannel ch, NioSocketWrapper key, int ops) {
440                 this.ch = ch;
441                 this.key = key;
442                 this.ops = ops;
443             }
444
445             @Override
446             public void run() {
447                 SelectionKey sk = ch.keyFor(selector);
448                 try {
449                     if (sk == null) {
450                         if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) {
451                             countDown(key.getWriteLatch());
452                         }
453                         if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) {
454                             countDown(key.getReadLatch());
455                         }
456                     } else {
457                         if (sk.isValid()) {
458                             sk.interestOps(sk.interestOps() & (~ops));
459                             if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) {
460                                 countDown(key.getWriteLatch());
461                             }
462                             if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) {
463                                 countDown(key.getReadLatch());
464                             }
465                             if (sk.interestOps() == 0) {
466                                 sk.cancel();
467                                 sk.attach(null);
468                             }
469                         } else {
470                             sk.cancel();
471                             sk.attach(null);
472                         }
473                     }
474                 } catch (CancelledKeyException cx) {
475                     if (sk != null) {
476                         sk.cancel();
477                         sk.attach(null);
478                     }
479                 }
480             }
481
482         }
483
484
485         public static class RunnableCancel implements Runnable {
486
487             private final SelectionKey key;
488
489             public RunnableCancel(SelectionKey key) {
490                 this.key = key;
491             }
492
493             @Override
494             public void run() {
495                 key.cancel();
496             }
497         }
498     }
499
500
501     public static class KeyReference {
502         SelectionKey key = null;
503
504         @Override
505         protected void finalize() {
506             if (key != null && key.isValid()) {
507                 log.warn(sm.getString("nioBlockingSelector.possibleLeak"));
508                 try {
509                     key.cancel();
510                 } catch (Exception ignore) {
511                 }
512             }
513         }
514     }
515 }
516