Skip to content

Commit 27a3cb5

Browse files
committed
feat(gRPC): support dump MaxConcurrentStreams of HTTP2 Client
1 parent a990796 commit 27a3cb5

File tree

2 files changed

+38
-19
lines changed

2 files changed

+38
-19
lines changed

pkg/remote/trans/nphttp2/grpc/http2_client.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,11 @@ func (task *closeStreamTask) Tick() {
317317
}
318318

319319
type clientTransportDump struct {
320-
LocalAddress string `json:"local_address"`
321-
State transportState `json:"transport_state"`
322-
OutFlowControlSize int64 `json:"out_flow_control_size"`
323-
ActiveStreams []streamDump `json:"active_streams"`
320+
LocalAddress string `json:"local_address"`
321+
State transportState `json:"transport_state"`
322+
OutFlowControlSize int64 `json:"out_flow_control_size"`
323+
ActiveStreams []streamDump `json:"active_streams"`
324+
MaxConcurrentStreams uint32 `json:"max_concurrent_streams"`
324325
}
325326

326327
type streamDump struct {
@@ -514,6 +515,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
514515
var ch chan struct{}
515516
checkForStreamQuota := func(it interface{}) bool {
516517
if t.streamQuota <= 0 { // Can go negative if server decreases it.
518+
klog.CtxInfof(ctx, "KITEX: %s %s NewStream blocked due to concurrent streams limit, MaxConcurrentStreams is %d", callHdr.Host, callHdr.Method, t.maxConcurrentStreams)
517519
if firstTry {
518520
t.waitingStreams++
519521
}
@@ -1311,8 +1313,6 @@ func (t *http2Client) IsActive() bool {
13111313

13121314
func (t *http2Client) Dump() interface{} {
13131315
t.mu.Lock()
1314-
defer t.mu.Unlock()
1315-
13161316
// sort the stream using streamID
13171317
as := make([]streamDump, 0, len(t.activeStreams))
13181318
ids := make([]uint32, 0, len(t.activeStreams))
@@ -1340,29 +1340,45 @@ func (t *http2Client) Dump() interface{} {
13401340
}
13411341
as = append(as, sd)
13421342
}
1343+
var localAddress string
1344+
state := t.state
1345+
if t.localAddr != nil {
1346+
localAddress = t.localAddr.String()
1347+
}
1348+
t.mu.Unlock()
13431349

1350+
// since t.getOutFlowWindowAndMaxConcurrentStreams would access t.controlBuf,
1351+
// we must release the t.mu first otherwise it will cause deadlock possibly
1352+
outFlowWindow, maxStreams := t.getOutFlowWindowAndMaxConcurrentStreams()
13441353
dump := clientTransportDump{
1345-
State: t.state,
1346-
ActiveStreams: as,
1347-
OutFlowControlSize: t.getOutFlowWindow(),
1354+
State: state,
1355+
ActiveStreams: as,
1356+
OutFlowControlSize: outFlowWindow,
1357+
MaxConcurrentStreams: maxStreams,
13481358
}
1349-
if localAddress := t.localAddr; localAddress != nil {
1350-
dump.LocalAddress = localAddress.String()
1359+
if localAddress != "" {
1360+
dump.LocalAddress = localAddress
13511361
}
13521362
return dump
13531363
}
13541364

1355-
func (t *http2Client) getOutFlowWindow() int64 {
1365+
func (t *http2Client) getOutFlowWindowAndMaxConcurrentStreams() (int64, uint32) {
1366+
var outFlowWindow int64
1367+
var maxStreams uint32
13561368
resp := make(chan uint32, 1)
13571369
timer := time.NewTimer(time.Second)
13581370
defer timer.Stop()
1359-
t.controlBuf.put(&outFlowControlSizeRequest{resp})
1371+
t.controlBuf.executeAndPut(func(it interface{}) bool {
1372+
maxStreams = t.maxConcurrentStreams
1373+
return true
1374+
}, &outFlowControlSizeRequest{resp})
13601375
select {
13611376
case sz := <-resp:
1362-
return int64(sz)
1377+
outFlowWindow = int64(sz)
13631378
case <-t.ctx.Done():
1364-
return -1
1379+
outFlowWindow = -1
13651380
case <-timer.C:
1366-
return -2
1381+
outFlowWindow = -2
13671382
}
1383+
return outFlowWindow, maxStreams
13681384
}

pkg/remote/trans/nphttp2/grpc/transport_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,9 +1638,11 @@ func TestGetClientStat(t *testing.T) {
16381638
test.Assert(t, md != nil)
16391639
test.Assert(t, ok)
16401640
test.Assert(t, s.getHeaderValid())
1641-
// send quota
1642-
w := cli.getOutFlowWindow()
1641+
// send quota and max concurrent streams
1642+
w, m := cli.getOutFlowWindowAndMaxConcurrentStreams()
16431643
test.Assert(t, uint32(w) == cli.loopy.sendQuota)
1644+
// there is no limit on the number of streams by default
1645+
test.Assert(t, m == math.MaxUint32)
16441646

16451647
// Dump
16461648
d := cli.Dump()
@@ -1649,7 +1651,8 @@ func TestGetClientStat(t *testing.T) {
16491651
test.Assert(t, td.State == reachable)
16501652
cli.mu.Lock()
16511653
remoteAddr := cli.remoteAddr.String()
1652-
test.Assert(t, len(td.ActiveStreams) == len(cli.activeStreams))
1654+
// When the Dump ends, it's possible streams that were active at that moment have already ended
1655+
test.Assert(t, len(td.ActiveStreams) >= len(cli.activeStreams))
16531656
cli.mu.Unlock()
16541657

16551658
if len(td.ActiveStreams) != 0 {

0 commit comments

Comments
 (0)