Skip to content

Commit 57d71e8

Browse files
committed
stream: added is_sync flag for begin/commit
Part of #TNTP-3334 Closes #366
1 parent 4de68c4 commit 57d71e8

File tree

5 files changed

+220
-2
lines changed

5 files changed

+220
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1414
- Implemented box.session.su request and sugar interface only for current session granting (#426).
1515
- Defined `ErrConcurrentSchemaUpdate` constant for "concurrent schema update" error.
1616
Now you can check this error with `errors.Is(err, tarantool.ErrConcurrentSchemaUpdate)`.
17+
- Implemented support for `IPROTO_IS_SYNC` flag in stream transactions,
18+
added `IsSync(bool)` method for `BeginRequest`/`CommitRequest` (#447).
1719

1820
### Changed
1921

example_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,90 @@ func ExampleBeginRequest_TxnIsolation() {
998998
fmt.Printf("Select after Rollback: response is %#v\n", data)
999999
}
10001000

1001+
func ExampleBeginRequest_IsSync() {
1002+
conn := exampleConnect(dialer, opts)
1003+
defer conn.Close()
1004+
1005+
// Tarantool supports IS_SYNC flag for BeginRequest since version 3.1.0.
1006+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0)
1007+
if err != nil || isLess {
1008+
return
1009+
}
1010+
1011+
stream, err := conn.NewStream()
1012+
if err != nil {
1013+
fmt.Printf("error getting the stream: %s\n", err)
1014+
return
1015+
}
1016+
1017+
// Begin transaction with synchronous mode.
1018+
req := tarantool.NewBeginRequest().IsSync(true)
1019+
resp, err := stream.Do(req).GetResponse()
1020+
switch {
1021+
case err != nil:
1022+
fmt.Printf("error getting the response: %s\n", err)
1023+
case resp.Header().Error != tarantool.ErrorNo:
1024+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1025+
default:
1026+
fmt.Println("Success.")
1027+
}
1028+
}
1029+
1030+
func ExampleCommitRequest_IsSync() {
1031+
conn := exampleConnect(dialer, opts)
1032+
defer conn.Close()
1033+
1034+
// Tarantool supports IS_SYNC flag for CommitRequest since version 3.1.0.
1035+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0)
1036+
if err != nil || isLess {
1037+
return
1038+
}
1039+
1040+
var req tarantool.Request
1041+
1042+
stream, err := conn.NewStream()
1043+
if err != nil {
1044+
fmt.Printf("error getting the stream: %s\n", err)
1045+
return
1046+
}
1047+
1048+
// Begin transaction.
1049+
req = tarantool.NewBeginRequest()
1050+
resp, err := stream.Do(req).GetResponse()
1051+
switch {
1052+
case err != nil:
1053+
fmt.Printf("error getting the response: %s\n", err)
1054+
return
1055+
case resp.Header().Error != tarantool.ErrorNo:
1056+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1057+
return
1058+
}
1059+
1060+
// Insert in stream.
1061+
req = tarantool.NewReplaceRequest("test").Tuple([]interface{}{1, "test"})
1062+
resp, err = stream.Do(req).GetResponse()
1063+
switch {
1064+
case err != nil:
1065+
fmt.Printf("error getting the response: %s\n", err)
1066+
return
1067+
case resp.Header().Error != tarantool.ErrorNo:
1068+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1069+
return
1070+
}
1071+
1072+
// Commit transaction in sync mode.
1073+
req = tarantool.NewCommitRequest().IsSync(true)
1074+
resp, err = stream.Do(req).GetResponse()
1075+
switch {
1076+
case err != nil:
1077+
fmt.Printf("error getting the response: %s\n", err)
1078+
case resp.Header().Error != tarantool.ErrorNo:
1079+
fmt.Printf("response error code: %s\n", resp.Header().Error)
1080+
default:
1081+
fmt.Println("Success.")
1082+
}
1083+
}
1084+
10011085
func ExampleErrorNo() {
10021086
conn := exampleConnect(dialer, opts)
10031087
defer conn.Close()

stream.go

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type BeginRequest struct {
4242
baseRequest
4343
txnIsolation TxnIsolationLevel
4444
timeout time.Duration
45+
isSync bool
46+
isSyncSet bool
4547
}
4648

4749
// NewBeginRequest returns a new BeginRequest.
@@ -59,12 +61,19 @@ func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequ
5961
return req
6062
}
6163

62-
// WithTimeout allows to set up a timeout for call BeginRequest.
64+
// Timeout allows to set up a timeout for call BeginRequest.
6365
func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest {
6466
req.timeout = timeout
6567
return req
6668
}
6769

70+
// IsSync allows to set up a IsSync flag for call BeginRequest.
71+
func (req *BeginRequest) IsSync(isSync bool) *BeginRequest {
72+
req.isSync = isSync
73+
req.isSyncSet = true
74+
return req
75+
}
76+
6877
// Body fills an msgpack.Encoder with the begin request body.
6978
func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
7079
var (
@@ -81,6 +90,10 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
8190
mapLen++
8291
}
8392

93+
if req.isSyncSet {
94+
mapLen++
95+
}
96+
8497
if err := enc.EncodeMapLen(mapLen); err != nil {
8598
return err
8699
}
@@ -105,6 +118,16 @@ func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
105118
}
106119
}
107120

