1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */

17 package org.apache.coyote;
18
19 import java.net.InetAddress;
20 import java.nio.ByteBuffer;
21 import java.util.Collections;
22 import java.util.HashSet;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import javax.management.InstanceNotFoundException;
34 import javax.management.MBeanRegistration;
35 import javax.management.MBeanRegistrationException;
36 import javax.management.MBeanServer;
37 import javax.management.MalformedObjectNameException;
38 import javax.management.ObjectName;
39 import javax.servlet.http.HttpUpgradeHandler;
40 import javax.servlet.http.WebConnection;
41
42 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
43 import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
44 import org.apache.juli.logging.Log;
45 import org.apache.tomcat.InstanceManager;
46 import org.apache.tomcat.util.ExceptionUtils;
47 import org.apache.tomcat.util.collections.SynchronizedStack;
48 import org.apache.tomcat.util.modeler.Registry;
49 import org.apache.tomcat.util.net.AbstractEndpoint;
50 import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
51 import org.apache.tomcat.util.net.SocketEvent;
52 import org.apache.tomcat.util.net.SocketWrapperBase;
53 import org.apache.tomcat.util.res.StringManager;
54
55 public abstract class AbstractProtocol<S> implements ProtocolHandler,
56         MBeanRegistration {
57
58     /**
59      * The string manager for this package.
60      */

61     private static final StringManager sm = StringManager.getManager(AbstractProtocol.class);
62
63
64     /**
65      * Counter used to generate unique JMX names for connectors using automatic
66      * port binding.
67      */

68     private static final AtomicInteger nameCounter = new AtomicInteger(0);
69
70
71     /**
72      * Name of MBean for the Global Request Processor.
73      */

74     protected ObjectName rgOname = null;
75
76
77     /**
78      * Unique ID for this connector. Only used if the connector is configured
79      * to use a random port as the port will change if stop(), start() is
80      * called.
81      */

82     private int nameIndex = 0;
83
84
85     /**
86      * Endpoint that provides low-level network I/O - must be matched to the
87      * ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO
88      * Endpoint etc.).
89      */

90     private final AbstractEndpoint<S,?> endpoint;
91
92
93     private Handler<S> handler;
94
95
96     private final Set<Processor> waitingProcessors =
97             Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
98
99     /**
100      * Controller for the timeout scheduling.
101      */

102     private ScheduledFuture<?> timeoutFuture = null;
103     private ScheduledFuture<?> monitorFuture;
104
105     public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
106         this.endpoint = endpoint;
107         setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
108         setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
109     }
110
111
112     // ----------------------------------------------- Generic property handling
113
114     /**
115      * Generic property setter used by the digester. Other code should not need
116      * to use this. The digester will only use this method if it can't find a
117      * more specific setter. That means the property belongs to the Endpoint,
118      * the ServerSocketFactory or some other lower level component. This method
119      * ensures that it is visible to both.
120      *
121      * @param name  The name of the property to set
122      * @param value The value, in string form, to set for the property
123      *
124      * @return <code>true</code> if the property was set successfully, otherwise
125      *         <code>false</code>
126      */

127     public boolean setProperty(String name, String value) {
128         return endpoint.setProperty(name, value);
129     }
130
131
132     /**
133      * Generic property getter used by the digester. Other code should not need
134      * to use this.
135      *
136      * @param name The name of the property to get
137      *
138      * @return The value of the property converted to a string
139      */

140     public String getProperty(String name) {
141         return endpoint.getProperty(name);
142     }
143
144
145     // ------------------------------- Properties managed by the ProtocolHandler
146
147     /**
148      * The adapter provides the link between the ProtocolHandler and the
149      * connector.
150      */

151     protected Adapter adapter;
152     @Override
153     public void setAdapter(Adapter adapter) { this.adapter = adapter; }
154     @Override
155     public Adapter getAdapter() { return adapter; }
156
157
158     /**
159      * The maximum number of idle processors that will be retained in the cache
160      * and re-used with a subsequent request. The default is 200. A value of -1
161      * means unlimited. In the unlimited case, the theoretical maximum number of
162      * cached Processor objects is {@link #getMaxConnections()} although it will
163      * usually be closer to {@link #getMaxThreads()}.
164      */

165     protected int processorCache = 200;
166     public int getProcessorCache() { return this.processorCache; }
167     public void setProcessorCache(int processorCache) {
168         this.processorCache = processorCache;
169     }
170
171
172     private String clientCertProvider = null;
173     /**
174      * When client certificate information is presented in a form other than
175      * instances of {@link java.security.cert.X509Certificate} it needs to be
176      * converted before it can be used and this property controls which JSSE
177      * provider is used to perform the conversion. For example it is used with
178      * the AJP connectors, the HTTP APR connector and with the
179      * {@link org.apache.catalina.valves.SSLValve}. If not specified, the
180      * default provider will be used.
181      *
182      * @return The name of the JSSE provider to use
183      */

