Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
c9dc5d3
feat: implement filter in cdc
vikaxsh Jan 14, 2026
b3195f8
feat: add log for flatten record count
vikaxsh Jan 15, 2026
9cdb42a
fix: github workflows
vikaxsh Jan 15, 2026
06be111
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Jan 15, 2026
7be9043
fix: lint issue
vikaxsh Jan 15, 2026
485cf52
fix: lint issue
vikaxsh Jan 15, 2026
4bc38e9
feat: filter only for cdc and backfill
vikaxsh Jan 16, 2026
adb97a5
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Jan 16, 2026
1dab2d1
fix: integration test for incremental
vikaxsh Jan 16, 2026
69dee47
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Jan 19, 2026
4d681ac
feat: add filter in full load for specific drivers
vikaxsh Jan 19, 2026
ab0848f
feat: add unit test for filter
vikaxsh Jan 19, 2026
d12ec1b
fix: add mongo integration test
vikaxsh Jan 20, 2026
b163ac6
fix: skip filtering of delete records
vikaxsh Jan 20, 2026
855eae0
fix: add mysql, postgres, oracle integration test
vikaxsh Jan 20, 2026
4ec7731
fix: integration test
vikaxsh Jan 20, 2026
4a974c4
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Jan 20, 2026
94f9d89
fix: integration test
vikaxsh Jan 20, 2026
0f1f35c
fix: integration test for oracle
vikaxsh Jan 20, 2026
7da5ccb
fix: integration test for oracle
vikaxsh Jan 20, 2026
cfacf76
fix: integration test for oracle
vikaxsh Jan 20, 2026
57514bc
chore: cleanup
vikaxsh Jan 21, 2026
6d7ed0b
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Jan 21, 2026
5af80db
fix: lint issue
vikaxsh Jan 21, 2026
91ba0f2
Merge branch 'staging' into feat/filter-in-cdc&incremental
vaibhav-datazip Jan 23, 2026
ea4939e
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Jan 27, 2026
52e3ff0
chore: resolve review comments
vikaxsh Jan 27, 2026
98da6ab
chore: improve comments
vikaxsh Jan 27, 2026
a8e88d3
fix: oracle integration test
vikaxsh Jan 27, 2026
3c497bd
Merge branch 'staging' into feat/filter-in-cdc&incremental
vikaxsh Jan 27, 2026
df4cf0f
Merge branch 'staging' into feat/filter-in-cdc&incremental
vaibhav-datazip Jan 28, 2026
4a3cba1
Merge branch 'staging' into feat/filter-in-cdc&incremental
vishalm0509 Jan 29, 2026
6e200d2
chore: add unit test for GetFilter
vikaxsh Jan 29, 2026
26d8311
Merge branch 'feat/filter-in-cdc&incremental' of https://github.com/d…
vikaxsh Jan 29, 2026
680b680
fix: db2 integration test
vikaxsh Jan 29, 2026
cab6da9
Merge branch 'staging' into feat/filter-in-cdc&incremental
vikaxsh Jan 29, 2026
dd771cf
Merge branch 'staging' into feat/filter-in-cdc&incremental
vishalm0509 Feb 3, 2026
db5da83
Merge branch 'staging' into feat/filter-in-cdc&incremental
vishalm0509 Feb 3, 2026
fb6e239
chore: add test for delete op in record
vikaxsh Feb 4, 2026
af45b34
chore: add apply filter bool
vikaxsh Feb 4, 2026
a550bef
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Feb 11, 2026
8c9c426
fix: merge staging
vikaxsh Feb 11, 2026
675c390
fix: rm debug log
vikaxsh Feb 11, 2026
6d84188
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Feb 16, 2026
496e61e
feat: added an todo
vikaxsh Feb 16, 2026
fe91a6e
feat: maintain backward compatibility
vikaxsh Feb 16, 2026
bf7ff02
chore: move filter in typeutils pkg
vikaxsh Feb 20, 2026
909b773
chore: keep legacy filter code in mongo backfill
vikaxsh Feb 22, 2026
8d5b92d
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Feb 22, 2026
498c136
fix: unit test for filter
vikaxsh Feb 22, 2026
2cc51da
fix: add filter value parsing in filter
vikaxsh Feb 23, 2026
db18902
fix: add filter unit tests again
vikaxsh Feb 25, 2026
8b16aa0
fix: lint issue
vikaxsh Feb 25, 2026
da7cb54
chore: change name to filter config
vikaxsh Feb 25, 2026
f84574f
fix: mongo backfill filter
vikaxsh Feb 25, 2026
c9d1eb3
fix: mongo incr integration test
vikaxsh Feb 25, 2026
d6b39cf
chore: clear destination in change of filter
vikaxsh Feb 26, 2026
bae5499
fix: unit test
vikaxsh Feb 26, 2026
ca51f9d
fix: delete integration test
vikaxsh Feb 26, 2026
fe3795b
fix: delete integration test for parquet
vikaxsh Feb 26, 2026
53992f4
fix: delete integration test for parquet
vikaxsh Feb 26, 2026
2ff69f8
Merge branch 'staging' into feat/filter-in-cdc&incremental
vaibhav-datazip Feb 27, 2026
88e764a
Merge branch 'staging' into feat/filter-in-cdc&incremental
vikaxsh Mar 4, 2026
0db60b1
fix: reject invalid timestamp filter values instead of silently using…
vikaxsh Mar 10, 2026
fc6b34b
Merge branch 'feat/filter-in-cdc&incremental' of https://github.com/d…
vikaxsh Mar 10, 2026
5b9391d
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Mar 12, 2026
1e4ac95
chore: merge staging
vikaxsh Mar 12, 2026
4c6926d
fix: integration test
vikaxsh Mar 12, 2026
0c59ec9
fix: add db2 integration test
vikaxsh Mar 13, 2026
a885bb3
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh Mar 16, 2026
5ca6f0b
fix: disable new filter in case of normalization false
vikaxsh Mar 16, 2026
4b9ccbf
chore: add filter condition validation
vikaxsh Mar 16, 2026
c3813b5
Merge branch 'staging' into feat/filter-in-cdc&incremental
vikaxsh Mar 16, 2026
bcf630d
fix: remove race condition
vikaxsh Mar 16, 2026
b724818
Merge branch 'feat/filter-in-cdc&incremental' of https://github.com/d…
vikaxsh Mar 16, 2026
905e042
Merge branch 'staging' into feat/filter-in-cdc&incremental
shubham19may Mar 18, 2026
107a1c8
Merge branch 'staging' into feat/filter-in-cdc&incremental
hash-data Mar 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
integration-tests:
environment: integration_tests
runs-on: 32gb-runner
timeout-minutes: 30
timeout-minutes: 45
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
2 changes: 2 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
MSSQL DriverType = "mssql"
)

