Skip to content

Commit c52c1a5

Browse files
author
jose.castillo
committed
fix: possible memory leaks
1 parent 1036217 commit c52c1a5

File tree

3 files changed

+142
-30
lines changed

3 files changed

+142
-30
lines changed

cache.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,6 @@ func (m *mutexCache) Add(key string, value transport.ByteView) {
9191
},
9292
}
9393
}
94-
if oldValue, ok := m.lru.Get(key); ok {
95-
oldByteView := oldValue.(transport.ByteView)
96-
m.bytes -= int64(len(key)) + int64(oldByteView.Len())
97-
}
9894
m.lru.Add(key, value, value.Expire())
9995
m.bytes += int64(len(key)) + int64(value.Len())
10096
m.removeOldest()

group.go

Lines changed: 135 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -170,53 +170,111 @@ func (g *group) Set(ctx context.Context, key string, value []byte, expire time.T
170170
return errors.New("empty Set() key not allowed")
171171
}
172172

173+
// Si no hay tamaño de caché, no hacemos nada
173174
if g.maxCacheBytes <= 0 {
174175
return nil
175176
}
176177

178+
// Usamos singleflight para asegurar que solo una operación Set para la misma clave
179+
// esté activa a la vez en este nodo (para la lógica de coordinación y actualización local/remota).
177180
_, err := g.setGroup.Do(key, func() (interface{}, error) {
178-
// If remote peer owns this key
181+
182+
// *** INICIO: Crear una copia del valor ***
183+
// Se crea una copia explícita del slice de bytes.
184+
// Esto es crucial para evitar que las goroutines que actualizan los peers
185+
// retengan referencias al slice 'value' original o a versiones antiguas,
186+
// permitiendo que el GC libere la memoria de ciclos anteriores.
187+
valueCopy := make([]byte, len(value))
188+
copy(valueCopy, value)
189+
// *** FIN: Crear una copia del valor ***
190+
191+
// Determinar el nodo "dueño" de la clave según el hash consistente.
179192
owner, isRemote := g.instance.PickPeer(key)
193+
194+
// Si el dueño es un nodo remoto...
180195
if isRemote {
181-
// Set the key/value on the remote peer
182-
if err := g.setPeer(ctx, owner, key, value, expire); err != nil {
196+
// ...enviar la operación Set al dueño (usando la copia).
197+
// Esta llamada es síncrona dentro de la función de singleflight.Do.
198+
if err := g.setPeer(ctx, owner, key, valueCopy, expire); err != nil { // <-- Usa valueCopy
199+
// Si falla la comunicación con el dueño, retornamos error.
200+
// La política aquí podría variar (¿continuar actualizando otros peers?).
201+
// Actualmente, si falla con el dueño, la operación Set completa falla.
183202
return nil, err
184203
}
185204
}
186-
// Update the local caches
187-
bv := transport.ByteViewWithExpire(value, expire)
205+
206+
// Actualizar las cachés locales (mainCache y hotCache) de este nodo.
207+
// Se usa la copia del valor para crear la ByteView.
208+
bv := transport.ByteViewWithExpire(valueCopy, expire) // <-- Usa valueCopy
209+
210+
// Usamos el lock del loadGroup (singleflight de Get) para sincronizar
211+
// el acceso a las cachés locales (mainCache y hotCache).
212+
// Esto previene condiciones de carrera si un Get local ocurre
213+
// exactamente al mismo tiempo que esta actualización.
188214
g.loadGroup.Lock(func() {
189-
g.mainCache.Add(key, bv)
190-
g.hotCache.Remove(key)
215+
g.mainCache.Add(key, bv) // Añade/sobrescribe en mainCache.
216+
g.hotCache.Remove(key) // Elimina de hotCache (si existía).
191217
})
192218

193-
// Update all peers in the cluster
219+
// Actualizar todos los demás peers en el clúster (excepto este nodo y el dueño).
194220
var wg sync.WaitGroup
195221
for _, p := range g.instance.getAllPeers() {
222+
// Saltar la actualización a sí mismo.
196223
if p.PeerInfo().IsSelf {
197-
continue // Skip self
224+
continue
198225
}
199226

200-
// Do not update the owner again, we already updated them
227+
// Saltar la actualización al dueño (ya se hizo si era remoto,
228+
// y si era local, la actualización local ya ocurrió).
201229
if p.HashKey() == owner.HashKey() {
202230
continue
203231
}
204232

233+
// Incrementar el contador del WaitGroup para esta goroutine.
205234
wg.Add(1)
235+
// Lanzar una goroutine para actualizar a este peer de forma asíncrona.
206236
go func(p peer.Client) {
207-
if err := g.setPeer(ctx, p, key, value, expire); err != nil {
237+
// *** INICIO: Asegurar wg.Done() y Recuperar Pánico ***
238+
// defer wg.Done() es crucial para asegurar que Wait() no se bloquee
239+
// indefinidamente si g.setPeer panica.
240+
defer wg.Done()
241+
242+
// Opcional pero recomendado: Recuperar pánicos dentro de la goroutine
243+
// para loguearlos y evitar que el programa entero caiga.
244+
defer func() {
245+
if r := recover(); r != nil {
246+
g.instance.opts.Logger.Error("PANIC during setPeer",
247+
"peer", p.PeerInfo().Address,
248+
"key", key,
249+
"panic", fmt.Sprintf("%v", r),
250+
// Considerar loguear stack trace: "stack", string(debug.Stack()),
251+
)
252+
}
253+
}()
254+
// *** FIN: Asegurar wg.Done() y Recuperar Pánico ***
255+
256+
// Llamar a setPeer para enviar la operación Set al peer (usando la copia).
257+
// La goroutine captura 'valueCopy', que es la copia específica de esta ejecución de Set.
258+
if err := g.setPeer(ctx, p, key, valueCopy, expire); err != nil { // <-- Usa valueCopy
259+
// Loguear errores de comunicación con el peer, pero no hacer fallar
260+
// la operación Set principal (es un esfuerzo "best-effort").
208261
g.instance.opts.Logger.Error("while calling Set on peer",
209262
"peer", p.PeerInfo().Address,
210263
"key", key,
211264
"err", err)
212265
}
213-
wg.Done()
214-
}(p)
266+
}(p) // Pasar el peer 'p' como argumento a la goroutine
215267
}
268+
// Esperar a que todas las goroutines de actualización de peers terminen.
269+
// Esto asegura que la llamada a group.Set no retorne hasta que se haya
270+
// intentado actualizar a todos los peers.
216271
wg.Wait()
217272

273+
// La función de singleflight.Do retorna nil en caso de éxito.
218274
return nil, nil
219-
})
275+
}) // Fin de singleflight.Do
276+
277+
// Retornar el error de singleflight.Do (si lo hubo).
220278
return err
221279
}
222280

