-
Notifications
You must be signed in to change notification settings - Fork 492
[docs] add Python client documentation #2461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,213 @@ | ||
| --- | ||
| title: "Python Client" | ||
| sidebar_position: 2 | ||
| --- | ||
|
|
||
| # Fluss Python Client | ||
|
|
||
| ## Overview | ||
| The Fluss Python Client provides an interface for interacting with Fluss clusters. It supports asynchronous operations for managing resources and handling data. | ||
|
|
||
| The client provides two main APIs: | ||
| * **Admin API**: For managing databases, tables, partitions, and retrieving metadata. | ||
| * **Table API**: For reading from and writing to Fluss tables. | ||
|
|
||
| ## Installation | ||
| The Fluss Python client is provided as bindings within the Rust client repository. To install it from source: | ||
|
|
||
| ```bash | ||
| git clone https://github.com/apache/fluss-rust.git | ||
| cd fluss-rust/bindings/python | ||
| pip install . | ||
| ``` | ||
|
|
||
| ## Initialization | ||
|
|
||
| The `Connection` object is the entry point for interacting with Fluss. It is created using `Connection.create()` and requires a configuration dictionary. | ||
|
|
||
| ```python | ||
| from fluss.client import Connection | ||
| from fluss.config import Configuration | ||
|
|
||
| # Create configuration | ||
| conf = Configuration() | ||
| # Adjust according to where your Fluss cluster is running | ||
| conf.set_string("bootstrap.servers", "localhost:9123") | ||
|
|
||
| # Create connection | ||
| connection = Connection(conf) | ||
|
|
||
| # Obtain Admin instance | ||
| admin = connection.get_admin() | ||
| databases = admin.list_databases() | ||
| print(databases) | ||
|
|
||
| # Obtain Table instance | ||
| table = connection.get_table("my_db.my_table") | ||
| print(table.get_table_info()) | ||
|
Comment on lines
+46
to
+47
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While this follows the Java API flow, should we consider a flow that does not cause error as user follow it? If the server is a fresh one, would an Error occur here? |
||
| ``` | ||
|
|
||
| ### SASL Authentication | ||
| If your Fluss cluster uses SASL authentication, configure the security properties: | ||
|
|
||
| ```python | ||
| conf = Configuration() | ||
| conf.set_string("bootstrap.servers", "localhost:9123") | ||
| conf.set_string("client.security.protocol", "sasl") | ||
| conf.set_string("client.security.sasl.mechanism", "PLAIN") | ||
| conf.set_string("client.security.sasl.username", "alice") | ||
| conf.set_string("client.security.sasl.password", "alice-secret") | ||
|
Comment on lines
+56
to
+59
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you confirm if these are currently honoured? |
||
|
|
||
| connection = Connection(conf) | ||
| ``` | ||
|
|
||
| ## Admin API | ||
|
|
||
| The `Admin` API allows you to manage databases and tables. | ||
|
|
||
| ### Creating a Database | ||
|
|
||
| ```python | ||
| from fluss.client.admin import DatabaseDescriptor | ||
|
|
||
| # Create database descriptor | ||
| descriptor = DatabaseDescriptor() | ||
| descriptor.comment = "This is a test database" | ||
| descriptor.add_custom_property("owner", "data-team") | ||
|
|
||
| # Create database (ignore_if_exists=True) | ||
| admin.create_database("my_db", descriptor, True) | ||
| ``` | ||
|
|
||
| ### Creating a Table | ||
|
|
||
| ```python | ||
| from fluss.types import DataTypes | ||
| from fluss.metadata import Schema, TableDescriptor | ||
|
|
||
| # Define schema | ||
| schema = Schema.new_builder() \ | ||
| .column("id", DataTypes.STRING()) \ | ||
| .column("age", DataTypes.INT()) \ | ||
| .column("created_at", DataTypes.TIMESTAMP()) \ | ||
| .column("is_active", DataTypes.BOOLEAN()) \ | ||
| .primary_key(["id"]) \ | ||
| .build() | ||
|
|
||
| # Create table descriptor | ||
| table_descriptor = TableDescriptor.builder() \ | ||
| .schema(schema) \ | ||
| .distributed_by(1, ["id"]) \ | ||
| .build() | ||
|
|
||
| # Create table | ||
| table_path = "my_db.user_table" | ||
| admin.create_table(table_path, table_descriptor, False) | ||
|
|
||
| # Get table info | ||
| table_info = admin.get_table_info(table_path) | ||
| print(table_info) | ||
| ``` | ||
|
|
||
| ## Table API | ||
|
|
||
| ### Writers | ||
|
|
||
| To write data, first obtain a `Table` instance. Fluss supports `UpsertWriter` for Primary Key tables and `AppendWriter` for Log tables. | ||
|
|
||
| ```python | ||
| table = connection.get_table("my_db.user_table") | ||
| ``` | ||
|
|
||
| #### Writing to a Primary Key Table | ||
|
|
||
| ```python | ||
| from datetime import datetime | ||
|
|
||
| # Create writer | ||
| writer = table.new_upsert().create_writer() | ||
|
|
||
| # Prepare data as a list of values matching the schema | ||
| row1 = ["1", 20, datetime.now(), True] | ||
| row2 = ["2", 22, datetime.now(), True] | ||
|
|
||
| # Upsert data and flush | ||
| writer.upsert(row1) | ||
| writer.upsert(row2) | ||
|
|
||
| # Flush to ensure data is sent | ||
| writer.flush() | ||
| ``` | ||
|
|
||
| #### Writing to a Log Table | ||
|
|
||
| ```python | ||
| # Create append writer | ||
| writer = table.new_append().create_writer() | ||
|
|
||
| # Append data | ||
| writer.append(["user_log_1", "login_event"]) | ||
| writer.flush() | ||
| ``` | ||
|
|
||
| ### Scanner | ||
|
|
||
| To read data, create a `LogScanner` and subscribe to buckets. | ||
|
|
||
| ```python | ||
| import time | ||
|
|
||
| # Create scanner | ||
| scanner = table.new_scan().create_log_scanner() | ||
|
|
||
| # Subscribe to all buckets from the beginning | ||
| num_buckets = table.get_table_info().num_buckets | ||
| for i in range(num_buckets): | ||
| scanner.subscribe_from_beginning(i) | ||
|
|
||
| # Poll for records | ||
| while True: | ||
| scan_records = scanner.poll(1000) # timeout in ms | ||
| for bucket in scan_records.buckets(): | ||
| for record in scan_records.records(bucket): | ||
| row = record.get_row() | ||
| print(f"Received row: {row}") | ||
| time.sleep(0.1) | ||
| ``` | ||
|
|
||
| ### Lookup | ||
|
|
||
| You can perform key-based lookups on Primary Key tables. | ||
|
|
||
| ```python | ||
| # Create lookuper | ||
| lookuper = table.new_lookup().create_lookuper() | ||
|
|
||
| # Lookup by key | ||
| key = ["1"] | ||
| result_row = lookuper.lookup(key) | ||
|
|
||
| if result_row: | ||
| print(f"Found row: {result_row}") | ||
| else: | ||
| print("Row not found") | ||
| ``` | ||
|
|
||
| ## Type Mapping | ||
|
|
||
| The Python client maps Fluss types to Python types as follows: | ||
|
|
||
| | Fluss Type | Python Type | | ||
| |---|---| | ||
| | INT | int | | ||
| | BIGINT | int | | ||
| | STRING | str | | ||
| | BOOLEAN | bool | | ||
| | FLOAT | float | | ||
| | DOUBLE | float | | ||
| | DECIMAL | decimal.Decimal | | ||
| | DATE | datetime.date | | ||
| | TIME | datetime.time | | ||
| | TIMESTAMP | datetime.datetime | | ||
| | TIMESTAMP_LTZ | datetime.datetime | | ||
| | BINARY / BYTES | bytes | | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment here to adjust according to where Fluss is running