184     public String getClientCertProvider() { return clientCertProvider; }
185     public void setClientCertProvider(String s) { this.clientCertProvider = s; }
186
187
188     private int maxHeaderCount = 100;
189     public int getMaxHeaderCount() {
190         return maxHeaderCount;
191     }
192     public void setMaxHeaderCount(int maxHeaderCount) {
193         this.maxHeaderCount = maxHeaderCount;
194     }
195
196
197     @Override
198     public boolean isAprRequired() {
199         return false;
200     }
201
202
203     @Override
204     public boolean isSendfileSupported() {
205         return endpoint.getUseSendfile();
206     }
207
208
209     // ---------------------- Properties that are passed through to the EndPoint
210
211     @Override
212     public Executor getExecutor() { return endpoint.getExecutor(); }
213     @Override
214     public void setExecutor(Executor executor) {
215         endpoint.setExecutor(executor);
216     }
217
218
219     @Override
220     public ScheduledExecutorService getUtilityExecutor() { return endpoint.getUtilityExecutor(); }
221     @Override
222     public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) {
223         endpoint.setUtilityExecutor(utilityExecutor);
224     }
225
226
227     public int getMaxThreads() { return endpoint.getMaxThreads(); }
228     public void setMaxThreads(int maxThreads) {
229         endpoint.setMaxThreads(maxThreads);
230     }
231
232     public int getMaxConnections() { return endpoint.getMaxConnections(); }
233     public void setMaxConnections(int maxConnections) {
234         endpoint.setMaxConnections(maxConnections);
235     }
236
237
238     public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); }
239     public void setMinSpareThreads(int minSpareThreads) {
240         endpoint.setMinSpareThreads(minSpareThreads);
241     }
242
243
244     public int getThreadPriority() { return endpoint.getThreadPriority(); }
245     public void setThreadPriority(int threadPriority) {
246         endpoint.setThreadPriority(threadPriority);
247     }
248
249
250     public int getAcceptCount() { return endpoint.getAcceptCount(); }
251     public void setAcceptCount(int acceptCount) { endpoint.setAcceptCount(acceptCount); }
252
253
254     public boolean getTcpNoDelay() { return endpoint.getTcpNoDelay(); }
255     public void setTcpNoDelay(boolean tcpNoDelay) {
256         endpoint.setTcpNoDelay(tcpNoDelay);
257     }
258
259
260     public int getConnectionLinger() { return endpoint.getConnectionLinger(); }
261     public void setConnectionLinger(int connectionLinger) {
262         endpoint.setConnectionLinger(connectionLinger);
263     }
264
265
266     /**
267      * The time Tomcat will wait for a subsequent request before closing the
268      * connection. The default is {@link #getConnectionTimeout()}.
269      *
270      * @return The timeout in milliseconds
271      */

272     public int getKeepAliveTimeout() { return endpoint.getKeepAliveTimeout(); }
273     public void setKeepAliveTimeout(int keepAliveTimeout) {
274         endpoint.setKeepAliveTimeout(keepAliveTimeout);
275     }
276
277     public InetAddress getAddress() { return endpoint.getAddress(); }
278     public void setAddress(InetAddress ia) {
279         endpoint.setAddress(ia);
280     }
281
282
283     public int getPort() { return endpoint.getPort(); }
284     public void setPort(int port) {
285         endpoint.setPort(port);
286     }
287
288
289     public int getPortOffset() { return endpoint.getPortOffset(); }
290     public void setPortOffset(int portOffset) {
291         endpoint.setPortOffset(portOffset);
292     }
293
294
295     public int getPortWithOffset() { return endpoint.getPortWithOffset(); }
296
297
298     public int getLocalPort() { return endpoint.getLocalPort(); }
299
300     /*
301      * When Tomcat expects data from the client, this is the time Tomcat will
302      * wait for that data to arrive before closing the connection.
303      */

304     public int getConnectionTimeout() {
305         return endpoint.getConnectionTimeout();
306     }
307     public void setConnectionTimeout(int timeout) {
308         endpoint.setConnectionTimeout(timeout);
309     }
310
311     public long getConnectionCount() {
312         return endpoint.getConnectionCount();
313     }
314
315     /**
316      * NO-OP.
317      *
318      * @param threadCount Unused
319      *
320      * @deprecated Will be removed in Tomcat 10.
321      */

322     @Deprecated
323     public void setAcceptorThreadCount(int threadCount) {
324     }
325
326     /**
327      * Always returns 1.
328      *
329      * @return Always 1.
330      *
331      * @deprecated Will be removed in Tomcat 10.
332      */