@@ -235,38 +293,93 @@ func (g *group) Remove(ctx context.Context, key string) error {
235293
owner, isRemote := g.instance.PickPeer(key)
236294
if isRemote {
237295
if err := g.removeFromPeer(ctx, owner, key); err != nil {
296+
// Si falla la eliminación en el dueño, retornamos error.
297+
// Podría considerarse continuar con los otros peers, pero
298+
// actualmente la operación Remove completa falla.
238299
return nil, err
239300
}
240301
}
241302
// Remove from our cache next
242-
g.LocalRemove(key)
303+
g.LocalRemove(key) // Elimina de mainCache y hotCache locales
304+
305+
// --- INICIO MODIFICACIÓN ---
243306
wg := sync.WaitGroup{}
244-
errCh := make(chan error)
307+
// Usamos un buffered channel para evitar bloqueos si hay muchos errores rápidos
308+
// y el lector (más abajo) no es lo suficientemente rápido. El tamaño puede ajustarse.
309+
numPeersToNotify := 0
310+
for _, p := range g.instance.getAllPeers() {
311+
if p != owner { // Contar cuántos peers necesitan notificación
312+
numPeersToNotify++
313+
}
314+
}
315+
// Crear canal con buffer suficiente para todos los posibles errores + 1 (por si acaso)
316+
errCh := make(chan error, numPeersToNotify+1)
245317

246318
// Asynchronously clear the key from all hot and main caches of peers
247319
for _, p := range g.instance.getAllPeers() {
248320
// avoid deleting from owner a second time
249321
if p == owner {
250322
continue
251323
}
324+
// Saltar a sí mismo (LocalRemove ya lo hizo)
325+
if p.PeerInfo().IsSelf {
326+
continue
327+
}
252328

253329
wg.Add(1)
254330
go func(p peer.Client) {
255-
errCh <- g.removeFromPeer(ctx, p, key)
256-
wg.Done()
257-
}(p)
331+
// *** AÑADIR defer wg.Done() ***
332+
// Asegura que wg.Done() se llame incluso si removeFromPeer panica.
333+
defer wg.Done()
334+
335+
// *** Opcional pero recomendado: Añadir recover() ***
336+
defer func() {
337+
if r := recover(); r != nil {
338+
// Loguear el pánico
339+
g.instance.opts.Logger.Error("PANIC during removeFromPeer",
340+
"peer", p.PeerInfo().Address,
341+
"key", key,
342+
"panic", fmt.Sprintf("%v", r),
343+
// Considerar loguear stack trace: "stack", string(debug.Stack()),
344+
)
345+
// Opcionalmente, enviar un error específico al canal
346+
// errCh <- fmt.Errorf("panic during removeFromPeer for peer %s: %v", p.PeerInfo().Address, r)
347+
}
348+
}()
349+
350+
// Llamar a removeFromPeer y enviar el resultado (error o nil) al canal
351+
err := g.removeFromPeer(ctx, p, key)
352+
if err != nil {
353+
// Loguear el error específico de este peer
354+
g.instance.opts.Logger.Error("while calling Remove on peer",
355+
"peer", p.PeerInfo().Address,
356+
"key", key,
357+
"err", err)
358+
}
359+
// Enviar el error (puede ser nil) al canal para agregación
360+
errCh <- err
361+
362+
// wg.Done() // Ya no es necesario aquí explícitamente
363+
}(p) // Pasar el peer 'p' como argumento
258364
}
365+
366+
// Goroutine para esperar a que todas las llamadas a removeFromPeer terminen y luego cerrar el canal de errores
259367
go func() {
260368
wg.Wait()
261369
close(errCh)
262370
}()
371+
// --- FIN MODIFICACIÓN ---
263372

