Skip to content

Commit 25cdccb

Browse files
elahrvivaz authored and Anthony Fox committed
GEOMESA-449 adding support for lists and maps in simple feature attributes
* Using SimpleFeatureTypes.createType(), you can specify a list or map:
  * names:List[String]:index=true
  * weights:Map[String][Double]:index=false
* Attribute values will be instances of java.util.List or java.util.Map when retrieved
* Lists can be queried through CQL, which will execute against the individual items in the list
  * Can use standard comparison operators
  * Given the attribute names=List('bill', 'bob', 'henry') =>
    * (names = 'bob') will match
    * (names < 'joe') will match
    * (names > 'joe') will match
    * (names > 'mark') will not match
* Maps cannot be queried, but can be stored and retrieved
1 parent 4442908 commit 25cdccb

File tree

25 files changed

+1539
-234
lines changed

25 files changed

+1539
-234
lines changed

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/AccumuloDataStore.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ import org.locationtech.geomesa.core.index._
4343
import org.locationtech.geomesa.core.security.AuthorizationsProvider
4444
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
4545
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.{AttributeSpec, NonGeomAttributeSpec}
46+
import org.opengis.feature.`type`.AttributeDescriptor
4647
import org.opengis.feature.simple.SimpleFeatureType
4748
import org.opengis.filter.Filter
4849
import org.opengis.referencing.crs.CoordinateReferenceSystem
@@ -337,8 +338,8 @@ class AccumuloDataStore(val connector: Connector,
337338
val indexedAttrs = SimpleFeatureTypes.getIndexedAttributes(featureType)
338339
if (!indexedAttrs.isEmpty) {
339340
val prefix = index.getTableSharingPrefix(featureType)
340-
val prefixFn = AttributeTable.getAttributeIndexRowPrefix(prefix, _: String)
341-
val names = indexedAttrs.map(_.getLocalName).map(prefixFn).map(new Text(_))
341+
val prefixFn = AttributeTable.getAttributeIndexRowPrefix(prefix, _: AttributeDescriptor)
342+
val names = indexedAttrs.map(prefixFn).map(new Text(_))
342343
val splits = ImmutableSortedSet.copyOf(names.toArray)
343344
tableOps.addSplits(attributeIndexTable, splits)
344345
}

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/tables/AttributeTable.scala

Lines changed: 66 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
package org.locationtech.geomesa.core.data.tables
1919

20-
import java.util.{Date, Locale}
20+
import java.util.{Date, Locale, Collection => JCollection, Map => JMap}
2121

2222
import com.typesafe.scalalogging.slf4j.Logging
2323
import org.apache.accumulo.core.client.BatchWriter
@@ -27,11 +27,14 @@ import org.apache.hadoop.io.Text
2727
import org.calrissian.mango.types.{LexiTypeEncoders, SimpleTypeEncoders, TypeEncoder}
2828
import org.joda.time.format.ISODateTimeFormat
2929
import org.locationtech.geomesa.core.data._
30-
import org.locationtech.geomesa.core.index._
30+
import org.locationtech.geomesa.core.index
31+
import org.locationtech.geomesa.core.index.IndexEntry
32+
import org.locationtech.geomesa.utils.geotools.Conversions.RichAttributeDescriptor
3133
import org.opengis.feature.`type`.AttributeDescriptor
3234
import org.opengis.feature.simple.SimpleFeature
3335

3436
import scala.collection.JavaConversions._
37+
import scala.collection.JavaConverters._
3538
import scala.util.{Failure, Success, Try}
3639

3740
/**
@@ -85,70 +88,107 @@ object AttributeTable extends GeoMesaTable with Logging {
8588
delete: Boolean = false): Seq[Mutation] = {
8689
val cq = new Text(feature.getID)
8790
lazy val value = IndexEntry.encodeIndexValue(feature)
88-
indexedAttributes.map { descriptor =>
91+
indexedAttributes.flatMap { descriptor =>
8992
val attribute = Option(feature.getAttribute(descriptor.getName))
90-
val m = new Mutation(getAttributeIndexRow(rowIdPrefix, descriptor.getLocalName, attribute))
93+
val mutations = getAttributeIndexRows(rowIdPrefix, descriptor, attribute).map(new Mutation(_))
9194
if (delete) {
92-
m.putDelete(EMPTY_COLF, cq, visibility)
95+
mutations.foreach(_.putDelete(EMPTY_COLF, cq, visibility))
9396
} else {
94-
m.put(EMPTY_COLF, cq, visibility, value)
97+
mutations.foreach(_.put(EMPTY_COLF, cq, visibility, value))
9598
}
96-
m
99+
mutations
97100
}
98101
}
99102

100103
/**
101-
* Gets a row key for the attribute index
104+
* Gets row keys for the attribute index. Usually a single row will be returned, but collections
105+
* will result in multiple rows.
102106
*
103-
* @param attributeName
104-
* @param attributeValue
107+
* @param rowIdPrefix
108+
* @param descriptor
109+
* @param value
105110
* @return
106111
*/
107-
def getAttributeIndexRow(rowIdPrefix: String, attributeName: String, attributeValue: Option[Any]): String =
108-
getAttributeIndexRowPrefix(rowIdPrefix, attributeName) ++ encode(attributeValue)
112+
def getAttributeIndexRows(rowIdPrefix: String,
113+
descriptor: AttributeDescriptor,
114+
value: Option[Any]): Seq[String] = {
115+
val prefix = getAttributeIndexRowPrefix(rowIdPrefix, descriptor)
116+
encode(value, descriptor).map(prefix + _)
117+
}
109118

110119
/**
111120
* Gets a prefix for an attribute row - useful for ranges over a particular attribute
112121
*
113-
* @param attributeName
122+
* @param rowIdPrefix
123+
* @param descriptor
114124
* @return
115125
*/
116-
def getAttributeIndexRowPrefix(rowIdPrefix: String, attributeName: String): String =
117-
rowIdPrefix ++ attributeName ++ NULLBYTE
126+
def getAttributeIndexRowPrefix(rowIdPrefix: String, descriptor: AttributeDescriptor): String =
127+
rowIdPrefix ++ descriptor.getLocalName ++ NULLBYTE
118128

119129
/**
120130
* Decodes an attribute value out of row string
121131
*
122132
* @param rowIdPrefix table sharing prefix
123-
* @param attributeType class of the attribute we're decoding
133+
* @param descriptor the attribute we're decoding
124134
* @param row
125135
* @return
126136
*/
127-
def decodeAttributeIndexRow(rowIdPrefix: String, attributeType: Class[_], row: String): Try[AttributeIndexRow] =
137+
def decodeAttributeIndexRow(rowIdPrefix: String,
138+
descriptor: AttributeDescriptor,
139+
row: String): Try[AttributeIndexRow] =
128140
for {
129141
suffix <- Try(row.substring(rowIdPrefix.length))
130142
separator = suffix.indexOf(NULLBYTE)
131-
name <- Try(suffix.substring(0, separator))
132-
encodedValue <- Try(suffix.substring(separator + 1))
133-
decodedValue <- Try(decode(encodedValue, attributeType))
143+
name = suffix.substring(0, separator)
144+
encodedValue = suffix.substring(separator + 1)
145+
decodedValue = decode(encodedValue, descriptor)
134146
} yield {
135147
AttributeIndexRow(name, decodedValue)
136148
}
137149

138150
/**
139-
* Lexicographically encode the value
151+
* Lexicographically encode the value. Collections will return multiple rows, one for each entry.
140152
*
141153
* @param valueOption
154+
* @param descriptor
142155
* @return
143156
*/
144-
def encode(valueOption: Option[Any]): String = {
157+
def encode(valueOption: Option[Any], descriptor: AttributeDescriptor): Seq[String] = {
145158
val value = valueOption.getOrElse(nullString)
146-
Try(typeRegistry.encode(value)).getOrElse(value.toString)
159+
if (descriptor.isCollection) {
160+
// encode each value into a separate row
161+
value.asInstanceOf[JCollection[_]].asScala.toSeq.map(Option(_).getOrElse(nullString)).map(typeEncode)
162+
} else if (descriptor.isMap) {
163+
// TODO GEOMESA-454 - support querying against map attributes
164+
Seq.empty
165+
} else {
166+
Seq(typeEncode(value))
167+
}
147168
}
148169

149-
def decode(encoded: String, attributeType: Class[_]): Any = {
150-
val alias = attributeType.getSimpleName.toLowerCase(Locale.US)
151-
typeRegistry.decode(alias, encoded)
170+
private def typeEncode(value: Any): String = Try(typeRegistry.encode(value)).getOrElse(value.toString)
171+
172+
/**
173+
* Decode an encoded value. Note that for collection types, only a single entry of the collection
174+
* will be decoded - this is because the collection entries have been broken up into multiple rows.
175+
*
176+
* @param encoded
177+
* @param descriptor
178+
* @return
179+
*/
180+
def decode(encoded: String, descriptor: AttributeDescriptor): Any = {
181+
if (descriptor.isCollection) {
182+
// get the alias from the type of values in the collection
183+
val alias = index.getCollectionType(descriptor).map(_.getSimpleName.toLowerCase(Locale.US)).head
184+
Seq(typeRegistry.decode(alias, encoded)).asJava
185+
} else if (descriptor.isMap) {
186+
// TODO GEOMESA-454 - support querying against map attributes
187+
Map.empty.asJava
188+
} else {
189+
val alias = descriptor.getType.getBinding.getSimpleName.toLowerCase(Locale.US)
190+
typeRegistry.decode(alias, encoded)
191+
}
152192
}
153193

154194
private val dateFormat = ISODateTimeFormat.dateTime()

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/AttributeIdxStrategy.scala

Lines changed: 70 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -18,14 +18,15 @@ package org.locationtech.geomesa.core.index
1818

1919

2020
import java.util.Map.Entry
21+
import java.util.{Collection => JCollection, Map => JMap}
2122

23+
import com.typesafe.scalalogging.slf4j.Logging
2224
import org.apache.accumulo.core.client.{BatchScanner, IteratorSetting, Scanner}
2325
import org.apache.accumulo.core.data.{Key, Value, Range => AccRange}
2426
import org.apache.hadoop.io.Text
2527
import org.geotools.data.Query
2628
import org.geotools.filter.text.ecql.ECQL
2729
import org.geotools.temporal.`object`.DefaultPeriod
28-
import org.locationtech.geomesa.core.DEFAULT_FILTER_PROPERTY_NAME
2930
import org.locationtech.geomesa.core.data.FeatureEncoding.FeatureEncoding
3031
import org.locationtech.geomesa.core.data._
3132
import org.locationtech.geomesa.core.data.tables.{AttributeTable, RecordTable}
@@ -34,14 +35,17 @@ import org.locationtech.geomesa.core.index.FilterHelper._
3435
import org.locationtech.geomesa.core.index.QueryPlanner._
3536
import org.locationtech.geomesa.core.iterators._
3637
import org.locationtech.geomesa.core.util.{BatchMultiScanner, SelfClosingIterator}
38+
import org.locationtech.geomesa.core.{DEFAULT_FILTER_PROPERTY_NAME, index}
39+
import org.locationtech.geomesa.utils.geotools.Conversions.RichAttributeDescriptor
3740
import org.opengis.feature.simple.SimpleFeatureType
3841
import org.opengis.filter.expression.{Expression, Literal, PropertyName}
3942
import org.opengis.filter.temporal.{After, Before, During, TEquals}
4043
import org.opengis.filter.{Filter, PropertyIsEqualTo, PropertyIsLike, _}
4144

4245
import scala.collection.JavaConversions._
46+
import scala.collection.JavaConverters._
4347

44-
trait AttributeIdxStrategy extends Strategy {
48+
trait AttributeIdxStrategy extends Strategy with Logging {
4549

4650
/**
4751
* Perform scan against the Attribute Index Table and get an iterator returning records from the Record table
@@ -77,7 +81,7 @@ trait AttributeIdxStrategy extends Strategy {
7781

7882
val opts = oFilter.map(f => DEFAULT_FILTER_PROPERTY_NAME -> ECQL.toCQL(f)).toMap
7983

80-
iteratorChoice.iterator match {
84+
val iter = iteratorChoice.iterator match {
8185
case IndexOnlyIterator =>
8286
indexOnlyIterator(attrScanner,
8387
featureType,
@@ -100,6 +104,19 @@ trait AttributeIdxStrategy extends Strategy {
100104
opts,
101105
output)
102106
}
107+
108+
// wrap with a de-duplicator if the attribute could have multiple values, and it won't be
109+
// de-duped by the query planner
110+
if (!IndexSchema.mayContainDuplicates(featureType)
111+
&& featureType.getDescriptor(attributeName).isMultiValued) {
112+
val returnSft = Option(query.getHints.get(TRANSFORM_SCHEMA).asInstanceOf[SimpleFeatureType])
113+
.getOrElse(featureType)
114+
val decoder = SimpleFeatureDecoder(returnSft, iqp.featureEncoding)
115+
val deduper = new DeDuplicatingIterator(iter, (_: Key, value: Value) => decoder.extractFeatureId(value))
116+
SelfClosingIterator(deduper)
117+
} else {
118+
iter
119+
}
103120
}
104121

105122
/**
@@ -242,21 +259,34 @@ trait AttributeIdxStrategy extends Strategy {
242259
* @return
243260
*/
244261
def getEncodedAttrIdxRow(sft: SimpleFeatureType, prop: String, value: Any): String = {
262+
val descriptor = sft.getDescriptor(prop)
245263
// the class type as defined in the SFT
246-
val expectedBinding = sft.getDescriptor(prop).getType.getBinding
264+
val expectedBinding = descriptor.getType.getBinding
247265
// the class type of the literal pulled from the query
248266
val actualBinding = value.getClass
249-
val typedValue =
250-
if (expectedBinding.equals(actualBinding)) {
251-
value
267+
val typedValue = if (expectedBinding == actualBinding) {
268+
value
269+
} else if (descriptor.isCollection) {
270+
// we need to encode with the collection type
271+
val collectionType = index.getCollectionType(descriptor).head
272+
if (collectionType == actualBinding) {
273+
Seq(value).asJava
252274
} else {
253-
// type mismatch, encoding won't work b/c class is stored as part of the row
254-
// try to convert to the appropriate class
255-
AttributeTable.convertType(value, actualBinding, expectedBinding)
275+
Seq(AttributeTable.convertType(value, actualBinding, collectionType)).asJava
256276
}
277+
} else if (descriptor.isMap) {
278+
// TODO GEOMESA-454 - support querying against map attributes
279+
Map.empty.asJava
280+
} else {
281+
// type mismatch, encoding won't work b/c value is wrong class
282+
// try to convert to the appropriate class
283+
AttributeTable.convertType(value, actualBinding, expectedBinding)
284+
}
257285

258286
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(sft)
259-
AttributeTable.getAttributeIndexRow(rowIdPrefix, prop, Some(typedValue))
287+
// grab the first encoded row - right now there will only ever be a single item in the seq
288+
// eventually we may support searching a whole collection at once
289+
AttributeTable.getAttributeIndexRows(rowIdPrefix, descriptor, Some(typedValue)).head
260290
}
261291

262292
/**
@@ -305,36 +335,38 @@ class AttributeIdxEqualsStrategy extends AttributeIdxStrategy {
305335

306336
override def execute(acc: AccumuloConnectorCreator,
307337
iqp: QueryPlanner,
308-
featureType: SimpleFeatureType,
338+
sft: SimpleFeatureType,
309339
query: Query,
310340
output: ExplainerOutputType): SelfClosingIterator[Entry[Key, Value]] = {
311-
val (strippedQuery, filter) = partitionFilter(query, featureType)
341+
val (strippedQuery, filter) = partitionFilter(query, sft)
312342
val (prop, range) =
313343
filter match {
314344
case f: PropertyIsEqualTo =>
315345
val (prop, lit, _) = checkOrder(f.getExpression1, f.getExpression2)
316-
(prop, AccRange.exact(getEncodedAttrIdxRow(featureType, prop, lit)))
346+
(prop, AccRange.exact(getEncodedAttrIdxRow(sft, prop, lit)))
317347

318348
case f: TEquals =>
319349
val (prop, lit, _) = checkOrder(f.getExpression1, f.getExpression2)
320-
(prop, AccRange.exact(getEncodedAttrIdxRow(featureType, prop, lit)))
350+
(prop, AccRange.exact(getEncodedAttrIdxRow(sft, prop, lit)))
321351

322352
case f: PropertyIsNil =>
323353
val prop = f.getExpression.asInstanceOf[PropertyName].getPropertyName
324-
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(featureType)
325-
(prop, AccRange.exact(AttributeTable.getAttributeIndexRow(rowIdPrefix, prop, None)))
354+
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(sft)
355+
val exact = AttributeTable.getAttributeIndexRows(rowIdPrefix, sft.getDescriptor(prop), None).head
356+
(prop, AccRange.exact(exact))
326357

327358
case f: PropertyIsNull =>
328359
val prop = f.getExpression.asInstanceOf[PropertyName].getPropertyName
329-
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(featureType)
330-
(prop, AccRange.exact(AttributeTable.getAttributeIndexRow(rowIdPrefix, prop, None)))
360+
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(sft)
361+
val exact = AttributeTable.getAttributeIndexRows(rowIdPrefix, sft.getDescriptor(prop), None).head
362+
(prop, AccRange.exact(exact))
331363

332364
case _ =>
333365
val msg = s"Unhandled filter type in equals strategy: ${filter.getClass.getName}"
334366
throw new RuntimeException(msg)
335367
}
336368

337-
attrIdxQuery(acc, strippedQuery, iqp, featureType, prop, range, output)
369+
attrIdxQuery(acc, strippedQuery, iqp, sft, prop, range, output)
338370
}
339371
}
340372

@@ -415,31 +447,33 @@ class AttributeIdxRangeStrategy extends AttributeIdxStrategy {
415447
attrIdxQuery(acc, strippedQuery, iqp, featureType, prop, range, output)
416448
}
417449

418-
private def greaterThanRange(featureType: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
419-
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(featureType)
420-
val start = new Text(getEncodedAttrIdxRow(featureType, prop, lit))
421-
val end = AccRange.followingPrefix(new Text(AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, prop)))
450+
private def greaterThanRange(sft: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
451+
val rowIdPrefix = getTableSharingPrefix(sft)
452+
val start = new Text(getEncodedAttrIdxRow(sft, prop, lit))
453+
val endPrefix = AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, sft.getDescriptor(prop))
454+
val end = AccRange.followingPrefix(new Text(endPrefix))
422455
new AccRange(start, false, end, false)
423456
}
424457

425-
private def greaterThanOrEqualRange(featureType: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
426-
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(featureType)
427-
val start = new Text(getEncodedAttrIdxRow(featureType, prop, lit))
428-
val end = AccRange.followingPrefix(new Text(AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, prop)))
458+
private def greaterThanOrEqualRange(sft: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
459+
val rowIdPrefix = getTableSharingPrefix(sft)
460+
val start = new Text(getEncodedAttrIdxRow(sft, prop, lit))
461+
val endPrefix = AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, sft.getDescriptor(prop))
462+
val end = AccRange.followingPrefix(new Text(endPrefix))
429463
new AccRange(start, true, end, false)
430464
}
431465

432-
private def lessThanRange(featureType: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
433-
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(featureType)
434-
val start = AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, prop)
435-
val end = getEncodedAttrIdxRow(featureType, prop, lit)
466+
private def lessThanRange(sft: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
467+
val rowIdPrefix = getTableSharingPrefix(sft)
468+
val start = AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, sft.getDescriptor(prop))
469+
val end = getEncodedAttrIdxRow(sft, prop, lit)
436470
new AccRange(start, false, end, false)
437471
}
438472

439-
private def lessThanOrEqualRange(featureType: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
440-
val rowIdPrefix = org.locationtech.geomesa.core.index.getTableSharingPrefix(featureType)
441-
val start = AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, prop)
442-
val end = getEncodedAttrIdxRow(featureType, prop, lit)
473+
private def lessThanOrEqualRange(sft: SimpleFeatureType, prop: String, lit: AnyRef): AccRange = {
474+
val rowIdPrefix = getTableSharingPrefix(sft)
475+
val start = AttributeTable.getAttributeIndexRowPrefix(rowIdPrefix, sft.getDescriptor(prop))
476+
val end = getEncodedAttrIdxRow(sft, prop, lit)
443477
new AccRange(start, false, end, true)
444478
}
445479
}

0 commit comments

Comments
 (0)