333     @Deprecated
334     public int getAcceptorThreadCount() {
335       return 1;
336     }
337
338     public void setAcceptorThreadPriority(int threadPriority) {
339         endpoint.setAcceptorThreadPriority(threadPriority);
340     }
341     public int getAcceptorThreadPriority() {
342       return endpoint.getAcceptorThreadPriority();
343     }
344
345
346     // ---------------------------------------------------------- Public methods
347
348     public synchronized int getNameIndex() {
349         if (nameIndex == 0) {
350             nameIndex = nameCounter.incrementAndGet();
351         }
352
353         return nameIndex;
354     }
355
356
357     /**
358      * The name will be prefix-address-port if address is non-null and
359      * prefix-port if the address is null.
360      *
361      * @return A name for this protocol instance that is appropriately quoted
362      *         for use in an ObjectName.
363      */

364     public String getName() {
365         return ObjectName.quote(getNameInternal());
366     }
367
368
369     private String getNameInternal() {
370         StringBuilder name = new StringBuilder(getNamePrefix());
371         name.append('-');
372         if (getAddress() != null) {
373             name.append(getAddress().getHostAddress());
374             name.append('-');
375         }
376         int port = getPortWithOffset();
377         if (port == 0) {
378             // Auto binding is in use. Check if port is known
379             name.append("auto-");
380             name.append(getNameIndex());
381             port = getLocalPort();
382             if (port != -1) {
383                 name.append('-');
384                 name.append(port);
385             }
386         } else {
387             name.append(port);
388         }
389         return name.toString();
390     }
391
392
393     public void addWaitingProcessor(Processor processor) {
394         if (getLog().isDebugEnabled()) {
395             getLog().debug(sm.getString("abstractProtocol.waitingProcessor.add", processor));
396         }
397         waitingProcessors.add(processor);
398     }
399
400
401     public void removeWaitingProcessor(Processor processor) {
402         if (getLog().isDebugEnabled()) {
403             getLog().debug(sm.getString("abstractProtocol.waitingProcessor.remove", processor));
404         }
405         waitingProcessors.remove(processor);
406     }
407
408
409     // ----------------------------------------------- Accessors for sub-classes
410
411     protected AbstractEndpoint<S,?> getEndpoint() {
412         return endpoint;
413     }
414
415
416     protected Handler<S> getHandler() {
417         return handler;
418     }
419
420     protected void setHandler(Handler<S> handler) {
421         this.handler = handler;
422     }
423
424
425     // -------------------------------------------------------- Abstract methods
426
427     /**
428      * Concrete implementations need to provide access to their logger to be
429      * used by the abstract classes.
430      * @return the logger
431      */

432     protected abstract Log getLog();
433
434
435     /**
436      * Obtain the prefix to be used when construction a name for this protocol
437      * handler. The name will be prefix-address-port.
438      * @return the prefix
439      */

440     protected abstract String getNamePrefix();
441
442
443     /**
444      * Obtain the name of the protocol, (Http, Ajp, etc.). Used with JMX.
445      * @return the protocol name
446      */

447     protected abstract String getProtocolName();
448
449
450     /**
451      * Find a suitable handler for the protocol negotiated
452      * at the network layer.
453      * @param name The name of the requested negotiated protocol.
454      * @return The instance where {@link UpgradeProtocol#getAlpnName()} matches
455      *         the requested protocol
456      */

457     protected abstract UpgradeProtocol getNegotiatedProtocol(String name);
458
459
460     /**
461      * Find a suitable handler for the protocol upgraded name specified. This
462      * is used for direct connection protocol selection.
463      * @param name The name of the requested negotiated protocol.
464      * @return The instance where {@link UpgradeProtocol#getAlpnName()} matches
465      *         the requested protocol
466      */

467     protected abstract UpgradeProtocol getUpgradeProtocol(String name);
468
469
470     /**
471      * Create and configure a new Processor instance for the current protocol
472      * implementation.
473      *
474      * @return A fully configured Processor instance that is ready to use
475      */

