Skip to content

Commit 6aadf0a

Browse files
John Armstronganthonyccri
John Armstrong
authored andcommitted
GEOMESA-424 cql 'OR' filter query does not deduplicate
* moved deduplication code into QueryPlanner.getIterator method and used it for both duplicatable data and OR queries * refactoring to be a bit more selective * added inclusion-exclusion test to AccumuloDataStoreTest
1 parent 38b71fa commit 6aadf0a

File tree

2 files changed

+83
-20
lines changed

2 files changed

+83
-20
lines changed

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/QueryPlanner.scala

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,22 +85,40 @@ case class QueryPlanner(schema: String,
8585
output: ExplainerOutputType = log): CloseableIterator[Entry[Key,Value]] = {
8686

8787
output(s"Running ${ExplainerOutputType.toString(query)}")
88-
8988
val ff = CommonFactoryFinder.getFilterFactory2
9089
val isDensity = query.getHints.containsKey(BBOX_KEY)
91-
val queries: Iterator[Query] =
92-
if(isDensity) {
93-
val env = query.getHints.get(BBOX_KEY).asInstanceOf[ReferencedEnvelope]
94-
val q1 = new Query(featureType.getTypeName, ff.bbox(ff.property(featureType.getGeometryDescriptor.getLocalName), env))
95-
Iterator(DataUtilities.mixQueries(q1, query, "geomesa.mixed.query"))
90+
val duplicatableData = IndexSchema.mayContainDuplicates(featureType)
91+
92+
def flatten(queries: Seq[Query]): CloseableIterator[Entry[Key, Value]] =
93+
queries.toIterator.ciFlatMap(configureScanners(acc, sft, _, isDensity, output))
94+
95+
// in some cases, where duplicates may appear in overlapping queries or the data itself, remove them
96+
def deduplicate(queries: Seq[Query]): CloseableIterator[Entry[Key, Value]] = {
97+
val flatQueries = flatten(queries)
98+
val decoder = SimpleFeatureDecoder(getReturnSFT(query), featureEncoding)
99+
new DeDuplicatingIterator(flatQueries, (key: Key, value: Value) => decoder.extractFeatureId(value))
100+
}
101+
102+
if(isDensity) {
103+
val env = query.getHints.get(BBOX_KEY).asInstanceOf[ReferencedEnvelope]
104+
val q1 = new Query(featureType.getTypeName, ff.bbox(ff.property(featureType.getGeometryDescriptor.getLocalName), env))
105+
val mixedQuery = DataUtilities.mixQueries(q1, query, "geomesa.mixed.query")
106+
if (duplicatableData) {
107+
deduplicate(Seq(mixedQuery))
96108
} else {
97-
splitQueryOnOrs(query, output)
109+
flatten(Seq(mixedQuery))
98110
}
99-
100-
queries.ciFlatMap(configureScanners(acc, sft, _, isDensity, output))
111+
} else {
112+
val rawQueries = splitQueryOnOrs(query, output)
113+
if (rawQueries.length > 1 || duplicatableData) {
114+
deduplicate(rawQueries)
115+
} else {
116+
flatten(rawQueries)
117+
}
118+
}
101119
}
102120

103-
def splitQueryOnOrs(query: Query, output: ExplainerOutputType): Iterator[Query] = {
121+
def splitQueryOnOrs(query: Query, output: ExplainerOutputType): Seq[Query] = {
104122
val originalFilter = query.getFilter
105123
output(s"Original filter: $originalFilter")
106124

@@ -117,7 +135,7 @@ case class QueryPlanner(schema: String,
117135
val q = new Query(query)
118136
q.setFilter(filter)
119137
q
120-
}.toIterator
138+
}
121139
}
122140

123141
/**
@@ -155,20 +173,14 @@ case class QueryPlanner(schema: String,
155173
val returnSFT = getReturnSFT(query)
156174
val decoder = SimpleFeatureDecoder(returnSFT, featureEncoding)
157175

158-
// the final iterator may need duplicates removed
159-
val uniqKVIter: CloseableIterator[Entry[Key,Value]] =
160-
if (IndexSchema.mayContainDuplicates(featureType))
161-
new DeDuplicatingIterator(accumuloIterator, (key: Key, value: Value) => decoder.extractFeatureId(value))
162-
else accumuloIterator
163-
164176
// Decode according to the SFT return type.
165177
// if this is a density query, expand the map
166178
if (query.getHints.containsKey(DENSITY_KEY)) {
167-
uniqKVIter.flatMap { kv: Entry[Key, Value] =>
179+
accumuloIterator.flatMap { kv: Entry[Key, Value] =>
168180
DensityIterator.expandFeature(decoder.decode(kv.getValue))
169181
}
170182
} else {
171-
uniqKVIter.map { kv => decoder.decode(kv.getValue) }
183+
accumuloIterator.map { kv => decoder.decode(kv.getValue) }
172184
}
173185
}
174186

geomesa-core/src/test/scala/org/locationtech/geomesa/core/data/AccumuloDataStoreTest.scala

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.geotools.referencing.CRS
3939
import org.geotools.referencing.crs.DefaultGeographicCRS
4040
import org.joda.time.DateTime
4141
import org.junit.runner.RunWith
42-
import org.locationtech.geomesa.core.index.{IndexSchemaBuilder, _}
42+
import org.locationtech.geomesa.core.index._
4343
import org.locationtech.geomesa.core.iterators.TestData
4444
import org.locationtech.geomesa.core.security.{AuthorizationsProvider, DefaultAuthorizationsProvider, FilteringAuthorizationsProvider}
4545
import org.locationtech.geomesa.core.util.{SelfClosingIterator, CloseableIterator}
@@ -53,6 +53,7 @@ import org.specs2.mutable.Specification
5353
import org.specs2.runner.JUnitRunner
5454

5555
import scala.collection.JavaConversions._
56+
import scala.util.Random
5657

5758
@RunWith(classOf[JUnitRunner])
5859
class AccumuloDataStoreTest extends Specification {
@@ -175,6 +176,7 @@ class AccumuloDataStoreTest extends Specification {
175176
"and there are no results" >> { features.hasNext should be equalTo false }
176177
}
177178
}
179+
178180
"process a DWithin query correctly" in {
179181
// create the data store
180182
val sftName = "dwithintest"
@@ -210,6 +212,55 @@ class AccumuloDataStoreTest extends Specification {
210212
"and no more results" >> { features.hasNext must beFalse }
211213
}
212214

215+
"process an OR query correctly" in {
216+
val sftName = "ortest"
217+
val sft = SimpleFeatureTypes.createType(sftName, s"NAME:String,dtg:Date,*geom:Point:srid=4326")
218+
sft.getUserData.put(SF_PROPERTY_START_TIME, "dtg")
219+
ds.createSchema(sft)
220+
221+
val fs = ds.getFeatureSource(sftName).asInstanceOf[AccumuloFeatureStore]
222+
223+
{
224+
val randVal: (Double, Double) => Double = {
225+
val r = new Random(System.nanoTime())
226+
(low, high) => {
227+
(r.nextDouble() * (high - low)) + low
228+
}
229+
}
230+
val fc = new DefaultFeatureCollection(sftName, sft)
231+
for (i <- 0 until 1000) {
232+
val lat = randVal(-0.001, 0.001)
233+
val lon = randVal(-0.001, 0.001)
234+
val geom = WKTUtils.read(s"POINT($lat $lon)")
235+
val builder = new SimpleFeatureBuilder(sft, featureFactory)
236+
builder.addAll(List("testType", null, geom))
237+
val feature = builder.buildFeature(s"fid-$i")
238+
feature.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE)
239+
fc.add(feature)
240+
}
241+
fs.addFeatures(fc)
242+
}
243+
244+
val geomFactory = JTSFactoryFinder.getGeometryFactory
245+
val urq = ff.dwithin(ff.property("geom"), ff.literal(geomFactory.createPoint(new Coordinate( 0.0005, 0.0005))), 150.0, "meters")
246+
val llq = ff.dwithin(ff.property("geom"), ff.literal(geomFactory.createPoint(new Coordinate(-0.0005, -0.0005))), 150.0, "meters")
247+
val orq = ff.or(urq, llq)
248+
val andq = ff.and(urq, llq)
249+
val urQuery = new Query(sftName, urq)
250+
val llQuery = new Query(sftName, llq)
251+
val orQuery = new Query(sftName, orq)
252+
val andQuery = new Query(sftName, andq)
253+
254+
val urNum = fs.getFeatures( urQuery).features.length
255+
val llNum = fs.getFeatures( llQuery).features.length
256+
val orNum = fs.getFeatures( orQuery).features.length
257+
val andNum = fs.getFeatures(andQuery).features.length
258+
259+
"obeying inclusion-exclusion principle" >> {
260+
(urNum + llNum) mustEqual (orNum + andNum)
261+
}
262+
}
263+
213264
"handle transformations" in {
214265
val sftName = "transformtest1"
215266
val sft = SimpleFeatureTypes.createType(sftName, s"name:String,dtg:Date,*geom:Point:srid=4326")

0 commit comments

Comments
 (0)