Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 112 additions & 65 deletions lib/arrow/gtfs.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Arrow.Gtfs do
@moduledoc """
GTFS import logic.
GTFS import and validation logic.
"""
alias Arrow.Gtfs.Importable
alias Arrow.Gtfs.JobHelper
Expand All @@ -13,104 +13,123 @@ defmodule Arrow.Gtfs do
@import_timeout_ms :timer.minutes(10)

@doc """
Loads a GTFS archive into Arrow's gtfs_* DB tables,
replacing the previous archive's data.
Loads a GTFS archive into Arrow's gtfs_* DB tables, replacing the previous
archive's data.

`job` is the Oban job running this import, or `nil` if the import is being
run directly, e.g. by `mix import_gtfs`.

Options:
- `:validate_only?` - Set true to have the transaction roll back
instead of committing, even if all queries succeed.

Returns:

- `:ok` on successful import or validation, or skipped import due to unchanged version.
- `{:error, reason}` if the import or validation failed.
`job` is the Oban job running this import, or `nil` if the import is being run
directly, e.g. by `mix import_gtfs`.
"""
@spec import(Unzip.t(), String.t(), Oban.Job.t() | nil, Keyword.t()) ::
:ok | {:error, term}
def import(unzip, new_version, job \\ nil, opts \\ []) do
validate_only? = Keyword.get(opts, :validate_only?, false)
@spec import(Unzip.t(), String.t(), Oban.Job.t() | nil) :: :ok | {:error, term}
def import(unzip, new_version, job \\ nil) do
job_info = job && JobHelper.logging_params(job)

Logger.info("GTFS import or validation job starting #{job_info}")
Logger.info("GTFS import job starting #{job_info}")

current_version =
if validate_only? do
"doesn't matter for validation"
else
Arrow.Repo.one(
from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version
)
end
Arrow.Repo.one(
from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version
)

with :ok <- validate_required_files(unzip),
:ok <- validate_version_change(new_version, current_version) do
case import_transaction(unzip, validate_only?) do
{:ok, _} ->
Logger.info("GTFS import success #{job_info}")
:ok

{:error, :validation_success} ->
Logger.info("GTFS validation success #{job_info}")
:ok

{:error, reason} = error ->
Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}")

error
end
:ok <- validate_version_change(new_version, current_version),
{:ok, _} <- import_transaction(unzip) do
Logger.info("GTFS import success #{job_info}")
:ok
else
:unchanged ->
Logger.info("GTFS import skipped due to unchanged version #{job_info}")

:ok

{:error, reason} = error ->
Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}")
Logger.warning("GTFS import failed reason=#{inspect(reason)} #{job_info}")
error
end
end

# Replaces the contents of all GTFS feed tables with the data in `unzip`,
# inside a single DB transaction, and logs how long the transaction took.
#
# Inside the transaction, foreign keys that reference GTFS tables from
# non-GTFS tables are dropped so the tables can be truncated, the feed is
# loaded, and the same foreign keys are then re-added.
#
# Returns whatever `Repo.transaction/2` returns.
defp import_transaction(unzip) do
  feed_schemas = gtfs_schemas()

  run_import = fn ->
    Repo.transaction(
      fn ->
        fkeys = get_external_fkeys()
        drop_external_fkeys(fkeys)

        truncate(feed_schemas)
        import_feed(unzip, feed_schemas)

        add_external_fkeys(fkeys)
      end,
      timeout: @import_timeout_ms
    )
  end

  {elapsed_ms, outcome} = :timer.tc(run_import, :millisecond)

  Logger.info("GTFS archive import transaction completed elapsed_ms=#{elapsed_ms}")

  outcome
end

@doc """
Checks a GTFS feed for relational consistency with Arrow's disruption data.

`job` is the Oban job running this validation; it is used to build the
logging context attached to every log line emitted here.

Returns:

- `:ok` if the required files are present and the validation transaction
  reports success.
- `{:error, reason}` if either step fails.
"""
@spec validate(Unzip.t(), Oban.Job.t()) :: :ok | {:error, term}
def validate(unzip, job) do
  log_context = JobHelper.logging_params(job)

  Logger.info("GTFS validation job starting #{log_context}")

  with :ok <- validate_required_files(unzip),
       # Success arrives as an error tuple because Repo.rollback/1 wraps the
       # value it is given in `{:error, _}`.
       {:error, :validation_success} <- validate_transaction(unzip) do
    Logger.info("GTFS validation success #{log_context}")
    :ok
  else
    {:error, reason} = failure ->
      # NOTE(review): logged at :info, whereas import failures are logged at
      # :warning -- confirm this difference is intentional.
      Logger.info("GTFS validation failed reason=#{inspect(reason)} #{log_context}")
      failure
  end
end

defp import_transaction(unzip, validate_only?) do
defp validate_transaction(unzip) do
schemas = validated_schemas()

