Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
integration-tests:
environment: integration_tests
runs-on: 32gb-runner
timeout-minutes: 45
timeout-minutes: 90
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
20 changes: 20 additions & 0 deletions drivers/db2/internal/db2_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
require.NoError(t, err, "Failed to insert filtered test data row")
return

case "insert_2":
query = fmt.Sprintf(`
INSERT INTO %s (
col_cursor, col_bigint, col_char, col_character,
col_varchar, col_date, col_decimal,
col_double, col_real, col_int, col_smallint,
col_clob, col_blob, col_timestamp, col_time,
col_graphic, col_vargraphic, col_bool
) VALUES (
7, 12345678901234, 'c', 'char_val',
'varchar_val', DATE('2023-01-01'), 123.45,
123.456789, 123.5, 123, 123,
CLOB('sample text'), BLOB(X'424C4F422044415441204F4E45'),
TIMESTAMP('2023-01-01-12.00.00.000000'),
TIME('12.00.00'),
GRAPHIC('graphic_val'),
VARGRAPHIC('vargraphic_val'),
TRUE
)`, integrationTestTable)

case "update":
query = fmt.Sprintf(`
UPDATE %s SET
Expand Down
5 changes: 3 additions & 2 deletions drivers/mongodb/internal/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ func (m *Mongo) StreamChanges(ctx context.Context, streamIndex int, metadataStat
// state >= metadata → state is current or ahead; read forward normally.
if mtState != nil {
// TODO: addition of all the state updations in metadata file even for blank sync scenario
// metadata > state → crash-recovery path (metadata committed but state write failed).
// metadata > state → crash-recovery path (metadata committed but state write failed), no further sync for this stream just update the state to metadata resume token.
// state >= metadata → read forward normally.
if typeutils.Compare(prevResumeToken, mtState) < 0 {
logger.Infof("Stream[%s] metadata ahead of state, using metadata resume token for recovery", stream.ID())
prevResumeToken = mtState
m.cdcCursor.Store(stream.ID(), mtState)
return mtState, nil
}
}

Expand Down
19 changes: 19 additions & 0 deletions drivers/mongodb/internal/mon_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
_, err = collection.InsertOne(ctx, filteredDoc)
require.NoError(t, err, "Failed to insert filtered test data row")

case "insert_2":
doc2 := bson.M{
"id_bigint": int64(123456789012345),
"id_int": int32(100),
"id_timestamp": time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC),
"id_double": float64(123.456),
"id_bool": true,
"id_cursor": int32(7),
"created_timestamp": primitive.Timestamp{T: uint32(1754905992), I: 1},
"id_nil": nil,
"id_regex": primitive.Regex{Pattern: "test.*", Options: "i"},
"id_nested": nestedDoc,
"id_minkey": primitive.MinKey{},
"id_maxkey": primitive.MaxKey{},
"name_varchar": "varchar_val",
}
_, err2 := collection.InsertOne(ctx, doc2)
require.NoError(t, err2, "Failed to insert document (insert_2)")

case "update":
filter := bson.M{"id": int32(1)}
update := bson.M{
Expand Down
37 changes: 35 additions & 2 deletions drivers/mssql/internal/mssql_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,43 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
_, err = db.ExecContext(ctx, filteredQuery)
require.NoError(t, err, "failed to insert filtered CDC row")

case "insert_2":
insertTwo := fmt.Sprintf(`
INSERT INTO dbo.%s (
id_cursor,
col_tinyint, col_smallint, col_int, col_bigint,
col_decimal, col_numeric, col_smallmoney, col_money,
col_float, col_real, col_bit,
col_char, col_varchar, col_text, col_nchar, col_nvarchar, col_ntext,
col_date, col_time, col_smalldatetime, col_datetime, col_datetime2, col_datetimeoffset,
col_uniqueidentifier,
col_xml, col_sysname,
col_image, col_hierarchyid, col_sql_variant,
col_int_nullable, col_varchar_nullable, col_datetime2_nullable,
created_at
) VALUES (
7,
3, 5, 10, 19,
123.50, 10.12500, 1.2500, 2.5000,
123.50, 12.50, 1,
'char_val__', 'varchar_val', 'text_val', N'nchar_val_', N'nvarchar_val', N'ntext_val',
'2023-01-01', '12:00:00', '2023-01-01 12:00:00', '2023-01-01 12:00:00',
'2023-01-01 12:00:00', '2023-01-01 12:00:00 +00:00',
'123e4567-e89b-12d3-a456-426614174000',
'<xml>test</xml>', 'sysname_val',
0x43434343,
hierarchyid::Parse('/1/1/'), CAST('variant_base' AS sql_variant),
NULL, NULL, NULL,
'2023-01-01 12:00:00'
);
`, integrationTestTable)
_, err2 := db.ExecContext(ctx, insertTwo)
require.NoError(t, err2, "failed to insert CDC row (insert_2)")

case "update":
updateRow := fmt.Sprintf(`
UPDATE dbo.%s SET
id_cursor = 7,
id_cursor = 100,
col_bigint = 20,
col_decimal = 543.25,
col_money = 9.7500,
Expand All @@ -253,7 +286,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
col_datetime2_nullable = '2024-07-01 15:30:00',
created_at = '2024-07-01 15:30:00',
excludedColumn = 102
WHERE id = 6;
WHERE id = 1;
`, integrationTestTable)
_, err := db.ExecContext(ctx, updateRow)
require.NoError(t, err, "failed to update CDC row")
Expand Down
27 changes: 26 additions & 1 deletion drivers/mysql/internal/mysql_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,31 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
require.NoError(t, err, "Failed to insert filtered test data row")
return

case "insert_2":
query = fmt.Sprintf(`
INSERT INTO %s (
id_cursor, id, id_bigint,
id_int, id_int_unsigned, id_integer, id_integer_unsigned,
id_mediumint, id_mediumint_unsigned, id_smallint, id_smallint_unsigned,
id_tinyint, id_tinyint_unsigned, price_decimal, amount_decimal_9_2, price_double,
price_double_precision, price_float, price_numeric, price_real,
name_char, name_varchar, name_text, name_tinytext,
name_mediumtext, name_longtext, created_date,
created_timestamp, is_active,
long_varchar, name_bool, status, priority
) VALUES (
7, 7, 123456789012345,
100, 101, 102, 103,
5001, 5002, 101, 102,
50, 51,
123.45, 5330197.27, 123.456,
123.456, 123.45, 123.45, 123.456,
'c', 'varchar_val', 'text_val', 'tinytext_val',
'mediumtext_val', 'longtext_val', '2023-01-01 12:00:00',
'2023-01-01 12:00:00', 1,
'long_varchar_val', 1, 'active', 'high'
)`, integrationTestTable)

case "update":
query = fmt.Sprintf(`
UPDATE %s SET
Expand All @@ -167,7 +192,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
long_varchar = 'updated long...', name_bool = 0,
status = 'pending', priority = 'low', excludedColumn = 102,
includedColumn = 202
WHERE id = 6`, integrationTestTable)
WHERE id = 1`, integrationTestTable)

