Source file src/pkg/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "io"
21 "log"
22 "net"
23 "net/http/httptrace"
24 "net/textproto"
25 "net/url"
26 "os"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "golang.org/x/net/http/httpguts"
34 "golang.org/x/net/http/httpproxy"
35 )
36
37 // DefaultTransport is the package-level RoundTripper value. It is a
38 // *Transport that takes its proxy from ProxyFromEnvironment, dials through
39 // a net.Dialer with a 30 second Timeout and KeepAlive, sets
40 // ForceAttemptHTTP2, keeps at most 100 idle connections for up to 90
41 // seconds each, and allows 10 seconds for a TLS handshake.
42 var DefaultTransport RoundTripper = &Transport{
43 Proxy: ProxyFromEnvironment,
44 DialContext: (&net.Dialer{
45 Timeout: 30 * time.Second,
46 KeepAlive: 30 * time.Second,
47 DualStack: true,
48 }).DialContext,
49 ForceAttemptHTTP2: true,
50 MaxIdleConns: 100,
51 IdleConnTimeout: 90 * time.Second,
52 TLSHandshakeTimeout: 10 * time.Second,
53 ExpectContinueTimeout: 1 * time.Second,
54 }
55
56 // DefaultMaxIdleConnsPerHost is the per-key idle connection limit that
57 // maxIdleConnsPerHost falls back to when Transport.MaxIdleConnsPerHost is zero.
58 const DefaultMaxIdleConnsPerHost = 2
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 // Transport is the RoundTripper implementation in this file. It dials
92 // connections (optionally through a proxy), parks idle ones in a per-key
93 // cache for reuse, and hands TLS connections over to alternate protocols
94 // selected through TLSNextProto.
95 type Transport struct {
96 idleMu sync.Mutex // guards closeIdle, idleConn, idleConnWait and idleLRU
97 closeIdle bool // set by CloseIdleConnections, cleared by queueForIdleConn
98 idleConn map[connectMethodKey][]*persistConn // idle connections; the most recently parked one is last
99 idleConnWait map[connectMethodKey]wantConnQueue // requests waiting for an idle connection
100 idleLRU connLRU
101
102 reqMu sync.Mutex // guards reqCanceler
103 reqCanceler map[*Request]func(error)
104
105 altMu sync.Mutex // serializes RegisterProtocol's copy-and-store of altProto
106 altProto atomic.Value // holds a map[string]RoundTripper keyed by URL scheme; empty until the first registration
107
108 connsPerHostMu sync.Mutex // guards connsPerHost and connsPerHostWait
109 connsPerHost map[connectMethodKey]int
110 connsPerHostWait map[connectMethodKey]wantConnQueue // dials queued behind MaxConnsPerHost
111
112
113
114
115
116
117 // Proxy, when non-nil, is called by connectMethodForRequest to pick the
118 // proxy URL for a request. A nil URL with a nil error means no proxy is
119 // used. An error it returns is handed back to roundTrip, which fails
120 // the request with it.
121 Proxy func(*Request) (*url.URL, error)
122
123
124
125
126
127
128
129 // DialContext, when non-nil, is the hook dial uses to create
130 // connections; it takes precedence over Dial.
131 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
132
133
134
135
136
137
138
139
140 // Dial is used by dial only when DialContext is nil. If both are nil,
141 // the package-level zeroDialer is used. A (nil, nil) result from Dial
142 // is turned into an error.
143 Dial func(network, addr string) (net.Conn, error)
144
145
146
147
148
149
150 // DialTLS, when non-nil, is used by dialConn instead of dial plus
151 // addTLS whenever the first hop (proxy or target) uses the "https"
152 // scheme. If the returned connection is a *tls.Conn, dialConn runs its
153 // handshake and records the resulting connection state.
154 DialTLS func(network, addr string) (net.Conn, error)
155
156
157
158 // TLSClientConfig is the configuration addTLS clones for tls.Client.
159 // Its ServerName, when set, is preferred by chooseTLSHost.
160 TLSClientConfig *tls.Config
161
162 // TLSHandshakeTimeout bounds the handshake started by addTLS.
163 // Zero disables the timer.
164 TLSHandshakeTimeout time.Duration
165
166
167
168
169 // DisableKeepAlives, when true, stops connections from being parked
170 // for reuse (tryPutIdleConn) or taken from the idle cache (queueForIdleConn).
171 DisableKeepAlives bool
172
173
174
175
176
177
178
179 // NOTE(review): DisableCompression is not read in this part of the
180 // file; presumably it controls transparent gzip handling - confirm.
181 DisableCompression bool
182
183 // MaxIdleConns caps the number of idle connections across all keys;
184 // zero means no cap (see tryPutIdleConn).
185 MaxIdleConns int
186
187
188 // MaxIdleConnsPerHost caps idle connections per key. Zero selects
189 // DefaultMaxIdleConnsPerHost; a negative value disables idle caching.
190 MaxIdleConnsPerHost int
191
192
193
194
195 // MaxConnsPerHost caps dialed connections per key; further dials are
196 // queued by queueForDial. Zero or negative means no cap.
197 MaxConnsPerHost int
198
199
200
201 // IdleConnTimeout, when positive, is how long tryPutIdleConn lets a
202 // non-alternate-protocol connection sit idle before its timer fires.
203 IdleConnTimeout time.Duration
204
205
206
207 // NOTE(review): ResponseHeaderTimeout is not read in this part of the
208 // file; presumably it bounds the wait for response headers - confirm.
209 ResponseHeaderTimeout time.Duration
210
211
212
213
214
215
216 // NOTE(review): ExpectContinueTimeout is not read in this part of the
217 // file; presumably it applies to "Expect: 100-continue" requests - confirm.
218 ExpectContinueTimeout time.Duration
219
220
221
222
223
224
225 // TLSNextProto maps a negotiated TLS protocol name to a constructor
226 // for the RoundTripper that takes over the connection (see dialConn).
227 // If it is non-nil when the Transport is first used,
228 // onceSetNextProtoDefaults leaves it untouched and does not call
229 // http2configureTransport.
230 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
231
232 // ProxyConnectHeader supplies the headers dialConn sends on the
233 // CONNECT request when tunnelling an https target through a proxy.
234 ProxyConnectHeader Header
235
236
237
238
239 // MaxResponseHeaderBytes limits response header bytes; zero selects
240 // the 10 << 20 default in persistConn.maxHeaderResponseSize.
241 MaxResponseHeaderBytes int64
242
243
244 // WriteBufferSize is the connection write buffer size; a value that
245 // is not positive selects 4KB.
246 WriteBufferSize int
247
248
249 // ReadBufferSize is the connection read buffer size; a value that is
250 // not positive selects 4KB.
251 ReadBufferSize int
252
253 // nextProtoOnce runs onceSetNextProtoDefaults exactly once; the two
254 // fields after it are written by that function.
255 nextProtoOnce sync.Once
256 h2transport h2Transport // non-nil once an HTTP/2 transport has been wired up
257 tlsNextProtoWasNil bool // whether TLSNextProto was nil at first use
258
259
260
261 // ForceAttemptHTTP2 makes onceSetNextProtoDefaults call
262 // http2configureTransport even when TLSClientConfig, Dial, DialTLS or
263 // DialContext is set.
264 ForceAttemptHTTP2 bool
265 }
266
267 func (t *Transport) writeBufferSize() int {
268 if t.WriteBufferSize > 0 {
269 return t.WriteBufferSize
270 }
271 return 4 << 10
272 }
273
274 func (t *Transport) readBufferSize() int {
275 if t.ReadBufferSize > 0 {
276 return t.ReadBufferSize
277 }
278 return 4 << 10
279 }
280
281
282 func (t *Transport) Clone() *Transport {
283 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
284 t2 := &Transport{
285 Proxy: t.Proxy,
286 DialContext: t.DialContext,
287 Dial: t.Dial,
288 DialTLS: t.DialTLS,
289 TLSClientConfig: t.TLSClientConfig.Clone(),
290 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
291 DisableKeepAlives: t.DisableKeepAlives,
292 DisableCompression: t.DisableCompression,
293 MaxIdleConns: t.MaxIdleConns,
294 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
295 MaxConnsPerHost: t.MaxConnsPerHost,
296 IdleConnTimeout: t.IdleConnTimeout,
297 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
298 ExpectContinueTimeout: t.ExpectContinueTimeout,
299 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
300 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
301 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
302 WriteBufferSize: t.WriteBufferSize,
303 ReadBufferSize: t.ReadBufferSize,
304 }
305 if !t.tlsNextProtoWasNil {
306 npm := map[string]func(authority string, c *tls.Conn) RoundTripper{}
307 for k, v := range t.TLSNextProto {
308 npm[k] = v
309 }
310 t2.TLSNextProto = npm
311 }
312 return t2
313 }
314
315
316
317
318
319 // h2Transport is the one method Transport needs from an HTTP/2 transport:
320 // CloseIdleConnections forwards to it through the h2transport field.
321 type h2Transport interface {
322 CloseIdleConnections()
323 }
324
325 // onceSetNextProtoDefaults records whether TLSNextProto was nil and, unless
326 // disabled or pre-empted, enables HTTP/2. It is run through t.nextProtoOnce.
327 func (t *Transport) onceSetNextProtoDefaults() {
328 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
329 if strings.Contains(os.Getenv("GODEBUG"), "http2client=0") {
330 return
331 }
332
333 // If an "https" alternate protocol is registered and its value is a
334 // struct with exactly one field, and that field's value implements
335 // h2Transport, remember the field value so CloseIdleConnections can
336 // forward to it, and skip the built-in setup below.
337
338 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
339 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
340 if v := rv.Field(0); v.CanInterface() {
341 if h2i, ok := v.Interface().(h2Transport); ok {
342 t.h2transport = h2i
343 return
344 }
345 }
346 }
347
348 if t.TLSNextProto != nil {
349 // The caller configured TLSNextProto before first use (possibly as
350 // an empty map); leave the protocol selection as they set it.
351 return
352 }
353 if !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialTLS != nil || t.DialContext != nil) {
354 // A custom TLS config or dial hook is in place and the caller has
355 // not set ForceAttemptHTTP2, so HTTP/2 is not configured.
356
357
358
359
360 return
361 }
362 t2, err := http2configureTransport(t)
363 if err != nil {
364 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
365 return
366 }
367 t.h2transport = t2
368
369
370 // Carry MaxResponseHeaderBytes over to the HTTP/2 transport's
371 // MaxHeaderListSize when the latter is unset, clamping to the largest
372 // uint32 value.
373
374
375 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
376 const h2max = 1<<32 - 1
377 if limit1 >= h2max {
378 t2.MaxHeaderListSize = h2max
379 } else {
380 t2.MaxHeaderListSize = uint32(limit1)
381 }
382 }
383 }
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
402 return envProxyFunc()(req.URL)
403 }
404
405
406
407 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
408 return func(*Request) (*url.URL, error) {
409 return fixedURL, nil
410 }
411 }
412
413
414 // transportRequest wraps a *Request with per-round-trip state kept by
415 // the Transport.
416 type transportRequest struct {
417 *Request // the original request
418 extra Header // lazily allocated by extraHeaders
419 trace *httptrace.ClientTrace // may be nil; taken from the request context in roundTrip
420
421 mu sync.Mutex // guards err
422 err error // first error recorded through setError
423 }
424
425 func (tr *transportRequest) extraHeaders() Header {
426 if tr.extra == nil {
427 tr.extra = make(Header)
428 }
429 return tr.extra
430 }
431
432 func (tr *transportRequest) setError(err error) {
433 tr.mu.Lock()
434 if tr.err == nil {
435 tr.err = err
436 }
437 tr.mu.Unlock()
438 }
439
440
441
442 func (t *Transport) useRegisteredProtocol(req *Request) bool {
443 if req.URL.Scheme == "https" && req.requiresHTTP1() {
444
445
446
447
448 return false
449 }
450 return true
451 }
452
453
454 func (t *Transport) roundTrip(req *Request) (*Response, error) {
455 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
456 ctx := req.Context()
457 trace := httptrace.ContextClientTrace(ctx)
458
459 if req.URL == nil {
460 req.closeBody()
461 return nil, errors.New("http: nil Request.URL")
462 }
463 if req.Header == nil {
464 req.closeBody()
465 return nil, errors.New("http: nil Request.Header")
466 }
467 scheme := req.URL.Scheme
468 isHTTP := scheme == "http" || scheme == "https"
469 if isHTTP {
470 for k, vv := range req.Header {
471 if !httpguts.ValidHeaderFieldName(k) {
472 return nil, fmt.Errorf("net/http: invalid header field name %q", k)
473 }
474 for _, v := range vv {
475 if !httpguts.ValidHeaderFieldValue(v) {
476 return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
477 }
478 }
479 }
480 }
481
482 if t.useRegisteredProtocol(req) {
483 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
484 if altRT := altProto[scheme]; altRT != nil {
485 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
486 return resp, err
487 }
488 }
489 }
490 if !isHTTP {
491 req.closeBody()
492 return nil, &badStringError{"unsupported protocol scheme", scheme}
493 }
494 if req.Method != "" && !validMethod(req.Method) {
495 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
496 }
497 if req.URL.Host == "" {
498 req.closeBody()
499 return nil, errors.New("http: no Host in request URL")
500 }
501
502 for {
503 select {
504 case <-ctx.Done():
505 req.closeBody()
506 return nil, ctx.Err()
507 default:
508 }
509
510
511 treq := &transportRequest{Request: req, trace: trace}
512 cm, err := t.connectMethodForRequest(treq)
513 if err != nil {
514 req.closeBody()
515 return nil, err
516 }
517
518
519
520
521
522 pconn, err := t.getConn(treq, cm)
523 if err != nil {
524 t.setReqCanceler(req, nil)
525 req.closeBody()
526 return nil, err
527 }
528
529 var resp *Response
530 if pconn.alt != nil {
531
532 t.setReqCanceler(req, nil)
533 resp, err = pconn.alt.RoundTrip(req)
534 } else {
535 resp, err = pconn.roundTrip(treq)
536 }
537 if err == nil {
538 return resp, nil
539 }
540
541
542
543 _, isH2DialError := pconn.alt.(http2erringRoundTripper)
544 if http2isNoCachedConnError(err) || isH2DialError {
545 t.removeIdleConn(pconn)
546 t.decConnsPerHost(pconn.cacheKey)
547 }
548 if !pconn.shouldRetryRequest(req, err) {
549
550
551 if e, ok := err.(transportReadFromServerError); ok {
552 err = e.err
553 }
554 return nil, err
555 }
556 testHookRoundTripRetried()
557
558
559 if req.GetBody != nil {
560 newReq := *req
561 var err error
562 newReq.Body, err = req.GetBody()
563 if err != nil {
564 return nil, err
565 }
566 req = &newReq
567 }
568 }
569 }
570
571
572
573
574 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
575 if http2isNoCachedConnError(err) {
576
577
578
579
580
581
582 return true
583 }
584 if err == errMissingHost {
585
586 return false
587 }
588 if !pc.isReused() {
589
590
591
592
593
594
595
596 return false
597 }
598 if _, ok := err.(nothingWrittenError); ok {
599
600
601 return req.outgoingLength() == 0 || req.GetBody != nil
602 }
603 if !req.isReplayable() {
604
605 return false
606 }
607 if _, ok := err.(transportReadFromServerError); ok {
608
609
610 return true
611 }
612 if err == errServerClosedIdle {
613
614
615
616 return true
617 }
618 return false
619 }
620 // ErrSkipAltProtocol is a sentinel a registered alternate RoundTripper can
621 // return from RoundTrip to make roundTrip handle the request itself.
622 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
623
624
625
626
627
628
629
630
631
632
633
634 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
635 t.altMu.Lock()
636 defer t.altMu.Unlock()
637 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
638 if _, exists := oldMap[scheme]; exists {
639 panic("protocol " + scheme + " already registered")
640 }
641 newMap := make(map[string]RoundTripper)
642 for k, v := range oldMap {
643 newMap[k] = v
644 }
645 newMap[scheme] = rt
646 t.altProto.Store(newMap)
647 }
648
649
650
651
652
653 func (t *Transport) CloseIdleConnections() {
654 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
655 t.idleMu.Lock()
656 m := t.idleConn
657 t.idleConn = nil
658 t.closeIdle = true
659 t.idleLRU = connLRU{}
660 t.idleMu.Unlock()
661 for _, conns := range m {
662 for _, pconn := range conns {
663 pconn.close(errCloseIdleConns)
664 }
665 }
666 if t2 := t.h2transport; t2 != nil {
667 t2.CloseIdleConnections()
668 }
669 }
670
671
672
673
674
675
676
677 func (t *Transport) CancelRequest(req *Request) {
678 t.cancelRequest(req, errRequestCanceled)
679 }
680
681
682 func (t *Transport) cancelRequest(req *Request, err error) {
683 t.reqMu.Lock()
684 cancel := t.reqCanceler[req]
685 delete(t.reqCanceler, req)
686 t.reqMu.Unlock()
687 if cancel != nil {
688 cancel(err)
689 }
690 }
691
692
693
694
695
696 var (
697 // envProxyOnce guards the one-time initialization of envProxyFuncValue in envProxyFunc.
698 envProxyOnce sync.Once
699 envProxyFuncValue func(*url.URL) (*url.URL, error)
700 )
701
702
703
704
705 func envProxyFunc() func(*url.URL) (*url.URL, error) {
706 envProxyOnce.Do(func() {
707 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
708 })
709 return envProxyFuncValue
710 }
711
712
713 func resetProxyConfig() {
714 envProxyOnce = sync.Once{}
715 envProxyFuncValue = nil
716 }
717
718 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
719
720
721 if port := treq.URL.Port(); !validPort(port) {
722 return cm, fmt.Errorf("invalid URL port %q", port)
723 }
724 cm.targetScheme = treq.URL.Scheme
725 cm.targetAddr = canonicalAddr(treq.URL)
726 if t.Proxy != nil {
727 cm.proxyURL, err = t.Proxy(treq.Request)
728 if err == nil && cm.proxyURL != nil {
729 if port := cm.proxyURL.Port(); !validPort(port) {
730 return cm, fmt.Errorf("invalid proxy URL port %q", port)
731 }
732 }
733 }
734 cm.onlyH1 = treq.requiresHTTP1()
735 return cm, err
736 }
737
738
739
740 func (cm *connectMethod) proxyAuth() string {
741 if cm.proxyURL == nil {
742 return ""
743 }
744 if u := cm.proxyURL.User; u != nil {
745 username := u.Username()
746 password, _ := u.Password()
747 return "Basic " + basicAuth(username, password)
748 }
749 return ""
750 }
751
752 // Sentinel errors used by the idle connection cache and by connection shutdown.
753 var (
754 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled") // tryPutIdleConn: DisableKeepAlives or negative MaxIdleConnsPerHost
755 errConnBroken = errors.New("http: putIdleConn: connection is in bad state") // tryPutIdleConn: pconn.isBroken()
756 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called") // tryPutIdleConn: closeIdle is set
757 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections") // close reason for the LRU victim when MaxIdleConns is exceeded
758 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host") // tryPutIdleConn: per-key limit reached
759 errCloseIdleConns = errors.New("http: CloseIdleConnections called") // close reason used by CloseIdleConnections
760 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
761 errIdleConnTimeout = errors.New("http: idle connection timeout")
762 errNotCachingH2Conn = errors.New("http: not caching alternate protocol's connections")
763
764
765
766 // errServerClosedIdle is one of the errors shouldRetryRequest accepts
767 // as grounds for retrying a replayable request on a reused connection.
768 errServerClosedIdle = errors.New("http: server closed idle connection")
769 )
770
771
772
773
774
775
776
777
778
// transportReadFromServerError wraps an error that occurred while reading
// from the server. shouldRetryRequest treats it as retryable, and roundTrip
// unwraps it before returning the error to its caller.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the wrapped error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error describes the failure, including the wrapped error's text.
func (e transportReadFromServerError) Error() string {
	return "net/http: Transport failed to read from server: " + fmt.Sprint(e.err)
}
788
789 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
790 if err := t.tryPutIdleConn(pconn); err != nil {
791 pconn.close(err)
792 }
793 }
794
795 func (t *Transport) maxIdleConnsPerHost() int {
796 if v := t.MaxIdleConnsPerHost; v != 0 {
797 return v
798 }
799 return DefaultMaxIdleConnsPerHost
800 }
801
802 // tryPutIdleConn hands pconn to a request waiting for an idle connection
803 // or, failing that, parks it in the idle cache. It returns nil when the
804 // connection was delivered or parked, and otherwise an error naming the
805 // reason; pconn is left open either way, so closing a rejected connection
806 // is up to the caller (see putOrCloseIdleConn).
807 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
808 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
809 return errKeepAlivesDisabled
810 }
811 if pconn.isBroken() {
812 return errConnBroken
813 }
814 pconn.markReused()
815
816 t.idleMu.Lock()
817 defer t.idleMu.Unlock()
818
819 // An alternate-protocol connection (pconn.alt != nil) that is already
820 // tracked in idleLRU needs no further work: queueForIdleConn leaves
821 // such connections in the cache when it delivers them.
822 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
823 return nil
824 }
825
826 // First try to satisfy requests queued for this key. Waiters are
827 // popped from a copy of the queue; the copy is written back (or the
828 // map entry deleted) once the loop is finished.
829
830 key := pconn.cacheKey
831 if q, ok := t.idleConnWait[key]; ok {
832 done := false
833 if pconn.alt == nil {
834 // Ordinary connection: stop at the first waiter that accepts
835 // it; the connection is then owned by that waiter.
836 for q.len() > 0 {
837 w := q.popFront()
838 if w.tryDeliver(pconn, nil) {
839 done = true
840 break
841 }
842 }
843 } else {
844 // Alternate-protocol connection: offer it to every waiter.
845 // done stays false, so the connection is additionally parked
846 // in the idle cache below.
847
848 for q.len() > 0 {
849 w := q.popFront()
850 w.tryDeliver(pconn, nil)
851 }
852 }
853 if q.len() == 0 {
854 delete(t.idleConnWait, key)
855 } else {
856 t.idleConnWait[key] = q
857 }
858 if done {
859 return nil
860 }
861 }
862
863 if t.closeIdle {
864 return errCloseIdle
865 }
866 if t.idleConn == nil {
867 t.idleConn = make(map[connectMethodKey][]*persistConn)
868 }
869 idles := t.idleConn[key]
870 if len(idles) >= t.maxIdleConnsPerHost() {
871 return errTooManyIdleHost
872 }
873 for _, exist := range idles {
874 if exist == pconn {
875 log.Fatalf("dup idle pconn %p in freelist", pconn)
876 }
877 }
878 t.idleConn[key] = append(idles, pconn)
879 t.idleLRU.add(pconn)
880 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
881 oldest := t.idleLRU.removeOldest()
882 oldest.close(errTooManyIdle)
883 t.removeIdleConnLocked(oldest)
884 }
885
886 // Arm (or re-arm) the idle timer. This is done only for ordinary
887 // connections; alternate-protocol connections get no timer here.
888
889 if t.IdleConnTimeout > 0 && pconn.alt == nil {
890 if pconn.idleTimer != nil {
891 pconn.idleTimer.Reset(t.IdleConnTimeout)
892 } else {
893 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
894 }
895 }
896 pconn.idleAt = time.Now()
897 return nil
898 }
899
900 // queueForIdleConn tries to deliver an already idle connection to w. It
901 // reports whether one was delivered; if not, w is queued in idleConnWait so
902 // that tryPutIdleConn can deliver the next connection that becomes idle.
903 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
904 if t.DisableKeepAlives {
905 return false
906 }
907
908 t.idleMu.Lock()
909 defer t.idleMu.Unlock()
910
911 // A new request is asking for a connection, so idle caching is
912 // switched back on after an earlier CloseIdleConnections.
913 t.closeIdle = false
914
915 if w == nil {
916 // A nil w only serves to reset closeIdle above.
917 return false
918 }
919
920 // Look for the most recently parked connection for this key.
921 if list, ok := t.idleConn[w.key]; ok {
922 stop := false
923 delivered := false
924 for len(list) > 0 && !stop {
925 pconn := list[len(list)-1]
926 if pconn.isBroken() {
927 // The connection broke while idle. Drop it from the list
928 // and look at the next one; it is not removed from idleLRU
929 // here.
930 list = list[:len(list)-1]
931 continue
932 }
933 delivered = w.tryDeliver(pconn, nil)
934 if delivered {
935 if pconn.alt != nil {
936 // Alternate-protocol connections stay in the idle
937 // cache after being delivered.
938 } else {
939 // Ordinary connections are owned by the request they
940 // were delivered to, so take them out of the cache.
941 t.idleLRU.remove(pconn)
942 list = list[:len(list)-1]
943 }
944 }
945 stop = true
946 }
947 if len(list) > 0 {
948 t.idleConn[w.key] = list
949 } else {
950 delete(t.idleConn, w.key)
951 }
952 if stop {
953 return delivered
954 }
955 }
956
957 // Nothing usable was idle: register w to be handed a connection later.
958 if t.idleConnWait == nil {
959 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
960 }
961 q := t.idleConnWait[w.key]
962 q.cleanFront()
963 q.pushBack(w)
964 t.idleConnWait[w.key] = q
965 return false
966 }
967
968
969 func (t *Transport) removeIdleConn(pconn *persistConn) {
970 t.idleMu.Lock()
971 defer t.idleMu.Unlock()
972 t.removeIdleConnLocked(pconn)
973 }
974
975
976 func (t *Transport) removeIdleConnLocked(pconn *persistConn) {
977 if pconn.idleTimer != nil {
978 pconn.idleTimer.Stop()
979 }
980 t.idleLRU.remove(pconn)
981 key := pconn.cacheKey
982 pconns := t.idleConn[key]
983 switch len(pconns) {
984 case 0:
985
986 case 1:
987 if pconns[0] == pconn {
988 delete(t.idleConn, key)
989 }
990 default:
991 for i, v := range pconns {
992 if v != pconn {
993 continue
994 }
995
996
997 copy(pconns[i:], pconns[i+1:])
998 t.idleConn[key] = pconns[:len(pconns)-1]
999 break
1000 }
1001 }
1002 }
1003
1004 func (t *Transport) setReqCanceler(r *Request, fn func(error)) {
1005 t.reqMu.Lock()
1006 defer t.reqMu.Unlock()
1007 if t.reqCanceler == nil {
1008 t.reqCanceler = make(map[*Request]func(error))
1009 }
1010 if fn != nil {
1011 t.reqCanceler[r] = fn
1012 } else {
1013 delete(t.reqCanceler, r)
1014 }
1015 }
1016
1017
1018
1019
1020
1021 func (t *Transport) replaceReqCanceler(r *Request, fn func(error)) bool {
1022 t.reqMu.Lock()
1023 defer t.reqMu.Unlock()
1024 _, ok := t.reqCanceler[r]
1025 if !ok {
1026 return false
1027 }
1028 if fn != nil {
1029 t.reqCanceler[r] = fn
1030 } else {
1031 delete(t.reqCanceler, r)
1032 }
1033 return true
1034 }
1035 // zeroDialer is the zero-value net.Dialer dial falls back to when neither DialContext nor Dial is set.
1036 var zeroDialer net.Dialer
1037
1038 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1039 if t.DialContext != nil {
1040 return t.DialContext(ctx, network, addr)
1041 }
1042 if t.Dial != nil {
1043 c, err := t.Dial(network, addr)
1044 if c == nil && err == nil {
1045 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1046 }
1047 return c, err
1048 }
1049 return zeroDialer.DialContext(ctx, network, addr)
1050 }
1051
1052
1053
1054 // A wantConn records one request's wish for a connection. The connection
1055 // may come from the idle cache or from a new dial; whichever is delivered
1056 // first through tryDeliver wins, and ready is closed at that point (or
1057 // when the wish is cancelled).
1058 type wantConn struct {
1059 cm connectMethod
1060 key connectMethodKey // cm.key()
1061 ctx context.Context // context passed to dialConn by dialConnFor
1062 ready chan struct{} // closed once pc or err has been set, or on cancel
1063
1064
1065 // Hooks called by queueForDial (beforeDial) and dialConnFor
1066 // (afterDial); getConn sets them to the package's test hooks.
1067 beforeDial func()
1068 afterDial func()
1069
1070 mu sync.Mutex // guards pc and err
1071 pc *persistConn
1072 err error
1073 }
1074
1075
1076 func (w *wantConn) waiting() bool {
1077 select {
1078 case <-w.ready:
1079 return false
1080 default:
1081 return true
1082 }
1083 }
1084
1085
1086 func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
1087 w.mu.Lock()
1088 defer w.mu.Unlock()
1089
1090 if w.pc != nil || w.err != nil {
1091 return false
1092 }
1093
1094 w.pc = pc
1095 w.err = err
1096 if w.pc == nil && w.err == nil {
1097 panic("net/http: internal error: misuse of tryDeliver")
1098 }
1099 close(w.ready)
1100 return true
1101 }
1102
1103
1104
1105 func (w *wantConn) cancel(t *Transport, err error) {
1106 w.mu.Lock()
1107 if w.pc == nil && w.err == nil {
1108 close(w.ready)
1109 }
1110 pc := w.pc
1111 w.pc = nil
1112 w.err = err
1113 w.mu.Unlock()
1114
1115 if pc != nil {
1116 t.putOrCloseIdleConn(pc)
1117 }
1118 }
1119
1120 // A wantConnQueue is a first-in, first-out queue of *wantConn.
1121 type wantConnQueue struct {
1122 // The queue is held in two slices. pushBack appends to tail;
1123 // popFront reads head[headPos] and advances headPos. Once head is
1124 // used up, popFront makes tail the new head and reuses the old
1125 // head's storage, truncated to length zero, as the new tail.
1126 //
1127 // The zero value is an empty queue.
1128
1129
1130
1131
1132 head []*wantConn
1133 headPos int
1134 tail []*wantConn
1135 }
1136
1137
1138 func (q *wantConnQueue) len() int {
1139 return len(q.head) - q.headPos + len(q.tail)
1140 }
1141
1142
1143 func (q *wantConnQueue) pushBack(w *wantConn) {
1144 q.tail = append(q.tail, w)
1145 }
1146
1147
1148 func (q *wantConnQueue) popFront() *wantConn {
1149 if q.headPos >= len(q.head) {
1150 if len(q.tail) == 0 {
1151 return nil
1152 }
1153
1154 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1155 }
1156 w := q.head[q.headPos]
1157 q.head[q.headPos] = nil
1158 q.headPos++
1159 return w
1160 }
1161
1162
1163 func (q *wantConnQueue) peekFront() *wantConn {
1164 if q.headPos < len(q.head) {
1165 return q.head[q.headPos]
1166 }
1167 if len(q.tail) > 0 {
1168 return q.tail[0]
1169 }
1170 return nil
1171 }
1172
1173
1174
1175 func (q *wantConnQueue) cleanFront() (cleaned bool) {
1176 for {
1177 w := q.peekFront()
1178 if w == nil || w.waiting() {
1179 return cleaned
1180 }
1181 q.popFront()
1182 cleaned = true
1183 }
1184 }
1185
1186 // getConn returns a connection for the request in treq, either taken
1187 // from the idle cache or newly dialed for the connect method cm. On a
1188 // non-nil error the deferred w.cancel hands back any connection that was
1189 // delivered in the meantime.
1190 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
1191 req := treq.Request
1192 trace := treq.trace
1193 ctx := req.Context()
1194 if trace != nil && trace.GetConn != nil {
1195 trace.GetConn(cm.addr())
1196 }
1197
1198 w := &wantConn{
1199 cm: cm,
1200 key: cm.key(),
1201 ctx: ctx,
1202 ready: make(chan struct{}, 1),
1203 beforeDial: testHookPrePendingDial,
1204 afterDial: testHookPostPendingDial,
1205 }
1206 defer func() {
1207 if err != nil {
1208 w.cancel(t, err)
1209 }
1210 }()
1211
1212 // Ask the idle cache first; on a miss this also queues w for the next idle connection.
1213 if delivered := t.queueForIdleConn(w); delivered {
1214 pc := w.pc
1215 // The GotConn trace hook is called here only for ordinary
1216 // connections, not for alternate-protocol ones.
1217 if pc.alt == nil && trace != nil && trace.GotConn != nil {
1218 trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
1219 }
1220 // Register a no-op canceler so that a canceler entry exists for
1221 // req from here on (replaceReqCanceler only acts when one is
1222 // registered).
1223 t.setReqCanceler(req, func(error) {})
1224 return pc, nil
1225 }
1226
1227 cancelc := make(chan error, 1)
1228 t.setReqCanceler(req, func(err error) { cancelc <- err })
1229
1230 // Start (or queue) a dial on behalf of w.
1231 t.queueForDial(w)
1232
1233 // Wait for a connection, an error, or cancellation, whichever is first.
1234 select {
1235 case <-w.ready:
1236 // The GotConn trace hook is called here only for ordinary
1237 // connections, not for alternate-protocol ones.
1238 if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
1239 trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
1240 }
1241 if w.err != nil {
1242 // If the request was cancelled as well, report the
1243 // cancellation in preference to the error that was
1244 // delivered.
1245 select {
1246 case <-req.Cancel:
1247 return nil, errRequestCanceledConn
1248 case <-req.Context().Done():
1249 return nil, req.Context().Err()
1250 case err := <-cancelc:
1251 if err == errRequestCanceled {
1252 err = errRequestCanceledConn
1253 }
1254 return nil, err
1255 default:
1256 // Not cancelled: return the delivered error below.
1257 }
1258 }
1259 return w.pc, w.err
1260 case <-req.Cancel:
1261 return nil, errRequestCanceledConn
1262 case <-req.Context().Done():
1263 return nil, req.Context().Err()
1264 case err := <-cancelc:
1265 if err == errRequestCanceled {
1266 err = errRequestCanceledConn
1267 }
1268 return nil, err
1269 }
1270 }
1271
1272
1273
1274 func (t *Transport) queueForDial(w *wantConn) {
1275 w.beforeDial()
1276 if t.MaxConnsPerHost <= 0 {
1277 go t.dialConnFor(w)
1278 return
1279 }
1280
1281 t.connsPerHostMu.Lock()
1282 defer t.connsPerHostMu.Unlock()
1283
1284 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1285 if t.connsPerHost == nil {
1286 t.connsPerHost = make(map[connectMethodKey]int)
1287 }
1288 t.connsPerHost[w.key] = n + 1
1289 go t.dialConnFor(w)
1290 return
1291 }
1292
1293 if t.connsPerHostWait == nil {
1294 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1295 }
1296 q := t.connsPerHostWait[w.key]
1297 q.cleanFront()
1298 q.pushBack(w)
1299 t.connsPerHostWait[w.key] = q
1300 }
1301
1302
1303
1304
1305 func (t *Transport) dialConnFor(w *wantConn) {
1306 defer w.afterDial()
1307
1308 pc, err := t.dialConn(w.ctx, w.cm)
1309 delivered := w.tryDeliver(pc, err)
1310 if err == nil && (!delivered || pc.alt != nil) {
1311
1312
1313
1314 t.putOrCloseIdleConn(pc)
1315 }
1316 if err != nil {
1317 t.decConnsPerHost(w.key)
1318 }
1319 }
1320
1321
1322
1323 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1324 if t.MaxConnsPerHost <= 0 {
1325 return
1326 }
1327
1328 t.connsPerHostMu.Lock()
1329 defer t.connsPerHostMu.Unlock()
1330 n := t.connsPerHost[key]
1331 if n == 0 {
1332
1333
1334 panic("net/http: internal error: connCount underflow")
1335 }
1336
1337
1338
1339
1340
1341 if q := t.connsPerHostWait[key]; q.len() > 0 {
1342 done := false
1343 for q.len() > 0 {
1344 w := q.popFront()
1345 if w.waiting() {
1346 go t.dialConnFor(w)
1347 done = true
1348 break
1349 }
1350 }
1351 if q.len() == 0 {
1352 delete(t.connsPerHostWait, key)
1353 } else {
1354
1355
1356 t.connsPerHostWait[key] = q
1357 }
1358 if done {
1359 return
1360 }
1361 }
1362
1363
1364 if n--; n == 0 {
1365 delete(t.connsPerHost, key)
1366 } else {
1367 t.connsPerHost[key] = n
1368 }
1369 }
1370
1371
1372
1373 func chooseTLSHost(cm connectMethod, t *Transport) string {
1374 tlsHost := ""
1375 if t.TLSClientConfig != nil {
1376 tlsHost = t.TLSClientConfig.ServerName
1377 }
1378 if tlsHost == "" {
1379 tlsHost = cm.tlsHost()
1380 }
1381 return tlsHost
1382 }
1383
1384 // addTLS wraps pconn.conn in a TLS client connection and performs the
1385 // handshake. name is used as the ServerName when the Transport's TLS
1386 // configuration does not set one. On failure the plain connection is closed.
1387 func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
1388 // Work on a copy of the configuration, since it is modified below.
1389 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1390 if cfg.ServerName == "" {
1391 cfg.ServerName = name
1392 }
1393 if pconn.cacheKey.onlyH1 {
1394 cfg.NextProtos = nil
1395 }
1396 plainConn := pconn.conn
1397 tlsConn := tls.Client(plainConn, cfg)
1398 errc := make(chan error, 2) // capacity 2: both the timer and the handshake goroutine may send without blocking
1399 var timer *time.Timer
1400 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1401 timer = time.AfterFunc(d, func() {
1402 errc <- tlsHandshakeTimeoutError{}
1403 })
1404 }
1405 go func() {
1406 if trace != nil && trace.TLSHandshakeStart != nil {
1407 trace.TLSHandshakeStart()
1408 }
1409 err := tlsConn.Handshake()
1410 if timer != nil {
1411 timer.Stop()
1412 }
1413 errc <- err
1414 }()
1415 if err := <-errc; err != nil {
1416 plainConn.Close()
1417 if trace != nil && trace.TLSHandshakeDone != nil {
1418 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1419 }
1420 return err
1421 }
1422 cs := tlsConn.ConnectionState()
1423 if trace != nil && trace.TLSHandshakeDone != nil {
1424 trace.TLSHandshakeDone(cs, nil)
1425 }
1426 pconn.tlsState = &cs
1427 pconn.conn = tlsConn
1428 return nil
1429 }
1430 // dialConn opens a new connection for cm: it connects to the first hop (proxy or target), performs any proxy handshake, adds TLS for an https target behind a proxy, and returns either an alternate-protocol connection or an ordinary one whose read and write loops have been started.
1431 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1432 pconn = &persistConn{
1433 t: t,
1434 cacheKey: cm.key(),
1435 reqch: make(chan requestAndChan, 1),
1436 writech: make(chan writeRequest, 1),
1437 closech: make(chan struct{}),
1438 writeErrCh: make(chan error, 1),
1439 writeLoopDone: make(chan struct{}),
1440 }
1441 trace := httptrace.ContextClientTrace(ctx)
1442 wrapErr := func(err error) error {
1443 if cm.proxyURL != nil {
1444 // Mark errors from the first hop as proxy connection errors.
1445 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1446 }
1447 return err
1448 }
1449 if cm.scheme() == "https" && t.DialTLS != nil {
1450 var err error
1451 pconn.conn, err = t.DialTLS("tcp", cm.addr())
1452 if err != nil {
1453 return nil, wrapErr(err)
1454 }
1455 if pconn.conn == nil {
1456 return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
1457 }
1458 if tc, ok := pconn.conn.(*tls.Conn); ok {
1459 // The hook returned a *tls.Conn: run the handshake here so the
1460 // negotiated connection state can be recorded on pconn.
1461 if trace != nil && trace.TLSHandshakeStart != nil {
1462 trace.TLSHandshakeStart()
1463 }
1464 if err := tc.Handshake(); err != nil {
1465 go pconn.conn.Close()
1466 if trace != nil && trace.TLSHandshakeDone != nil {
1467 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1468 }
1469 return nil, err
1470 }
1471 cs := tc.ConnectionState()
1472 if trace != nil && trace.TLSHandshakeDone != nil {
1473 trace.TLSHandshakeDone(cs, nil)
1474 }
1475 pconn.tlsState = &cs
1476 }
1477 } else {
1478 conn, err := t.dial(ctx, "tcp", cm.addr())
1479 if err != nil {
1480 return nil, wrapErr(err)
1481 }
1482 pconn.conn = conn
1483 if cm.scheme() == "https" {
1484 var firstTLSHost string
1485 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1486 return nil, wrapErr(err)
1487 }
1488 if err = pconn.addTLS(firstTLSHost, trace); err != nil {
1489 return nil, wrapErr(err)
1490 }
1491 }
1492 }
1493
1494 // Proxy setup, depending on the proxy scheme and the target scheme.
1495 switch {
1496 case cm.proxyURL == nil:
1497 // No proxy: nothing to set up.
1498 case cm.proxyURL.Scheme == "socks5":
1499 conn := pconn.conn
1500 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1501 if u := cm.proxyURL.User; u != nil {
1502 auth := &socksUsernamePassword{
1503 Username: u.Username(),
1504 }
1505 auth.Password, _ = u.Password()
1506 d.AuthMethods = []socksAuthMethod{
1507 socksAuthMethodNotRequired,
1508 socksAuthMethodUsernamePassword,
1509 }
1510 d.Authenticate = auth.Authenticate
1511 }
1512 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1513 conn.Close()
1514 return nil, err
1515 }
1516 case cm.targetScheme == "http":
1517 pconn.isProxy = true
1518 if pa := cm.proxyAuth(); pa != "" {
1519 pconn.mutateHeaderFunc = func(h Header) {
1520 h.Set("Proxy-Authorization", pa)
1521 }
1522 }
1523 case cm.targetScheme == "https":
1524 conn := pconn.conn
1525 hdr := t.ProxyConnectHeader
1526 if hdr == nil {
1527 hdr = make(Header)
1528 }
1529 if pa := cm.proxyAuth(); pa != "" {
1530 hdr = hdr.Clone()
1531 hdr.Set("Proxy-Authorization", pa)
1532 }
1533 connectReq := &Request{
1534 Method: "CONNECT",
1535 URL: &url.URL{Opaque: cm.targetAddr},
1536 Host: cm.targetAddr,
1537 Header: hdr,
1538 }
1539 connectReq.Write(conn)
1540
1541 // NOTE(review): the error from connectReq.Write above is discarded;
1542 // a failed write is only noticed through ReadResponse below. The
1543 // response is read through a throwaway bufio.Reader on conn.
1544 br := bufio.NewReader(conn)
1545 resp, err := ReadResponse(br, connectReq)
1546 if err != nil {
1547 conn.Close()
1548 return nil, err
1549 }
1550 if resp.StatusCode != 200 {
1551 f := strings.SplitN(resp.Status, " ", 2)
1552 conn.Close()
1553 if len(f) < 2 {
1554 return nil, errors.New("unknown status code")
1555 }
1556 return nil, errors.New(f[1])
1557 }
1558 }
1559
1560 if cm.proxyURL != nil && cm.targetScheme == "https" {
1561 if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
1562 return nil, err
1563 }
1564 }
1565
1566 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1567 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1568 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
1569 }
1570 }
1571
1572 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1573 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1574
1575 go pconn.readLoop()
1576 go pconn.writeLoop()
1577 return pconn, nil
1578 }
1579
1580
1581
1582
1583 // persistConnWriter is the writer dialConn wraps in pconn.bw. Its methods
1584 // write to pc.conn and add the number of bytes written to pc.nwrite.
1585
1586 type persistConnWriter struct {
1587 pc *persistConn
1588 }
1589
1590 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1591 n, err = w.pc.conn.Write(p)
1592 w.pc.nwrite += int64(n)
1593 return
1594 }
1595
1596
1597
1598
1599 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1600 n, err = io.Copy(w.pc.conn, r)
1601 w.pc.nwrite += n
1602 return
1603 }
1604 // Compile-time check that *persistConnWriter implements io.ReaderFrom.
1605 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619 // connectMethod describes how to reach the target of a request: directly
1620 // or through a proxy, and with which scheme. Its key method derives the
1621 // connectMethodKey under which connections are cached.
1622
1623
1624 type connectMethod struct {
1625 proxyURL *url.URL // nil when no proxy is used
1626 targetScheme string // scheme of the request URL
1627
1628 // targetAddr is canonicalAddr of the request URL. key() leaves it
1629 // out of the cache key for http targets behind an http(s) proxy.
1630 targetAddr string
1631 onlyH1 bool // the request requires HTTP/1; addTLS then offers no NextProtos
1632 }
1633
1634 func (cm *connectMethod) key() connectMethodKey {
1635 proxyStr := ""
1636 targetAddr := cm.targetAddr
1637 if cm.proxyURL != nil {
1638 proxyStr = cm.proxyURL.String()
1639 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
1640 targetAddr = ""
1641 }
1642 }
1643 return connectMethodKey{
1644 proxy: proxyStr,
1645 scheme: cm.targetScheme,
1646 addr: targetAddr,
1647 onlyH1: cm.onlyH1,
1648 }
1649 }
1650
1651
1652 func (cm *connectMethod) scheme() string {
1653 if cm.proxyURL != nil {
1654 return cm.proxyURL.Scheme
1655 }
1656 return cm.targetScheme
1657 }
1658
1659
1660 func (cm *connectMethod) addr() string {
1661 if cm.proxyURL != nil {
1662 return canonicalAddr(cm.proxyURL)
1663 }
1664 return cm.targetAddr
1665 }
1666
1667
1668
1669 func (cm *connectMethod) tlsHost() string {
1670 h := cm.targetAddr
1671 if hasPort(h) {
1672 h = h[:strings.LastIndex(h, ":")]
1673 }
1674 return h
1675 }
1676
1677
1678
1679
// connectMethodKey is the comparable, map-key form of a connectMethod: the
// proxy URL rendered as a string, the target scheme and address, and the
// onlyH1 flag.
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// String renders the key as "proxy|scheme|addr", appending ",h1" to the
// scheme component when onlyH1 is set.
func (k connectMethodKey) String() string {
	scheme := k.scheme
	if k.onlyH1 {
		scheme += ",h1"
	}
	return fmt.Sprintf("%s|%s|%s", k.proxy, scheme, k.addr)
}
1693
1694
1695
// persistConn wraps one connection to a server or proxy together with the
// state shared between roundTrip and the connection's readLoop and writeLoop
// goroutines.
type persistConn struct {
	// alt, when non-nil, is an alternate RoundTripper used for this
	// connection. closeLocked leaves conn and closech alone when alt is set.
	alt RoundTripper

	t         *Transport
	cacheKey  connectMethodKey     // passed to Transport.decConnsPerHost when the conn closes
	conn      net.Conn             // underlying connection
	tlsState  *tls.ConnectionState // copied into Response.TLS by readResponse
	br        *bufio.Reader        // created over the persistConn itself, so its reads go through Read
	bw        *bufio.Writer        // created over a persistConnWriter
	nwrite    int64                // bytes written; accumulated by persistConnWriter
	reqch     chan requestAndChan  // sent on by roundTrip; received by readLoop
	writech   chan writeRequest    // sent on by roundTrip; received by writeLoop
	closech   chan struct{}        // closed by closeLocked when alt is nil
	isProxy   bool                 // forwarded to Request.write by writeLoop
	sawEOF    bool                 // set by Read once the conn returns io.EOF
	readLimit int64                // bytes Read may still return; set by readLoop and readResponse

	// writeErrCh carries the result of each request write from writeLoop to
	// wroteRequest.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop returns

	// NOTE(review): idleAt and idleTimer are not touched in this part of the
	// file; presumably they record when the conn became idle and hold its
	// idle-timeout timer — confirm against the idle-pool code.
	idleAt    time.Time
	idleTimer *time.Timer

	mu                   sync.Mutex // taken around most accesses to the fields below
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               error      // first cause passed to closeLocked; non-nil once closed
	canceledErr          error      // set by cancelRequest
	broken               bool       // set by closeLocked
	reused               bool       // set by markReused

	// mutateHeaderFunc, when non-nil, is applied by roundTrip to the
	// request's extra headers. closeLocked resets it to nil.
	mutateHeaderFunc func(Header)
}
1738
1739 func (pc *persistConn) maxHeaderResponseSize() int64 {
1740 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
1741 return v
1742 }
1743 return 10 << 20
1744 }
1745
1746 func (pc *persistConn) Read(p []byte) (n int, err error) {
1747 if pc.readLimit <= 0 {
1748 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
1749 }
1750 if int64(len(p)) > pc.readLimit {
1751 p = p[:pc.readLimit]
1752 }
1753 n, err = pc.conn.Read(p)
1754 if err == io.EOF {
1755 pc.sawEOF = true
1756 }
1757 pc.readLimit -= int64(n)
1758 return
1759 }
1760
1761
1762 func (pc *persistConn) isBroken() bool {
1763 pc.mu.Lock()
1764 b := pc.closed != nil
1765 pc.mu.Unlock()
1766 return b
1767 }
1768
1769
1770
1771 func (pc *persistConn) canceled() error {
1772 pc.mu.Lock()
1773 defer pc.mu.Unlock()
1774 return pc.canceledErr
1775 }
1776
1777
1778 func (pc *persistConn) isReused() bool {
1779 pc.mu.Lock()
1780 r := pc.reused
1781 pc.mu.Unlock()
1782 return r
1783 }
1784
1785 func (pc *persistConn) gotIdleConnTrace(idleAt time.Time) (t httptrace.GotConnInfo) {
1786 pc.mu.Lock()
1787 defer pc.mu.Unlock()
1788 t.Reused = pc.reused
1789 t.Conn = pc.conn
1790 t.WasIdle = true
1791 if !idleAt.IsZero() {
1792 t.IdleTime = time.Since(idleAt)
1793 }
1794 return
1795 }
1796
// cancelRequest records err as the cancellation cause for the connection's
// request (later returned by canceled) and closes the connection with
// errRequestCanceled.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock() // held across closeLocked, which requires pc.mu
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
1803
1804
1805
1806
1807 func (pc *persistConn) closeConnIfStillIdle() {
1808 t := pc.t
1809 t.idleMu.Lock()
1810 defer t.idleMu.Unlock()
1811 if _, ok := t.idleLRU.m[pc]; !ok {
1812
1813 return
1814 }
1815 t.removeIdleConnLocked(pc)
1816 pc.close(errIdleConnTimeout)
1817 }
1818
1819
1820
1821
1822
1823
1824
1825
1826
// mapRoundTripError translates an error observed during a round trip on pc
// into the error that is reported to the caller. A nil err maps to nil.
//
// startBytesWritten is the value of pc.nwrite sampled before the request was
// written; comparing it with the current value tells whether anything was
// written for this request.
//
// Precedence, highest first: a cancellation recorded on the connection, an
// error recorded on the request, errServerClosedIdle and
// transportReadFromServerError passed through unchanged, then (for a closed
// connection) either nothingWrittenError or a "connection broken" error.
func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
	if err == nil {
		return nil
	}

	// A cancellation recorded by cancelRequest wins over whatever error was
	// actually observed.
	if cerr := pc.canceled(); cerr != nil {
		return cerr
	}

	// Next comes an error stored on the request itself (writeLoop calls
	// setError for request body read failures; presumably that sets req.err).
	req.mu.Lock()
	reqErr := req.err
	req.mu.Unlock()
	if reqErr != nil {
		return reqErr
	}

	if err == errServerClosedIdle {
		// Pass the sentinel through unwrapped.
		return err
	}

	if _, ok := err.(transportReadFromServerError); ok {
		// Pass this error type through unwrapped.
		return err
	}
	if pc.isBroken() {
		// Wait for writeLoop to exit before reading pc.nwrite.
		<-pc.writeLoopDone
		if pc.nwrite == startBytesWritten {
			return nothingWrittenError{err}
		}
		return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err)
	}
	return err
}
1865
1866
1867
1868
// errCallerOwnsConn is the close cause readLoop uses when the response body
// is writable. closeLocked does not close the underlying net.Conn when it is
// given this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
1870
// readLoop runs in its own goroutine (started by dialConn). For each request
// handed over on pc.reqch it reads the response and delivers it, or an
// error, on the request's channel. When the response has a body it then
// waits until the body has been read to EOF, closed early, or the request or
// connection has been canceled or closed, before deciding whether the
// connection may be reused.
//
// When the loop exits, the connection is closed with closeErr and removed
// from the transport's idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // close cause unless replaced below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// tryPutIdleConn offers pc to the transport's idle pool and reports
	// whether that succeeded. On failure the error becomes closeErr. The
	// trace's PutIdleConn hook, when present, is told the outcome, except
	// that errKeepAlivesDisabled is not reported to it.
	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc lets the body callbacks below block until this loop has reacted
	// to the end of the body. A callback that saw io.EOF is released by the
	// send at the bottom of the loop; the deferred close releases any
	// callback still waiting when the loop exits.
	eofc := make(chan struct{})
	defer close(eofc)

	// Take a local copy of the test hook while holding testHookMu.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		// Cap how much may be read while waiting for and parsing headers.
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// No request is outstanding, so whatever Peek returned is not a
			// response to a request; close the connection accordingly.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			// The Peek itself failed: mark the error as coming from the
			// server read and use it as the close cause.
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			// An exhausted read limit means the headers were too large;
			// report that instead of the underlying error.
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			// Deliver the error unless roundTrip has already returned.
			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // headers are done; lift the limit for the body

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
			// Either side asked for the connection to be closed, the final
			// response is informational, or the body is writable: in all of
			// these cases the connection is not reused.
			alive = false
		}

		if !hasBody || bodyWritable {
			pc.t.setReqCanceler(rc.req, nil)

			// There is no body to wait for, so decide about reuse right
			// away: the connection must not have hit EOF, the request must
			// have been written successfully, and the idle pool must accept
			// the connection. The && chain stops at the first failing
			// condition, so tryPutIdleConn only runs when all others hold.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(trace)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			// Run the test hook before looping back to the next Peek (or
			// leaving the loop when alive is false).
			testHookReadLoopBeforeNextRead()
			continue
		}

		// Capacity 2: fn sends at most once (condfn clears it after the
		// first call) and earlyCloseFn at most once (Close is idempotent),
		// so neither send can block.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				// The body was closed before EOF: report "not EOF" and wait
				// for the read loop to exit, which closes eofc.
				waitForBodyRead <- false
				<-eofc
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					// Wait until the read loop has handled the EOF.
					<-eofc
				} else if err != nil {
					// Prefer the cancellation cause over the read error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		// Transparently decompress when the transport itself asked for gzip
		// and the server used it; the length and encoding headers no longer
		// describe what the caller will read, so they are removed.
		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Wait for the body to be finished with, or for the request or the
		// connection to go away, before reading the next response.
		select {
		case bodyEOF := <-waitForBodyRead:
			pc.t.setReqCanceler(rc.req, nil)
			// Reuse requires that the body was read to EOF in addition to
			// the conditions used for body-less responses above.
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(trace)
			if bodyEOF {
				// Release the fn callback blocked on eofc.
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.req, rc.req.Context().Err())
		case <-pc.closech:
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
}
2051
2052 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2053 if pc.closed != nil {
2054 return
2055 }
2056 if n := pc.br.Buffered(); n > 0 {
2057 buf, _ := pc.br.Peek(n)
2058 if is408Message(buf) {
2059 pc.closeLocked(errServerClosedIdle)
2060 return
2061 } else {
2062 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2063 }
2064 }
2065 if peekErr == io.EOF {
2066
2067 pc.closeLocked(errServerClosedIdle)
2068 } else {
2069 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
2070 }
2071 }
2072
2073
2074
2075
// is408Message reports whether buf starts with an HTTP/1.x status line for
// status 408: the prefix "HTTP/1.", any single byte for the minor version,
// and then " 408".
func is408Message(buf []byte) bool {
	const minLen = len("HTTP/1.x 408")
	if len(buf) < minLen {
		return false
	}
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
2085
2086
2087
2088
// readResponse reads the final response for rc.req from pc.br.
//
// Informational 1xx responses other than 101 Switching Protocols are
// consumed and skipped; more than max1xxResponses of them is an error. When
// rc.continueCh is set, a 100 response sends on it and a status of 200 or
// above closes it; either way it is signalled at most once. trace is
// optional and, when present, its GotFirstResponseByte, Got100Continue and
// Got1xxResponse hooks are invoked.
//
// The returned response has its TLS field set from pc.tlsState and, for a
// protocol switch, a body that reads from and writes to the connection.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		// Only fire the hook when a byte is actually available.
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}
	num1xx := 0               // informational responses skipped so far
	const max1xxResponses = 5 // upper bound on skipped informational responses

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.req)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil {
			if resCode == 100 {
				if trace != nil && trace.Got100Continue != nil {
					trace.Got100Continue()
				}
				// Tell the waiter (see waitForContinue) to go ahead.
				continueCh <- struct{}{}
				continueCh = nil
			} else if resCode >= 200 {
				// A final response arrived without a 100: closing the
				// channel makes waitForContinue return false.
				close(continueCh)
				continueCh = nil
			}
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 Switching Protocols ends the loop like a final response.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			num1xx++
			if num1xx > max1xxResponses {
				return nil, errors.New("net/http: too many 1xx informational responses")
			}
			pc.readLimit = pc.maxHeaderResponseSize() // fresh header budget for the next response
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}

	resp.TLS = pc.tlsState
	return
}
2142
2143
2144
2145
2146 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2147 if continueCh == nil {
2148 return nil
2149 }
2150 return func() bool {
2151 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2152 defer timer.Stop()
2153
2154 select {
2155 case _, ok := <-continueCh:
2156 return ok
2157 case <-timer.C:
2158 return true
2159 case <-pc.closech:
2160 return false
2161 }
2162 }
2163 }
2164
// newReadWriteCloserBody wraps rwc so that any bytes already buffered in br
// are returned by Read before reads fall through to rwc. When br holds no
// buffered data the wrapper goes straight to rwc.
func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
	if br.Buffered() == 0 {
		return &readWriteCloserBody{ReadWriteCloser: rwc}
	}
	return &readWriteCloserBody{br: br, ReadWriteCloser: rwc}
}

