feat(connectors): add universal source and destination using PyAirbyte #969
Aaron ("AJ") Steers (aaronsteers) wants to merge 13 commits into main from devin/1769653021-destination-pyairbyte-universal
Conversation
This adds a new universal destination connector that can write to any PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck). The destination implements the Airbyte CDK Destination interface and uses PyAirbyte's cache system to handle the actual writing. Configuration allows selecting the destination type and passing backend-specific settings. Co-Authored-By: AJ Steers <aj@airbyte.io>
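The type-switched configuration described above can be sketched roughly as follows. This is an illustrative assumption, not the PR's actual schema: the `destination_type` key, the default backend, and the validation step are all hypothetical.

```python
# Illustrative config dispatch for a cache-backed universal destination.
# Key names and the supported-backend set are assumptions, not the PR's schema.

SUPPORTED_BACKENDS = {"duckdb", "postgres", "snowflake", "bigquery", "motherduck"}

def select_backend(config: dict) -> str:
    """Return the cache backend name selected by a destination config."""
    destination_type = config.get("destination_type", "duckdb")
    if destination_type not in SUPPORTED_BACKENDS:
        raise ValueError(f"Unsupported destination_type: {destination_type}")
    return destination_type

print(select_backend({"destination_type": "postgres"}))  # postgres
print(select_backend({}))                                # duckdb (assumed default)
```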
🤖 Devin AI Engineer

I'll be helping with this pull request! Note: I can only respond to comments from users who have write access to this repository.
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

💡 Tips and Tricks

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal'

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

📚 Repo Guidance

Helpful Resources / Community Support: Questions? Join the #pyairbyte channel in our Slack workspace.
Note: Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in your review settings.

Use the following commands to manage reviews, or the checkboxes below for quick actions:
📝 Walkthrough

Adds a PyAirbyte "universal" CLI package implementing both a universal destination and source wrappers, CLI entry points and Dockerfile, console script registrations, and a GitHub Actions TK-TODO check workflow. New modules implement the cache-backed destination write flow and a PyAirbyte-wrapping source.

Changes
Sequence Diagram

sequenceDiagram
participant CLI as CLI Entry
participant Dest as DestinationPyAirbyteUniversal
participant Cache as Cache Backend
participant Engine as SQL Engine
CLI->>Dest: check(config)
Dest->>Cache: instantiate backend (DuckDB/Postgres/...)
Dest->>Engine: verify connectivity
alt Connection OK
Engine-->>Dest: success
Dest-->>CLI: ConnectionStatus(success)
else Connection Failed
Engine-->>Dest: error
Dest-->>CLI: ConnectionStatus(failed)
end
CLI->>Dest: write(config, catalog, messages)
loop RECORD messages
Dest->>Dest: buffer record (generate ID, timestamp, JSON payload)
end
alt STATE message received
Dest->>Dest: flush_buffer()
Dest->>Engine: INSERT buffered records into per-stream table
Engine-->>Dest: commit
Dest-->>CLI: emit STATE
end
alt On completion
Dest->>Dest: final flush_buffer()
Dest->>Engine: INSERT remaining records
Dest->>Cache: close connection
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Would you like focused review checkpoints (e.g., SQL injection risks, transactional semantics, and buffer flushing semantics) to speed verification? wdyt?

🚥 Pre-merge checks | ✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches: 🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
…or/ and add CLI entrypoint Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 76-87: The code currently constructs a SnowflakeCache when
destination_type == DESTINATION_TYPE_SNOWFLAKE without rejecting empty strings
for required fields; add early validation to fail fast by checking required
Snowflake fields (account, username, warehouse, database, role) are non-empty
before constructing SnowflakeCache (or implement the same checks in
SnowflakeConfig/SnowflakeCache initializer/constructor), and raise a clear
ValueError indicating which field is missing; preserve the existing SecretString
wrapping for password and include that validation logic near the SnowflakeCache
creation site or inside SnowflakeCache/SnowflakeConfig (functions/classes to
touch: destination_type check, SnowflakeCache constructor, or SnowflakeConfig
class) so misconfigurations are caught immediately with explicit error messages.
In `@airbyte/cli/universal_connector/Dockerfile`:
- Line 5: The Dockerfile's COPY instruction currently references a nonexistent
poetry.lock; update the COPY line that mentions "poetry.lock" to instead
reference "uv.lock" so it reads as copying pyproject.toml and uv.lock into the
image (ensure uv.lock exists at repo root); this change is in the Dockerfile's
COPY command.
🧹 Nitpick comments (2)
airbyte/cli/universal_connector/destination.py (1)
233-248: Consider batch inserts for better performance, wdyt?

The current implementation executes one INSERT per record. For large buffers, this can be significantly slower than using `executemany` or SQLAlchemy's bulk insert capabilities.

⚡ Proposed batch insert approach
 with engine.connect() as conn:
-    for i in range(len(entries["_airbyte_ab_id"])):
-        insert_sql = text(f"""
-            INSERT INTO {schema_name}.{table_name}
-            (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-            VALUES (:ab_id, :emitted_at, :data)
-        """)
-        conn.execute(
-            insert_sql,
-            {
-                "ab_id": entries["_airbyte_ab_id"][i],
-                "emitted_at": entries["_airbyte_emitted_at"][i],
-                "data": entries["_airbyte_data"][i],
-            },
-        )
+    insert_sql = text(f"""
+        INSERT INTO {schema_name}.{table_name}
+        (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+        VALUES (:ab_id, :emitted_at, :data)
+    """)
+    params = [
+        {
+            "ab_id": entries["_airbyte_ab_id"][i],
+            "emitted_at": entries["_airbyte_emitted_at"][i],
+            "data": entries["_airbyte_data"][i],
+        }
+        for i in range(len(entries["_airbyte_ab_id"]))
+    ]
+    conn.execute(insert_sql, params)
     conn.commit()  # pyrefly: ignore[missing-attribute]

airbyte/cli/universal_connector/Dockerfile (1)
1-11: Consider adding a `spec.json` for connector registration, wdyt?

The PR objectives note that `spec.json` may be required for connector registration in the airbytehq/airbyte repository. The Dockerfile looks good otherwise: nice use of SHA pinning for reproducibility!

Would you like me to help generate a `spec.json` file based on the configuration schema used in `_get_cache()`?
if destination_type == DESTINATION_TYPE_SNOWFLAKE:
    sf_config = config.get("snowflake", {})
    password = sf_config.get("password")
    return SnowflakeCache(
        account=sf_config.get("account", ""),
        username=sf_config.get("username", ""),
        password=SecretString(password) if password else None,
        warehouse=sf_config.get("warehouse", ""),
        database=sf_config.get("database", ""),
        schema_name=sf_config.get("schema_name", "PUBLIC"),
        role=sf_config.get("role", ""),
    )
🧩 Analysis chain
🏁 Script executed:
# Find the SnowflakeCache class definition
find . -type f -name "*.py" | xargs rg "class SnowflakeCache" -A 50

Repository: airbytehq/PyAirbyte
Length of output: 1430
🏁 Script executed:
# Also check if there's any validation of these parameters
rg "SnowflakeCache" --type py -B 2 -A 10 | head -100

Repository: airbytehq/PyAirbyte
Length of output: 7486
🏁 Script executed:
# Search for snowflake cache files
fd "snowflake" --type f --extension py | grep -i cache

Repository: airbytehq/PyAirbyte
Length of output: 90
🏁 Script executed:
# Find SnowflakeConfig definition
rg "class SnowflakeConfig" -A 50 airbyte/caches/

Repository: airbytehq/PyAirbyte
Length of output: 45
🏁 Script executed:
# Check if there's validation in SnowflakeConfig or SnowflakeCache
rg -A 5 -B 5 "validator\|root_validator\|field_validator" airbyte/caches/snowflake.py

Repository: airbytehq/PyAirbyte
Length of output: 45
🏁 Script executed:
# Look at the full SnowflakeConfig definition to understand defaults
cat -n airbyte/caches/snowflake.py | head -100

Repository: airbytehq/PyAirbyte
Length of output: 3232
🏁 Script executed:
# Find the SnowflakeConfig definition
find . -path "*/_processors/sql/snowflake.py" -type f

Repository: airbytehq/PyAirbyte
Length of output: 101
🏁 Script executed:
# Search for SnowflakeConfig class
rg "class SnowflakeConfig" -A 100 --type py

Repository: airbytehq/PyAirbyte
Length of output: 7591
🏁 Script executed:
# Check SqlConfig for any validation
rg "class SqlConfig" -A 30 airbyte/_processors/sql/

Repository: airbytehq/PyAirbyte
Length of output: 45
🏁 Script executed:
# Look at the full _get_cache method in destination.py for context
rg -B 10 -A 40 "def _get_cache" airbyte/cli/universal_connector/destination.py

Repository: airbytehq/PyAirbyte
Length of output: 2399
Add early validation to reject empty strings for required Snowflake fields, wdyt?
The constructor accepts all these parameters, but passing empty strings for required fields like account, username, warehouse, database, and role will slip through validation and only fail at runtime during connection. Consider adding validation in SnowflakeConfig to catch these misconfigurations early with clear error messages.
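One possible shape for that fail-fast check, as a standalone sketch. In the real code it would live in `SnowflakeConfig` or next to the `SnowflakeCache` call site; the function name and message text here are illustrative.

```python
# Hypothetical fail-fast validation for required Snowflake config fields.
# Field list mirrors the review comment; placement and naming are assumptions.

REQUIRED_SNOWFLAKE_FIELDS = ("account", "username", "warehouse", "database", "role")

def validate_snowflake_config(sf_config: dict) -> None:
    """Raise a clear ValueError naming every missing or empty required field."""
    missing = [
        field
        for field in REQUIRED_SNOWFLAKE_FIELDS
        if not (sf_config.get(field) or "").strip()
    ]
    if missing:
        raise ValueError(
            f"Missing required Snowflake config fields: {', '.join(missing)}"
        )

# Passes silently when all required fields are present and non-empty:
validate_snowflake_config(
    {"account": "acme", "username": "aj", "warehouse": "wh",
     "database": "db", "role": "loader"}
)
```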
🤖 Prompt for AI Agents
In `@airbyte/cli/universal_connector/destination.py` around lines 76 - 87, The
code currently constructs a SnowflakeCache when destination_type ==
DESTINATION_TYPE_SNOWFLAKE without rejecting empty strings for required fields;
add early validation to fail fast by checking required Snowflake fields
(account, username, warehouse, database, role) are non-empty before constructing
SnowflakeCache (or implement the same checks in SnowflakeConfig/SnowflakeCache
initializer/constructor), and raise a clear ValueError indicating which field is
missing; preserve the existing SecretString wrapping for password and include
that validation logic near the SnowflakeCache creation site or inside
SnowflakeCache/SnowflakeConfig (functions/classes to touch: destination_type
check, SnowflakeCache constructor, or SnowflakeConfig class) so
misconfigurations are caught immediately with explicit error messages.
- Add SourcePyAirbyteUniversal class that wraps any PyAirbyte source - Add CLI entrypoint 'source-pyairbyte' for the universal source - Fix Dockerfile to use uv.lock instead of poetry.lock - Use proper CDK AirbyteEntrypoint.launch() pattern for sources Co-Authored-By: AJ Steers <aj@airbyte.io>
…iping Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
…ination-pyairbyte-universal
Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
Bot Feedback Assessment Summary

I've reviewed all CodeRabbit comments and addressed them as follows:

Fixed Issues
Known Limitations (Deferred)
Nitpicks (Not Implemented)
Actionable comments posted: 2
♻️ Duplicate comments (3)
airbyte/cli/universal_connector/destination.py (2)

221-237: ⚠️ Potential issue | 🔴 Critical

Could we quote schema/table identifiers before embedding them in SQL, wdyt?
Lines 221, 232, 237, and 312 interpolate config/catalog-derived identifiers directly; this is injection-prone and also breaks on reserved/special identifiers.
🛡️ Suggested fix pattern
+    def _quote_ident(self, engine: Engine, name: str) -> str:
+        return engine.dialect.identifier_preparer.quote(name)
@@
-        with engine.connect() as conn:
-            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
+        q_schema = self._quote_ident(engine, schema_name)
+        with engine.connect() as conn:
+            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {q_schema}"))
@@
-        table_name = f"_airbyte_raw_{name}"
+        raw_table_name = f"_airbyte_raw_{name}"
+        q_table = self._quote_ident(engine, raw_table_name)
@@
-            conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
+            conn.execute(text(f"DROP TABLE IF EXISTS {q_schema}.{q_table}"))
@@
-            CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
+            CREATE TABLE IF NOT EXISTS {q_schema}.{q_table} (

Also applies to: 311-313
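For intuition, the heart of what a dialect's identifier preparer does can be sketched with plain ANSI double-quoting. This is a simplification: real SQLAlchemy preparers also handle reserved words, casing rules, and dialect-specific quote characters.

```python
# Simplified ANSI-style identifier quoting: wrap in double quotes and
# escape embedded double quotes by doubling them. Illustrative only;
# prefer engine.dialect.identifier_preparer.quote() in real code.

def quote_ident(name: str) -> str:
    """Quote an SQL identifier ANSI-style, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'

print(quote_ident("order"))        # "order"
print(quote_ident('weird"name'))   # "weird""name"
```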
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/destination.py` around lines 221 - 237, Embed schema and table identifiers safely by quoting them with SQLAlchemy's identifier preparer instead of interpolating raw strings: use engine.dialect.identifier_preparer.quote(schema_name) and quote(table_name) (or construct Table objects via sqlalchemy.schema.Table with schema and metadata) wherever identifiers are used (references: the CREATE SCHEMA call using conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")), the DROP TABLE call using conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")), and the CREATE TABLE IF NOT EXISTS construction that builds {schema_name}.{table_name} as well as the similar occurrences around lines 311-313); replace direct f-string embedding with the quoted identifiers so reserved names and injection are handled safely.
273-275: ⚠️ Potential issue | 🟡 Minor

Could we preserve the source `record.emitted_at` for `_airbyte_emitted_at`, wdyt?

Using destination wall-clock time loses original source emission timing, which can skew lineage/debugging.
🕒 Suggested timestamp handling
-buffer[stream_name]["_airbyte_emitted_at"].append(
-    datetime.datetime.now(datetime.timezone.utc).isoformat()
-)
+emitted_at = (
+    datetime.datetime.fromtimestamp(
+        record.emitted_at / 1000, tz=datetime.timezone.utc
+    ).isoformat()
+    if record.emitted_at is not None
+    else datetime.datetime.now(datetime.timezone.utc).isoformat()
+)
+buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/destination.py` around lines 273 - 275, The current code always appends the destination wall-clock time to buffer[stream_name]["_airbyte_emitted_at"], losing the source's original emission time; change the logic in the block that writes to buffer (referencing buffer, stream_name and the "_airbyte_emitted_at" list) to first check the incoming record's emitted timestamp (e.g., record.get("emitted_at") or record.emitted_at) and, if present and non-empty, append that value (normalizing/formatting to an ISO UTC string if needed); only if the source emitted_at is missing or invalid should you fall back to datetime.datetime.now(datetime.timezone.utc).isoformat().

airbyte/cli/universal_connector/source.py (1)
151-166: ⚠️ Potential issue | 🟠 Major

Could we force full-refresh (or fail fast on incoming `state`) until STATE conversion is implemented, wdyt?

Line 165 currently uses default sync-mode selection, which is incremental-preferred, while `state` is ignored and STATE messages are dropped; that combination can produce unsafe checkpoint semantics.

🧭 Safer interim behavior
 def read(
     self,
     logger: logging.Logger,  # noqa: ARG002
     config: Mapping[str, Any],
     catalog: ConfiguredAirbyteCatalog,
     state: list[Any] | None = None,  # noqa: ARG002
 ) -> Iterable[AirbyteMessage]:
@@
     source = self._get_pyairbyte_source(config)
+    if state:
+        raise ValueError(
+            "Incremental sync/state is not yet supported by source-pyairbyte."
+        )
@@
-    configured_catalog = source.get_configured_catalog(streams=stream_names)
+    configured_catalog = source.get_configured_catalog(
+        streams=stream_names,
+        force_full_refresh=True,
+    )

Based on learnings: In PyAirbyte reviews, TODOs should receive a friendly reminder to resolve before merging.
Also applies to: 179-197
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/source.py` around lines 151 - 166, In read(), guard against ignored incoming state by failing fast: if the state parameter is not None/empty, raise a clear exception (e.g., RuntimeError or ValueError) stating that STATE conversion is not implemented instead of proceeding with default incremental behavior; locate this check in the read() method (inspect the state parameter, the catalog variable, and the calls to source.get_configured_catalog and source.select_streams) and add the early validation so we never continue with unsafe checkpoint semantics (also apply the same guard to the other read-path around the configured_catalog handling that mirrors lines 179-197).
🧹 Nitpick comments (4)
.github/workflows/tk-todo-check.yml (1)
32-32: Consider switching to `git grep` for simpler and more robust path handling?

The current `git ls-files | xargs grep` approach works fine today, but `git grep` is more idiomatic and handles edge cases better (e.g., skipping binary files with the `-I` flag). It also eliminates a pipe and makes the intent clearer.

Suggested update
- FOUND_TODOS=$(git ls-files | xargs grep -i -n "TK-TODO" 2>/dev/null | grep -i -v "IGNORE:TK" || true)  # IGNORE:TK
+ FOUND_TODOS=$(git grep -nI -i "TK-TODO" -- | grep -i -v "IGNORE:TK" || true)  # IGNORE:TK

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.github/workflows/tk-todo-check.yml at line 32, Replace the pipeline that uses git ls-files | xargs grep with a single git grep invocation: update the FOUND_TODOS assignment to call git grep -n -I "TK-TODO" (to skip binary files) and then filter out IGNORE:TK (e.g., pipe to grep -v "IGNORE:TK") with a fallback || true so the variable is empty instead of failing; locate the existing FOUND_TODOS line and swap the command accordingly (reference: FOUND_TODOS assignment and the string "TK-TODO").airbyte/cli/universal_connector/Dockerfile (1)
8-11: Could we explicitly set a non-root runtime user in this Dockerfile, wdyt?

Relying on the base image's default user is implicit; adding an explicit `USER` helps prevent accidental root execution if upstream defaults change.

🔒 Suggested hardening
 RUN pip install --no-cache-dir .
+USER airbyte
 ENV AIRBYTE_ENTRYPOINT="destination-pyairbyte"
 ENTRYPOINT ["destination-pyairbyte"]

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/Dockerfile` around lines 8 - 11, The Dockerfile currently installs the package and sets AIRBYTE_ENTRYPOINT/ENTRYPOINT but does not declare a non-root user; add steps to create a dedicated unprivileged user and group (e.g., appuser/appgroup), chown the installed files and any runtime directories to that user, and then set USER to that non-root account so the container runs without root privileges; update around the pip install and ENTRYPOINT sections referenced by AIRBYTE_ENTRYPOINT and ENTRYPOINT to ensure ownership and runtime user are applied.airbyte/cli/universal_connector/source.py (1)
179-182: Could we isolate the private `_read_with_catalog` usage behind a small adapter, wdyt?

Direct dependency on a private method (`SLF001`) is fragile across internal refactors; a wrapper/fallback path would reduce breakage risk.
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/source.py` around lines 179 - 182, The code directly calls the private method source._read_with_catalog which is fragile; wrap this in a small adapter on the Source object (e.g., add a public method read_with_catalog on the source wrapper or utility) that first tries calling source.read_with_catalog (preferred) and falls back to source._read_with_catalog if needed, then update the loop to iterate over source.read_with_catalog(...). This isolates the private call, centralizes the fallback logic, and makes future refactors safe while referencing the existing symbols source and _read_with_catalog.airbyte/cli/universal_connector/destination.py (1)
309-323: Could we batch insert instead of executing one row at a time, wdyt?

The per-row execute loop creates many round trips; passing a list of parameter dicts to one statement is much more efficient.
⚡ Example refactor
 with engine.connect() as conn:
-    for i in range(len(entries["_airbyte_ab_id"])):
-        insert_sql = text(f"""
-            INSERT INTO {schema_name}.{table_name}
-            (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-            VALUES (:ab_id, :emitted_at, :data)
-        """)
-        conn.execute(
-            insert_sql,
-            {
-                "ab_id": entries["_airbyte_ab_id"][i],
-                "emitted_at": entries["_airbyte_emitted_at"][i],
-                "data": entries["_airbyte_data"][i],
-            },
-        )
+    insert_sql = text(f"""
+        INSERT INTO {schema_name}.{table_name}
+        (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+        VALUES (:ab_id, :emitted_at, :data)
+    """)
+    payload = [
+        {
+            "ab_id": entries["_airbyte_ab_id"][i],
+            "emitted_at": entries["_airbyte_emitted_at"][i],
+            "data": entries["_airbyte_data"][i],
+        }
+        for i in range(len(entries["_airbyte_ab_id"]))
+    ]
+    conn.execute(insert_sql, payload)
     conn.commit()  # pyrefly: ignore[missing-attribute]

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/destination.py` around lines 309 - 323, The loop performs one execute per row causing many round trips; instead build a list of parameter dicts from entries (use the same keys: "ab_id", "emitted_at", "data") and call conn.execute once with insert_sql and that list so SQLAlchemy will perform a batched/ executemany insert; update the block that creates insert_sql and the conn.execute call (referencing engine.connect, insert_sql, conn.execute, and entries/_airbyte_ab_id/_airbyte_emitted_at/_airbyte_data) to pass a single list of parameter mappings rather than executing inside the for loop.
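The batch-vs-per-row idea above can be demonstrated with the stdlib `sqlite3` module, whose `executemany` sends one statement with a list of parameter rows. The table and column names mirror the raw-table layout discussed in the review but are otherwise illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE _airbyte_raw_users "
    "(_airbyte_ab_id TEXT, _airbyte_emitted_at TEXT, _airbyte_data TEXT)"
)

rows = [
    ("id-1", "2025-01-01T00:00:00+00:00", '{"name": "a"}'),
    ("id-2", "2025-01-01T00:00:01+00:00", '{"name": "b"}'),
]
# One executemany call instead of one execute per record.
conn.executemany("INSERT INTO _airbyte_raw_users VALUES (?, ?, ?)", rows)
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM _airbyte_raw_users").fetchone()[0]
print(count)  # 2
```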
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 246-283: The buffer can grow unbounded because you only flush on
STATE/end; add a size-based flush threshold (e.g., self.buffer_max_records) and
in the input_messages loop inside the Type.RECORD branch, after appending to
buffer[stream_name], check if len(buffer[stream_name]["_airbyte_ab_id"]) >=
self.buffer_max_records and if so call self._flush_buffer(engine=engine,
buffer=buffer, schema_name=schema_name, stream_name=stream_name) and then
clear/reset buffer[stream_name] (or reassign to a fresh defaultdict list) to
avoid OOM; add a configurable attribute (e.g., buffer_max_records with a
sensible default) on the class or in the constructor so the threshold is tunable
and reference buffer, Type.RECORD, and _flush_buffer when applying the change.
In `@airbyte/cli/universal_connector/source.py`:
- Around line 106-115: The current except blocks in the check() flow return raw
exception text via AirbyteConnectionStatus (using str(e) and {e!r}); change
these to return a static, non-sensitive failure message (e.g., "Connection check
failed") while logging the full exception details to a logger if needed; update
the except ValueError and generic except Exception in source.py so both return
AirbyteConnectionStatus(status=Status.FAILED, message="Connection check failed")
and ensure any existing logger (or add one) records the exception for debugging
(reference: function check(), AirbyteConnectionStatus, Status.FAILED).
---
Duplicate comments:
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 221-237: Embed schema and table identifiers safely by quoting them
with SQLAlchemy's identifier preparer instead of interpolating raw strings: use
engine.dialect.identifier_preparer.quote(schema_name) and quote(table_name) (or
construct Table objects via sqlalchemy.schema.Table with schema and metadata)
wherever identifiers are used (references: the CREATE SCHEMA call using
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")), the DROP TABLE
call using conn.execute(text(f"DROP TABLE IF EXISTS
{schema_name}.{table_name}")), and the CREATE TABLE IF NOT EXISTS construction
that builds {schema_name}.{table_name} as well as the similar occurrences around
lines 311-313); replace direct f-string embedding with the quoted identifiers so
reserved names and injection are handled safely.
- Around line 273-275: The current code always appends the destination
wall-clock time to buffer[stream_name]["_airbyte_emitted_at"], losing the
source's original emission time; change the logic in the block that writes to
buffer (referencing buffer, stream_name and the "_airbyte_emitted_at" list) to
first check the incoming record's emitted timestamp (e.g.,
record.get("emitted_at") or record.emitted_at) and, if present and non-empty,
append that value (normalizing/formatting to an ISO UTC string if needed); only
if the source emitted_at is missing or invalid should you fall back to
datetime.datetime.now(datetime.timezone.utc).isoformat().
In `@airbyte/cli/universal_connector/source.py`:
- Around line 151-166: In read(), guard against ignored incoming state by
failing fast: if the state parameter is not None/empty, raise a clear exception
(e.g., RuntimeError or ValueError) stating that STATE conversion is not
implemented instead of proceeding with default incremental behavior; locate this
check in the read() method (inspect the state parameter, the catalog variable,
and the calls to source.get_configured_catalog and source.select_streams) and
add the early validation so we never continue with unsafe checkpoint semantics
(also apply the same guard to the other read-path around the configured_catalog
handling that mirrors lines 179-197).
---
Nitpick comments:
In @.github/workflows/tk-todo-check.yml:
- Line 32: Replace the pipeline that uses git ls-files | xargs grep with a
single git grep invocation: update the FOUND_TODOS assignment to call git grep
-n -I "TK-TODO" (to skip binary files) and then filter out IGNORE:TK (e.g., pipe
to grep -v "IGNORE:TK") with a fallback || true so the variable is empty instead
of failing; locate the existing FOUND_TODOS line and swap the command
accordingly (reference: FOUND_TODOS assignment and the string "TK-TODO").
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 309-323: The loop performs one execute per row causing many round
trips; instead build a list of parameter dicts from entries (use the same keys:
"ab_id", "emitted_at", "data") and call conn.execute once with insert_sql and
that list so SQLAlchemy will perform a batched/ executemany insert; update the
block that creates insert_sql and the conn.execute call (referencing
engine.connect, insert_sql, conn.execute, and
entries/_airbyte_ab_id/_airbyte_emitted_at/_airbyte_data) to pass a single list
of parameter mappings rather than executing inside the for loop.
In `@airbyte/cli/universal_connector/Dockerfile`:
- Around line 8-11: The Dockerfile currently installs the package and sets
AIRBYTE_ENTRYPOINT/ENTRYPOINT but does not declare a non-root user; add steps to
create a dedicated unprivileged user and group (e.g., appuser/appgroup), chown
the installed files and any runtime directories to that user, and then set USER
to that non-root account so the container runs without root privileges; update
around the pip install and ENTRYPOINT sections referenced by AIRBYTE_ENTRYPOINT
and ENTRYPOINT to ensure ownership and runtime user are applied.
In `@airbyte/cli/universal_connector/source.py`:
- Around line 179-182: The code directly calls the private method
source._read_with_catalog which is fragile; wrap this in a small adapter on the
Source object (e.g., add a public method read_with_catalog on the source wrapper
or utility) that first tries calling source.read_with_catalog (preferred) and
falls back to source._read_with_catalog if needed, then update the loop to
iterate over source.read_with_catalog(...). This isolates the private call,
centralizes the fallback logic, and makes future refactors safe while
referencing the existing symbols source and _read_with_catalog.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- .github/workflows/tk-todo-check.yml
- airbyte/cli/universal_connector/Dockerfile
- airbyte/cli/universal_connector/__init__.py
- airbyte/cli/universal_connector/destination.py
- airbyte/cli/universal_connector/run_source.py
- airbyte/cli/universal_connector/source.py
- pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/cli/universal_connector/__init__.py
# Buffer for records
buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))

for message in input_messages:
    if message.type == Type.STATE:
        # Flush the buffer before yielding state
        for stream_name in list(buffer.keys()):
            self._flush_buffer(
                engine=engine,
                buffer=buffer,
                schema_name=schema_name,
                stream_name=stream_name,
            )
        buffer = defaultdict(lambda: defaultdict(list))
        yield message

    elif message.type == Type.RECORD:
        record = message.record
        if record is None:
            continue
        stream_name = record.stream
        if stream_name not in streams:
            logger.debug(f"Stream {stream_name} not in configured streams, skipping")
            continue

        # Add to buffer
        buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
        buffer[stream_name]["_airbyte_emitted_at"].append(
            datetime.datetime.now(datetime.timezone.utc).isoformat()
        )
        buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))

    else:
        logger.debug(f"Message type {message.type} not handled, skipping")

# Flush any remaining records
for stream_name in list(buffer.keys()):
    self._flush_buffer(
Could we add a size-based buffer flush threshold, wdyt?
Buffering only on STATE/end can grow unbounded for sources that emit STATE infrequently, which risks OOM on larger syncs.
📦 Example safeguard
- buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+ buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+ max_buffer_records = 10_000
+ buffered_records = 0
@@
buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))
+ buffered_records += 1
+
+ if buffered_records >= max_buffer_records:
+ for pending_stream in list(buffer.keys()):
+ self._flush_buffer(
+ engine=engine,
+ buffer=buffer,
+ schema_name=schema_name,
+ stream_name=pending_stream,
+ )
+ buffer = defaultdict(lambda: defaultdict(list))
+ buffered_records = 0🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte/cli/universal_connector/destination.py` around lines 246 - 283, The
buffer can grow unbounded because you only flush on STATE/end; add a size-based
flush threshold (e.g., self.buffer_max_records) and in the input_messages loop
inside the Type.RECORD branch, after appending to buffer[stream_name], check if
len(buffer[stream_name]["_airbyte_ab_id"]) >= self.buffer_max_records and if so
call self._flush_buffer(engine=engine, buffer=buffer, schema_name=schema_name,
stream_name=stream_name) and then clear/reset buffer[stream_name] (or reassign
to a fresh defaultdict list) to avoid OOM; add a configurable attribute (e.g.,
buffer_max_records with a sensible default) on the class or in the constructor
so the threshold is tunable and reference buffer, Type.RECORD, and _flush_buffer
when applying the change.
except ValueError as e:
    return AirbyteConnectionStatus(
        status=Status.FAILED,
        message=str(e),
    )
except Exception as e:
    return AirbyteConnectionStatus(
        status=Status.FAILED,
        message=f"Connection check failed: {e!r}",
    )
Could we avoid returning raw exception text from check(), wdyt?
str(e) / {e!r} here can expose connector config details (including secrets) in status messages; a static failure message is safer.
🔐 Proposed adjustment
- except ValueError as e:
+ except ValueError:
return AirbyteConnectionStatus(
status=Status.FAILED,
- message=str(e),
+ message="Invalid source configuration.",
)
- except Exception as e:
+ except Exception as e:
+ logger.warning("Universal source check failed (%s)", type(e).__name__)
return AirbyteConnectionStatus(
status=Status.FAILED,
- message=f"Connection check failed: {e!r}",
+ message="Connection check failed.",
)
Based on learnings: In PyAirbyte, error messages should decouple static message text from dynamic values to reduce PII exposure in telemetry.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte/cli/universal_connector/source.py` around lines 106-115, the
current except blocks in the check() flow return raw exception text via
AirbyteConnectionStatus (using str(e) and {e!r}); change these to return a
static, non-sensitive failure message (e.g., "Connection check failed") while
logging the full exception details to a logger if needed; update the except
ValueError and generic except Exception in source.py so both return
AirbyteConnectionStatus(status=Status.FAILED, message="Connection check failed")
and ensure any existing logger (or add one) records the exception for debugging
(reference: function check(), AirbyteConnectionStatus, Status.FAILED).
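The log-details-but-return-static-text pattern can be sketched without the CDK types. Here a plain dict stands in for `AirbyteConnectionStatus`, and `safe_check`/`bad_probe` are invented names for the demo:

```python
import logging

logger = logging.getLogger("universal_source")


def safe_check(probe) -> dict:
    """Run a connection probe; log details, but return only static status text."""
    try:
        probe()
        return {"status": "SUCCEEDED", "message": None}
    except ValueError:
        # Static message: config values never reach the status payload.
        return {"status": "FAILED", "message": "Invalid source configuration."}
    except Exception as e:
        # Full details go to logs only; the user-visible status stays generic.
        logger.warning("Universal source check failed (%s)", type(e).__name__)
        return {"status": "FAILED", "message": "Connection check failed."}


def bad_probe() -> None:
    # With the old str(e) behavior this secret would leak into the status.
    raise ValueError("secret=hunter2")


print(safe_check(bad_probe))
```

The trade-off the reviewer flags still applies: exception class names in logs aid debugging while keeping secrets out of status messages and telemetry.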
♻️ Duplicate comments (2)
airbyte/cli/universal_connector/destination.py (2)
Lines 221-233: ⚠️ Potential issue | 🔴 Critical
Quote SQL identifiers before composing statements, wdyt?
Line 221, Line 232, Line 237, and Line 319 still interpolate raw identifier strings. Could we quote `schema_name` and `table_name` via the dialect preparer to avoid SQL injection and broken identifiers across backends?
🔧 Proposed fix
```diff
 class DestinationPyAirbyteUniversal(Destination):
     """Universal destination that writes to any PyAirbyte-supported cache backend."""
+
+    @staticmethod
+    def _quote_ident(engine: Engine, ident: str) -> str:
+        return engine.dialect.identifier_preparer.quote(ident)
@@
         engine = cache.get_sql_engine()
+        quoted_schema_name = self._quote_ident(engine, schema_name)
         with engine.connect() as conn:
-            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
+            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema_name}"))
             conn.commit()  # pyrefly: ignore[missing-attribute]
@@
             name = configured_stream.stream.name
             table_name = f"_airbyte_raw_{name}"
+            quoted_table_name = self._quote_ident(engine, table_name)
@@
-                conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
+                conn.execute(
+                    text(
+                        f"DROP TABLE IF EXISTS {quoted_schema_name}.{quoted_table_name}"
+                    )
+                )
@@
-                CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
+                CREATE TABLE IF NOT EXISTS {quoted_schema_name}.{quoted_table_name} (
@@
-        table_name = f"_airbyte_raw_{stream_name}"
+        table_name = f"_airbyte_raw_{stream_name}"
+        quoted_schema_name = self._quote_ident(engine, schema_name)
+        quoted_table_name = self._quote_ident(engine, table_name)
@@
-            INSERT INTO {schema_name}.{table_name}
+            INSERT INTO {quoted_schema_name}.{quoted_table_name}
```

```bash
#!/bin/bash
# Verify raw identifier interpolation still exists in SQL text() calls.
rg -nP 'text\(f".*\{(schema_name|table_name)\}' airbyte/cli/universal_connector/destination.py -C2
# Verify whether identifier quoting is implemented.
rg -n 'identifier_preparer\.quote|def _quote_ident' airbyte/cli/universal_connector/destination.py -C2
```

Also applies to: 236-243, 318-320
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/destination.py` around lines 221-233, the SQL statements are interpolating raw schema and table identifiers (e.g., schema_name and table_name) into text() calls in the create/drop table blocks inside the loop over configured_catalog.streams, which risks SQL injection and broken identifiers across backends; fix by quoting identifiers using the engine's dialect preparer (engine.dialect.identifier_preparer.quote) or by building statements with SQLAlchemy Table/DDL APIs instead of f-strings, e.g., quote schema_name and table_name before passing into text() in the create schema and DROP TABLE/CREATE TABLE sections (references: configured_catalog.streams loop, variables schema_name and table_name, and the with engine.connect() blocks).
Lines 246-260: ⚠️ Potential issue | 🟠 Major
Could we add a size-based flush guard for the record buffer, wdyt?
Lines 246-295 flush only on STATE/end. For long-running streams with sparse STATE messages, memory can grow unbounded. Would you consider a configurable `buffer_max_records` threshold and intermediate flushes?
📦 Proposed fix
```diff
-        buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+        buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+        buffer_max_records = int(config.get("buffer_max_records", 10_000))
+        buffered_records = 0
@@
                 buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at.isoformat())
                 buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))
+                buffered_records += 1
+
+                if buffered_records >= buffer_max_records:
+                    for pending_stream in list(buffer.keys()):
+                        self._flush_buffer(
+                            engine=engine,
+                            buffer=buffer,
+                            schema_name=schema_name,
+                            stream_name=pending_stream,
+                        )
+                    buffer = defaultdict(lambda: defaultdict(list))
+                    buffered_records = 0
```

Also applies to: 262-295
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/universal_connector/destination.py` around lines 246-260, the record buffer currently only flushes on STATE messages, causing unbounded memory growth; add a configurable buffer_max_records (e.g., passed into constructor or method options) and enforce a size-based flush inside the input_messages loop: after appending to buffer, compute the total buffered record count (sum of lengths in buffer dict) and if it exceeds buffer_max_records, call self._flush_buffer for each stream_name in buffer (same logic used on STATE flush) and reset buffer to a fresh defaultdict; ensure buffer_max_records default is sane (e.g., None or a positive int) and skip size checks when unset.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 221-233: The SQL statements are interpolating raw schema and table
identifiers (e.g., schema_name and table_name) into text() calls in the
create/drop table blocks inside the loop over configured_catalog.streams, which
risks SQL injection and broken identifiers across backends; fix by quoting
identifiers using the engine's dialect preparer
(engine.dialect.identifier_preparer.quote) or by building statements with
SQLAlchemy Table/DDL APIs instead of f-strings, e.g., quote schema_name and
table_name before passing into text() in the create schema and DROP TABLE/CREATE
TABLE sections (references: configured_catalog.streams loop, variables
schema_name and table_name, and the with engine.connect() blocks).
- Around line 246-260: The record buffer currently only flushes on STATE
messages causing unbounded memory growth; add a configurable buffer_max_records
(e.g., passed into constructor or method options) and enforce a size-based flush
inside the input_messages loop: after appending to buffer, compute the total
buffered record count (sum of lengths in buffer dict) and if it exceeds
buffer_max_records, call self._flush_buffer for each stream_name in buffer (same
logic used on STATE flush) and reset buffer to a fresh defaultdict; ensure
buffer_max_records default is sane (e.g., None or a positive int) and skip size
checks when unset.
…-smoke-test Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
…byte Co-Authored-By: AJ Steers <aj@airbyte.io>
…sting (#982) Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Bot Feedback Review Summary

I've reviewed the latest CodeRabbit feedback (Comments 20-22). Here's my assessment:

Comment 20: Size-based buffer flush threshold (destination.py)
Suggestion: Add a `buffer_max_records` threshold with intermediate flushes.
Assessment: Valid optimization for production use. However, for this initial implementation targeting smoke testing, the risk is low since test scenarios are small. This is a good follow-up enhancement.
Decision: Defer to human reviewer. Can be added as a follow-up if needed.

Comment 21: Avoid raw exception text in check() (source.py)
Suggestion: Return static failure messages instead of `str(e)` / `{e!r}`.
Assessment: Valid security concern. However, for debugging purposes during initial development, having the actual error message is valuable. The underlying PyAirbyte source's
Decision: Defer to human reviewer. The current behavior aids debugging but could be tightened for production.

Review 22: Duplicate comments (already addressed)
Nitpicks (low priority)
All major concerns are either already addressed or documented in the PR description for human review.
```python
create_sql = f"""
    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
        _airbyte_emitted_at TIMESTAMP,
        _airbyte_data JSON
    )
"""
conn.execute(text(create_sql))
conn.commit()  # pyrefly: ignore[missing-attribute]
```
🔴 Unquoted SQL identifiers cause syntax errors for stream names with special characters
Stream names and schema names are interpolated directly into raw SQL strings without any identifier quoting. When a stream name contains characters that are not valid in unquoted SQL identifiers (e.g., dashes, dots, spaces, or SQL keywords), the generated SQL will fail with a syntax error or, worse, enable SQL injection.
Root Cause and Impact
The write method constructs SQL statements by f-string interpolation of schema_name and table_name (which is f"_airbyte_raw_{stream_name}") at multiple locations:
- `destination.py:221`: `CREATE SCHEMA IF NOT EXISTS {schema_name}`
- `destination.py:232`: `DROP TABLE IF EXISTS {schema_name}.{table_name}`
- `destination.py:237`: `CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}`
- `destination.py:319`: `INSERT INTO {schema_name}.{table_name}`
For example, a stream named my-stream produces _airbyte_raw_my-stream, and the SQL CREATE TABLE IF NOT EXISTS main._airbyte_raw_my-stream (...) will be a syntax error because - is interpreted as the minus operator.
Similarly, a stream name containing '; DROP TABLE users; -- could lead to SQL injection.
Notably, the companion smoke test source defines scenarios like "CamelCaseStreamName" which would work (unquoted identifiers are case-insensitive but still syntactically valid), but any source with stream names containing dashes, dots, or spaces would break this destination entirely.
Impact: The destination will crash with SQL errors for any stream whose name contains characters not valid in unquoted SQL identifiers. This includes common stream naming patterns like my-table, schema.table, or table name.
Prompt for agents
In airbyte/cli/universal_connector/destination.py, all SQL identifiers (schema_name and table_name) must be properly quoted before interpolation into SQL strings. This affects lines 221, 232, 237-243, and 318-322.
One approach: add a helper method to quote identifiers, e.g.:
```python
def _quote_identifier(self, name: str) -> str:
    escaped = name.replace('"', '""')
    return f'"{escaped}"'
```
Then use it everywhere identifiers are interpolated:
- Line 221: text(f"CREATE SCHEMA IF NOT EXISTS {self._quote_identifier(schema_name)}")
- Line 232: text(f"DROP TABLE IF EXISTS {self._quote_identifier(schema_name)}.{self._quote_identifier(table_name)}")
- Lines 237-243: Same pattern for CREATE TABLE
- Lines 318-322: Same pattern for INSERT INTO in _flush_buffer method
This needs to be applied consistently across all 4 SQL construction sites in the file.
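A quick stdlib demonstration that this double-quote style makes a dash-containing stream name valid SQL. SQLite (which accepts ANSI double-quoted identifiers) stands in here for the real multi-backend engines, and the schema qualifier is omitted; `quote_identifier` is a free-function variant of the helper above:

```python
import sqlite3


def quote_identifier(name: str) -> str:
    """Double-quote an SQL identifier, escaping embedded quotes (ANSI style)."""
    escaped = name.replace('"', '""')
    return f'"{escaped}"'


# Hypothetical stream name with a dash: unquoted, the DDL below would be a
# syntax error, exactly as described in the finding.
stream_name = "my-stream"
table = quote_identifier(f"_airbyte_raw_{stream_name}")

conn = sqlite3.connect(":memory:")
conn.execute(f"CREATE TABLE {table} (_airbyte_ab_id TEXT PRIMARY KEY, _airbyte_data TEXT)")
conn.execute(f"INSERT INTO {table} VALUES (?, ?)", ("id-1", "{}"))
rows = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
print(rows[0])
```

Note that values still go through `?` parameter binding; quoting only covers identifiers, which cannot be parameterized.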
Co-Authored-By: AJ Steers <aj@airbyte.io>
Summary
Adds universal source and destination connectors that leverage PyAirbyte as a backend. These connectors are named `destination-pyairbyte` and `source-pyairbyte` internally, but will be published to the Airbyte connector registry as `destination-smoke-test` and `source-smoke-test` (via a companion PR in airbytehq/airbyte).

Universal Destination (`destination-pyairbyte`): Writes to any PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck). Implements the Airbyte CDK `Destination` interface with `spec()`, `check()`, `write()`, and support for `overwrite`/`append` sync modes.

Universal Source (`source-pyairbyte`): Wraps any PyAirbyte-supported source connector, acting as a proxy that delegates to the underlying source based on configuration. Implements the CDK `Source` interface with `spec()`, `check()`, `discover()`, and `read()` methods.

Smoke Test Source (`source-smoke-test`): A dedicated test source with predefined scenarios for destination regression testing. Includes scenarios for basic types, timestamps, large decimals, nested JSON, null handling, column naming edge cases, and more.

All connectors are registered as CLI entrypoints and include a Dockerfile for containerization.
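Since the destination's configuration selects a destination type plus backend-specific settings, a config file might look roughly like the following. This is a hypothetical sketch only: the field names (`destination_type`, `schema_name`, `duckdb`, `db_path`) are illustrative guesses, not the connector's actual spec.

```json
{
  "destination_type": "duckdb",
  "schema_name": "airbyte_raw",
  "duckdb": {
    "db_path": "/local/destination.duckdb"
  }
}
```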
Updates since last revision
- `source-smoke-test` connector from PR feat(connectors): add smoke test source for destination regression testing #982 (predefined scenarios for destination regression testing)
- `destination-pyairbyte` / `source-pyairbyte` (internal PyAirbyte naming)
- `destination-smoke-test` / `source-smoke-test` names (handled in companion PR)

Review & Testing Checklist for Human
- The `state` parameter in `read()` is ignored and STATE messages are skipped. Incremental syncs will not work - only full refresh is supported. This is tracked with a TK-TODO marker.
- The `_read_with_catalog()` method may change without notice.
- The `check()` method returns raw exception text which could potentially expose config details. Consider if this needs sanitization.

Recommended test plan:
- `_airbyte_raw_*` tables
- (e.g., `source-faker`), run discover and read operations
- `source-smoke-test` to verify predefined scenarios emit expected records
- `source-pyairbyte read --config source.json --catalog catalog.json | destination-pyairbyte write --config dest.json --catalog catalog.json`

Notes
This is part 1 of 2 PRs - a companion PR will register these connectors in the airbytehq/airbyte repo as `destination-smoke-test`.

Requested by: Aaron ("AJ") Steers (@aaronsteers) (AJ)
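For the pipe-style test in the plan above, the destination reads newline-delimited Airbyte protocol messages on stdin. A rough sketch of hand-crafting such input (the stream name and records are invented, and the RECORD/STATE field shapes follow my reading of the Airbyte protocol rather than anything quoted from the CDK):

```python
import json
import time


def record_message(stream: str, data: dict) -> str:
    """Serialize one Airbyte RECORD message as a JSON line."""
    return json.dumps({
        "type": "RECORD",
        "record": {
            "stream": stream,
            "data": data,
            "emitted_at": int(time.time() * 1000),  # epoch milliseconds
        },
    })


def state_message(state: dict) -> str:
    """Serialize one Airbyte STATE message (the destination flushes on these)."""
    return json.dumps({"type": "STATE", "state": {"data": state}})


lines = [
    record_message("users", {"id": 1, "name": "ada"}),
    record_message("users", {"id": 2, "name": "grace"}),
    state_message({"users_cursor": 2}),
]
print("\n".join(lines))
```

Piping output like this into the destination's `write` command gives a minimal end-to-end check without wiring up a real source.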
Devin session: https://app.devin.ai/sessions/46cd19c4793a4e1ea32bb660de9c1fe0