Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RasterSourceRDD.tiledLayerRDD within the geometry intersection #3474

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Add RasterSourceRDD.tiledLayerRDD overloads that keep only the keys whose extents intersect a given geometry [#3474](https://github.com/locationtech/geotrellis/pull/3474)

## [3.6.3] - 2022-07-12

### Changed
Expand Down
29 changes: 26 additions & 3 deletions spark/src/main/scala/geotrellis/spark/RasterSourceRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import geotrellis.raster._
import geotrellis.raster.io.geotiff.OverviewStrategy
import geotrellis.raster.resample.NearestNeighbor
import geotrellis.layer._
import geotrellis.vector.Geometry
import geotrellis.util._
import org.apache.spark.rdd._
import org.apache.spark.{Partitioner, SparkContext}
import cats.syntax.option._

import scala.collection.mutable.ArrayBuilder
import scala.reflect.ClassTag
Expand Down Expand Up @@ -199,14 +201,29 @@ object RasterSourceRDD {
sources: RDD[RasterSource],
layout: LayoutDefinition
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] =
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, NearestNeighbor, None, None)
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, None, NearestNeighbor, None, None)

/**
 * Builds a spatially keyed multiband layer from `sources` for the given `layout`,
 * restricted by `geometry`.
 *
 * Forwards to the general `tiledLayerRDD` overload with the spatial key extractor,
 * `NearestNeighbor` resampling, and neither a raster summary nor a partitioner.
 *
 * @param sources  raster sources to tile
 * @param layout   target layout definition
 * @param geometry geometry passed on (wrapped in `Some`) as the optional geometry filter
 * @return the resulting `MultibandTileLayerRDD[SpatialKey]`
 */
def tiledLayerRDD(
  sources: RDD[RasterSource],
  layout: LayoutDefinition,
  geometry: Geometry
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] = {
  // The general overload takes the geometry as an Option; always supply it here.
  val geometryFilter: Option[Geometry] = Some(geometry)
  tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, geometryFilter, NearestNeighbor, None, None)
}

/**
 * Builds a `MultibandTileLayerRDD[SpatialKey]` from `sources` for the given `layout`,
 * using the supplied `resampleMethod`.
 *
 * Forwards to the general `tiledLayerRDD` overload with `KeyExtractor.spatialKeyExtractor`
 * and `None` for the remaining optional arguments.
 *
 * @param sources        raster sources to tile
 * @param layout         target layout definition
 * @param resampleMethod resample method handed to the general overload
 */
def tiledLayerRDD(
sources: RDD[RasterSource],
layout: LayoutDefinition,
resampleMethod: ResampleMethod
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] =
// NOTE(review): the two calls below look like the before/after lines of a diff hunk;
// the second one passes an extra `None` in the geometry position — confirm which one is live.
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, resampleMethod, None, None)
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, None, resampleMethod, None, None)

/**
 * Builds a spatially keyed multiband layer from `sources` for the given `layout`,
 * using `resampleMethod` and restricted by `geometry`.
 *
 * Forwards to the general `tiledLayerRDD` overload with the spatial key extractor
 * and neither a raster summary nor a partitioner.
 *
 * @param sources        raster sources to tile
 * @param layout         target layout definition
 * @param resampleMethod resample method handed to the general overload
 * @param geometry       geometry passed on (wrapped in `Some`) as the optional geometry filter
 * @return the resulting `MultibandTileLayerRDD[SpatialKey]`
 */
def tiledLayerRDD(
  sources: RDD[RasterSource],
  layout: LayoutDefinition,
  resampleMethod: ResampleMethod,
  geometry: Geometry
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] = {
  // The general overload takes the geometry as an Option; always supply it here.
  val geometryFilter: Option[Geometry] = Some(geometry)
  tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, geometryFilter, resampleMethod, None, None)
}