transaction = fn ->
external_fkeys = get_external_fkeys()
drop_external_fkeys(external_fkeys)
drop_internal_fkeys()

truncate_all()
import_all(unzip)
truncate(schemas)
import_feed(unzip, schemas)

# Re-add external FKs only, not internal FKs, since we're not concerned
# with validating the internal consistency of the feed.
add_external_fkeys(external_fkeys)

if validate_only? do
# Set any deferred constraints to run now, instead of on transaction commit,
# since we don't actually commit the transaction in this case.
_ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE")
Repo.rollback(:validation_success)
end
# Set any deferred constraints to run now, instead of on transaction commit,
# since we don't actually commit the transaction for validations.
_ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE")
Repo.rollback(:validation_success)
end

{elapsed_ms, result} =
fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end
|> :timer.tc(:millisecond)

action = if validate_only?, do: "validation", else: "import"
Logger.info("GTFS archive #{action} transaction completed elapsed_ms=#{elapsed_ms}")
Logger.info("GTFS archive validation transaction completed elapsed_ms=#{elapsed_ms}")

result
end

@spec truncate_all() :: :ok
defp truncate_all do
tables = Enum.map_join(importable_schemas(), ", ", & &1.__schema__(:source))
# Empties the DB tables backing the given Ecto schemas with a single TRUNCATE
# statement.
#
# `schemas` is a list of Ecto schema modules; each table name is read from
# the schema's `__schema__(:source)`.
#
# An empty list is a no-op. Without this clause the generated statement would
# be "TRUNCATE " with no table names, which is not valid SQL and would make
# `Repo.query!/1` raise. This can happen when the caller passes a filtered
# list of schemas that turns out to be empty.
@spec truncate(list(module)) :: :ok
defp truncate([]), do: :ok

defp truncate(schemas) do
  tables = Enum.map_join(schemas, ", ", & &1.__schema__(:source))
  _ = Repo.query!("TRUNCATE #{tables}")
  :ok
end

defp import_all(unzip) do
Enum.each(importable_schemas(), &Importable.import(&1, unzip))
# Loads each of the given schemas from the archive, one at a time and in the
# order given.
defp import_feed(unzip, schemas_to_import) do
  Enum.each(schemas_to_import, fn schema -> Importable.import(schema, unzip) end)
end

defp validate_required_files(unzip) do
Expand Down Expand Up @@ -138,7 +157,8 @@ defmodule Arrow.Gtfs do
# Compares the incoming feed version with the currently stored one.
# Returns `:unchanged` when the two are exactly equal, `:ok` otherwise.
defp validate_version_change(new_version, current_version)
     when new_version === current_version,
     do: :unchanged

defp validate_version_change(_new_version, _current_version), do: :ok

defp importable_schemas do
defp gtfs_schemas do
# All the Ecto schemas that represent GTFS feed tables.
# Listed in the order in which they should be imported.
[
Arrow.Gtfs.FeedInfo,
Expand All @@ -160,23 +180,41 @@ defmodule Arrow.Gtfs do
]
end

# Selects the feed schemas to load during validation: only those whose tables
# are the target of a foreign key from an Arrow disruption data table.
# The relative order of `gtfs_schemas/0` is preserved.
defp validated_schemas do
  externally_referenced_tables =
    MapSet.new(get_external_fkeys(), fn fkey -> fkey.referenced_table end)

  Enum.filter(gtfs_schemas(), fn schema ->
    MapSet.member?(externally_referenced_tables, schema.__schema__(:source))
  end)
end

defp required_files do
importable_schemas()
gtfs_schemas()
|> Enum.flat_map(& &1.filenames())
|> MapSet.new()
end

defp get_external_fkeys do
importable_schemas()
gtfs_schemas()
|> Enum.map(& &1.__schema__(:source))
|> ForeignKeyConstraint.external_constraints_referencing_tables()
end

# Looks up the foreign key constraints that both originate in, and reference,
# GTFS feed tables.
defp get_internal_fkeys do
  feed_tables = for schema <- gtfs_schemas(), do: schema.__schema__(:source)

  ForeignKeyConstraint.internal_constraints(feed_tables)
end

@spec drop_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok
defp drop_external_fkeys(external_fkeys) do
# To allow all GTFS tables to be truncated, we first need to
# temporarily drop all foreign key constraints referencing them
# from non-GTFS tables.
# To allow GTFS tables to be truncated, we first need to temporarily drop
# all foreign key constraints referencing them from non-GTFS tables.
fkey_names = Enum.map_join(external_fkeys, ",", & &1.name)

