diff --git a/.github/workflows/cleanliness.yaml b/.github/workflows/cleanliness.yaml index 40a6613..5a36f0e 100644 --- a/.github/workflows/cleanliness.yaml +++ b/.github/workflows/cleanliness.yaml @@ -3,21 +3,18 @@ on: push: tags: # only on releases, not RC, since we've tested already - "[0-9]+.[0-9]+.[0-9]+" - branches: ["**"] # glob pattern to allow slash / + branches: ["main"] pull_request: - types: - - opened - - synchronize - branches: - - "release**" - - "main**" + types: [opened, synchronize] + branches: [main] + env: DIEM_FORGE_NODE_BIN_PATH: ${{github.workspace}}/diem-node LIBRA_CI: 1 MODE_0L: "TESTNET" jobs: - clippy: + format: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -37,6 +34,20 @@ jobs: command: fmt args: --all -- --check + clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: setup env + uses: ./.github/actions/build_env + + - uses: Swatinem/rust-cache@v2 + with: + shared-key: "forensic-db" + cache-all-crates: true + cache-on-failure: true + # TODO: clippy can share cache if build for tests is done prior # - name: build for cache # run: cargo build --tests --workspace diff --git a/.github/workflows/rust-tests.yaml b/.github/workflows/rust-tests.yaml index 09be3c6..a36a296 100644 --- a/.github/workflows/rust-tests.yaml +++ b/.github/workflows/rust-tests.yaml @@ -4,14 +4,10 @@ on: push: tags: # only on releases, not RC, since we've tested already - "[0-9]+.[0-9]+.[0-9]+" - branches: ["**"] # glob pattern to allow slash / + branches: ["main"] pull_request: - types: - - opened - - synchronize - branches: - - "release**" - - "main**" + types: [opened, synchronize] + branches: [main] schedule: - cron: "30 00 * * *" diff --git a/Cargo.toml b/Cargo.toml index a9ae316..6c0dce2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ chrono = { version = "0.4.19", features = ["clock", "serde"] } clap = { version = "4.3.5", features = ["derive", "unstable-styles"] } diem-temppath = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" } diem-types = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" } +diem-backup-cli = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" } diem-crypto = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" } env_logger = "^0.11" flate2 = "^1.0" diff --git a/src/extract_transactions.rs b/src/extract_transactions.rs index 6055ce2..a87da8c 100644 --- a/src/extract_transactions.rs +++ b/src/extract_transactions.rs @@ -1,4 +1,5 @@ use crate::decode_entry_function::decode_entry_function_all_versions; +use crate::read_tx_chunk::{load_chunk, load_tx_chunk_manifest}; use crate::scan::FrameworkVersion; use crate::schema_transaction::{RelationLabel, UserEventTypes, WarehouseEvent, WarehouseTxMaster}; use anyhow::Result; @@ -7,7 +8,6 @@ use diem_crypto::HashValue; use diem_types::account_config::{NewBlockEvent, WithdrawEvent}; use diem_types::contract_event::ContractEvent; use diem_types::{account_config::DepositEvent, transaction::SignedTransaction}; -use libra_storage::read_tx_chunk::{load_chunk, load_tx_chunk_manifest}; use libra_types::move_resource::coin_register_event::CoinRegisterEvent; use log::{error, info, warn}; use serde_json::json; @@ -33,30 +33,14 @@ pub async fn extract_current_transactions( let mut user_txs: Vec = vec![]; let mut events: Vec = vec![]; + let mut count_excluded = 0; + for each_chunk_manifest in manifest.chunks { let chunk = load_chunk(archive_path, each_chunk_manifest).await?; for (i, tx) in chunk.txns.iter().enumerate() { - // TODO: unsure if this is off by one - // perhaps reverse the vectors before transforming - - // first increment the block metadata. This assumes the vector is sequential. + // first collect the block metadata. This assumes the vector is sequential. if let Some(block) = tx.try_as_block_metadata() { - // // check the epochs are incrementing or not - // if epoch > block.epoch() - // && round > block.round() - // && timestamp > block.timestamp_usecs() - // { - // dbg!( - // epoch, - // block.epoch(), - // round, - // block.round(), - // timestamp, - // block.timestamp_usecs() - // ); - // } - epoch = block.epoch(); round = block.round(); timestamp = block.timestamp_usecs(); @@ -66,6 +50,13 @@ pub async fn extract_current_transactions( .txn_infos .get(i) .expect("could not index on tx_info chunk, vectors may not be same length"); + + // only process successful transactions + if !tx_info.status().is_success() { + count_excluded += 1; + continue; + }; + let tx_hash_info = tx_info.transaction_hash(); let tx_events = chunk @@ -104,6 +95,8 @@ pub async fn extract_current_transactions( } } + info!("Excluding {} unsuccessful transactions", count_excluded); + Ok((user_txs, events)) } diff --git a/src/json_rescue_v5_extract.rs b/src/json_rescue_v5_extract.rs index 763ca37..a01d336 100644 --- a/src/json_rescue_v5_extract.rs +++ b/src/json_rescue_v5_extract.rs @@ -21,7 +21,7 @@ use libra_backwards_compatibility::{ use anyhow::{anyhow, Context, Result}; use diem_temppath::TempPath; use diem_types::account_address::AccountAddress; -use log::trace; +use log::{info, trace}; use std::path::{Path, PathBuf}; /// The canonical transaction archives for V5 were kept in a different format as in v6 and v7. @@ -32,9 +32,17 @@ pub fn extract_v5_json_rescue( ) -> Result<(Vec, Vec, Vec)> { let json = std::fs::read_to_string(one_json_file).context("could not read file")?; - let txs: Vec = serde_json::from_str(&json) + let mut txs: Vec = serde_json::from_str(&json) .map_err(|e| anyhow!("could not parse JSON to TransactionViewV5, {:?}", e))?; + // remove any aborted txs + let orig_len = txs.len(); + txs.retain(|t| t.vm_status.is_executed()); + let new_len = txs.len(); + if orig_len > new_len { + info!("Excluding {} unsuccessful transactions", orig_len - new_len); + }; + decode_transaction_dataview_v5(&txs) } diff --git a/src/lib.rs b/src/lib.rs index a1536ca..cc677bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ pub mod load_exchange_orders; pub mod load_tx_cypher; pub mod neo4j_init; pub mod queue; +pub mod read_tx_chunk; pub mod scan; pub mod schema_account_state; pub mod schema_exchange_orders; diff --git a/src/read_tx_chunk.rs b/src/read_tx_chunk.rs new file mode 100644 index 0000000..f55c73f --- /dev/null +++ b/src/read_tx_chunk.rs @@ -0,0 +1,83 @@ +use std::path::Path; + +use anyhow::{anyhow, Context, Result}; + +use diem_backup_cli::backup_types::transaction::manifest::TransactionBackup; +use diem_backup_cli::backup_types::transaction::manifest::TransactionChunk; +use diem_backup_cli::utils::read_record_bytes::ReadRecordBytes; +use diem_types::contract_event::ContractEvent; +use diem_types::transaction::Transaction; +use diem_types::transaction::TransactionInfo; +use diem_types::write_set::WriteSet; +use libra_backwards_compatibility::version_five::state_snapshot_v5::open_for_read; + +/// read snapshot manifest file into object +pub fn load_tx_chunk_manifest(path: &Path) -> anyhow::Result { + let s = + std::fs::read_to_string(path).context(format!("Error: cannot read file at {:?}", path))?; + + let map: TransactionBackup = serde_json::from_str(&s)?; + + Ok(map) +} + +// similar to Loaded Chunk +// diem/storage/backup/backup-cli/src/backup_types/transaction/restore.rs +// The vectors below are OF THE SAME LENGTH +// It is a table where for example, a tx without events will be an empty slot in the vector. +pub struct TransactionArchiveChunk { + pub manifest: TransactionChunk, + pub txns: Vec, + pub txn_infos: Vec, + pub event_vecs: Vec>, + pub write_sets: Vec, +} + +pub async fn load_chunk( + archive_path: &Path, + manifest: TransactionChunk, +) -> Result { + let full_handle = archive_path + .parent() + .expect("could not read archive path") + .join(&manifest.transactions); + let handle_str = full_handle.to_str().unwrap(); + assert!(full_handle.exists(), "file does not exist"); + + let mut file = open_for_read(handle_str) + .await + .map_err(|e| anyhow!("snapshot chunk {:?}, {:?}", &handle_str, e))?; + + let mut txns = Vec::new(); + let mut txn_infos = Vec::new(); + let mut event_vecs = Vec::new(); + let mut write_sets = Vec::new(); + + while let Some(record_bytes) = file.read_record_bytes().await? { + let (txn, txn_info, events, write_set): (_, _, _, WriteSet) = + bcs::from_bytes(&record_bytes)?; + txns.push(txn); + txn_infos.push(txn_info); + event_vecs.push(events); + write_sets.push(write_set); + } + + // the chunk is a table implements with vectors, + // they should have the same length + assert!( + txns.len() == txn_infos.len() + && txn_infos.len() == event_vecs.len() + && event_vecs.len() == write_sets.len(), + "transactions chunk have different vector length for txs, events, and writesets" + ); + + // TODO: for purposes of explorer/warehouse do we want to do the full tx restore controller verifications + + Ok(TransactionArchiveChunk { + manifest, + txns, + txn_infos, + event_vecs, + write_sets, + }) +} diff --git a/src/unzip_temp.rs b/src/unzip_temp.rs index c1a6262..8a8b187 100644 --- a/src/unzip_temp.rs +++ b/src/unzip_temp.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result}; use diem_temppath::TempPath; use flate2::read::GzDecoder; use glob::glob; -// use libra_storage::read_tx_chunk::load_tx_chunk_manifest; +// use crate::read_tx_chunk::load_tx_chunk_manifest; use log::{info, warn}; use std::{ fs::File, diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index f40b2d9..b3d1bc1 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -12,7 +12,7 @@ use crate::{ json_rescue_v5_load, load::{ingest_all, try_load_one_archive}, load_exchange_orders, - neo4j_init::{self, get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV}, + neo4j_init::{self, get_credentials_from_env}, scan::{scan_dir_archive, BundleContent, ManifestInfo}, unzip_temp, util, }; @@ -294,22 +294,19 @@ pub async fn try_db_connection_pool(cli: &WarehouseCli) -> Result { let db = match get_credentials_from_env() { Ok((uri, user, password)) => Graph::new(uri, user, password).await?, Err(_) => { - if cli.db_uri.is_some() && cli.db_username.is_some() && cli.db_password.is_some() { - Graph::new( - cli.db_uri.as_ref().unwrap(), - cli.db_username.as_ref().unwrap(), - cli.db_password.as_ref().unwrap(), - ) - .await? - } else { - println!("Must pass DB credentials, either with CLI args or environment variable"); - println!("call with --db-uri, --db-user, and --db-password"); - println!( - "Alternatively export credentials to env variables: {}, {}, {}", - URI_ENV, USER_ENV, PASS_ENV - ); - bail!("could not get a db instance with credentials"); - } + let uri = cli + .db_uri + .as_ref() + .expect("Must pass --db-uri or set URI_ENV"); + let user = cli + .db_username + .as_ref() + .expect("Must pass --db-user or set USER_ENV"); + let password = cli + .db_password + .as_ref() + .expect("Must pass --db-password or set PASS_ENV"); + Graph::new(uri, user, password).await? } }; Ok(db) diff --git a/tests/test_extract_transactions.rs b/tests/test_extract_transactions.rs index d4274db..4050319 100644 --- a/tests/test_extract_transactions.rs +++ b/tests/test_extract_transactions.rs @@ -18,8 +18,7 @@ async fn test_extract_tx_from_archive() -> anyhow::Result<()> { async fn test_extract_v6_tx_from_archive() -> anyhow::Result<()> { let archive_path = support::fixtures::v6_tx_manifest_fixtures_path(); let list = extract_current_transactions(&archive_path, &FrameworkVersion::V6).await?; - - assert!(list.0.len() == 27); + assert!(list.0.len() == 25); assert!(list.1.len() == 52); Ok(()) diff --git a/tests/test_json_rescue_v5_load.rs b/tests/test_json_rescue_v5_load.rs index b2f0472..80b7c8b 100644 --- a/tests/test_json_rescue_v5_load.rs +++ b/tests/test_json_rescue_v5_load.rs @@ -25,7 +25,7 @@ async fn test_load_all_tgz() -> anyhow::Result<()> { let tx_count = json_rescue_v5_load::single_thread_decompress_extract(&path, &pool).await?; - assert!(tx_count == 13); + assert!(tx_count == 12); Ok(()) } @@ -46,7 +46,7 @@ async fn test_load_entrypoint() -> anyhow::Result<()> { let path = fixtures::v5_json_tx_path(); let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?; - assert!(tx_count == 13); + assert!(tx_count == 12); Ok(()) } @@ -68,7 +68,7 @@ async fn test_load_queue() -> anyhow::Result<()> { let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?; - assert!(tx_count == 13); + assert!(tx_count == 12); let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?; assert!(tx_count == 0); @@ -116,3 +116,39 @@ async fn test_rescue_v5_parse_set_wallet_tx() -> anyhow::Result<()> { Ok(()) } + +// #[tokio::test] +// fn test_stream() { +// async fn process_files(paths: Vec<&str>) { +// let mut stream = stream::iter(paths) +// .then(|path| async move { +// match read_json_file(path).await { +// Ok(data) => Some(data), +// Err(_) => None, +// } +// }) +// .filter_map(|x| async { x }) +// .flat_map(|data| stream::iter(data)); + +// let mut batch: VecDeque = VecDeque::new(); + +// while let Some(item) = stream.next().await { +// batch.push_back(item); + +// if batch.len() >= 100 { +// // Batch is large enough, process it +// let mut batch_to_process: Vec = Vec::new(); +// while let Some(_) = batch.pop_front() { +// batch_to_process.push(batch.pop_front().unwrap()); +// } +// process_batch(batch_to_process).await; +// } +// } + +// // Process any remaining items in the batch +// if !batch.is_empty() { +// let mut batch_to_process: Vec = batch.into(); +// process_batch(batch_to_process).await; +// } +// } +// } diff --git a/tests/test_json_rescue_v5_parse.rs b/tests/test_json_rescue_v5_parse.rs index efb86b5..ded96f7 100644 --- a/tests/test_json_rescue_v5_parse.rs +++ b/tests/test_json_rescue_v5_parse.rs @@ -81,10 +81,11 @@ fn test_json_full_file() -> anyhow::Result<()> { let p = fixtures::v5_json_tx_path().join("10000-10999.json"); let (tx, _, _) = extract_v5_json_rescue(&p)?; - assert!(tx.len() == 4); - let first = tx.first().unwrap(); - assert!(first.sender.to_hex_literal() == "0xb31bd7796bc113013a2bf6c3953305fd"); + assert!(tx.len() == 3); + let first = tx.first().unwrap(); + dbg!(&first); + assert!(first.sender.to_hex_literal() == "0xecaf65add1b785b0495e3099f4045ec0"); if let Some(EntryFunctionArgs::V5(ScriptFunctionCall::CreateUserByCoinTx { account, .. })) = first.entry_function @@ -103,8 +104,8 @@ fn decompress_and_read() { // get an advanced record let first_file = temp_dir.path().join("10000-10999.json"); let (tx, _, _) = extract_v5_json_rescue(&first_file).unwrap(); - assert!(tx.len() == 4); + assert!(tx.len() == 3); let first = tx.first().unwrap(); - assert!(first.sender.to_hex_literal() == "0xb31bd7796bc113013a2bf6c3953305fd"); + assert!(first.sender.to_hex_literal() == "0xecaf65add1b785b0495e3099f4045ec0"); } diff --git a/tests/test_load_tx.rs b/tests/test_load_tx.rs index a8bd786..f2ea441 100644 --- a/tests/test_load_tx.rs +++ b/tests/test_load_tx.rs @@ -19,7 +19,7 @@ async fn test_tx_batch() -> anyhow::Result<()> { libra_forensic_db::log_setup(); let archive_path = support::fixtures::v6_tx_manifest_fixtures_path(); let (txs, _events) = extract_current_transactions(&archive_path, &FrameworkVersion::V6).await?; - assert!(txs.len() == 27); + assert!(txs.len() == 25); let c = start_neo4j_container(); let port = c.get_host_port_ipv4(7687); @@ -79,7 +79,7 @@ async fn test_tx_batch() -> anyhow::Result<()> { let row = result.next().await?.unwrap(); let total_tx_count: i64 = row.get("total_tx_count").unwrap(); - assert!(total_tx_count == 24); + assert!(total_tx_count == 22); Ok(()) } @@ -105,7 +105,8 @@ async fn test_load_entry_point_tx() -> anyhow::Result<()> { assert!(res.created_accounts == 25); assert!(res.modified_accounts == 6); assert!(res.unchanged_accounts == 0); - assert!(res.created_tx == 27); + dbg!(&res.created_tx); + assert!(res.created_tx == 25); Ok(()) } diff --git a/tests/test_unzip.rs b/tests/test_unzip.rs index 01964e5..0eb59b1 100644 --- a/tests/test_unzip.rs +++ b/tests/test_unzip.rs @@ -1,6 +1,6 @@ mod support; +use libra_forensic_db::read_tx_chunk::load_tx_chunk_manifest; use libra_forensic_db::unzip_temp; -use libra_storage::read_tx_chunk::load_tx_chunk_manifest; #[ignore] #[test]