/**
* On tiling more than a single MultibandTile may get into a group that correspond to the same key.
Expand All @@ -223,6 +240,7 @@ object RasterSourceRDD {
sources: RDD[RasterSource],
layout: LayoutDefinition,
keyExtractor: KeyExtractor.Aux[K, M],
geometry: Option[Geometry] = None,
resampleMethod: ResampleMethod = NearestNeighbor,
rasterSummary: Option[RasterSummary[M]] = None,
partitioner: Option[Partitioner] = None,
Expand All @@ -239,7 +257,12 @@ object RasterSourceRDD {
}

val rasterRegionRDD: RDD[(K, RasterRegion)] =
tiledLayoutSourceRDD.flatMap { _.keyedRasterRegions() }
tiledLayoutSourceRDD.flatMap { source =>
val keyedRasterRegions = source.keyedRasterRegions()
geometry.fold(keyedRasterRegions) { geom => keyedRasterRegions.filter {
case (key, _) => layerMetadata.keyToExtent(key).intersects(geom)
} }
}

// The number of partitions estimated by RasterSummary can sometimes be much
// lower than what the user set. Therefore, we assume that the larger value
Expand Down
32 changes: 32 additions & 0 deletions spark/src/test/scala/geotrellis/spark/RasterSourceRDDSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import geotrellis.raster.geotiff._
import geotrellis.raster.io.geotiff._
import geotrellis.spark.store.hadoop._
import geotrellis.store.hadoop._
import geotrellis.store.cog._
import geotrellis.vector.Geometry

import spire.syntax.cfor._
import cats.implicits._
Expand Down Expand Up @@ -179,6 +181,36 @@ class RasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with RasterMat

assertRDDLayersEqual(reprojectedExpectedRDD, tiledSource)
}

// Verifies the geometry-filtered RasterSourceRDD.tiledLayerRDD overload: the produced keys must be
// exactly `expectedKeys`, and the tiles must equal the expected layer filtered by the same geometry.
it("should reproduce tileToLayout when given an RDD[RasterSource] within the extent") {
val rasterSourceRDD: RDD[RasterSource] = sc.parallelize(Seq(rasterSource))

// Need to define these here or else a serialization error will occur
val targetLayout = layout
val crs = targetCRS
val mapTransform = targetLayout.mapTransform

// filter by a correct square to validate keys
val expectedKeys = List(SpatialKey(2303, 3223), SpatialKey(2303, 3224), SpatialKey(2304, 3223), SpatialKey(2304, 3224)).sorted
// Query geometry: the combined extent of the four expected keys, adjusted by bufferByLayout, as a polygon.
// NOTE(review): presumably bufferByLayout shrinks the extent so it does not touch neighbouring keys — confirm.
val geometry =
expectedKeys
.map(mapTransform.keyToExtent)
.reduce(_ combine _)
.bufferByLayout(targetLayout) // buffer to ensure the extent is within the given keys
.toPolygon()

// Reference layer: the expected RDD restricted to keys whose extent intersects the query geometry.
val filteredExpectedRDD = reprojectedExpectedRDD.withContext { _.filter { case (key, _) => mapTransform.keyToExtent(key).intersects(geometry) } }

val reprojectedRasterSourceRDD: RDD[RasterSource] = rasterSourceRDD.map { _.reprojectToGrid(crs, targetLayout) }

// Layer under test, built through the overload that takes a geometry.
val tiledSource: MultibandTileLayerRDD[SpatialKey] = RasterSourceRDD.tiledLayerRDD(reprojectedRasterSourceRDD, targetLayout, geometry)

// Sorted so the comparison with `expectedKeys` does not depend on RDD ordering.
val actualKeys = tiledSource.map(_._1).collect().toList.sorted

actualKeys shouldBe expectedKeys

// Tile contents must also match the filtered reference layer.
assertRDDLayersEqual(filteredExpectedRDD, tiledSource)
}
}

describe("RasterSourceRDD.read") {
Expand Down