Logger.info(
Expand All @@ -190,6 +228,15 @@ defmodule Arrow.Gtfs do
:ok
end

# Drops every foreign key constraint between GTFS feed tables, logging the
# names of the constraints being dropped first.
@spec drop_internal_fkeys() :: :ok
defp drop_internal_fkeys do
  fkeys = get_internal_fkeys()
  joined_names = Enum.map_join(fkeys, ",", fn fkey -> fkey.name end)

  Logger.info("temporarily dropping intra-feed internal foreign keys fkey_names=#{joined_names}")

  Enum.each(fkeys, fn fkey -> ForeignKeyConstraint.drop(fkey) end)
end

@spec add_external_fkeys(list(ForeignKeyConstraint.t())) :: :ok
defp add_external_fkeys(external_fkeys) do
fkey_names = Enum.map_join(external_fkeys, ",", & &1.name)
Expand Down
4 changes: 0 additions & 4 deletions lib/arrow/gtfs/import_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ defmodule Arrow.Gtfs.ImportWorker do
end
end

# A sane timeout to avoid buildup of stuck jobs. This is especially important
# for cases where our RDS instance runs out of credits--it stops the job from
# eating up additional credits as they recharge and keeping the server
Comment thread
rudiejd marked this conversation as resolved.
# unresponsive for even longer.
# Import jobs generally take around 5 minutes.
@impl Oban.Worker
def timeout(_job), do: :timer.minutes(10)
Expand Down
12 changes: 4 additions & 8 deletions lib/arrow/gtfs/validation_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ defmodule Arrow.Gtfs.ValidationWorker do
]

@impl Oban.Worker
def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}} = job) do
def perform(%Oban.Job{args: %{"s3_uri" => s3_uri}} = job) do
with {:ok, unzip} <- Arrow.Gtfs.Archive.to_unzip_struct(s3_uri) do
Arrow.Gtfs.import(unzip, new_version, job, validate_only?: true)
Arrow.Gtfs.validate(unzip, job)
end
end

# A sane timeout to avoid buildup of stuck jobs. This is especially important
# for cases where our RDS instance runs out of credits--it stops the job from
# eating up additional credits as they recharge and keeping the server
# unresponsive for even longer.
# Validation jobs generally take around 2-3 minutes.
# Validation jobs generally take less than a minute.
@impl Oban.Worker
def timeout(_job), do: :timer.minutes(10)
def timeout(_job), do: :timer.minutes(5)

@spec check_jobs(Arrow.Gtfs.JobHelper.status_filter()) :: list(map)
def check_jobs(status_filter) do
Expand Down
28 changes: 28 additions & 0 deletions lib/arrow/repo/foreign_key_constraint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,34 @@ defmodule Arrow.Repo.ForeignKeyConstraint do
|> Repo.all()
end

@doc """
Returns foreign key constraints that both originate in, and reference, tables in `tables`.

For example, given the following foreign key relations:

foo.bar_id -> bar.id
foo.baz_id -> baz.id
baz.bar_id -> bar.id

Calling this:

internal_constraints(["bar", "baz"])

Would produce this:

[
%ForeignKeyConstraint{origin_table: "baz", referenced_table: "bar"}
]
"""
@spec internal_constraints(list(String.t() | atom)) :: list(t())
def internal_constraints(tables) when is_list(tables) do
  # Both ends of the constraint must be in `tables`; the two `where`
  # conditions are combined with AND.
  query =
    from(fk in __MODULE__,
      where: fk.referenced_table in ^tables,
      where: fk.origin_table in ^tables
    )

  Repo.all(query)
end

@doc """
Drops a foreign key constraint.

Expand Down
21 changes: 21 additions & 0 deletions test/arrow/repo/foreign_key_constraint_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@ defmodule Arrow.Repo.ForeignKeyConstraintTest do
end
end

describe "internal_constraints/1" do
  test "returns fkeys both originating in and referencing the given tables" do
    # "a" and "c" reference each other, so both directions are expected.
    assert [a_to_c, c_to_a] =
             ["a", "c"]
             |> ForeignKeyConstraint.internal_constraints()
             |> Enum.sort_by(& &1.name)

    assert %ForeignKeyConstraint{name: "a_c_id_fkey", origin_table: "a", referenced_table: "c"} =
             a_to_c

    assert %ForeignKeyConstraint{name: "c_a_id_fkey", origin_table: "c", referenced_table: "a"} =
             c_to_a

    # Only the b -> c constraint has both ends inside ["b", "c"].
    assert [b_to_c] = ForeignKeyConstraint.internal_constraints(["b", "c"])

    assert %ForeignKeyConstraint{name: "b_c_id_fkey", origin_table: "b", referenced_table: "c"} =
             b_to_c

    # A single table with no self-reference has no internal constraints.
    assert [] = ForeignKeyConstraint.internal_constraints(["a"])
  end
end

describe "drop/1" do
test "drops the given fkey" do
assert [c_a_fkey] = ForeignKeyConstraint.external_constraints_referencing_tables(["a"])
Expand Down
Loading