Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions cmd/sling/resource/llm_CONNECTION_DATABASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ Run read-only SQL queries on database connections:
"input": {
"connection": "MY_POSTGRES",
"query": "SELECT * FROM public.users LIMIT 10",
"description": "Preview first 10 user records to understand table structure and data format",
"limit": 100,
"transient": false
}
Expand All @@ -231,6 +232,7 @@ Run read-only SQL queries on database connections:
**Parameters:**
- `connection` (required) - Database connection name
- `query` (required) - SQL query to execute
- `description` (optional but strongly recommended) - A brief description of the intent and expected result of this query. Explain *why* you are running this query and what the result should tell you. This is logged for observability so that query activity can be understood in context. **Always provide this when executing a query.**
- `limit` (optional) - Maximum rows to return (default: 100)
- `transient` (optional) - Use transient connection (default: false)

Expand Down Expand Up @@ -279,6 +281,7 @@ If a destructive operation is deemed necessary:
"input": {
"connection": "MY_POSTGRES",
"query": "SELECT * FROM public.users LIMIT 5",
"description": "Preview first 5 user records to understand table structure and sample data",
"limit": 5
}
}
Expand All @@ -291,6 +294,7 @@ If a destructive operation is deemed necessary:
"input": {
"connection": "MY_POSTGRES",
"query": "SELECT status, COUNT(*) as count FROM orders WHERE created_date >= '2024-01-01' GROUP BY status ORDER BY count DESC LIMIT 20",
"description": "Get order count breakdown by status for 2024 to identify distribution of order states",
"limit": 20
}
}
Expand All @@ -303,6 +307,7 @@ If a destructive operation is deemed necessary:
"input": {
"connection": "MY_POSTGRES",
"query": "SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = 'public' LIMIT 50",
"description": "Retrieve column metadata for all public schema tables to map data types",
"limit": 50
}
}
Expand All @@ -315,6 +320,7 @@ If a destructive operation is deemed necessary:
"input": {
"connection": "MY_POSTGRES",
"query": "SELECT version()",
"description": "Check PostgreSQL server version for compatibility verification",
"transient": true
}
}
Expand Down Expand Up @@ -365,7 +371,8 @@ If a destructive operation is deemed necessary:
"action": "query",
"input": {
"connection": "MY_DB",
"query": "SELECT * FROM production.users LIMIT 3"
"query": "SELECT * FROM production.users LIMIT 3",
"description": "Sample 3 rows from production.users to inspect data values and format"
}
}
```
Expand All @@ -378,7 +385,8 @@ If a destructive operation is deemed necessary:
"action": "query",
"input": {
"connection": "MY_DB",
"query": "SELECT COUNT(*) as total_records FROM sales.orders"
"query": "SELECT COUNT(*) as total_records FROM sales.orders",
"description": "Get total row count of sales.orders to understand data volume"
}
}
```
Expand All @@ -389,7 +397,8 @@ If a destructive operation is deemed necessary:
"action": "query",
"input": {
"connection": "MY_DB",
"query": "SELECT status, COUNT(*) as count FROM sales.orders GROUP BY status ORDER BY count DESC LIMIT 10"
"query": "SELECT status, COUNT(*) as count FROM sales.orders GROUP BY status ORDER BY count DESC LIMIT 10",
"description": "Analyze order status distribution to understand the breakdown of order states"
}
}
```
Expand All @@ -400,7 +409,8 @@ If a destructive operation is deemed necessary:
"action": "query",
"input": {
"connection": "MY_DB",
"query": "SELECT COUNT(*) as nulls FROM sales.orders WHERE customer_id IS NULL"
"query": "SELECT COUNT(*) as nulls FROM sales.orders WHERE customer_id IS NULL",
"description": "Check for null customer_id values in orders to assess data quality"
}
}
```
Expand Down Expand Up @@ -434,7 +444,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "SOURCE_DB",
"query": "SELECT COUNT(*), MIN(created_at), MAX(created_at) FROM public.users"
"query": "SELECT COUNT(*), MIN(created_at), MAX(created_at) FROM public.users",
"description": "Get row count and date range of source users table for cross-database comparison"
}
}
```
Expand All @@ -451,7 +462,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_POSTGRES",
"query": "SELECT schemaname, tablename, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size FROM pg_tables WHERE schemaname = 'public' ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC LIMIT 10"
"query": "SELECT schemaname, tablename, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size FROM pg_tables WHERE schemaname = 'public' ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC LIMIT 10",
"description": "Get top 10 largest tables in public schema by total size for capacity planning"
}
}

Expand All @@ -460,7 +472,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_POSTGRES",
"query": "SELECT indexname, indexdef FROM pg_indexes WHERE tablename = 'users' LIMIT 20"
"query": "SELECT indexname, indexdef FROM pg_indexes WHERE tablename = 'users' LIMIT 20",
"description": "List indexes on users table to review indexing strategy"
}
}
```
Expand All @@ -473,7 +486,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_MYSQL",
"query": "SELECT table_schema, ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'DB Size in MB' FROM information_schema.tables GROUP BY table_schema"
"query": "SELECT table_schema, ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'DB Size in MB' FROM information_schema.tables GROUP BY table_schema",
"description": "Get size of each database schema in MB for storage overview"
}
}
```
Expand All @@ -486,7 +500,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_SNOWFLAKE",
"query": "SHOW WAREHOUSES"
"query": "SHOW WAREHOUSES",
"description": "List available Snowflake warehouses and their current status"
}
}

Expand All @@ -495,7 +510,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_SNOWFLAKE",
"query": "SELECT * FROM information_schema.tables WHERE table_schema = 'PUBLIC' LIMIT 10"
"query": "SELECT * FROM information_schema.tables WHERE table_schema = 'PUBLIC' LIMIT 10",
"description": "List tables in PUBLIC schema to review table metadata and clustering info"
}
}
```
Expand All @@ -508,7 +524,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_BIGQUERY",
"query": "SELECT schema_name, catalog_name FROM INFORMATION_SCHEMA.SCHEMATA LIMIT 20"
"query": "SELECT schema_name, catalog_name FROM INFORMATION_SCHEMA.SCHEMATA LIMIT 20",
"description": "List available BigQuery datasets and their parent projects"
}
}

