Skip to content

Commit b05bb74

Browse files
authored
Merge pull request yutopp#49 from diallo-han/master
Fix bug writing same timestamp with different messageid.
2 parents ece2167 + 3e0521f commit b05bb74

File tree

3 files changed

+108
-8
lines changed

3 files changed

+108
-8
lines changed

chunk_stream_writer.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ import (
1515
type ChunkStreamWriter struct {
1616
ChunkStreamReader
1717

18-
doneCh chan struct{}
19-
closeCh chan struct{}
20-
lastErr error
21-
aqM sync.Mutex
18+
doneCh chan struct{}
19+
closeCh chan struct{}
20+
lastErr error
21+
aqM sync.Mutex
22+
newChunk bool
2223
}
2324

2425
func (w *ChunkStreamWriter) Write(b []byte) (int, error) {

chunk_streamer.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ func (cs *ChunkStreamer) NewChunkWriter(ctx context.Context, chunkStreamID int)
168168
}
169169

170170
func (cs *ChunkStreamer) Sched(writer *ChunkStreamWriter) error {
171+
writer.newChunk = true
171172
return cs.writerSched.Sched(writer)
172173
}
173174

@@ -305,13 +306,14 @@ func (cs *ChunkStreamer) writeChunk(writer *ChunkStreamWriter) (bool, error) {
305306

306307
func (cs *ChunkStreamer) updateWriterHeader(writer *ChunkStreamWriter) {
307308
fmt := byte(2) // default: only timestamp delta
308-
if writer.messageHeader.messageLength != writer.messageLength || writer.messageTypeID != writer.messageHeader.messageTypeID {
309+
if writer.messageHeader.messageLength != writer.messageLength ||
310+
writer.messageTypeID != writer.messageHeader.messageTypeID {
309311
// header or type id is updated, change fmt to 1 to notify difference and update state
310312
writer.messageHeader.messageLength = writer.messageLength
311313
writer.messageHeader.messageTypeID = writer.messageTypeID
312314
fmt = 1
313315
}
314-
if writer.timestamp != writer.messageHeader.timestamp {
316+
if writer.timestamp != writer.messageHeader.timestamp || writer.newChunk {
315317
if writer.timestamp >= writer.messageHeader.timestamp {
316318
writer.timestampDelta = writer.timestamp - writer.messageHeader.timestamp
317319
} else {
@@ -320,6 +322,7 @@ func (cs *ChunkStreamer) updateWriterHeader(writer *ChunkStreamWriter) {
320322
writer.timestampDelta = 0
321323
}
322324
}
325+
writer.newChunk = false
323326
if writer.timestampDelta == writer.messageHeader.timestampDelta && fmt == 2 {
324327
fmt = 3
325328
}
@@ -409,8 +412,9 @@ func (cs *ChunkStreamer) prepareChunkWriter(chunkStreamID int) (*ChunkStreamWrit
409412
timestamp: math.MaxUint32, // initial state will be updated by writer.timestamp
410413
},
411414
},
412-
doneCh: make(chan struct{}),
413-
closeCh: make(chan struct{}),
415+
doneCh: make(chan struct{}),
416+
closeCh: make(chan struct{}),
417+
newChunk: true,
414418
}
415419
close(writer.doneCh)
416420
cs.writers[chunkStreamID] = writer

chunk_streamer_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,101 @@ func TestStreamerChunkExample1(t *testing.T) {
249249
}
250250
}
251251

252+
func TestStreamerChunkExample2(t *testing.T) {
253+
type write struct {
254+
timestamp uint32
255+
length int
256+
messageTypeId byte
257+
}
258+
259+
type read struct {
260+
timestamp uint32
261+
delta uint32
262+
fmt byte
263+
isComplete bool
264+
}
265+
266+
type testCase struct {
267+
name string
268+
chunkStreamID int
269+
messageStreamID uint32
270+
writeCases []write
271+
readCases []read
272+
}
273+
274+
tcs := []testCase{
275+
// Same timestamp
276+
{
277+
name: "Same timestamp's delta #1",
278+
chunkStreamID: 5,
279+
messageStreamID: 22346,
280+
writeCases: []write{
281+
{timestamp: 1000, length: 200, messageTypeId: 10},
282+
{timestamp: 1001, length: 200, messageTypeId: 11},
283+
{timestamp: 2000, length: 200, messageTypeId: 10},
284+
{timestamp: 2000, length: 200, messageTypeId: 11},
285+
},
286+
readCases: []read{
287+
{timestamp: 1000, delta: 0, fmt: 0, isComplete: false},
288+
{timestamp: 1000, delta: 0, fmt: 3, isComplete: true},
289+
290+
{timestamp: 1000, delta: 1, fmt: 1, isComplete: false},
291+
{timestamp: 1001, delta: 0, fmt: 3, isComplete: true},
292+
293+
{timestamp: 1001, delta: 999, fmt: 1, isComplete: false},
294+
{timestamp: 2000, delta: 0, fmt: 3, isComplete: true},
295+
296+
{timestamp: 2000, delta: 0, fmt: 1, isComplete: false},
297+
{timestamp: 2000, delta: 0, fmt: 3, isComplete: true},
298+
},
299+
},
300+
}
301+
302+
for _, tc := range tcs {
303+
t.Run(tc.name, func(t *testing.T) {
304+
buf := bytes.NewBuffer(make([]byte, 0, 2048))
305+
inbuf := bufio.NewReaderSize(buf, 2048)
306+
outbuf := bufio.NewWriterSize(buf, 2048)
307+
308+
streamer := NewChunkStreamer(inbuf, outbuf, nil)
309+
310+
for i, wc := range tc.writeCases {
311+
t.Run(fmt.Sprintf("Write: %d", i), func(t *testing.T) {
312+
w, err := streamer.NewChunkWriter(context.Background(), tc.chunkStreamID)
313+
assert.Nil(t, err)
314+
assert.NotNil(t, w)
315+
316+
bin := make([]byte, wc.length)
317+
318+
w.messageLength = uint32(len(bin))
319+
w.messageTypeID = byte(wc.messageTypeId)
320+
w.messageStreamID = tc.messageStreamID
321+
w.timestamp = wc.timestamp
322+
w.buf.Write(bin)
323+
err = streamer.Sched(w)
324+
assert.Nil(t, err)
325+
})
326+
}
327+
328+
_, err := streamer.NewChunkWriter(context.Background(), tc.chunkStreamID) // wait for writing
329+
assert.Nil(t, err)
330+
331+
for i, rc := range tc.readCases {
332+
t.Run(fmt.Sprintf("Read: %d", i), func(t *testing.T) {
333+
r, err := streamer.readChunk()
334+
_ = rc
335+
_ = err
336+
assert.Nil(t, err)
337+
assert.NotNil(t, r)
338+
assert.Equal(t, rc.fmt, r.basicHeader.fmt)
339+
assert.Equal(t, uint32(rc.delta), r.messageHeader.timestampDelta)
340+
assert.Equal(t, rc.isComplete, r.completed)
341+
})
342+
}
343+
})
344+
}
345+
}
346+
252347
func TestWriteToInvalidWriter(t *testing.T) {
253348
buf := bytes.NewBuffer(make([]byte, 0, 2048))
254349
inbuf := bufio.NewReaderSize(buf, 2048)

0 commit comments

Comments
 (0)