From eaf82f5ae58b43195331fe5133a8b93e011a47f0 Mon Sep 17 00:00:00 2001 From: Robin Kooyman Date: Thu, 18 Jun 2026 13:31:24 +0200 Subject: [PATCH] Add GeoParquet read support to beacon-arrow-geoparquet Refactor the previously write-only beacon-arrow-geoparquet crate into a full read+write GeoParquet format. Read path: - infer_schema reads each file's GeoArrow schema concurrently and super-types them; geometry columns described in the file's `geo` metadata are decoded to their native GeoArrow representation (CoordType::Separated). Files without a `geo` key fall back to the plain Arrow schema. - New GeoParquetSource (FileSource) + GeoParquetOpener (FileOpener) stream files via the async Parquet reader and the geoparquet crate's GeoParquetRecordBatchStream, applying column projection through a BatchAdapterFactory. - FileFormatFactoryExt::discover_datasets registers `.geoparquet` files, and the factory is registered in beacon-data-lake so external tables (STORED AS GEOPARQUET) and auto-discovery work. SQL: - New read_geoparquet() table function mirroring read_parquet. Tests: - 5 unit tests: native-geometry schema inference, full round-trip read, column projection, dataset discovery by extension, and plain-Parquet fallback. Docs: - GeoParquet sections added to Supported Formats, External Tables, and the read_geoparquet table-function reference. Note: spatial bbox row-group pruning is not yet applied; reads are a full scan with column projection. 
--- Cargo.lock | 9 + beacon-data-lake/Cargo.toml | 3 +- beacon-data-lake/src/file_formats.rs | 2 + .../beacon-arrow-geoparquet/Cargo.toml | 9 +- .../src/datafusion/mod.rs | 366 +++++++++++++++++- .../src/datafusion/opener.rs | 94 +++++ .../src/datafusion/reader.rs | 65 ++++ .../src/datafusion/source.rs | 121 ++++++ .../beacon-arrow-geoparquet/src/lib.rs | 13 +- beacon-functions/Cargo.toml | 1 + beacon-functions/src/file_formats/mod.rs | 6 + .../src/file_formats/read_geoparquet.rs | 134 +++++++ docs/docs/1.7.2/data-lake/datasets.md | 32 ++ docs/docs/1.7.2/data-lake/external-tables.md | 10 + docs/docs/1.7.2/sql/table-functions.md | 12 + 15 files changed, 853 insertions(+), 24 deletions(-) create mode 100644 beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/opener.rs create mode 100644 beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/reader.rs create mode 100644 beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/source.rs create mode 100644 beacon-functions/src/file_formats/read_geoparquet.rs diff --git a/Cargo.lock b/Cargo.lock index 2c82e08e..89b2be2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1267,15 +1267,21 @@ version = "1.7.2" dependencies = [ "arrow 58.3.0", "async-trait", + "beacon-common", + "beacon-datafusion-ext", + "bytes", "datafusion", "futures", "geoarrow", "geoarrow-array", + "geoarrow-schema", "geoparquet", "object_store 0.13.2", "parquet", "serde", "tempfile", + "tokio", + "tracing", ] [[package]] @@ -1556,6 +1562,7 @@ dependencies = [ "beacon-arrow-atlas", "beacon-arrow-bbf", "beacon-arrow-csv", + "beacon-arrow-geoparquet", "beacon-arrow-ipc", "beacon-arrow-netcdf", "beacon-arrow-parquet", @@ -1612,6 +1619,7 @@ dependencies = [ "beacon-arrow-atlas", "beacon-arrow-bbf", "beacon-arrow-csv", + "beacon-arrow-geoparquet", "beacon-arrow-ipc", "beacon-arrow-netcdf", "beacon-arrow-odv", @@ -3844,6 +3852,7 @@ dependencies = [ "arrow-buffer 58.3.0", "arrow-ord 58.3.0", "arrow-schema 58.3.0", + "futures", "geo-traits 0.3.0", 
"geo-types", "geoarrow-array", diff --git a/beacon-data-lake/Cargo.toml b/beacon-data-lake/Cargo.toml index 12cb0b58..8c1e9f61 100644 --- a/beacon-data-lake/Cargo.toml +++ b/beacon-data-lake/Cargo.toml @@ -35,4 +35,5 @@ beacon-arrow-bbf = { path = "../beacon-file-formats/beacon-arrow-bbf" } beacon-arrow-netcdf = { path = "../beacon-file-formats/beacon-arrow-netcdf" } beacon-arrow-atlas = { path = "../beacon-file-formats/beacon-arrow-atlas" } beacon-arrow-tiff = { path = "../beacon-file-formats/beacon-arrow-tiff" } -beacon-arrow-zarr = { path = "../beacon-file-formats/beacon-arrow-zarr" } \ No newline at end of file +beacon-arrow-zarr = { path = "../beacon-file-formats/beacon-arrow-zarr" } +beacon-arrow-geoparquet = { path = "../beacon-file-formats/beacon-arrow-geoparquet" } \ No newline at end of file diff --git a/beacon-data-lake/src/file_formats.rs b/beacon-data-lake/src/file_formats.rs index e346bb7a..3b288ad3 100644 --- a/beacon-data-lake/src/file_formats.rs +++ b/beacon-data-lake/src/file_formats.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use beacon_arrow_atlas::datafusion::{AtlasFormatFactory, options::AtlasOptions}; use beacon_arrow_bbf::datafusion::BBFFormatFactory; use beacon_arrow_csv::datafusion::CsvFormatFactory; +use beacon_arrow_geoparquet::datafusion::GeoParquetFormatFactory; use beacon_arrow_ipc::datafusion::ArrowFormatFactory; use beacon_arrow_netcdf::datafusion::{NetCDFFormatFactory, options::NetcdfOptions}; use beacon_arrow_parquet::datafusion::ParquetFormatFactory; @@ -40,6 +41,7 @@ pub fn file_formats( Arc::new(TiffFormatFactory::new(Default::default())), Arc::new(ZarrFormatFactory), Arc::new(BBFFormatFactory), + Arc::new(GeoParquetFormatFactory::default()), ]; for format in formats.iter() { diff --git a/beacon-file-formats/beacon-arrow-geoparquet/Cargo.toml b/beacon-file-formats/beacon-arrow-geoparquet/Cargo.toml index 02a6b2d9..c77d9705 100644 --- a/beacon-file-formats/beacon-arrow-geoparquet/Cargo.toml +++ 
b/beacon-file-formats/beacon-arrow-geoparquet/Cargo.toml @@ -10,10 +10,17 @@ object_store = { workspace = true } parquet = { workspace = true } geoarrow = { workspace = true } geoarrow-array = { workspace = true } -geoparquet = { workspace = true } +geoarrow-schema = { workspace = true } +geoparquet = { workspace = true, features = ["async"] } async-trait = { workspace = true } serde = { workspace = true } futures = { workspace = true } +tracing = { workspace = true } + +beacon-common = { path = "../../beacon-common" } +beacon-datafusion-ext = { path = "../../beacon-datafusion-ext" } [dev-dependencies] tempfile = { workspace = true } +tokio = { workspace = true } +bytes = { workspace = true } diff --git a/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/mod.rs b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/mod.rs index 34f76907..ae48c2ac 100644 --- a/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/mod.rs +++ b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/mod.rs @@ -1,21 +1,30 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use arrow::datatypes::SchemaRef; +use beacon_common::{file_descriptors::file_open_parallelism, super_typing::super_type_schema}; +use beacon_datafusion_ext::format_ext::{DatasetMetadata, FileFormatFactoryExt}; use datafusion::{ - catalog::Session, - common::{GetExt, Statistics}, + catalog::{Session, memory::DataSourceExec}, + common::{GetExt, Statistics, exec_datafusion_err}, datasource::{ file_format::{FileFormat, FileFormatFactory, file_compression_type::FileCompressionType}, - physical_plan::{FileScanConfig, FileSinkConfig, FileSource}, + physical_plan::{FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource}, sink::{DataSink, DataSinkExec}, + table_schema::TableSchema, }, physical_expr::LexRequirement, physical_plan::ExecutionPlan, }; +use futures::{StreamExt, TryStreamExt, stream}; use object_store::{ObjectMeta, ObjectStore}; +use crate::datafusion::source::GeoParquetSource; + +pub 
mod opener; +mod reader; pub mod sink; +pub mod source; const GEOPARQUET_EXTENSION: &str = "geoparquet"; @@ -71,6 +80,29 @@ impl GetExt for GeoParquetFormatFactory { } } +impl FileFormatFactoryExt for GeoParquetFormatFactory { + fn discover_datasets( + &self, + objects: &[ObjectMeta], + ) -> datafusion::error::Result> { + let datasets = objects + .iter() + .filter(|obj| { + obj.location + .extension() + .map(|ext| ext == GEOPARQUET_EXTENSION) + .unwrap_or(false) + }) + .map(|obj| DatasetMetadata::new(obj.location.to_string(), self.get_ext())) + .collect(); + Ok(datasets) + } + + fn file_format_name(&self) -> String { + self.get_ext() + } +} + #[derive(Debug, Clone)] pub struct GeoParquetFormat { pub options: GeoParquetOptions, @@ -106,34 +138,60 @@ impl FileFormat for GeoParquetFormat { async fn infer_schema( &self, _state: &dyn Session, - _store: &Arc, - _objects: &[ObjectMeta], + store: &Arc, + objects: &[ObjectMeta], ) -> datafusion::error::Result { - return Err(datafusion::error::DataFusionError::NotImplemented( - "GeoParquet format does not support schema inference yet".to_string(), - )); + // Read each file's GeoArrow schema concurrently, then merge into a + // single super-typed schema (mirrors the plain Parquet format). 
+ let schemas = stream::iter(objects.iter().cloned()) + .map(|object| { + let store = Arc::clone(store); + async move { reader::fetch_schema(store, object).await } + }) + .buffer_unordered(file_open_parallelism()) + .try_collect::>() + .await?; + + if schemas.is_empty() { + return Ok(Arc::new(arrow::datatypes::Schema::empty())); + } + + let super_schema = super_type_schema(&schemas).map_err(|e| { + exec_datafusion_err!("Failed to compute super type schema for GeoParquet: {}", e) + })?; + + Ok(Arc::new(super_schema)) } async fn infer_stats( &self, _state: &dyn Session, _store: &Arc, - _table_schema: SchemaRef, + table_schema: SchemaRef, _object: &ObjectMeta, ) -> datafusion::error::Result { - return Err(datafusion::error::DataFusionError::NotImplemented( - "GeoParquet format does not support statistics inference yet".to_string(), - )); + Ok(Statistics::new_unknown(&table_schema)) } async fn create_physical_plan( &self, _state: &dyn Session, - _conf: FileScanConfig, + conf: FileScanConfig, ) -> datafusion::error::Result> { - return Err(datafusion::error::DataFusionError::NotImplemented( - "GeoParquet format does not support physical plan creation yet".to_string(), - )); + let table_schema = TableSchema::new( + conf.file_schema().clone(), + conf.table_partition_cols().clone(), + ); + // Preserve a projection that the scan pushed down into the incoming + // source — rebuilding the source below would otherwise drop it. 
+ let projection = conf.file_source().projection().cloned(); + let source = GeoParquetSource::new(table_schema).with_projection(projection); + + let conf = FileScanConfigBuilder::from(conf) + .with_source(Arc::new(source)) + .build(); + + Ok(DataSourceExec::from_data_source(conf)) } async fn create_writer_physical_plan( @@ -203,8 +261,11 @@ impl FileFormat for GeoParquetFormat { Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements))) } - fn file_source(&self, table_schema: datafusion::datasource::table_schema::TableSchema) -> Arc { - panic!("GeoParquetFormat does not support file source"); + fn file_source( + &self, + table_schema: datafusion::datasource::table_schema::TableSchema, + ) -> Arc { + Arc::new(GeoParquetSource::new(table_schema)) } } @@ -221,3 +282,272 @@ fn is_lon_column(name: &str) -> bool { .iter() .any(|&pat| lower.contains(pat) || lower == "x") } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ArrayRef, AsArray, Int32Array}; + use arrow::datatypes::{DataType, Field, Float64Type, Int32Type, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion::datasource::listing::PartitionedFile; + use datafusion::datasource::physical_plan::{FileOpener, FileScanConfigBuilder}; + use datafusion::execution::object_store::ObjectStoreUrl; + use futures::StreamExt; + use geoarrow::array::PointBuilder; + use geoarrow::datatypes::{Dimension, Metadata, PointType}; + use geoarrow_array::GeoArrowArray; + use geoparquet::writer::{ + GeoParquetRecordBatchEncoder, GeoParquetWriterEncoding, GeoParquetWriterOptionsBuilder, + }; + use object_store::ObjectStore; + use object_store::ObjectStoreExt; + use object_store::memory::InMemory; + use object_store::path::Path; + use parquet::arrow::ArrowWriter; + + /// Build an in-memory GeoParquet file with a native GeoArrow point column + /// (`geometry`) and an `id` column, returning the encoded bytes. 
+ fn write_geoparquet_fixture() -> Vec { + let point_type = PointType::new(Dimension::XY, Arc::new(Metadata::default())); + let geometry_field = Arc::new(point_type.to_field("geometry", true)); + let id_field = Arc::new(Field::new("id", DataType::Int32, false)); + let schema = Arc::new(Schema::new(vec![id_field, geometry_field])); + + let mut point_builder = + PointBuilder::new(PointType::new(Dimension::XY, Arc::new(Metadata::default()))); + for coord in [(1.0_f64, 2.0_f64), (3.0, 4.0), (5.0, 6.0)] { + point_builder.push_coord(Some(&coord)); + } + let geometry: ArrayRef = point_builder.finish().to_array_ref(); + let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + + let batch = RecordBatch::try_new(schema.clone(), vec![ids, geometry]).unwrap(); + + let options = GeoParquetWriterOptionsBuilder::default() + .set_encoding(GeoParquetWriterEncoding::GeoArrow) + .build(); + let mut encoder = GeoParquetRecordBatchEncoder::try_new(&schema, &options).unwrap(); + + let mut buf: Vec = Vec::new(); + { + let mut writer = ArrowWriter::try_new(&mut buf, encoder.target_schema(), None).unwrap(); + let encoded = encoder.encode_record_batch(&batch).unwrap(); + writer.write(&encoded).unwrap(); + let kv = encoder.into_keyvalue().unwrap(); + writer.append_key_value_metadata(kv); + writer.finish().unwrap(); + } + buf + } + + async fn put_fixture(store: &Arc, path: &Path) -> ObjectMeta { + let bytes = write_geoparquet_fixture(); + store + .put(path, bytes::Bytes::from(bytes).into()) + .await + .expect("write fixture"); + store.head(path).await.expect("head fixture") + } + + #[tokio::test] + async fn infer_schema_decodes_geometry_to_native_geoarrow() { + let store = Arc::new(InMemory::new()); + let object_store: Arc = store.clone(); + let path = Path::from("test.geoparquet"); + let object = put_fixture(&store, &path).await; + + let format = GeoParquetFormat::new(GeoParquetOptions { + longitude_column: None, + latitude_column: None, + }); + let ctx = 
datafusion::prelude::SessionContext::new(); + + let schema = format + .infer_schema(&ctx.state(), &object_store, &[object]) + .await + .expect("infer schema"); + + let geometry = schema.field_with_name("geometry").expect("geometry field"); + // Geometry is decoded to its native GeoArrow storage (a struct of x/y + // child arrays for the Separated coord layout), not left as opaque WKB + // binary. Note the GeoArrow extension *metadata* is dropped here because + // Beacon's `super_type_schema` rebuilds fields without metadata — the + // native struct layout is what survives at the table-schema level. + let DataType::Struct(children) = geometry.data_type() else { + panic!( + "geometry should decode to a native GeoArrow struct, got {:?}", + geometry.data_type() + ); + }; + let child_names: Vec<&str> = children.iter().map(|f| f.name().as_str()).collect(); + assert!( + child_names.contains(&"x") && child_names.contains(&"y"), + "geometry struct should have x/y children, got {child_names:?}" + ); + assert!(schema.field_with_name("id").is_ok()); + } + + #[tokio::test] + async fn opener_reads_back_rows_and_geometry() { + let store = Arc::new(InMemory::new()); + let object_store: Arc = store.clone(); + let path = Path::from("test.geoparquet"); + let object = put_fixture(&store, &path).await; + + let table_schema = reader::fetch_schema(object_store.clone(), object.clone()) + .await + .expect("schema"); + let ts = TableSchema::from_file_schema(table_schema); + let source = GeoParquetSource::new(ts); + + let conf = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("memory://").unwrap(), + Arc::new(source.clone()) as Arc, + ) + .build(); + let opener = source + .create_file_opener(object_store, &conf, 0) + .expect("opener"); + + let stream = opener + .open(PartitionedFile::from(object)) + .expect("open") + .await + .expect("stream"); + let batches: Vec<_> = stream + .collect::>() + .await + .into_iter() + .collect::, _>>() + .expect("batches ok"); + + let total_rows: usize = 
batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); + + let full = arrow::compute::concat_batches(&batches[0].schema(), &batches).expect("concat"); + let id_idx = full.schema().index_of("id").expect("id column"); + let ids = full.column(id_idx).as_primitive::(); + assert_eq!(ids.values(), &[1, 2, 3]); + + // Geometry column round-trips as a native GeoArrow struct (x, y children). + let geom_idx = full.schema().index_of("geometry").expect("geometry column"); + let geom = full.column(geom_idx).as_struct(); + let x = geom.column_by_name("x").expect("x child").as_primitive::(); + let y = geom.column_by_name("y").expect("y child").as_primitive::(); + assert_eq!(x.values(), &[1.0, 3.0, 5.0]); + assert_eq!(y.values(), &[2.0, 4.0, 6.0]); + } + + #[tokio::test] + async fn opener_applies_column_projection() { + let store = Arc::new(InMemory::new()); + let object_store: Arc = store.clone(); + let path = Path::from("test.geoparquet"); + let object = put_fixture(&store, &path).await; + + let file_schema = reader::fetch_schema(object_store.clone(), object.clone()) + .await + .expect("schema"); + // Project to the `id` column only — geometry should be dropped. 
+ let id_idx = file_schema.index_of("id").expect("id column"); + let projected: SchemaRef = Arc::new(file_schema.project(&[id_idx]).expect("project")); + + let opener = opener::GeoParquetOpener::new(object_store, projected, 128 * 1024); + let stream = opener + .open(PartitionedFile::from(object)) + .expect("open") + .await + .expect("stream"); + let batches: Vec<_> = stream + .collect::>() + .await + .into_iter() + .collect::, _>>() + .expect("batches ok"); + + let full = arrow::compute::concat_batches(&batches[0].schema(), &batches).expect("concat"); + let out_schema = full.schema(); + let names: Vec<&str> = out_schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!(names, vec!["id"], "only the projected column should remain"); + assert_eq!(full.num_rows(), 3); + assert_eq!( + full.column(0).as_primitive::().values(), + &[1, 2, 3] + ); + } + + #[tokio::test] + async fn discover_datasets_filters_by_extension() { + let factory = ::default(); + let objects = vec![ + object_meta("a.geoparquet"), + object_meta("nested/b.geoparquet"), + object_meta("c.parquet"), + object_meta("d.csv"), + object_meta("no_extension"), + ]; + + let discovered = factory.discover_datasets(&objects).expect("discover"); + let paths: Vec<&str> = discovered.iter().map(|d| d.file_path.as_str()).collect(); + + assert_eq!(paths, vec!["a.geoparquet", "nested/b.geoparquet"]); + assert!(discovered.iter().all(|d| d.format == "geoparquet")); + } + + #[tokio::test] + async fn reads_plain_parquet_without_geo_metadata() { + // A regular Parquet file (no `geo` key) should still be readable: the + // schema falls back to plain Arrow and no geometry decoding occurs. 
+ let store = Arc::new(InMemory::new()); + let object_store: Arc = store.clone(); + let path = Path::from("plain.parquet"); + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![7, 8]))]) + .unwrap(); + let mut buf: Vec = Vec::new(); + { + let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + store + .put(&path, bytes::Bytes::from(buf).into()) + .await + .unwrap(); + let object = store.head(&path).await.unwrap(); + + let inferred = reader::fetch_schema(object_store.clone(), object.clone()) + .await + .expect("plain parquet should infer a schema"); + assert!(inferred.field_with_name("id").is_ok()); + assert!(inferred.field_with_name("geometry").is_err()); + + let opener = + opener::GeoParquetOpener::new(object_store, inferred.clone(), 128 * 1024); + let stream = opener + .open(PartitionedFile::from(object)) + .expect("open") + .await + .expect("stream"); + let batches: Vec<_> = stream + .collect::>() + .await + .into_iter() + .collect::, _>>() + .expect("batches ok"); + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, 2); + } + + fn object_meta(path: &str) -> ObjectMeta { + ObjectMeta { + location: Path::from(path), + last_modified: Default::default(), + size: 0, + e_tag: None, + version: None, + } + } +} diff --git a/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/opener.rs b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/opener.rs new file mode 100644 index 00000000..34de82de --- /dev/null +++ b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/opener.rs @@ -0,0 +1,94 @@ +//! [`FileOpener`] that streams a single GeoParquet file as Arrow +//! [`RecordBatch`]es with geometry columns decoded to native GeoArrow. 
+ +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion::{ + datasource::{ + listing::PartitionedFile, + physical_plan::{FileOpenFuture, FileOpener}, + }, + error::{DataFusionError, Result}, + physical_expr_adapter::BatchAdapterFactory, +}; +use futures::{FutureExt, StreamExt, stream::BoxStream}; +use geoparquet::reader::GeoParquetRecordBatchStream; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::datafusion::reader; + +/// Opens GeoParquet files and yields batches matching the projected schema. +pub struct GeoParquetOpener { + object_store: Arc, + /// Schema of the columns the scan requested, in output order. + projected_schema: SchemaRef, + batch_size: usize, +} + +impl GeoParquetOpener { + pub fn new( + object_store: Arc, + projected_schema: SchemaRef, + batch_size: usize, + ) -> Self { + Self { + object_store, + projected_schema, + batch_size, + } + } + + async fn read_task( + object: ObjectMeta, + object_store: Arc, + projected_schema: SchemaRef, + batch_size: usize, + ) -> Result>> { + let builder = reader::stream_builder(object_store, &object).await?; + // Full GeoArrow schema of this file (geometry decoded to native types). + let file_schema = reader::output_schema(&builder)?; + + let parquet_stream = builder + .with_batch_size(batch_size) + .build() + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // Wraps the raw Parquet stream to apply GeoArrow metadata and parse + // geometry columns onto every emitted batch. + let geo_stream = GeoParquetRecordBatchStream::try_new(parquet_stream, file_schema.clone()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // Adapt the file's GeoArrow schema onto the projected output schema: + // select/reorder the requested columns and null-fill any this file + // lacks. This is how column projection is applied — we read the full + // file schema and project in Arrow. 
+ let adapter = BatchAdapterFactory::new(projected_schema).make_adapter(&file_schema)?; + + let stream = geo_stream + .map(move |batch| { + let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?; + adapter.adapt_batch(&batch).map_err(|e| { + DataFusionError::Execution(format!("Failed to adapt GeoParquet batch: {e}")) + }) + }) + .boxed(); + + Ok(stream) + } +} + +impl FileOpener for GeoParquetOpener { + fn open(&self, file: PartitionedFile) -> Result { + let fut = Self::read_task( + file.object_meta, + self.object_store.clone(), + self.projected_schema.clone(), + self.batch_size, + ) + .boxed(); + + Ok(fut) + } +} diff --git a/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/reader.rs b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/reader.rs new file mode 100644 index 00000000..ee62ba41 --- /dev/null +++ b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/reader.rs @@ -0,0 +1,65 @@ +//! Low-level helpers for opening GeoParquet files and deriving their GeoArrow +//! output schema. +//! +//! These wrap the upstream async Parquet reader together with the `geoparquet` +//! crate's [`GeoParquetReaderBuilder`] extension so geometry columns described +//! in the file's `geo` metadata are decoded to their native GeoArrow type. + +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion::error::{DataFusionError, Result}; +use geoarrow_schema::CoordType; +use geoparquet::reader::GeoParquetReaderBuilder; +use object_store::{ObjectMeta, ObjectStore}; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::async_reader::ParquetObjectReader; + +/// Coordinate layout used when decoding geometry columns to native GeoArrow. +/// +/// `Separated` stores each coordinate dimension in its own child array (struct +/// of `x`/`y` arrays), which round-trips Beacon's own GeoParquet writer output. 
+pub(crate) const COORD_TYPE: CoordType = CoordType::Separated; + +/// The concrete async Parquet reader builder used throughout this crate. +pub(crate) type GeoStreamBuilder = ParquetRecordBatchStreamBuilder; + +/// Open a streaming Parquet reader builder for a GeoParquet object. +pub(crate) async fn stream_builder( + object_store: Arc, + object: &ObjectMeta, +) -> Result { + let reader = ParquetObjectReader::new(object_store, object.location.clone()) + .with_file_size(object.size); + + ParquetRecordBatchStreamBuilder::new(reader) + .await + .map_err(|e| DataFusionError::External(Box::new(e))) +} + +/// Derive the output Arrow schema for a builder. +/// +/// Geometry columns described in the file's GeoParquet metadata are decoded to +/// their native GeoArrow type. Files without a `geo` metadata key fall back to +/// the plain Arrow schema, so a regular Parquet file is still readable. +pub(crate) fn output_schema(builder: &GeoStreamBuilder) -> Result { + match builder.geoparquet_metadata() { + Some(geo_meta) => { + let geo_meta = geo_meta.map_err(|e| DataFusionError::External(Box::new(e)))?; + builder + .geoarrow_schema(&geo_meta, true, COORD_TYPE) + .map_err(|e| DataFusionError::External(Box::new(e))) + } + None => Ok(builder.schema().clone()), + } +} + +/// Fetch the GeoArrow output schema for a single object, used for schema +/// inference. Only the file metadata is read; no row groups are decoded. +pub(crate) async fn fetch_schema( + object_store: Arc, + object: ObjectMeta, +) -> Result { + let builder = stream_builder(object_store, &object).await?; + output_schema(&builder) +} diff --git a/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/source.rs b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/source.rs new file mode 100644 index 00000000..795a5865 --- /dev/null +++ b/beacon-file-formats/beacon-arrow-geoparquet/src/datafusion/source.rs @@ -0,0 +1,121 @@ +//! [`FileSource`] implementation for reading GeoParquet datasets. 
+ +use std::sync::Arc; + +use datafusion::{ + datasource::{ + physical_plan::{FileOpener, FileScanConfig, FileSource}, + schema_adapter::SchemaAdapterFactory, + table_schema::TableSchema, + }, + physical_expr::projection::ProjectionExprs, + physical_plan::metrics::ExecutionPlanMetricsSet, +}; +use object_store::ObjectStore; + +use crate::datafusion::opener::GeoParquetOpener; + +/// A [`FileSource`] that produces [`GeoParquetOpener`]s for the scanned files. +#[derive(Debug, Clone)] +pub struct GeoParquetSource { + schema_adapter_factory: Option>, + /// The table schema (file schema + partition columns). + table_schema: TableSchema, + execution_plan_metrics: ExecutionPlanMetricsSet, + batch_size: usize, + /// Projection pushed down by the scan, applied on top of the table schema. + projection: Option, +} + +impl GeoParquetSource { + pub fn new(table_schema: TableSchema) -> Self { + Self { + schema_adapter_factory: None, + table_schema, + execution_plan_metrics: ExecutionPlanMetricsSet::new(), + batch_size: 128 * 1024, + projection: None, + } + } + + /// Returns a copy of this source carrying the given projection. Used to + /// preserve a pushed-down projection when the format rebuilds the source + /// in `create_physical_plan`. 
+ pub fn with_projection(mut self, projection: Option) -> Self { + self.projection = projection; + self + } +} + +impl FileSource for GeoParquetSource { + fn create_file_opener( + &self, + object_store: Arc, + base_config: &FileScanConfig, + _partition: usize, + ) -> datafusion::error::Result> { + let projected_schema = base_config.projected_schema()?; + + Ok(Arc::new(GeoParquetOpener::new( + object_store, + projected_schema, + self.batch_size, + ))) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } + + fn with_batch_size(&self, batch_size: usize) -> Arc { + Arc::new(Self { + batch_size, + ..self.clone() + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.execution_plan_metrics + } + + fn file_type(&self) -> &str { + "geoparquet" + } + + fn schema_adapter_factory(&self) -> Option> { + self.schema_adapter_factory.clone() + } + + fn with_schema_adapter_factory( + &self, + factory: Arc, + ) -> datafusion::error::Result> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(factory), + ..self.clone() + })) + } + + fn projection(&self) -> Option<&ProjectionExprs> { + self.projection.as_ref() + } + + fn try_pushdown_projection( + &self, + projection: &ProjectionExprs, + ) -> datafusion::error::Result>> { + let merged = match &self.projection { + Some(existing) => existing.try_merge(projection)?, + None => projection.clone(), + }; + let source = Self { + projection: Some(merged), + ..self.clone() + }; + Ok(Some(Arc::new(source))) + } +} diff --git a/beacon-file-formats/beacon-arrow-geoparquet/src/lib.rs b/beacon-file-formats/beacon-arrow-geoparquet/src/lib.rs index ee968736..6e58aab1 100644 --- a/beacon-file-formats/beacon-arrow-geoparquet/src/lib.rs +++ b/beacon-file-formats/beacon-arrow-geoparquet/src/lib.rs @@ -1,8 +1,13 @@ -//! `beacon-arrow-geoparquet` provides the DataFusion GeoParquet write integration. +//! 
`beacon-arrow-geoparquet` provides the DataFusion GeoParquet integration. //! //! It exposes a [`datafusion::GeoParquetFormat`] / [`datafusion::GeoParquetFormatFactory`] -//! that map longitude/latitude columns into a geometry column and write GeoParquet output. -//! The read path is not yet implemented. +//! supporting both directions: +//! +//! * **Write** — maps longitude/latitude columns into a geometry column and +//! writes GeoParquet output. +//! * **Read** — infers the GeoArrow schema and scans GeoParquet files, decoding +//! geometry columns described in the file's `geo` metadata to their native +//! GeoArrow representation. -/// DataFusion integration for GeoParquet output. +/// DataFusion integration for reading and writing GeoParquet. pub mod datafusion; diff --git a/beacon-functions/Cargo.toml b/beacon-functions/Cargo.toml index 1d749317..b58163d9 100644 --- a/beacon-functions/Cargo.toml +++ b/beacon-functions/Cargo.toml @@ -38,5 +38,6 @@ beacon-arrow-zarr = { path = "../beacon-file-formats/beacon-arrow-zarr" } beacon-arrow-ipc = { path = "../beacon-file-formats/beacon-arrow-ipc" } beacon-arrow-csv = { path = "../beacon-file-formats/beacon-arrow-csv" } beacon-arrow-parquet = { path = "../beacon-file-formats/beacon-arrow-parquet" } +beacon-arrow-geoparquet = { path = "../beacon-file-formats/beacon-arrow-geoparquet" } beacon-arrow-bbf = { path = "../beacon-file-formats/beacon-arrow-bbf" } beacon-datafusion-ext = { path = "../beacon-datafusion-ext" } \ No newline at end of file diff --git a/beacon-functions/src/file_formats/mod.rs b/beacon-functions/src/file_formats/mod.rs index 339ba122..3148afd3 100644 --- a/beacon-functions/src/file_formats/mod.rs +++ b/beacon-functions/src/file_formats/mod.rs @@ -15,6 +15,7 @@ pub mod read_arrow; pub mod read_atlas; pub mod read_bbf; pub mod read_csv; +pub mod read_geoparquet; pub mod read_netcdf; pub mod read_odv_ascii; pub mod read_parquet; @@ -35,6 +36,11 @@ pub fn register_table_functions( session_ctx.clone(), 
data_object_store_url.clone(), )), + Arc::new(read_geoparquet::ReadGeoParquetFunc::new( + runtime_handle.clone(), + session_ctx.clone(), + data_object_store_url.clone(), + )), Arc::new(read_arrow::ReadArrowFunc::new( runtime_handle.clone(), session_ctx.clone(),
diff --git a/beacon-functions/src/file_formats/read_geoparquet.rs b/beacon-functions/src/file_formats/read_geoparquet.rs
new file mode 100644
index 00000000..b9dce775
--- /dev/null
+++ b/beacon-functions/src/file_formats/read_geoparquet.rs
@@ -0,0 +1,162 @@
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field};
+use beacon_arrow_geoparquet::datafusion::{GeoParquetFormat, GeoParquetOptions};
+use beacon_common::{listing_url::parse_listing_table_url, super_table::SuperListingTable};
+use datafusion::{
+    catalog::TableFunctionImpl,
+    common::plan_err,
+    execution::object_store::ObjectStoreUrl,
+    prelude::{Expr, SessionContext},
+    scalar::ScalarValue,
+};
+
+use crate::file_formats::BeaconTableFunctionImpl;
+
+/// Table function behind `read_geoparquet(glob_paths)`.
+///
+/// Parses each glob path into a listing URL and serves the result as one table
+/// read through [`GeoParquetFormat`].
+pub struct ReadGeoParquetFunc {
+    /// Runtime used to drive the async table construction from the synchronous
+    /// `call` entry point.
+    runtime_handle: tokio::runtime::Handle,
+    /// Session whose state is passed to the listing table constructor.
+    session_ctx: Arc<SessionContext>,
+    /// Object store URL handed to `parse_listing_table_url` with every glob path.
+    data_object_store_url: ObjectStoreUrl,
+}
+
+impl ReadGeoParquetFunc {
+    /// Creates the table function from the runtime handle, session context and
+    /// object store URL that every call will use.
+    pub fn new(
+        runtime_handle: tokio::runtime::Handle,
+        session_ctx: Arc<SessionContext>,
+        data_object_store_url: ObjectStoreUrl,
+    ) -> Self {
+        Self {
+            runtime_handle,
+            session_ctx,
+            data_object_store_url,
+        }
+    }
+}
+
+// Hand-written so that only the type name is printed, not the fields.
+impl std::fmt::Debug for ReadGeoParquetFunc {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ReadGeoParquetFunc")
+    }
+}
+
+impl BeaconTableFunctionImpl for ReadGeoParquetFunc {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn description(&self) -> Option<String> {
+        Some(
+            "Reads GeoParquet files from specified glob paths, decoding geometry columns to native GeoArrow."
+                .to_string(),
+        )
+    }
+
+    /// Name of the table function as written in SQL.
+    fn name(&self) -> String {
+        "read_geoparquet".to_string()
+    }
+
+    /// Declares the single argument: `glob_paths`, a list of UTF-8 strings.
+    fn arguments(&self) -> Option<Vec<Field>> {
+        Some(vec![Field::new(
+            "glob_paths",
+            DataType::List(Arc::new(Field::new("glob_path", DataType::Utf8, false))),
+            false,
+        )])
+    }
+}
+
+impl TableFunctionImpl for ReadGeoParquetFunc {
+    /// Builds the table for `read_geoparquet(glob_paths)`.
+    ///
+    /// The first argument must be a list literal of strings. Null entries are
+    /// skipped and any further arguments are ignored.
+    ///
+    /// # Errors
+    ///
+    /// Returns a plan error when the argument is missing or is not a list of
+    /// strings, and propagates any failure from parsing a path into a listing
+    /// URL or from constructing the listing table.
+    ///
+    /// # Panics
+    ///
+    /// `tokio::task::block_in_place` panics on a current-thread Tokio runtime,
+    /// so this has to run on a multi-threaded runtime.
+    fn call(
+        &self,
+        args: &[datafusion::prelude::Expr],
+    ) -> datafusion::error::Result<Arc<dyn datafusion::catalog::TableProvider>> {
+        let mut glob_paths: Vec<String> = vec![];
+        if let Some(glob_path_arg) = args.first() {
+            match glob_path_arg {
+                Expr::Literal(ScalarValue::List(values), _) => {
+                    let string_array = values.as_ref().values();
+                    match string_array
+                        .as_any()
+                        .downcast_ref::<arrow::array::StringArray>()
+                    {
+                        Some(str_arr) => {
+                            str_arr.iter().for_each(|opt_str| {
+                                if let Some(s) = opt_str {
+                                    glob_paths.push(s.to_string());
+                                }
+                            });
+                        }
+                        None => {
+                            return plan_err!(
+                                "read_geoparquet first argument must be a List of glob paths"
+                            );
+                        }
+                    }
+                }
+                _ => {
+                    return plan_err!(
+                        "read_geoparquet first argument must be a List of glob paths"
+                    );
+                }
+            }
+        } else {
+            return plan_err!(
+                "read_geoparquet requires at least 1 argument: glob_paths : List<Utf8>"
+            );
+        }
+
+        tracing::debug!("read_geoparquet glob paths: {:?}", glob_paths);
+
+        let mut listing_urls = vec![];
+        for path in &glob_paths {
+            tracing::debug!("read_geoparquet processing path: {}", path);
+            listing_urls.push(parse_listing_table_url(&self.data_object_store_url, path)?);
+        }
+
+        // Reading does not use the lon/lat write options; defaults are fine.
+        let file_format = GeoParquetFormat::new(GeoParquetOptions {
+            longitude_column: None,
+            latitude_column: None,
+        });
+        // `call` is synchronous, so block this worker on the async constructor.
+        let super_listing_table = tokio::task::block_in_place(|| {
+            self.runtime_handle.block_on(async move {
+                SuperListingTable::new(
+                    &self.session_ctx.state(),
+                    Arc::new(file_format),
+                    listing_urls,
+                )
+                .await
+            })
+        })?;
+
+        Ok(Arc::new(super_listing_table))
+    }
+}
diff --git a/docs/docs/1.7.2/data-lake/datasets.md b/docs/docs/1.7.2/data-lake/datasets.md index e937b6f9..7f76f011 100644 --- a/docs/docs/1.7.2/data-lake/datasets.md +++ b/docs/docs/1.7.2/data-lake/datasets.md @@ -12,6 +12,38 @@ Native support via DataFusion. Recommended for analytical workloads due to colum - Hive-style directory partitioning is supported via `PARTITIONED BY` on [External Tables](./external-tables.md). - Compatible with files produced by DuckDB, Spark, pandas, and similar tools. +## GeoParquet + +[GeoParquet](https://geoparquet.org/) files (`.geoparquet`) are Parquet files that carry geospatial geometry columns and a `geo` metadata key. Beacon reads them in addition to writing them. + +- Geometry columns described in the file's `geo` metadata are decoded to their native [GeoArrow](https://geoarrow.org/) representation on read (a non-geospatial Parquet file is read like ordinary Parquet). +- Column projection is applied — only the columns a query selects are materialized. +- Works over local disk and S3-compatible object stores.
+ +Query a GeoParquet file with the [`read_geoparquet()`](../sql/table-functions.md#read_geoparquet) table function: + +```sql +SELECT * FROM read_geoparquet(['spatial/**/*.geoparquet']) LIMIT 100 +``` + +Or register a stable table name with an [External Table](./external-tables.md): + +```sql +CREATE EXTERNAL TABLE stations +STORED AS GEOPARQUET +LOCATION 'spatial/stations/*.geoparquet'; + +SELECT * FROM stations LIMIT 10; +``` + +:::tip +Beacon can also *write* GeoParquet: a query result with longitude/latitude columns is mapped into a geometry column on output. See [querying output formats](../api/querying/index.md). +::: + +:::warning +Spatial bounding-box pruning (row-group skipping via the GeoParquet `bbox` covering) is not yet applied on read — queries perform a full scan with column projection. Geometry-aware predicate pushdown is planned. +::: + ## NetCDF Streaming reads with chunk-level access — large files are read incrementally rather than loaded entirely into memory. diff --git a/docs/docs/1.7.2/data-lake/external-tables.md b/docs/docs/1.7.2/data-lake/external-tables.md index 5017af77..138a3ed2 100644 --- a/docs/docs/1.7.2/data-lake/external-tables.md +++ b/docs/docs/1.7.2/data-lake/external-tables.md @@ -48,6 +48,16 @@ STORED AS PARQUET LOCATION 'profiles/**/*.parquet' ``` +### GeoParquet + +```sql +CREATE EXTERNAL TABLE stations +STORED AS GEOPARQUET +LOCATION 'spatial/stations/*.geoparquet' +``` + +Geometry columns are decoded to their native [GeoArrow](https://geoarrow.org/) representation on read. See [GeoParquet in Supported Formats](./datasets.md#geoparquet) for details. 
+ ### NetCDF ```sql diff --git a/docs/docs/1.7.2/sql/table-functions.md b/docs/docs/1.7.2/sql/table-functions.md index 582326c4..a8647f42 100644 --- a/docs/docs/1.7.2/sql/table-functions.md +++ b/docs/docs/1.7.2/sql/table-functions.md @@ -116,6 +116,18 @@ read_parquet(glob_paths) SELECT * FROM read_parquet(['obs/**/*.parquet']) LIMIT 100 ``` +## `read_geoparquet` + +```text +read_geoparquet(glob_paths) +``` + +Reads [GeoParquet](https://geoparquet.org/) files. Geometry columns described in the file's `geo` metadata are decoded to their native [GeoArrow](https://geoarrow.org/) representation; files without geometry are read like ordinary Parquet. + +```sql +SELECT * FROM read_geoparquet(['spatial/**/*.geoparquet']) LIMIT 100 +``` + ## `read_arrow` ```text