1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.tomcat.util.net;
18
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.net.SocketTimeoutException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.CompletionHandler;
24 import java.nio.channels.InterruptedByTimeoutException;
25 import java.nio.channels.ReadPendingException;
26 import java.nio.channels.WritePendingException;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.apache.juli.logging.Log;
34 import org.apache.juli.logging.LogFactory;
35 import org.apache.tomcat.util.ExceptionUtils;
36 import org.apache.tomcat.util.res.StringManager;
37
38 public abstract class SocketWrapperBase<E> {
39
40 private static final Log log = LogFactory.getLog(SocketWrapperBase.class);
41
42 protected static final StringManager sm = StringManager.getManager(SocketWrapperBase.class);
43
44 private E socket;
45 private final AbstractEndpoint<E,?> endpoint;
46
47 protected final AtomicBoolean closed = new AtomicBoolean(false);
48
49 // Volatile because I/O and setting the timeout values occurs on a different
50 // thread to the thread checking the timeout.
51 private volatile long readTimeout = -1;
52 private volatile long writeTimeout = -1;
53
54 private volatile int keepAliveLeft = 100;
55 private volatile boolean upgraded = false;
56 private boolean secure = false;
57 private String negotiatedProtocol = null;
58
59 /*
60 * Following cached for speed / reduced GC
61 */
62 protected String localAddr = null;
63 protected String localName = null;
64 protected int localPort = -1;
65 protected String remoteAddr = null;
66 protected String remoteHost = null;
67 protected int remotePort = -1;
68
69 /**
70 * Used to record the first IOException that occurs during non-blocking
71 * read/writes that can't be usefully propagated up the stack since there is
72 * no user code or appropriate container code in the stack to handle it.
73 */
74 private volatile IOException error = null;
75
76 /**
77 * The buffers used for communicating with the socket.
78 */
79 protected volatile SocketBufferHandler socketBufferHandler = null;
80
81 /**
82 * The max size of the individual buffered write buffers
83 */
84 protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer
85
86 /**
87 * Additional buffer used for non-blocking writes. Non-blocking writes need
88 * to return immediately even if the data cannot be written immediately but
89 * the socket buffer may not be big enough to hold all of the unwritten
90 * data. This structure provides an additional buffer to hold the data until
91 * it can be written.
92 * Not that while the Servlet API only allows one non-blocking write at a
93 * time, due to buffering and the possible need to write HTTP headers, this
94 * layer may see multiple writes.
95 */
96 protected final WriteBuffer nonBlockingWriteBuffer = new WriteBuffer(bufferedWriteSize);
97
98 /*
99 * Asynchronous operations.
100 */
101 protected final Semaphore readPending;
102 protected volatile OperationState<?> readOperation = null;
103 protected final Semaphore writePending;
104 protected volatile OperationState<?> writeOperation = null;
105
106 /**
107 * The org.apache.coyote.Processor instance currently associated
108 * with the wrapper.
109 */
110 protected Object currentProcessor = null;
111
112 public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
113 this.socket = socket;
114 this.endpoint = endpoint;
115 if (endpoint.getUseAsyncIO() || needSemaphores()) {
116 readPending = new Semaphore(1);
117 writePending = new Semaphore(1);
118 } else {
119 readPending = null;
120 writePending = null;
121 }
122 }
123
124 public E getSocket() {
125 return socket;
126 }
127
128 protected void reset(E closedSocket) {
129 socket = closedSocket;
130 }
131
132 protected AbstractEndpoint<E,?> getEndpoint() {
133 return endpoint;
134 }
135
136 public Object getCurrentProcessor() {
137 return currentProcessor;
138 }
139
140 public void setCurrentProcessor(Object currentProcessor) {
141 this.currentProcessor = currentProcessor;
142 }
143
144 /**
145 * Transfers processing to a container thread.
146 *
147 * @param runnable The actions to process on a container thread
148 *
149 * @throws RejectedExecutionException If the runnable cannot be executed
150 */
151 public void execute(Runnable runnable) {
152 Executor executor = endpoint.getExecutor();
153 if (!endpoint.isRunning() || executor == null) {
154 throw new RejectedExecutionException();
155 }
156 executor.execute(runnable);
157 }
158
159 public IOException getError() { return error; }
160 public void setError(IOException error) {
161 // Not perfectly thread-safe but good enough. Just needs to ensure that
162 // once this.error is non-null, it can never be null.
163 if (this.error != null) {
164 return;
165 }
166 this.error = error;
167 }
168 public void checkError() throws IOException {
169 if (error != null) {
170 throw error;
171 }
172 }
173
174 public boolean isUpgraded() { return upgraded; }
175 public void setUpgraded(boolean upgraded) { this.upgraded = upgraded; }
176 public boolean isSecure() { return secure; }
177 public void setSecure(boolean secure) { this.secure = secure; }
178 public String getNegotiatedProtocol() { return negotiatedProtocol; }
179 public void setNegotiatedProtocol(String negotiatedProtocol) {
180 this.negotiatedProtocol = negotiatedProtocol;
181 }
182
183 /**
184 * Set the timeout for reading. Values of zero or less will be changed to
185 * -1.
186 *
187 * @param readTimeout The timeout in milliseconds. A value of -1 indicates
188 * an infinite timeout.
189 */
190 public void setReadTimeout(long readTimeout) {
191 if (readTimeout > 0) {
192 this.readTimeout = readTimeout;
193 } else {
194 this.readTimeout = -1;
195 }
196 }
197
198 public long getReadTimeout() {
199 return this.readTimeout;
200 }
201
202 /**
203 * Set the timeout for writing. Values of zero or less will be changed to
204 * -1.
205 *
206 * @param writeTimeout The timeout in milliseconds. A value of zero or less
207 * indicates an infinite timeout.
208 */
209 public void setWriteTimeout(long writeTimeout) {
210 if (writeTimeout > 0) {
211 this.writeTimeout = writeTimeout;
212 } else {
213 this.writeTimeout = -1;
214 }
215 }
216
217 public long getWriteTimeout() {
218 return this.writeTimeout;
219 }
220
221
222 public void setKeepAliveLeft(int keepAliveLeft) { this.keepAliveLeft = keepAliveLeft; }
223 public int decrementKeepAlive() { return (--keepAliveLeft); }
224
225 public String getRemoteHost() {
226 if (remoteHost == null) {
227 populateRemoteHost();
228 }
229 return remoteHost;
230 }
231 protected abstract void populateRemoteHost();
232
233 public String getRemoteAddr() {
234 if (remoteAddr == null) {
235 populateRemoteAddr();
236 }
237 return remoteAddr;
238 }
239 protected abstract void populateRemoteAddr();
240
241 public int getRemotePort() {
242 if (remotePort == -1) {
243 populateRemotePort();
244 }
245 return remotePort;
246 }
247 protected abstract void populateRemotePort();
248
249 public String getLocalName() {
250 if (localName == null) {
251 populateLocalName();
252 }
253 return localName;
254 }
255 protected abstract void populateLocalName();
256
257 public String getLocalAddr() {
258 if (localAddr == null) {
259 populateLocalAddr();
260 }
261 return localAddr;
262 }
263 protected abstract void populateLocalAddr();
264
265 public int getLocalPort() {
266 if (localPort == -1) {
267 populateLocalPort();
268 }
269 return localPort;
270 }
271 protected abstract void populateLocalPort();
272
273 public SocketBufferHandler getSocketBufferHandler() { return socketBufferHandler; }
274
275 public boolean hasDataToRead() {
276 // Return true because it is always safe to make a read attempt
277 return true;
278 }
279
280 public boolean hasDataToWrite() {
281 return !socketBufferHandler.isWriteBufferEmpty() || !nonBlockingWriteBuffer.isEmpty();
282 }
283
284 /**
285 * Checks to see if there are any writes pending and if there are calls
286 * {@link #registerWriteInterest()} to trigger a callback once the pending
287 * writes have completed.
288 * <p>
289 * Note: Once this method has returned <code>false</code> it <b>MUST NOT</b>
290 * be called again until the pending write has completed and the
291 * callback has been fired.
292 * TODO: Modify {@link #registerWriteInterest()} so the above
293 * restriction is enforced there rather than relying on the caller.
294 *
295 * @return <code>true</code> if no writes are pending and data can be
296 * written otherwise <code>false</code>
297 */
298 public boolean isReadyForWrite() {
299 boolean result = canWrite();
300 if (!result) {
301 registerWriteInterest();
302 }
303 return result;
304 }
305
306
307 public boolean canWrite() {
308 if (socketBufferHandler == null) {
309 throw new IllegalStateException(sm.getString("socket.closed"));
310 }
311 return socketBufferHandler.isWriteBufferWritable() && nonBlockingWriteBuffer.isEmpty();
312 }
313
314
315 /**
316 * Overridden for debug purposes. No guarantees are made about the format of
317 * this message which may vary significantly between point releases.
318 * <p>
319 * {@inheritDoc}
320 */
321 @Override
322 public String toString() {
323 return super.toString() + ":" + String.valueOf(socket);
324 }
325
326
327 public abstract int read(boolean block, byte[] b, int off, int len) throws IOException;
328 public abstract int read(boolean block, ByteBuffer to) throws IOException;
329 public abstract boolean isReadyForRead() throws IOException;
330 public abstract void setAppReadBufHandler(ApplicationBufferHandler handler);
331
332 protected int populateReadBuffer(byte[] b, int off, int len) {
333 socketBufferHandler.configureReadBufferForRead();
334 ByteBuffer readBuffer = socketBufferHandler.getReadBuffer();
335 int remaining = readBuffer.remaining();
336
337 // Is there enough data in the read buffer to satisfy this request?
338 // Copy what data there is in the read buffer to the byte array
339 if (remaining > 0) {
340 remaining = Math.min(remaining, len);
341 readBuffer.get(b, off, remaining);
342
343 if (log.isDebugEnabled()) {
344 log.debug("Socket: [" + this + "], Read from buffer: [" + remaining + "]");
345 }
346 }
347 return remaining;
348 }
349
350
351 protected int populateReadBuffer(ByteBuffer to) {
352 // Is there enough data in the read buffer to satisfy this request?
353 // Copy what data there is in the read buffer to the byte array
354 socketBufferHandler.configureReadBufferForRead();
355 int nRead = transfer(socketBufferHandler.getReadBuffer(), to);
356
357 if (log.isDebugEnabled()) {
358 log.debug("Socket: [" + this + "], Read from buffer: [" + nRead + "]");
359 }
360 return nRead;
361 }
362
363
364 /**
365 * Return input that has been read to the input buffer for re-reading by the
366 * correct component. There are times when a component may read more data
367 * than it needs before it passes control to another component. One example
368 * of this is during HTTP upgrade. If an (arguably misbehaving client) sends
369 * data associated with the upgraded protocol before the HTTP upgrade
370 * completes, the HTTP handler may read it. This method provides a way for
371 * that data to be returned so it can be processed by the correct component.
372 *
373 * @param returnedInput The input to return to the input buffer.
374 */
375 public void unRead(ByteBuffer returnedInput) {
376 if (returnedInput != null) {
377 socketBufferHandler.unReadReadBuffer(returnedInput);
378 }
379 }
380
381
382 /**
383 * Close the socket wrapper.
384 */
385 public void close() {
386 if (closed.compareAndSet(false, true)) {
387 try {
388 getEndpoint().getHandler().release(this);
389 } catch (Throwable e) {
390 ExceptionUtils.handleThrowable(e);
391 if (log.isDebugEnabled()) {
392 log.error(sm.getString("endpoint.debug.handlerRelease"), e);
393 }
394 } finally {
395 getEndpoint().countDownConnection();
396 doClose();
397 }
398 }
399 }
400
401 /**
402 * Perform the actual close. The closed atomic boolean guarantees this will
403 * be called only once per wrapper.
404 */
405 protected abstract void doClose();
406
407 /**
408 * @return true if the wrapper has been closed
409 */
410 public boolean isClosed() {
411 return closed.get();
412 }
413
414
415 /**
416 * Writes the provided data to the socket write buffer. If the socket write
417 * buffer fills during the write, the content of the socket write buffer is
418 * written to the network and this method starts to fill the socket write
419 * buffer again. Depending on the size of the data to write, there may be
420 * multiple writes to the network.
421 * <p>
422 * Non-blocking writes must return immediately and the byte array holding
423 * the data to be written must be immediately available for re-use. It may
424 * not be possible to write sufficient data to the network to allow this to
425 * happen. In this case data that cannot be written to the network and
426 * cannot be held by the socket buffer is stored in the non-blocking write
427 * buffer.
428 * <p>
429 * Note: There is an implementation assumption that, before switching from
430 * non-blocking writes to blocking writes, any data remaining in the
431 * non-blocking write buffer will have been written to the network.
432 *
433 * @param block <code>true</code> if a blocking write should be used,
434 * otherwise a non-blocking write will be used
435 * @param buf The byte array containing the data to be written
436 * @param off The offset within the byte array of the data to be written
437 * @param len The length of the data to be written
438 *
439 * @throws IOException If an IO error occurs during the write
440 */
441 public final void write(boolean block, byte[] buf, int off, int len) throws IOException {
442 if (len == 0 || buf == null) {
443 return;
444 }
445
446 /*
447 * While the implementations for blocking and non-blocking writes are
448 * very similar they have been split into separate methods:
449 * - To allow sub-classes to override them individually. NIO2, for
450 * example, overrides the non-blocking write but not the blocking
451 * write.
452 * - To enable a marginally more efficient implemented for blocking
453 * writes which do not require the additional checks related to the
454 * use of the non-blocking write buffer
455 */
456 if (block) {
457 writeBlocking(buf, off, len);
458 } else {
459 writeNonBlocking(buf, off, len);
460 }
461 }
462
463
464 /**
465 * Writes the provided data to the socket write buffer. If the socket write
466 * buffer fills during the write, the content of the socket write buffer is
467 * written to the network and this method starts to fill the socket write
468 * buffer again. Depending on the size of the data to write, there may be
469 * multiple writes to the network.
470 * <p>
471 * Non-blocking writes must return immediately and the ByteBuffer holding
472 * the data to be written must be immediately available for re-use. It may
473 * not be possible to write sufficient data to the network to allow this to
474 * happen. In this case data that cannot be written to the network and
475 * cannot be held by the socket buffer is stored in the non-blocking write
476 * buffer.
477 * <p>
478 * Note: There is an implementation assumption that, before switching from
479 * non-blocking writes to blocking writes, any data remaining in the
480 * non-blocking write buffer will have been written to the network.
481 *
482 * @param block <code>true</code> if a blocking write should be used,
483 * otherwise a non-blocking write will be used
484 * @param from The ByteBuffer containing the data to be written
485 *
486 * @throws IOException If an IO error occurs during the write
487 */
488 public final void write(boolean block, ByteBuffer from) throws IOException {
489 if (from == null || from.remaining() == 0) {
490 return;
491 }
492
493 /*
494 * While the implementations for blocking and non-blocking writes are
495 * very similar they have been split into separate methods:
496 * - To allow sub-classes to override them individually. NIO2, for
497 * example, overrides the non-blocking write but not the blocking
498 * write.
499 * - To enable a marginally more efficient implemented for blocking
500 * writes which do not require the additional checks related to the
501 * use of the non-blocking write buffer
502 */
503 if (block) {
504 writeBlocking(from);
505 } else {
506 writeNonBlocking(from);
507 }
508 }
509
510
511 /**
512 * Writes the provided data to the socket write buffer. If the socket write
513 * buffer fills during the write, the content of the socket write buffer is
514 * written to the network using a blocking write. Once that blocking write
515 * is complete, this method starts to fill the socket write buffer again.
516 * Depending on the size of the data to write, there may be multiple writes
517 * to the network. On completion of this method there will always be space
518 * remaining in the socket write buffer.
519 *
520 * @param buf The byte array containing the data to be written
521 * @param off The offset within the byte array of the data to be written
522 * @param len The length of the data to be written
523 *
524 * @throws IOException If an IO error occurs during the write
525 */
526 protected void writeBlocking(byte[] buf, int off, int len) throws IOException {
527 if (len > 0) {
528 socketBufferHandler.configureWriteBufferForWrite();
529 int thisTime = transfer(buf, off, len, socketBufferHandler.getWriteBuffer());
530 len -= thisTime;
531 while (len > 0) {
532 off += thisTime;
533 doWrite(true);
534 socketBufferHandler.configureWriteBufferForWrite();
535 thisTime = transfer(buf, off, len, socketBufferHandler.getWriteBuffer());
536 len -= thisTime;
537 }
538 }
539 }
540
541
542 /**
543 * Writes the provided data to the socket write buffer. If the socket write
544 * buffer fills during the write, the content of the socket write buffer is
545 * written to the network using a blocking write. Once that blocking write
546 * is complete, this method starts to fill the socket write buffer again.
547 * Depending on the size of the data to write, there may be multiple writes
548 * to the network. On completion of this method there will always be space
549 * remaining in the socket write buffer.
550 *
551 * @param from The ByteBuffer containing the data to be written
552 *
553 * @throws IOException If an IO error occurs during the write
554 */
555 protected void writeBlocking(ByteBuffer from) throws IOException {
556 if (from.hasRemaining()) {
557 socketBufferHandler.configureWriteBufferForWrite();
558 transfer(from, socketBufferHandler.getWriteBuffer());
559 while (from.hasRemaining()) {
560 doWrite(true);
561 socketBufferHandler.configureWriteBufferForWrite();
562 transfer(from, socketBufferHandler.getWriteBuffer());
563 }
564 }
565 }
566
567
568 /**
569 * Transfers the data to the socket write buffer (writing that data to the
570 * socket if the buffer fills up using a non-blocking write) until either
571 * all the data has been transferred and space remains in the socket write
572 * buffer or a non-blocking write leaves data in the socket write buffer.
573 * After an incomplete write, any data remaining to be transferred to the
574 * socket write buffer will be copied to the socket write buffer. If the
575 * remaining data is too big for the socket write buffer, the socket write
576 * buffer will be filled and the additional data written to the non-blocking
577 * write buffer.
578 *
579 * @param buf The byte array containing the data to be written
580 * @param off The offset within the byte array of the data to be written
581 * @param len The length of the data to be written
582 *
583 * @throws IOException If an IO error occurs during the write
584 */
585 protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException {
586 if (len > 0 && nonBlockingWriteBuffer.isEmpty()
587 && socketBufferHandler.isWriteBufferWritable()) {
588 socketBufferHandler.configureWriteBufferForWrite();
589 int thisTime = transfer(buf, off, len, socketBufferHandler.getWriteBuffer());
590 len -= thisTime;
591 while (len > 0) {
592 off = off + thisTime;
593 doWrite(false);
594 if (len > 0 && socketBufferHandler.isWriteBufferWritable()) {
595 socketBufferHandler.configureWriteBufferForWrite();
596 thisTime = transfer(buf, off, len, socketBufferHandler.getWriteBuffer());
597 } else {
598 // Didn't write any data in the last non-blocking write.
599 // Therefore the write buffer will still be full. Nothing
600 // else to do here. Exit the loop.
601 break;
602 }
603 len -= thisTime;
604 }
605 }
606
607 if (len > 0) {
608 // Remaining data must be buffered
609 nonBlockingWriteBuffer.add(buf, off, len);
610 }
611 }
612
613
614 /**
615 * Transfers the data to the socket write buffer (writing that data to the
616 * socket if the buffer fills up using a non-blocking write) until either
617 * all the data has been transferred and space remains in the socket write
618 * buffer or a non-blocking write leaves data in the socket write buffer.
619 * After an incomplete write, any data remaining to be transferred to the
620 * socket write buffer will be copied to the socket write buffer. If the
621 * remaining data is too big for the socket write buffer, the socket write
622 * buffer will be filled and the additional data written to the non-blocking
623 * write buffer.
624 *
625 * @param from The ByteBuffer containing the data to be written
626 *
627 * @throws IOException If an IO error occurs during the write
628 */
629 protected void writeNonBlocking(ByteBuffer from)
630 throws IOException {
631
632 if (from.hasRemaining() && nonBlockingWriteBuffer.isEmpty()
633 && socketBufferHandler.isWriteBufferWritable()) {
634 writeNonBlockingInternal(from);
635 }
636
637 if (from.hasRemaining()) {
638 // Remaining data must be buffered
639 nonBlockingWriteBuffer.add(from);
640 }
641 }
642
643
644 /**
645 * Separate method so it can be re-used by the socket write buffer to write
646 * data to the network
647 *
648 * @param from The ByteBuffer containing the data to be written
649 *
650 * @throws IOException If an IO error occurs during the write
651 */
652 protected void writeNonBlockingInternal(ByteBuffer from) throws IOException {
653 socketBufferHandler.configureWriteBufferForWrite();
654 transfer(from, socketBufferHandler.getWriteBuffer());
655 while (from.hasRemaining()) {
656 doWrite(false);
657 if (socketBufferHandler.isWriteBufferWritable()) {
658 socketBufferHandler.configureWriteBufferForWrite();
659 transfer(from, socketBufferHandler.getWriteBuffer());
660 } else {
661 break;
662 }
663 }
664 }
665
666
667 /**
668 * Writes as much data as possible from any that remains in the buffers.
669 *
670 * @param block <code>true</code> if a blocking write should be used,
671 * otherwise a non-blocking write will be used
672 *
673 * @return <code>true</code> if data remains to be flushed after this method
674 * completes, otherwise <code>false</code>. In blocking mode
675 * therefore, the return value should always be <code>false</code>
676 *
677 * @throws IOException If an IO error occurs during the write
678 */
679 public boolean flush(boolean block) throws IOException {
680 boolean result = false;
681 if (block) {
682 // A blocking flush will always empty the buffer.
683 flushBlocking();
684 } else {
685 result = flushNonBlocking();
686 }
687
688 return result;
689 }
690
691
692 protected void flushBlocking() throws IOException {
693 doWrite(true);
694
695 if (!nonBlockingWriteBuffer.isEmpty()) {
696 nonBlockingWriteBuffer.write(this, true);
697
698 if (!socketBufferHandler.isWriteBufferEmpty()) {
699 doWrite(true);
700 }
701 }
702
703 }
704
705
706 protected boolean flushNonBlocking() throws IOException {
707 boolean dataLeft = !socketBufferHandler.isWriteBufferEmpty();
708
709 // Write to the socket, if there is anything to write
710 if (dataLeft) {
711 doWrite(false);
712 dataLeft = !socketBufferHandler.isWriteBufferEmpty();
713 }
714
715 if (!dataLeft && !nonBlockingWriteBuffer.isEmpty()) {
716 dataLeft = nonBlockingWriteBuffer.write(this, false);
717
718 if (!dataLeft && !socketBufferHandler.isWriteBufferEmpty()) {
719 doWrite(false);
720 dataLeft = !socketBufferHandler.isWriteBufferEmpty();
721 }
722 }
723
724 return dataLeft;
725 }
726
727
728 /**
729 * Write the contents of the socketWriteBuffer to the socket. For blocking
730 * writes either then entire contents of the buffer will be written or an
731 * IOException will be thrown. Partial blocking writes will not occur.
732 *
733 * @param block Should the write be blocking or not?
734 *
735 * @throws IOException If an I/O error such as a timeout occurs during the
736 * write
737 */
738 protected void doWrite(boolean block) throws IOException {
739 socketBufferHandler.configureWriteBufferForRead();
740 doWrite(block, socketBufferHandler.getWriteBuffer());
741 }
742
743
744 /**
745 * Write the contents of the ByteBuffer to the socket. For blocking writes
746 * either then entire contents of the buffer will be written or an
747 * IOException will be thrown. Partial blocking writes will not occur.
748 *
749 * @param block Should the write be blocking or not?
750 * @param from the ByteBuffer containing the data to be written
751 *
752 * @throws IOException If an I/O error such as a timeout occurs during the
753 * write
754 */
755 protected abstract void doWrite(boolean block, ByteBuffer from) throws IOException;
756
757
758 public void processSocket(SocketEvent socketStatus, boolean dispatch) {
759 endpoint.processSocket(this, socketStatus, dispatch);
760 }
761
762
763 public abstract void registerReadInterest();
764
765 public abstract void registerWriteInterest();
766
767 public abstract SendfileDataBase createSendfileData(String filename, long pos, long length);
768
769 /**
770 * Starts the sendfile process. It is expected that if the sendfile process
771 * does not complete during this call and does not report an error, that the
772 * caller <b>will not</b> add the socket to the poller (or equivalent). That
773 * is the responsibility of this method.
774 *
775 * @param sendfileData Data representing the file to send
776 *
777 * @return The state of the sendfile process after the first write.
778 */
779 public abstract SendfileState processSendfile(SendfileDataBase sendfileData);
780
781 /**
782 * Require the client to perform CLIENT-CERT authentication if it hasn't
783 * already done so.
784 *
785 * @param sslSupport The SSL/TLS support instance currently being used by
786 * the connection that may need updating after the client
787 * authentication
788 *
789 * @throws IOException If authentication is required then there will be I/O
790 * with the client and this exception will be thrown if
791 * that goes wrong
792 */
793 public abstract void doClientAuth(SSLSupport sslSupport) throws IOException;
794
795 public abstract SSLSupport getSslSupport(String clientCertProvider);
796
797
798 // ------------------------------------------------------- NIO 2 style APIs
799
800
801 public enum BlockingMode {
802 /**
803 * The operation will not block. If there are pending operations,
804 * the operation will throw a pending exception.
805 */
806 CLASSIC,
807 /**
808 * The operation will not block. If there are pending operations,
809 * the operation will return CompletionState.NOT_DONE.
810 */
811 NON_BLOCK,
812 /**
813 * The operation will block until pending operations are completed, but
814 * will not block after performing it.
815 */
816 SEMI_BLOCK,
817 /**
818 * The operation will block until completed.
819 */
820 BLOCK
821 }
822
823 public enum CompletionState {
824 /**
825 * Operation is still pending.
826 */
827 PENDING,
828 /**
829 * Operation was pending and non blocking.
830 */
831 NOT_DONE,
832 /**
833 * The operation completed inline.
834 */
835 INLINE,
836 /**
837 * The operation completed inline but failed.
838 */
839 ERROR,
840 /**
841 * The operation completed, but not inline.
842 */
843 DONE
844 }
845
846 public enum CompletionHandlerCall {
847 /**
848 * Operation should continue, the completion handler shouldn't be
849 * called.
850 */
851 CONTINUE,
852 /**
853 * The operation completed but the completion handler shouldn't be
854 * called.
855 */
856 NONE,
857 /**
858 * The operation is complete, the completion handler should be
859 * called.
860 */
861 DONE
862 }
863
864 public interface CompletionCheck {
865 /**
866 * Determine what call, if any, should be made to the completion
867 * handler.
868 *
869 * @param state of the operation (done or done in-line since the
870 * IO call is done)
871 * @param buffers ByteBuffer[] that has been passed to the
872 * original IO call
873 * @param offset that has been passed to the original IO call
874 * @param length that has been passed to the original IO call
875 *
876 * @return The call, if any, to make to the completion handler
877 */
878 public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
879 int offset, int length);
880 }
881
882 /**
883 * This utility CompletionCheck will cause the write to fully write
884 * all remaining data. If the operation completes inline, the
885 * completion handler will not be called.
886 */
887 public static final CompletionCheck COMPLETE_WRITE = new CompletionCheck() {
888 @Override
889 public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
890 int offset, int length) {
891 for (int i = 0; i < length; i++) {
892 if (buffers[offset + i].hasRemaining()) {
893 return CompletionHandlerCall.CONTINUE;
894 }
895 }
896 return (state == CompletionState.DONE) ? CompletionHandlerCall.DONE
897 : CompletionHandlerCall.NONE;
898 }
899 };
900
901 /**
902 * This utility CompletionCheck will cause the write to fully write
903 * all remaining data. The completion handler will then be called.
904 */
905 public static final CompletionCheck COMPLETE_WRITE_WITH_COMPLETION = new CompletionCheck() {
906 @Override
907 public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
908 int offset, int length) {
909 for (int i = 0; i < length; i++) {
910 if (buffers[offset + i].hasRemaining()) {
911 return CompletionHandlerCall.CONTINUE;
912 }
913 }
914 return CompletionHandlerCall.DONE;
915 }
916 };
917
918 /**
919 * This utility CompletionCheck will cause the completion handler
920 * to be called once some data has been read. If the operation
921 * completes inline, the completion handler will not be called.
922 */
923 public static final CompletionCheck READ_DATA = new CompletionCheck() {
924 @Override
925 public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
926 int offset, int length) {
927 return (state == CompletionState.DONE) ? CompletionHandlerCall.DONE
928 : CompletionHandlerCall.NONE;
929 }
930 };
931
932 /**
933 * This utility CompletionCheck will cause the completion handler
934 * to be called once the given buffers are full. The completion
935 * handler will then be called.
936 */
937 public static final CompletionCheck COMPLETE_READ_WITH_COMPLETION = COMPLETE_WRITE_WITH_COMPLETION;
938
939 /**
940 * This utility CompletionCheck will cause the completion handler
941 * to be called once the given buffers are full. If the operation
942 * completes inline, the completion handler will not be called.
943 */
944 public static final CompletionCheck COMPLETE_READ = COMPLETE_WRITE;
945
946 /**
947 * Internal state tracker for vectored operations.
948 */
949 protected abstract class OperationState<A> implements Runnable {
950 protected final boolean read;
951 protected final ByteBuffer[] buffers;
952 protected final int offset;
953 protected final int length;
954 protected final A attachment;
955 protected final long timeout;
956 protected final TimeUnit unit;
957 protected final BlockingMode block;
958 protected final CompletionCheck check;
959 protected final CompletionHandler<Long, ? super A> handler;
960 protected final Semaphore semaphore;
961 protected final VectoredIOCompletionHandler<A> completion;
962 protected final AtomicBoolean callHandler;
963 protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
964 BlockingMode block, long timeout, TimeUnit unit, A attachment,
965 CompletionCheck check, CompletionHandler<Long, ? super A> handler,
966 Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
967 this.read = read;
968 this.buffers = buffers;
969 this.offset = offset;
970 this.length = length;
971 this.block = block;
972 this.timeout = timeout;
973 this.unit = unit;
974 this.attachment = attachment;
975 this.check = check;
976 this.handler = handler;
977 this.semaphore = semaphore;
978 this.completion = completion;
979 callHandler = (handler != null) ? new AtomicBoolean(true) : null;
980 }
981 protected volatile long nBytes = 0;
982 protected volatile CompletionState state = CompletionState.PENDING;
983 protected boolean completionDone = true;
984
985 /**
986 * @return true if the operation is still inline, false if the operation
987 * is running on a thread that is not the original caller
988 */
989 protected abstract boolean isInline();
990
991 /**
992 * Process the operation using the connector executor.
993 * @return true if the operation was accepted, false if the executor
994 * rejected execution
995 */
996 protected boolean process() {
997 try {
998 getEndpoint().getExecutor().execute(this);
999 return true;
1000 } catch (RejectedExecutionException ree) {
1001 log.warn(sm.getString("endpoint.executor.fail", SocketWrapperBase.this) , ree);
1002 } catch (Throwable t) {
1003 ExceptionUtils.handleThrowable(t);
1004 // This means we got an OOM or similar creating a thread, or that
1005 // the pool and its queue are full
1006 log.error(sm.getString("endpoint.process.fail"), t);
1007 }
1008 return false;
1009 }
1010
1011 /**
1012 * Start the operation, this will typically call run.
1013 */
1014 protected void start() {
1015 run();
1016 }
1017
1018 /**
1019 * End the operation.
1020 */
1021 protected void end() {
1022 }
1023
1024 }
1025
1026 /**
1027 * Completion handler for vectored operations. This will check the completion of the operation,
1028 * then either continue or call the user provided completion handler.
1029 */
1030 protected class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
1031 @Override
1032 public void completed(Long nBytes, OperationState<A> state) {
1033 if (nBytes.longValue() < 0) {
1034 failed(new EOFException(), state);
1035 } else {
1036 state.nBytes += nBytes.longValue();
1037 CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE;
1038 boolean complete = true;
1039 boolean completion = true;
1040 if (state.check != null) {
1041 CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
1042 if (call == CompletionHandlerCall.CONTINUE) {
1043 complete = false;
1044 } else if (call == CompletionHandlerCall.NONE) {
1045 completion = false;
1046 }
1047 }
1048 if (complete) {
1049 boolean notify = false;
1050 state.semaphore.release();
1051 if (state.read) {
1052 readOperation = null;
1053 } else {
1054 writeOperation = null;
1055 }
1056 if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
1057 notify = true;
1058 } else {
1059 state.state = currentState;
1060 }
1061 state.end();
1062 if (completion && state.handler != null && state.callHandler.compareAndSet(true, false)) {
1063 state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
1064 }
1065 synchronized (state) {
1066 state.completionDone = true;
1067 if (notify) {
1068 state.state = currentState;
1069 state.notify();
1070 }
1071 }
1072 } else {
1073 synchronized (state) {
1074 state.completionDone = true;
1075 }
1076 state.run();
1077 }
1078 }
1079 }
1080 @Override
1081 public void failed(Throwable exc, OperationState<A> state) {
1082 IOException ioe = null;
1083 if (exc instanceof InterruptedByTimeoutException) {
1084 ioe = new SocketTimeoutException();
1085 exc = ioe;
1086 } else if (exc instanceof IOException) {
1087 ioe = (IOException) exc;
1088 }
1089 setError(ioe);
1090 boolean notify = false;
1091 state.semaphore.release();
1092 if (state.read) {
1093 readOperation = null;
1094 } else {
1095 writeOperation = null;
1096 }
1097 if (state.block == BlockingMode.BLOCK) {
1098 notify = true;
1099 } else {
1100 state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
1101 }
1102 state.end();
1103 if (state.handler != null && state.callHandler.compareAndSet(true, false)) {
1104 state.handler.failed(exc, state.attachment);
1105 }
1106 synchronized (state) {
1107 state.completionDone = true;
1108 if (notify) {
1109 state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
1110 state.notify();
1111 }
1112 }
1113 }
1114 }
1115
1116 /**
1117 * Allows using NIO2 style read/write.
1118 *
1119 * @return {@code true} if the connector has the capability enabled
1120 */
1121 public boolean hasAsyncIO() {
1122 // The semaphores are only created if async IO is enabled
1123 return (readPending != null);
1124 }
1125
1126 /**
1127 * Allows indicating if the connector needs semaphores.
1128 *
1129 * @return This default implementation always returns {@code false}
1130 */
1131 public boolean needSemaphores() {
1132 return false;
1133 }
1134
1135 /**
1136 * Allows indicating if the connector supports per operation timeout.
1137 *
1138 * @return This default implementation always returns {@code false}
1139 */
1140 public boolean hasPerOperationTimeout() {
1141 return false;
1142 }
1143
1144 /**
1145 * Allows checking if an asynchronous read operation is currently pending.
1146 * @return <code>true</code> if the endpoint supports asynchronous IO and
1147 * a read operation is being processed asynchronously
1148 */
1149 public boolean isReadPending() {
1150 return false;
1151 }
1152
1153 /**
1154 * Allows checking if an asynchronous write operation is currently pending.
1155 * @return <code>true</code> if the endpoint supports asynchronous IO and
1156 * a write operation is being processed asynchronously
1157 */
1158 public boolean isWritePending() {
1159 return false;
1160 }
1161
1162 /**
1163 * If an asynchronous read operation is pending, this method will block
1164 * until the operation completes, or the specified amount of time
1165 * has passed.
1166 * @param timeout The maximum amount of time to wait
1167 * @param unit The unit for the timeout
1168 * @return <code>true</code> if the read operation is complete,
1169 * <code>false</code> if the operation is still pending and
1170 * the specified timeout has passed
1171 */
1172 @Deprecated
1173 public boolean awaitReadComplete(long timeout, TimeUnit unit) {
1174 return true;
1175 }
1176
1177 /**
1178 * If an asynchronous write operation is pending, this method will block
1179 * until the operation completes, or the specified amount of time
1180 * has passed.
1181 * @param timeout The maximum amount of time to wait
1182 * @param unit The unit for the timeout
1183 * @return <code>true</code> if the read operation is complete,
1184 * <code>false</code> if the operation is still pending and
1185 * the specified timeout has passed
1186 */
1187 @Deprecated
1188 public boolean awaitWriteComplete(long timeout, TimeUnit unit) {
1189 return true;
1190 }
1191
1192 /**
1193 * Scatter read. The completion handler will be called once some
1194 * data has been read or an error occurred. The default NIO2
1195 * behavior is used: the completion handler will be called as soon
1196 * as some data has been read, even if the read has completed inline.
1197 *
1198 * @param timeout timeout duration for the read
1199 * @param unit units for the timeout duration
1200 * @param attachment an object to attach to the I/O operation that will be
1201 * used when calling the completion handler
1202 * @param handler to call when the IO is complete
1203 * @param dsts buffers
1204 * @param <A> The attachment type
1205 * @return the completion state (done, done inline, or still pending)
1206 */
1207 public final <A> CompletionState read(long timeout, TimeUnit unit, A attachment,
1208 CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) {
1209 if (dsts == null) {
1210 throw new IllegalArgumentException();
1211 }
1212 return read(dsts, 0, dsts.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler);
1213 }
1214
1215 /**
1216 * Scatter read. The completion handler will be called once some
1217 * data has been read or an error occurred. If a CompletionCheck
1218 * object has been provided, the completion handler will only be
1219 * called if the callHandler method returned true. If no
1220 * CompletionCheck object has been provided, the default NIO2
1221 * behavior is used: the completion handler will be called as soon
1222 * as some data has been read, even if the read has completed inline.
1223 *
1224 * @param block is the blocking mode that will be used for this operation
1225 * @param timeout timeout duration for the read
1226 * @param unit units for the timeout duration
1227 * @param attachment an object to attach to the I/O operation that will be
1228 * used when calling the completion handler
1229 * @param check for the IO operation completion
1230 * @param handler to call when the IO is complete
1231 * @param dsts buffers
1232 * @param <A> The attachment type
1233 * @return the completion state (done, done inline, or still pending)
1234 */
1235 public final <A> CompletionState read(BlockingMode block, long timeout,
1236 TimeUnit unit, A attachment, CompletionCheck check,
1237 CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) {
1238 if (dsts == null) {
1239 throw new IllegalArgumentException();
1240 }
1241 return read(dsts, 0, dsts.length, block, timeout, unit, attachment, check, handler);
1242 }
1243
1244 /**
1245 * Scatter read. The completion handler will be called once some
1246 * data has been read or an error occurred. If a CompletionCheck
1247 * object has been provided, the completion handler will only be
1248 * called if the callHandler method returned true. If no
1249 * CompletionCheck object has been provided, the default NIO2
1250 * behavior is used: the completion handler will be called as soon
1251 * as some data has been read, even if the read has completed inline.
1252 *
1253 * @param dsts buffers
1254 * @param offset in the buffer array
1255 * @param length in the buffer array
1256 * @param block is the blocking mode that will be used for this operation
1257 * @param timeout timeout duration for the read
1258 * @param unit units for the timeout duration
1259 * @param attachment an object to attach to the I/O operation that will be
1260 * used when calling the completion handler
1261 * @param check for the IO operation completion
1262 * @param handler to call when the IO is complete
1263 * @param <A> The attachment type
1264 * @return the completion state (done, done inline, or still pending)
1265 */
1266 public final <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
1267 BlockingMode block, long timeout, TimeUnit unit, A attachment,
1268 CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
1269 return vectoredOperation(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
1270 }
1271
1272 /**
1273 * Gather write. The completion handler will be called once some
1274 * data has been written or an error occurred. The default NIO2
1275 * behavior is used: the completion handler will be called, even
1276 * if the write is incomplete and data remains in the buffers, or
1277 * if the write completed inline.
1278 *
1279 * @param timeout timeout duration for the write
1280 * @param unit units for the timeout duration
1281 * @param attachment an object to attach to the I/O operation that will be
1282 * used when calling the completion handler
1283 * @param handler to call when the IO is complete
1284 * @param srcs buffers
1285 * @param <A> The attachment type
1286 * @return the completion state (done, done inline, or still pending)
1287 */
1288 public final <A> CompletionState write(long timeout, TimeUnit unit, A attachment,
1289 CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) {
1290 if (srcs == null) {
1291 throw new IllegalArgumentException();
1292 }
1293 return write(srcs, 0, srcs.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler);
1294 }
1295
1296 /**
1297 * Gather write. The completion handler will be called once some
1298 * data has been written or an error occurred. If a CompletionCheck
1299 * object has been provided, the completion handler will only be
1300 * called if the callHandler method returned true. If no
1301 * CompletionCheck object has been provided, the default NIO2
1302 * behavior is used: the completion handler will be called, even
1303 * if the write is incomplete and data remains in the buffers, or
1304 * if the write completed inline.
1305 *
1306 * @param block is the blocking mode that will be used for this operation
1307 * @param timeout timeout duration for the write
1308 * @param unit units for the timeout duration
1309 * @param attachment an object to attach to the I/O operation that will be
1310 * used when calling the completion handler
1311 * @param check for the IO operation completion
1312 * @param handler to call when the IO is complete
1313 * @param srcs buffers
1314 * @param <A> The attachment type
1315 * @return the completion state (done, done inline, or still pending)
1316 */
1317 public final <A> CompletionState write(BlockingMode block, long timeout,
1318 TimeUnit unit, A attachment, CompletionCheck check,
1319 CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) {
1320 if (srcs == null) {
1321 throw new IllegalArgumentException();
1322 }
1323 return write(srcs, 0, srcs.length, block, timeout, unit, attachment, check, handler);
1324 }
1325
1326 /**
1327 * Gather write. The completion handler will be called once some
1328 * data has been written or an error occurred. If a CompletionCheck
1329 * object has been provided, the completion handler will only be
1330 * called if the callHandler method returned true. If no
1331 * CompletionCheck object has been provided, the default NIO2
1332 * behavior is used: the completion handler will be called, even
1333 * if the write is incomplete and data remains in the buffers, or
1334 * if the write completed inline.
1335 *
1336 * @param srcs buffers
1337 * @param offset in the buffer array
1338 * @param length in the buffer array
1339 * @param block is the blocking mode that will be used for this operation
1340 * @param timeout timeout duration for the write
1341 * @param unit units for the timeout duration
1342 * @param attachment an object to attach to the I/O operation that will be
1343 * used when calling the completion handler
1344 * @param check for the IO operation completion
1345 * @param handler to call when the IO is complete
1346 * @param <A> The attachment type
1347 * @return the completion state (done, done inline, or still pending)
1348 */
1349 public final <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
1350 BlockingMode block, long timeout, TimeUnit unit, A attachment,
1351 CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
1352 return vectoredOperation(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
1353 }
1354
1355
1356 /**
1357 * Vectored operation. The completion handler will be called once
1358 * the operation is complete or an error occurred. If a CompletionCheck
1359 * object has been provided, the completion handler will only be
1360 * called if the callHandler method returned true. If no
1361 * CompletionCheck object has been provided, the default NIO2
1362 * behavior is used: the completion handler will be called, even
1363 * if the operation is incomplete, or if the operation completed inline.
1364 *
1365 * @param read true if the operation is a read, false if it is a write
1366 * @param buffers buffers
1367 * @param offset in the buffer array
1368 * @param length in the buffer array
1369 * @param block is the blocking mode that will be used for this operation
1370 * @param timeout timeout duration for the write
1371 * @param unit units for the timeout duration
1372 * @param attachment an object to attach to the I/O operation that will be
1373 * used when calling the completion handler
1374 * @param check for the IO operation completion
1375 * @param handler to call when the IO is complete
1376 * @param <A> The attachment type
1377 * @return the completion state (done, done inline, or still pending)
1378 */
1379 protected final <A> CompletionState vectoredOperation(boolean read,
1380 ByteBuffer[] buffers, int offset, int length,
1381 BlockingMode block, long timeout, TimeUnit unit, A attachment,
1382 CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
1383 IOException ioe = getError();
1384 if (ioe != null) {
1385 handler.failed(ioe, attachment);
1386 return CompletionState.ERROR;
1387 }
1388 if (timeout == -1) {
1389 timeout = AbstractEndpoint.toTimeout(read ? getReadTimeout() : getWriteTimeout());
1390 unit = TimeUnit.MILLISECONDS;
1391 } else if (!hasPerOperationTimeout() && (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout()))) {
1392 if (read) {
1393 setReadTimeout(unit.toMillis(timeout));
1394 } else {
1395 setWriteTimeout(unit.toMillis(timeout));
1396 }
1397 }
1398 if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
1399 try {
1400 if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
1401 handler.failed(new SocketTimeoutException(), attachment);
1402 return CompletionState.ERROR;
1403 }
1404 } catch (InterruptedException e) {
1405 handler.failed(e, attachment);
1406 return CompletionState.ERROR;
1407 }
1408 } else {
1409 if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
1410 if (block == BlockingMode.NON_BLOCK) {
1411 return CompletionState.NOT_DONE;
1412 } else {
1413 handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
1414 return CompletionState.ERROR;
1415 }
1416 }
1417 }
1418 VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
1419 OperationState<A> state = newOperationState(read, buffers, offset, length, block, timeout, unit,
1420 attachment, check, handler, read ? readPending : writePending, completion);
1421 if (read) {
1422 readOperation = state;
1423 } else {
1424 writeOperation = state;
1425 }
1426 state.start();
1427 if (block == BlockingMode.BLOCK) {
1428 synchronized (state) {
1429 if (state.state == CompletionState.PENDING) {
1430 try {
1431 state.wait(unit.toMillis(timeout));
1432 if (state.state == CompletionState.PENDING) {
1433 if (handler != null && state.callHandler.compareAndSet(true, false)) {
1434 handler.failed(new SocketTimeoutException(), attachment);
1435 }
1436 return CompletionState.ERROR;
1437 }
1438 } catch (InterruptedException e) {
1439 if (handler != null && state.callHandler.compareAndSet(true, false)) {
1440 handler.failed(new SocketTimeoutException(), attachment);
1441 }
1442 return CompletionState.ERROR;
1443 }
1444 }
1445 }
1446 }
1447 return state.state;
1448 }
1449
1450 protected abstract <A> OperationState<A> newOperationState(boolean read,
1451 ByteBuffer[] buffers, int offset, int length,
1452 BlockingMode block, long timeout, TimeUnit unit, A attachment,
1453 CompletionCheck check, CompletionHandler<Long, ? super A> handler,
1454 Semaphore semaphore, VectoredIOCompletionHandler<A> completion);
1455
1456 // --------------------------------------------------------- Utility methods
1457
1458 protected static int transfer(byte[] from, int offset, int length, ByteBuffer to) {
1459 int max = Math.min(length, to.remaining());
1460 if (max > 0) {
1461 to.put(from, offset, max);
1462 }
1463 return max;
1464 }
1465
1466 protected static int transfer(ByteBuffer from, ByteBuffer to) {
1467 int max = Math.min(from.remaining(), to.remaining());
1468 if (max > 0) {
1469 int fromLimit = from.limit();
1470 from.limit(from.position() + max);
1471 to.put(from);
1472 from.limit(fromLimit);
1473 }
1474 return max;
1475 }
1476
1477 protected static boolean buffersArrayHasRemaining(ByteBuffer[] buffers, int offset, int length) {
1478 for (int pos = offset; pos < offset + length; pos++) {
1479 if (buffers[pos].hasRemaining()) {
1480 return true;
1481 }
1482 }
1483 return false;
1484 }
1485 }
1486