From 4a9335644ad2bc531df39df94f21c73ced6ac175 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 19 Aug 2024 20:03:02 -0400 Subject: [PATCH 01/19] Support Arrow PyCapsule for reading arrow tables and for exporting dataframes --- python/datafusion/dataframe.py | 16 ++++++++++++++ src/context.rs | 38 ++++++++++++++++++++++++++-------- src/dataframe.rs | 25 +++++++++++++++++++++- 3 files changed, 69 insertions(+), 10 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index fa7398442..4f1760135 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -524,3 +524,19 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram """ columns = [c for c in columns] return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls)) + + def __arrow_c_stream__(self, requested_schema: pa.Schema) -> Any: + """Export an Arrow PyCapsule Stream. + + This will execute and collect the DataFrame. We will attempt to respect the + requested schema, but only trivial transformations will be applied such as only + returning the fields listed in the requested schema if their data types match + those in the DataFrame. + + Args: + requested_schema: Attempt to provide the DataFrame using this schema. + + Returns: + Arrow PyCapsule object. + """ + return self.df.__arrow_c_stream__(requested_schema) diff --git a/src/context.rs b/src/context.rs index a43599cf7..3e3793956 100644 --- a/src/context.rs +++ b/src/context.rs @@ -21,11 +21,14 @@ use std::str::FromStr; use std::sync::Arc; use datafusion::execution::session_state::SessionStateBuilder; +use arrow::array::RecordBatchReader; +use arrow::ffi_stream::ArrowArrayStreamReader; +use arrow::pyarrow::FromPyArrow; use object_store::ObjectStore; use url::Url; use uuid::Uuid; -use pyo3::exceptions::{PyKeyError, PyValueError}; +use pyo3::exceptions::{PyKeyError, PyTypeError, PyValueError}; use pyo3::prelude::*; use crate::catalog::{PyCatalog, PyTable}; @@ -474,18 +477,35 @@ impl PySessionContext { name: Option<&str>, py: Python, ) -> PyResult { - // Instantiate pyarrow Table object & convert to batches - let table = data.call_method0("to_batches")?; + let mut batches = None; + let mut schema = None; - let schema = data.getattr("schema")?; - let schema = schema.extract::>()?; + if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) { + // Works for any object that implements __arrow_c_stream__ in pycapsule. + + schema = Some(stream_reader.schema().as_ref().to_owned()); + batches = Some(stream_reader.filter_map(|v| v.ok()).collect()); + } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) { + // While this says RecordBatch, it will work for any object that implements + // __arrow_c_array__ in pycapsule. 
+ + schema = Some(array.schema().as_ref().to_owned()); + batches = Some(vec![array]); + } + + if batches.is_none() || schema.is_none() { + return Err(PyTypeError::new_err( + "Expected either a Arrow Array or Arrow Stream in from_arrow_table().", + )); + } + + let batches = batches.unwrap(); + let schema = schema.unwrap(); - // Cast PyAny to RecordBatch type // Because create_dataframe() expects a vector of vectors of record batches // here we need to wrap the vector of record batches in an additional vector - let batches = table.extract::>>()?; - let list_of_batches = PyArrowType::from(vec![batches.0]); - self.create_dataframe(list_of_batches, name, Some(schema), py) + let list_of_batches = PyArrowType::from(vec![batches]); + self.create_dataframe(list_of_batches, name, Some(schema.into()), py) } /// Construct datafusion dataframe from pandas diff --git a/src/dataframe.rs b/src/dataframe.rs index 4db59d4fe..461838676 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::ffi::CString; use std::sync::Arc; +use arrow::array::{RecordBatchIterator, RecordBatchReader}; +use arrow::ffi_stream::FFI_ArrowArrayStream; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; use datafusion::arrow::util::pretty; @@ -29,7 +32,7 @@ use datafusion_common::UnnestOptions; use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; -use pyo3::types::PyTuple; +use pyo3::types::{PyCapsule, PyTuple}; use tokio::task::JoinHandle; use crate::errors::py_datafusion_err; @@ -451,6 +454,26 @@ impl PyDataFrame { Ok(table) } + #[allow(unused_variables)] + fn __arrow_c_stream__<'py>( + &'py mut self, + py: Python<'py>, + requested_schema: Option>, + ) -> PyResult> { + let batches = wait_for_future(py, self.df.as_ref().clone().collect())? + .into_iter() + .map(|r| Ok(r)); + let schema = self.df.schema().to_owned().into(); + + // let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema()); + let reader = RecordBatchIterator::new(batches, schema); + let reader: Box = Box::new(reader); + + let ffi_stream = FFI_ArrowArrayStream::new(reader); + let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); + PyCapsule::new_bound(py, ffi_stream, Some(stream_capsule_name)) + } + fn execute_stream(&self, py: Python) -> PyResult { // create a Tokio runtime to run the async code let rt = &get_tokio_runtime(py).0; From d61621c0f9086ee4245607842e61a42eb07c97a0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 24 Aug 2024 07:48:08 -0400 Subject: [PATCH 02/19] Improve flow control on receiving record batches from arrow pycapsule --- src/context.rs | 44 ++++++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/context.rs b/src/context.rs index 3e3793956..c86779ef2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -477,30 +477,26 @@ impl PySessionContext { name: Option<&str>, py: Python, ) -> PyResult { - let mut batches = None; - let mut schema = None; - - if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) { - // Works for any object that implements __arrow_c_stream__ in pycapsule. 
- - schema = Some(stream_reader.schema().as_ref().to_owned()); - batches = Some(stream_reader.filter_map(|v| v.ok()).collect()); - } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) { - // While this says RecordBatch, it will work for any object that implements - // __arrow_c_array__ in pycapsule. - - schema = Some(array.schema().as_ref().to_owned()); - batches = Some(vec![array]); - } - - if batches.is_none() || schema.is_none() { - return Err(PyTypeError::new_err( - "Expected either a Arrow Array or Arrow Stream in from_arrow_table().", - )); - } - - let batches = batches.unwrap(); - let schema = schema.unwrap(); + let (schema, batches) = + if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) { + // Works for any object that implements __arrow_c_stream__ in pycapsule. + + let schema = stream_reader.schema().as_ref().to_owned(); + let batches = stream_reader + .collect::, arrow::error::ArrowError>>() + .map_err(DataFusionError::from)?; + + (schema, batches) + } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) { + // While this says RecordBatch, it will work for any object that implements + // __arrow_c_array__ in pycapsule. + + (array.schema().as_ref().to_owned(), vec![array]) + } else { + return Err(PyTypeError::new_err( + "Expected either a Arrow Array or Arrow Stream in from_arrow_table().", + )); + }; // Because create_dataframe() expects a vector of vectors of record batches // here we need to wrap the vector of record batches in an additional vector From a813dc994c65bed6b55fa5b4c4f0b23185a2c905 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 24 Aug 2024 07:48:56 -0400 Subject: [PATCH 03/19] Update code comment --- src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/context.rs b/src/context.rs index c86779ef2..38f5f9564 100644 --- a/src/context.rs +++ b/src/context.rs @@ -489,7 +489,7 @@ impl PySessionContext { (schema, batches) } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) { // While this says RecordBatch, it will work for any object that implements - // __arrow_c_array__ in pycapsule. + // __arrow_c_array__ and returns a StructArray. (array.schema().as_ref().to_owned(), vec![array]) } else { From 533c915c5450880256a052cfe062b465d2705493 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 24 Aug 2024 09:22:46 -0400 Subject: [PATCH 04/19] Add function to project record batch into desired schema --- src/dataframe.rs | 85 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 77 insertions(+), 8 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 461838676..114960aff 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -18,8 +18,11 @@ use std::ffi::CString; use std::sync::Arc; -use arrow::array::{RecordBatchIterator, RecordBatchReader}; +use arrow::array::{new_null_array, RecordBatch, RecordBatchIterator, RecordBatchReader}; +use arrow::compute::can_cast_types; +use arrow::error::ArrowError; use arrow::ffi_stream::FFI_ArrowArrayStream; +use arrow::pyarrow::FromPyArrow; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; use datafusion::arrow::util::pretty; @@ -454,19 +457,29 @@ impl PyDataFrame { Ok(table) } - #[allow(unused_variables)] fn __arrow_c_stream__<'py>( &'py mut self, py: Python<'py>, requested_schema: Option>, ) -> PyResult> { - let batches = wait_for_future(py, self.df.as_ref().clone().collect())? 
- .into_iter() - .map(|r| Ok(r)); - let schema = self.df.schema().to_owned().into(); + let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())?; + let mut schema: Schema = self.df.schema().to_owned().into(); + + if let Some(schema_capsule) = requested_schema { + let desired_schema: Schema = Schema::from_pyarrow_bound(&schema_capsule)?; + schema = project_schema(schema, desired_schema) + .map_err(|e| DataFusionError::ArrowError(e))?; + + batches = batches + .into_iter() + .map(|record_batch| record_batch_into_schema(record_batch, &schema)) + .collect::, ArrowError>>() + .map_err(|e| DataFusionError::ArrowError(e))?; + } - // let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema()); - let reader = RecordBatchIterator::new(batches, schema); + let batches_wrapped = batches.into_iter().map(|r| Ok(r)); + + let reader = RecordBatchIterator::new(batches_wrapped, Arc::new(schema)); let reader: Box = Box::new(reader); let ffi_stream = FFI_ArrowArrayStream::new(reader); @@ -562,3 +575,59 @@ fn print_dataframe(py: Python, df: DataFrame) -> PyResult<()> { print.call1((result,))?; Ok(()) } + +fn project_schema(from_schema: Schema, to_schema: Schema) -> Result { + let merged_schema = Schema::try_merge(vec![from_schema, to_schema.clone()])?; + + let project_indices: Vec = to_schema + .fields + .iter() + .map(|field| field.name()) + .filter_map(|field_name| merged_schema.index_of(field_name).ok()) + .collect(); + + merged_schema.project(&project_indices) +} + +fn record_batch_into_schema( + record_batch: RecordBatch, + schema: &Schema, +) -> Result { + let schema = Arc::new(schema.clone()); + let base_schema = record_batch.schema(); + if base_schema.fields().len() == 0 { + // Nothing to project + return Ok(RecordBatch::new_empty(schema)); + } + + let array_size = record_batch.column(0).len(); + let mut data_arrays = Vec::with_capacity(schema.fields().len()); + + for field in schema.fields() { + let desired_data_type = field.data_type(); + if let Some(original_data) = record_batch.column_by_name(field.name()) { + let original_data_type = original_data.data_type(); + + if can_cast_types(original_data_type, desired_data_type) { + data_arrays.push(arrow::compute::kernels::cast( + original_data, + desired_data_type, + )?); + } else if field.is_nullable() { + data_arrays.push(new_null_array(desired_data_type, array_size)); + } else { + return Err(ArrowError::CastError(format!("Attempting to cast to non-nullable and non-castable field {} during schema projection.", field.name()))); + } + } else { + if !field.is_nullable() { + return Err(ArrowError::CastError(format!( + "Attempting to set null to non-nullable field {} during schema projection.", + field.name() + ))); + } + data_arrays.push(new_null_array(desired_data_type, array_size)); + } + } + + RecordBatch::try_new(schema, data_arrays) +} From d94885ca93a589e2083103c671c7d2480d3c4f86 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 24 Aug 2024 10:44:56 -0400 Subject: [PATCH 05/19] Validate the pycapsule and get the pointer when successful --- src/dataframe.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 114960aff..979eb7b07 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use arrow::array::{new_null_array, RecordBatch, RecordBatchIterator, RecordBatchReader}; use arrow::compute::can_cast_types; use arrow::error::ArrowError; +use arrow::ffi::FFI_ArrowSchema; use 
arrow::ffi_stream::FFI_ArrowArrayStream; use arrow::pyarrow::FromPyArrow; use datafusion::arrow::datatypes::Schema; @@ -466,7 +467,11 @@ impl PyDataFrame { let mut schema: Schema = self.df.schema().to_owned().into(); if let Some(schema_capsule) = requested_schema { - let desired_schema: Schema = Schema::from_pyarrow_bound(&schema_capsule)?; + validate_pycapsule(&schema_capsule, "arrow_schema")?; + + let schema_ptr = unsafe { schema_capsule.reference::() }; + let desired_schema = Schema::try_from(schema_ptr).map_err(DataFusionError::from)?; + schema = project_schema(schema, desired_schema) .map_err(|e| DataFusionError::ArrowError(e))?; @@ -631,3 +636,22 @@ fn record_batch_into_schema( RecordBatch::try_new(schema, data_arrays) } + +fn validate_pycapsule(capsule: &Bound, name: &str) -> PyResult<()> { + let capsule_name = capsule.name()?; + if capsule_name.is_none() { + return Err(PyValueError::new_err( + "Expected schema PyCapsule to have name set.", + )); + } + + let capsule_name = capsule_name.unwrap().to_str()?; + if capsule_name != name { + return Err(PyValueError::new_err(format!( + "Expected name '{}' in PyCapsule, instead got '{}'", + name, capsule_name + ))); + } + + Ok(()) +} From 932c8c25292c8f04abb79004b7eb4df25294414a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 05:09:27 -0400 Subject: [PATCH 06/19] Add unit test for export via pycapsule --- python/datafusion/tests/test_dataframe.py | 57 +++++++++++++++++++---- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index e5e0c9c8a..c6abc2007 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -835,13 +835,50 @@ def test_write_compressed_parquet_missing_compression_level(df, tmp_path, compre df.write_parquet(str(path), compression=compression) -# ctx = SessionContext() - -# # create a RecordBatch and a new DataFrame from it -# batch = pa.RecordBatch.from_arrays( -# [pa.array([1, 2, 3]), pa.array([4, 5, 6]), pa.array([8, 5, 8])], -# names=["a", "b", "c"], -# ) - -# df = ctx.create_dataframe([[batch]]) -# test_execute_stream(df) +def test_dataframe_export(df) -> None: + import nanoarrow + + # Guarantees that we have the canonical implementation + # reading our dataframe export + table = pa.table(df) + assert table.num_columns == 3 + assert table.num_rows == 3 + + # nanoarrow is an independent library that should also be + # able to import our dataframe + table = nanoarrow.Array(df) + assert len(table) == 3 + assert len(table[0].as_py()) == 3 + + desired_schema = pa.schema([("a", pa.int64())]) + + # Verify we can request a schema + table = pa.table(df, schema=desired_schema) + assert table.num_columns == 1 + assert table.num_rows == 3 + + # Expect a table of nulls if the schema don't overlap + desired_schema = pa.schema([("g", pa.string())]) + table = pa.table(df, schema=desired_schema) + assert table.num_columns == 1 + assert table.num_rows == 3 + for i in range(0, 3): + assert table[0][i].as_py() is None + + # Expect an error when we cannot convert schema + desired_schema = pa.schema([("a", pa.float32())]) + failed_convert = False + try: + table = pa.table(df, schema=desired_schema) + except Exception: + failed_convert = True + assert failed_convert + + # Expect an error when we have a not set non-nullable + desired_schema = pa.schema([("g", pa.string(), False)]) + failed_convert = False + try: + table = pa.table(df, schema=desired_schema) + except Exception: + 
failed_convert = True + assert failed_convert From a404a686da9195d6955dc6f4bac3f694da089360 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 05:10:03 -0400 Subject: [PATCH 07/19] Readability --- src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/context.rs b/src/context.rs index 38f5f9564..c15d73148 100644 --- a/src/context.rs +++ b/src/context.rs @@ -483,7 +483,7 @@ impl PySessionContext { let schema = stream_reader.schema().as_ref().to_owned(); let batches = stream_reader - .collect::, arrow::error::ArrowError>>() + .collect::, arrow::error::ArrowError>>() .map_err(DataFusionError::from)?; (schema, batches) From 1ed4632608d94d1093dd486cd514b32c1e16ad70 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 05:11:49 -0400 Subject: [PATCH 08/19] Remove nanoarrow so we don't add another dependency that isn't gaining much over pyarrow --- python/datafusion/tests/test_dataframe.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index c6abc2007..1c081dd53 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -836,20 +836,12 @@ def test_write_compressed_parquet_missing_compression_level(df, tmp_path, compre def test_dataframe_export(df) -> None: - import nanoarrow - # Guarantees that we have the canonical implementation # reading our dataframe export table = pa.table(df) assert table.num_columns == 3 assert table.num_rows == 3 - # nanoarrow is an independent library that should also be - # able to import our dataframe - table = nanoarrow.Array(df) - assert len(table) == 3 - assert len(table[0].as_py()) == 3 - desired_schema = pa.schema([("a", pa.int64())]) # Verify we can request a schema From d1fe4df33e8ae760440b928ce9e42a81dcfc7db2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 05:25:07 -0400 Subject: [PATCH 09/19] Remove unused import --- src/dataframe.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 979eb7b07..086cd550a 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -23,7 +23,6 @@ use arrow::compute::can_cast_types; use arrow::error::ArrowError; use arrow::ffi::FFI_ArrowSchema; use arrow::ffi_stream::FFI_ArrowArrayStream; -use arrow::pyarrow::FromPyArrow; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; use datafusion::arrow::util::pretty; From 61473df0f2c40d9544fb05114436964fd8caafde Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 05:37:56 -0400 Subject: [PATCH 10/19] Add unit test to check for reading from record batch streams --- python/datafusion/tests/test_context.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/python/datafusion/tests/test_context.py b/python/datafusion/tests/test_context.py index 66d7e013a..2bb914e74 100644 --- a/python/datafusion/tests/test_context.py +++ b/python/datafusion/tests/test_context.py @@ -166,6 +166,21 @@ def test_from_arrow_table(ctx): assert df.collect()[0].num_rows == 3 +def test_from_arrow_record_batch_reader(ctx) -> None: + schema = pa.schema([("a", pa.int64())]) + + def iter_record_batches(): + for i in range(2): + yield pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], schema=schema) + + reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches()) + df = ctx.from_arrow_table(reader) + assert df + assert isinstance(df, DataFrame) + assert df.schema().names == ["a"] + assert 
df.count() == 6 + + def test_from_arrow_table_with_name(ctx): # create a PyArrow table data = {"a": [1, 2, 3], "b": [4, 5, 6]} From 0bbb293eec27e4042d5074fc2046d53f7a73a2a8 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 08:13:32 -0400 Subject: [PATCH 11/19] Update unit test to try a variety of arrow data sources on import --- python/datafusion/tests/test_context.py | 33 ++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/python/datafusion/tests/test_context.py b/python/datafusion/tests/test_context.py index 2bb914e74..16e7122ba 100644 --- a/python/datafusion/tests/test_context.py +++ b/python/datafusion/tests/test_context.py @@ -166,19 +166,34 @@ def test_from_arrow_table(ctx): assert df.collect()[0].num_rows == 3 -def test_from_arrow_record_batch_reader(ctx) -> None: - schema = pa.schema([("a", pa.int64())]) +@pytest.mark.skip +def record_batch_generator(num_batches: int): + schema = pa.schema([("a", pa.int64()), ("b", pa.int64())]) + for i in range(num_batches): + yield pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3]), pa.array([4, 5, 6])], schema=schema + ) - def iter_record_batches(): - for i in range(2): - yield pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], schema=schema) - reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches()) - df = ctx.from_arrow_table(reader) +@pytest.mark.parametrize( + "source", + [ + # __arrow_c_array__ sources + pa.array([{"a": 1, "b": 4}, {"a": 2, "b": 5}, {"a": 3, "b": 6}]), + # __arrow_c_stream__ sources + pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}), + pa.RecordBatchReader.from_batches( + pa.schema([("a", pa.int64()), ("b", pa.int64())]), record_batch_generator(1) + ), + pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}), + ], +) +def test_from_arrow_sources(ctx, source) -> None: + df = ctx.from_arrow(source) assert df assert isinstance(df, DataFrame) - assert df.schema().names == ["a"] - assert df.count() == 6 + assert df.schema().names == ["a", "b"] + assert df.count() == 3 def test_from_arrow_table_with_name(ctx): From 2586b3fe4ef03ceb06d37ed447ec02e79600feaf Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 08:14:06 -0400 Subject: [PATCH 12/19] Simplify dataframe creation in unit test --- python/datafusion/tests/test_dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py index 1c081dd53..477bc0fce 100644 --- a/python/datafusion/tests/test_dataframe.py +++ b/python/datafusion/tests/test_dataframe.py @@ -47,7 +47,7 @@ def df(): names=["a", "b", "c"], ) - return ctx.create_dataframe([[batch]]) + return ctx.from_arrow(batch) @pytest.fixture From 43a869b7cc57b575a8b499f363e535a4c4b6154d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 08:21:20 -0400 Subject: [PATCH 13/19] Update user documenation to include usage of import and exporting arrow objects --- docs/source/user-guide/io/arrow.rst | 71 +++++++++++++++++++++++++++++ docs/source/user-guide/io/index.rst | 6 +-- 2 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 docs/source/user-guide/io/arrow.rst diff --git a/docs/source/user-guide/io/arrow.rst b/docs/source/user-guide/io/arrow.rst new file mode 100644 index 000000000..a7554e2e3 --- /dev/null +++ b/docs/source/user-guide/io/arrow.rst @@ -0,0 +1,71 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. 
distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +Arrow +===== + +DataFusion implements the +`Apache Arrow PyCapsule interface `_ +for importing and exporting DataFrames with zero copy. With this feature, any Python +project that implements this interface can share data back and forth with DataFusion +with zero copy. + +We can demonstrate using `pyarrow `_. + +Importing to DataFusion +----------------------- + +Here we will create an Arrow table and import it to DataFusion. + +To import an Arrow table, use :py:func:`datafusion.context.SessionContext.from_arrow`. +This will accept any Python object that implements +`__arrow_c_stream__ `_ +or `__arrow_c_array__ `_ +and returns a ``StructArray``. Common pyarrow sources you can use are: + +- `Array `_ (but it must return a Struct Array) +- `Record Batch `_ +- `Record Batch Reader `_ +- `Table `_ + +.. ipython:: python + + from datafusion import SessionContext + import pyarrow as pa + + data = {"a": [1, 2, 3], "b": [4, 5, 6]} + table = pa.Table.from_pydict(data) + + ctx = SessionContext() + df = ctx.from_arrow(table) + df + +Exporting from DataFusion +------------------------- + +DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any +Python library that accepts these can import a DataFusion DataFrame directly. It is +important to note that this will cause the DataFrame execution to happen, which may be +a time consuming task. That is, you will cause a :py:func:`datafusion.dataframe.DataFrame.collect` +operation call to occur. + + +.. ipython:: python + + df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d")) + pa.table(df) + diff --git a/docs/source/user-guide/io/index.rst b/docs/source/user-guide/io/index.rst index af08240ff..05411327e 100644 --- a/docs/source/user-guide/io/index.rst +++ b/docs/source/user-guide/io/index.rst @@ -21,8 +21,8 @@ IO .. toctree:: :maxdepth: 2 + arrow + avro csv - parquet json - avro - + parquet From 06e16fbce1b56339e32f77c8863ceade9fe159be Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 25 Aug 2024 08:22:09 -0400 Subject: [PATCH 14/19] Rename from_arrow_table to from_arrow since we now support multiple types of input --- python/datafusion/context.py | 23 +++++++++++++++++------ src/context.rs | 10 +++++----- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index d4e50cfe2..e6d31d82b 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -586,19 +586,30 @@ def from_pydict( """ return DataFrame(self.ctx.from_pydict(data, name)) - def from_arrow_table( - self, data: pyarrow.Table, name: str | None = None - ) -> DataFrame: - """Create a :py:class:`~datafusion.dataframe.DataFrame` from an Arrow table. + def from_arrow(self, data: Any, name: str | None = None) -> DataFrame: + """Create a :py:class:`~datafusion.dataframe.DataFrame` from an Arrow source. 
+ + The Arrow data source can be any object that implements either + ``__arrow_c_stream__`` or ``__arrow_c_array__``. For the latter, it must return + a struct array. Common examples of sources from pyarrow include Args: - data: Arrow table. + data: Arrow data source. name: Name of the DataFrame. Returns: DataFrame representation of the Arrow table. """ - return DataFrame(self.ctx.from_arrow_table(data, name)) + return DataFrame(self.ctx.from_arrow(data, name)) + + def from_arrow_table( + self, data: pyarrow.Table, name: str | None = None + ) -> DataFrame: + """Create a :py:class:`~datafusion.dataframe.DataFrame` from an Arrow table. + + This is an alias for :py:func:`from_arrow`. + """ + return self.from_arrow(data, name) def from_pandas(self, data: pandas.DataFrame, name: str | None = None) -> DataFrame: """Create a :py:class:`~datafusion.dataframe.DataFrame` from a Pandas DataFrame. diff --git a/src/context.rs b/src/context.rs index c15d73148..04079eb79 100644 --- a/src/context.rs +++ b/src/context.rs @@ -447,7 +447,7 @@ impl PySessionContext { let table = table_class.call_method1("from_pylist", args)?; // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, py)?; + let df = self.from_arrow(table, name, py)?; Ok(df) } @@ -466,12 +466,12 @@ impl PySessionContext { let table = table_class.call_method1("from_pydict", args)?; // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, py)?; + let df = self.from_arrow(table, name, py)?; Ok(df) } /// Construct datafusion dataframe from Arrow Table - pub fn from_arrow_table( + pub fn from_arrow( &mut self, data: Bound<'_, PyAny>, name: Option<&str>, @@ -520,7 +520,7 @@ impl PySessionContext { let table = table_class.call_method1("from_pandas", args)?; // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, py)?; + let df = self.from_arrow(table, name, py)?; Ok(df) } @@ -534,7 +534,7 @@ impl PySessionContext { let table = data.call_method0("to_arrow")?; // Convert Arrow Table to datafusion DataFrame - let df = self.from_arrow_table(table, name, data.py())?; + let df = self.from_arrow(table, name, data.py())?; Ok(df) } From afe6420b3ec0d3041d0ba5009025c0f827749e5d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 27 Aug 2024 06:23:28 -0400 Subject: [PATCH 15/19] Add admonition box to warn user --- docs/source/user-guide/io/arrow.rst | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/source/user-guide/io/arrow.rst b/docs/source/user-guide/io/arrow.rst index a7554e2e3..d571aa99c 100644 --- a/docs/source/user-guide/io/arrow.rst +++ b/docs/source/user-guide/io/arrow.rst @@ -18,7 +18,7 @@ Arrow ===== -DataFusion implements the +DataFusion implements the `Apache Arrow PyCapsule interface `_ for importing and exporting DataFrames with zero copy. With this feature, any Python project that implements this interface can share data back and forth with DataFusion @@ -58,10 +58,12 @@ Exporting from DataFusion ------------------------- DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any -Python library that accepts these can import a DataFusion DataFrame directly. It is -important to note that this will cause the DataFrame execution to happen, which may be -a time consuming task. That is, you will cause a :py:func:`datafusion.dataframe.DataFrame.collect` -operation call to occur. +Python library that accepts these can import a DataFusion DataFrame directly. + +.. 
warning:: + It is important to note that this will cause the DataFrame execution to happen, which may be + a time consuming task. That is, you will cause a + :py:func:`datafusion.dataframe.DataFrame.collect` operation call to occur. .. ipython:: python From a5b4e39a5d7a77548d1bcab8538f50b739aa6b04 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 27 Aug 2024 06:27:52 -0400 Subject: [PATCH 16/19] Mark from_arrow_table as deprecated --- examples/import.py | 2 +- python/datafusion/context.py | 1 + python/datafusion/tests/test_context.py | 8 ++++---- src/context.rs | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/import.py b/examples/import.py index a249a1c4e..cd965cb46 100644 --- a/examples/import.py +++ b/examples/import.py @@ -54,5 +54,5 @@ # Convert Arrow Table to datafusion DataFrame arrow_table = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) -df = ctx.from_arrow_table(arrow_table) +df = ctx.from_arrow(arrow_table) assert type(df) == datafusion.DataFrame diff --git a/python/datafusion/context.py b/python/datafusion/context.py index e6d31d82b..283f71e1e 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -602,6 +602,7 @@ def from_arrow(self, data: Any, name: str | None = None) -> DataFrame: """ return DataFrame(self.ctx.from_arrow(data, name)) + @deprecated("Use ``from_arrow`` instead.") def from_arrow_table( self, data: pyarrow.Table, name: str | None = None ) -> DataFrame: diff --git a/python/datafusion/tests/test_context.py b/python/datafusion/tests/test_context.py index 16e7122ba..dbf6388fc 100644 --- a/python/datafusion/tests/test_context.py +++ b/python/datafusion/tests/test_context.py @@ -156,7 +156,7 @@ def test_from_arrow_table(ctx): table = pa.Table.from_pydict(data) # convert to DataFrame - df = ctx.from_arrow_table(table) + df = ctx.from_arrow(table) tables = list(ctx.catalog().database().names()) assert df @@ -202,7 +202,7 @@ def test_from_arrow_table_with_name(ctx): table = pa.Table.from_pydict(data) # convert to DataFrame with optional name - df = ctx.from_arrow_table(table, name="tbl") + df = ctx.from_arrow(table, name="tbl") tables = list(ctx.catalog().database().names()) assert df @@ -215,7 +215,7 @@ def test_from_arrow_table_empty(ctx): table = pa.Table.from_pydict(data, schema=schema) # convert to DataFrame - df = ctx.from_arrow_table(table) + df = ctx.from_arrow(table) tables = list(ctx.catalog().database().names()) assert df @@ -230,7 +230,7 @@ def test_from_arrow_table_empty_no_schema(ctx): table = pa.Table.from_pydict(data) # convert to DataFrame - df = ctx.from_arrow_table(table) + df = ctx.from_arrow(table) tables = list(ctx.catalog().database().names()) assert df diff --git a/src/context.rs b/src/context.rs index 04079eb79..f0240d86a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -494,7 +494,7 @@ impl PySessionContext { (array.schema().as_ref().to_owned(), vec![array]) } else { return Err(PyTypeError::new_err( - "Expected either a Arrow Array or Arrow Stream in from_arrow_table().", + "Expected either a Arrow Array or Arrow Stream in from_arrow().", )); }; From 7cc807d79f30bb0d5a524e9e2e3ed059978b9ef9 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 27 Aug 2024 06:30:18 -0400 Subject: [PATCH 17/19] Clean up error handling --- src/dataframe.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index 086cd550a..22b05226c 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -471,17 +471,16 @@ impl PyDataFrame { let schema_ptr = 
unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
             let desired_schema = Schema::try_from(schema_ptr).map_err(DataFusionError::from)?;
 
-            schema = project_schema(schema, desired_schema)
-                .map_err(|e| DataFusionError::ArrowError(e))?;
+            schema = project_schema(schema, desired_schema).map_err(DataFusionError::ArrowError)?;
 
             batches = batches
                 .into_iter()
                 .map(|record_batch| record_batch_into_schema(record_batch, &schema))
                 .collect::<Result<Vec<RecordBatch>, ArrowError>>()
-                .map_err(|e| DataFusionError::ArrowError(e))?;
+                .map_err(DataFusionError::ArrowError)?;
         }
 
-        let batches_wrapped = batches.into_iter().map(|r| Ok(r));
+        let batches_wrapped = batches.into_iter().map(Ok);
 
         let reader = RecordBatchIterator::new(batches_wrapped, Arc::new(schema));
         let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);

From babeb774177af617bfc2a1dc24d04e8001debb34 Mon Sep 17 00:00:00 2001
From: Tim Saucer
Date: Tue, 27 Aug 2024 12:20:35 -0400
Subject: [PATCH 18/19] Marking was not required

Co-authored-by: Michael J Ward
---
 python/datafusion/tests/test_context.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/datafusion/tests/test_context.py b/python/datafusion/tests/test_context.py
index dbf6388fc..0184280c2 100644
--- a/python/datafusion/tests/test_context.py
+++ b/python/datafusion/tests/test_context.py
@@ -166,7 +166,6 @@ def test_from_arrow_table(ctx):
     assert df.collect()[0].num_rows == 3
 
 
-@pytest.mark.skip
 def record_batch_generator(num_batches: int):
     schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])
     for i in range(num_batches):

From 955d7d59f6122fb95b63e6655bc88950be82e1b1 Mon Sep 17 00:00:00 2001
From: Tim Saucer
Date: Wed, 28 Aug 2024 04:40:57 -0400
Subject: [PATCH 19/19] Apply linting after rebase

---
 src/context.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/context.rs b/src/context.rs
index f0240d86a..4433d94c2 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -20,10 +20,10 @@ use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use datafusion::execution::session_state::SessionStateBuilder;
 use arrow::array::RecordBatchReader;
 use arrow::ffi_stream::ArrowArrayStreamReader;
 use arrow::pyarrow::FromPyArrow;
+use datafusion::execution::session_state::SessionStateBuilder;
 use object_store::ObjectStore;
 use url::Url;
 use uuid::Uuid;
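
A short end-to-end sketch of the import path this series adds, for trying the change locally.
It is not part of the patches themselves and assumes they are applied, plus a pyarrow recent
enough to expose the Arrow PyCapsule interface (14.0 or later). `SessionContext.from_arrow`
is the method introduced in patch 14; everything else is stock pyarrow and pre-existing
DataFusion API.

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])

    def batches():
        # Two batches of three rows each, so the resulting DataFrame has six rows.
        for _ in range(2):
            yield pa.RecordBatch.from_arrays(
                [pa.array([1, 2, 3]), pa.array([4, 5, 6])], schema=schema
            )

    # A RecordBatchReader exposes __arrow_c_stream__, so from_arrow() consumes it
    # through the PyCapsule interface rather than calling to_batches().
    reader = pa.RecordBatchReader.from_batches(schema, batches())
    df = ctx.from_arrow(reader)
    assert df.count() == 6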
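
A matching sketch of the export path, again assuming the patches are applied. Because
`DataFrame.__arrow_c_stream__` collects the DataFrame before handing batches over, consuming
a large DataFrame this way executes the full plan up front. Passing `schema=` exercises the
requested-schema handling added in patches 04, 05, and 17 (`project_schema` /
`record_batch_into_schema`): matching fields are kept or cast where possible, and missing
nullable fields come back as nulls.

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.from_arrow(pa.table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}))

    # Full export: pa.table() drives DataFrame.__arrow_c_stream__ directly.
    full = pa.table(df)
    assert full.num_columns == 2 and full.num_rows == 3

    # Requested schema: project the stream down to just column "a".
    subset = pa.table(df, schema=pa.schema([("a", pa.int64())]))
    assert subset.column_names == ["a"]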