Expand All @@ -517,7 +534,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_BIGQUERY",
"query": "SELECT table_name, partition_id, total_rows FROM `project.dataset.INFORMATION_SCHEMA.PARTITIONS` WHERE table_name = 'my_table' LIMIT 10"
"query": "SELECT table_name, partition_id, total_rows FROM `project.dataset.INFORMATION_SCHEMA.PARTITIONS` WHERE table_name = 'my_table' LIMIT 10",
"description": "Check partition layout and row counts for my_table to understand data distribution"
}
}
```
Expand All @@ -530,7 +548,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_CLICKHOUSE",
"query": "SELECT * FROM system.databases LIMIT 20"
"query": "SELECT * FROM system.databases LIMIT 20",
"description": "List ClickHouse databases to discover available data stores"
}
}

Expand All @@ -539,7 +558,8 @@ When working with multiple databases:
"action": "query",
"input": {
"connection": "MY_CLICKHOUSE",
"query": "SELECT database, name, engine, total_rows FROM system.tables WHERE database = 'default' LIMIT 20"
"query": "SELECT database, name, engine, total_rows FROM system.tables WHERE database = 'default' LIMIT 20",
"description": "List tables in default database with engine types and row counts for overview"
}
}
```
Expand Down
10 changes: 7 additions & 3 deletions cmd/sling/resource/mcp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ tools:
**For `action: "query"`**:
- `connection` (string, required): The name of the database connection to execute the query on
- `query` (string, required): The SQL query to execute
- `description` (string, required): A brief description of the intent and expected result of this query. Explain *why* you are running this query and what the result should tell you. Always provide this when executing a query.
- `limit` (number, optional): The limit of rows to return (defaults to 100)
- `transient` (boolean, optional): Whether to use a transient connection (default: false)

This action executes a SQL query on a database connection and returns the results. **WARNING: Only use this tool for SELECT queries and other read-only operations. Never execute destructive queries such as DELETE, DROP, TRUNCATE, ALTER, UPDATE, INSERT, or any other data modification operations.**

* If a destructive operation (e.g., dropping an object, deleting significant data, altering table structures) is deemed necessary, **DO NOT execute it directly**. Instead, formulate the required SQL query/statement and **return it to the USER for manual review and execution**.
Expand All @@ -126,7 +127,8 @@ tools:
"action": "query",
"input": {
"connection": "MY_PG",
"query": "SELECT * FROM users LIMIT 10"
"query": "SELECT * FROM users LIMIT 10",
"description": "Preview first 10 user records to understand table structure and data format"
}
}

Expand Down Expand Up @@ -183,16 +185,18 @@ tools:
"input": {
"connection": "MY_PG",
"query": "SELECT status, COUNT(*) as count FROM orders WHERE created_date >= '2024-01-01' GROUP BY status ORDER BY count DESC LIMIT 50",
"description": "Get order count breakdown by status for 2024 to identify distribution of order states",
"limit": 50
}
}

# Use transient connection for one-off query
{
"action": "query",
"input": {
"connection": "MY_PG",
"query": "SELECT version()",
"description": "Check the PostgreSQL server version for compatibility verification",
"transient": true
}
}
Expand Down
114 changes: 114 additions & 0 deletions cmd/sling/tests/pipelines/p.27.adjust_column_type_expand.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Test that adjust_column_type expands varchar and decimal columns when
# the source schema widens. Source starts with VARCHAR(50)/NUMERIC(10,2),
# then grows to VARCHAR(200)/NUMERIC(18,6). The target should expand
# column types to match, avoiding truncation or precision loss.