476     protected abstract Processor createProcessor();
477
478
479     protected abstract Processor createUpgradeProcessor(
480             SocketWrapperBase<?> socket,
481             UpgradeToken upgradeToken);
482
483
484     // ----------------------------------------------------- JMX related methods
485
486     protected String domain;
487     protected ObjectName oname;
488     protected MBeanServer mserver;
489
490     public ObjectName getObjectName() {
491         return oname;
492     }
493
494     public String getDomain() {
495         return domain;
496     }
497
498     @Override
499     public ObjectName preRegister(MBeanServer server, ObjectName name)
500             throws Exception {
501         oname = name;
502         mserver = server;
503         domain = name.getDomain();
504         return name;
505     }
506
507     @Override
508     public void postRegister(Boolean registrationDone) {
509         // NOOP
510     }
511
512     @Override
513     public void preDeregister() throws Exception {
514         // NOOP
515     }
516
517     @Override
518     public void postDeregister() {
519         // NOOP
520     }
521
522     private ObjectName createObjectName() throws MalformedObjectNameException {
523         // Use the same domain as the connector
524         domain = getAdapter().getDomain();
525
526         if (domain == null) {
527             return null;
528         }
529
530         StringBuilder name = new StringBuilder(getDomain());
531         name.append(":type=ProtocolHandler,port=");
532         int port = getPortWithOffset();
533         if (port > 0) {
534             name.append(port);
535         } else {
536             name.append("auto-");
537             name.append(getNameIndex());
538         }
539         InetAddress address = getAddress();
540         if (address != null) {
541             name.append(",address=");
542             name.append(ObjectName.quote(address.getHostAddress()));
543         }
544         return new ObjectName(name.toString());
545     }
546
547
548     // ------------------------------------------------------- Lifecycle methods
549
550     /*
551      * NOTE: There is no maintenance of state or checking for valid transitions
552      * within this class. It is expected that the connector will maintain state
553      * and prevent invalid state transitions.
554      */

555
556     @Override
557     public void init() throws Exception {
558         if (getLog().isInfoEnabled()) {
559             getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
560             logPortOffset();
561         }
562
563         if (oname == null) {
564             // Component not pre-registered so register it
565             oname = createObjectName();
566             if (oname != null) {
567                 Registry.getRegistry(nullnull).registerComponent(this, oname, null);
568             }
569         }
570
571         if (this.domain != null) {
572             rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
573             Registry.getRegistry(nullnull).registerComponent(
574                     getHandler().getGlobal(), rgOname, null);
575         }
576
577         String endpointName = getName();
578         endpoint.setName(endpointName.substring(1, endpointName.length()-1));
579         endpoint.setDomain(domain);
580
581         endpoint.init();
582     }
583
584
585     @Override
586     public void start() throws Exception {
587         if (getLog().isInfoEnabled()) {
588             getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
589             logPortOffset();
590         }
591
592         endpoint.start();
593         monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
594                 new Runnable() {
595                     @Override
596                     public void run() {
597                         if (!isPaused()) {
598                             startAsyncTimeout();
599                         }
600                     }
601                 }, 0, 60, TimeUnit.SECONDS);
602     }
603
604
605     /**
606      * Note: The name of this method originated with the Servlet 3.0
607      * asynchronous processing but evolved over time to represent a timeout that
608      * is triggered independently of the socket read/write timeouts.
609      */

