diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index a967455e5..c17da8bf7 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -1,6 +1,6 @@ defmodule Arrow.Gtfs do @moduledoc """ - GTFS import logic. + GTFS import and validation logic. """ alias Arrow.Gtfs.Importable alias Arrow.Gtfs.JobHelper @@ -13,104 +13,123 @@ defmodule Arrow.Gtfs do @import_timeout_ms :timer.minutes(10) @doc """ - Loads a GTFS archive into Arrow's gtfs_* DB tables, - replacing the previous archive's data. + Loads a GTFS archive into Arrow's gtfs_* DB tables, replacing the previous + archive's data. - `job` is the Oban job running this import, or `nil` if the import is being - run directly, e.g. by `mix import_gtfs`. - - Options: - - `:validate_only?` - Set true to have the transaction roll back - instead of committing, even if all queries succeed. - - Returns: - - - `:ok` on successful import or validation, or skipped import due to unchanged version. - - `{:error, reason}` if the import or validation failed. + `job` is the Oban job running this import, or `nil` if the import is being run + directly, e.g. by `mix import_gtfs`. """ - @spec import(Unzip.t(), String.t(), Oban.Job.t() | nil, Keyword.t()) :: - :ok | {:error, term} - def import(unzip, new_version, job \\ nil, opts \\ []) do - validate_only? = Keyword.get(opts, :validate_only?, false) + @spec import(Unzip.t(), String.t(), Oban.Job.t() | nil) :: :ok | {:error, term} + def import(unzip, new_version, job \\ nil) do job_info = job && JobHelper.logging_params(job) - Logger.info("GTFS import or validation job starting #{job_info}") + Logger.info("GTFS import job starting #{job_info}") current_version = - if validate_only? do - "doesn't matter for validation" - else - Arrow.Repo.one( - from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version - ) - end + Arrow.Repo.one( + from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version + ) with :ok <- validate_required_files(unzip), - :ok <- validate_version_change(new_version, current_version) do - case import_transaction(unzip, validate_only?) do - {:ok, _} -> - Logger.info("GTFS import success #{job_info}") - :ok - - {:error, :validation_success} -> - Logger.info("GTFS validation success #{job_info}") - :ok - - {:error, reason} = error -> - Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}") - - error - end + :ok <- validate_version_change(new_version, current_version), + {:ok, _} <- import_transaction(unzip) do + Logger.info("GTFS import success #{job_info}") + :ok else :unchanged -> Logger.info("GTFS import skipped due to unchanged version #{job_info}") - :ok {:error, reason} = error -> - Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}") + Logger.warning("GTFS import failed reason=#{inspect(reason)} #{job_info}") + error + end + end + + defp import_transaction(unzip) do + schemas = gtfs_schemas() + + transaction = fn -> + external_fkeys = get_external_fkeys() + drop_external_fkeys(external_fkeys) + + truncate(schemas) + import_feed(unzip, schemas) + + add_external_fkeys(external_fkeys) + end + + {elapsed_ms, result} = + fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end + |> :timer.tc(:millisecond) + Logger.info("GTFS archive import transaction completed elapsed_ms=#{elapsed_ms}") + + result + end + + @doc """ + Validates a GTFS feed for relational consistency with Arrow's disruption data. + + `job` is the Oban job running this validation. + """ + @spec validate(Unzip.t(), Oban.Job.t()) :: :ok | {:error, term} + def validate(unzip, job) do + job_info = JobHelper.logging_params(job) + + Logger.info("GTFS validation job starting #{job_info}") + + with :ok <- validate_required_files(unzip), + # (Repo.rollback/1 wraps returned value in an error tuple.) + {:error, :validation_success} <- validate_transaction(unzip) do + Logger.info("GTFS validation success #{job_info}") + :ok + else + {:error, reason} = error -> + Logger.info("GTFS validation failed reason=#{inspect(reason)} #{job_info}") error end end - defp import_transaction(unzip, validate_only?) do + defp validate_transaction(unzip) do + schemas = validated_schemas() + transaction = fn -> external_fkeys = get_external_fkeys() drop_external_fkeys(external_fkeys) + drop_internal_fkeys() - truncate_all() - import_all(unzip) + truncate(schemas) + import_feed(unzip, schemas) + # Re-add external FKs only, not internal FKs, since we're not concerned + # with validating the internal consistency of the feed. add_external_fkeys(external_fkeys) - if validate_only? do - # Set any deferred constraints to run now, instead of on transaction commit, - # since we don't actually commit the transaction in this case. - _ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE") - Repo.rollback(:validation_success) - end + # Set any deferred constraints to run now, instead of on transaction commit, + # since we don't actually commit the transaction for validations. + _ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE") + Repo.rollback(:validation_success) end {elapsed_ms, result} = fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end |> :timer.tc(:millisecond) - action = if validate_only?, do: "validation", else: "import" - Logger.info("GTFS archive #{action} transaction completed elapsed_ms=#{elapsed_ms}") + Logger.info("GTFS archive validation transaction completed elapsed_ms=#{elapsed_ms}") result end - @spec truncate_all() :: :ok - defp truncate_all do - tables = Enum.map_join(importable_schemas(), ", ", & &1.__schema__(:source)) + @spec truncate(list(module)) :: :ok + defp truncate(schemas) do + tables = Enum.map_join(schemas, ", ", & &1.__schema__(:source)) _ = Repo.query!("TRUNCATE #{tables}") :ok end - defp import_all(unzip) do - Enum.each(importable_schemas(), &Importable.import(&1, unzip)) + defp import_feed(unzip, schemas_to_import) do + Enum.each(schemas_to_import, &Importable.import(&1, unzip)) end defp validate_required_files(unzip) do @@ -138,7 +157,8 @@ defmodule Arrow.Gtfs do defp validate_version_change(version, version), do: :unchanged defp validate_version_change(_new_version, _current_version), do: :ok - defp importable_schemas do + defp gtfs_schemas do + # All the Ecto schemas that represent GTFS feed tables. # Listed in the order in which they should be imported. [ Arrow.Gtfs.FeedInfo, @@ -160,23 +180,41 @@ defmodule Arrow.Gtfs do ] end + # For validation, only the feed tables that are referenced by FKs from + # Arrow disruption data tables are imported. + defp validated_schemas do + gtfs_tables_referenced_by_external_fkeys = + get_external_fkeys() + |> MapSet.new(& &1.referenced_table) + + gtfs_schemas() + |> Enum.filter(fn schema -> + schema.__schema__(:source) in gtfs_tables_referenced_by_external_fkeys + end) + end + defp required_files do - importable_schemas() + gtfs_schemas() |> Enum.flat_map(& &1.filenames()) |> MapSet.new() end defp get_external_fkeys do - importable_schemas() + gtfs_schemas() |> Enum.map(& &1.__schema__(:source)) |> ForeignKeyConstraint.external_constraints_referencing_tables() end + defp get_internal_fkeys do + gtfs_schemas() + |> Enum.map(& &1.__schema__(:source)) + |> ForeignKeyConstraint.internal_constraints() + end + @spec drop_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok defp drop_external_fkeys(external_fkeys) do - # To allow all GTFS tables to be truncated, we first need to - # temporarily drop all foreign key constraints referencing them - # from non-GTFS tables. + # To allow GTFS tables to be truncated, we first need to temporarily drop + # all foreign key constraints referencing them from non-GTFS tables. fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) Logger.info( @@ -190,6 +228,15 @@ defmodule Arrow.Gtfs do :ok end + @spec drop_internal_fkeys() :: :ok + defp drop_internal_fkeys do + internal_fkeys = get_internal_fkeys() + fkey_names = Enum.map_join(internal_fkeys, ",", & &1.name) + Logger.info("temporarily dropping intra-feed internal foreign keys fkey_names=#{fkey_names}") + + Enum.each(internal_fkeys, &ForeignKeyConstraint.drop/1) + end + @spec add_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok defp add_external_fkeys(external_fkeys) do fkey_names = Enum.map_join(external_fkeys, ",", & &1.name) diff --git a/lib/arrow/gtfs/import_worker.ex b/lib/arrow/gtfs/import_worker.ex index cac604089..5d3564c4f 100644 --- a/lib/arrow/gtfs/import_worker.ex +++ b/lib/arrow/gtfs/import_worker.ex @@ -26,10 +26,6 @@ defmodule Arrow.Gtfs.ImportWorker do end end - # A sane timeout to avoid buildup of stuck jobs. This is especially important - # for cases where our RDS instance runs out of credits--it stops the job from - # eating up additional credits as they recharge and keeping the server - # unresponsive for even longer. # Import jobs generally take around 5 minutes. @impl Oban.Worker def timeout(_job), do: :timer.minutes(10) diff --git a/lib/arrow/gtfs/validation_worker.ex b/lib/arrow/gtfs/validation_worker.ex index 73d5a9144..6a549ddad 100644 --- a/lib/arrow/gtfs/validation_worker.ex +++ b/lib/arrow/gtfs/validation_worker.ex @@ -20,19 +20,15 @@ defmodule Arrow.Gtfs.ValidationWorker do ] @impl Oban.Worker - def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}} = job) do + def perform(%Oban.Job{args: %{"s3_uri" => s3_uri}} = job) do with {:ok, unzip} <- Arrow.Gtfs.Archive.to_unzip_struct(s3_uri) do - Arrow.Gtfs.import(unzip, new_version, job, validate_only?: true) + Arrow.Gtfs.validate(unzip, job) end end - # A sane timeout to avoid buildup of stuck jobs. This is especially important - # for cases where our RDS instance runs out of credits--it stops the job from - # eating up additional credits as they recharge and keeping the server - # unresponsive for even longer. - # Validation jobs generally take around 2-3 minutes. + # Validation jobs generally take less than a minute. @impl Oban.Worker - def timeout(_job), do: :timer.minutes(10) + def timeout(_job), do: :timer.minutes(5) @spec check_jobs(Arrow.Gtfs.JobHelper.status_filter()) :: list(map) def check_jobs(status_filter) do diff --git a/lib/arrow/repo/foreign_key_constraint.ex b/lib/arrow/repo/foreign_key_constraint.ex index 03bd7355e..baefcd228 100644 --- a/lib/arrow/repo/foreign_key_constraint.ex +++ b/lib/arrow/repo/foreign_key_constraint.ex @@ -45,6 +45,34 @@ defmodule Arrow.Repo.ForeignKeyConstraint do |> Repo.all() end + @doc """ + Returns foreign key constraints that both originate in, and reference, tables in `tables`. + + For example, given the following foreign key relations: + + foo.bar_id -> bar.id + foo.baz_id -> baz.id + baz.bar_id -> bar.id + + Calling this: + + internal_constraints(["bar", "baz"]) + + Would produce this: + + [ + %ForeignKeyConstraint{origin_table: "baz", referenced_table: "bar"} + ] + """ + @spec internal_constraints(list(String.t() | atom)) :: list(t()) + def internal_constraints(tables) when is_list(tables) do + from(fk in __MODULE__, + where: fk.referenced_table in ^tables, + where: fk.origin_table in ^tables + ) + |> Repo.all() + end + @doc """ Drops a foreign key constraint. diff --git a/test/arrow/repo/foreign_key_constraint_test.exs b/test/arrow/repo/foreign_key_constraint_test.exs index a4efa5b34..8c0c8e2f8 100644 --- a/test/arrow/repo/foreign_key_constraint_test.exs +++ b/test/arrow/repo/foreign_key_constraint_test.exs @@ -92,6 +92,27 @@ defmodule Arrow.Repo.ForeignKeyConstraintTest do end end + describe "internal_constraints/1" do + test "returns fkeys both originating in and referencing the given tables" do + assert fkeys = ForeignKeyConstraint.internal_constraints(["a", "c"]) + assert length(fkeys) == 2 + [fkey1, fkey2] = Enum.sort_by(fkeys, & &1.name) + + assert %ForeignKeyConstraint{name: "a_c_id_fkey", origin_table: "a", referenced_table: "c"} = + fkey1 + + assert %ForeignKeyConstraint{name: "c_a_id_fkey", origin_table: "c", referenced_table: "a"} = + fkey2 + + assert [fkey] = ForeignKeyConstraint.internal_constraints(["b", "c"]) + + assert %ForeignKeyConstraint{name: "b_c_id_fkey", origin_table: "b", referenced_table: "c"} = + fkey + + assert [] = ForeignKeyConstraint.internal_constraints(["a"]) + end + end + describe "drop/1" do test "drops the given fkey" do assert [c_a_fkey] = ForeignKeyConstraint.external_constraints_referencing_tables(["a"])