1
17 package org.apache.tomcat.util.net;
18
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.net.SocketTimeoutException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.Iterator;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.juli.logging.Log;
34 import org.apache.juli.logging.LogFactory;
35 import org.apache.tomcat.util.ExceptionUtils;
36 import org.apache.tomcat.util.collections.SynchronizedQueue;
37 import org.apache.tomcat.util.collections.SynchronizedStack;
38 import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
39 import org.apache.tomcat.util.res.StringManager;
40
41 public class NioBlockingSelector {
42
43 private static final Log log = LogFactory.getLog(NioBlockingSelector.class);
44 protected static final StringManager sm = StringManager.getManager(NioBlockingSelector.class);
45
46 private final SynchronizedStack<KeyReference> keyReferenceStack =
47 new SynchronizedStack<>();
48
49 protected Selector sharedSelector;
50
51 protected BlockPoller poller;
52
53 public void open(String name, Selector selector) {
54 sharedSelector = selector;
55 poller = new BlockPoller();
56 poller.selector = sharedSelector;
57 poller.setDaemon(true);
58 poller.setName(name + "-BlockPoller");
59 poller.start();
60 }
61
62 public void close() {
63 if (poller != null) {
64 poller.disable();
65 poller.interrupt();
66 poller = null;
67 }
68 }
69
70
83 public int write(ByteBuffer buf, NioChannel socket, long writeTimeout)
84 throws IOException {
85 SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
86 if (key == null) {
87 throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
88 }
89 KeyReference reference = keyReferenceStack.pop();
90 if (reference == null) {
91 reference = new KeyReference();
92 }
93 NioSocketWrapper att = (NioSocketWrapper) key.attachment();
94 int written = 0;
95 boolean timedout = false;
96 int keycount = 1;
97 long time = System.currentTimeMillis();
98 try {
99 while (!timedout && buf.hasRemaining()) {
100 if (keycount > 0) {
101 int cnt = socket.write(buf);
102 if (cnt == -1) {
103 throw new EOFException();
104 }
105 written += cnt;
106 if (cnt > 0) {
107 time = System.currentTimeMillis();
108 continue;
109 }
110 }
111 try {
112 if (att.getWriteLatch() == null || att.getWriteLatch().getCount() == 0) {
113 att.startWriteLatch(1);
114 }
115 poller.add(att, SelectionKey.OP_WRITE, reference);
116 att.awaitWriteLatch(AbstractEndpoint.toTimeout(writeTimeout), TimeUnit.MILLISECONDS);
117 } catch (InterruptedException ignore) {
118
119 }
120 if (att.getWriteLatch() != null && att.getWriteLatch().getCount() > 0) {
121
122 keycount = 0;
123 } else {
124
125 keycount = 1;
126 att.resetWriteLatch();
127 }
128
129 if (writeTimeout > 0 && (keycount == 0)) {
130 timedout = (System.currentTimeMillis() - time) >= writeTimeout;
131 }
132 }
133 if (timedout) {
134 throw new SocketTimeoutException();
135 }
136 } finally {
137 poller.remove(att, SelectionKey.OP_WRITE);
138 if (timedout && reference.key != null) {
139 poller.cancelKey(reference.key);
140 }
141 reference.key = null;
142 keyReferenceStack.push(reference);
143 }
144 return written;
145 }
146
147
160 public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
161 SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
162 if (key == null) {
163 throw new IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
164 }
165 KeyReference reference = keyReferenceStack.pop();
166 if (reference == null) {
167 reference = new KeyReference();
168 }
169 NioSocketWrapper att = (NioSocketWrapper) key.attachment();
170 int read = 0;
171 boolean timedout = false;
172 int keycount = 1;
173 long time = System.currentTimeMillis();
174 try {
175 while (!timedout) {
176 if (keycount > 0) {
177 read = socket.read(buf);
178 if (read != 0) {
179 break;
180 }
181 }
182 try {
183 if (att.getReadLatch()==null || att.getReadLatch().getCount()==0) {
184 att.startReadLatch(1);
185 }
186 poller.add(att,SelectionKey.OP_READ, reference);
187 att.awaitReadLatch(AbstractEndpoint.toTimeout(readTimeout), TimeUnit.MILLISECONDS);
188 } catch (InterruptedException ignore) {
189
190 }
191 if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
192
193 keycount = 0;
194 }else {
195
196 keycount = 1;
197 att.resetReadLatch();
198 }
199 if (readTimeout >= 0 && (keycount == 0)) {
200 timedout = (System.currentTimeMillis() - time) >= readTimeout;
201 }
202 }
203 if (timedout) {
204 throw new SocketTimeoutException();
205 }
206 } finally {
207 poller.remove(att,SelectionKey.OP_READ);
208 if (timedout && reference.key != null) {
209 poller.cancelKey(reference.key);
210 }
211 reference.key = null;
212 keyReferenceStack.push(reference);
213 }
214 return read;
215 }
216
217
218 protected static class BlockPoller extends Thread {
219 protected volatile boolean run = true;
220 protected Selector selector = null;
221 protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>();
222 public void disable() {
223 run = false;
224 selector.wakeup();
225 }
226 protected final AtomicInteger wakeupCounter = new AtomicInteger(0);
227
228 public void cancelKey(final SelectionKey key) {
229 Runnable r = new RunnableCancel(key);
230 events.offer(r);
231 wakeup();
232 }
233
234 public void wakeup() {
235 if (wakeupCounter.addAndGet(1)==0) selector.wakeup();
236 }
237
238 public void cancel(SelectionKey sk, NioSocketWrapper key, int ops){
239 if (sk != null) {
240 sk.cancel();
241 sk.attach(null);
242 if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) {
243 countDown(key.getWriteLatch());
244 }
245 if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) {
246 countDown(key.getReadLatch());
247 }
248 }
249 }
250
251 public void add(final NioSocketWrapper key, final int ops, final KeyReference ref) {
252 if (key == null) {
253 return;
254 }
255 NioChannel nch = key.getSocket();
256 final SocketChannel ch = nch.getIOChannel();
257 if (ch == null) {
258 return;
259 }
260 Runnable r = new RunnableAdd(ch, key, ops, ref);
261 events.offer(r);
262 wakeup();
263 }
264
265 public void remove(final NioSocketWrapper key, final int ops) {
266 if (key == null) {
267 return;
268 }
269 NioChannel nch = key.getSocket();
270 final SocketChannel ch = nch.getIOChannel();
271 if (ch == null) {
272 return;
273 }
274 Runnable r = new RunnableRemove(ch, key, ops);
275 events.offer(r);
276 wakeup();
277 }
278
279 public boolean events() {
280 Runnable r = null;
281
294 int size = events.size();
295 for (int i = 0; i < size && (r = events.poll()) != null; i++) {
296 r.run();
297 }
298 return (size > 0);
299 }
300
301 @Override
302 public void run() {
303 while (run) {
304 try {
305 events();
306 int keyCount = 0;
307 try {
308 int i = wakeupCounter.get();
309 if (i > 0) {
310 keyCount = selector.selectNow();
311 } else {
312 wakeupCounter.set(-1);
313 keyCount = selector.select(1000);
314 }
315 wakeupCounter.set(0);
316 if (!run) {
317 break;
318 }
319 } catch (NullPointerException x) {
320
321 if (selector == null) {
322 throw x;
323 }
324 if (log.isDebugEnabled()) {
325 log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
326 }
327 continue;
328 } catch (CancelledKeyException x) {
329
330 if (log.isDebugEnabled()) {
331 log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
332 }
333 continue;
334 } catch (Throwable x) {
335 ExceptionUtils.handleThrowable(x);
336 log.error(sm.getString("nioBlockingSelector.selectError"), x);
337 continue;
338 }
339
340 Iterator<SelectionKey> iterator = keyCount > 0
341 ? selector.selectedKeys().iterator()
342 : null;
343
344
345
346 while (run && iterator != null && iterator.hasNext()) {
347 SelectionKey sk = iterator.next();
348 NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
349 try {
350 iterator.remove();
351 sk.interestOps(sk.interestOps() & (~sk.readyOps()));
352 if (sk.isReadable()) {
353 countDown(socketWrapper.getReadLatch());
354 }
355 if (sk.isWritable()) {
356 countDown(socketWrapper.getWriteLatch());
357 }
358 } catch (CancelledKeyException ckx) {
359 sk.cancel();
360 countDown(socketWrapper.getReadLatch());
361 countDown(socketWrapper.getWriteLatch());
362 }
363 }
364 } catch (Throwable t) {
365 log.error(sm.getString("nioBlockingSelector.processingError"), t);
366 }
367 }
368 events.clear();
369
370
371
372
373 if (selector.isOpen()) {
374 try {
375
376 selector.selectNow();
377 } catch (Exception ignore) {
378 if (log.isDebugEnabled())
379 log.debug("", ignore);
380 }
381 }
382 try {
383 selector.close();
384 } catch (Exception ignore) {
385 if (log.isDebugEnabled())
386 log.debug("", ignore);
387 }
388 }
389
390 public void countDown(CountDownLatch latch) {
391 if (latch == null) {
392 return;
393 }
394 latch.countDown();
395 }
396
397
398 private class RunnableAdd implements Runnable {
399
400 private final SocketChannel ch;
401 private final NioSocketWrapper key;
402 private final int ops;
403 private final KeyReference ref;
404
405 public RunnableAdd(SocketChannel ch, NioSocketWrapper key, int ops, KeyReference ref) {
406 this.ch = ch;
407 this.key = key;
408 this.ops = ops;
409 this.ref = ref;
410 }
411
412 @Override
413 public void run() {
414 SelectionKey sk = ch.keyFor(selector);
415 try {
416 if (sk == null) {
417 sk = ch.register(selector, ops, key);
418 ref.key = sk;
419 } else if (!sk.isValid()) {
420 cancel(sk, key, ops);
421 } else {
422 sk.interestOps(sk.interestOps() | ops);
423 }
424 } catch (CancelledKeyException cx) {
425 cancel(sk, key, ops);
426 } catch (ClosedChannelException cx) {
427 cancel(null, key, ops);
428 }
429 }
430 }
431
432
433 private class RunnableRemove implements Runnable {
434
435 private final SocketChannel ch;
436 private final NioSocketWrapper key;
437 private final int ops;
438
439 public RunnableRemove(SocketChannel ch, NioSocketWrapper key, int ops) {
440 this.ch = ch;
441 this.key = key;
442 this.ops = ops;
443 }
444
445 @Override
446 public void run() {
447 SelectionKey sk = ch.keyFor(selector);
448 try {
449 if (sk == null) {
450 if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) {
451 countDown(key.getWriteLatch());
452 }
453 if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) {
454 countDown(key.getReadLatch());
455 }
456 } else {
457 if (sk.isValid()) {
458 sk.interestOps(sk.interestOps() & (~ops));
459 if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) {
460 countDown(key.getWriteLatch());
461 }
462 if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) {
463 countDown(key.getReadLatch());
464 }
465 if (sk.interestOps() == 0) {
466 sk.cancel();
467 sk.attach(null);
468 }
469 } else {
470 sk.cancel();
471 sk.attach(null);
472 }
473 }
474 } catch (CancelledKeyException cx) {
475 if (sk != null) {
476 sk.cancel();
477 sk.attach(null);
478 }
479 }
480 }
481
482 }
483
484
485 public static class RunnableCancel implements Runnable {
486
487 private final SelectionKey key;
488
489 public RunnableCancel(SelectionKey key) {
490 this.key = key;
491 }
492
493 @Override
494 public void run() {
495 key.cancel();
496 }
497 }
498 }
499
500
501 public static class KeyReference {
502 SelectionKey key = null;
503
504 @Override
505 protected void finalize() {
506 if (key != null && key.isValid()) {
507 log.warn(sm.getString("nioBlockingSelector.possibleLeak"));
508 try {
509 key.cancel();
510 } catch (Exception ignore) {
511 }
512 }
513 }
514 }
515 }
516