// readWriteCloserBody is an io.ReadWriteCloser whose Read first drains the
// data buffered in br and afterwards reads from the embedded
// ReadWriteCloser. Write and Close always go to the embedded value.
type readWriteCloserBody struct {
	br *bufio.Reader // nil once its buffered data has been drained
	io.ReadWriteCloser
}

// Read returns buffered data while there is any, never asking br for more
// than it already holds, and drops br once it is empty.
func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
	if b.br == nil {
		return b.ReadWriteCloser.Read(p)
	}
	if avail := b.br.Buffered(); avail < len(p) {
		p = p[:avail]
	}
	n, err = b.br.Read(p)
	if b.br.Buffered() == 0 {
		b.br = nil
	}
	return n, err
}
2196
2197
// nothingWrittenError wraps an error from a request for which pc.nwrite did
// not advance, i.e. no bytes were written to the connection. It is produced
// by writeLoop and mapRoundTripError.
type nothingWrittenError struct {
	error
}
2201
// writeLoop runs in its own goroutine (started by dialConn). It writes each
// request received on pc.writech to the connection and reports the outcome
// on both pc.writeErrCh and the request's own channel. The loop ends after
// the first failed write, which also closes the connection, or when
// pc.closech is closed. pc.writeLoopDone is closed on exit.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite // to detect that nothing was written
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// The failure came from reading the request body rather
				// than from writing to the connection. Unwrap it and record
				// it on the request (presumably in req.err, which
				// mapRoundTripError consults) so the caller sees the body
				// error itself.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				wr.req.Request.closeBody()
				if pc.nwrite == startBytesWritten {
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // received by wroteRequest
			wr.ch <- err         // received by roundTrip
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2240
2241
2242
2243
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the result
// of a pending request write before reporting that the request was not
// written successfully.
const maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2245
2246
2247
2248 func (pc *persistConn) wroteRequest() bool {
2249 select {
2250 case err := <-pc.writeErrCh:
2251
2252
2253 return err == nil
2254 default:
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2266 defer t.Stop()
2267 select {
2268 case err := <-pc.writeErrCh:
2269 return err == nil
2270 case <-t.C:
2271 return false
2272 }
2273 }
2274 }
2275
2276
2277
// responseAndError is the message readLoop sends back to roundTrip. Exactly
// one of res and err is set; roundTrip panics otherwise.
type responseAndError struct {
	res *Response
	err error
}
2282
// requestAndChan is the message roundTrip sends to readLoop on pc.reqch to
// announce a request whose response should be read.
type requestAndChan struct {
	req *Request
	ch  chan responseAndError // readLoop delivers the response or error here

	// addedGzip records that roundTrip itself added "Accept-Encoding: gzip".
	// When set and the response is gzip-encoded, readLoop wraps the body in
	// a gzipReader.
	addedGzip bool

	// continueCh, when non-nil, is signalled by readResponse: a send for a
	// 100 response, a close for a status of 200 or above.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2300
2301
2302
2303
2304
// writeRequest is the message roundTrip sends to writeLoop on pc.writech to
// ask for a request to be written to the connection.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // writeLoop sends the result of the write here

	// continueCh, when non-nil, is handed to waitForContinue; the function
	// that returns is passed to Request.write by writeLoop.
	continueCh <-chan struct{}
}
2314
// httpError is an error carrying a fixed message and a timeout flag. It
// always reports itself as temporary.
type httpError struct {
	err     string
	timeout bool
}

// Error returns the stored message.
func (e *httpError) Error() string {
	return e.err
}

// Timeout reports whether the error was marked as a timeout.
func (e *httpError) Timeout() bool {
	return e.timeout
}

// Temporary always reports true.
func (e *httpError) Temporary() bool {
	return true
}
2323
// errTimeout is returned by roundTrip when the response header timer fires
// before a response arrives. It reports Timeout() == true.
var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}

