Skip to content

Commit 4965b42

Browse files
Ryan Brooksjnh5y
Ryan Brooks
authored andcommitted
GEOMESA-445 Create Temporal Density Iterator
1 parent eebcf4c commit 4965b42

File tree

7 files changed

+495
-12
lines changed

7 files changed

+495
-12
lines changed

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/QueryPlanner.scala

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ import org.locationtech.geomesa.core.data.FeatureEncoding.FeatureEncoding
2828
import org.locationtech.geomesa.core.data._
2929
import org.locationtech.geomesa.core.filter._
3030
import org.locationtech.geomesa.core.index.QueryHints._
31-
import org.locationtech.geomesa.core.iterators.{DeDuplicatingIterator, DensityIterator}
31+
import org.locationtech.geomesa.core.iterators.TemporalDensityIterator._
32+
import org.locationtech.geomesa.core.iterators.{DeDuplicatingIterator, DensityIterator, TemporalDensityIterator}
3233
import org.locationtech.geomesa.core.util.CloseableIterator._
3334
import org.locationtech.geomesa.core.util.{CloseableIterator, SelfClosingIterator}
35+
import org.locationtech.geomesa.feature.AvroSimpleFeatureFactory
3436
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
3537
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
3638
import org.opengis.filter.sort.{SortBy, SortOrder}
@@ -46,6 +48,8 @@ object QueryPlanner {
4648
val iteratorPriority_SpatioTemporalIterator = 200
4749
val iteratorPriority_SimpleFeatureFilteringIterator = 300
4850
val iteratorPriority_AnalysisIterator = 400
51+
52+
val zeroPoint = new GeometryFactory().createPoint(new Coordinate(0,0))
4953
}
5054

