
feat(connectors): add universal source and destination using PyAirbyte#969

Open
Aaron ("AJ") Steers (aaronsteers) wants to merge 13 commits into main from
devin/1769653021-destination-pyairbyte-universal

Conversation


@aaronsteers Aaron ("AJ") Steers (aaronsteers) commented Jan 29, 2026

Summary

Adds universal source and destination connectors that leverage PyAirbyte as a backend. These connectors are named destination-pyairbyte and source-pyairbyte internally, but will be published to the Airbyte connector registry as destination-smoke-test and source-smoke-test (via a companion PR in airbytehq/airbyte).

Universal Destination (destination-pyairbyte): Writes to any PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck). Implements the Airbyte CDK Destination interface with spec(), check(), write(), and support for overwrite/append sync modes.

Universal Source (source-pyairbyte): Wraps any PyAirbyte-supported source connector, acting as a proxy that delegates to the underlying source based on configuration. Implements the CDK Source interface with spec(), check(), discover(), and read() methods.

Smoke Test Source (source-smoke-test): A dedicated test source with predefined scenarios for destination regression testing. Includes scenarios for basic types, timestamps, large decimals, nested JSON, null handling, column naming edge cases, and more.

All connectors are registered as CLI entrypoints and include a Dockerfile for containerization.
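To make the described interface concrete, here is a minimal, dependency-free sketch of the destination's shape. The class and method bodies are illustrative stand-ins, not the real CDK implementation in `airbyte/cli/universal_connector/destination.py`:

```python
import json
from typing import Any, Iterable, Mapping


class UniversalDestinationSketch:
    """Simplified stand-in for the universal destination (not the real CDK class)."""

    SUPPORTED_BACKENDS = ("duckdb", "postgres", "snowflake", "bigquery", "motherduck")

    def spec(self) -> dict[str, Any]:
        # The real spec() returns a ConnectorSpecification; a plain JSON schema stands in here.
        return {
            "type": "object",
            "required": ["destination_type"],
            "properties": {"destination_type": {"enum": list(self.SUPPORTED_BACKENDS)}},
        }

    def check(self, config: Mapping[str, Any]) -> tuple[bool, str]:
        # The real check() verifies SQL engine connectivity; here we only validate the type.
        dest_type = config.get("destination_type")
        if dest_type not in self.SUPPORTED_BACKENDS:
            return (False, f"Unsupported destination_type: {dest_type!r}")
        return (True, "ok")

    def write(self, messages: Iterable[str]) -> list[dict[str, Any]]:
        # Collect RECORD messages from newline-delimited JSON; STATE handling is out of scope.
        records = []
        for line in messages:
            msg = json.loads(line)
            if msg.get("type") == "RECORD":
                records.append(msg["record"])
        return records
```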

Updates since last revision

  • Merged the source-smoke-test connector from PR #982 (feat(connectors): add smoke test source for destination regression testing), which provides the predefined scenarios for destination regression testing
  • Reverted CLI entrypoints back to destination-pyairbyte/source-pyairbyte (internal PyAirbyte naming)
  • The airbyte connector registry will use destination-smoke-test/source-smoke-test names (handled in companion PR)
  • Added TK-TODO comment for STATE message handling (tracks tech debt for incremental sync support)
  • Imported TK-TODO CI check workflow from airbyte-ops-mcp repo

Review & Testing Checklist for Human

  • State handling limitation (source): The state parameter in read() is ignored and STATE messages are skipped. Incremental syncs will not work; only full refresh is supported. This is tracked with a TK-TODO marker.
  • SQL injection risk (destination): Schema and table names are interpolated into SQL strings without identifier quoting. Verify this is acceptable given these values come from configuration/catalog.
  • Private API usage (source): Uses PyAirbyte's internal _read_with_catalog() method which may change without notice.
  • Exception messages in check(): The source check() method returns raw exception text which could potentially expose config details. Consider if this needs sanitization.
  • No automated tests included: Manual testing was performed but unit/integration tests should be added.

Recommended test plan:

  1. Build the Docker image locally
  2. Destination: Run a test sync with DuckDB backend, verify records in _airbyte_raw_* tables
  3. Source: Configure with a simple source (e.g., source-faker), run discover and read operations
  4. Smoke Test Source: Run source-smoke-test to verify predefined scenarios emit expected records
  5. Pipe test: source-pyairbyte read --config source.json --catalog catalog.json | destination-pyairbyte write --config dest.json --catalog catalog.json

Notes

This is part 1 of 2 PRs; a companion PR will register these connectors in the airbytehq/airbyte repo as destination-smoke-test and source-smoke-test.

Requested by: Aaron ("AJ") Steers (@aaronsteers)
Devin session: https://app.devin.ai/sessions/46cd19c4793a4e1ea32bb660de9c1fe0

This adds a new universal destination connector that can write to any
PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery,
MotherDuck).

The destination implements the Airbyte CDK Destination interface and uses
PyAirbyte's cache system to handle the actual writing. Configuration allows
selecting the destination type and passing backend-specific settings.
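The backend selection described above could be sketched as follows. The `destination_type` config key and the class-name mapping are assumptions for illustration; the real code constructs actual PyAirbyte cache objects rather than returning names:

```python
from typing import Any, Mapping

# Hypothetical mapping from the config's destination_type to a PyAirbyte cache class name.
CACHE_CLASSES = {
    "duckdb": "DuckDBCache",
    "postgres": "PostgresCache",
    "snowflake": "SnowflakeCache",
    "bigquery": "BigQueryCache",
    "motherduck": "MotherDuckCache",
}


def select_cache_backend(config: Mapping[str, Any]) -> str:
    """Return the cache class name for a destination config, failing fast on unknown types."""
    dest_type = str(config.get("destination_type", "")).lower()
    if dest_type not in CACHE_CLASSES:
        raise ValueError(f"Unsupported destination_type: {dest_type!r}")
    return CACHE_CLASSES[dest_type]
```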

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Tips and Tricks

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal'

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /uv-lock - Updates uv.lock file
  • /test-pr - Runs tests with the updated PyAirbyte
  • /prerelease - Builds and publishes a prerelease version to PyPI
📚 Repo Guidance

Helpful Resources

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


@github-actions

github-actions bot commented Jan 29, 2026

PyTest Results (Fast Tests Only, No Creds)

343 tests  ±0   343 ✅ ±0   5m 46s ⏱️ +7s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 271673c. ± Comparison against base commit fa52519.

♻️ This comment has been updated with latest results.

@coderabbitai

coderabbitai bot commented Jan 29, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds a PyAirbyte "universal" CLI package implementing both universal destination and source wrappers, along with CLI entry points, a Dockerfile, console script registrations, and a GitHub Actions TK-TODO check workflow. New modules implement the cache-backed destination write flow and a PyAirbyte-wrapping source.

Changes

  • Package & CLI registration (airbyte/cli/__init__.py, pyproject.toml):
    Add CLI package initializer and register console scripts destination-pyairbyte and source-pyairbyte.
  • Universal Destination package (airbyte/cli/universal_connector/__init__.py, airbyte/cli/universal_connector/destination.py, airbyte/cli/universal_connector/run.py, airbyte/cli/universal_connector/Dockerfile):
    New package and entry: DestinationPyAirbyteUniversal implemented. Selects a cache backend (DuckDB/Postgres/Snowflake/BigQuery/MotherDuck), validates SQL engine connectivity, creates per-stream raw tables, buffers RECORD messages (IDs, timestamps, JSON), flushes on STATE and at end, and exposes a CLI run entry; a Dockerfile for the destination image is added.
  • Universal Source package (airbyte/cli/universal_connector/source.py, airbyte/cli/universal_connector/run_source.py):
    New SourcePyAirbyteUniversal wraps PyAirbyte sources: spec, instantiation of the underlying source, check/discover/read flows, catalog translation, and a CLI run entry for the source.
  • CI check (.github/workflows/tk-todo-check.yml):
    Add a GitHub Actions workflow that fails on unresolved TK-TODO markers (supports in-line IGNORE:TK suppression).
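For illustration, the check that the workflow performs with grep can be sketched in Python. This is a hedged stand-in, not the workflow's actual implementation, which shells out to grep over tracked files:

```python
def find_tk_todos(files: dict) -> list:
    """Return 'path:lineno: text' entries for unresolved TK-TODO markers (sketch).

    Mirrors the workflow's grep behavior: case-insensitive match, with in-line
    'IGNORE:TK' suppression.
    """
    hits = []
    for path, content in files.items():
        for lineno, line in enumerate(content.splitlines(), start=1):
            lowered = line.lower()
            if "tk-todo" in lowered and "ignore:tk" not in lowered:
                hits.append(f"{path}:{lineno}: {line.strip()}")
    return hits
```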

Sequence Diagram

sequenceDiagram
    participant CLI as CLI Entry
    participant Dest as DestinationPyAirbyteUniversal
    participant Cache as Cache Backend
    participant Engine as SQL Engine

    CLI->>Dest: check(config)
    Dest->>Cache: instantiate backend (DuckDB/Postgres/...)
    Dest->>Engine: verify connectivity
    alt Connection OK
        Engine-->>Dest: success
        Dest-->>CLI: ConnectionStatus(success)
    else Connection Failed
        Engine-->>Dest: error
        Dest-->>CLI: ConnectionStatus(failed)
    end

    CLI->>Dest: write(config, catalog, messages)

    loop RECORD messages
        Dest->>Dest: buffer record (generate ID, timestamp, JSON payload)
    end

    alt STATE message received
        Dest->>Dest: flush_buffer()
        Dest->>Engine: INSERT buffered records into per-stream table
        Engine-->>Dest: commit
        Dest-->>CLI: emit STATE
    end

    alt On completion
        Dest->>Dest: final flush_buffer()
        Dest->>Engine: INSERT remaining records
        Dest->>Cache: close connection
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Would you like focused review checkpoints (e.g., SQL injection risks, transactional semantics, and buffer flushing semantics) to speed verification? wdyt?

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.
  • Title check: ✅ Passed. The title accurately captures the main change: adding universal source and destination connectors powered by PyAirbyte, which is the core objective of this PR.
  • Description Check: ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@github-actions

github-actions bot commented Jan 29, 2026

PyTest Results (Full)

413 tests  ±0   396 ✅ ±0   24m 39s ⏱️ -20s
  1 suites ±0    17 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 271673c. ± Comparison against base commit fa52519.

♻️ This comment has been updated with latest results.

…or/ and add CLI entrypoint

Co-Authored-By: AJ Steers <aj@airbyte.io>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 76-87: The code currently constructs a SnowflakeCache when
destination_type == DESTINATION_TYPE_SNOWFLAKE without rejecting empty strings
for required fields; add early validation to fail fast by checking required
Snowflake fields (account, username, warehouse, database, role) are non-empty
before constructing SnowflakeCache (or implement the same checks in
SnowflakeConfig/SnowflakeCache initializer/constructor), and raise a clear
ValueError indicating which field is missing; preserve the existing SecretString
wrapping for password and include that validation logic near the SnowflakeCache
creation site or inside SnowflakeCache/SnowflakeConfig (functions/classes to
touch: destination_type check, SnowflakeCache constructor, or SnowflakeConfig
class) so misconfigurations are caught immediately with explicit error messages.

In `@airbyte/cli/universal_connector/Dockerfile`:
- Line 5: The Dockerfile's COPY instruction currently references a nonexistent
poetry.lock; update the COPY line that mentions "poetry.lock" to instead
reference "uv.lock" so it reads as copying pyproject.toml and uv.lock into the
image (ensure uv.lock exists at repo root); this change is in the Dockerfile's
COPY command.
🧹 Nitpick comments (2)
airbyte/cli/universal_connector/destination.py (1)

233-248: Consider batch inserts for better performance, wdyt?

The current implementation executes one INSERT per record. For large buffers, this can be significantly slower than using executemany or SQLAlchemy's bulk insert capabilities.

⚡ Proposed batch insert approach
         with engine.connect() as conn:
-            for i in range(len(entries["_airbyte_ab_id"])):
-                insert_sql = text(f"""
-                    INSERT INTO {schema_name}.{table_name}
-                    (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-                    VALUES (:ab_id, :emitted_at, :data)
-                """)
-                conn.execute(
-                    insert_sql,
-                    {
-                        "ab_id": entries["_airbyte_ab_id"][i],
-                        "emitted_at": entries["_airbyte_emitted_at"][i],
-                        "data": entries["_airbyte_data"][i],
-                    },
-                )
+            insert_sql = text(f"""
+                INSERT INTO {schema_name}.{table_name}
+                (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+                VALUES (:ab_id, :emitted_at, :data)
+            """)
+            params = [
+                {
+                    "ab_id": entries["_airbyte_ab_id"][i],
+                    "emitted_at": entries["_airbyte_emitted_at"][i],
+                    "data": entries["_airbyte_data"][i],
+                }
+                for i in range(len(entries["_airbyte_ab_id"]))
+            ]
+            conn.execute(insert_sql, params)
             conn.commit()  # pyrefly: ignore[missing-attribute]
airbyte/cli/universal_connector/Dockerfile (1)

1-11: Consider adding a spec.json for connector registration, wdyt?

The PR objectives note that spec.json may be required for connector registration in the airbytehq/airbyte repository. The Dockerfile looks good otherwise—nice use of SHA pinning for reproducibility!

Would you like me to help generate a spec.json file based on the configuration schema used in _get_cache()?

Comment on lines +76 to +87
if destination_type == DESTINATION_TYPE_SNOWFLAKE:
    sf_config = config.get("snowflake", {})
    password = sf_config.get("password")
    return SnowflakeCache(
        account=sf_config.get("account", ""),
        username=sf_config.get("username", ""),
        password=SecretString(password) if password else None,
        warehouse=sf_config.get("warehouse", ""),
        database=sf_config.get("database", ""),
        schema_name=sf_config.get("schema_name", "PUBLIC"),
        role=sf_config.get("role", ""),
    )

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find the SnowflakeCache class definition
find . -type f -name "*.py" | xargs rg "class SnowflakeCache" -A 50

Repository: airbytehq/PyAirbyte

Length of output: 1430


🏁 Script executed:

# Also check if there's any validation of these parameters
rg "SnowflakeCache" --type py -B 2 -A 10 | head -100

Repository: airbytehq/PyAirbyte

Length of output: 7486


🏁 Script executed:

# Search for snowflake cache files
fd "snowflake" --type f --extension py | grep -i cache

Repository: airbytehq/PyAirbyte

Length of output: 90


🏁 Script executed:

# Find SnowflakeConfig definition
rg "class SnowflakeConfig" -A 50 airbyte/caches/

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Check if there's validation in SnowflakeConfig or SnowflakeCache
rg -A 5 -B 5 "validator\|root_validator\|field_validator" airbyte/caches/snowflake.py

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Look at the full SnowflakeConfig definition to understand defaults
cat -n airbyte/caches/snowflake.py | head -100

Repository: airbytehq/PyAirbyte

Length of output: 3232


🏁 Script executed:

# Find the SnowflakeConfig definition
find . -path "*/_processors/sql/snowflake.py" -type f

Repository: airbytehq/PyAirbyte

Length of output: 101


🏁 Script executed:

# Search for SnowflakeConfig class
rg "class SnowflakeConfig" -A 100 --type py

Repository: airbytehq/PyAirbyte

Length of output: 7591


🏁 Script executed:

# Check SqlConfig for any validation
rg "class SqlConfig" -A 30 airbyte/_processors/sql/

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Look at the full _get_cache method in destination.py for context
rg -B 10 -A 40 "def _get_cache" airbyte/cli/universal_connector/destination.py

Repository: airbytehq/PyAirbyte

Length of output: 2399


Add early validation to reject empty strings for required Snowflake fields, wdyt?

The constructor accepts all these parameters, but passing empty strings for required fields like account, username, warehouse, database, and role will slip through validation and only fail at runtime during connection. Consider adding validation in SnowflakeConfig to catch these misconfigurations early with clear error messages.
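A minimal standalone version of the suggested fail-fast validation might look like the following sketch. Field names are taken from the snippet above; placing equivalent checks in SnowflakeConfig instead would be equally valid:

```python
from typing import Any, Mapping

REQUIRED_SNOWFLAKE_FIELDS = ("account", "username", "warehouse", "database", "role")


def validate_snowflake_config(sf_config: Mapping[str, Any]) -> None:
    """Raise ValueError naming every required Snowflake field that is missing or empty."""
    missing = [
        field
        for field in REQUIRED_SNOWFLAKE_FIELDS
        if not str(sf_config.get(field) or "").strip()
    ]
    if missing:
        raise ValueError(f"Missing required Snowflake config fields: {', '.join(missing)}")
```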

🤖 Prompt for AI Agents
In `@airbyte/cli/universal_connector/destination.py` around lines 76 - 87, The
code currently constructs a SnowflakeCache when destination_type ==
DESTINATION_TYPE_SNOWFLAKE without rejecting empty strings for required fields;
add early validation to fail fast by checking required Snowflake fields
(account, username, warehouse, database, role) are non-empty before constructing
SnowflakeCache (or implement the same checks in SnowflakeConfig/SnowflakeCache
initializer/constructor), and raise a clear ValueError indicating which field is
missing; preserve the existing SecretString wrapping for password and include
that validation logic near the SnowflakeCache creation site or inside
SnowflakeCache/SnowflakeConfig (functions/classes to touch: destination_type
check, SnowflakeCache constructor, or SnowflakeConfig class) so
misconfigurations are caught immediately with explicit error messages.

- Add SourcePyAirbyteUniversal class that wraps any PyAirbyte source
- Add CLI entrypoint 'source-pyairbyte' for the universal source
- Fix Dockerfile to use uv.lock instead of poetry.lock
- Use proper CDK AirbyteEntrypoint.launch() pattern for sources

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration devin-ai-integration bot changed the title from "feat(destinations): add universal destination using PyAirbyte caches" to "feat(connectors): add universal source and destination using PyAirbyte" on Jan 29, 2026
…iping

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration

Bot Feedback Assessment Summary

I've reviewed all CodeRabbit comments and addressed them as follows:

Fixed Issues

  1. Resource cleanup in check() method (Comments 5-6) - Added try/finally block to ensure cache is closed on both success and failure paths. ✓ Fixed in commit ab6ed52.

  2. Resource cleanup in write() method (Comments 5-6) - Wrapped the entire method body in try/finally to guarantee cache.close() runs even if exceptions occur during message processing. ✓ Fixed in commit ab6ed52.

  3. Use record's original emitted_at timestamp (Comment 9) - Changed from using datetime.now() to using the record's original emitted_at timestamp (epoch milliseconds converted to UTC datetime), with fallback to current time if not available. This preserves when the source actually extracted the record. ✓ Fixed in commit 74dbb59.

Known Limitations (Deferred)

  1. SQL injection risk (Comment 7) - Schema and table names are interpolated into SQL strings without quoting. This is documented in the PR description as a known limitation. The values come from configuration (schema_name) and catalog (stream names) - controlled inputs, not arbitrary user strings. The actual risk is low. Deferring to human reviewer on whether to address now or track as follow-up.

  2. Namespace handling (Comment 8) - PyAirbyte intentionally omits namespace handling for performance reasons (documented in _connector_base.py and _message_iterators.py). This destination follows the same design philosophy. Out of scope for initial implementation.

Nitpicks (Not Implemented)

  1. Batch inserts (Comment 10) - Suggested using executemany for better performance. Valid optimization but not critical for initial implementation. Can be addressed as a follow-up enhancement.

  2. Snowflake field validation (Comment 12) - Suggested adding early validation for empty strings in required Snowflake fields. The cache classes already handle validation at connection time. Deferring to human reviewer.


Devin session


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (3)
airbyte/cli/universal_connector/destination.py (2)

221-237: ⚠️ Potential issue | 🔴 Critical

Could we quote schema/table identifiers before embedding them in SQL, wdyt?

Lines 221, 232, 237, and 312 interpolate config/catalog-derived identifiers directly; this is injection-prone and also breaks on reserved/special identifiers.

🛡️ Suggested fix pattern
+    def _quote_ident(self, engine: Engine, name: str) -> str:
+        return engine.dialect.identifier_preparer.quote(name)
@@
-            with engine.connect() as conn:
-                conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
+            q_schema = self._quote_ident(engine, schema_name)
+            with engine.connect() as conn:
+                conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {q_schema}"))
@@
-                table_name = f"_airbyte_raw_{name}"
+                raw_table_name = f"_airbyte_raw_{name}"
+                q_table = self._quote_ident(engine, raw_table_name)
@@
-                        conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
+                        conn.execute(text(f"DROP TABLE IF EXISTS {q_schema}.{q_table}"))
@@
-                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
+                    CREATE TABLE IF NOT EXISTS {q_schema}.{q_table} (

Also applies to: 311-313

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/destination.py` around lines 221 - 237, Embed
schema and table identifiers safely by quoting them with SQLAlchemy's identifier
preparer instead of interpolating raw strings: use
engine.dialect.identifier_preparer.quote(schema_name) and quote(table_name) (or
construct Table objects via sqlalchemy.schema.Table with schema and metadata)
wherever identifiers are used (references: the CREATE SCHEMA call using
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")), the DROP TABLE
call using conn.execute(text(f"DROP TABLE IF EXISTS
{schema_name}.{table_name}")), and the CREATE TABLE IF NOT EXISTS construction
that builds {schema_name}.{table_name} as well as the similar occurrences around
lines 311-313); replace direct f-string embedding with the quoted identifiers so
reserved names and injection are handled safely.

273-275: ⚠️ Potential issue | 🟡 Minor

Could we preserve source record.emitted_at for _airbyte_emitted_at, wdyt?

Using destination wall-clock time loses original source emission timing, which can skew lineage/debugging.

🕒 Suggested timestamp handling
-                    buffer[stream_name]["_airbyte_emitted_at"].append(
-                        datetime.datetime.now(datetime.timezone.utc).isoformat()
-                    )
+                    emitted_at = (
+                        datetime.datetime.fromtimestamp(
+                            record.emitted_at / 1000, tz=datetime.timezone.utc
+                        ).isoformat()
+                        if record.emitted_at is not None
+                        else datetime.datetime.now(datetime.timezone.utc).isoformat()
+                    )
+                    buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/destination.py` around lines 273 - 275, The
current code always appends the destination wall-clock time to
buffer[stream_name]["_airbyte_emitted_at"], losing the source's original
emission time; change the logic in the block that writes to buffer (referencing
buffer, stream_name and the "_airbyte_emitted_at" list) to first check the
incoming record's emitted timestamp (e.g., record.get("emitted_at") or
record.emitted_at) and, if present and non-empty, append that value
(normalizing/formatting to an ISO UTC string if needed); only if the source
emitted_at is missing or invalid should you fall back to
datetime.datetime.now(datetime.timezone.utc).isoformat().
airbyte/cli/universal_connector/source.py (1)

151-166: ⚠️ Potential issue | 🟠 Major

Could we force full-refresh (or fail fast on incoming state) until STATE conversion is implemented, wdyt?

Line 165 currently uses default sync-mode selection, which is incremental-preferred, while state is ignored and STATE messages are dropped; that combination can produce unsafe checkpoint semantics.

🧭 Safer interim behavior
     def read(
         self,
         logger: logging.Logger,  # noqa: ARG002
         config: Mapping[str, Any],
         catalog: ConfiguredAirbyteCatalog,
         state: list[Any] | None = None,  # noqa: ARG002
     ) -> Iterable[AirbyteMessage]:
@@
         source = self._get_pyairbyte_source(config)
+        if state:
+            raise ValueError(
+                "Incremental sync/state is not yet supported by source-pyairbyte."
+            )
@@
-        configured_catalog = source.get_configured_catalog(streams=stream_names)
+        configured_catalog = source.get_configured_catalog(
+            streams=stream_names,
+            force_full_refresh=True,
+        )

Based on learnings: In PyAirbyte reviews, TODOs should receive a friendly reminder to resolve before merging.

Also applies to: 179-197

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/source.py` around lines 151 - 166, In read(),
guard against ignored incoming state by failing fast: if the state parameter is
not None/empty, raise a clear exception (e.g., RuntimeError or ValueError)
stating that STATE conversion is not implemented instead of proceeding with
default incremental behavior; locate this check in the read() method (inspect
the state parameter, the catalog variable, and the calls to
source.get_configured_catalog and source.select_streams) and add the early
validation so we never continue with unsafe checkpoint semantics (also apply the
same guard to the other read-path around the configured_catalog handling that
mirrors lines 179-197).
🧹 Nitpick comments (4)
.github/workflows/tk-todo-check.yml (1)

32-32: Consider switching to git grep for simpler and more robust path handling?

The current git ls-files | xargs grep approach works fine today, but git grep is more idiomatic and handles edge cases better (e.g., binary files with the -I flag). It also eliminates a pipe and makes the intent clearer.

Suggested update
-          FOUND_TODOS=$(git ls-files | xargs grep -i -n "TK-TODO" 2>/dev/null | grep -i -v "IGNORE:TK" || true)  # IGNORE:TK
+          FOUND_TODOS=$(git grep -nI -i "TK-TODO" -- | grep -i -v "IGNORE:TK" || true)  # IGNORE:TK
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.github/workflows/tk-todo-check.yml at line 32, Replace the pipeline that
uses git ls-files | xargs grep with a single git grep invocation: update the
FOUND_TODOS assignment to call git grep -n -I "TK-TODO" (to skip binary files)
and then filter out IGNORE:TK (e.g., pipe to grep -v "IGNORE:TK") with a
fallback || true so the variable is empty instead of failing; locate the
existing FOUND_TODOS line and swap the command accordingly (reference:
FOUND_TODOS assignment and the string "TK-TODO").
airbyte/cli/universal_connector/Dockerfile (1)

8-11: Could we explicitly set a non-root runtime user in this Dockerfile, wdyt?

Relying on the base image’s default user is implicit; adding an explicit USER helps prevent accidental root execution if upstream defaults change.

🔒 Suggested hardening
 RUN pip install --no-cache-dir .
 
+USER airbyte
 ENV AIRBYTE_ENTRYPOINT="destination-pyairbyte"
 ENTRYPOINT ["destination-pyairbyte"]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/Dockerfile` around lines 8 - 11, The
Dockerfile currently installs the package and sets AIRBYTE_ENTRYPOINT/ENTRYPOINT
but does not declare a non-root user; add steps to create a dedicated
unprivileged user and group (e.g., appuser/appgroup), chown the installed files
and any runtime directories to that user, and then set USER to that non-root
account so the container runs without root privileges; update around the pip
install and ENTRYPOINT sections referenced by AIRBYTE_ENTRYPOINT and ENTRYPOINT
to ensure ownership and runtime user are applied.
airbyte/cli/universal_connector/source.py (1)

179-182: Could we isolate private _read_with_catalog usage behind a small adapter, wdyt?

Direct dependency on a private method (SLF001) is fragile across internal refactors; a wrapper/fallback path would reduce breakage risk.
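Such an adapter could be as small as the following sketch. The availability of a public read_with_catalog method and the call signature shown here are assumptions; the real signature should be checked against the pinned PyAirbyte version:

```python
def read_with_catalog_adapter(source, catalog, state=None):
    """Call a public read_with_catalog if one exists, else fall back to the private method.

    Isolates the single private-API access so future PyAirbyte refactors only
    require changing this one helper.
    """
    reader = getattr(source, "read_with_catalog", None)
    if reader is None:
        reader = source._read_with_catalog  # noqa: SLF001 - single, isolated private access
    return reader(catalog, state=state)
```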

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/source.py` around lines 179 - 182, The code
directly calls the private method source._read_with_catalog which is fragile;
wrap this in a small adapter on the Source object (e.g., add a public method
read_with_catalog on the source wrapper or utility) that first tries calling
source.read_with_catalog (preferred) and falls back to source._read_with_catalog
if needed, then update the loop to iterate over source.read_with_catalog(...).
This isolates the private call, centralizes the fallback logic, and makes future
refactors safe while referencing the existing symbols source and
_read_with_catalog.
airbyte/cli/universal_connector/destination.py (1)

309-323: Could we batch insert instead of executing one row at a time, wdyt?

The per-row execute loop creates many round trips; passing a list of parameter dicts to one statement is much more efficient.

⚡ Example refactor
         with engine.connect() as conn:
-            for i in range(len(entries["_airbyte_ab_id"])):
-                insert_sql = text(f"""
-                    INSERT INTO {schema_name}.{table_name}
-                    (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-                    VALUES (:ab_id, :emitted_at, :data)
-                """)
-                conn.execute(
-                    insert_sql,
-                    {
-                        "ab_id": entries["_airbyte_ab_id"][i],
-                        "emitted_at": entries["_airbyte_emitted_at"][i],
-                        "data": entries["_airbyte_data"][i],
-                    },
-                )
+            insert_sql = text(f"""
+                INSERT INTO {schema_name}.{table_name}
+                (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+                VALUES (:ab_id, :emitted_at, :data)
+            """)
+            payload = [
+                {
+                    "ab_id": entries["_airbyte_ab_id"][i],
+                    "emitted_at": entries["_airbyte_emitted_at"][i],
+                    "data": entries["_airbyte_data"][i],
+                }
+                for i in range(len(entries["_airbyte_ab_id"]))
+            ]
+            conn.execute(insert_sql, payload)
             conn.commit()  # pyrefly: ignore[missing-attribute]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/destination.py` around lines 309 - 323, The
loop performs one execute per row causing many round trips; instead build a list
of parameter dicts from entries (use the same keys: "ab_id", "emitted_at",
"data") and call conn.execute once with insert_sql and that list so SQLAlchemy
will perform a batched/executemany insert; update the block that creates
insert_sql and the conn.execute call (referencing engine.connect, insert_sql,
conn.execute, and entries/_airbyte_ab_id/_airbyte_emitted_at/_airbyte_data) to
pass a single list of parameter mappings rather than executing inside the for
loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 246-283: The buffer can grow unbounded because you only flush on
STATE/end; add a size-based flush threshold (e.g., self.buffer_max_records) and
in the input_messages loop inside the Type.RECORD branch, after appending to
buffer[stream_name], check if len(buffer[stream_name]["_airbyte_ab_id"]) >=
self.buffer_max_records and if so call self._flush_buffer(engine=engine,
buffer=buffer, schema_name=schema_name, stream_name=stream_name) and then
clear/reset buffer[stream_name] (or reassign to a fresh defaultdict list) to
avoid OOM; add a configurable attribute (e.g., buffer_max_records with a
sensible default) on the class or in the constructor so the threshold is tunable
and reference buffer, Type.RECORD, and _flush_buffer when applying the change.

In `@airbyte/cli/universal_connector/source.py`:
- Around line 106-115: The current except blocks in the check() flow return raw
exception text via AirbyteConnectionStatus (using str(e) and {e!r}); change
these to return a static, non-sensitive failure message (e.g., "Connection check
failed") while logging the full exception details to a logger if needed; update
the except ValueError and generic except Exception in source.py so both return
AirbyteConnectionStatus(status=Status.FAILED, message="Connection check failed")
and ensure any existing logger (or add one) records the exception for debugging
(reference: function check(), AirbyteConnectionStatus, Status.FAILED).

---

Duplicate comments:
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 221-237: Embed schema and table identifiers safely by quoting them
with SQLAlchemy's identifier preparer instead of interpolating raw strings: use
engine.dialect.identifier_preparer.quote(schema_name) and quote(table_name) (or
construct Table objects via sqlalchemy.schema.Table with schema and metadata)
wherever identifiers are used (references: the CREATE SCHEMA call using
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")), the DROP TABLE
call using conn.execute(text(f"DROP TABLE IF EXISTS
{schema_name}.{table_name}")), and the CREATE TABLE IF NOT EXISTS construction
that builds {schema_name}.{table_name} as well as the similar occurrences around
lines 311-313); replace direct f-string embedding with the quoted identifiers so
reserved names and injection are handled safely.
- Around line 273-275: The current code always appends the destination
wall-clock time to buffer[stream_name]["_airbyte_emitted_at"], losing the
source's original emission time; change the logic in the block that writes to
buffer (referencing buffer, stream_name and the "_airbyte_emitted_at" list) to
first check the incoming record's emitted timestamp (e.g.,
record.get("emitted_at") or record.emitted_at) and, if present and non-empty,
append that value (normalizing/formatting to an ISO UTC string if needed); only
if the source emitted_at is missing or invalid should you fall back to
datetime.datetime.now(datetime.timezone.utc).isoformat().
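
The fallback described above could be sketched like this (the helper name `resolve_emitted_at` is hypothetical; it assumes the incoming `emitted_at` is epoch milliseconds, as in the Airbyte protocol):

```python
import datetime


def resolve_emitted_at(record_emitted_at):
    """Return an ISO-8601 UTC string, preferring the source's emitted_at
    (epoch milliseconds) over the destination's wall-clock time."""
    if record_emitted_at:
        try:
            return datetime.datetime.fromtimestamp(
                record_emitted_at / 1000, tz=datetime.timezone.utc
            ).isoformat()
        except (TypeError, ValueError, OverflowError, OSError):
            pass  # malformed timestamp: fall through to wall-clock time
    return datetime.datetime.now(datetime.timezone.utc).isoformat()
```

The buffer append would then become `buffer[stream_name]["_airbyte_emitted_at"].append(resolve_emitted_at(record.emitted_at))`.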

In `@airbyte/cli/universal_connector/source.py`:
- Around line 151-166: In read(), guard against ignored incoming state by
failing fast: if the state parameter is not None/empty, raise a clear exception
(e.g., RuntimeError or ValueError) stating that STATE conversion is not
implemented instead of proceeding with default incremental behavior; locate this
check in the read() method (inspect the state parameter, the catalog variable,
and the calls to source.get_configured_catalog and source.select_streams) and
add the early validation so we never continue with unsafe checkpoint semantics
(also apply the same guard to the other read-path around the configured_catalog
handling that mirrors lines 179-197).
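
The fail-fast guard could be as small as this sketch (the function name `guard_unsupported_state` is hypothetical; the exact shape of the `state` parameter is assumed):

```python
def guard_unsupported_state(state):
    """Raise early if incoming STATE would otherwise be silently ignored.

    Called at the top of read(); proceeding without converting STATE would
    give the platform unsafe checkpoint semantics.
    """
    if state:  # non-None and non-empty
        raise ValueError(
            "Incoming STATE was provided, but STATE conversion is not yet "
            "implemented; refusing to continue with default incremental behavior."
        )
```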

---

Nitpick comments:
In @.github/workflows/tk-todo-check.yml:
- Line 32: Replace the pipeline that uses git ls-files | xargs grep with a
single git grep invocation: update the FOUND_TODOS assignment to call git grep
-n -I "TK-TODO" (to skip binary files) and then filter out IGNORE:TK (e.g., pipe
to grep -v "IGNORE:TK") with a fallback || true so the variable is empty instead
of failing; locate the existing FOUND_TODOS line and swap the command
accordingly (reference: FOUND_TODOS assignment and the string "TK-TODO").

In `@airbyte/cli/universal_connector/destination.py`:
- Around line 309-323: The loop performs one execute per row causing many round
trips; instead build a list of parameter dicts from entries (use the same keys:
"ab_id", "emitted_at", "data") and call conn.execute once with insert_sql and
that list so SQLAlchemy will perform a batched/executemany insert; update the
block that creates insert_sql and the conn.execute call (referencing
engine.connect, insert_sql, conn.execute, and
entries/_airbyte_ab_id/_airbyte_emitted_at/_airbyte_data) to pass a single list
of parameter mappings rather than executing inside the for loop.

In `@airbyte/cli/universal_connector/Dockerfile`:
- Around line 8-11: The Dockerfile currently installs the package and sets
AIRBYTE_ENTRYPOINT/ENTRYPOINT but does not declare a non-root user; add steps to
create a dedicated unprivileged user and group (e.g., appuser/appgroup), chown
the installed files and any runtime directories to that user, and then set USER
to that non-root account so the container runs without root privileges; update
around the pip install and ENTRYPOINT sections referenced by AIRBYTE_ENTRYPOINT
and ENTRYPOINT to ensure ownership and runtime user are applied.

In `@airbyte/cli/universal_connector/source.py`:
- Around line 179-182: The code directly calls the private method
source._read_with_catalog which is fragile; wrap this in a small adapter on the
Source object (e.g., add a public method read_with_catalog on the source wrapper
or utility) that first tries calling source.read_with_catalog (preferred) and
falls back to source._read_with_catalog if needed, then update the loop to
iterate over source.read_with_catalog(...). This isolates the private call,
centralizes the fallback logic, and makes future refactors safe while
referencing the existing symbols source and _read_with_catalog.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dbe9884 and ab6ed52.

📒 Files selected for processing (7)
  • .github/workflows/tk-todo-check.yml
  • airbyte/cli/universal_connector/Dockerfile
  • airbyte/cli/universal_connector/__init__.py
  • airbyte/cli/universal_connector/destination.py
  • airbyte/cli/universal_connector/run_source.py
  • airbyte/cli/universal_connector/source.py
  • pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte/cli/universal_connector/__init__.py

Comment on lines +246 to +283
# Buffer for records
buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))

for message in input_messages:
    if message.type == Type.STATE:
        # Flush the buffer before yielding state
        for stream_name in list(buffer.keys()):
            self._flush_buffer(
                engine=engine,
                buffer=buffer,
                schema_name=schema_name,
                stream_name=stream_name,
            )
        buffer = defaultdict(lambda: defaultdict(list))
        yield message

    elif message.type == Type.RECORD:
        record = message.record
        if record is None:
            continue
        stream_name = record.stream
        if stream_name not in streams:
            logger.debug(f"Stream {stream_name} not in configured streams, skipping")
            continue

        # Add to buffer
        buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
        buffer[stream_name]["_airbyte_emitted_at"].append(
            datetime.datetime.now(datetime.timezone.utc).isoformat()
        )
        buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))

    else:
        logger.debug(f"Message type {message.type} not handled, skipping")

# Flush any remaining records
for stream_name in list(buffer.keys()):
    self._flush_buffer(

⚠️ Potential issue | 🟠 Major

Could we add a size-based buffer flush threshold, wdyt?

Buffering only on STATE/end can grow unbounded for sources that emit STATE infrequently, which risks OOM on larger syncs.

📦 Example safeguard
-            buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+            buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+            max_buffer_records = 10_000
+            buffered_records = 0
@@
                     buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))
+                    buffered_records += 1
+
+                    if buffered_records >= max_buffer_records:
+                        for pending_stream in list(buffer.keys()):
+                            self._flush_buffer(
+                                engine=engine,
+                                buffer=buffer,
+                                schema_name=schema_name,
+                                stream_name=pending_stream,
+                            )
+                        buffer = defaultdict(lambda: defaultdict(list))
+                        buffered_records = 0
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/destination.py` around lines 246 - 283, The
buffer can grow unbounded because you only flush on STATE/end; add a size-based
flush threshold (e.g., self.buffer_max_records) and in the input_messages loop
inside the Type.RECORD branch, after appending to buffer[stream_name], check if
len(buffer[stream_name]["_airbyte_ab_id"]) >= self.buffer_max_records and if so
call self._flush_buffer(engine=engine, buffer=buffer, schema_name=schema_name,
stream_name=stream_name) and then clear/reset buffer[stream_name] (or reassign
to a fresh defaultdict list) to avoid OOM; add a configurable attribute (e.g.,
buffer_max_records with a sensible default) on the class or in the constructor
so the threshold is tunable and reference buffer, Type.RECORD, and _flush_buffer
when applying the change.

Comment on lines +106 to +115
except ValueError as e:
    return AirbyteConnectionStatus(
        status=Status.FAILED,
        message=str(e),
    )
except Exception as e:
    return AirbyteConnectionStatus(
        status=Status.FAILED,
        message=f"Connection check failed: {e!r}",
    )

⚠️ Potential issue | 🟠 Major

Could we avoid returning raw exception text from check(), wdyt?

str(e) / {e!r} here can expose connector config details (including secrets) in status messages; a static failure message is safer.

🔐 Proposed adjustment
-        except ValueError as e:
+        except ValueError:
             return AirbyteConnectionStatus(
                 status=Status.FAILED,
-                message=str(e),
+                message="Invalid source configuration.",
             )
-        except Exception as e:
+        except Exception as e:
+            logger.warning("Universal source check failed (%s)", type(e).__name__)
             return AirbyteConnectionStatus(
                 status=Status.FAILED,
-                message=f"Connection check failed: {e!r}",
+                message="Connection check failed.",
             )

Based on learnings: In PyAirbyte, error messages should decouple static message text from dynamic values to reduce PII exposure in telemetry.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/source.py` around lines 106 - 115, The
current except blocks in the check() flow return raw exception text via
AirbyteConnectionStatus (using str(e) and {e!r}); change these to return a
static, non-sensitive failure message (e.g., "Connection check failed") while
logging the full exception details to a logger if needed; update the except
ValueError and generic except Exception in source.py so both return
AirbyteConnectionStatus(status=Status.FAILED, message="Connection check failed")
and ensure any existing logger (or add one) records the exception for debugging
(reference: function check(), AirbyteConnectionStatus, Status.FAILED).

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (2)
airbyte/cli/universal_connector/destination.py (2)

221-233: ⚠️ Potential issue | 🔴 Critical

Quote SQL identifiers before composing statements, wdyt?

Line 221, Line 232, Line 237, and Line 319 still interpolate raw identifier strings. Could we quote schema_name and table_name via the dialect preparer to avoid SQL injection and broken identifiers across backends?

🔧 Proposed fix
 class DestinationPyAirbyteUniversal(Destination):
     """Universal destination that writes to any PyAirbyte-supported cache backend."""
+
+    @staticmethod
+    def _quote_ident(engine: Engine, ident: str) -> str:
+        return engine.dialect.identifier_preparer.quote(ident)
@@
             engine = cache.get_sql_engine()
+            quoted_schema_name = self._quote_ident(engine, schema_name)
             with engine.connect() as conn:
-                conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
+                conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema_name}"))
                 conn.commit()  # pyrefly: ignore[missing-attribute]
@@
                 name = configured_stream.stream.name
                 table_name = f"_airbyte_raw_{name}"
+                quoted_table_name = self._quote_ident(engine, table_name)
@@
-                        conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
+                        conn.execute(
+                            text(
+                                f"DROP TABLE IF EXISTS {quoted_schema_name}.{quoted_table_name}"
+                            )
+                        )
@@
-                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
+                    CREATE TABLE IF NOT EXISTS {quoted_schema_name}.{quoted_table_name} (
@@
-        table_name = f"_airbyte_raw_{stream_name}"
+        table_name = f"_airbyte_raw_{stream_name}"
+        quoted_schema_name = self._quote_ident(engine, schema_name)
+        quoted_table_name = self._quote_ident(engine, table_name)
@@
-                    INSERT INTO {schema_name}.{table_name}
+                    INSERT INTO {quoted_schema_name}.{quoted_table_name}
#!/bin/bash
# Verify raw identifier interpolation still exists in SQL text() calls.
rg -nP 'text\(f".*\{(schema_name|table_name)\}' airbyte/cli/universal_connector/destination.py -C2

# Verify whether identifier quoting is implemented.
rg -n 'identifier_preparer\.quote|def _quote_ident' airbyte/cli/universal_connector/destination.py -C2

Also applies to: 236-243, 318-320

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/destination.py` around lines 221 - 233, The
SQL statements are interpolating raw schema and table identifiers (e.g.,
schema_name and table_name) into text() calls in the create/drop table blocks
inside the loop over configured_catalog.streams, which risks SQL injection and
broken identifiers across backends; fix by quoting identifiers using the
engine's dialect preparer (engine.dialect.identifier_preparer.quote) or by
building statements with SQLAlchemy Table/DDL APIs instead of f-strings, e.g.,
quote schema_name and table_name before passing into text() in the create schema
and DROP TABLE/CREATE TABLE sections (references: configured_catalog.streams
loop, variables schema_name and table_name, and the with engine.connect()
blocks).

246-260: ⚠️ Potential issue | 🟠 Major

Could we add a size-based flush guard for the record buffer, wdyt?

Line 246-Line 295 flushes only on STATE/end. For long-running streams with sparse STATE messages, memory can grow unbounded. Would you consider a configurable buffer_max_records threshold and intermediate flushes?

📦 Proposed fix
-            buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+            buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
+            buffer_max_records = int(config.get("buffer_max_records", 10_000))
+            buffered_records = 0
@@
                     buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at.isoformat())
                     buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))
+                    buffered_records += 1
+
+                    if buffered_records >= buffer_max_records:
+                        for pending_stream in list(buffer.keys()):
+                            self._flush_buffer(
+                                engine=engine,
+                                buffer=buffer,
+                                schema_name=schema_name,
+                                stream_name=pending_stream,
+                            )
+                        buffer = defaultdict(lambda: defaultdict(list))
+                        buffered_records = 0

Also applies to: 262-295

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte/cli/universal_connector/destination.py` around lines 246 - 260, The
record buffer currently only flushes on STATE messages causing unbounded memory
growth; add a configurable buffer_max_records (e.g., passed into constructor or
method options) and enforce a size-based flush inside the input_messages loop:
after appending to buffer, compute the total buffered record count (sum of
lengths in buffer dict) and if it exceeds buffer_max_records, call
self._flush_buffer for each stream_name in buffer (same logic used on STATE
flush) and reset buffer to a fresh defaultdict; ensure buffer_max_records
default is sane (e.g., None or a positive int) and skip size checks when unset.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 221-233: The SQL statements are interpolating raw schema and table
identifiers (e.g., schema_name and table_name) into text() calls in the
create/drop table blocks inside the loop over configured_catalog.streams, which
risks SQL injection and broken identifiers across backends; fix by quoting
identifiers using the engine's dialect preparer
(engine.dialect.identifier_preparer.quote) or by building statements with
SQLAlchemy Table/DDL APIs instead of f-strings, e.g., quote schema_name and
table_name before passing into text() in the create schema and DROP TABLE/CREATE
TABLE sections (references: configured_catalog.streams loop, variables
schema_name and table_name, and the with engine.connect() blocks).
- Around line 246-260: The record buffer currently only flushes on STATE
messages causing unbounded memory growth; add a configurable buffer_max_records
(e.g., passed into constructor or method options) and enforce a size-based flush
inside the input_messages loop: after appending to buffer, compute the total
buffered record count (sum of lengths in buffer dict) and if it exceeds
buffer_max_records, call self._flush_buffer for each stream_name in buffer (same
logic used on STATE flush) and reset buffer to a fresh defaultdict; ensure
buffer_max_records default is sane (e.g., None or a positive int) and skip size
checks when unset.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ab6ed52 and 74dbb59.

📒 Files selected for processing (1)
  • airbyte/cli/universal_connector/destination.py

…-smoke-test

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration devin-ai-integration bot changed the title feat(connectors): add universal source and destination using PyAirbyte feat(connectors): add smoke-test source and destination using PyAirbyte Feb 26, 2026
devin-ai-integration bot and others added 2 commits February 26, 2026 07:57
Co-Authored-By: AJ Steers <aj@airbyte.io>
…byte

Co-Authored-By: AJ Steers <aj@airbyte.io>
@aaronsteers Aaron ("AJ") Steers (aaronsteers) marked this pull request as ready for review February 26, 2026 17:02
…sting (#982)

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
@devin-ai-integration devin-ai-integration bot changed the title feat(connectors): add smoke-test source and destination using PyAirbyte feat(connectors): add universal source and destination using PyAirbyte Feb 26, 2026
@devin-ai-integration
Copy link
Contributor

Bot Feedback Review Summary

I've reviewed the latest CodeRabbit feedback (Comments 20-22). Here's my assessment:

Comment 20: Size-based buffer flush threshold (destination.py)

Suggestion: Add a max_buffer_records threshold to prevent OOM on sources that emit STATE infrequently.

Assessment: Valid optimization for production use. However, for this initial implementation targeting smoke testing, the risk is low since test scenarios are small. This is a good follow-up enhancement.

Decision: Defer to human reviewer. Can be added as a follow-up if needed.

Comment 21: Avoid raw exception text in check() (source.py)

Suggestion: Return static failure messages instead of str(e) / {e!r} to avoid exposing config details.

Assessment: Valid security concern. However, for debugging purposes during initial development, having the actual error message is valuable. The underlying PyAirbyte source's check() method already sanitizes most sensitive information.

Decision: Defer to human reviewer. The current behavior aids debugging but could be tightened for production.

Review 22: Duplicate comments (already addressed)
  • SQL identifier quoting: Previously discussed - low risk with controlled inputs from config/catalog
  • Preserve source emitted_at: Already fixed in commit 74dbb59
  • Force full-refresh until STATE implemented: Tracked with TK-TODO marker
Nitpicks (low priority)
  • git grep instead of git ls-files | xargs grep: Minor optimization, not critical
  • Dockerfile USER directive: Base image already handles this appropriately
  • Isolate _read_with_catalog usage: Valid concern but adds complexity for initial implementation

All major concerns are either already addressed or documented in the PR description for human review.


Devin session

@devin-ai-integration devin-ai-integration bot left a comment

Devin Review found 1 potential issue.

View 8 additional findings in Devin Review.


Comment on lines +236 to +244
create_sql = f"""
    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
        _airbyte_emitted_at TIMESTAMP,
        _airbyte_data JSON
    )
"""
conn.execute(text(create_sql))
conn.commit()  # pyrefly: ignore[missing-attribute]

🔴 Unquoted SQL identifiers cause syntax errors for stream names with special characters

Stream names and schema names are interpolated directly into raw SQL strings without any identifier quoting. When a stream name contains characters that are not valid in unquoted SQL identifiers (e.g., dashes, dots, spaces, or SQL keywords), the generated SQL will fail with a syntax error or, worse, enable SQL injection.

Root Cause and Impact

The write method constructs SQL statements by f-string interpolation of schema_name and table_name (which is f"_airbyte_raw_{stream_name}") at multiple locations:

  • destination.py:221: CREATE SCHEMA IF NOT EXISTS {schema_name}
  • destination.py:232: DROP TABLE IF EXISTS {schema_name}.{table_name}
  • destination.py:237: CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}
  • destination.py:319: INSERT INTO {schema_name}.{table_name}

For example, a stream named my-stream produces _airbyte_raw_my-stream, and the SQL CREATE TABLE IF NOT EXISTS main._airbyte_raw_my-stream (...) will be a syntax error because - is interpreted as the minus operator.

Similarly, a stream name containing '; DROP TABLE users; -- could lead to SQL injection.

Notably, the companion smoke test source defines scenarios like "CamelCaseStreamName" which would work (unquoted identifiers are case-insensitive but still syntactically valid), but any source with stream names containing dashes, dots, or spaces would break this destination entirely.

Impact: The destination will crash with SQL errors for any stream whose name contains characters not valid in unquoted SQL identifiers. This includes common stream naming patterns like my-table, schema.table, or table name.

Prompt for agents
In airbyte/cli/universal_connector/destination.py, all SQL identifiers (schema_name and table_name) must be properly quoted before interpolation into SQL strings. This affects lines 221, 232, 237-243, and 318-322.

One approach: add a helper method to quote identifiers, e.g.:

    def _quote_identifier(self, name: str) -> str:
        escaped = name.replace('"', '""')
        return f'"{escaped}"'

Then use it everywhere identifiers are interpolated:
- Line 221: text(f"CREATE SCHEMA IF NOT EXISTS {self._quote_identifier(schema_name)}")
- Line 232: text(f"DROP TABLE IF EXISTS {self._quote_identifier(schema_name)}.{self._quote_identifier(table_name)}")
- Lines 237-243: Same pattern for CREATE TABLE
- Lines 318-322: Same pattern for INSERT INTO in _flush_buffer method

This needs to be applied consistently across all 4 SQL construction sites in the file.

Co-Authored-By: AJ Steers <aj@airbyte.io>