610     protected void startAsyncTimeout() {
611         if (timeoutFuture == null || (timeoutFuture != null && timeoutFuture.isDone())) {
612             if (timeoutFuture != null && timeoutFuture.isDone()) {
613                 // There was an error executing the scheduled task, get it and log it
614                 try {
615                     timeoutFuture.get();
616                 } catch (InterruptedException | ExecutionException e) {
617                     getLog().error(sm.getString("abstractProtocolHandler.asyncTimeoutError"), e);
618                 }
619             }
620             timeoutFuture = getUtilityExecutor().scheduleAtFixedRate(
621                     new Runnable() {
622                         @Override
623                         public void run() {
624                             long now = System.currentTimeMillis();
625                             for (Processor processor : waitingProcessors) {
626                                 processor.timeoutAsync(now);
627                             }
628                         }
629                     }, 1, 1, TimeUnit.SECONDS);
630         }
631     }
632
633     protected void stopAsyncTimeout() {
634         if (timeoutFuture != null) {
635             timeoutFuture.cancel(false);
636             timeoutFuture = null;
637         }
638     }
639
640     @Override
641     public void pause() throws Exception {
642         if (getLog().isInfoEnabled()) {
643             getLog().info(sm.getString("abstractProtocolHandler.pause", getName()));
644         }
645
646         stopAsyncTimeout();
647         endpoint.pause();
648     }
649
650
651     public boolean isPaused() {
652         return endpoint.isPaused();
653     }
654
655
656     @Override
657     public void resume() throws Exception {
658         if(getLog().isInfoEnabled()) {
659             getLog().info(sm.getString("abstractProtocolHandler.resume", getName()));
660         }
661
662         endpoint.resume();
663         startAsyncTimeout();
664     }
665
666
667     @Override
668     public void stop() throws Exception {
669         if(getLog().isInfoEnabled()) {
670             getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
671             logPortOffset();
672         }
673
674         if (monitorFuture != null) {
675             monitorFuture.cancel(true);
676             monitorFuture = null;
677         }
678         stopAsyncTimeout();
679         // Timeout any waiting processor
680         for (Processor processor : waitingProcessors) {
681             processor.timeoutAsync(-1);
682         }
683
684         endpoint.stop();
685     }
686
687
688     @Override
689     public void destroy() throws Exception {
690         if(getLog().isInfoEnabled()) {
691             getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
692             logPortOffset();
693         }
694
695         try {
696             endpoint.destroy();
697         } finally {
698             if (oname != null) {
699                 if (mserver == null) {
700                     Registry.getRegistry(nullnull).unregisterComponent(oname);
701                 } else {
702                     // Possibly registered with a different MBeanServer
703                     try {
704                         mserver.unregisterMBean(oname);
705                     } catch (MBeanRegistrationException | InstanceNotFoundException e) {
706                         getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
707                                 oname, mserver));
708                     }
709                 }
710             }
711
712             if (rgOname != null) {
713                 Registry.getRegistry(nullnull).unregisterComponent(rgOname);
714             }
715         }
716     }
717
718
719     @Override
720     public void closeServerSocketGraceful() {
721         endpoint.closeServerSocketGraceful();
722     }
723
724
725     private void logPortOffset() {
726         if (getPort() != getPortWithOffset()) {
727             getLog().info(sm.getString("abstractProtocolHandler.portOffset", getName(),
728                     String.valueOf(getPort()), String.valueOf(getPortOffset())));
729         }
730     }
731
732
733     // ------------------------------------------- Connection handler base class
734
735     protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
736
737         private final AbstractProtocol<S> proto;
738         private final RequestGroupInfo global = new RequestGroupInfo();
739         private final AtomicLong registerCount = new AtomicLong(0);
740         private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
741
742         public ConnectionHandler(AbstractProtocol<S> proto) {
743             this.proto = proto;
744         }
745
746         protected AbstractProtocol<S> getProtocol() {
747             return proto;
748         }
749
750         protected Log getLog() {
751             return getProtocol().getLog();
752         }
753
754         @Override
755         public Object getGlobal() {
756             return global;
757         }
758
759         @Override
760         public void recycle() {
761             recycledProcessors.clear();
762         }
763
764
765         @Override
766         public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
767             if (getLog().isDebugEnabled()) {
768                 getLog().debug(sm.getString("abstractConnectionHandler.process",
769                         wrapper.getSocket(), status));
770             }
771             if (wrapper == null) {
772                 // Nothing to do. Socket has been closed.
773                 return SocketState.CLOSED;
774             }
775
776             S socket = wrapper.getSocket();
777
778             Processor processor = (Processor) wrapper.getCurrentProcessor();
779             if (getLog().isDebugEnabled()) {
780                 getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
781                         processor, socket));
782             }
783
784             // Timeouts are calculated on a dedicated thread and then
785             // dispatched. Because of delays in the dispatch process, the
786             // timeout may no longer be required. Check here and avoid
787             // unnecessary processing.
788             if (SocketEvent.TIMEOUT == status &&
789                     (processor == null ||
790                     !processor.isAsync() && !processor.isUpgrade() ||
791                     processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) {
792                 // This is effectively a NO-OP
793                 return SocketState.OPEN;
794             }
795
796             if (processor != null) {
797                 // Make sure an async timeout doesn't fire
798                 getProtocol().removeWaitingProcessor(processor);
799             } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
800                 // Nothing to do. Endpoint requested a close and there is no
801                 // longer a processor associated with this socket.
802                 return SocketState.CLOSED;
803             }
804
805             ContainerThreadMarker.set();
806
807             try {
808                 if (processor == null) {
809                     String negotiatedProtocol = wrapper.getNegotiatedProtocol();
810                     // OpenSSL typically returns null whereas JSSE typically
811                     // returns "" when no protocol is negotiated
812                     if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
813                         UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol);
814                         if (upgradeProtocol != null) {
815                             processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
816                             if (getLog().isDebugEnabled()) {
817                                 getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
818                             }
819                         } else if (negotiatedProtocol.equals("http/1.1")) {
820                             // Explicitly negotiated the default protocol.
821                             // Obtain a processor below.
822                         } else {
823                             // TODO:
824                             // OpenSSL 1.0.2's ALPN callback doesn't support
825                             // failing the handshake with an error if no
826                             // protocol can be negotiated. Therefore, we need to
827                             // fail the connection here. Once this is fixed,
828                             // replace the code below with the commented out
829                             // block.
830                             if (getLog().isDebugEnabled()) {
831                                 getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",
832                                         negotiatedProtocol));
833                             }
834                             return SocketState.CLOSED;
835                             /*
836                              * To replace the code above once OpenSSL 1.1.0 is
837                              * used.
838                             // Failed to create processor. This is a bug.
839                             throw new IllegalStateException(sm.getString(
840                                     "abstractConnectionHandler.negotiatedProcessor.fail",
841                                     negotiatedProtocol));
842                             */