// Drivers where filters are applied in memory after full refresh data is read.
var FullRefreshPostReadFilterDrivers = []DriverType{S3, Kafka}

// Relational (SQL-based) database driver types.
var RelationalDrivers = []DriverType{Postgres, MySQL, Oracle, DB2, MSSQL}

// NOTE(review): presumably the drivers whose CDC is run in parallel — confirm against the CDC path that reads this list.
var ParallelCDCDrivers = []DriverType{MongoDB, MSSQL}
Expand Down
14 changes: 13 additions & 1 deletion destination/iceberg/iceberg.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,19 @@ func (i *Iceberg) FlattenAndCleanData(ctx context.Context, records []types.RawRe
return false, nil, nil, fmt.Errorf("failed to extract schema from records: %s", err)
}

return schemaDifference, records, recordsSchema, err
filter, isLegacy, err := i.stream.GetFilter()
if err != nil {
return false, nil, nil, fmt.Errorf("failed to parse stream filter: %s", err)
}

if i.options.ApplyFilter {
records, err = typeutils.FilterRecords(ctx, records, filter, isLegacy, recordsSchema)
if err != nil {
return false, nil, nil, fmt.Errorf("failed to filter records: %s", err)
}
}

return schemaDifference, records, recordsSchema, nil
Comment thread
vaibhav-datazip marked this conversation as resolved.
}

// compares with global schema and update schema in destination accordingly
Expand Down
17 changes: 15 additions & 2 deletions destination/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,22 @@ func (p *Parquet) FlattenAndCleanData(ctx context.Context, records []types.RawRe
}
}

