Skip to content

Commit 05875ad

Browse files
jahhulbert-ccri authored and elahrvivaz committed
GEOMESA-389 Refactor SimpleFeatureEncoder
Separated encoder trait from decoder trait
1 parent aaac3a1 commit 05875ad

File tree

21 files changed

+250
-157
lines changed

21 files changed

+250
-157
lines changed

geomesa-compute/src/main/scala/org/locationtech/geomesa/compute/spark/GeoMesaSpark.scala

Lines changed: 12 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616

1717
package org.locationtech.geomesa.compute.spark
1818

19-
import java.io.ByteArrayInputStream
2019
import java.nio.charset.StandardCharsets
2120
import java.text.SimpleDateFormat
2221

@@ -32,8 +31,8 @@ import org.apache.spark.serializer.KryoRegistrator
3231
import org.apache.spark.{SparkConf, SparkContext}
3332
import org.geotools.data.{DataStore, Query}
3433
import org.geotools.factory.CommonFactoryFinder
35-
import org.locationtech.geomesa.core.data.{AccumuloDataStore, AvroFeatureEncoder}
36-
import org.locationtech.geomesa.core.index.{STIdxStrategy, IndexSchema}
34+
import org.locationtech.geomesa.core.data._
35+
import org.locationtech.geomesa.core.index.{IndexSchema, STIdxStrategy}
3736
import org.locationtech.geomesa.feature.AvroSimpleFeature
3837
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
3938
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
@@ -56,8 +55,10 @@ object GeoMesaSpark {
5655
val typeName = query.getTypeName
5756
val sft = ds.getSchema(typeName)
5857
val spec = SimpleFeatureTypes.encodeType(sft)
58+
val encoder = SimpleFeatureEncoder(sft, ds.getFeatureEncoding(sft))
59+
val decoder = SimpleFeatureDecoder(sft, ds.getFeatureEncoding(sft))
5960

60-
val indexSchema = IndexSchema(ds.getIndexSchemaFmt(typeName), sft, ds.getFeatureEncoder(sft))
61+
val indexSchema = IndexSchema(ds.getIndexSchemaFmt(typeName), sft, encoder)
6162

6263
val planner = new STIdxStrategy
6364
val qp = planner.buildSTIdxQueryPlan(query, indexSchema.planner, sft, org.locationtech.geomesa.core.index.ExplainPrintln)
@@ -75,7 +76,7 @@ object GeoMesaSpark {
7576
rdd.mapPartitions { iter =>
7677
val sft = SimpleFeatureTypes.createType(typeName, spec)
7778
val encoder = new AvroFeatureEncoder(sft)
78-
iter.map { case (k: Key, v: Value) => encoder.decode(v) }
79+
iter.map { case (k: Key, v: Value) => decoder.decode(v) }
7980
}
8081
}
8182

@@ -106,14 +107,16 @@ class KryoAvroSimpleFeatureBridge extends KryoRegistrator {
106107
}
107108
})
108109

109-
// There is some overhead to managing this with a cache and it may be better to find a way
110-
// to do this outside of the methods write() and read() and make the encoder a member
111-
// variable on the instance of the Serializer
112110
val encoderCache = CacheBuilder.newBuilder().build(
113111
new CacheLoader[String, AvroFeatureEncoder] {
114112
override def load(key: String): AvroFeatureEncoder = new AvroFeatureEncoder(typeCache.get(key))
115113
})
116114

115+
val decoderCache = CacheBuilder.newBuilder().build(
116+
new CacheLoader[String, AvroFeatureDecoder] {
117+
override def load(key: String): AvroFeatureDecoder = new AvroFeatureDecoder(typeCache.get(key))
118+
})
119+
117120
override def write(kryo: Kryo, out: Output, feature: AvroSimpleFeature): Unit = {
118121
val typeName = feature.getFeatureType.getTypeName
119122
val len = typeName.length
@@ -130,7 +133,7 @@ class KryoAvroSimpleFeatureBridge extends KryoRegistrator {
130133
val sft = typeCache.get(typeName)
131134
val flen = in.readInt(true)
132135
val bytes = in.readBytes(flen)
133-
encoderCache.get(typeName).decode(new ByteArrayInputStream(bytes))
136+
decoderCache.get(typeName).decode(bytes)
134137
}
135138
})
136139
}

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/AccumuloDataStore.scala

