22 changes: 22 additions & 0 deletions bindings/python/example/example.py
@@ -224,6 +224,28 @@ async def main():

# TODO: support to_duckdb()

# Test the new poll() method for incremental reading
print("\n--- Testing poll() method ---")
# Reset subscription to start from the beginning
log_scanner.subscribe(None, None)

# Poll with a timeout of 5000ms (5 seconds)
# Note: poll() returns an empty table (not an error) on timeout
try:
poll_result = log_scanner.poll(5000)
print(f"Number of rows: {poll_result.num_rows}")

if poll_result.num_rows > 0:
poll_df = poll_result.to_pandas()
print(f"Polled data:\n{poll_df}")
else:
print("Empty result (no records available)")
# Empty table still has schema
print(f"Schema: {poll_result.schema}")

except Exception as e:
print(f"Error during poll: {e}")

except Exception as e:
print(f"Error during scanning: {e}")
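
A minimal sketch (not part of this diff) of how poll() could drive continuous incremental consumption, assuming log_scanner is subscribed as in the example above; the 1-second timeout and the three-empty-poll stop condition are arbitrary illustrative choices:

# Keep polling until several consecutive polls come back empty.
empty_polls = 0
while empty_polls < 3:
    table = log_scanner.poll(1000)  # per the docstring, a timeout yields an empty table, not an error
    if table.num_rows == 0:
        empty_polls += 1
        continue
    empty_polls = 0
    print(f"received {table.num_rows} rows")
    # e.g. hand off table.to_pandas() to downstream processing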

175 changes: 123 additions & 52 deletions bindings/python/src/table.rs
@@ -20,10 +20,13 @@ use crate::*;
use arrow::array::RecordBatch;
use arrow_pyarrow::{FromPyArrow, ToPyArrow};
use fluss::client::EARLIEST_OFFSET;
use fluss::record::to_arrow_schema;
use fluss::rpc::message::OffsetSpec;
use pyo3::types::IntoPyDict;
use pyo3_async_runtimes::tokio::future_into_py;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

// Time conversion constants
const MILLIS_PER_SECOND: i64 = 1_000;
Expand Down Expand Up @@ -186,7 +189,7 @@ impl FlussTable {
}

let rust_scanner = table_scan
.create_log_scanner()
.create_record_batch_log_scanner()
.map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?;

let admin = conn
Expand Down Expand Up @@ -888,7 +891,7 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
/// Scanner for reading log data from a Fluss table
#[pyclass]
pub struct LogScanner {
inner: fcore::client::LogScanner,
inner: fcore::client::RecordBatchLogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
#[allow(dead_code)]
Expand Down Expand Up @@ -933,63 +936,78 @@ impl LogScanner {

/// Convert all data to Arrow Table
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
use std::collections::HashMap;
use std::time::Duration;

let mut all_batches = Vec::new();

let num_buckets = self.table_info.get_num_buckets();
let bucket_ids: Vec<i32> = (0..num_buckets).collect();

// todo: after supporting list_offsets with timestamp, we can use start_timestamp and end_timestamp here
let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
.block_on(async {
self.admin
.list_offsets(
&self.table_info.table_path,
bucket_ids.as_slice(),
OffsetSpec::Latest,
)
.await
let mut stopping_offsets: HashMap<i32, i64> = py
.detach(|| {
TOKIO_RUNTIME.block_on(async {
self.admin
.list_offsets(
&self.table_info.table_path,
bucket_ids.as_slice(),
OffsetSpec::Latest,
)
.await
})
})
.map_err(|e| FlussError::new_err(e.to_string()))?;

if !stopping_offsets.is_empty() {
loop {
let batch_result = TOKIO_RUNTIME
.block_on(async { self.inner.poll(Duration::from_millis(500)).await });

match batch_result {
Ok(scan_records) => {
let mut result_records: Vec<fcore::record::ScanRecord> = vec![];
for (bucket, records) in scan_records.into_records_by_buckets() {
let stopping_offset = stopping_offsets.get(&bucket.bucket_id());

if stopping_offset.is_none() {
// not to include this bucket, skip records for this bucket
// since we already reach end offset for this bucket
continue;
}
if let Some(last_record) = records.last() {
let offset = last_record.offset();
result_records.extend(records);
if offset >= stopping_offset.unwrap() - 1 {
stopping_offsets.remove(&bucket.bucket_id());
}
}
}

if !result_records.is_empty() {
let arrow_batch = Utils::convert_scan_records_to_arrow(result_records);
all_batches.extend(arrow_batch);
}

// we have reach end offsets of all bucket
if stopping_offsets.is_empty() {
break;
}
}
Err(e) => return Err(FlussError::new_err(e.to_string())),
// Filter out buckets with no records to read (stop_at <= 0)
stopping_offsets.retain(|_, &mut v| v > 0);

while !stopping_offsets.is_empty() {
let scan_batches = py
.detach(|| {
TOKIO_RUNTIME
.block_on(async { self.inner.poll(Duration::from_millis(500)).await })
})
.map_err(|e| FlussError::new_err(e.to_string()))?;

if scan_batches.is_empty() {
continue;
}

for scan_batch in scan_batches {
let bucket_id = scan_batch.bucket().bucket_id();

// Check if this bucket is still being tracked; if not, ignore the batch
let Some(&stop_at) = stopping_offsets.get(&bucket_id) else {
continue;
};

let base_offset = scan_batch.base_offset();
let last_offset = scan_batch.last_offset();

// If the batch starts at or after the stop_at offset, the bucket is exhausted
if base_offset >= stop_at {
stopping_offsets.remove(&bucket_id);
continue;
}

let batch = if last_offset >= stop_at {
// This batch contains the target offset; slice it to keep only records
// where offset < stop_at.
let num_to_keep = (stop_at - base_offset) as usize;
let b = scan_batch.into_batch();

// Safety check: ensure we don't attempt to slice more rows than the batch contains
let limit = num_to_keep.min(b.num_rows());
b.slice(0, limit)
} else {
// The entire batch is within the desired range (all offsets < stop_at)
scan_batch.into_batch()
};

all_batches.push(Arc::new(batch));

// If the batch's last offset reached or passed the inclusive limit (stop_at - 1),
// we are done with this bucket.
if last_offset >= stop_at - 1 {
stopping_offsets.remove(&bucket_id);
}
}
}
@@ -1006,15 +1024,68 @@ impl LogScanner {
Ok(df)
}

/// Poll for new records with the specified timeout
///
/// Args:
/// timeout_ms: Timeout in milliseconds to wait for records
///
/// Returns:
/// PyArrow Table containing the polled records
///
/// Note:
/// - Returns an empty table (with correct schema) if no records are available
/// - When timeout expires, returns an empty table (NOT an error)
fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
"timeout_ms must be non-negative, got: {timeout_ms}"
)));
}

let timeout = Duration::from_millis(timeout_ms as u64);
let scan_batches = py
.detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await }))
Contributor: Actually, I'm thinking: can we use scanner#poll_batches directly?

Contributor Author: Makes sense, thank you!

.map_err(|e| FlussError::new_err(e.to_string()))?;

// Convert ScanBatch to Arrow batches
if scan_batches.is_empty() {
return self.create_empty_table(py);
}

let arrow_batches: Vec<_> = scan_batches
.into_iter()
.map(|scan_batch| Arc::new(scan_batch.into_batch()))
.collect();

Utils::combine_batches_to_table(py, arrow_batches)
}

/// Create an empty PyArrow table with the correct schema
fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
let arrow_schema = to_arrow_schema(self.table_info.get_row_type())
.map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?;
let py_schema = arrow_schema
.as_ref()
.to_pyarrow(py)
.map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;

let pyarrow = py.import("pyarrow")?;
let empty_table = pyarrow
.getattr("Table")?
.call_method1("from_batches", (vec![] as Vec<Py<PyAny>>, py_schema))?;

Ok(empty_table.into())
}

fn __repr__(&self) -> String {
format!("LogScanner(table={})", self.table_info.table_path)
}
}

impl LogScanner {
/// Create LogScanner from core LogScanner
/// Create LogScanner from core RecordBatchLogScanner
pub fn from_core(
inner_scanner: fcore::client::LogScanner,
inner_scanner: fcore::client::RecordBatchLogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
) -> Self {
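
For reference, an illustration (not part of this diff) of the per-bucket truncation rule used in to_arrow() above, expressed in Python; the helper name rows_to_keep is hypothetical:

def rows_to_keep(base_offset: int, last_offset: int, stop_at: int, num_rows: int) -> int:
    # A batch covers offsets [base_offset, last_offset]; stop_at is the exclusive
    # end offset (the bucket's Latest offset captured when to_arrow() started).
    if base_offset >= stop_at:
        return 0  # batch starts at or past stop_at: the bucket is already exhausted
    if last_offset >= stop_at:
        # batch straddles stop_at: keep only the rows whose offset is < stop_at
        return min(stop_at - base_offset, num_rows)
    return num_rows  # whole batch lies below stop_at

# A batch covering offsets 10..19 (10 rows) with stop_at = 15 keeps 5 rows (offsets 10..14);
# the bucket is then finished because last_offset (19) >= stop_at - 1 (14).
assert rows_to_keep(10, 19, 15, 10) == 5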