return schemaChange, records, p.schema, utils.Concurrent(ctx, records, runtime.GOMAXPROCS(0)*16, func(_ context.Context, record types.RawRecord, _ int) error {
if err := utils.Concurrent(ctx, records, runtime.GOMAXPROCS(0)*16, func(_ context.Context, record types.RawRecord, _ int) error {
return typeutils.ReformatRecord(p.schema, record.Data)
})
}); err != nil {
return false, nil, nil, fmt.Errorf("failed to reformat records: %s", err)
}
filter, isLegacy, err := p.stream.GetFilter()
if err != nil {
return false, nil, nil, fmt.Errorf("failed to parse stream filter: %s", err)
}
if p.options.ApplyFilter {
records, err = typeutils.FilterRecords(ctx, records, filter, isLegacy, p.schema)
if err != nil {
return false, nil, nil, fmt.Errorf("failed to filter records: %s", err)
}
}
return schemaChange, records, p.schema, nil
}

// EvolveSchema updates the schema based on changes. Need to pass olakeTimestamp to get the correct partition path based on record ingestion time.
Expand Down
20 changes: 14 additions & 6 deletions destination/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type (
WriterOption func(Writer) error

Options struct {
Identifier string
Number int64
Backfill bool
ThreadID string
Identifier string
Number int64
Backfill bool
ThreadID string
ApplyFilter bool
}

ThreadOptions func(opt *Options)
Expand All @@ -34,6 +35,7 @@ type (
Stats struct {
TotalRecordsToSync atomic.Int64 // total record that are required to sync
ReadCount atomic.Int64 // records that got read
RecordsFiltered atomic.Int64 // records that got filtered
Comment thread
vaibhav-datazip marked this conversation as resolved.
ThreadCount atomic.Int64 // total number of writer threads
}

Expand Down Expand Up @@ -83,6 +85,11 @@ func WithThreadID(threadID string) ThreadOptions {
opt.ThreadID = threadID
}
}
// WithApplyFilter returns a ThreadOptions that sets whether the writer
// thread should apply the stream's record filter while flushing data.
func WithApplyFilter(enabled bool) ThreadOptions {
	return func(o *Options) {
		o.ApplyFilter = enabled
	}
}
Comment thread
vishalm0509 marked this conversation as resolved.

func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams []string, batchSize int64) (*WriterPool, error) {
newfunc, found := RegisteredWriters[config.Type]
Expand All @@ -105,6 +112,7 @@ func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams
TotalRecordsToSync: atomic.Int64{},
ThreadCount: atomic.Int64{},
ReadCount: atomic.Int64{},
RecordsFiltered: atomic.Int64{},
},
config: config.WriterConfig,
init: newfunc,
Expand Down Expand Up @@ -230,12 +238,12 @@ func (wt *WriterThread) flush(ctx context.Context, buf []types.RawRecord) (err e
// create flush context
flushCtx, cancel := context.WithCancel(ctx)
defer cancel()

recordsCountBeforeFiltering := len(buf)
evolution, buf, threadSchema, err := wt.writer.FlattenAndCleanData(flushCtx, buf)
if err != nil {
return fmt.Errorf("failed to flatten and clean data: %s", err)
}

wt.stats.RecordsFiltered.Add(int64(recordsCountBeforeFiltering - len(buf)))
Comment thread
vishalm0509 marked this conversation as resolved.
// TODO: after flattening record type raw_record not make sense
if evolution {
wt.streamArtifact.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion drivers/abstract/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha
defer backfillCtxCancel()

threadID := generateThreadID(stream.ID(), fmt.Sprintf("min[%v]-max[%v]", chunk.Min, chunk.Max))
inserter, prevMetadataState, err := pool.NewWriter(backfillCtx, stream, destination.WithBackfill(true), destination.WithThreadID(threadID))
inserter, prevMetadataState, err := pool.NewWriter(backfillCtx, stream, destination.WithBackfill(true), destination.WithThreadID(threadID), destination.WithApplyFilter(slices.Contains(constants.FullRefreshPostReadFilterDrivers, constants.DriverType(a.driver.Type()))))
if err != nil {
return fmt.Errorf("failed to create new writer thread: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion drivers/abstract/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (a *AbstractDriver) streamChanges(mainCtx context.Context, pool *destinatio

for _, stream := range streams {
threadID := generateThreadID(stream.ID(), "")
w, writerMeta, createErr := pool.NewWriter(cdcCtx, stream, destination.WithThreadID(threadID))
w, writerMeta, createErr := pool.NewWriter(cdcCtx, stream, destination.WithThreadID(threadID), destination.WithApplyFilter(true))
if createErr != nil {
return fmt.Errorf("failed to create CDC writer for stream %s: %s", stream.ID(), createErr)
}
Expand Down
2 changes: 1 addition & 1 deletion drivers/abstract/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.
defer incrementalCtxCancel()

threadID := generateThreadID(stream.ID(), fmt.Sprintf("%v_%v", maxPrimaryCursorValue, maxSecondaryCursorValue))
inserter, prevMetadataState, err := pool.NewWriter(incrementalCtx, stream, destination.WithThreadID(threadID))
inserter, prevMetadataState, err := pool.NewWriter(incrementalCtx, stream, destination.WithThreadID(threadID), destination.WithApplyFilter(true))
if err != nil {
return fmt.Errorf("failed to create new writer thread: %s", err)
}
Expand Down
15 changes: 15 additions & 0 deletions drivers/db2/internal/db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ func TestDB2Integration(t *testing.T) {
DestinationDB: "db2_testdb_db2inst1",
CursorField: "COL_CURSOR:COL_TIMESTAMP",
PartitionRegex: "/{id, identity}",
FilterConfig: `{
"logical_operator": "And",
"conditions": [
{
"column": "COL_DOUBLE",
"operator": "<",
"value": 239834.89
},
{
"column": "COL_TIMESTAMP",
"operator": ">=",
"value": "2022-07-01T15:30:00.000+00:00"
}
]
}`,
}
testConfig.TestIntegration(t)
}
45 changes: 45 additions & 0 deletions drivers/db2/internal/db2_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
VARGRAPHIC('vargraphic_val'),
TRUE
)`, integrationTestTable)
_, err = db.ExecContext(ctx, query)
require.NoError(t, err, "Failed to execute %s operation", operation)
// insert a filtered row — timestamp is before the filter threshold, so it won't be synced
filteredQuery := fmt.Sprintf(`
INSERT INTO %s (
col_cursor, col_bigint, col_char, col_character,
col_varchar, col_date, col_decimal,
col_double, col_real, col_int, col_smallint,
col_clob, col_blob, col_timestamp, col_time,
col_graphic, col_vargraphic, col_bool
) VALUES (
-1, 111111111111111, 'x', 'filtered',
'filtered_val', DATE('2022-06-15'), 50.123,
50.123, 50.0, 0, 0,
CLOB('filtered text'), BLOB(X'00'),
TIMESTAMP('2022-06-15-10.00.00.000000'),
TIME('10.00.00'),
GRAPHIC('filtered'),
VARGRAPHIC('filtered'),
FALSE
)`, integrationTestTable)
_, err = db.ExecContext(ctx, filteredQuery)
require.NoError(t, err, "Failed to insert filtered test data row")
return

case "update":
query = fmt.Sprintf(`
Expand Down Expand Up @@ -149,6 +173,27 @@ func insertTestData(t *testing.T, ctx context.Context, db *sqlx.DB, tableName st
_, err := db.ExecContext(ctx, query)
require.NoError(t, err, "Failed to insert test data")
}
// insert a filtered row — timestamp is before the filter threshold, so it won't be synced
filteredQuery := fmt.Sprintf(`
INSERT INTO %s (
col_cursor, col_bigint, col_char, col_character,
col_varchar, col_date, col_decimal,
col_double, col_real, col_int, col_smallint,
col_clob, col_blob, col_timestamp, col_time,
col_graphic, col_vargraphic, col_bool
) VALUES (
-1, 111111111111111, 'x', 'filtered',
'filtered_val', DATE('2021-06-15'), 500234.123,
500234.123, 500234.0, 0, 0,
CLOB('filtered text'), BLOB(X'00'),
TIMESTAMP('2021-06-15-10.00.00.000000'),
TIME('10.00.00'),
GRAPHIC('filtered'),
VARGRAPHIC('filtered'),
FALSE
)`, tableName)
_, err := db.ExecContext(ctx, filteredQuery)
require.NoError(t, err, "Failed to insert filtered test data row")
}

var ExpectedDB2Data = map[string]interface{}{
Expand Down
101 changes: 69 additions & 32 deletions drivers/mongodb/internal/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func generateMinObjectID(t time.Time) string {
return objectID.Hex()
}

func buildMongoCondition(cond types.Condition) bson.D {
func buildMongoCondition(isLegacy bool, cond interface{}) bson.D {
opMap := map[string]string{
">": "$gt",
">=": "$gte",
Expand All @@ -398,36 +398,75 @@ func buildMongoCondition(cond types.Condition) bson.D {
"=": "$eq",
"!=": "$ne",
}
//TODO: take val as any type
value := func(field, val string) interface{} {
// Handle unquoted null
if val == "null" {
return nil
}
c := cond.(types.FilterCondition)
// legacy filter condition
if isLegacy {
value := func(field, val string) interface{} {
if val == "null" {
return nil
}

if strings.HasPrefix(val, "\"") && strings.HasSuffix(val, "\"") {
val = val[1 : len(val)-1]
}
if field == "_id" && len(val) == 24 {
if oid, err := primitive.ObjectIDFromHex(val); err == nil {
return oid
if strings.HasPrefix(val, "\"") && strings.HasSuffix(val, "\"") {
val = val[1 : len(val)-1]
}

if field == "_id" && len(val) == 24 {
if oid, err := primitive.ObjectIDFromHex(val); err == nil {
return oid
}
}

if strings.EqualFold(val, "true") || strings.EqualFold(val, "false") {
return strings.EqualFold(val, "true")
}

if timeVal, err := typeutils.ReformatDate(val, false); err == nil {
return timeVal
}

if intVal, err := typeutils.ReformatInt64(val); err == nil {
return intVal
}

if floatVal, err := typeutils.ReformatFloat64(val); err == nil {
return floatVal
}

return val
}(c.Column, c.Value.(string))

return bson.D{
{Key: c.Column, Value: bson.D{
{Key: opMap[c.Operator], Value: value},
}},
}
if strings.ToLower(val) == "true" || strings.ToLower(val) == "false" {
return strings.ToLower(val) == "true"
}
if timeVal, err := typeutils.ReformatDate(val, false); err == nil {
return timeVal
}
if intVal, err := typeutils.ReformatInt64(val); err == nil {
return intVal
}
if floatVal, err := typeutils.ReformatFloat64(val); err == nil {
return floatVal
}

var value interface{}
if v, ok := c.Value.(string); ok {
//For string values, attempt type conversion based on field characteristics
//This handles cases like timestamp strings, ObjectIDs etc.
if c.Column == "_id" && len(v) == 24 {
if oid, err := primitive.ObjectIDFromHex(v); err == nil {
value = oid
} else {
value = v
}
} else if timeVal, err := typeutils.ReformatDate(v, false); err == nil {
value = timeVal
} else {
value = c.Value
}
return val
}(cond.Column, cond.Value)
return bson.D{{Key: cond.Column, Value: bson.D{{Key: opMap[cond.Operator], Value: value}}}}
} else {
// already typed (nil, bool, int, float, etc.)
value = c.Value
}

return bson.D{
{Key: c.Column, Value: bson.D{
{Key: opMap[c.Operator], Value: value},
}},
}
}

// buildFilter generates a BSON document for MongoDB by combining threshold conditions with user-defined filter conditions
Expand All @@ -437,7 +476,7 @@ func (m *Mongo) buildFilter(stream types.StreamInterface) (bson.D, error) {
return nil, fmt.Errorf("failed to create threshold filter: %s", err)
}

filter, err := stream.GetFilter()
filter, isLegacy, err := stream.GetFilter()
if err != nil {
return nil, fmt.Errorf("failed to parse stream filter: %s", err)
}
Expand All @@ -451,11 +490,9 @@ func (m *Mongo) buildFilter(stream types.StreamInterface) (bson.D, error) {
case len(filter.Conditions) == 0:
return utils.Ternary(len(allConditions) == 0, bson.D{}, bson.D{{Key: "$and", Value: allConditions}}).(bson.D), nil
case len(filter.Conditions) == 1:
allConditions = append(allConditions, buildMongoCondition(filter.Conditions[0]))
allConditions = append(allConditions, buildMongoCondition(isLegacy, filter.Conditions[0]))
case len(filter.Conditions) == 2:
allConditions = append(allConditions, bson.D{{Key: "$" + filter.LogicalOperator, Value: bson.A{buildMongoCondition(filter.Conditions[0]), buildMongoCondition(filter.Conditions[1])}}})
default:
return nil, fmt.Errorf("multiple conditions are not supported in filter")
allConditions = append(allConditions, bson.D{{Key: "$" + filter.LogicalOperator, Value: bson.A{buildMongoCondition(isLegacy, filter.Conditions[0]), buildMongoCondition(isLegacy, filter.Conditions[1])}}})
}

return bson.D{{Key: "$and", Value: allConditions}}, nil
Expand Down
Loading
Loading