Skip to content

Commit b19e3da

Browse files
committed
refactor: simplify body buffer to plain alloc
1 parent c0f5cf3 commit b19e3da

14 files changed

+226
-682
lines changed

collector/body.go

Lines changed: 22 additions & 279 deletions
Large diffs are not rendered by default.

collector/body_buffer_pool_test.go

Lines changed: 0 additions & 176 deletions
This file was deleted.

collector/body_test.go

Lines changed: 2 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"io"
55
"strings"
66
"testing"
7-
"time"
87

98
"github.com/stretchr/testify/assert"
109
"github.com/stretchr/testify/require"
@@ -14,15 +13,12 @@ import (
1413

1514
// Fix for TestBody_PartialRead
1615
func TestBody_PartialRead(t *testing.T) {
17-
// Create a small pool for testing
18-
pool := collector.NewBodyBufferPool(10*1024, 5*1024) // 10KB pool, 5KB max per body
19-
2016
// Create test data
2117
testData := "This is test data for partial reading"
2218
testReader := io.NopCloser(strings.NewReader(testData))
2319

2420
// Create a Body
25-
body := collector.NewBody(testReader, pool)
21+
body := collector.NewBody(testReader, 100)
2622

2723
// Read first 10 bytes
2824
buf := make([]byte, 10)
@@ -50,15 +46,12 @@ func TestBody_PartialRead(t *testing.T) {
5046

5147
// Fix for TestBody_ReadAfterClose
5248
func TestBody_ReadAfterClose(t *testing.T) {
53-
// Create a pool
54-
pool := collector.NewBodyBufferPool(10*1024, 5*1024)
55-
5649
// Create test data
5750
testData := "This is test data"
5851
testReader := io.NopCloser(strings.NewReader(testData))
5952

6053
// Create a Body
61-
body := collector.NewBody(testReader, pool)
54+
body := collector.NewBody(testReader, 100)
6255

6356
// Close the body
6457
err := body.Close()
@@ -72,129 +65,3 @@ func TestBody_ReadAfterClose(t *testing.T) {
7265
assert.Error(t, err)
7366
assert.Equal(t, collector.ErrBodyClosed, err)
7467
}
75-
76-
// Fix for TestBodyBufferPool_EnsureCapacity
77-
func TestBodyBufferPool_EnsureCapacity(t *testing.T) {
78-
// Create a small pool for testing
79-
maxPoolSize := int64(1000) // 1000 bytes
80-
maxBodySize := int64(200) // 200 bytes per body
81-
pool := collector.NewBodyBufferPool(maxPoolSize, maxBodySize)
82-
83-
// Create buffers that will fit in the pool
84-
buffers := make([]*collector.BodyBuffer, 0)
85-
86-
// Add 5 buffers of 100 bytes each (total 500 bytes)
87-
for i := 0; i < 5; i++ {
88-
// Get a buffer from the pool
89-
buffer := pool.GetBuffer()
90-
91-
// Fill with 100 bytes of data
92-
data := strings.Repeat("a", 100)
93-
buffer.WriteString(data)
94-
95-
buffers = append(buffers, buffer)
96-
97-
// Sleep to ensure different timestamps
98-
time.Sleep(10 * time.Millisecond)
99-
}
100-
101-
// Verify all buffers have data
102-
for i, buffer := range buffers {
103-
assert.Equal(t, 100, buffer.Len(), "Buffer %d should have data", i)
104-
}
105-
106-
// Add one more large buffer that will trigger garbage collection
107-
buffer := pool.GetBuffer()
108-
buffer.WriteString(strings.Repeat("b", 600)) // 600 bytes
109-
110-
// The newest buffer should have data
111-
assert.Equal(t, 600, buffer.Len())
112-
113-
// Verify the combined data is still under the pool max size
114-
// This is difficult to test directly without exposing pool internals
115-
// But we can verify the latest buffer is intact
116-
assert.Equal(t, strings.Repeat("b", 600), buffer.String())
117-
}
118-
119-
func TestBody_IOReadAllWithoutClose(t *testing.T) {
120-
pool := collector.NewBodyBufferPool(100, 50)
121-
122-
testData := strings.Repeat("X", 30) // 30 bytes
123-
testReader := io.NopCloser(strings.NewReader(testData))
124-
body := collector.NewBody(testReader, pool)
125-
126-
// Some clients do io.ReadAll without Close()
127-
data, err := io.ReadAll(body)
128-
require.NoError(t, err)
129-
assert.Equal(t, testData, string(data))
130-
131-
// Ensure pool tracks size even without Close()
132-
currentSize := pool.GetCurrentSize()
133-
assert.Equal(t, int64(30), currentSize, "Pool should track buffer size without Close()")
134-
}
135-
136-
func TestBody_PoolCapacityEnforcement_Multiple_ReadAll(t *testing.T) {
137-
pool := collector.NewBodyBufferPool(100, 50) // Small pool to force cleanup
138-
139-
var bodies []*collector.Body
140-
141-
// Create bodies that exceed pool capacity
142-
for i := 0; i < 5; i++ {
143-
testData := strings.Repeat("A", 40) // 40 bytes each
144-
testReader := io.NopCloser(strings.NewReader(testData))
145-
body := collector.NewBody(testReader, pool)
146-
147-
// Consume all data, do not close
148-
_, err := io.ReadAll(body)
149-
require.NoError(t, err)
150-
151-
bodies = append(bodies, body)
152-
}
153-
154-
poolBytesLen := 0
155-
bodySizes := 0
156-
for _, body := range bodies {
157-
poolBytesLen += len(body.Bytes())
158-
bodySizes += int(body.Size())
159-
}
160-
161-
// Ensure bodies are cleaned up
162-
assert.LessOrEqual(t, poolBytesLen, 100, "Some bodies should be cleaned up when pool exceeds capacity")
163-
assert.LessOrEqual(t, bodySizes, 100, "Total body sizes should not exceed pool capacity")
164-
}
165-
166-
func TestBody_PoolCapacityEnforcement_Multiple_Read_Close(t *testing.T) {
167-
pool := collector.NewBodyBufferPool(100, 50) // Small pool to force cleanup
168-
169-
var bodies []*collector.Body
170-
171-
// Create bodies that exceed pool capacity
172-
buf := make([]byte, 30)
173-
for i := 0; i < 5; i++ {
174-
testData := strings.Repeat("A", 40) // 40 bytes each
175-
testReader := io.NopCloser(strings.NewReader(testData))
176-
body := collector.NewBody(testReader, pool)
177-
178-
// Only read a part of the body
179-
n, err := body.Read(buf)
180-
require.NoError(t, err)
181-
assert.Equal(t, 30, n)
182-
183-
// Then close the body
184-
err = body.Close()
185-
require.NoError(t, err)
186-
187-
bodies = append(bodies, body)
188-
}
189-
190-
poolBytesLen := 0
191-
bodySizes := 0
192-
for _, body := range bodies {
193-
poolBytesLen += len(body.Bytes())
194-
bodySizes += int(body.Size())
195-
}
196-
197-
// Ensure bodies are cleaned up
198-
assert.LessOrEqual(t, poolBytesLen, 100, "Some bodies should be cleaned up when pool exceeds capacity")
199-
assert.LessOrEqual(t, bodySizes, 100, "Total body sizes should not exceed pool capacity")
200-
}

collector/db_query_collector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,5 @@ func NewDBQueryCollectorWithOptions(capacity uint64, options DBQueryOptions) *DB
7777
// Close releases resources used by the collector
7878
func (c *DBQueryCollector) Close() {
7979
c.notifier.Close()
80+
c.buffer = nil
8081
}

collector/event_collector.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,14 @@ func NewEventCollectorWithOptions(capacity uint64, options EventOptions) *EventC
4040
}
4141

4242
buffer := NewLookupRingBuffer[*Event, uuid.UUID](capacity)
43-
buffer.OnFree = func(record *Event) {
44-
record.free()
45-
}
4643

47-
return &EventCollector{
44+
c := &EventCollector{
4845
buffer: buffer,
4946
openGroups: make(map[uuid.UUID]*Event),
5047
notifier: NewNotifierWithOptions[Event](notifierOptions),
5148
}
49+
50+
return c
5251
}
5352

5453
func groupIDFromContext(ctx context.Context) (uuid.UUID, bool) {
@@ -173,6 +172,7 @@ func (c *EventCollector) Subscribe(ctx context.Context) <-chan Event {
173172
// Close releases resources used by the collector
174173
func (c *EventCollector) Close() {
175174
c.notifier.Close()
175+
c.buffer = nil
176176
}
177177

178178
type Event struct {

0 commit comments

Comments
 (0)