1
17 package org.apache.coyote;
18
19 import java.net.InetAddress;
20 import java.nio.ByteBuffer;
21 import java.util.Collections;
22 import java.util.HashSet;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import javax.management.InstanceNotFoundException;
34 import javax.management.MBeanRegistration;
35 import javax.management.MBeanRegistrationException;
36 import javax.management.MBeanServer;
37 import javax.management.MalformedObjectNameException;
38 import javax.management.ObjectName;
39 import javax.servlet.http.HttpUpgradeHandler;
40 import javax.servlet.http.WebConnection;
41
42 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
43 import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
44 import org.apache.juli.logging.Log;
45 import org.apache.tomcat.InstanceManager;
46 import org.apache.tomcat.util.ExceptionUtils;
47 import org.apache.tomcat.util.collections.SynchronizedStack;
48 import org.apache.tomcat.util.modeler.Registry;
49 import org.apache.tomcat.util.net.AbstractEndpoint;
50 import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
51 import org.apache.tomcat.util.net.SocketEvent;
52 import org.apache.tomcat.util.net.SocketWrapperBase;
53 import org.apache.tomcat.util.res.StringManager;
54
55 public abstract class AbstractProtocol<S> implements ProtocolHandler,
56 MBeanRegistration {
57
58
61 private static final StringManager sm = StringManager.getManager(AbstractProtocol.class);
62
63
64
68 private static final AtomicInteger nameCounter = new AtomicInteger(0);
69
70
71
74 protected ObjectName rgOname = null;
75
76
77
82 private int nameIndex = 0;
83
84
85
90 private final AbstractEndpoint<S,?> endpoint;
91
92
93 private Handler<S> handler;
94
95
96 private final Set<Processor> waitingProcessors =
97 Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
98
99
102 private ScheduledFuture<?> timeoutFuture = null;
103 private ScheduledFuture<?> monitorFuture;
104
/**
 * Creates a protocol handler bound to the given endpoint and applies the
 * default connection linger and TCP no-delay values from {@link Constants}.
 *
 * @param endpoint the endpoint this protocol handler configures and drives
 */
public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
    this.endpoint = endpoint;
    // The endpoint has to be assigned first: both setters below delegate to it.
    this.setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
    this.setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
110
111
112
113
114
127 public boolean setProperty(String name, String value) {
128 return endpoint.setProperty(name, value);
129 }
130
131
132
140 public String getProperty(String name) {
141 return endpoint.getProperty(name);
142 }
143
144
145
146
147
151 protected Adapter adapter;
152 @Override
153 public void setAdapter(Adapter adapter) { this.adapter = adapter; }
154 @Override
155 public Adapter getAdapter() { return adapter; }
156
157
158
165 protected int processorCache = 200;
166 public int getProcessorCache() { return this.processorCache; }
167 public void setProcessorCache(int processorCache) {
168 this.processorCache = processorCache;
169 }
170
171
172 private String clientCertProvider = null;
173
184 public String getClientCertProvider() { return clientCertProvider; }
185 public void setClientCertProvider(String s) { this.clientCertProvider = s; }
186
187
188 private int maxHeaderCount = 100;
189 public int getMaxHeaderCount() {
190 return maxHeaderCount;
191 }
192 public void setMaxHeaderCount(int maxHeaderCount) {
193 this.maxHeaderCount = maxHeaderCount;
194 }
195
196
197 @Override
198 public boolean isAprRequired() {
199 return false;
200 }
201
202
203 @Override
204 public boolean isSendfileSupported() {
205 return endpoint.getUseSendfile();
206 }
207
208
209
210
211 @Override
212 public Executor getExecutor() { return endpoint.getExecutor(); }
213 @Override
214 public void setExecutor(Executor executor) {
215 endpoint.setExecutor(executor);
216 }
217
218
219 @Override
220 public ScheduledExecutorService getUtilityExecutor() { return endpoint.getUtilityExecutor(); }
221 @Override
222 public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) {
223 endpoint.setUtilityExecutor(utilityExecutor);
224 }
225
226
227 public int getMaxThreads() { return endpoint.getMaxThreads(); }
228 public void setMaxThreads(int maxThreads) {
229 endpoint.setMaxThreads(maxThreads);
230 }
231
232 public int getMaxConnections() { return endpoint.getMaxConnections(); }
233 public void setMaxConnections(int maxConnections) {
234 endpoint.setMaxConnections(maxConnections);
235 }
236
237
238 public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); }
239 public void setMinSpareThreads(int minSpareThreads) {
240 endpoint.setMinSpareThreads(minSpareThreads);
241 }
242
243
244 public int getThreadPriority() { return endpoint.getThreadPriority(); }
245 public void setThreadPriority(int threadPriority) {
246 endpoint.setThreadPriority(threadPriority);
247 }
248
249
250 public int getAcceptCount() { return endpoint.getAcceptCount(); }
251 public void setAcceptCount(int acceptCount) { endpoint.setAcceptCount(acceptCount); }
252
253
254 public boolean getTcpNoDelay() { return endpoint.getTcpNoDelay(); }
255 public void setTcpNoDelay(boolean tcpNoDelay) {
256 endpoint.setTcpNoDelay(tcpNoDelay);
257 }
258
259
260 public int getConnectionLinger() { return endpoint.getConnectionLinger(); }
261 public void setConnectionLinger(int connectionLinger) {
262 endpoint.setConnectionLinger(connectionLinger);
263 }
264
265
266
272 public int getKeepAliveTimeout() { return endpoint.getKeepAliveTimeout(); }
273 public void setKeepAliveTimeout(int keepAliveTimeout) {
274 endpoint.setKeepAliveTimeout(keepAliveTimeout);
275 }
276
277 public InetAddress getAddress() { return endpoint.getAddress(); }
278 public void setAddress(InetAddress ia) {
279 endpoint.setAddress(ia);
280 }
281
282
283 public int getPort() { return endpoint.getPort(); }
284 public void setPort(int port) {
285 endpoint.setPort(port);
286 }
287
288
289 public int getPortOffset() { return endpoint.getPortOffset(); }
290 public void setPortOffset(int portOffset) {
291 endpoint.setPortOffset(portOffset);
292 }
293
294
295 public int getPortWithOffset() { return endpoint.getPortWithOffset(); }
296
297
298 public int getLocalPort() { return endpoint.getLocalPort(); }
299
300
304 public int getConnectionTimeout() {
305 return endpoint.getConnectionTimeout();
306 }
307 public void setConnectionTimeout(int timeout) {
308 endpoint.setConnectionTimeout(timeout);
309 }
310
311 public long getConnectionCount() {
312 return endpoint.getConnectionCount();
313 }
314
315
322 @Deprecated
323 public void setAcceptorThreadCount(int threadCount) {
324 }
325
326
333 @Deprecated
334 public int getAcceptorThreadCount() {
335 return 1;
336 }
337
338 public void setAcceptorThreadPriority(int threadPriority) {
339 endpoint.setAcceptorThreadPriority(threadPriority);
340 }
341 public int getAcceptorThreadPriority() {
342 return endpoint.getAcceptorThreadPriority();
343 }
344
345
346
347
348 public synchronized int getNameIndex() {
349 if (nameIndex == 0) {
350 nameIndex = nameCounter.incrementAndGet();
351 }
352
353 return nameIndex;
354 }
355
356
357
364 public String getName() {
365 return ObjectName.quote(getNameInternal());
366 }
367
368
369 private String getNameInternal() {
370 StringBuilder name = new StringBuilder(getNamePrefix());
371 name.append('-');
372 if (getAddress() != null) {
373 name.append(getAddress().getHostAddress());
374 name.append('-');
375 }
376 int port = getPortWithOffset();
377 if (port == 0) {
378
379 name.append("auto-");
380 name.append(getNameIndex());
381 port = getLocalPort();
382 if (port != -1) {
383 name.append('-');
384 name.append(port);
385 }
386 } else {
387 name.append(port);
388 }
389 return name.toString();
390 }
391
392
393 public void addWaitingProcessor(Processor processor) {
394 if (getLog().isDebugEnabled()) {
395 getLog().debug(sm.getString("abstractProtocol.waitingProcessor.add", processor));
396 }
397 waitingProcessors.add(processor);
398 }
399
400
401 public void removeWaitingProcessor(Processor processor) {
402 if (getLog().isDebugEnabled()) {
403 getLog().debug(sm.getString("abstractProtocol.waitingProcessor.remove", processor));
404 }
405 waitingProcessors.remove(processor);
406 }
407
408
409
410
411 protected AbstractEndpoint<S,?> getEndpoint() {
412 return endpoint;
413 }
414
415
416 protected Handler<S> getHandler() {
417 return handler;
418 }
419
420 protected void setHandler(Handler<S> handler) {
421 this.handler = handler;
422 }
423
424
425
426
427
432 protected abstract Log getLog();
433
434
435
440 protected abstract String getNamePrefix();
441
442
443
447 protected abstract String getProtocolName();
448
449
450
457 protected abstract UpgradeProtocol getNegotiatedProtocol(String name);
458
459
460
467 protected abstract UpgradeProtocol getUpgradeProtocol(String name);
468
469
470
476 protected abstract Processor createProcessor();
477
478
479 protected abstract Processor createUpgradeProcessor(
480 SocketWrapperBase<?> socket,
481 UpgradeToken upgradeToken);
482
483
484
485
486 protected String domain;
487 protected ObjectName oname;
488 protected MBeanServer mserver;
489
490 public ObjectName getObjectName() {
491 return oname;
492 }
493
494 public String getDomain() {
495 return domain;
496 }
497
498 @Override
499 public ObjectName preRegister(MBeanServer server, ObjectName name)
500 throws Exception {
501 oname = name;
502 mserver = server;
503 domain = name.getDomain();
504 return name;
505 }
506
507 @Override
508 public void postRegister(Boolean registrationDone) {
509
510 }
511
512 @Override
513 public void preDeregister() throws Exception {
514
515 }
516
517 @Override
518 public void postDeregister() {
519
520 }
521
522 private ObjectName createObjectName() throws MalformedObjectNameException {
523
524 domain = getAdapter().getDomain();
525
526 if (domain == null) {
527 return null;
528 }
529
530 StringBuilder name = new StringBuilder(getDomain());
531 name.append(":type=ProtocolHandler,port=");
532 int port = getPortWithOffset();
533 if (port > 0) {
534 name.append(port);
535 } else {
536 name.append("auto-");
537 name.append(getNameIndex());
538 }
539 InetAddress address = getAddress();
540 if (address != null) {
541 name.append(",address=");
542 name.append(ObjectName.quote(address.getHostAddress()));
543 }
544 return new ObjectName(name.toString());
545 }
546
547
548
549
550
555
556 @Override
557 public void init() throws Exception {
558 if (getLog().isInfoEnabled()) {
559 getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
560 logPortOffset();
561 }
562
563 if (oname == null) {
564
565 oname = createObjectName();
566 if (oname != null) {
567 Registry.getRegistry(null, null).registerComponent(this, oname, null);
568 }
569 }
570
571 if (this.domain != null) {
572 rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
573 Registry.getRegistry(null, null).registerComponent(
574 getHandler().getGlobal(), rgOname, null);
575 }
576
577 String endpointName = getName();
578 endpoint.setName(endpointName.substring(1, endpointName.length()-1));
579 endpoint.setDomain(domain);
580
581 endpoint.init();
582 }
583
584
585 @Override
586 public void start() throws Exception {
587 if (getLog().isInfoEnabled()) {
588 getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
589 logPortOffset();
590 }
591
592 endpoint.start();
593 monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
594 new Runnable() {
595 @Override
596 public void run() {
597 if (!isPaused()) {
598 startAsyncTimeout();
599 }
600 }
601 }, 0, 60, TimeUnit.SECONDS);
602 }
603
604
605
610 protected void startAsyncTimeout() {
611 if (timeoutFuture == null || (timeoutFuture != null && timeoutFuture.isDone())) {
612 if (timeoutFuture != null && timeoutFuture.isDone()) {
613
614 try {
615 timeoutFuture.get();
616 } catch (InterruptedException | ExecutionException e) {
617 getLog().error(sm.getString("abstractProtocolHandler.asyncTimeoutError"), e);
618 }
619 }
620 timeoutFuture = getUtilityExecutor().scheduleAtFixedRate(
621 new Runnable() {
622 @Override
623 public void run() {
624 long now = System.currentTimeMillis();
625 for (Processor processor : waitingProcessors) {
626 processor.timeoutAsync(now);
627 }
628 }
629 }, 1, 1, TimeUnit.SECONDS);
630 }
631 }
632
633 protected void stopAsyncTimeout() {
634 if (timeoutFuture != null) {
635 timeoutFuture.cancel(false);
636 timeoutFuture = null;
637 }
638 }
639
640 @Override
641 public void pause() throws Exception {
642 if (getLog().isInfoEnabled()) {
643 getLog().info(sm.getString("abstractProtocolHandler.pause", getName()));
644 }
645
646 stopAsyncTimeout();
647 endpoint.pause();
648 }
649
650
651 public boolean isPaused() {
652 return endpoint.isPaused();
653 }
654
655
656 @Override
657 public void resume() throws Exception {
658 if(getLog().isInfoEnabled()) {
659 getLog().info(sm.getString("abstractProtocolHandler.resume", getName()));
660 }
661
662 endpoint.resume();
663 startAsyncTimeout();
664 }
665
666
667 @Override
668 public void stop() throws Exception {
669 if(getLog().isInfoEnabled()) {
670 getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
671 logPortOffset();
672 }
673
674 if (monitorFuture != null) {
675 monitorFuture.cancel(true);
676 monitorFuture = null;
677 }
678 stopAsyncTimeout();
679
680 for (Processor processor : waitingProcessors) {
681 processor.timeoutAsync(-1);
682 }
683
684 endpoint.stop();
685 }
686
687
688 @Override
689 public void destroy() throws Exception {
690 if(getLog().isInfoEnabled()) {
691 getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
692 logPortOffset();
693 }
694
695 try {
696 endpoint.destroy();
697 } finally {
698 if (oname != null) {
699 if (mserver == null) {
700 Registry.getRegistry(null, null).unregisterComponent(oname);
701 } else {
702
703 try {
704 mserver.unregisterMBean(oname);
705 } catch (MBeanRegistrationException | InstanceNotFoundException e) {
706 getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
707 oname, mserver));
708 }
709 }
710 }
711
712 if (rgOname != null) {
713 Registry.getRegistry(null, null).unregisterComponent(rgOname);
714 }
715 }
716 }
717
718
719 @Override
720 public void closeServerSocketGraceful() {
721 endpoint.closeServerSocketGraceful();
722 }
723
724
725 private void logPortOffset() {
726 if (getPort() != getPortWithOffset()) {
727 getLog().info(sm.getString("abstractProtocolHandler.portOffset", getName(),
728 String.valueOf(getPort()), String.valueOf(getPortOffset())));
729 }
730 }
731
732
733
734
735 protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
736
/** Protocol handler this connection handler works for. */
private final AbstractProtocol<S> proto;

