
add CollectRowsWithFilter #2292

Closed
@piusalfred


I recently had a case where, after collecting rows with pgx.CollectRows, I had to iterate over the result a second time to filter out nil values, i.e. the rows that did not pass my custom filter after scanning.

func (p *PostgresEventStore) Load(ctx context.Context, aggregateID uuid.UUID, predicate domain.EventPredicate) ([]domain.Event, error) {
    // omitted for brevity
    rows, err := p.pool.Query(ctx, stmt, args...)
    if err != nil {
        return nil, err
    }

    pgEvents, err := pgx.CollectRows[domain.Event](rows, func(row pgx.CollectableRow) (domain.Event, error) {
        var e PgEvent
        if scanErr := row.Scan( /* ... */ ); scanErr != nil {
            return nil, scanErr
        }

        // If it doesn't pass the predicate, we return nil
        if predicate != nil && !predicate(e) {
            return nil, nil
        }

        return e, nil
    })
    if err != nil {
        return nil, err
    }

    // SECOND PASS: filter out the nil items
    var events []domain.Event
    for _, e := range pgEvents {
        if e != nil {
            events = append(events, e)
        }
    }

    return events, nil
}
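For comparison, the nil-removing second pass can be done in place with the standard library's slices.DeleteFunc (Go 1.21+). This is a minimal, self-contained sketch: Event and pgEvent are hypothetical stand-ins for domain.Event and PgEvent. It still walks the collected slice a second time, which is the extra pass this issue is about.

```go
package main

import (
	"fmt"
	"slices"
)

// Event is a hypothetical stand-in for domain.Event. As in the snippet above,
// a nil Event is the placeholder for a row that failed the predicate.
type Event interface {
	ID() int
}

// pgEvent is a hypothetical concrete event type, used only for illustration.
type pgEvent struct{ id int }

func (e *pgEvent) ID() int { return e.id }

// dropNil is the second pass: it removes nil placeholders in place, reusing
// the backing array of the collected slice instead of allocating a new one.
func dropNil(events []Event) []Event {
	return slices.DeleteFunc(events, func(e Event) bool { return e == nil })
}

func main() {
	collected := []Event{&pgEvent{id: 1}, nil, &pgEvent{id: 3}, nil}
	for _, e := range dropNil(collected) {
		fmt.Println(e.ID())
	}
	// Output:
	// 1
	// 3
}
```

Note that e == nil only catches an untyped nil interface, which is what return nil, nil produces in the snippet above; a typed nil pointer stored in the interface would not be removed.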

I think it would be useful to have a new function, for example pgx.CollectRowsWithFilter, that extends pgx.CollectRows by also accepting a filter func(T) bool, so that scanning and filtering are handled in a single pass. The snippet above would then become:

func (p *PostgresEventStore) Load(ctx context.Context, aggregateID uuid.UUID, predicate domain.EventPredicate) ([]domain.Event, error) {
    // omitted for brevity
    rows, err := p.pool.Query(ctx, stmt, args...)
    if err != nil {
        return nil, fmt.Errorf("query events: %w", err)
    }

    pgEvents, err := pgx.CollectRowsWithFilter[domain.Event](
        rows,
        // Step 1: How to collect/scan each row
        func(row pgx.CollectableRow) (domain.Event, error) {
            var e PgEvent
            if scanErr := row.Scan( /* ... */ ); scanErr != nil {
                return nil, scanErr
            }
            return e, nil
        },
        // Step 2: Decide whether to keep the scanned event
        func(evt domain.Event) bool { return predicate == nil || predicate(evt) },
    )
    if err != nil {
        return nil, err
    }

    // No second pass needed
    return pgEvents, nil
}

Maybe something like this:

func AppendRowsWithFilter[T any, S ~[]T](slice S, rows Rows, fn RowToFunc[T], filter func(T) bool) (S, error) {
	defer rows.Close()

	for rows.Next() {
		value, err := fn(rows)
		if err != nil {
			return nil, err
		}
		// A nil filter keeps every row, exactly like AppendRows.
		if filter == nil || filter(value) {
			slice = append(slice, value)
		}
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return slice, nil
}

func CollectRowsWithFilter[T any](rows Rows, fn RowToFunc[T], filter func(T) bool) ([]T, error) {
	return AppendRowsWithFilter([]T{}, rows, fn, filter)
}
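To check the behaviour of the proposed helpers without a database, here is a self-contained sketch. Rows, RowToFunc, sliceRows and scanInt are hypothetical, cut-down stand-ins (the real pgx.Rows interface has more methods, and pgx's RowToFunc takes a pgx.CollectableRow); the two helpers follow the proposal above.

```go
package main

import "fmt"

// Rows is a hypothetical, cut-down stand-in for pgx.Rows with only the
// methods the helpers need. It is NOT the real pgx interface.
type Rows interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
	Close()
}

// RowToFunc mirrors the shape of pgx.RowToFunc, but over the stand-in Rows.
type RowToFunc[T any] func(row Rows) (T, error)

// sliceRows is an in-memory Rows that yields one int column per row.
type sliceRows struct {
	vals   []int
	pos    int
	closed bool
}

func (r *sliceRows) Next() bool {
	if r.closed || r.pos >= len(r.vals) {
		return false
	}
	r.pos++
	return true
}

func (r *sliceRows) Scan(dest ...any) error {
	*dest[0].(*int) = r.vals[r.pos-1]
	return nil
}

func (r *sliceRows) Err() error { return nil }
func (r *sliceRows) Close()     { r.closed = true }

// AppendRowsWithFilter scans each row and appends it only if filter accepts
// it (a nil filter keeps every row). Rows are closed when it returns.
func AppendRowsWithFilter[T any, S ~[]T](slice S, rows Rows, fn RowToFunc[T], filter func(T) bool) (S, error) {
	defer rows.Close()
	for rows.Next() {
		value, err := fn(rows)
		if err != nil {
			return nil, err
		}
		if filter == nil || filter(value) {
			slice = append(slice, value)
		}
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return slice, nil
}

func CollectRowsWithFilter[T any](rows Rows, fn RowToFunc[T], filter func(T) bool) ([]T, error) {
	return AppendRowsWithFilter([]T{}, rows, fn, filter)
}

// scanInt is a RowToFunc that reads the single int column of a row.
func scanInt(row Rows) (int, error) {
	var v int
	err := row.Scan(&v)
	return v, err
}

func main() {
	rows := &sliceRows{vals: []int{1, 2, 3, 4, 5, 6}}
	evens, err := CollectRowsWithFilter[int](rows, scanInt, func(v int) bool { return v%2 == 0 })
	if err != nil {
		panic(err)
	}
	fmt.Println(evens, rows.closed) // [2 4 6] true
}
```

Passing a nil filter makes CollectRowsWithFilter behave like CollectRows, and an error from fn or rows.Err() discards the partial slice, the same way the existing collect helpers do.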

Skipping each row as soon as it fails the filter can improve efficiency: the unwanted rows are never stored, and the collected result does not have to be iterated a second time.
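As a rough illustration of that claim, the sketch below counts slice appends (used here as an assumed proxy for storage work) for the current collect-then-filter approach and for the proposed single pass. Plain int slices stand in for scanned rows; no pgx types are involved, and the function names are hypothetical.

```go
package main

import "fmt"

// collectThenFilter models the current approach: every scanned value is
// stored first, then the stored slice is walked again to keep the matches.
func collectThenFilter(scanned []int, keep func(int) bool) (kept []int, appends int) {
	var all []int
	for _, v := range scanned {
		all = append(all, v)
		appends++
	}
	for _, v := range all {
		if keep(v) {
			kept = append(kept, v)
			appends++
		}
	}
	return kept, appends
}

// filterWhileCollecting models the proposal: a value is appended only if it
// passes the filter, so rejected rows are never stored.
func filterWhileCollecting(scanned []int, keep func(int) bool) (kept []int, appends int) {
	for _, v := range scanned {
		if keep(v) {
			kept = append(kept, v)
			appends++
		}
	}
	return kept, appends
}

func main() {
	scanned := []int{1, 2, 3, 4, 5, 6}
	even := func(v int) bool { return v%2 == 0 }
	a, n1 := collectThenFilter(scanned, even)
	b, n2 := filterWhileCollecting(scanned, even)
	fmt.Println(a, n1) // [2 4 6] 9
	fmt.Println(b, n2) // [2 4 6] 3
}
```

Both produce the same result; the difference grows with the share of rows the filter rejects.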
