Skip to content

Commit a735066

Browse files
elahrvivazjnh5y
authored andcommitted
GEOMESA-498 adding filters to modify feature writer
1 parent 1e36392 commit a735066

File tree

4 files changed

+40
-5
lines changed

4 files changed

+40
-5
lines changed

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/AccumuloDataStore.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -731,14 +731,14 @@ class AccumuloDataStore(val connector: Connector,
731731
}
732732

733733
/* create a general purpose writer that is capable of insert, deletes, and updates */
734-
override def createFeatureWriter(typeName: String, transaction: Transaction): SFFeatureWriter = {
734+
override def getFeatureWriter(typeName: String, filter: Filter, transaction: Transaction): SFFeatureWriter = {
735735
validateMetadata(typeName)
736736
checkWritePermissions(typeName)
737737
val sft = getSchema(typeName)
738738
val indexSchemaFmt = getIndexSchemaFmt(typeName)
739739
val fe = SimpleFeatureEncoder(sft, getFeatureEncoding(sft))
740740
val encoder = IndexSchema.buildKeyEncoder(indexSchemaFmt, fe)
741-
new ModifyAccumuloFeatureWriter(sft, encoder, connector, fe, writeVisibilities, this)
741+
new ModifyAccumuloFeatureWriter(sft, encoder, connector, fe, writeVisibilities, filter, this)
742742
}
743743

744744
/* optimized for GeoTools API to return writer ONLY for appending (aka don't scan table) */

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/AccumuloFeatureWriter.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ import org.apache.accumulo.core.client.{BatchWriterConfig, Connector}
2323
import org.apache.accumulo.core.data.{Key, Mutation, Value}
2424
import org.apache.hadoop.mapred.{RecordWriter, Reporter}
2525
import org.apache.hadoop.mapreduce.TaskInputOutputContext
26-
import org.geotools.data.DataUtilities
2726
import org.geotools.data.simple.SimpleFeatureWriter
27+
import org.geotools.data.{DataUtilities, Query}
2828
import org.geotools.factory.Hints
2929
import org.geotools.filter.identity.FeatureIdImpl
3030
import org.locationtech.geomesa.core.data.tables.{AttributeTable, RecordTable, SpatioTemporalTable}
3131
import org.locationtech.geomesa.core.index._
3232
import org.locationtech.geomesa.feature.{AvroSimpleFeature, AvroSimpleFeatureFactory}
3333
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
3434
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
35+
import org.opengis.filter.Filter
3536

3637
object AccumuloFeatureWriter {
3738

@@ -159,10 +160,12 @@ class ModifyAccumuloFeatureWriter(featureType: SimpleFeatureType,
159160
connector: Connector,
160161
encoder: SimpleFeatureEncoder,
161162
visibility: String,
163+
filter: Filter,
162164
dataStore: AccumuloDataStore)
163165
extends AccumuloFeatureWriter(featureType, indexEncoder, encoder, dataStore, visibility) {
164166

165-
val reader = dataStore.getFeatureReader(featureType.getName.toString)
167+
val reader = dataStore.getFeatureReader(featureType.getTypeName, new Query(featureType.getTypeName, filter))
168+
166169
var live: SimpleFeature = null /* feature to let user modify */
167170
var original: SimpleFeature = null /* feature returned from reader */
168171

geomesa-core/src/main/scala/org/locationtech/geomesa/core/data/mapreduce/FeatureIngestMapper.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ object FeatureIngestMapper
4848
context.getConfiguration)).asInstanceOf[AccumuloDataStore]
4949

5050
featureType = ds.getSchema(featureName)
51-
fw = ds.createFeatureWriter(featureName, Transaction.AUTO_COMMIT)
51+
fw = ds.getFeatureWriterAppend(featureName, Transaction.AUTO_COMMIT)
5252
}
5353

5454
override def map(key: LongWritable, value: Text, context: Mapper#Context) {

geomesa-core/src/test/scala/org/locationtech/geomesa/core/data/AccumuloDataStoreTest.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,38 @@ class AccumuloDataStoreTest extends Specification {
10101010
retrievedSchema mustEqual originalSchema
10111011
}
10121012
}
1013+
1014+
"Provide a feature update implementation" >> {
1015+
val sftName = "featureUpdateTest"
1016+
val sft = SimpleFeatureTypes.createType(sftName, "name:String,dtg:Date,*geom:Point:srid=4326")
1017+
val ds = DataStoreFinder.getDataStore(Map(
1018+
"instanceId" -> "mycloud",
1019+
"zookeepers" -> "zoo1:2181,zoo2:2181,zoo3:2181",
1020+
"user" -> "myuser",
1021+
"password" -> "mypassword",
1022+
"tableName" -> sftName,
1023+
"useMock" -> "true"))
1024+
ds.createSchema(sft)
1025+
val builder = AvroSimpleFeatureFactory.featureBuilder(ds.getSchema(sftName))
1026+
val features = (0 until 6).map { i =>
1027+
builder.reset()
1028+
builder.set("geom", WKTUtils.read("POINT(45.0 45.0)"))
1029+
builder.set("dtg", "2012-01-02T05:06:07.000Z")
1030+
builder.set("name",i.toString)
1031+
val sf = builder.buildFeature(i.toString)
1032+
sf.getUserData()(Hints.USE_PROVIDED_FID) = java.lang.Boolean.TRUE
1033+
sf
1034+
}
1035+
val fs = ds.getFeatureSource(sftName).asInstanceOf[AccumuloFeatureStore]
1036+
fs.addFeatures(new ListFeatureCollection(sft, features))
1037+
1038+
val filter = ff.id(ff.featureId("2"))
1039+
val writer = ds.getFeatureWriter(sftName, filter, Transaction.AUTO_COMMIT)
1040+
writer must beAnInstanceOf[ModifyAccumuloFeatureWriter]
1041+
writer.hasNext must beTrue
1042+
writer.next.getID mustEqual "2"
1043+
writer.hasNext must beFalse
1044+
}
10131045
}
10141046

10151047
"AccumuloFeatureStore" should {

0 commit comments

Comments
 (0)