From 52e46ff0ca389a83e9bb708d67ae6294a200ca81 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 4 Jun 2026 15:46:39 -0400 Subject: [PATCH 01/12] refactor: Split GTFS import, validation into distinct functions --- lib/arrow/gtfs.ex | 116 +++++++++++++++------------- lib/arrow/gtfs/validation_worker.ex | 4 +- 2 files changed, 66 insertions(+), 54 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index a967455e5..6a1f0248b 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -1,6 +1,6 @@ defmodule Arrow.Gtfs do @moduledoc """ - GTFS import logic. + GTFS import and validation logic. """ alias Arrow.Gtfs.Importable alias Arrow.Gtfs.JobHelper @@ -13,68 +13,83 @@ defmodule Arrow.Gtfs do @import_timeout_ms :timer.minutes(10) @doc """ - Loads a GTFS archive into Arrow's gtfs_* DB tables, - replacing the previous archive's data. + Loads a GTFS archive into Arrow's gtfs_* DB tables, replacing the previous + archive's data. - `job` is the Oban job running this import, or `nil` if the import is being - run directly, e.g. by `mix import_gtfs`. - - Options: - - `:validate_only?` - Set true to have the transaction roll back - instead of committing, even if all queries succeed. - - Returns: - - - `:ok` on successful import or validation, or skipped import due to unchanged version. - - `{:error, reason}` if the import or validation failed. + `job` is the Oban job running this import, or `nil` if the import is being run + directly, e.g. by `mix import_gtfs`. """ - @spec import(Unzip.t(), String.t(), Oban.Job.t() | nil, Keyword.t()) :: - :ok | {:error, term} - def import(unzip, new_version, job \\ nil, opts \\ []) do - validate_only? = Keyword.get(opts, :validate_only?, false) + @spec import(Unzip.t(), String.t(), Oban.Job.t() | nil) :: :ok | {:error, term} + def import(unzip, new_version, job \\ nil) do job_info = job && JobHelper.logging_params(job) - Logger.info("GTFS import or validation job starting #{job_info}") + Logger.info("GTFS import job starting #{job_info}") current_version = - if validate_only? do - "doesn't matter for validation" - else - Arrow.Repo.one( - from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version - ) - end + Arrow.Repo.one( + from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version + ) with :ok <- validate_required_files(unzip), - :ok <- validate_version_change(new_version, current_version) do - case import_transaction(unzip, validate_only?) do - {:ok, _} -> - Logger.info("GTFS import success #{job_info}") - :ok - - {:error, :validation_success} -> - Logger.info("GTFS validation success #{job_info}") - :ok - - {:error, reason} = error -> - Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}") - - error - end + :ok <- validate_version_change(new_version, current_version), + {:ok, _} <- import_transaction(unzip) do + Logger.info("GTFS import success #{job_info}") + :ok else :unchanged -> Logger.info("GTFS import skipped due to unchanged version #{job_info}") - :ok {:error, reason} = error -> - Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}") + Logger.warning("GTFS import failed reason=#{inspect(reason)} #{job_info}") + error + end + end + + defp import_transaction(unzip) do + transaction = fn -> + external_fkeys = get_external_fkeys() + drop_external_fkeys(external_fkeys) + + truncate_all() + import_all(unzip) + + add_external_fkeys(external_fkeys) + end + + {elapsed_ms, result} = + fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end + |> :timer.tc(:millisecond) + + Logger.info("GTFS archive import transaction completed elapsed_ms=#{elapsed_ms}") + + result + end + @doc """ + Validates a GTFS feed for internal consistency, and consistency with Arrow + disruption data that depends on it. + + `job` is the Oban job running this validation. + """ + @spec validate(Unzip.t(), Oban.Job.t()) :: :ok | {:error, term} + def validate(unzip, job) do + job_info = JobHelper.logging_params(job) + + Logger.info("GTFS validation job starting #{job_info}") + + with :ok <- validate_required_files(unzip), + {:error, :validation_success} <- validate_transaction(unzip) do + Logger.info("GTFS validation success #{job_info}") + :ok + else + {:error, reason} = error -> + Logger.warning("GTFS validation failed reason=#{inspect(reason)} #{job_info}") error end end - defp import_transaction(unzip, validate_only?) do + defp validate_transaction(unzip) do transaction = fn -> external_fkeys = get_external_fkeys() drop_external_fkeys(external_fkeys) @@ -84,20 +99,17 @@ defmodule Arrow.Gtfs do add_external_fkeys(external_fkeys) - if validate_only? do - # Set any deferred constraints to run now, instead of on transaction commit, - # since we don't actually commit the transaction in this case. - _ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE") - Repo.rollback(:validation_success) - end + # Set any deferred constraints to run now, instead of on transaction commit, + # since we don't actually commit the transaction in this case. + _ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE") + Repo.rollback(:validation_success) end {elapsed_ms, result} = fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end |> :timer.tc(:millisecond) - action = if validate_only?, do: "validation", else: "import" - Logger.info("GTFS archive #{action} transaction completed elapsed_ms=#{elapsed_ms}") + Logger.info("GTFS archive validation transaction completed elapsed_ms=#{elapsed_ms}") result end diff --git a/lib/arrow/gtfs/validation_worker.ex b/lib/arrow/gtfs/validation_worker.ex index 73d5a9144..90f165b57 100644 --- a/lib/arrow/gtfs/validation_worker.ex +++ b/lib/arrow/gtfs/validation_worker.ex @@ -20,9 +20,9 @@ defmodule Arrow.Gtfs.ValidationWorker do ] @impl Oban.Worker - def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}} = job) do + def perform(%Oban.Job{args: %{"s3_uri" => s3_uri}} = job) do with {:ok, unzip} <- Arrow.Gtfs.Archive.to_unzip_struct(s3_uri) do - Arrow.Gtfs.import(unzip, new_version, job, validate_only?: true) + Arrow.Gtfs.validate(unzip, job) end end From 82e0d8d0bd38af3696bbbe81ab1c9b1238ef8d66 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 4 Jun 2026 15:52:22 -0400 Subject: [PATCH 02/12] feat: Add ForeignKeyConstraint.internal_constraints/1 --- lib/arrow/repo/foreign_key_constraint.ex | 28 +++++++++++++++++++ .../repo/foreign_key_constraint_test.exs | 21 ++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/lib/arrow/repo/foreign_key_constraint.ex b/lib/arrow/repo/foreign_key_constraint.ex index 03bd7355e..baefcd228 100644 --- a/lib/arrow/repo/foreign_key_constraint.ex +++ b/lib/arrow/repo/foreign_key_constraint.ex @@ -45,6 +45,34 @@ defmodule Arrow.Repo.ForeignKeyConstraint do |> Repo.all() end + @doc """ + Returns foreign key constraints that both originate in, and reference, tables in `tables`. + + For example, given the following foreign key relations: + + foo.bar_id -> bar.id + foo.baz_id -> baz.id + baz.bar_id -> bar.id + + Calling this: + + internal_constraints(["bar", "baz"]) + + Would produce this: + + [ + %ForeignKeyConstraint{origin_table: "baz", referenced_table: "bar"} + ] + """ + @spec internal_constraints(list(String.t() | atom)) :: list(t()) + def internal_constraints(tables) when is_list(tables) do + from(fk in __MODULE__, + where: fk.referenced_table in ^tables, + where: fk.origin_table in ^tables + ) + |> Repo.all() + end + @doc """ Drops a foreign key constraint. diff --git a/test/arrow/repo/foreign_key_constraint_test.exs b/test/arrow/repo/foreign_key_constraint_test.exs index a4efa5b34..61b2232b4 100644 --- a/test/arrow/repo/foreign_key_constraint_test.exs +++ b/test/arrow/repo/foreign_key_constraint_test.exs @@ -92,6 +92,27 @@ defmodule Arrow.Repo.ForeignKeyConstraintTest do end end + describe "constraints_referencing_internal_tables/1" do + test "returns fkeys both originating in and referencing the given tables" do + assert fkeys = ForeignKeyConstraint.internal_constraints(["a", "c"]) + assert length(fkeys) == 2 + [fkey1, fkey2] = Enum.sort_by(fkeys, & &1.name) + + assert %ForeignKeyConstraint{name: "a_c_id_fkey", origin_table: "a", referenced_table: "c"} = + fkey1 + + assert %ForeignKeyConstraint{name: "c_a_id_fkey", origin_table: "c", referenced_table: "a"} = + fkey2 + + assert [fkey] = ForeignKeyConstraint.internal_constraints(["b", "c"]) + + assert %ForeignKeyConstraint{name: "b_c_id_fkey", origin_table: "b", referenced_table: "c"} = + fkey + + assert [] = ForeignKeyConstraint.internal_constraints(["a"]) + end + end + describe "drop/1" do test "drops the given fkey" do assert [c_a_fkey] = ForeignKeyConstraint.external_constraints_referencing_tables(["a"]) From 5a743a82ceb29885b4a7330cc34f243b80acf29d Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 4 Jun 2026 16:34:10 -0400 Subject: [PATCH 03/12] feat: In GTFS feed validation, only check consistency against disruption data --- lib/arrow/gtfs.ex | 58 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index 6a1f0248b..3c7842cd7 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -47,12 +47,14 @@ defmodule Arrow.Gtfs do end defp import_transaction(unzip) do + schemas = importable_schemas() + transaction = fn -> external_fkeys = get_external_fkeys() drop_external_fkeys(external_fkeys) - truncate_all() - import_all(unzip) + truncate(schemas) + import_feed(unzip, schemas) add_external_fkeys(external_fkeys) end @@ -67,8 +69,7 @@ defmodule Arrow.Gtfs do end @doc """ - Validates a GTFS feed for internal consistency, and consistency with Arrow - disruption data that depends on it. + Validates a GTFS feed for relational consistency with Arrow's disruption data. `job` is the Oban job running this validation. """ @@ -90,17 +91,21 @@ defmodule Arrow.Gtfs do end defp validate_transaction(unzip) do + schemas = validation_schemas() + transaction = fn -> external_fkeys = get_external_fkeys() drop_external_fkeys(external_fkeys) + drop_internal_fkeys() - truncate_all() - import_all(unzip) + truncate(schemas) + import_feed(unzip, schemas) + # Only re-add external FKs since we're not concerned with validating the internal consistency of the feed. add_external_fkeys(external_fkeys) # Set any deferred constraints to run now, instead of on transaction commit, - # since we don't actually commit the transaction in this case. + # since we don't actually commit the transaction for validations. _ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE") Repo.rollback(:validation_success) end @@ -114,15 +119,15 @@ defmodule Arrow.Gtfs do result end - @spec truncate_all() :: :ok - defp truncate_all do - tables = Enum.map_join(importable_schemas(), ", ", & &1.__schema__(:source)) + @spec truncate(list(module)) :: :ok + defp truncate(schemas) do + tables = Enum.map_join(schemas, ", ", & &1.__schema__(:source)) _ = Repo.query!("TRUNCATE #{tables}") :ok end - defp import_all(unzip) do - Enum.each(importable_schemas(), &Importable.import(&1, unzip)) + defp import_feed(unzip, schemas_to_import) do + Enum.each(schemas_to_import, &Importable.import(&1, unzip)) end defp validate_required_files(unzip) do @@ -151,6 +156,7 @@ defmodule Arrow.Gtfs do defp validate_version_change(_new_version, _current_version), do: :ok defp importable_schemas do + # All the Ecto schemas that represent GTFS feed tables. # Listed in the order in which they should be imported. [ Arrow.Gtfs.FeedInfo, @@ -172,6 +178,19 @@ defmodule Arrow.Gtfs do ] end + # For validation, only the feed tables that are referenced by FKs from + # Arrow disruption data tables are imported. + defp validation_schemas do + gtfs_tables_referenced_by_external_fkeys = + get_external_fkeys() + |> MapSet.new(& &1.referenced_table) + + importable_schemas() + |> Enum.filter(fn schema -> + schema.__schema__(:source) in gtfs_tables_referenced_by_external_fkeys + end) + end + defp required_files do importable_schemas() |> Enum.flat_map(& &1.filenames()) @@ -184,6 +203,12 @@ defmodule Arrow.Gtfs do |> ForeignKeyConstraint.external_constraints_referencing_tables() end + defp get_internal_fkeys do + importable_schemas() + |> Enum.map(& &1.__schema__(:source)) + |> ForeignKeyConstraint.internal_constraints() + end + @spec drop_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok defp drop_external_fkeys(external_fkeys) do # To allow all GTFS tables to be truncated, we first need to @@ -202,6 +227,15 @@ defmodule Arrow.Gtfs do :ok end + @spec drop_internal_fkeys() :: :ok + defp drop_internal_fkeys do + internal_fkeys = get_internal_fkeys() + fkey_names = Enum.map_join(internal_fkeys, ",", & &1.name) + Logger.info("temporarily dropping intra-feed internal foreign keys fkey_names=#{fkey_names}") + + Enum.each(internal_fkeys, &ForeignKeyConstraint.drop/1) + end + @spec add_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok defp add_external_fkeys(external_fkeys) do fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) From f1d8d702582a1d95aea0780d5c8c2faac3f595a3 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 4 Jun 2026 16:40:25 -0400 Subject: [PATCH 04/12] refactor: Clearer code for FK constraint drop + re-add --- lib/arrow/gtfs.ex | 54 +++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index 3c7842cd7..387220dbd 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -50,13 +50,12 @@ defmodule Arrow.Gtfs do schemas = importable_schemas() transaction = fn -> - external_fkeys = get_external_fkeys() - drop_external_fkeys(external_fkeys) + re_add_external_fkeys = drop_external_fkeys() truncate(schemas) import_feed(unzip, schemas) - add_external_fkeys(external_fkeys) + re_add_external_fkeys.() end {elapsed_ms, result} = @@ -94,15 +93,14 @@ defmodule Arrow.Gtfs do schemas = validation_schemas() transaction = fn -> - external_fkeys = get_external_fkeys() - drop_external_fkeys(external_fkeys) + re_add_external_fkeys = drop_external_fkeys() drop_internal_fkeys() truncate(schemas) import_feed(unzip, schemas) # Only re-add external FKs since we're not concerned with validating the internal consistency of the feed. - add_external_fkeys(external_fkeys) + re_add_external_fkeys.() # Set any deferred constraints to run now, instead of on transaction commit, # since we don't actually commit the transaction for validations. @@ -209,11 +207,13 @@ defmodule Arrow.Gtfs do |> ForeignKeyConstraint.internal_constraints() end - @spec drop_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok - defp drop_external_fkeys(external_fkeys) do - # To allow all GTFS tables to be truncated, we first need to + @spec drop_external_fkeys() :: (-> :ok) + defp drop_external_fkeys do + # To allow GTFS tables to be truncated, we first need to # temporarily drop all foreign key constraints referencing them # from non-GTFS tables. + external_fkeys = get_external_fkeys() + fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) Logger.info( @@ -224,33 +224,31 @@ defmodule Arrow.Gtfs do Logger.info("finished dropping external foreign keys referencing GTFS tables") - :ok + re_add_keys = fn -> + Logger.info( + "re-adding external foreign keys referencing GTFS tables fkey_names=#{fkey_names}" + ) + + Enum.each(external_fkeys, fn fkey -> + Logger.info("re-adding foreign key fkey_name=#{fkey.name}") + ForeignKeyConstraint.add(fkey) + end) + + Logger.info("finished re-adding external foreign keys referencing GTFS tables") + + :ok + end + + re_add_keys end @spec drop_internal_fkeys() :: :ok defp drop_internal_fkeys do internal_fkeys = get_internal_fkeys() + fkey_names = Enum.map_join(internal_fkeys, ",", & &1.name) Logger.info("temporarily dropping intra-feed internal foreign keys fkey_names=#{fkey_names}") Enum.each(internal_fkeys, &ForeignKeyConstraint.drop/1) end - - @spec add_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok - defp add_external_fkeys(external_fkeys) do - fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) - - Logger.info( - "re-adding external foreign keys referencing GTFS tables fkey_names=#{fkey_names}" - ) - - Enum.each(external_fkeys, fn fkey -> - Logger.info("re-adding foreign key fkey_name=#{fkey.name}") - ForeignKeyConstraint.add(fkey) - end) - - Logger.info("finished re-adding external foreign keys referencing GTFS tables") - - :ok - end end From 26a49e18b4b5a5357cb840d8f43c2c1b3649a7e9 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 4 Jun 2026 16:43:15 -0400 Subject: [PATCH 05/12] chore: Downgrade failed validation log from warning to info --- lib/arrow/gtfs.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index 387220dbd..058aaa111 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -84,7 +84,7 @@ defmodule Arrow.Gtfs do :ok else {:error, reason} = error -> - Logger.warning("GTFS validation failed reason=#{inspect(reason)} #{job_info}") + Logger.info("GTFS validation failed reason=#{inspect(reason)} #{job_info}") error end end From 5737d66d665126b977da58ed4b3f056e6ee6d7b5 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 4 Jun 2026 16:44:41 -0400 Subject: [PATCH 06/12] feat: Decrease validation worker timeout, remove irrelevant verbose comment --- lib/arrow/gtfs/validation_worker.ex | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/arrow/gtfs/validation_worker.ex b/lib/arrow/gtfs/validation_worker.ex index 90f165b57..6a549ddad 100644 --- a/lib/arrow/gtfs/validation_worker.ex +++ b/lib/arrow/gtfs/validation_worker.ex @@ -26,13 +26,9 @@ defmodule Arrow.Gtfs.ValidationWorker do end end - # A sane timeout to avoid buildup of stuck jobs. This is especially important - # for cases where our RDS instance runs out of credits--it stops the job from - # eating up additional credits as they recharge and keeping the server - # unresponsive for even longer. - # Validation jobs generally take around 2-3 minutes. + # Validation jobs generally take less than a minute. @impl Oban.Worker - def timeout(_job), do: :timer.minutes(10) + def timeout(_job), do: :timer.minutes(5) @spec check_jobs(Arrow.Gtfs.JobHelper.status_filter()) :: list(map) def check_jobs(status_filter) do From 3362cb34c40b1abe95b201057c3bf2f40fd8d877 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 4 Jun 2026 16:45:02 -0400 Subject: [PATCH 07/12] feat: Misc. comment edits --- lib/arrow/gtfs.ex | 1 + lib/arrow/gtfs/import_worker.ex | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index 058aaa111..5ca79468c 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -79,6 +79,7 @@ defmodule Arrow.Gtfs do Logger.info("GTFS validation job starting #{job_info}") with :ok <- validate_required_files(unzip), + # (Repo.rollback/1 wraps returned value in an error tuple.) {:error, :validation_success} <- validate_transaction(unzip) do Logger.info("GTFS validation success #{job_info}") :ok diff --git a/lib/arrow/gtfs/import_worker.ex b/lib/arrow/gtfs/import_worker.ex index cac604089..5d3564c4f 100644 --- a/lib/arrow/gtfs/import_worker.ex +++ b/lib/arrow/gtfs/import_worker.ex @@ -26,10 +26,6 @@ defmodule Arrow.Gtfs.ImportWorker do end end - # A sane timeout to avoid buildup of stuck jobs. This is especially important - # for cases where our RDS instance runs out of credits--it stops the job from - # eating up additional credits as they recharge and keeping the server - # unresponsive for even longer. # Import jobs generally take around 5 minutes. @impl Oban.Worker def timeout(_job), do: :timer.minutes(10) From 4efe348723ff5701437e178db5681a50e481b1e4 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Fri, 5 Jun 2026 15:24:24 -0400 Subject: [PATCH 08/12] importable_schemas -> gtfs_schemas --- lib/arrow/gtfs.ex | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index 5ca79468c..faddfd3f8 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -47,7 +47,7 @@ defmodule Arrow.Gtfs do end defp import_transaction(unzip) do - schemas = importable_schemas() + schemas = gtfs_schemas() transaction = fn -> re_add_external_fkeys = drop_external_fkeys() @@ -154,7 +154,7 @@ defmodule Arrow.Gtfs do defp validate_version_change(version, version), do: :unchanged defp validate_version_change(_new_version, _current_version), do: :ok - defp importable_schemas do + defp gtfs_schemas do # All the Ecto schemas that represent GTFS feed tables. # Listed in the order in which they should be imported. [ @@ -184,26 +184,26 @@ defmodule Arrow.Gtfs do get_external_fkeys() |> MapSet.new(& &1.referenced_table) - importable_schemas() + gtfs_schemas() |> Enum.filter(fn schema -> schema.__schema__(:source) in gtfs_tables_referenced_by_external_fkeys end) end defp required_files do - importable_schemas() + gtfs_schemas() |> Enum.flat_map(& &1.filenames()) |> MapSet.new() end defp get_external_fkeys do - importable_schemas() + gtfs_schemas() |> Enum.map(& &1.__schema__(:source)) |> ForeignKeyConstraint.external_constraints_referencing_tables() end defp get_internal_fkeys do - importable_schemas() + gtfs_schemas() |> Enum.map(& &1.__schema__(:source)) |> ForeignKeyConstraint.internal_constraints() end From 7ac5dba02a3ac675cfcd1905fce452a3635cb23a Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Fri, 5 Jun 2026 15:25:54 -0400 Subject: [PATCH 09/12] validation_schemas -> validated_schemas --- lib/arrow/gtfs.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index faddfd3f8..d78794781 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -91,7 +91,7 @@ defmodule Arrow.Gtfs do end defp validate_transaction(unzip) do - schemas = validation_schemas() + schemas = validated_schemas() transaction = fn -> re_add_external_fkeys = drop_external_fkeys() @@ -179,7 +179,7 @@ defmodule Arrow.Gtfs do # For validation, only the feed tables that are referenced by FKs from # Arrow disruption data tables are imported. - defp validation_schemas do + defp validated_schemas do gtfs_tables_referenced_by_external_fkeys = get_external_fkeys() |> MapSet.new(& &1.referenced_table) From a78931eb6164dc9a330a4ff24cf94520d3f96d43 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Fri, 5 Jun 2026 15:31:47 -0400 Subject: [PATCH 10/12] Fix test description --- test/arrow/repo/foreign_key_constraint_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/arrow/repo/foreign_key_constraint_test.exs b/test/arrow/repo/foreign_key_constraint_test.exs index 61b2232b4..8c0c8e2f8 100644 --- a/test/arrow/repo/foreign_key_constraint_test.exs +++ b/test/arrow/repo/foreign_key_constraint_test.exs @@ -92,7 +92,7 @@ defmodule Arrow.Repo.ForeignKeyConstraintTest do end end - describe "constraints_referencing_internal_tables/1" do + describe "internal_constraints/1" do test "returns fkeys both originating in and referencing the given tables" do assert fkeys = ForeignKeyConstraint.internal_constraints(["a", "c"]) assert length(fkeys) == 2 From a600a716b0f73f8d160f55a2bdd4fd37c0da2d6f Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Wed, 10 Jun 2026 09:55:33 -0400 Subject: [PATCH 11/12] Revert "refactor: Clearer code for FK constraint drop + re-add" This reverts commit f1d8d702582a1d95aea0780d5c8c2faac3f595a3. --- lib/arrow/gtfs.ex | 54 ++++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index d78794781..3d02aeefd 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -50,12 +50,13 @@ defmodule Arrow.Gtfs do schemas = gtfs_schemas() transaction = fn -> - re_add_external_fkeys = drop_external_fkeys() + external_fkeys = get_external_fkeys() + drop_external_fkeys(external_fkeys) truncate(schemas) import_feed(unzip, schemas) - re_add_external_fkeys.() + add_external_fkeys(external_fkeys) end {elapsed_ms, result} = @@ -94,14 +95,15 @@ defmodule Arrow.Gtfs do schemas = validated_schemas() transaction = fn -> - re_add_external_fkeys = drop_external_fkeys() + external_fkeys = get_external_fkeys() + drop_external_fkeys(external_fkeys) drop_internal_fkeys() truncate(schemas) import_feed(unzip, schemas) # Only re-add external FKs since we're not concerned with validating the internal consistency of the feed. - re_add_external_fkeys.() + add_external_fkeys(external_fkeys) # Set any deferred constraints to run now, instead of on transaction commit, # since we don't actually commit the transaction for validations. @@ -208,13 +210,11 @@ defmodule Arrow.Gtfs do |> ForeignKeyConstraint.internal_constraints() end - @spec drop_external_fkeys() :: (-> :ok) - defp drop_external_fkeys do - # To allow GTFS tables to be truncated, we first need to + @spec drop_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok + defp drop_external_fkeys(external_fkeys) do + # To allow all GTFS tables to be truncated, we first need to # temporarily drop all foreign key constraints referencing them # from non-GTFS tables. - external_fkeys = get_external_fkeys() - fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) Logger.info( @@ -225,31 +225,33 @@ defmodule Arrow.Gtfs do Logger.info("finished dropping external foreign keys referencing GTFS tables") - re_add_keys = fn -> - Logger.info( - "re-adding external foreign keys referencing GTFS tables fkey_names=#{fkey_names}" - ) - - Enum.each(external_fkeys, fn fkey -> - Logger.info("re-adding foreign key fkey_name=#{fkey.name}") - ForeignKeyConstraint.add(fkey) - end) - - Logger.info("finished re-adding external foreign keys referencing GTFS tables") - - :ok - end - - re_add_keys + :ok end @spec drop_internal_fkeys() :: :ok defp drop_internal_fkeys do internal_fkeys = get_internal_fkeys() - fkey_names = Enum.map_join(internal_fkeys, ",", & &1.name) Logger.info("temporarily dropping intra-feed internal foreign keys fkey_names=#{fkey_names}") Enum.each(internal_fkeys, &ForeignKeyConstraint.drop/1) end + + @spec add_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok + defp add_external_fkeys(external_fkeys) do + fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) + + Logger.info( + "re-adding external foreign keys referencing GTFS tables fkey_names=#{fkey_names}" + ) + + Enum.each(external_fkeys, fn fkey -> + Logger.info("re-adding foreign key fkey_name=#{fkey.name}") + ForeignKeyConstraint.add(fkey) + end) + + Logger.info("finished re-adding external foreign keys referencing GTFS tables") + + :ok + end end From 908b4133370548d9ed760c45b9deeee380724fd1 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Wed, 10 Jun 2026 10:02:11 -0400 Subject: [PATCH 12/12] docs: Small comment tweaks --- lib/arrow/gtfs.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index 3d02aeefd..c17da8bf7 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -102,7 +102,8 @@ defmodule Arrow.Gtfs do truncate(schemas) import_feed(unzip, schemas) - # Only re-add external FKs since we're not concerned with validating the internal consistency of the feed. + # Re-add external FKs only, not internal FKs, since we're not concerned + # with validating the internal consistency of the feed. add_external_fkeys(external_fkeys) # Set any deferred constraints to run now, instead of on transaction commit, @@ -212,9 +213,8 @@ defmodule Arrow.Gtfs do @spec drop_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok defp drop_external_fkeys(external_fkeys) do - # To allow all GTFS tables to be truncated, we first need to - # temporarily drop all foreign key constraints referencing them - # from non-GTFS tables. + # To allow GTFS tables to be truncated, we first need to temporarily drop + # all foreign key constraints referencing them from non-GTFS tables. fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) Logger.info(