steps:
# 1. Create source table with narrow types
- id: setup_source
connection: POSTGRES
query: |
DROP TABLE IF EXISTS public.adjust_type_src CASCADE;
CREATE TABLE public.adjust_type_src (
id INT PRIMARY KEY,
name VARCHAR(50),
amount NUMERIC(10,2),
description VARCHAR(100)
);
INSERT INTO public.adjust_type_src VALUES
(1, 'short', 123.45, 'short desc'),
(2, 'test', 678.90, 'another desc');

# 2. Create target table with same narrow types (simulating existing target)
- id: setup_target
connection: POSTGRES
query: |
DROP TABLE IF EXISTS public.adjust_type_tgt CASCADE;
CREATE TABLE public.adjust_type_tgt (
id INT,
name VARCHAR(50),
amount NUMERIC(10,2),
description VARCHAR(100)
);
INSERT INTO public.adjust_type_tgt VALUES
(1, 'short', 123.45, 'short desc'),
(2, 'test', 678.90, 'another desc');

# 3. Widen source columns and insert data that needs wider columns
- id: widen_source
connection: POSTGRES
query: |
ALTER TABLE public.adjust_type_src ALTER COLUMN name TYPE VARCHAR(200);
ALTER TABLE public.adjust_type_src ALTER COLUMN amount TYPE NUMERIC(18,6);
ALTER TABLE public.adjust_type_src ALTER COLUMN description TYPE VARCHAR(500);
INSERT INTO public.adjust_type_src VALUES
(3, 'this is a much longer name that exceeds fifty characters easily and needs expansion', 123456789.123456, 'this description is long enough to exceed one hundred characters and requires the target column to be expanded to avoid any truncation errors in the database');

- log: "Source setup complete: narrow types widened, wide data inserted"

# 4. Run replication with adjust_column_type enabled
- id: replicate
replication:
source: POSTGRES
target: POSTGRES
streams:
public.adjust_type_src:
object: public.adjust_type_tgt
mode: full-refresh
target_options:
adjust_column_type: true
on_failure: abort

- log: "Replication completed"

# 5. Verify row count
- connection: POSTGRES
query: SELECT COUNT(*) as count FROM public.adjust_type_tgt
into: target_count

- check: int_parse(store.target_count[0].count) == 3
failure_message: "Expected 3 rows, got {store.target_count[0].count}"

# 6. Verify the wide data was inserted without truncation
- connection: POSTGRES
query: SELECT id, name, amount, description FROM public.adjust_type_tgt WHERE id = 3
into: wide_row

- log: |
Wide row data:
name length: {length(store.wide_row[0].name)}
amount: {store.wide_row[0].amount}
description length: {length(store.wide_row[0].description)}

# Verify string was not truncated (original is 83 chars, wider than VARCHAR(50))
- check: length(store.wide_row[0].name) > 50
failure_message: "Name was truncated: length={length(store.wide_row[0].name)}, expected > 50"

# Verify decimal precision was preserved
- check: float_parse(store.wide_row[0].amount) == 123456789.123456
failure_message: "Amount precision lost: got {store.wide_row[0].amount}, expected 123456789.123456"

# Verify description was not truncated (original is > 100 chars)
- check: length(store.wide_row[0].description) > 100
failure_message: "Description was truncated: length={length(store.wide_row[0].description)}, expected > 100"

# 7. Verify target column types were expanded
- connection: POSTGRES
query: |
SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'adjust_type_tgt'
ORDER BY ordinal_position
into: target_cols

- log: |
Target column types after replication:
{pretty_table(store.target_cols)}

- log: "SUCCESS: adjust_column_type correctly expanded varchar and decimal columns"

# 8. Clean up
- connection: POSTGRES
query: |
DROP TABLE IF EXISTS public.adjust_type_src CASCADE;
DROP TABLE IF EXISTS public.adjust_type_tgt CASCADE;
11 changes: 10 additions & 1 deletion cmd/sling/tests/suite.cli.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2193,4 +2193,13 @@
run: 'sling run -d -p cmd/sling/tests/pipelines/p.26.duckdb_arrow_ipc_output.yaml'
output_contains:
- 'SUCCESS: DuckDB Arrow IPC output produced correct row count'
- 'DuckDB Arrow IPC output test complete'
- 'DuckDB Arrow IPC output test complete'

# adjust_column_type: expand varchar and decimal columns when source types widen
# Source starts with VARCHAR(50)/NUMERIC(10,2), then grows to VARCHAR(200)/NUMERIC(18,6).
# The target should expand column types to match, avoiding truncation or precision loss.
- id: 229
name: 'adjust_column_type expands varchar and decimal columns'
run: 'sling run -d -p cmd/sling/tests/pipelines/p.27.adjust_column_type_expand.yaml'
output_contains:
- 'SUCCESS: adjust_column_type correctly expanded varchar and decimal columns'
Loading