5155
case class QueryPlanner(schema: String,
@@ -153,7 +157,7 @@ case class QueryPlanner(schema: String,
153157
private def configureScanners(acc: AccumuloConnectorCreator,
154158
sft: SimpleFeatureType,
155159
derivedQuery: Query,
156-
isDensity: Boolean,
160+
isADensity: Boolean,
157161
output: ExplainerOutputType): SelfClosingIterator[Entry[Key, Value]] = {
158162
output(s"Transforms: ${derivedQuery.getHints.get(TRANSFORMS)}")
159163
val strategy = QueryStrategyDecider.chooseStrategy(acc.catalogTableFormat(sft), sft, derivedQuery)
@@ -179,9 +183,24 @@ case class QueryPlanner(schema: String,
179183
// Decode according to the SFT return type.
180184
// if this is a density query, expand the map
181185
if (query.getHints.containsKey(DENSITY_KEY)) {
182-
accumuloIterator.flatMap { kv: Entry[Key, Value] =>
186+
accumuloIterator.flatMap { kv =>
183187
DensityIterator.expandFeature(decoder.decode(kv.getValue))
184188
}
189+
} else if (query.getHints.containsKey(TEMPORAL_DENSITY_KEY)) {
190+
val timeSeriesStrings = accumuloIterator.map { kv =>
191+
decoder.decode(kv.getValue).getAttribute(ENCODED_TIME_SERIES).toString
192+
}
193+
194+
val summedTimeSeries = timeSeriesStrings.map(decodeTimeSeries).reduce(combineTimeSeries)
195+
196+
val featureBuilder = AvroSimpleFeatureFactory.featureBuilder(returnSFT)
197+
featureBuilder.reset()
198+
featureBuilder.add(TemporalDensityIterator.encodeTimeSeries(summedTimeSeries))
199+
200+
featureBuilder.add(QueryPlanner.zeroPoint) //Filler value as Feature requires a geometry
201+
val result = featureBuilder.buildFeature(null)
202+
203+
List(result).iterator
185204
} else {
186205
val features = accumuloIterator.map { kv => decoder.decode(kv.getValue) }
187206
if(query.getSortBy != null && query.getSortBy.length > 0) sort(features, query.getSortBy)
@@ -219,9 +238,10 @@ case class QueryPlanner(schema: String,
219238
query match {
220239
case _: Query if query.getHints.containsKey(DENSITY_KEY) =>
221240
SimpleFeatureTypes.createType(featureType.getTypeName, DensityIterator.DENSITY_FEATURE_STRING)
241+
case _: Query if query.getHints.containsKey(TEMPORAL_DENSITY_KEY) =>
242+
SimpleFeatureTypes.createType(featureType.getTypeName, TemporalDensityIterator.TEMPORAL_DENSITY_FEATURE_STRING)
222243
case _: Query if query.getHints.get(TRANSFORM_SCHEMA) != null =>
223244
query.getHints.get(TRANSFORM_SCHEMA).asInstanceOf[SimpleFeatureType]
224245
case _ => featureType
225246
}
226247
}
227-

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/QueryStrategyDecider.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ object QueryStrategyDecider {
3939

4040
def chooseNewStrategy(sft: SimpleFeatureType, query: Query): Strategy = {
4141
val filter = query.getFilter
42-
val isDensity = query.getHints.containsKey(BBOX_KEY)
42+
val isADensity = query.getHints.containsKey(BBOX_KEY) || query.getHints.contains(TIME_BUCKETS_KEY)
4343

44-
if (isDensity) {
44+
if (isADensity) {
4545
// TODO GEOMESA-322 use other strategies with density iterator
4646
new STIdxStrategy
4747
} else {
@@ -50,14 +50,14 @@ object QueryStrategyDecider {
5050
attributeStrategy.getOrElse {
5151
filter match {
5252
case idFilter: Id => new RecordIdxStrategy
53-
case and: And => processAnd(isDensity, sft, and)
53+
case and: And => processAnd(isADensity, sft, and)
5454
case cql => new STIdxStrategy
5555
}
5656
}
5757
}
5858
}
5959

60-
private def processAnd(isDensity: Boolean, sft: SimpleFeatureType, and: And): Strategy = {
60+
private def processAnd(isADensity: Boolean, sft: SimpleFeatureType, and: And): Strategy = {
6161
val children: util.List[Filter] = decomposeAnd(and)
6262

6363
def determineStrategy(attr: Filter, st: Filter): Strategy = {

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/Strategy.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import com.vividsolutions.jts.geom.{Geometry, Polygon}
2222
import org.apache.accumulo.core.client.{BatchScanner, IteratorSetting}
2323
import org.apache.accumulo.core.data.{Key, Value}
2424
import org.geotools.data.Query
25+
import org.joda.time.Interval
2526
import org.locationtech.geomesa.core._
2627
import org.locationtech.geomesa.core.data.FeatureEncoding.FeatureEncoding
2728
import org.locationtech.geomesa.core.data._
@@ -138,6 +139,24 @@ trait Strategy {
138139
configureFeatureEncoding(cfg, featureEncoding)
139140
configureFeatureType(cfg, featureType)
140141

142+
Some(cfg)
143+
}
144+
else if (query.getHints.containsKey(TEMPORAL_DENSITY_KEY)){
145+
val clazz = classOf[TemporalDensityIterator]
146+
147+
val cfg = new IteratorSetting(iteratorPriority_AnalysisIterator,
148+
"topfilter-" + randomPrintableString(5),
149+
clazz)
150+
151+
val interval = query.getHints.get(TIME_INTERVAL_KEY).asInstanceOf[Interval]
152+
val buckets = query.getHints.get(TIME_BUCKETS_KEY).asInstanceOf[Int]
153+
154+
TemporalDensityIterator.configure(cfg, interval, buckets)
155+
156+
cfg.addOption(DEFAULT_SCHEMA_NAME, schema)
157+
configureFeatureEncoding(cfg, featureEncoding)
158+
configureFeatureType(cfg, featureType)
159+
141160
Some(cfg)
142161
} else None
143162
}

geomesa-core/src/main/scala/org/locationtech/geomesa/core/index/index.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ package object index {
9191
val WIDTH_KEY = new IntegerKey(256)
9292
val HEIGHT_KEY = new IntegerKey(256)
9393
val BBOX_KEY = new ClassKey(classOf[ReferencedEnvelope])
94+
val TEMPORAL_DENSITY_KEY = new ClassKey(classOf[java.lang.Boolean])
95+
96+
val TIME_INTERVAL_KEY = new ClassKey(classOf[org.joda.time.Interval])
97+
val TIME_BUCKETS_KEY = new IntegerKey(256)
9498
}
9599

96100
type ExplainerOutputType = ( => String) => Unit
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Copyright 2014 Commonwealth Computer Research, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the License);
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an AS IS BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
18+
package org.locationtech.geomesa.core.iterators
19+
20+
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
21+
import java.util.Date
22+
import java.{util => ju}
23+
24+
import com.typesafe.scalalogging.slf4j.Logging
25+
import com.vividsolutions.jts.geom._
26+
import org.apache.accumulo.core.client.IteratorSetting
27+
import org.apache.accumulo.core.data.{ByteSequence, Key, Value, Range => ARange}
28+
import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator}
29+
import org.apache.commons.codec.binary.Base64
30+
import org.geotools.feature.simple.SimpleFeatureBuilder
31+
import org.geotools.geometry.jts.JTSFactoryFinder
32+
import org.joda.time.{DateTime, Interval}
33+
import org.locationtech.geomesa.core._
34+
import org.locationtech.geomesa.core.data.{FeatureEncoding, SimpleFeatureDecoder, SimpleFeatureEncoder}
35+
import org.locationtech.geomesa.core.index.{IndexEntryDecoder, _}
36+
import org.locationtech.geomesa.feature.AvroSimpleFeatureFactory
37+
import org.locationtech.geomesa.utils.geotools.{SimpleFeatureTypes, TimeSnap}
38+
import org.opengis.feature.simple.SimpleFeatureType
39+
40+
import scala.util.Random
41+
42+
class TemporalDensityIterator(other: TemporalDensityIterator, env: IteratorEnvironment) extends SortedKeyValueIterator[Key, Value] {
43+
44+
import org.locationtech.geomesa.core.iterators.TemporalDensityIterator.{TEMPORAL_DENSITY_FEATURE_STRING, TimeSeries}
45+
46+
var curRange: ARange = null
47+
var result: TimeSeries = new collection.mutable.HashMap[DateTime, Long]()
48+
var projectedSFT: SimpleFeatureType = null
49+
var featureBuilder: SimpleFeatureBuilder = null
50+
var snap: TimeSnap = null
51+
var topTemporalDensityKey: Option[Key] = None
52+
var topTemporalDensityValue: Option[Value] = None
53+
protected var decoder: IndexEntryDecoder = null
54+
55+
var simpleFeatureType: SimpleFeatureType = null
56+
var source: SortedKeyValueIterator[Key,Value] = null
57+
58+
var topSourceKey: Key = null
59+
var topSourceValue: Value = null
60+
var originalDecoder: SimpleFeatureDecoder = null
61+
var temporalDensityFeatureEncoder: SimpleFeatureEncoder = null
62+
63+
var dateTimeFieldName: String = null
64+
65+
def this() = this(null, null)
66+
67+
def init(source: SortedKeyValueIterator[Key, Value],
68+
options: ju.Map[String, String],
69+
env: IteratorEnvironment): Unit = {
70+
this.source = source
71+
72+
val simpleFeatureTypeSpec = options.get(GEOMESA_ITERATORS_SIMPLE_FEATURE_TYPE)
73+
simpleFeatureType = SimpleFeatureTypes.createType(this.getClass.getCanonicalName, simpleFeatureTypeSpec)
74+
simpleFeatureType.decodeUserData(options, GEOMESA_ITERATORS_SIMPLE_FEATURE_TYPE)
75+
76+
dateTimeFieldName = getDtgFieldName(simpleFeatureType).getOrElse ( throw new IllegalArgumentException("dtg field required"))
77+
78+
// default to text if not found for backwards compatibility
79+
val encodingOpt = Option(options.get(FEATURE_ENCODING)).getOrElse(FeatureEncoding.TEXT.toString)
80+
originalDecoder = SimpleFeatureDecoder(simpleFeatureType, encodingOpt)
81+
82+
projectedSFT = SimpleFeatureTypes.createType(simpleFeatureType.getTypeName, TEMPORAL_DENSITY_FEATURE_STRING)
83+
84+
temporalDensityFeatureEncoder = SimpleFeatureEncoder(projectedSFT, encodingOpt)
85+
featureBuilder = AvroSimpleFeatureFactory.featureBuilder(projectedSFT)
86+
87+
val buckets = TemporalDensityIterator.getBuckets(options)
88+
val bounds = TemporalDensityIterator.getTimeBounds(options)
89+
snap = new TimeSnap(bounds, buckets)
90+
91+
}
92+
93+
/**
94+
* Combines the results from the underlying iterator stack
95+
* into a single feature
96+
*/
97+
def findTop() = {
98+
// reset our 'top' (current) variables
99+
result.clear()
100+
topSourceKey = null
101+
topSourceValue = null
102+
103+
while(source.hasTop && !curRange.afterEndKey(source.getTopKey)) {
104+
topSourceKey = source.getTopKey
105+
topSourceValue = source.getTopValue //SimpleFeature
106+
107+
val date = originalDecoder.decode(topSourceValue).getAttribute(dateTimeFieldName).asInstanceOf[Date]
108+
val dateTime = new DateTime(date.getTime)
109+
addResultDate(dateTime)
110+
111+
source.next()
112+
}
113+
114+
if(topSourceKey != null) {
115+
featureBuilder.reset()
116+
featureBuilder.add(TemporalDensityIterator.encodeTimeSeries(result))
117+
featureBuilder.add(TemporalDensityIterator.zeroPoint) //Filler value as Feature requires a geometry
118+
val feature = featureBuilder.buildFeature(Random.nextString(6))
119+
topTemporalDensityKey = Some(topSourceKey)
120+
topTemporalDensityValue = Some(new Value(temporalDensityFeatureEncoder.encode(feature)))
121+
}
122+
}
123+
124+
/** take a given Coordinate and add 1 to the result time that it corresponds to via the snap time */
125+
def addResultDate(date: DateTime) = {
126+
val t: DateTime = snap.t(snap.i(date))
127+
val cur: Long = result.get(t).getOrElse(0L)
128+
result.put(t, cur + 1L)
129+
}
130+
131+
override def seek(range: ARange,
132+
columnFamilies: ju.Collection[ByteSequence],
133+
inclusive: Boolean): Unit = {
134+
curRange = range
135+
source.seek(range, columnFamilies, inclusive)
136+
findTop()
137+
}
138+
139+
def hasTop: Boolean = topTemporalDensityKey.nonEmpty
140+
141+
def getTopKey: Key = topTemporalDensityKey.orNull
142+
143+
def getTopValue = topTemporalDensityValue.orNull
144+
145+
def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] = new TemporalDensityIterator(this, env)
146+
147+
def next(): Unit = if(!source.hasTop) {
148+
topTemporalDensityKey = None
149+
topTemporalDensityValue = None
150+
} else {
151+
findTop()
152+
}
153+
}
154+
155+
object TemporalDensityIterator extends Logging {
156+
157+
val INTERVAL_KEY = "geomesa.temporal.density.bounds"
158+
val BUCKETS_KEY = "geomesa.temporal.density.buckets"
159+
val ENCODED_TIME_SERIES: String = "timeseries"
160+
val TEMPORAL_DENSITY_FEATURE_STRING = s"$ENCODED_TIME_SERIES:String,geom:Geometry"
161+
162+
val zeroPoint = new GeometryFactory().createPoint(new Coordinate(0,0))
163+
164+
type TimeSeries = collection.mutable.HashMap[DateTime, Long]
165+
166+
val geomFactory = JTSFactoryFinder.getGeometryFactory
167+
168+
def configure(cfg: IteratorSetting, interval : Interval, buckets: Int) = {
169+
setTimeBounds(cfg, interval)
170+
setBuckets(cfg, buckets)
171+
}
172+
173+
def setTimeBounds(iterSettings: IteratorSetting, interval: Interval) : Unit = {
174+
iterSettings.addOption(INTERVAL_KEY, s"${interval.getStart.getMillis},${interval.getEnd.getMillis}")
175+
}
176+
177+
def setBuckets(iterSettings: IteratorSetting, buckets: Int): Unit = {
178+
iterSettings.addOption(BUCKETS_KEY, s"$buckets")
179+
}
180+
181+
def getBuckets(options: ju.Map[String, String]): Int = {
182+
options.get(BUCKETS_KEY).toInt
183+
}
184+
185+
def getTimeBounds(options: ju.Map[String, String]): Interval = {
186+
val Array(s, e) = options.get(INTERVAL_KEY).split(",").map(_.toLong)
187+
new Interval(s, e)
188+
}
189+
190+
def combineTimeSeries(ts1: TimeSeries, ts2: TimeSeries) : TimeSeries = {
191+
val resultTS = new collection.mutable.HashMap[DateTime, Long]()
192+
for (key <- ts1.keySet ++ ts2.keySet) {
193+
resultTS.put(key, ts1.getOrElse(key, 0L) + ts2.getOrElse(key,0L))
194+
}
195+
resultTS
196+
}
197+
198+
def encodeTimeSeries(timeSeries: TimeSeries): String = {
199+
val baos = new ByteArrayOutputStream()
200+
val os = new DataOutputStream(baos)
201+
for((date,count) <- timeSeries) {
202+
os.writeLong(date.getMillis)
203+
os.writeLong(count)
204+
}
205+
os.flush()
206+
Base64.encodeBase64URLSafeString(baos.toByteArray)
207+
}
208+
209+
def decodeTimeSeries(encoded: String): TimeSeries = {
210+
val bytes = Base64.decodeBase64(encoded)
211+
val is = new DataInputStream(new ByteArrayInputStream(bytes))
212+
val table = new collection.mutable.HashMap[DateTime, Long]()
213+
while(is.available() > 0) {
214+
val dateIdx = new DateTime(is.readLong())
215+
val weight = is.readLong()
216+
table.put(dateIdx, weight)
217+
}
218+
table
219+
}
220+
}

0 commit comments

Comments
 (0)