case "delete":
query = fmt.Sprintf("DELETE FROM %s WHERE id = 1", integrationTestTable)
Expand Down
17 changes: 17 additions & 0 deletions drivers/oracle/internal/oracle_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
require.NoError(t, err, "Failed to insert filtered test data row")
return

case "insert_2":
query = fmt.Sprintf(`
INSERT INTO %s (
col_cursor, col_bigint, col_char, col_character,
col_varchar2, col_date, col_decimal,
col_double_precision, col_float, col_int, col_smallint,
col_integer, col_clob, col_nclob, col_timestamp, col_timestamptz, col_timestampltz
) VALUES (
7, 123456789012345, 'c', 'char_val',
'varchar_val', TO_DATE('2023-01-01', 'YYYY-MM-DD'), 123.45,
123.456789, 123.5, 123, 123, 12345,
'sample text', 'sample nclob',
TIMESTAMP '2023-01-01 12:00:00',
TIMESTAMP '2023-01-01 12:00:00+00:00',
TIMESTAMP '2023-01-01 12:00:00+05:30'
)`, integrationTestTable)

case "update":
query = fmt.Sprintf(`
UPDATE %s SET
Expand Down
10 changes: 7 additions & 3 deletions drivers/postgres/internal/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,13 @@ func (p *Postgres) StreamChanges(ctx context.Context, _ int, metadataStates map[
// persist replicator for post cdc
p.replicator = replicator

// validate global state (might got invalid during full load)
if err := validateGlobalState(postgresGlobalState, slot.LSN); err != nil {
return nil, fmt.Errorf("%s: invalid global state: %s", constants.ErrNonRetryable, err)
// validateGlobalState ensures slot and state agree before WAL replay begins.
// Skip it when remainingStreams is empty: all streams are already committed in
// Iceberg, so no WAL will be replayed and the slot/state relationship is irrelevant.
if len(remainingStreams) > 0 {
if err := validateGlobalState(postgresGlobalState, slot.LSN); err != nil {
return nil, fmt.Errorf("%s: invalid global state: %s", constants.ErrNonRetryable, err)
}
}

// choose replicator via factory based on OutputPlugin config (default wal2json)
Expand Down
24 changes: 24 additions & 0 deletions drivers/postgres/internal/postgres_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
require.NoError(t, err, "Failed to insert filtered test data row")
return

case "insert_2":
query = fmt.Sprintf(`
INSERT INTO %s (
col_cursor, col_bigint, col_bool, col_char, col_character,
col_character_varying, col_date, col_decimal,
col_double_precision, col_float4, col_int, col_int2,
col_integer, col_interval, col_json, col_jsonb,
col_name, col_numeric, col_real, col_text,
col_timestamp, col_timestamptz, col_uuid, col_varbit, col_xml,
col_point, col_polygon, col_circle
) VALUES (
7, 123456789012345, TRUE, 'c', 'charac_val',
'varchar_val', '2023-01-01', 123.45,
123.456789, 123.45, 123, 123, 12345,
'1 hour', '{"key": "value"}', '{"key": "value"}',
'test_name', 123.45, 123.45, 'sample text',
'2023-01-01 12:00:00', '2023-01-01 12:00:00+00',
'123e4567-e89b-12d3-a456-426614174000', B'101010',
'<tag>value</tag>',
'(10.5,20.5)'::point,
'((0,0),(10,0),(10,10),(0,10),(0,0))'::polygon,
'<(5,5),3.5>'::circle
)`, integrationTestTable)

case "update":
query = fmt.Sprintf(`
UPDATE %s SET
Expand Down
Loading
Loading