Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,11 @@ func (task *closeStreamTask) Tick() {
}

type clientTransportDump struct {
LocalAddress string `json:"local_address"`
State transportState `json:"transport_state"`
OutFlowControlSize int64 `json:"out_flow_control_size"`
ActiveStreams []streamDump `json:"active_streams"`
LocalAddress string `json:"local_address"`
State transportState `json:"transport_state"`
OutFlowControlSize int64 `json:"out_flow_control_size"`
ActiveStreams []streamDump `json:"active_streams"`
MaxConcurrentStreams uint32 `json:"max_concurrent_streams"`
}

type streamDump struct {
Expand Down Expand Up @@ -514,6 +515,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
var ch chan struct{}
checkForStreamQuota := func(it interface{}) bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
klog.CtxInfof(ctx, "KITEX: %s %s NewStream blocked due to concurrent streams limit, MaxConcurrentStreams is %d", callHdr.Host, callHdr.Method, t.maxConcurrentStreams)
if firstTry {
t.waitingStreams++
}
Expand Down Expand Up @@ -1311,8 +1313,6 @@ func (t *http2Client) IsActive() bool {

func (t *http2Client) Dump() interface{} {
t.mu.Lock()
defer t.mu.Unlock()

// sort the stream using streamID
as := make([]streamDump, 0, len(t.activeStreams))
ids := make([]uint32, 0, len(t.activeStreams))
Expand Down Expand Up @@ -1340,29 +1340,45 @@ func (t *http2Client) Dump() interface{} {
}
as = append(as, sd)
}
var localAddress string
state := t.state
if t.localAddr != nil {
localAddress = t.localAddr.String()
}
t.mu.Unlock()

// since t.getOutFlowWindowAndMaxConcurrentStreams would access t.controlBuf,
// we must release the t.mu first otherwise it will cause deadlock possibly
outFlowWindow, maxStreams := t.getOutFlowWindowAndMaxConcurrentStreams()
dump := clientTransportDump{
State: t.state,
ActiveStreams: as,
OutFlowControlSize: t.getOutFlowWindow(),
State: state,
ActiveStreams: as,
OutFlowControlSize: outFlowWindow,
MaxConcurrentStreams: maxStreams,
}
if localAddress := t.localAddr; localAddress != nil {
dump.LocalAddress = localAddress.String()
if localAddress != "" {
dump.LocalAddress = localAddress
}
return dump
}

func (t *http2Client) getOutFlowWindow() int64 {
func (t *http2Client) getOutFlowWindowAndMaxConcurrentStreams() (int64, uint32) {
var outFlowWindow int64
var maxStreams uint32
resp := make(chan uint32, 1)
timer := time.NewTimer(time.Second)
defer timer.Stop()
t.controlBuf.put(&outFlowControlSizeRequest{resp})
t.controlBuf.executeAndPut(func(it interface{}) bool {
maxStreams = t.maxConcurrentStreams
return true
}, &outFlowControlSizeRequest{resp})
select {
case sz := <-resp:
return int64(sz)
outFlowWindow = int64(sz)
case <-t.ctx.Done():
return -1
outFlowWindow = -1
case <-timer.C:
return -2
outFlowWindow = -2
}
return outFlowWindow, maxStreams
}
9 changes: 6 additions & 3 deletions pkg/remote/trans/nphttp2/grpc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,9 +1638,11 @@ func TestGetClientStat(t *testing.T) {
test.Assert(t, md != nil)
test.Assert(t, ok)
test.Assert(t, s.getHeaderValid())
// send quota
w := cli.getOutFlowWindow()
// send quota and max concurrent streams
w, m := cli.getOutFlowWindowAndMaxConcurrentStreams()
test.Assert(t, uint32(w) == cli.loopy.sendQuota)
// there is no limit on the number of streams by default
test.Assert(t, m == math.MaxUint32)

// Dump
d := cli.Dump()
Expand All @@ -1649,7 +1651,8 @@ func TestGetClientStat(t *testing.T) {
test.Assert(t, td.State == reachable)
cli.mu.Lock()
remoteAddr := cli.remoteAddr.String()
test.Assert(t, len(td.ActiveStreams) == len(cli.activeStreams))
// When the Dump ends, it's possible streams that were active at that moment have already ended
test.Assert(t, len(td.ActiveStreams) >= len(cli.activeStreams))
cli.mu.Unlock()

if len(td.ActiveStreams) != 0 {
Expand Down