843                         }
844                     }
845                 }
846                 if (processor == null) {
847                     processor = recycledProcessors.pop();
848                     if (getLog().isDebugEnabled()) {
849                         getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
850                     }
851                 }
852                 if (processor == null) {
853                     processor = getProtocol().createProcessor();
854                     register(processor);
855                     if (getLog().isDebugEnabled()) {
856                         getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
857                     }
858                 }
859
860                 processor.setSslSupport(
861                         wrapper.getSslSupport(getProtocol().getClientCertProvider()));
862
863                 // Associate the processor with the connection
864                 wrapper.setCurrentProcessor(processor);
865
866                 SocketState state = SocketState.CLOSED;
867                 do {
868                     state = processor.process(wrapper, status);
869
870                     if (state == SocketState.UPGRADING) {
871                         // Get the HTTP upgrade handler
872                         UpgradeToken upgradeToken = processor.getUpgradeToken();
873                         // Retrieve leftover input
874                         ByteBuffer leftOverInput = processor.getLeftoverInput();
875                         if (upgradeToken == null) {
876                             // Assume direct HTTP/2 connection
877                             UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
878                             if (upgradeProtocol != null) {
879                                 processor = upgradeProtocol.getProcessor(
880                                         wrapper, getProtocol().getAdapter());
881                                 wrapper.unRead(leftOverInput);
                                // Associate the processor with the connection
883                                 wrapper.setCurrentProcessor(processor);
884                             } else {
885                                 if (getLog().isDebugEnabled()) {
886                                     getLog().debug(sm.getString(
887                                         "abstractConnectionHandler.negotiatedProcessor.fail",
888                                         "h2c"));
889                                 }
890                                 return SocketState.CLOSED;
891                             }
892                         } else {
893                             HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
894                             // Release the Http11 processor to be re-used
895                             release(processor);
896                             // Create the upgrade processor
897                             processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
898                             if (getLog().isDebugEnabled()) {
899                                 getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
900                                         processor, wrapper));
901                             }
902                             wrapper.unRead(leftOverInput);
903                             // Mark the connection as upgraded
904                             wrapper.setUpgraded(true);
                            // Associate the processor with the connection
906                             wrapper.setCurrentProcessor(processor);
907                             // Initialise the upgrade handler (which may trigger
908                             // some IO using the new protocol which is why the lines
909                             // above are necessary)
910                             // This cast should be safe. If it fails the error
911                             // handling for the surrounding try/catch will deal with
912                             // it.
913                             if (upgradeToken.getInstanceManager() == null) {
914                                 httpUpgradeHandler.init((WebConnection) processor);
915                             } else {
916                                 ClassLoader oldCL = upgradeToken.getContextBind().bind(falsenull);
917                                 try {
918                                     httpUpgradeHandler.init((WebConnection) processor);
919                                 } finally {
920                                     upgradeToken.getContextBind().unbind(false, oldCL);
921                                 }
922                             }
923                             if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {
924                                 if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {
925                                     // The handler will initiate all further I/O
926                                     state = SocketState.LONG;
927                                 }
928                             }
929                         }
930                     }
931                 } while ( state == SocketState.UPGRADING);
932
933                 if (state == SocketState.LONG) {
934                     // In the middle of processing a request/response. Keep the
935                     // socket associated with the processor. Exact requirements
936                     // depend on type of long poll
937                     longPoll(wrapper, processor);
938                     if (processor.isAsync()) {
939                         getProtocol().addWaitingProcessor(processor);
940                     }
941                 } else if (state == SocketState.OPEN) {
942                     // In keep-alive but between requests. OK to recycle
943                     // processor. Continue to poll for the next request.
944                     wrapper.setCurrentProcessor(null);
945                     release(processor);
946                     wrapper.registerReadInterest();
947                 } else if (state == SocketState.SENDFILE) {
                    // Sendfile in progress. If it fails, the socket will be
                    // closed. If it works, the socket will either be added
                    // to the poller (or equivalent) to await more data or be
                    // processed if there are any pipe-lined requests remaining.
952                 } else if (state == SocketState.UPGRADED) {
953                     // Don't add sockets back to the poller if this was a
954                     // non-blocking write otherwise the poller may trigger
955                     // multiple read events which may lead to thread starvation
956                     // in the connector. The write() method will add this socket
957                     // to the poller if necessary.
958                     if (status != SocketEvent.OPEN_WRITE) {
959                         longPoll(wrapper, processor);
960                         getProtocol().addWaitingProcessor(processor);
961                     }
962                 } else if (state == SocketState.SUSPENDED) {
963                     // Don't add sockets back to the poller.
964                     // The resumeProcessing() method will add this socket
965                     // to the poller.
966                 } else {
967                     // Connection closed. OK to recycle the processor.
968                     // Processors handling upgrades require additional clean-up
969                     // before release.
970                     wrapper.setCurrentProcessor(null);
971                     if (processor.isUpgrade()) {
972                         UpgradeToken upgradeToken = processor.getUpgradeToken();
973                         HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
974                         InstanceManager instanceManager = upgradeToken.getInstanceManager();
975                         if (instanceManager == null) {
976                             httpUpgradeHandler.destroy();
977                         } else {
978                             ClassLoader oldCL = upgradeToken.getContextBind().bind(falsenull);
979                             try {
980                                 httpUpgradeHandler.destroy();
981                             } finally {
982                                 try {
983                                     instanceManager.destroyInstance(httpUpgradeHandler);
984                                 } catch (Throwable e) {
985                                     ExceptionUtils.handleThrowable(e);
986                                     getLog().error(sm.getString("abstractConnectionHandler.error"), e);
987                                 }
988                                 upgradeToken.getContextBind().unbind(false, oldCL);
989                             }
990                         }
991                     }
992                     release(processor);
993                 }
994                 return state;
995             } catch(java.net.SocketException e) {
996                 // SocketExceptions are normal
997                 getLog().debug(sm.getString(
998                         "abstractConnectionHandler.socketexception.debug"), e);
999             } catch (java.io.IOException e) {
1000                 // IOExceptions are normal
1001                 getLog().debug(sm.getString(
1002                         "abstractConnectionHandler.ioexception.debug"), e);
1003             } catch (ProtocolException e) {
1004                 // Protocol exceptions normally mean the client sent invalid or
1005                 // incomplete data.
1006                 getLog().debug(sm.getString(
1007                         "abstractConnectionHandler.protocolexception.debug"), e);
1008             }
1009             // Future developers: if you discover any other
1010             // rare-but-nonfatal exceptions, catch them here, and log as
1011             // above.
1012             catch (OutOfMemoryError oome) {
1013                 // Try and handle this here to give Tomcat a chance to close the
1014                 // connection and prevent clients waiting until they time out.
1015                 // Worst case, it isn't recoverable and the attempt at logging
1016                 // will trigger another OOME.
1017                 getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);
1018             } catch (Throwable e) {
1019                 ExceptionUtils.handleThrowable(e);
1020                 // any other exception or error is odd. Here we log it
1021                 // with "ERROR" level, so it will show up even on
1022                 // less-than-verbose logs.
1023                 getLog().error(sm.getString("abstractConnectionHandler.error"), e);
1024             } finally {
1025                 ContainerThreadMarker.clear();
1026             }
1027
1028             // Make sure socket/processor is removed from the list of current
1029             // connections
1030             wrapper.setCurrentProcessor(null);
1031             release(processor);
1032             return SocketState.CLOSED;
1033         }
1034
1035
1036         protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
1037             if (!processor.isAsync()) {
1038                 // This is currently only used with HTTP
1039                 // Either:
1040                 //  - this is an upgraded connection
1041                 //  - the request line/headers have not been completely
1042                 //    read
1043                 socket.registerReadInterest();
1044             }
1045         }
1046
1047
1048         @Override
1049         public Set<S> getOpenSockets() {
1050             Set<SocketWrapperBase<S>> set = proto.getEndpoint().getConnections();
1051             Set<S> result = new HashSet<>();
1052             for (SocketWrapperBase<S> socketWrapper : set) {
1053                 S socket = socketWrapper.getSocket();
1054                 if (socket != null) {
1055                     result.add(socket);
1056                 }
1057             }
1058             return result;
1059         }
1060
1061
1062         /**
1063          * Expected to be used by the handler once the processor is no longer
1064          * required.
1065          *
1066          * @param processor Processor being released (that was associated with
1067          *                  the socket)
1068          */

