2828import java .io .IOException ;
2929import java .io .InputStream ;
3030import java .io .OutputStream ;
31- import java .util .ArrayList ;
32- import java .util .List ;
33- import java .util .Map ;
31+ import java .util .*;
3432import java .util .stream .Collectors ;
3533import org .apache .beam .repackaged .core .org .apache .commons .lang3 .tuple .Pair ;
3634import org .apache .beam .sdk .coders .Coder ;
@@ -146,8 +144,11 @@ private void createTable(String specKey, Schema schema) {
146144 */
147145 private Schema createSchemaFromSpec (
148146 FeatureSetProto .FeatureSetSpec spec , String specKey , Table existingTable ) {
149- List <Field > fields = new ArrayList <>();
150- log .info ("Table {} will have the following fields:" , specKey );
147+ Map <String , Field > fields = new LinkedHashMap <>();
148+ if (existingTable != null ) {
149+ Schema existingSchema = existingTable .getDefinition ().getSchema ();
150+ existingSchema .getFields ().forEach (f -> fields .put (f .getName (), f ));
151+ }
151152
152153 for (FeatureSetProto .EntitySpec entitySpec : spec .getEntitiesList ()) {
153154 Field .Builder builder =
@@ -157,8 +158,7 @@ private Schema createSchemaFromSpec(
157158 builder .setMode (Field .Mode .REPEATED );
158159 }
159160 Field field = builder .build ();
160- log .info ("- {}" , field .toString ());
161- fields .add (field );
161+ fields .put (field .getName (), field );
162162 }
163163 for (FeatureSetProto .FeatureSpec featureSpec : spec .getFeaturesList ()) {
164164 Field .Builder builder =
@@ -169,8 +169,7 @@ private Schema createSchemaFromSpec(
169169 }
170170
171171 Field field = builder .build ();
172- log .info ("- {}" , field .toString ());
173- fields .add (field );
172+ fields .put (field .getName (), field );
174173 }
175174
176175 // Refer to protos/feast/core/Store.proto for reserved fields in BigQuery.
@@ -192,24 +191,13 @@ private Schema createSchemaFromSpec(
192191 Field .newBuilder (entry .getKey (), entry .getValue ().getLeft ())
193192 .setDescription (entry .getValue ().getRight ())
194193 .build ();
195- log .info ("- {}" , field .toString ());
196- fields .add (field );
194+ fields .put (field .getName (), field );
197195 }
198196
199- List <Field > fieldsList = new ArrayList <>();
200- if (existingTable != null ) {
201- Schema existingSchema = existingTable .getDefinition ().getSchema ();
202- fieldsList .addAll (existingSchema .getFields ());
203- }
204-
205- for (Field field : fields ) {
206- if (!fieldsList .contains (field )) {
207- fieldsList .add (field );
208- log .info ("- {}" , field .toString ());
209- }
210- }
197+ log .info ("Table {} will have the following fields:" , specKey );
198+ fields .values ().forEach (f -> log .info ("- {}" , f .toString ()));
211199
212- return Schema .of (FieldList .of (fieldsList ));
200+ return Schema .of (FieldList .of (fields . values () ));
213201 }
214202
215203 /**
0 commit comments