121+
if req.isSyncSet {
122+
if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil {
123+
return err
124+
}
125+
126+
if err := enc.EncodeBool(req.isSync); err != nil {
127+
return err
128+
}
129+
}
130+
108131
return nil
109132
}
110133

@@ -124,6 +147,9 @@ func (req *BeginRequest) Context(ctx context.Context) *BeginRequest {
124147
// Commit request can not be processed out of stream.
125148
type CommitRequest struct {
126149
baseRequest
150+
151+
isSync bool
152+
isSyncSet bool
127153
}
128154

129155
// NewCommitRequest returns a new CommitRequest.
@@ -133,9 +159,38 @@ func NewCommitRequest() *CommitRequest {
133159
return req
134160
}
135161

162+
// IsSync allows to set up a IsSync flag for call BeginRequest.
163+
func (req *CommitRequest) IsSync(isSync bool) *CommitRequest {
164+
req.isSync = isSync
165+
req.isSyncSet = true
166+
return req
167+
}
168+
136169
// Body fills an msgpack.Encoder with the commit request body.
137170
func (req *CommitRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error {
138-
return enc.EncodeMapLen(0)
171+
var (
172+
mapLen = 0
173+
)
174+
175+
if req.isSyncSet {
176+
mapLen++
177+
}
178+
179+
if err := enc.EncodeMapLen(mapLen); err != nil {
180+
return err
181+
}
182+
183+
if req.isSyncSet {
184+
if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil {
185+
return err
186+
}
187+
188+
if err := enc.EncodeBool(req.isSync); err != nil {
189+
return err
190+
}
191+
}
192+
193+
return nil
139194
}
140195

141196
// Context sets a passed context to the request.

tarantool_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4199,6 +4199,75 @@ func TestFdDialer(t *testing.T) {
41994199
require.Equal(t, int8(0), resp[0])
42004200
}
42014201

4202+
const (
4203+
errNoSyncTransactionQueue = "The synchronous transaction queue doesn't belong to any instance"
4204+
)
4205+
4206+
func TestDoBeginRequest_IsSync(t *testing.T) {
4207+
test_helpers.SkipIfIsSyncUnsupported(t)
4208+
4209+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4210+
defer conn.Close()
4211+
4212+
stream, err := conn.NewStream()
4213+
require.NoError(t, err)
4214+
4215+
_, err = stream.Do(NewBeginRequest().IsSync(true)).Get()
4216+
assert.Nil(t, err)
4217+
4218+
_, err = stream.Do(
4219+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4220+
).Get()
4221+
require.Nil(t, err)
4222+
4223+
_, err = stream.Do(NewCommitRequest()).Get()
4224+
require.NotNil(t, err)
4225+
assert.Contains(t, err.Error(), errNoSyncTransactionQueue)
4226+
}
4227+
4228+
func TestDoCommitRequest_IsSync(t *testing.T) {
4229+
test_helpers.SkipIfIsSyncUnsupported(t)
4230+
4231+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4232+
defer conn.Close()
4233+
4234+
stream, err := conn.NewStream()
4235+
require.NoError(t, err)
4236+
4237+
_, err = stream.Do(NewBeginRequest()).Get()
4238+
require.Nil(t, err)
4239+
4240+
_, err = stream.Do(
4241+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4242+
).Get()
4243+
require.Nil(t, err)
4244+
4245+
_, err = stream.Do(NewCommitRequest().IsSync(true)).Get()
4246+
require.NotNil(t, err)
4247+
assert.Contains(t, err.Error(), errNoSyncTransactionQueue)
4248+
}
4249+
4250+
func TestDoCommitRequest_NoSync(t *testing.T) {
4251+
test_helpers.SkipIfIsSyncUnsupported(t)
4252+
4253+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4254+
defer conn.Close()
4255+
4256+
stream, err := conn.NewStream()
4257+
require.NoError(t, err)
4258+
4259+
_, err = stream.Do(NewBeginRequest()).Get()
4260+
require.Nil(t, err)
4261+
4262+
_, err = stream.Do(
4263+
NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}),
4264+
).Get()
4265+
require.Nil(t, err)
4266+
4267+
_, err = stream.Do(NewCommitRequest()).Get()
4268+
assert.Nil(t, err)
4269+
}
4270+
42024271
// runTestMain is a body of TestMain function
42034272
// (see https://pkg.go.dev/testing#hdr-Main).
42044273
// Using defer + os.Exit is not works so TestMain body

test_helpers/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,14 @@ func SkipIfCrudSpliceBroken(t *testing.T) {
217217
SkipIfFeatureUnsupported(t, "crud update splice", 2, 0, 0)
218218
}
219219

220+
// SkipIfIsSyncUnsupported skips test run if Tarantool without
221+
// IS_SYNC support is used.
222+
func SkipIfIsSyncUnsupported(t *testing.T) {
223+
t.Helper()
224+
225+
SkipIfFeatureUnsupported(t, "is sync", 3, 1, 0)
226+
}
227+
220228
// IsTcsSupported checks if Tarantool supports centralized storage.
221229
// Tarantool supports centralized storage with Enterprise since 3.3.0 version.
222230
func IsTcsSupported() (bool, error) {

0 commit comments

Comments
 (0)