1069         private void release(Processor processor) {
1070             if (processor != null) {
1071                 processor.recycle();
1072                 if (processor.isUpgrade()) {
1073                     // UpgradeProcessorInternal instances can utilise AsyncIO.
1074                     // If they do, the processor will not pass through the
1075                     // process() method and be removed from waitingProcessors
1076                     // so do that here.
1077                     if (processor instanceof UpgradeProcessorInternal) {
1078                         if (((UpgradeProcessorInternal) processor).hasAsyncIO()) {
1079                             getProtocol().removeWaitingProcessor(processor);
1080                         }
1081                     }
1082                 } else {
1083                     // After recycling, only instances of UpgradeProcessorBase
1084                     // will return true for isUpgrade().
1085                     // Instances of UpgradeProcessorBase should not be added to
1086                     // recycledProcessors since that pool is only for AJP or
1087                     // HTTP processors
1088                     recycledProcessors.push(processor);
1089                     if (getLog().isDebugEnabled()) {
1090                         getLog().debug("Pushed Processor [" + processor + "]");
1091                     }
1092                 }
1093             }
1094         }
1095
1096
1097         /**
1098          * Expected to be used by the Endpoint to release resources on socket
1099          * close, errors etc.
1100          */

1101         @Override
1102         public void release(SocketWrapperBase<S> socketWrapper) {
1103             Processor processor = (Processor) socketWrapper.getCurrentProcessor();
1104             socketWrapper.setCurrentProcessor(null);
1105             release(processor);
1106         }
1107
1108
1109         protected void register(Processor processor) {
1110             if (getProtocol().getDomain() != null) {
1111                 synchronized (this) {
1112                     try {
1113                         long count = registerCount.incrementAndGet();
1114                         RequestInfo rp =
1115                             processor.getRequest().getRequestProcessor();
1116                         rp.setGlobalProcessor(global);
1117                         ObjectName rpName = new ObjectName(
1118                                 getProtocol().getDomain() +
1119                                 ":type=RequestProcessor,worker="
1120                                 + getProtocol().getName() +
1121                                 ",name=" + getProtocol().getProtocolName() +
1122                                 "Request" + count);
1123                         if (getLog().isDebugEnabled()) {
1124                             getLog().debug("Register [" + processor + "] as [" + rpName + "]");
1125                         }
1126                         Registry.getRegistry(nullnull).registerComponent(rp,
1127                                 rpName, null);
1128                         rp.setRpName(rpName);
1129                     } catch (Exception e) {
1130                         getLog().warn(sm.getString("abstractProtocol.processorRegisterError"), e);
1131                     }
1132                 }
1133             }
1134         }
1135
1136         protected void unregister(Processor processor) {
1137             if (getProtocol().getDomain() != null) {
1138                 synchronized (this) {
1139                     try {
1140                         Request r = processor.getRequest();
1141                         if (r == null) {
1142                             // Probably an UpgradeProcessor
1143                             return;
1144                         }
1145                         RequestInfo rp = r.getRequestProcessor();
1146                         rp.setGlobalProcessor(null);
1147                         ObjectName rpName = rp.getRpName();
1148                         if (getLog().isDebugEnabled()) {
1149                             getLog().debug("Unregister [" + rpName + "]");
1150                         }
1151                         Registry.getRegistry(nullnull).unregisterComponent(
1152                                 rpName);
1153                         rp.setRpName(null);
1154                     } catch (Exception e) {
1155                         getLog().warn(sm.getString("abstractProtocol.processorUnregisterError"), e);
1156                     }
1157                 }
1158             }
1159         }
1160
1161         @Override
1162         public final void pause() {
1163             /*
1164              * Inform all the processors associated with current connections
1165              * that the endpoint is being paused. Most won't care. Those
1166              * processing multiplexed streams may wish to take action. For
1167              * example, HTTP/2 may wish to stop accepting new streams.
1168              *
1169              * Note that even if the endpoint is resumed, there is (currently)
1170              * no API to inform the Processors of this.
1171              */

1172             for (SocketWrapperBase<S> wrapper : proto.getEndpoint().getConnections()) {
1173                 Processor processor = (Processor) wrapper.getCurrentProcessor();
1174                 if (processor != null) {
1175                     processor.pause();
1176                 }
1177             }
1178         }
1179     }
1180
1181     protected static class RecycledProcessors extends SynchronizedStack<Processor> {
1182
1183         private final transient ConnectionHandler<?> handler;
1184         protected final AtomicInteger size = new AtomicInteger(0);
1185
1186         public RecycledProcessors(ConnectionHandler<?> handler) {
1187             this.handler = handler;
1188         }
1189
1190         @SuppressWarnings("sync-override"// Size may exceed cache size a bit
1191         @Override
1192         public boolean push(Processor processor) {
1193             int cacheSize = handler.getProtocol().getProcessorCache();
1194             boolean offer = cacheSize == -1 ? true : size.get() < cacheSize;
1195             //avoid over growing our cache or add after we have stopped
1196             boolean result = false;
1197             if (offer) {
1198                 result = super.push(processor);
1199                 if (result) {
1200                     size.incrementAndGet();
1201                 }
1202             }
1203             if (!result) handler.unregister(processor);
1204             return result;
1205         }
1206
1207         @SuppressWarnings("sync-override"// OK if size is too big briefly
1208         @Override
1209         public Processor pop() {
1210             Processor result = super.pop();
1211             if (result != null) {
1212                 size.decrementAndGet();
1213             }
1214             return result;
1215         }
1216
1217         @Override
1218         public synchronized void clear() {
1219             Processor next = pop();
1220             while (next != null) {
1221                 handler.unregister(next);
1222                 next = pop();
1223             }
1224             super.clear();
1225             size.set(0);
1226         }
1227     }
1228
1229 }
1230