-
Notifications
You must be signed in to change notification settings - Fork 23
[ISSUE-153] Add blocking poll into python bindings #154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,10 +20,13 @@ use crate::*; | |
| use arrow::array::RecordBatch; | ||
| use arrow_pyarrow::{FromPyArrow, ToPyArrow}; | ||
| use fluss::client::EARLIEST_OFFSET; | ||
| use fluss::record::to_arrow_schema; | ||
| use fluss::rpc::message::OffsetSpec; | ||
| use pyo3::types::IntoPyDict; | ||
| use pyo3_async_runtimes::tokio::future_into_py; | ||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
|
|
||
| // Time conversion constants | ||
| const MILLIS_PER_SECOND: i64 = 1_000; | ||
|
|
@@ -186,7 +189,7 @@ impl FlussTable { | |
| } | ||
|
|
||
| let rust_scanner = table_scan | ||
| .create_log_scanner() | ||
| .create_record_batch_log_scanner() | ||
| .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?; | ||
|
|
||
| let admin = conn | ||
|
|
@@ -888,7 +891,7 @@ fn get_type_name(value: &Bound<PyAny>) -> String { | |
| /// Scanner for reading log data from a Fluss table | ||
| #[pyclass] | ||
| pub struct LogScanner { | ||
| inner: fcore::client::LogScanner, | ||
| inner: fcore::client::RecordBatchLogScanner, | ||
| admin: fcore::client::FlussAdmin, | ||
| table_info: fcore::metadata::TableInfo, | ||
| #[allow(dead_code)] | ||
|
|
@@ -933,63 +936,78 @@ impl LogScanner { | |
|
|
||
| /// Convert all data to Arrow Table | ||
| fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> { | ||
| use std::collections::HashMap; | ||
| use std::time::Duration; | ||
|
|
||
| let mut all_batches = Vec::new(); | ||
|
|
||
| let num_buckets = self.table_info.get_num_buckets(); | ||
| let bucket_ids: Vec<i32> = (0..num_buckets).collect(); | ||
|
|
||
| // todo: after supporting list_offsets with timestamp, we can use start_timestamp and end_timestamp here | ||
| let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME | ||
| .block_on(async { | ||
| self.admin | ||
| .list_offsets( | ||
| &self.table_info.table_path, | ||
| bucket_ids.as_slice(), | ||
| OffsetSpec::Latest, | ||
| ) | ||
| .await | ||
| let mut stopping_offsets: HashMap<i32, i64> = py | ||
| .detach(|| { | ||
| TOKIO_RUNTIME.block_on(async { | ||
| self.admin | ||
| .list_offsets( | ||
| &self.table_info.table_path, | ||
| bucket_ids.as_slice(), | ||
| OffsetSpec::Latest, | ||
| ) | ||
| .await | ||
| }) | ||
| }) | ||
| .map_err(|e| FlussError::new_err(e.to_string()))?; | ||
|
|
||
| if !stopping_offsets.is_empty() { | ||
| loop { | ||
| let batch_result = TOKIO_RUNTIME | ||
| .block_on(async { self.inner.poll(Duration::from_millis(500)).await }); | ||
|
|
||
| match batch_result { | ||
| Ok(scan_records) => { | ||
| let mut result_records: Vec<fcore::record::ScanRecord> = vec![]; | ||
| for (bucket, records) in scan_records.into_records_by_buckets() { | ||
| let stopping_offset = stopping_offsets.get(&bucket.bucket_id()); | ||
|
|
||
| if stopping_offset.is_none() { | ||
| // not to include this bucket, skip records for this bucket | ||
| // since we already reach end offset for this bucket | ||
| continue; | ||
| } | ||
| if let Some(last_record) = records.last() { | ||
| let offset = last_record.offset(); | ||
| result_records.extend(records); | ||
| if offset >= stopping_offset.unwrap() - 1 { | ||
| stopping_offsets.remove(&bucket.bucket_id()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if !result_records.is_empty() { | ||
| let arrow_batch = Utils::convert_scan_records_to_arrow(result_records); | ||
| all_batches.extend(arrow_batch); | ||
| } | ||
|
|
||
| // we have reach end offsets of all bucket | ||
| if stopping_offsets.is_empty() { | ||
| break; | ||
| } | ||
| } | ||
| Err(e) => return Err(FlussError::new_err(e.to_string())), | ||
| // Filter out buckets with no records to read (stop_at <= 0) | ||
| stopping_offsets.retain(|_, &mut v| v > 0); | ||
|
|
||
| while !stopping_offsets.is_empty() { | ||
| let scan_batches = py | ||
| .detach(|| { | ||
| TOKIO_RUNTIME | ||
| .block_on(async { self.inner.poll(Duration::from_millis(500)).await }) | ||
| }) | ||
| .map_err(|e| FlussError::new_err(e.to_string()))?; | ||
|
|
||
| if scan_batches.is_empty() { | ||
| continue; | ||
| } | ||
|
|
||
| for scan_batch in scan_batches { | ||
| let bucket_id = scan_batch.bucket().bucket_id(); | ||
|
|
||
| // Check if this bucket is still being tracked; if not, ignore the batch | ||
| let Some(&stop_at) = stopping_offsets.get(&bucket_id) else { | ||
| continue; | ||
| }; | ||
|
|
||
| let base_offset = scan_batch.base_offset(); | ||
| let last_offset = scan_batch.last_offset(); | ||
|
|
||
| // If the batch starts at or after the stop_at offset, the bucket is exhausted | ||
| if base_offset >= stop_at { | ||
| stopping_offsets.remove(&bucket_id); | ||
| continue; | ||
| } | ||
|
|
||
| let batch = if last_offset >= stop_at { | ||
| // This batch contains the target offset; slice it to keep only records | ||
| // where offset < stop_at. | ||
| let num_to_keep = (stop_at - base_offset) as usize; | ||
| let b = scan_batch.into_batch(); | ||
|
|
||
| // Safety check: ensure we don't attempt to slice more rows than the batch contains | ||
| let limit = num_to_keep.min(b.num_rows()); | ||
| b.slice(0, limit) | ||
| } else { | ||
| // The entire batch is within the desired range (all offsets < stop_at) | ||
| scan_batch.into_batch() | ||
| }; | ||
|
|
||
| all_batches.push(Arc::new(batch)); | ||
|
|
||
| // If the batch's last offset reached or passed the inclusive limit (stop_at - 1), | ||
| // we are done with this bucket. | ||
| if last_offset >= stop_at - 1 { | ||
| stopping_offsets.remove(&bucket_id); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -1006,15 +1024,68 @@ impl LogScanner { | |
| Ok(df) | ||
| } | ||
|
|
||
| /// Poll for new records with the specified timeout | ||
| /// | ||
| /// Args: | ||
| /// timeout_ms: Timeout in milliseconds to wait for records | ||
| /// | ||
| /// Returns: | ||
| /// PyArrow Table containing the polled records | ||
| /// | ||
| /// Note: | ||
| /// - Returns an empty table (with correct schema) if no records are available | ||
| /// - When timeout expires, returns an empty table (NOT an error) | ||
| fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> { | ||
| if timeout_ms < 0 { | ||
| return Err(FlussError::new_err(format!( | ||
| "timeout_ms must be non-negative, got: {timeout_ms}" | ||
| ))); | ||
| } | ||
|
|
||
| let timeout = Duration::from_millis(timeout_ms as u64); | ||
| let scan_batches = py | ||
| .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await })) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, I'm thinking: can we use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Makes sense, thank you! |
||
| .map_err(|e| FlussError::new_err(e.to_string()))?; | ||
|
|
||
| // Convert ScanBatch to Arrow batches | ||
| if scan_batches.is_empty() { | ||
| return self.create_empty_table(py); | ||
| } | ||
|
|
||
| let arrow_batches: Vec<_> = scan_batches | ||
| .into_iter() | ||
| .map(|scan_batch| Arc::new(scan_batch.into_batch())) | ||
| .collect(); | ||
|
|
||
| Utils::combine_batches_to_table(py, arrow_batches) | ||
| } | ||
|
|
||
| /// Create an empty PyArrow table with the correct schema | ||
| fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> { | ||
| let arrow_schema = to_arrow_schema(self.table_info.get_row_type()) | ||
| .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?; | ||
| let py_schema = arrow_schema | ||
| .as_ref() | ||
| .to_pyarrow(py) | ||
| .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; | ||
|
|
||
| let pyarrow = py.import("pyarrow")?; | ||
| let empty_table = pyarrow | ||
| .getattr("Table")? | ||
| .call_method1("from_batches", (vec![] as Vec<Py<PyAny>>, py_schema))?; | ||
|
|
||
| Ok(empty_table.into()) | ||
| } | ||
|
|
||
| fn __repr__(&self) -> String { | ||
| format!("LogScanner(table={})", self.table_info.table_path) | ||
| } | ||
| } | ||
|
|
||
| impl LogScanner { | ||
| /// Create LogScanner from core LogScanner | ||
| /// Create LogScanner from core RecordBatchLogScanner | ||
| pub fn from_core( | ||
| inner_scanner: fcore::client::LogScanner, | ||
| inner_scanner: fcore::client::RecordBatchLogScanner, | ||
| admin: fcore::client::FlussAdmin, | ||
| table_info: fcore::metadata::TableInfo, | ||
| ) -> Self { | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.