@@ -11,20 +11,22 @@ import (
1111 "github.com/feast-dev/feast/go/protos/feast/serving"
1212 "github.com/feast-dev/feast/go/protos/feast/types"
1313 "github.com/roberson-io/mmh3"
14+ "github.com/rs/zerolog/log"
1415 "golang.org/x/sync/errgroup"
1516 "golang.org/x/sync/semaphore"
1617 "google.golang.org/protobuf/proto"
1718 "google.golang.org/protobuf/types/known/timestamppb"
19+ "math/rand"
1820 "runtime"
1921 "sync"
2022 "time"
2123)
2224
23- type batchResult struct {
24- index int
25- response * dynamodb. BatchGetItemOutput
26- err error
27- }
25+ const (
26+ maxRetriesDefault = 5
27+ initialBackoff = 50 * time . Millisecond
28+ maxBackoff = 1 * time . Second
29+ )
2830
2931type DynamodbOnlineStore struct {
3032 // Feast project name
@@ -38,6 +40,7 @@ type DynamodbOnlineStore struct {
3840 // dynamodb configuration
3941 consistentRead * bool
4042 batchSize * int
43+ maxRetries * int
4144}
4245
4346func NewDynamodbOnlineStore (project string , config * registry.RepoConfig , onlineStoreConfig map [string ]interface {}) (* DynamodbOnlineStore , error ) {
@@ -70,6 +73,14 @@ func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineS
7073 }
7174 store .batchSize = & batchSize
7275
76+ var maxRetries int
77+ if maxRetriesFloat , ok := onlineStoreConfig ["max_retries" ].(float64 ); ok {
78+ maxRetries = int (maxRetriesFloat )
79+ } else {
80+ maxRetries = maxRetriesDefault
81+ }
82+ store .maxRetries = & maxRetries
83+
7384 return & store , nil
7485}
7586
@@ -79,12 +90,12 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
7990 return nil , ctx .Err ()
8091 }
8192
93+ maxRetries := * d .maxRetries
8294 results := make ([][]FeatureData , len (entityKeys ))
8395
8496 // serialize entity key into entity hash id
8597 entityIndexMap := make (map [string ]int )
8698 entityIds := make ([]string , 0 , len (entityKeys ))
87- unprocessedEntityIds := make (map [string ]bool )
8899 for i , entityKey := range entityKeys {
89100 serKey , err := serializeEntityKey (entityKey , d .config .EntityKeySerializationVersion )
90101 if err != nil {
@@ -93,7 +104,6 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
93104 entityId := hex .EncodeToString (mmh3 .Hashx64_128 (* serKey , 0 ))
94105 entityIds = append (entityIds , entityId )
95106 entityIndexMap [entityId ] = i
96- unprocessedEntityIds [entityId ] = false
97107 }
98108
99109 // metadata from feature views, feature names
@@ -116,6 +126,11 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
116126 for featureViewName , featureNames := range featureMap {
117127 tableName := fmt .Sprintf ("%s.%s" , d .project , featureViewName )
118128
129+ unprocessedEntityIdsFeatureView := make (map [string ]bool )
130+ for _ , entityId := range entityIds {
131+ unprocessedEntityIdsFeatureView [entityId ] = true
132+ }
133+
119134 var batchGetItemInputs []* dynamodb.BatchGetItemInput
120135 batchSize := * d .batchSize
121136 for i := 0 ; i < len (entityIds ); i += batchSize {
@@ -151,28 +166,87 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
151166 }
152167 defer sem .Release (1 )
153168
169+ var Responses []map [string ]dtypes.AttributeValue
170+ var unprocessedKeys dtypes.KeysAndAttributes
171+
172+ // response from initial request to dynamodb
154173 resp , err := d .client .BatchGetItem (ctx , batchGetItemInput )
155174 if err != nil {
156175 return err
157176 }
177+ if len (resp .Responses [tableName ]) > 0 {
178+ Responses = append (Responses , resp .Responses [tableName ]... )
179+ }
180+ if len (resp .UnprocessedKeys [tableName ].Keys ) > 0 {
181+ unprocessedKeys = resp .UnprocessedKeys [tableName ]
182+ }
183+ // retry about unprocessed key from initial request to dynamodb
184+ retries := 0
185+ backoff := initialBackoff
186+ jitterRand := rand .New (rand .NewSource (time .Now ().UnixNano ()))
187+ for len (unprocessedKeys .Keys ) > 0 && retries < maxRetries {
188+ log .Info ().Msgf ("%d retry using exponential backoff to dynamodb" , retries + 1 )
189+ if err := ctx .Err (); err != nil {
190+ return err
191+ }
192+ // jitter before retrying
193+ jitter := time .Duration (jitterRand .Intn (100 )) * time .Millisecond
194+ waitDuration := backoff + jitter
195+ timer := time .NewTimer (waitDuration )
196+ select {
197+ case <- ctx .Done ():
198+ timer .Stop ()
199+ return ctx .Err ()
200+ case <- timer .C :
201+ }
202+
203+ retries ++
204+ backoff *= 2
205+ if backoff > maxBackoff {
206+ backoff = maxBackoff
207+ }
208+ retryBatchGetItemInput := & dynamodb.BatchGetItemInput {
209+ RequestItems : map [string ]dtypes.KeysAndAttributes {
210+ tableName : unprocessedKeys ,
211+ },
212+ }
213+ retryResp , err := d .client .BatchGetItem (ctx , retryBatchGetItemInput )
214+ if err != nil {
215+ log .Info ().Msgf ("BatchGetItem retry attempt(%d) failed for table %s. err: %v\n " , retries , tableName , err )
216+ continue
217+ }
218+ if len (retryResp .Responses [tableName ]) > 0 {
219+ Responses = append (Responses , retryResp .Responses [tableName ]... )
220+ }
221+ // check unprocessed key in retried response again
222+ if len (retryResp .UnprocessedKeys [tableName ].Keys ) > 0 {
223+ unprocessedKeys = retryResp .UnprocessedKeys [tableName ]
224+ } else {
225+ unprocessedKeys = dtypes.KeysAndAttributes {}
226+ }
227+ }
228+
229+ if len (unprocessedKeys .Keys ) > 0 {
230+ return fmt .Errorf ("failed to process %d keys from table %s after %d retries. keys=%+v\n " , len (unprocessedKeys .Keys ), tableName , maxRetries , unprocessedKeys .Keys )
231+ }
158232
159233 // in case there is no entity id of a feature view in dynamodb
160- batchSize := len (resp . Responses [ tableName ] )
234+ batchSize := len (Responses )
161235 if batchSize == 0 {
162236 return nil
163237 }
164238
165239 // process response from dynamodb
166240 for j := 0 ; j < batchSize ; j ++ {
167- entityId := resp. Responses [ tableName ] [j ]["entity_id" ].(* dtypes.AttributeValueMemberS ).Value
168- timestampString := resp. Responses [ tableName ] [j ]["event_ts" ].(* dtypes.AttributeValueMemberS ).Value
241+ entityId := Responses [j ]["entity_id" ].(* dtypes.AttributeValueMemberS ).Value
242+ timestampString := Responses [j ]["event_ts" ].(* dtypes.AttributeValueMemberS ).Value
169243 t , err := time .Parse ("2006-01-02 15:04:05-07:00" , timestampString )
170244 if err != nil {
171245 return err
172246 }
173247 timeStamp := timestamppb .New (t )
174248
175- featureValues := resp. Responses [ tableName ] [j ]["values" ].(* dtypes.AttributeValueMemberM ).Value
249+ featureValues := Responses [j ]["values" ].(* dtypes.AttributeValueMemberM ).Value
176250 entityIndex := entityIndexMap [entityId ]
177251
178252 for _ , featureName := range featureNames {
@@ -192,7 +266,7 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
192266 }
193267
194268 mu .Lock ()
195- delete (unprocessedEntityIds , entityId )
269+ delete (unprocessedEntityIdsFeatureView , entityId )
196270 mu .Unlock ()
197271 }
198272 return nil
@@ -204,7 +278,7 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type
204278
205279 // process null imputation for entity ids that don't exist in dynamodb
206280 currentTime := timestamppb .Now () // TODO: should use a different timestamp?
207- for entityId , _ := range unprocessedEntityIds {
281+ for entityId , _ := range unprocessedEntityIdsFeatureView {
208282 entityIndex := entityIndexMap [entityId ]
209283 for _ , featureName := range featureNames {
210284 featureIndex := featureNamesIndex [featureName ]
0 commit comments