// errRequestCanceled is the same value as http2errRequestCanceled. It is
// returned by roundTrip when the request canceler cannot be installed and
// used by cancelRequest as the close cause.
var errRequestCanceled = http2errRequestCanceled

// errRequestCanceledConn is, per its message, for a request canceled while
// waiting for a connection.
// NOTE(review): not used in this part of the file — confirm against callers.
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2330
// nop does nothing; it is the default value of the test hooks below.
func nop() {}

// Hooks invoked at specific points of the transport. They all default to
// nop.
var (
	testHookEnterRoundTrip   = nop // called at the start of persistConn.roundTrip
	testHookWaitResLoop      = nop // called on every iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop // not invoked in this part of the file
	testHookPrePendingDial   = nop // not invoked in this part of the file
	testHookPostPendingDial  = nop // not invoked in this part of the file

	// testHookMu is held by readLoop while it copies
	// testHookReadLoopBeforeNextRead. The default fakeLocker makes that a
	// no-op.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop // called by readLoop at the end of each iteration
)
2344
// roundTrip performs one request/response exchange on the connection. It
// hands the request to writeLoop and readLoop, then waits for the first of:
// a write error, the connection closing, the response header timeout, the
// response (or a read error), or cancellation of the request.
//
// On success the response is returned and the request canceler stays
// installed; on any error the canceler is cleared by the deferred function
// below. Errors from the write, read and close paths are passed through
// mapRoundTripError before being returned.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	// Install pc.cancelRequest as the canceler for this request. If that is
	// refused, give the connection back (or close it) and report the request
	// as canceled.
	if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
		pc.t.putOrCloseIdleConn(pc)
		return nil, errRequestCanceled
	}
	pc.mu.Lock()
	pc.numExpectedResponses++ // tells readLoop a response is now expected
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip only when compression is enabled, the caller set neither
	// Accept-Encoding nor Range, and the method is not HEAD. readLoop
	// decompresses transparently when requestedGzip is passed on as
	// addedGzip.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh links readResponse (which signals it) to writeLoop (which
	// waits on it via waitForContinue) for requests that expect a
	// "100 Continue".
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives && !req.wantsClose() {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when roundTrip returns, so readLoop never blocks
	// delivering a response nobody is waiting for.
	gone := make(chan struct{})
	defer close(gone)

	// On failure the canceler installed above is removed again.
	defer func() {
		if err != nil {
			pc.t.setReqCanceler(req.Request, nil)
		}
	}()

	const debugRoundTrip = false // set to true to log the steps below via req.logf

	// Sample nwrite before the request is written so mapRoundTripError can
	// tell whether anything was written for it.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// respHeaderTimer stays nil, and therefore never fires, until the
	// request has been written and a ResponseHeaderTimeout is configured.
	var respHeaderTimer <-chan time.Time
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh resv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %v", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written; start the header timeout, if any.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop()
				respHeaderTimer = timer.C
			}
		case <-pc.closech:
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			return re.res, nil
		case <-cancelChan:
			pc.t.CancelRequest(req.Request)
			cancelChan = nil // a nil channel disables this case from now on
		case <-ctxDoneChan:
			pc.t.cancelRequest(req.Request, req.Context().Err())
			// Disable both cancellation cases; the loop keeps running until
			// one of the other cases returns.
			cancelChan = nil
			ctxDoneChan = nil
		}
	}
}
2475
2476
2477
2478 type tLogKey struct{}
2479
2480 func (tr *transportRequest) logf(format string, args ...interface{}) {
2481 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...interface{})); ok {
2482 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2483 }
2484 }
2485
2486
2487
2488 func (pc *persistConn) markReused() {
2489 pc.mu.Lock()
2490 pc.reused = true
2491 pc.mu.Unlock()
2492 }
2493
2494
2495
2496
2497
2498
// close closes the connection, recording err as the cause. err must be
// non-nil. It takes pc.mu; callers that already hold the lock use
// closeLocked instead.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock() // deferred so the lock is released even if closeLocked panics
	pc.closeLocked(err)
}
2504
// closeLocked is close for callers that already hold pc.mu. It panics when
// err is nil. Only the first call records its cause and tears the connection
// down; later calls just mark the connection broken again and clear
// mutateHeaderFunc.
func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	pc.broken = true
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// Only a connection without an alternate RoundTripper is torn down
		// here; with alt set, conn and closech are left untouched.
		if pc.alt == nil {
			// errCallerOwnsConn means the underlying conn was handed to the
			// caller, so it is left open.
			if err != errCallerOwnsConn {
				pc.conn.Close()
			}
			close(pc.closech)
		}
	}
	pc.mutateHeaderFunc = nil
}
2524
// portMap maps a URL scheme to the port canonicalAddr uses when the URL
// itself does not specify one.
var portMap = map[string]string{
	"http":   "80",
	"https":  "443",
	"socks5": "1080",
}
2530
2531
2532 func canonicalAddr(url *url.URL) string {
2533 addr := url.Hostname()
2534 if v, err := idnaASCII(addr); err == nil {
2535 addr = v
2536 }
2537 port := url.Port()
2538 if port == "" {
2539 port = portMap[url.Scheme]
2540 }
2541 return net.JoinHostPort(addr, port)
2542 }
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
// bodyEOFSignal wraps a response body and notifies its owner when the body
// is finished with:
//   - fn is called once, with the first non-nil error returned by the body's
//     Read (including io.EOF), or with the result of closing the body; the
//     error it returns replaces the original.
//   - earlyCloseFn, when set, is called instead of closing the body if Close
//     happens before Read has seen io.EOF.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // Close has been called
	rerr         error             // first error returned by body.Read
	fn           func(error) error // cleared after its first call
	earlyCloseFn func() error      // optional; see type comment
}

