Skip to content

Commit 3c7ea5c

Browse files
authored
fix(storage): ZB Reader redirect support (#12703)
* fix(storage): ZB Reader redirect support * do single retry for redirect * add mid-stream redirect for aborted * clean up test comments
1 parent 2346cf1 commit 3c7ea5c

File tree

2 files changed

+86
-24
lines changed

2 files changed

+86
-24
lines changed

storage/grpc_client.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1628,7 +1628,6 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
16281628
req := &storagepb.BidiReadObjectRequest{
16291629
ReadObjectSpec: spec,
16301630
}
1631-
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, contextMetadataFromBidiReadObject(req)...)
16321631

16331632
// Define a function that initiates a Read with offset and length, assuming
16341633
// we have already read seen bytes.
@@ -1660,28 +1659,53 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
16601659
var decoder *readResponseDecoder
16611660

16621661
err = run(cc, func(ctx context.Context) error {
1663-
stream, err = c.raw.BidiReadObject(ctx, s.gax...)
1664-
if err != nil {
1665-
return err
1666-
}
1667-
if err := stream.Send(req); err != nil {
1668-
return err
1662+
var databufs mem.BufferSlice
1663+
openAndSendReq := func() error {
1664+
databufs = mem.BufferSlice{}
1665+
1666+
// Insert context metadata, including routing token if this is a retry
1667+
// for a redirect.
1668+
mdCtx := gax.InsertMetadataIntoOutgoingContext(ctx, contextMetadataFromBidiReadObject(req)...)
1669+
stream, err = c.raw.BidiReadObject(mdCtx, s.gax...)
1670+
if err != nil {
1671+
return err
1672+
}
1673+
if err := stream.Send(req); err != nil {
1674+
return err
1675+
}
1676+
// Oneshot reads can close the client->server side immediately.
1677+
if err := stream.CloseSend(); err != nil {
1678+
return err
1679+
}
1680+
1681+
// Receive the message into databuf as a wire-encoded message so we can
1682+
// use a custom decoder to avoid an extra copy at the protobuf layer.
1683+
return stream.RecvMsg(&databufs)
16691684
}
1670-
// Oneshot reads can close the client->server side immediately.
1671-
if err := stream.CloseSend(); err != nil {
1672-
return err
1685+
1686+
err := openAndSendReq()
1687+
1688+
// We might get a redirect error here for an out-of-region request.
1689+
// Add the routing token and read handle to the request and do one
1690+
// retry.
1691+
if st, ok := status.FromError(err); ok && st.Code() == codes.Aborted {
1692+
for _, d := range st.Details() {
1693+
if e, ok := d.(*storagepb.BidiReadObjectRedirectedError); ok {
1694+
req.ReadObjectSpec.ReadHandle = e.GetReadHandle()
1695+
req.ReadObjectSpec.RoutingToken = e.RoutingToken
1696+
err = openAndSendReq()
1697+
break
1698+
}
1699+
}
16731700
}
16741701

1675-
// Receive the message into databuf as a wire-encoded message so we can
1676-
// use a custom decoder to avoid an extra copy at the protobuf layer.
1677-
databufs := mem.BufferSlice{}
1678-
err := stream.RecvMsg(&databufs)
16791702
// These types of errors show up on the RecvMsg call, rather than the
16801703
// initialization of the stream via BidiReadObject above.
16811704
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
1682-
return formatObjectErr(err)
1705+
err = formatObjectErr(err)
16831706
}
16841707
if err != nil {
1708+
databufs.Free()
16851709
return err
16861710
}
16871711
// Use a custom decoder that uses protobuf unmarshalling for all
@@ -2093,7 +2117,9 @@ func (r *gRPCReader) Close() error {
20932117
func (r *gRPCReader) recv() error {
20942118
databufs := mem.BufferSlice{}
20952119
err := r.stream.RecvMsg(&databufs)
2096-
if err != nil && r.settings.retry.runShouldRetry(err) {
2120+
// If we get a mid-stream error on a recv call, reopen the stream.
2121+
// ABORTED could indicate a redirect so should also trigger a reopen.
2122+
if err != nil && (r.settings.retry.runShouldRetry(err) || status.Code(err) == codes.Aborted) {
20972123
// This will "close" the existing stream and immediately attempt to
20982124
// reopen the stream, but will backoff if further attempts are necessary.
20992125
// Reopening the stream Recvs the first message, so if retrying is

storage/integration_test.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ import (
4646

4747
"cloud.google.com/go/auth"
4848
"cloud.google.com/go/auth/credentials"
49-
"cloud.google.com/go/compute/metadata"
5049
"cloud.google.com/go/httpreplay"
5150
"cloud.google.com/go/iam"
5251
"cloud.google.com/go/iam/apiv1/iampb"
@@ -6933,6 +6932,43 @@ func TestIntegration_OTelTracing(t *testing.T) {
69336932
})
69346933
}
69356934

6935+
// Simple integration test for a zonal bucket read via storage.Reader.
6936+
// Will test out-of-region redirect flow if it's run from outside of us-west4.
6937+
func TestIntegration_ZonalRead(t *testing.T) {
6938+
multiTransportTest(skipAllButBidi(context.Background(), "zonal bucket test"), t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) {
6939+
h := testHelper{t}
6940+
bucketName := prefix + uidSpace.New()
6941+
bkt := client.Bucket(bucketName)
6942+
6943+
h.mustCreateZonalBucket(bkt, testutil.ProjID())
6944+
defer h.mustDeleteBucket(bkt)
6945+
6946+
objName := "bidi-test-obj"
6947+
obj := bkt.Object(objName)
6948+
w := obj.NewWriter(ctx)
6949+
w.Append = true
6950+
h.mustWrite(w, randomBytes3MiB)
6951+
defer obj.Delete(ctx)
6952+
6953+
r, err := obj.NewReader(ctx)
6954+
if err != nil {
6955+
t.Fatalf("NewReader: %v", err)
6956+
}
6957+
6958+
b, err := io.ReadAll(r)
6959+
if err != nil {
6960+
t.Fatalf("reading: %v", err)
6961+
}
6962+
if !bytes.Equal(randomBytes3MiB, b) {
6963+
t.Errorf("download data does not match; got %v bytes, want %v bytes", len(b), len(randomBytes3MiB))
6964+
}
6965+
6966+
if err := r.Close(); err != nil {
6967+
t.Errorf("closing: %v", err)
6968+
}
6969+
}, experimental.WithGRPCBidiReads())
6970+
}
6971+
69366972
// openTelemetryTestExporter is a test utility exporter. It should be created
69376973
// with NewopenTelemetryTestExporter.
69386974
type openTelemetryTestExporter struct {
@@ -7207,13 +7243,13 @@ func (h testHelper) mustCreate(b *BucketHandle, projID string, attrs *BucketAttr
72077243
func (h testHelper) mustCreateZonalBucket(b *BucketHandle, projID string) {
72087244
h.t.Helper()
72097245

7210-
// Create a bucket in the same zone as the test VM.
7211-
zone, err := metadata.ZoneWithContext(context.Background())
7212-
if err != nil {
7213-
h.t.Fatalf("could not determine VM zone: %v", err)
7214-
}
7215-
region := strings.Join(strings.Split(zone, "-")[:2], "-")
7216-
h.mustCreate(b, testutil.ProjID(), &BucketAttrs{
7246+
// Create a zonal bucket in us-west4-a.
7247+
// Another zone/region can be subbed in if you want to run elsewhere because
7248+
// of quota reasons or to run in the same region as your VM.
7249+
zone := "us-west4-a"
7250+
region := "us-west4"
7251+
7252+
h.mustCreate(b, projID, &BucketAttrs{
72177253
Location: region,
72187254
CustomPlacementConfig: &CustomPlacementConfig{
72197255
DataLocations: []string{zone},

0 commit comments

Comments
 (0)