264-
m := &MultiError{}
373+
// Recolectar todos los errores del canal
374+
m := &MultiError{} // Asumiendo que tienes una estructura MultiError o similar
265375
for err := range errCh {
266-
m.Add(err)
376+
if err != nil { // Solo añadir errores reales
377+
m.Add(err)
378+
}
267379
}
268380

269-
return nil, m.NilOrError()
381+
// Retornar nil si no hubo errores, o el MultiError si los hubo
382+
return nil, m.NilOrError() // Asumiendo que NilOrError() devuelve nil si no hay errores
270383
})
271384
return err
272385
}

transport/http_transport.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,14 @@ func (t *HttpTransport) ServeHTTP(w http.ResponseWriter, r *http.Request) {
283283
defer r.Body.Close()
284284
b := bufferPool.Get().(*bytes.Buffer)
285285
b.Reset()
286+
bufferPool.Put(b)
286287
defer func() {
287-
if b.Cap() > maxBufferSize {
288+
bufferPool.Put(b)
289+
/*if b.Cap() > maxBufferSize {
288290
b = nil
289291
} else {
290292
bufferPool.Put(b)
291-
}
293+
}*/
292294
}()
293295
_, err := io.Copy(b, r.Body)
294296
if err != nil {
@@ -391,11 +393,12 @@ func (h *HttpClient) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResp
391393
b := bufferPool.Get().(*bytes.Buffer)
392394
b.Reset()
393395
defer func() {
394-
if b.Cap() > maxBufferSize {
396+
/*if b.Cap() > maxBufferSize {
395397
b = nil
396398
} else {
397399
bufferPool.Put(b)
398-
}
400+
}*/
401+
bufferPool.Put(b)
399402
}()
400403
b.Reset()
401404
defer bufferPool.Put(b)

0 commit comments

Comments
 (0)