// errReadOnClosedResBody is returned by Read after Close has been called.
var errReadOnClosedResBody = errors.New("http: read on closed response body")

// Read reads from the wrapped body. After Close it fails with
// errReadOnClosedResBody; after an earlier read error it keeps returning
// that error without touching the body again.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	closed, prev := es.closed, es.rerr
	es.mu.Unlock()
	switch {
	case closed:
		return 0, errReadOnClosedResBody
	case prev != nil:
		return 0, prev
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed. The first call either invokes earlyCloseFn
// (when set and the body was not read to io.EOF) or closes the wrapped body
// and passes the result through fn. Later calls return nil.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true
	if es.earlyCloseFn == nil || es.rerr == io.EOF {
		return es.condfn(es.body.Close())
	}
	return es.earlyCloseFn()
}

// condfn passes err through fn if fn is still set, clearing fn afterwards
// so that it runs at most once. es.mu must be held by the caller.
func (es *bodyEOFSignal) condfn(err error) error {
	if es.fn == nil {
		return err
	}
	mapped := es.fn(err)
	es.fn = nil
	return mapped
}
2612
2613
2614
2615 type gzipReader struct {
2616 body *bodyEOFSignal
2617 zr *gzip.Reader
2618 zerr error
2619 }
2620
2621 func (gz *gzipReader) Read(p []byte) (n int, err error) {
2622 if gz.zr == nil {
2623 if gz.zerr == nil {
2624 gz.zr, gz.zerr = gzip.NewReader(gz.body)
2625 }
2626 if gz.zerr != nil {
2627 return 0, gz.zerr
2628 }
2629 }
2630
2631 gz.body.mu.Lock()
2632 if gz.body.closed {
2633 err = errReadOnClosedResBody
2634 }
2635 gz.body.mu.Unlock()
2636
2637 if err != nil {
2638 return 0, err
2639 }
2640 return gz.zr.Read(p)
2641 }
2642
2643 func (gz *gzipReader) Close() error {
2644 return gz.body.Close()
2645 }
2646
// readerAndCloser combines a separate io.Reader and io.Closer into a single
// value that has both Read and Close.
// NOTE(review): not used in this part of the file.
type readerAndCloser struct {
	io.Reader
	io.Closer
}
2651
// tlsHandshakeTimeoutError is the error value for a TLS handshake that
// timed out. It reports itself as both a timeout and temporary.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed handshake-timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
2657
2658
2659
2660
// fakeLocker is a sync.Locker whose Lock and Unlock do nothing. It is the
// default value of testHookMu.
type fakeLocker struct{}