/**
 * Object returned by {@link #getGlobal()} and attached to the request
 * info of every processor registered by this handler.
 */
private final RequestGroupInfo global = new RequestGroupInfo();

/** Source of the number appended to each registered processor's JMX name. */
private final AtomicLong registerCount = new AtomicLong(0);

/** Cache of idle processors that can be reused for later connections. */
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);

/**
 * @param proto the protocol handler that owns this connection handler
 */
public ConnectionHandler(AbstractProtocol<S> proto) {
    this.proto = proto;
}

/**
 * @return the protocol handler supplied to the constructor
 */
protected AbstractProtocol<S> getProtocol() {
    return this.proto;
}

/**
 * @return the logger of the owning protocol handler
 */
protected Log getLog() {
    final AbstractProtocol<S> owner = getProtocol();
    return owner.getLog();
}

/**
 * @return the global request group info held by this handler
 */
@Override
public Object getGlobal() {
    return this.global;
}

/**
 * Empties the cache of recycled processors.
 */
@Override
public void recycle() {
    recycledProcessors.clear();
}
763
764
765 @Override
766 public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
767 if (getLog().isDebugEnabled()) {
768 getLog().debug(sm.getString("abstractConnectionHandler.process",
769 wrapper.getSocket(), status));
770 }
771 if (wrapper == null) {
772
773 return SocketState.CLOSED;
774 }
775
776 S socket = wrapper.getSocket();
777
778 Processor processor = (Processor) wrapper.getCurrentProcessor();
779 if (getLog().isDebugEnabled()) {
780 getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
781 processor, socket));
782 }
783
784
785
786
787
788 if (SocketEvent.TIMEOUT == status &&
789 (processor == null ||
790 !processor.isAsync() && !processor.isUpgrade() ||
791 processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) {
792
793 return SocketState.OPEN;
794 }
795
796 if (processor != null) {
797
798 getProtocol().removeWaitingProcessor(processor);
799 } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
800
801
802 return SocketState.CLOSED;
803 }
804
805 ContainerThreadMarker.set();
806
807 try {
808 if (processor == null) {
809 String negotiatedProtocol = wrapper.getNegotiatedProtocol();
810
811
812 if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
813 UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol);
814 if (upgradeProtocol != null) {
815 processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
816 if (getLog().isDebugEnabled()) {
817 getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
818 }
819 } else if (negotiatedProtocol.equals("http/1.1")) {
820
821
822 } else {
823
824
825
826
827
828
829
830 if (getLog().isDebugEnabled()) {
831 getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",
832 negotiatedProtocol));
833 }
834 return SocketState.CLOSED;
835
843 }
844 }
845 }
846 if (processor == null) {
847 processor = recycledProcessors.pop();
848 if (getLog().isDebugEnabled()) {
849 getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
850 }
851 }
852 if (processor == null) {
853 processor = getProtocol().createProcessor();
854 register(processor);
855 if (getLog().isDebugEnabled()) {
856 getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
857 }
858 }
859
860 processor.setSslSupport(
861 wrapper.getSslSupport(getProtocol().getClientCertProvider()));
862
863
864 wrapper.setCurrentProcessor(processor);
865
866 SocketState state = SocketState.CLOSED;
867 do {
868 state = processor.process(wrapper, status);
869
870 if (state == SocketState.UPGRADING) {
871
872 UpgradeToken upgradeToken = processor.getUpgradeToken();
873
874 ByteBuffer leftOverInput = processor.getLeftoverInput();
875 if (upgradeToken == null) {
876
877 UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
878 if (upgradeProtocol != null) {
879 processor = upgradeProtocol.getProcessor(
880 wrapper, getProtocol().getAdapter());
881 wrapper.unRead(leftOverInput);
882
883 wrapper.setCurrentProcessor(processor);
884 } else {
885 if (getLog().isDebugEnabled()) {
886 getLog().debug(sm.getString(
887 "abstractConnectionHandler.negotiatedProcessor.fail",
888 "h2c"));
889 }
890 return SocketState.CLOSED;
891 }
892 } else {
893 HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
894
895 release(processor);
896
897 processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
898 if (getLog().isDebugEnabled()) {
899 getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
900 processor, wrapper));
901 }
902 wrapper.unRead(leftOverInput);
903
904 wrapper.setUpgraded(true);
905
906 wrapper.setCurrentProcessor(processor);
907
908
909
910
911
912
913 if (upgradeToken.getInstanceManager() == null) {
914 httpUpgradeHandler.init((WebConnection) processor);
915 } else {
916 ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
917 try {
918 httpUpgradeHandler.init((WebConnection) processor);
919 } finally {
920 upgradeToken.getContextBind().unbind(false, oldCL);
921 }
922 }
923 if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {
924 if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {
925
926 state = SocketState.LONG;
927 }
928 }
929 }
930 }
931 } while ( state == SocketState.UPGRADING);
932
933 if (state == SocketState.LONG) {
934
935
936
937 longPoll(wrapper, processor);
938 if (processor.isAsync()) {
939 getProtocol().addWaitingProcessor(processor);
940 }
941 } else if (state == SocketState.OPEN) {
942
943
944 wrapper.setCurrentProcessor(null);
945 release(processor);
946 wrapper.registerReadInterest();
947 } else if (state == SocketState.SENDFILE) {
948
949
950
951
952 } else if (state == SocketState.UPGRADED) {
953
954
955
956
957
958 if (status != SocketEvent.OPEN_WRITE) {
959 longPoll(wrapper, processor);
960 getProtocol().addWaitingProcessor(processor);
961 }
962 } else if (state == SocketState.SUSPENDED) {
963
964
965
966 } else {
967
968
969
970 wrapper.setCurrentProcessor(null);
971 if (processor.isUpgrade()) {
972 UpgradeToken upgradeToken = processor.getUpgradeToken();
973 HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
974 InstanceManager instanceManager = upgradeToken.getInstanceManager();
975 if (instanceManager == null) {
976 httpUpgradeHandler.destroy();
977 } else {
978 ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
979 try {
980 httpUpgradeHandler.destroy();
981 } finally {
982 try {
983 instanceManager.destroyInstance(httpUpgradeHandler);
984 } catch (Throwable e) {
985 ExceptionUtils.handleThrowable(e);
986 getLog().error(sm.getString("abstractConnectionHandler.error"), e);
987 }
988 upgradeToken.getContextBind().unbind(false, oldCL);
989 }
990 }
991 }
992 release(processor);
993 }
994 return state;
995 } catch(java.net.SocketException e) {
996
997 getLog().debug(sm.getString(
998 "abstractConnectionHandler.socketexception.debug"), e);
999 } catch (java.io.IOException e) {
1000
1001 getLog().debug(sm.getString(
1002 "abstractConnectionHandler.ioexception.debug"), e);
1003 } catch (ProtocolException e) {
1004
1005
1006 getLog().debug(sm.getString(
1007 "abstractConnectionHandler.protocolexception.debug"), e);
1008 }
1009
1010
1011
1012 catch (OutOfMemoryError oome) {
1013
1014
1015
1016
1017 getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);
1018 } catch (Throwable e) {
1019 ExceptionUtils.handleThrowable(e);
1020
1021
1022
1023 getLog().error(sm.getString("abstractConnectionHandler.error"), e);
1024 } finally {
1025 ContainerThreadMarker.clear();
1026 }
1027
1028
1029
1030 wrapper.setCurrentProcessor(null);
1031 release(processor);
1032 return SocketState.CLOSED;
1033 }
1034
1035
1036 protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
1037 if (!processor.isAsync()) {
1038
1039
1040
1041
1042
1043 socket.registerReadInterest();
1044 }
1045 }
1046
1047
1048 @Override
1049 public Set<S> getOpenSockets() {
1050 Set<SocketWrapperBase<S>> set = proto.getEndpoint().getConnections();
1051 Set<S> result = new HashSet<>();
1052 for (SocketWrapperBase<S> socketWrapper : set) {
1053 S socket = socketWrapper.getSocket();
1054 if (socket != null) {
1055 result.add(socket);
1056 }
1057 }
1058 return result;
1059 }
1060
1061
1062
1069 private void release(Processor processor) {
1070 if (processor != null) {
1071 processor.recycle();
1072 if (processor.isUpgrade()) {
1073
1074
1075
1076
1077 if (processor instanceof UpgradeProcessorInternal) {
1078 if (((UpgradeProcessorInternal) processor).hasAsyncIO()) {
1079 getProtocol().removeWaitingProcessor(processor);
1080 }
1081 }
1082 } else {
1083
1084
1085
1086
1087
1088 recycledProcessors.push(processor);
1089 if (getLog().isDebugEnabled()) {
1090 getLog().debug("Pushed Processor [" + processor + "]");
1091 }
1092 }
1093 }
1094 }
1095
1096
1097
1101 @Override
1102 public void release(SocketWrapperBase<S> socketWrapper) {
1103 Processor processor = (Processor) socketWrapper.getCurrentProcessor();
1104 socketWrapper.setCurrentProcessor(null);
1105 release(processor);
1106 }
1107
1108
1109 protected void register(Processor processor) {
1110 if (getProtocol().getDomain() != null) {
1111 synchronized (this) {
1112 try {
1113 long count = registerCount.incrementAndGet();
1114 RequestInfo rp =
1115 processor.getRequest().getRequestProcessor();
1116 rp.setGlobalProcessor(global);
1117 ObjectName rpName = new ObjectName(
1118 getProtocol().getDomain() +
1119 ":type=RequestProcessor,worker="
1120 + getProtocol().getName() +
1121 ",name=" + getProtocol().getProtocolName() +
1122 "Request" + count);
1123 if (getLog().isDebugEnabled()) {
1124 getLog().debug("Register [" + processor + "] as [" + rpName + "]");
1125 }
1126 Registry.getRegistry(null, null).registerComponent(rp,
1127 rpName, null);
1128 rp.setRpName(rpName);
1129 } catch (Exception e) {
1130 getLog().warn(sm.getString("abstractProtocol.processorRegisterError"), e);
1131 }
1132 }
1133 }
1134 }
1135
1136 protected void unregister(Processor processor) {
1137 if (getProtocol().getDomain() != null) {
1138 synchronized (this) {
1139 try {
1140 Request r = processor.getRequest();
1141 if (r == null) {
1142
1143 return;
1144 }
1145 RequestInfo rp = r.getRequestProcessor();
1146 rp.setGlobalProcessor(null);
1147 ObjectName rpName = rp.getRpName();
1148 if (getLog().isDebugEnabled()) {
1149 getLog().debug("Unregister [" + rpName + "]");
1150 }
1151 Registry.getRegistry(null, null).unregisterComponent(
1152 rpName);
1153 rp.setRpName(null);
1154 } catch (Exception e) {
1155 getLog().warn(sm.getString("abstractProtocol.processorUnregisterError"), e);
1156 }
1157 }
1158 }
1159 }
1160
1161 @Override
1162 public final void pause() {
1163
1172 for (SocketWrapperBase<S> wrapper : proto.getEndpoint().getConnections()) {
1173 Processor processor = (Processor) wrapper.getCurrentProcessor();
1174 if (processor != null) {
1175 processor.pause();
1176 }
1177 }
1178 }
1179 }
1180
1181 protected static class RecycledProcessors extends SynchronizedStack<Processor> {
1182
1183 private final transient ConnectionHandler<?> handler;
1184 protected final AtomicInteger size = new AtomicInteger(0);
1185
1186 public RecycledProcessors(ConnectionHandler<?> handler) {
1187 this.handler = handler;
1188 }
1189
1190 @SuppressWarnings("sync-override")
1191 @Override
1192 public boolean push(Processor processor) {
1193 int cacheSize = handler.getProtocol().getProcessorCache();
1194 boolean offer = cacheSize == -1 ? true : size.get() < cacheSize;
1195
1196 boolean result = false;
1197 if (offer) {
1198 result = super.push(processor);
1199 if (result) {
1200 size.incrementAndGet();
1201 }
1202 }
1203 if (!result) handler.unregister(processor);
1204 return result;
1205 }
1206
1207 @SuppressWarnings("sync-override")
1208 @Override
1209 public Processor pop() {
1210 Processor result = super.pop();
1211 if (result != null) {
1212 size.decrementAndGet();
1213 }
1214 return result;
1215 }
1216
1217 @Override
1218 public synchronized void clear() {
1219 Processor next = pop();
1220 while (next != null) {
1221 handler.unregister(next);
1222 next = pop();
1223 }
1224 super.clear();
1225 size.set(0);
1226 }
1227 }
1228
1229 }
1230