Lines changed: 13 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -289,11 +289,11 @@ class AccumuloDataStore(val connector: Connector,
289289
/**
290290
* Read the SpatioTemporal Index max shard, derived from the index schema format stored in metadata
291291
*/
292-
def getSpatioTemporalMaxShard(featureType: SimpleFeatureType): Int = {
293-
val indexSchemaFmt = readMetadataItem(featureType.getTypeName, SCHEMA_CF)
292+
def getSpatioTemporalMaxShard(sft: SimpleFeatureType): Int = {
293+
val indexSchemaFmt = readMetadataItem(sft.getTypeName, SCHEMA_CF)
294294
.getOrElse(throw new RuntimeException(s"Unable to find required metadata property for $SCHEMA_CF"))
295-
val featureEncoder = getFeatureEncoder(featureType)
296-
val indexSchema = IndexSchema(indexSchemaFmt, featureType, featureEncoder)
295+
val fe = SimpleFeatureEncoder(sft, getFeatureEncoding(sft))
296+
val indexSchema = IndexSchema(indexSchemaFmt, sft, fe)
297297
indexSchema.maxShard
298298
}
299299

@@ -801,11 +801,10 @@ class AccumuloDataStore(val connector: Connector,
801801
/**
802802
* Reads the feature encoding from the metadata. Defaults to TEXT if there is no metadata.
803803
*/
804-
def getFeatureEncoder(sft: SimpleFeatureType) = {
804+
def getFeatureEncoding(sft: SimpleFeatureType): FeatureEncoding = {
805805
val encodingString = readMetadataItem(sft.getTypeName, FEATURE_ENCODING_CF)
806806
.getOrElse(FeatureEncoding.TEXT.toString)
807-
val encoding = FeatureEncoding.withName(encodingString).asInstanceOf[FeatureEncoding]
808-
SimpleFeatureEncoderFactory.createEncoder(sft, encoding)
807+
FeatureEncoding.withName(encodingString)
809808
}
810809

811810
// We assume that they want the bounds for everything.
@@ -892,31 +891,31 @@ class AccumuloDataStore(val connector: Connector,
892891
validateMetadata(featureName)
893892
val indexSchemaFmt = getIndexSchemaFmt(featureName)
894893
val sft = getSchema(featureName)
895-
val fe = getFeatureEncoder(sft)
894+
val fe = SimpleFeatureEncoder(sft, getFeatureEncoding(sft))
896895
new AccumuloFeatureReader(this, query, indexSchemaFmt, sft, fe)
897896
}
898897

899898
/* create a general purpose writer that is capable of insert, deletes, and updates */
900899
override def createFeatureWriter(typeName: String, transaction: Transaction): SFFeatureWriter = {
901900
validateMetadata(typeName)
902901
checkWritePermissions(typeName)
903-
val featureType = getSchema(typeName)
902+
val sft = getSchema(typeName)
904903
val indexSchemaFmt = getIndexSchemaFmt(typeName)
905-
val fe = getFeatureEncoder(featureType)
904+
val fe = SimpleFeatureEncoder(sft, getFeatureEncoding(sft))
906905
val encoder = IndexSchema.buildKeyEncoder(indexSchemaFmt, fe)
907-
new ModifyAccumuloFeatureWriter(featureType, encoder, connector, fe, writeVisibilities, this)
906+
new ModifyAccumuloFeatureWriter(sft, encoder, connector, fe, writeVisibilities, this)
908907
}
909908

910909
/* optimized for GeoTools API to return writer ONLY for appending (aka don't scan table) */
911910
override def getFeatureWriterAppend(typeName: String,
912911
transaction: Transaction): SFFeatureWriter = {
913912
validateMetadata(typeName)
914913
checkWritePermissions(typeName)
915-
val featureType = getSchema(typeName)
914+
val sft = getSchema(typeName)
916915
val indexSchemaFmt = getIndexSchemaFmt(typeName)
917-
val fe = getFeatureEncoder(featureType)
916+
val fe = SimpleFeatureEncoder(sft, getFeatureEncoding(sft))
918917
val encoder = IndexSchema.buildKeyEncoder(indexSchemaFmt, fe)
919-
new AppendAccumuloFeatureWriter(featureType, encoder, connector, fe, writeVisibilities, this)
918+
new AppendAccumuloFeatureWriter(sft, encoder, connector, fe, writeVisibilities, this)
920919
}
921920

922921
override def getUnsupportedFilter(featureName: String, filter: Filter): Filter = Filter.INCLUDE

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/SimpleFeatureEncoder.scala

Lines changed: 99 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -22,34 +22,77 @@ import org.apache.accumulo.core.data.{Value => AValue}
2222
import org.apache.avro.io._
2323
import org.geotools.data.DataUtilities
2424
import org.locationtech.geomesa.core.data.FeatureEncoding.FeatureEncoding
25-
import org.locationtech.geomesa.feature.{AvroSimpleFeatureWriter, FeatureSpecificReader}
25+
import org.locationtech.geomesa.feature.{AvroSimpleFeatureFactory, AvroSimpleFeature, AvroSimpleFeatureWriter, FeatureSpecificReader}
2626
import org.locationtech.geomesa.utils.text.ObjectPoolFactory
2727
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
2828

2929

30+
trait HasEncoding {
31+
def encoding: FeatureEncoding
32+
}
33+
3034
/**
31-
* Responsible for collecting data-entries that share an identifier, and
32-
* when done, collapsing these rows into a single SimpleFeature.
33-
*
34-
* All encoding/decoding/serializing of features should be done through this
35-
* single class to allow future versions of serialization instead of scattering
36-
* knowledge of how the serialization is done through geomesa codebase.
35+
* Interface to encode SimpleFeatures with a configurable serialization format.
3736
*
3837
* A SimpleFeatureEncoder is bound to a given SimpleFeatureType since serialization
3938
* may depend upon the schema of the feature type.
4039
*
4140
* SimpleFeatureEncoder classes may not be thread safe and should generally be used
42-
* as instance variables for performance reasons. They can serialize and deserialize
43-
* multiple features.
41+
* as instance variables for performance reasons.
4442
*/
45-
trait SimpleFeatureEncoder {
43+
trait SimpleFeatureEncoder extends HasEncoding {
4644
def encode(feature: SimpleFeature): Array[Byte]
45+
}
46+
47+
/**
48+
* Interface to read SimpleFeatures with a configurable serialization format.
49+
*
50+
* A SimpleFeatureDecoder is bound to a given SimpleFeatureType since serialization
51+
* may depend upon the schema of the feature type.
52+
*
53+
* SimpleFeatureDecoder classes may not be thread safe and should generally be used
54+
* as instance variables for performance reasons.
55+
*/
56+
trait SimpleFeatureDecoder extends HasEncoding {
4757
def decode(featureValue: AValue): SimpleFeature = decode(featureValue.get)
4858
def decode(featureBytes: Array[Byte]): SimpleFeature
4959
def extractFeatureId(value: AValue): String = extractFeatureId(value.get)
5060
def extractFeatureId(bytes: Array[Byte]): String
51-
def getName = getEncoding.toString
52-
def getEncoding: FeatureEncoding
61+
}
62+
63+
object SimpleFeatureDecoder {
64+
def apply(sft: SimpleFeatureType, encoding: FeatureEncoding) =
65+
encoding match {
66+
case FeatureEncoding.AVRO => new AvroFeatureDecoder(sft)
67+
case FeatureEncoding.TEXT => new TextFeatureDecoder(sft)
68+
}
69+
70+
def apply(originalSft: SimpleFeatureType,
71+
projectedSft: SimpleFeatureType,
72+
encoding: FeatureEncoding) =
73+
encoding match {
74+
case FeatureEncoding.AVRO => new ProjectingAvroFeatureDecoder(originalSft, projectedSft)
75+
case FeatureEncoding.TEXT => new ProjectingTextDecoder(originalSft, projectedSft)
76+
}
77+
78+
def apply(sft: SimpleFeatureType, encoding: String): SimpleFeatureDecoder =
79+
SimpleFeatureDecoder(sft, FeatureEncoding.withName(encoding))
80+
81+
def apply(originalSft: SimpleFeatureType,
82+
projectedSft: SimpleFeatureType,
83+
encoding: String): SimpleFeatureDecoder =
84+
SimpleFeatureDecoder(originalSft, projectedSft, FeatureEncoding.withName(encoding))
85+
}
86+
87+
object SimpleFeatureEncoder {
88+
def apply(sft: SimpleFeatureType, encoding: FeatureEncoding) =
89+
encoding match {
90+
case FeatureEncoding.AVRO => new AvroFeatureEncoder(sft)
91+
case FeatureEncoding.TEXT => new TextFeatureEncoder(sft)
92+
}
93+
94+
def apply(sft: SimpleFeatureType, encoding: String): SimpleFeatureEncoder =
95+
SimpleFeatureEncoder(sft, FeatureEncoding.withName(encoding))
5396
}
5497

5598
object FeatureEncoding extends Enumeration {
@@ -58,18 +101,38 @@ object FeatureEncoding extends Enumeration {
58101
val TEXT = Value("text")
59102
}
60103

61-
class TextFeatureEncoder(sft: SimpleFeatureType) extends SimpleFeatureEncoder{
62-
override def encode(feature:SimpleFeature): Array[Byte] = ThreadSafeDataUtilities.encodeFeature(feature).getBytes
104+
class TextFeatureEncoder(sft: SimpleFeatureType) extends SimpleFeatureEncoder {
105+
override def encode(feature:SimpleFeature): Array[Byte] =
106+
ThreadSafeDataUtilities.encodeFeature(feature).getBytes
63107

64-
override def decode(bytes: Array[Byte]) = ThreadSafeDataUtilities.createFeature(sft, new String(bytes))
108+
override def encoding: FeatureEncoding = FeatureEncoding.TEXT
109+
}
110+
111+
class TextFeatureDecoder(sft: SimpleFeatureType) extends SimpleFeatureDecoder {
112+
override def decode(bytes: Array[Byte]) =
113+
ThreadSafeDataUtilities.createFeature(sft, new String(bytes))
65114

66115
// This is derived from the knowledge of the GeoTools encoding in DataUtilities
67116
override def extractFeatureId(bytes: Array[Byte]): String = {
68117
val featureString = new String(bytes)
69118
featureString.substring(0, featureString.indexOf("="))
70119
}
71120

72-
override def getEncoding: FeatureEncoding = FeatureEncoding.TEXT
121+
override def encoding: FeatureEncoding = FeatureEncoding.TEXT
122+
}
123+
124+
class ProjectingTextDecoder(original: SimpleFeatureType, projected: SimpleFeatureType)
125+
extends TextFeatureDecoder(original) {
126+
127+
private val fac = AvroSimpleFeatureFactory.featureBuilder(projected)
128+
private val attrs = DataUtilities.attributeNames(projected)
129+
130+
override def decode(bytes: Array[Byte]) = {
131+
val sf = super.decode(bytes)
132+
fac.reset()
133+
attrs.foreach { attr => fac.set(attr, sf.getAttribute(attr)) }
134+
fac.buildFeature(sf.getID)
135+
}
73136
}
74137

75138
/**
@@ -89,44 +152,44 @@ object ThreadSafeDataUtilities {
89152
}
90153
}
91154

92-
/**
93-
* Encode features as avro making reuse of binary decoders and encoders
94-
* as well as a custom datum writer and reader
95-
*
96-
* This class is NOT threadsafe and cannot be shared across multiple threads.
97-
*
98-
* @param sft
99-
*/
100155
class AvroFeatureEncoder(sft: SimpleFeatureType) extends SimpleFeatureEncoder {
101156

102157
private val writer = new AvroSimpleFeatureWriter(sft)
103-
private val reader = FeatureSpecificReader(sft)
104158

105159
// Encode using a direct binary encoder that is reused. No need to buffer
106160
// small simple features. Reuse a common BAOS as well.
107161
private val baos = new ByteArrayOutputStream()
108-
private var reusableEncoder: DirectBinaryEncoder = null
162+
private var reuse: DirectBinaryEncoder = null
163+
109164
override def encode(feature: SimpleFeature): Array[Byte] = {
110165
baos.reset()
111-
reusableEncoder = EncoderFactory.get().directBinaryEncoder(baos, reusableEncoder).asInstanceOf[DirectBinaryEncoder]
112-
writer.write(feature, reusableEncoder)
113-
reusableEncoder.flush()
166+
reuse = EncoderFactory.get().directBinaryEncoder(baos, reuse).asInstanceOf[DirectBinaryEncoder]
167+
writer.write(feature, reuse)
168+
reuse.flush()
114169
baos.toByteArray
115170
}
116171

172+
override def encoding: FeatureEncoding = FeatureEncoding.AVRO
173+
}
174+
175+
class ProjectingAvroFeatureDecoder(original: SimpleFeatureType, projected: SimpleFeatureType)
176+
extends SimpleFeatureDecoder {
177+
178+
private val reader = new FeatureSpecificReader(original, projected)
179+
117180
override def decode(bytes: Array[Byte]) = decode(new ByteArrayInputStream(bytes))
118181

119-
// Use a direct binary encoder that is reused. No need to buffer simple features
120-
// since they are small and no stream read-ahead is required
121-
private var reusableDecoder: BinaryDecoder = null
182+
private var reuse: BinaryDecoder = null
183+
122184
def decode(is: InputStream) = {
123-
reusableDecoder = DecoderFactory.get().directBinaryDecoder(is, reusableDecoder)
124-
reader.read(null, reusableDecoder)
185+
reuse = DecoderFactory.get().directBinaryDecoder(is, reuse)
186+
reader.read(null, reuse)
125187
}
126188

127189
override def extractFeatureId(bytes: Array[Byte]) =
128-
FeatureSpecificReader.extractId(new ByteArrayInputStream(bytes), reusableDecoder)
190+
FeatureSpecificReader.extractId(new ByteArrayInputStream(bytes), reuse)
129191

130-
override def getEncoding: FeatureEncoding = FeatureEncoding.AVRO
192+
override def encoding: FeatureEncoding = FeatureEncoding.AVRO
131193
}
132194

195+
class AvroFeatureDecoder(sft: SimpleFeatureType) extends ProjectingAvroFeatureDecoder(sft, sft)

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/SimpleFeatureEncoderFactory.scala

Lines changed: 0 additions & 18 deletions
This file was deleted.

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/AttributeIdxStrategy.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ import org.locationtech.geomesa.core.data.tables.AttributeTable
3232
import org.locationtech.geomesa.core.filter._
3333
import org.locationtech.geomesa.core.index.FilterHelper._
3434
import org.locationtech.geomesa.core.index.QueryPlanner._
35-
import org.locationtech.geomesa.core.iterators.{IteratorConfig, IteratorTrigger, AttributeIndexFilteringIterator}
35+
import org.locationtech.geomesa.core.iterators.{AttributeIndexFilteringIterator, IteratorConfig, IteratorTrigger}
3636
import org.locationtech.geomesa.core.util.{BatchMultiScanner, SelfClosingIterator}
3737
import org.opengis.feature.simple.SimpleFeatureType
3838
import org.opengis.filter.expression.{Expression, Literal, PropertyName}
@@ -54,7 +54,7 @@ trait AttributeIdxStrategy extends Strategy with Logging {
5454
output: ExplainerOutputType): SelfClosingIterator[Entry[Key, Value]] = {
5555
output(s"Searching the attribute table with filter ${query.getFilter}")
5656
val schema = iqp.schema
57-
val featureEncoder = iqp.featureEncoder
57+
val featureEncoding = iqp.featureEncoding
5858

5959
output(s"Scanning attribute table for feature type ${featureType.getTypeName}")
6060
val attrScanner = acc.createAttrIdxScanner(featureType)
@@ -79,7 +79,7 @@ trait AttributeIdxStrategy extends Strategy with Logging {
7979
configureSimpleFeatureFilteringIterator(featureType,
8080
filterListAsAnd(nonSTFilters).map(ECQL.toCQL),
8181
schema,
82-
featureEncoder,
82+
featureEncoding,
8383
query)
8484
recordScanner.addScanIterator(iterSetting)
8585
}

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/IndexSchema.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -332,7 +332,7 @@ object IndexSchema extends RegexParsers {
332332
val keyPlanner = buildKeyPlanner(s)
333333
val cfPlanner = buildColumnFamilyPlanner(s)
334334
val indexEntryDecoder = IndexEntryDecoder(geohashDecoder, dateDecoder)
335-
val queryPlanner = QueryPlanner(s, featureType, featureEncoder)
335+
val queryPlanner = QueryPlanner(s, featureType, featureEncoder.encoding)
336336
IndexSchema(keyEncoder, indexEntryDecoder, queryPlanner, featureType)
337337
}
338338

0 commit comments

Comments
 (0)