func (fakeLocker) Lock()   {} // no-op
func (fakeLocker) Unlock() {} // no-op
2665
2666
2667
2668
// cloneTLSConfig returns a copy of cfg made with cfg.Clone, or a fresh empty
// tls.Config when cfg is nil. The result is never nil.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return &tls.Config{}
}
2675
2676 type connLRU struct {
2677 ll *list.List
2678 m map[*persistConn]*list.Element
2679 }
2680
2681
2682 func (cl *connLRU) add(pc *persistConn) {
2683 if cl.ll == nil {
2684 cl.ll = list.New()
2685 cl.m = make(map[*persistConn]*list.Element)
2686 }
2687 ele := cl.ll.PushFront(pc)
2688 if _, ok := cl.m[pc]; ok {
2689 panic("persistConn was already in LRU")
2690 }
2691 cl.m[pc] = ele
2692 }
2693
2694 func (cl *connLRU) removeOldest() *persistConn {
2695 ele := cl.ll.Back()
2696 pc := ele.Value.(*persistConn)
2697 cl.ll.Remove(ele)
2698 delete(cl.m, pc)
2699 return pc
2700 }
2701
2702
2703 func (cl *connLRU) remove(pc *persistConn) {
2704 if ele, ok := cl.m[pc]; ok {
2705 cl.ll.Remove(ele)
2706 delete(cl.m, pc)
2707 }
2708 }
2709
2710
2711 func (cl *connLRU) len() int {
2712 return len(cl.m)
2713 }
2714
2715
2716
2717
// validPort reports whether p consists only of ASCII digits. The empty
// string is accepted.
func validPort(p string) bool {
	for i := 0; i < len(p); i++ {
		if c := p[i]; c < '0' || c > '9' {
			return false
		}
	}
	return true
}
2726
View as plain text