Skip to content

chore(dataobj): Add columnar reading APIs to logs and streams sections #17976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 10, 2025
Merged
151 changes: 151 additions & 0 deletions pkg/dataobj/internal/arrowconv/arrowconv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Package arrowconv provides helper utilities for converting between Arrow and
// dataset values.
package arrowconv

import (
"fmt"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

// DatasetType returns the [datasetmd.ValueType] that corresponds to the given
// Arrow type.
//
// - [arrow.INT64] maps to [datasetmd.VALUE_TYPE_INT64].
// - [arrow.UINT64] maps to [datasetmd.VALUE_TYPE_UINT64].
// - [arrow.TIMESTAMP] maps to [datasetmd.VALUE_TYPE_INT64].
// - [arrow.BINARY] maps to [datasetmd.VALUE_TYPE_BYTE_ARRAY].
//
// DatasetType returns [datasetmd.VALUE_TYPE_UNSPECIFIED], false for
// unsupported Arrow types.
func DatasetType(arrowType arrow.DataType) (datasetmd.ValueType, bool) {
switch arrowType.ID() {
case arrow.NULL:
return datasetmd.VALUE_TYPE_UNSPECIFIED, true
case arrow.INT64:
return datasetmd.VALUE_TYPE_INT64, true
case arrow.UINT64:
return datasetmd.VALUE_TYPE_UINT64, true
case arrow.TIMESTAMP:
return datasetmd.VALUE_TYPE_INT64, true
case arrow.BINARY:
return datasetmd.VALUE_TYPE_BYTE_ARRAY, true
}

return datasetmd.VALUE_TYPE_UNSPECIFIED, false
}

// FromScalar converts a [scalar.Scalar] into a [dataset.Value] of the
// specified type.
//
// The kind of toType and the type of s must be compatible:
//
// - For [datasetmd.VALUE_TYPE_INT64], s must be a [scalar.Int64] or [scalar.Timestamp].
// - For [datasetmd.VALUE_TYPE_UINT64], s must be a [scalar.Uint64].
// - For [datasetmd.VALUE_TYPE_BYTE_ARRAY], s must be a [scalar.Binary].
//
// If s references allocated memory, FromScalar will hold a reference to that
// memory. Callers are responsible for releasing the scalar after the returned
// dataset.Value is no longer used.
//
// If s is a null type, it will return a nil [dataset.Value].
func FromScalar(s scalar.Scalar, toType datasetmd.ValueType) dataset.Value {
// IsValid returns false when the scalar is a null value.
if !s.IsValid() {
return dataset.Value{}
}

switch toType {
case datasetmd.VALUE_TYPE_UNSPECIFIED:
return dataset.Value{}

case datasetmd.VALUE_TYPE_INT64:
switch s := s.(type) {
case *scalar.Int64:
return dataset.Int64Value(s.Value)
case *scalar.Timestamp:
return dataset.Int64Value(int64(s.Value))
default:
panic(fmt.Sprintf("arrowconv.FromScalar: invalid conversion to INT64; got %T, want *scalar.Int64 or *scalar.Timestamp", s))
}

case datasetmd.VALUE_TYPE_UINT64:
s, ok := s.(*scalar.Uint64)
if !ok {
panic(fmt.Sprintf("arrowconv.FromScalar: invalid conversion to UINT64; got %T, want *scalar.Uint64", s))
}
return dataset.Uint64Value(s.Value)

case datasetmd.VALUE_TYPE_BYTE_ARRAY:
s, ok := s.(*scalar.Binary)
if !ok {
panic(fmt.Sprintf("arrowconv.FromScalar: invalid conversion to BYTE_ARRAY; got %T, want *scalar.Binary", s))
}

// Retain the scalar to ensure that alloced memory doesn't get overwritten
// while the returned value is still active.
s.Retain()
return dataset.ByteArrayValue(s.Value.Bytes())

default:
panic(fmt.Sprintf("arrowconv.FromScalar: unsupported conversion to dataset.Value type %s", toType))
}
}

// ToScalar converts a [dataset.Value] into a [scalar.Scalar] of the specified
// type.
//
// The kind of toType and the type of v and toType must be compatible:
//
// - For [arrow.INT64], v must be a [datasetmd.VALUE_TYPE_INT64].
// - For [arrow.UINT64], v must be a [datasetmd.VALUE_TYPE_UINT64].
// - For [arrow.TIMESTAMP], v must be a [datasetmd.VALUE_TYPE_INT64], which
// will be converted into a nanosecond timestamp.
// - For [arrow.BINARY], v must be a [datasetmd.VALUE_TYPE_BYTE_ARRAY].
//
// If v is nil, ToScalar returns a null scalar of the specified type. If toType
// is a Null type, then ToScalar returns a null scalar even if v is non-null.
//
// ToScalar panics if v and toType are not compatible.
func ToScalar(v dataset.Value, toType arrow.DataType) scalar.Scalar {
if v.IsNil() {
return scalar.MakeNullScalar(toType)
}

switch toType.ID() {
case arrow.NULL:
return scalar.MakeNullScalar(toType)

case arrow.INT64:
if got, want := v.Type(), datasetmd.VALUE_TYPE_INT64; got != want {
panic(fmt.Sprintf("arrowconv.ToScalar: invalid conversion to INT64; got %s, want %s", got, want))
}
return scalar.NewInt64Scalar(v.Int64())

case arrow.UINT64:
if got, want := v.Type(), datasetmd.VALUE_TYPE_UINT64; got != want {
panic(fmt.Sprintf("arrowconv.ToScalar: invalid conversion to UINT64; got %s, want %s", got, want))
}
return scalar.NewUint64Scalar(v.Uint64())

case arrow.TIMESTAMP:
if got, want := v.Type(), datasetmd.VALUE_TYPE_INT64; got != want {
panic(fmt.Sprintf("arrowconv.ToScalar: invalid conversion to TIMESTAMP; got %s, want %s", got, want))
}
return scalar.NewTimestampScalar(arrow.Timestamp(v.Int64()), toType)

case arrow.BINARY:
if got, want := v.Type(), datasetmd.VALUE_TYPE_BYTE_ARRAY; got != want {
panic(fmt.Sprintf("arrowconv.ToScalar: invalid conversion to BINARY; got %s, want %s", got, want))
}
return scalar.NewBinaryScalar(memory.NewBufferBytes(v.ByteArray()), toType)

default:
panic(fmt.Sprintf("arrowconv.ToScalar: unsupported conversion to Arrow type %s", toType))
}
}
93 changes: 93 additions & 0 deletions pkg/dataobj/internal/util/arrowtest/arrowtest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Package arrowtest provides utilities for testing Arrow records.
package arrowtest

import (
"encoding/base64"
"time"
"unsafe"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
)

type (
// Rows is a slice of [Row].
Rows []Row

// Row represents a single record row as a map of column name to value.
Row map[string]any
)

// RecordRows converts an [arrow.Record] into [Rows] for comparison in tests.
// RecordRows requires all columns in the record to have a unique name.
//
// Most values are converted to their Go equivalents, with the exception of
// timestamps, which are converted to strings using [Time].
//
// Callers building expected [Rows] must use the same functions.
func RecordRows(rec arrow.Record) (Rows, error) {
rows := make(Rows, rec.NumRows())

for i := range int(rec.NumRows()) {
row := make(Row, rec.NumCols())
for j := range int(rec.NumCols()) {
row[rec.Schema().Field(j).Name] = rec.Column(j).GetOneForMarshal(i)
}

rows[i] = row
}

return rows, nil
}

// TableRows concatenates all chunks of the [arrow.Table] into a single
// [arrow.Record], and then returns it as [Rows]. TableRows requires all
// columns in the table to have a unique name.
//
// See [RecordRows] for specifies on how values are converted into Go values
// for a [Row].
func TableRows(alloc memory.Allocator, table arrow.Table) (Rows, error) {
rec, err := mergeTable(alloc, table)
if err != nil {
return nil, err
}
defer rec.Release()

return RecordRows(rec)
}

// mergeTable merges all chunks in an [arrow.Table] into a single
// [arrow.Record].
func mergeTable(alloc memory.Allocator, table arrow.Table) (arrow.Record, error) {
recordColumns := make([]arrow.Array, table.NumCols())

for i := range int(table.NumCols()) {
column, err := array.Concatenate(table.Column(i).Data().Chunks(), alloc)
if err != nil {
return nil, err
}
defer column.Release()
recordColumns[i] = column
}

return array.NewRecord(table.Schema(), recordColumns, table.NumRows()), nil
}

// Base64 encodes the given string as base64.
func Base64(s string) string {
rawString := unsafe.Slice(unsafe.StringData(s), len(s))
return base64.StdEncoding.EncodeToString(rawString)
}

// Time returns a string representation of t in the format emitted by
// [TableRows].
//
// Callers must configure t with the same timezone and precision used by the
// Arrow column.
func Time(t time.Time) string {
// This is the format used by [array.Timestamp.ValueStr]. Arrow will
// automatically truncate the timestamp before formatting it, but we bypass
// that here to make it the caller's responsibility instead.
return t.Format("2006-01-02 15:04:05.999999999Z0700")
}
Loading
Loading