Skip to content

Commit d0d27a3

Browse files
authored
fix: Version entity serialization mechanism and fix issue with int64 vals (feast-dev#2944)
* fix: version the entity serialization mechanism to fix issue with int64 vals Signed-off-by: Achal Shah <[email protected]> * fix tests Signed-off-by: Achal Shah <[email protected]> * Add a test Signed-off-by: Achal Shah <[email protected]> * Add a test Signed-off-by: Achal Shah <[email protected]> * fix test Signed-off-by: Achal Shah <[email protected]> * fix test Signed-off-by: Achal Shah <[email protected]> * fix test Signed-off-by: Achal Shah <[email protected]> * simplify Signed-off-by: Achal Shah <[email protected]> * simplify Signed-off-by: Achal Shah <[email protected]> * feature_store.yaml Signed-off-by: Achal Shah <[email protected]> * fix tests Signed-off-by: Achal Shah <[email protected]> * remove protos Signed-off-by: Achal Shah <[email protected]> * fix tests Signed-off-by: Achal Shah <[email protected]> * update feature_store.yaml templates Signed-off-by: Achal Shah <[email protected]> * fix java Signed-off-by: Achal Shah <[email protected]> * fix java test Signed-off-by: Achal Shah <[email protected]> * docs Signed-off-by: Achal Shah <[email protected]> * docs Signed-off-by: Achal Shah <[email protected]> * docs Signed-off-by: Achal Shah <[email protected]>
1 parent 92785b8 commit d0d27a3

File tree

35 files changed

+373
-159
lines changed

35 files changed

+373
-159
lines changed

go/internal/feast/onlinestore/onlinestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
6161
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
6262
return onlineStore, err
6363
} else if onlineStoreType == "redis" {
64-
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
64+
onlineStore, err := NewRedisOnlineStore(config.Project, config, config.OnlineStore)
6565
return onlineStore, err
6666
} else {
6767
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)

go/internal/feast/onlinestore/redisonlinestore.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import (
66
"encoding/binary"
77
"errors"
88
"fmt"
9+
"github.com/feast-dev/feast/go/internal/feast/registry"
910
"sort"
1011
"strconv"
1112
"strings"
1213

1314
"github.com/go-redis/redis/v8"
1415
"github.com/golang/protobuf/proto"
1516
"github.com/spaolacci/murmur3"
16-
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
17+
"google.golang.org/protobuf/types/known/timestamppb"
1718

1819
"github.com/feast-dev/feast/go/protos/feast/serving"
1920
"github.com/feast-dev/feast/go/protos/feast/types"
@@ -37,10 +38,15 @@ type RedisOnlineStore struct {
3738

3839
// Redis client connector
3940
client *redis.Client
41+
42+
config *registry.RepoConfig
4043
}
4144

42-
func NewRedisOnlineStore(project string, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) {
43-
store := RedisOnlineStore{project: project}
45+
func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) {
46+
store := RedisOnlineStore{
47+
project: project,
48+
config: config,
49+
}
4450

4551
var address []string
4652
var password string
@@ -161,7 +167,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E
161167
redisKeyToEntityIndex := make(map[string]int)
162168
for i := 0; i < len(entityKeys); i++ {
163169

164-
var key, err = buildRedisKey(r.project, entityKeys[i])
170+
var key, err = buildRedisKey(r.project, entityKeys[i], r.config.EntityKeySerializationVersion)
165171
if err != nil {
166172
return nil, err
167173
}
@@ -270,16 +276,16 @@ func (r *RedisOnlineStore) Destruct() {
270276

271277
}
272278

273-
func buildRedisKey(project string, entityKey *types.EntityKey) (*[]byte, error) {
274-
serKey, err := serializeEntityKey(entityKey)
279+
func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) {
280+
serKey, err := serializeEntityKey(entityKey, entityKeySerializationVersion)
275281
if err != nil {
276282
return nil, err
277283
}
278284
fullKey := append(*serKey, []byte(project)...)
279285
return &fullKey, nil
280286
}
281287

282-
func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
288+
func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) {
283289
// Serialize entity key to a bytestring so that it can be used as a lookup key in a hash table.
284290

285291
// Ensure that we have the right amount of join keys and entity values
@@ -316,7 +322,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
316322
offset := (2 * len(keys)) + (i * 3)
317323
value := m[keys[i]].GetVal()
318324

319-
valueBytes, valueTypeBytes, err := serializeValue(value)
325+
valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion)
320326
if err != nil {
321327
return valueBytes, err
322328
}
@@ -341,7 +347,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) {
341347
return &entityKeyBuffer, nil
342348
}
343349

344-
func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) {
350+
func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
345351
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
346352
switch x := (value).(type) {
347353
case *types.Value_StringVal:
@@ -354,10 +360,16 @@ func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) {
354360
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val))
355361
return &valueBuffer, types.ValueType_INT32, nil
356362
case *types.Value_Int64Val:
357-
// TODO (woop): We unfortunately have to use 32 bit here for backward compatibility :(
358-
valueBuffer := make([]byte, 4)
359-
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val))
360-
return &valueBuffer, types.ValueType_INT64, nil
363+
if entityKeySerializationVersion <= 1 {
364+
// We unfortunately have to use 32 bit here for backward compatibility :(
365+
valueBuffer := make([]byte, 4)
366+
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val))
367+
return &valueBuffer, types.ValueType_INT64, nil
368+
} else {
369+
valueBuffer := make([]byte, 8)
370+
binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val))
371+
return &valueBuffer, types.ValueType_INT64, nil
372+
}
361373
case nil:
362374
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
363375
default:

go/internal/feast/onlinestore/redisonlinestore_test.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package onlinestore
22

33
import (
4+
"github.com/feast-dev/feast/go/internal/feast/registry"
45
"testing"
56

67
"github.com/stretchr/testify/assert"
@@ -10,7 +11,11 @@ func TestNewRedisOnlineStore(t *testing.T) {
1011
var config = map[string]interface{}{
1112
"connection_string": "redis://localhost:6379",
1213
}
13-
store, err := NewRedisOnlineStore("test", config)
14+
rc := &registry.RepoConfig{
15+
OnlineStore: config,
16+
EntityKeySerializationVersion: 2,
17+
}
18+
store, err := NewRedisOnlineStore("test", rc, config)
1419
assert.Nil(t, err)
1520
var opts = store.client.Options()
1621
assert.Equal(t, opts.Addr, "redis://localhost:6379")
@@ -23,7 +28,11 @@ func TestNewRedisOnlineStoreWithPassword(t *testing.T) {
2328
var config = map[string]interface{}{
2429
"connection_string": "redis://localhost:6379,password=secret",
2530
}
26-
store, err := NewRedisOnlineStore("test", config)
31+
rc := &registry.RepoConfig{
32+
OnlineStore: config,
33+
EntityKeySerializationVersion: 2,
34+
}
35+
store, err := NewRedisOnlineStore("test", rc, config)
2736
assert.Nil(t, err)
2837
var opts = store.client.Options()
2938
assert.Equal(t, opts.Addr, "redis://localhost:6379")
@@ -34,7 +43,11 @@ func TestNewRedisOnlineStoreWithDB(t *testing.T) {
3443
var config = map[string]interface{}{
3544
"connection_string": "redis://localhost:6379,db=1",
3645
}
37-
store, err := NewRedisOnlineStore("test", config)
46+
rc := &registry.RepoConfig{
47+
OnlineStore: config,
48+
EntityKeySerializationVersion: 2,
49+
}
50+
store, err := NewRedisOnlineStore("test", rc, config)
3851
assert.Nil(t, err)
3952
var opts = store.client.Options()
4053
assert.Equal(t, opts.Addr, "redis://localhost:6379")
@@ -45,7 +58,11 @@ func TestNewRedisOnlineStoreWithSsl(t *testing.T) {
4558
var config = map[string]interface{}{
4659
"connection_string": "redis://localhost:6379,ssl=true",
4760
}
48-
store, err := NewRedisOnlineStore("test", config)
61+
rc := &registry.RepoConfig{
62+
OnlineStore: config,
63+
EntityKeySerializationVersion: 2,
64+
}
65+
store, err := NewRedisOnlineStore("test", rc, config)
4966
assert.Nil(t, err)
5067
var opts = store.client.Options()
5168
assert.Equal(t, opts.Addr, "redis://localhost:6379")

go/internal/feast/onlinestore/sqliteonlinestore.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,24 @@ import (
1616

1717
_ "github.com/mattn/go-sqlite3"
1818
"google.golang.org/protobuf/proto"
19-
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
19+
"google.golang.org/protobuf/types/known/timestamppb"
2020

2121
"github.com/feast-dev/feast/go/protos/feast/serving"
2222
"github.com/feast-dev/feast/go/protos/feast/types"
2323
)
2424

2525
type SqliteOnlineStore struct {
2626
// Feast project name
27-
project string
28-
path string
29-
db *sql.DB
30-
db_mu sync.Mutex
27+
project string
28+
path string
29+
db *sql.DB
30+
db_mu sync.Mutex
31+
repoConfig *registry.RepoConfig
3132
}
3233

3334
// Creates a new sqlite online store object. onlineStoreConfig should have relative path of database file with respect to repoConfig.repoPath.
3435
func NewSqliteOnlineStore(project string, repoConfig *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) {
35-
store := SqliteOnlineStore{project: project}
36+
store := SqliteOnlineStore{project: project, repoConfig: repoConfig}
3637
if db_path, ok := onlineStoreConfig["path"]; !ok {
3738
return nil, fmt.Errorf("cannot find sqlite path %s", db_path)
3839
} else {
@@ -69,7 +70,7 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.
6970
in_query := make([]string, len(entityKeys))
7071
serialized_entities := make([]interface{}, len(entityKeys))
7172
for i := 0; i < len(entityKeys); i++ {
72-
serKey, err := serializeEntityKey(entityKeys[i])
73+
serKey, err := serializeEntityKey(entityKeys[i], s.repoConfig.EntityKeySerializationVersion)
7374
if err != nil {
7475
return nil, err
7576
}

go/internal/feast/registry/repoconfig.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type RepoConfig struct {
3030
Flags map[string]interface{} `json:"flags"`
3131
// RepoPath
3232
RepoPath string `json:"repo_path"`
33+
// EntityKeySerializationVersion
34+
EntityKeySerializationVersion int64 `json:"entity_key_serialization_version"`
3335
}
3436

3537
type RegistryConfig struct {

0 commit comments

Comments
 (0)