Skip to content

Commit a09a362

Browse files
authored
Merge pull request smallnest#886 from rekyyang/fix/inform-chan
fix: done channel for broadcast, fork and inform
2 parents d9e2dbe + 5bf2c1a commit a09a362

File tree

1 file changed

+40
-20
lines changed

1 file changed

+40
-20
lines changed

โ€Žclient/xclient.goโ€Ž

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ import (
1616
"time"
1717

1818
"github.com/juju/ratelimit"
19+
"golang.org/x/sync/singleflight"
20+
1921
ex "github.com/smallnest/rpcx/errors"
2022
"github.com/smallnest/rpcx/log"
2123
"github.com/smallnest/rpcx/protocol"
2224
"github.com/smallnest/rpcx/share"
23-
"golang.org/x/sync/singleflight"
2425
)
2526

2627
const (
@@ -944,6 +945,9 @@ func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args inte
944945
var replyOnce sync.Once
945946

946947
ctx = setServerTimeout(ctx)
948+
// add timeout after set server timeout, only prevent client hanging
949+
ctx, cancel := context.WithTimeout(ctx, time.Minute)
950+
defer cancel()
947951
callPlugins := make([]RPCClient, 0, len(c.servers))
948952
clients := make(map[string]RPCClient)
949953
c.mu.Lock()
@@ -982,7 +986,9 @@ func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args inte
982986
}
983987

984988
e := c.wrapCall(ctx, client, serviceMethod, args, clonedReply)
985-
done <- (e == nil)
989+
defer func() {
990+
done <- (e == nil)
991+
}()
986992
if e != nil {
987993
if uncoverError(e) {
988994
c.removeClient(k, c.servicePath, serviceMethod, client)
@@ -998,7 +1004,6 @@ func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args inte
9981004
}()
9991005
}
10001006

1001-
timeout := time.NewTimer(time.Minute)
10021007
check:
10031008
for {
10041009
select {
@@ -1007,12 +1012,14 @@ check:
10071012
if l == 0 || !result { // all returns or some one returns an error
10081013
break check
10091014
}
1010-
case <-timeout.C:
1011-
err.Append(errors.New(("timeout")))
1012-
break check
10131015
}
10141016
}
1015-
timeout.Stop()
1017+
1018+
select {
1019+
case <-ctx.Done():
1020+
err.Append(errors.New(("timeout")))
1021+
default:
1022+
}
10161023

10171024
return err.ErrorOrNil()
10181025
}
@@ -1035,6 +1042,10 @@ func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface
10351042
}
10361043

10371044
ctx = setServerTimeout(ctx)
1045+
1046+
// add timeout after set server timeout, only prevent client hanging
1047+
ctx, cancel := context.WithTimeout(ctx, time.Minute)
1048+
defer cancel()
10381049
callPlugins := make([]RPCClient, 0, len(c.servers))
10391050
clients := make(map[string]RPCClient)
10401051
c.mu.Lock()
@@ -1080,7 +1091,9 @@ func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface
10801091
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())
10811092
})
10821093
}
1083-
done <- (e == nil)
1094+
defer func() {
1095+
done <- (e == nil)
1096+
}()
10841097
if e != nil {
10851098
if uncoverError(e) {
10861099
c.removeClient(k, c.servicePath, serviceMethod, client)
@@ -1090,7 +1103,6 @@ func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface
10901103
}()
10911104
}
10921105

1093-
timeout := time.NewTimer(time.Minute)
10941106
check:
10951107
for {
10961108
select {
@@ -1102,13 +1114,14 @@ check:
11021114
if l == 0 { // all returns or some one returns an error
11031115
break check
11041116
}
1105-
1106-
case <-timeout.C:
1107-
err.Append(errors.New(("timeout")))
1108-
break check
11091117
}
11101118
}
1111-
timeout.Stop()
1119+
1120+
select {
1121+
case <-ctx.Done():
1122+
err.Append(errors.New(("timeout")))
1123+
default:
1124+
}
11121125

11131126
return err.ErrorOrNil()
11141127
}
@@ -1132,6 +1145,10 @@ func (c *xClient) Inform(ctx context.Context, serviceMethod string, args interfa
11321145
}
11331146

11341147
ctx = setServerTimeout(ctx)
1148+
1149+
// add timeout after set server timeout, only prevent client hanging
1150+
ctx, cancel := context.WithTimeout(ctx, time.Minute)
1151+
defer cancel()
11351152
callPlugins := make([]RPCClient, 0, len(c.servers))
11361153
clients := make(map[string]RPCClient)
11371154
c.mu.Lock()
@@ -1175,7 +1192,9 @@ func (c *xClient) Inform(ctx context.Context, serviceMethod string, args interfa
11751192
}
11761193

11771194
e := c.wrapCall(ctx, client, serviceMethod, args, clonedReply)
1178-
done <- (e == nil)
1195+
defer func() {
1196+
done <- (e == nil)
1197+
}()
11791198
if e != nil {
11801199
if uncoverError(e) {
11811200
c.removeClient(k, c.servicePath, serviceMethod, client)
@@ -1204,7 +1223,6 @@ func (c *xClient) Inform(ctx context.Context, serviceMethod string, args interfa
12041223
}()
12051224
}
12061225

1207-
timeout := time.NewTimer(time.Minute)
12081226
check:
12091227
for {
12101228
select {
@@ -1213,12 +1231,14 @@ check:
12131231
if l == 0 { // all returns or some one returns an error
12141232
break check
12151233
}
1216-
case <-timeout.C:
1217-
err.Append(errors.New(("timeout")))
1218-
break check
12191234
}
12201235
}
1221-
timeout.Stop()
1236+
1237+
select {
1238+
case <-ctx.Done():
1239+
err.Append(errors.New(("timeout")))
1240+
default:
1241+
}
12221242

12231243
return receipts, err.ErrorOrNil()
12241244
}

0 commit comments

Comments
ย (0)