Skip to content
Merged
27 changes: 19 additions & 8 deletions .github/workflows/cleanliness.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,18 @@ on:
push:
tags: # only on releases, not RC, since we've tested already
- "[0-9]+.[0-9]+.[0-9]+"
branches: ["**"] # glob pattern to allow slash /
branches: ["main"]
pull_request:
types:
- opened
- synchronize
branches:
- "release**"
- "main**"
types: [opened, synchronize]
branches: [main]

env:
DIEM_FORGE_NODE_BIN_PATH: ${{github.workspace}}/diem-node
LIBRA_CI: 1
MODE_0L: "TESTNET"

jobs:
clippy:
format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand All @@ -37,6 +34,20 @@ jobs:
command: fmt
args: --all -- --check

clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: setup env
uses: ./.github/actions/build_env

- uses: Swatinem/rust-cache@v2
with:
shared-key: "forensic-db"
cache-all-crates: true
cache-on-failure: true

# TODO: clippy can share cache if build for tests is done prior
# - name: build for cache
# run: cargo build --tests --workspace
Expand Down
10 changes: 3 additions & 7 deletions .github/workflows/rust-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ on:
push:
tags: # only on releases, not RC, since we've tested already
- "[0-9]+.[0-9]+.[0-9]+"
branches: ["**"] # glob pattern to allow slash /
branches: ["main"]
pull_request:
types:
- opened
- synchronize
branches:
- "release**"
- "main**"
types: [opened, synchronize]
branches: [main]
schedule:
- cron: "30 00 * * *"

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ chrono = { version = "0.4.19", features = ["clock", "serde"] }
clap = { version = "4.3.5", features = ["derive", "unstable-styles"] }
diem-temppath = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" }
diem-types = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" }
diem-backup-cli = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" }
diem-crypto = { git = "https://github.com/0LNetworkCommunity/diem.git", branch = "release" }
env_logger = "^0.11"
flate2 = "^1.0"
Expand Down
33 changes: 13 additions & 20 deletions src/extract_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::decode_entry_function::decode_entry_function_all_versions;
use crate::read_tx_chunk::{load_chunk, load_tx_chunk_manifest};
use crate::scan::FrameworkVersion;
use crate::schema_transaction::{RelationLabel, UserEventTypes, WarehouseEvent, WarehouseTxMaster};
use anyhow::Result;
Expand All @@ -7,7 +8,6 @@ use diem_crypto::HashValue;
use diem_types::account_config::{NewBlockEvent, WithdrawEvent};
use diem_types::contract_event::ContractEvent;
use diem_types::{account_config::DepositEvent, transaction::SignedTransaction};
use libra_storage::read_tx_chunk::{load_chunk, load_tx_chunk_manifest};
use libra_types::move_resource::coin_register_event::CoinRegisterEvent;
use log::{error, info, warn};
use serde_json::json;
Expand All @@ -33,30 +33,14 @@ pub async fn extract_current_transactions(
let mut user_txs: Vec<WarehouseTxMaster> = vec![];
let mut events: Vec<WarehouseEvent> = vec![];

let mut count_excluded = 0;

for each_chunk_manifest in manifest.chunks {
let chunk = load_chunk(archive_path, each_chunk_manifest).await?;

for (i, tx) in chunk.txns.iter().enumerate() {
// TODO: unsure if this is off by one
// perhaps reverse the vectors before transforming

// first increment the block metadata. This assumes the vector is sequential.
// first collect the block metadata. This assumes the vector is sequential.
if let Some(block) = tx.try_as_block_metadata() {
// // check the epochs are incrementing or not
// if epoch > block.epoch()
// && round > block.round()
// && timestamp > block.timestamp_usecs()
// {
// dbg!(
// epoch,
// block.epoch(),
// round,
// block.round(),
// timestamp,
// block.timestamp_usecs()
// );
// }

epoch = block.epoch();
round = block.round();
timestamp = block.timestamp_usecs();
Expand All @@ -66,6 +50,13 @@ pub async fn extract_current_transactions(
.txn_infos
.get(i)
.expect("could not index on tx_info chunk, vectors may not be same length");

// only process successful transactions
if !tx_info.status().is_success() {
count_excluded += 1;
continue;
};

let tx_hash_info = tx_info.transaction_hash();

let tx_events = chunk
Expand Down Expand Up @@ -104,6 +95,8 @@ pub async fn extract_current_transactions(
}
}

info!("Excluding {} unsuccessful transactions", count_excluded);

Ok((user_txs, events))
}

Expand Down
12 changes: 10 additions & 2 deletions src/json_rescue_v5_extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use libra_backwards_compatibility::{
use anyhow::{anyhow, Context, Result};
use diem_temppath::TempPath;
use diem_types::account_address::AccountAddress;
use log::trace;
use log::{info, trace};
use std::path::{Path, PathBuf};

/// The canonical transaction archives for V5 were kept in a different format as in v6 and v7.
Expand All @@ -32,9 +32,17 @@ pub fn extract_v5_json_rescue(
) -> Result<(Vec<WarehouseTxMaster>, Vec<WarehouseEvent>, Vec<String>)> {
let json = std::fs::read_to_string(one_json_file).context("could not read file")?;

let txs: Vec<TransactionViewV5> = serde_json::from_str(&json)
let mut txs: Vec<TransactionViewV5> = serde_json::from_str(&json)
.map_err(|e| anyhow!("could not parse JSON to TransactionViewV5, {:?}", e))?;

// remove any aborted txs
let orig_len = txs.len();
txs.retain(|t| t.vm_status.is_executed());
let new_len = txs.len();
if orig_len > new_len {
info!("Excluding {} unsuccessful transactions", orig_len - new_len);
};

decode_transaction_dataview_v5(&txs)
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod load_exchange_orders;
pub mod load_tx_cypher;
pub mod neo4j_init;
pub mod queue;
pub mod read_tx_chunk;
pub mod scan;
pub mod schema_account_state;
pub mod schema_exchange_orders;
Expand Down
83 changes: 83 additions & 0 deletions src/read_tx_chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::path::Path;

use anyhow::{anyhow, Context, Result};

use diem_backup_cli::backup_types::transaction::manifest::TransactionBackup;
use diem_backup_cli::backup_types::transaction::manifest::TransactionChunk;
use diem_backup_cli::utils::read_record_bytes::ReadRecordBytes;
use diem_types::contract_event::ContractEvent;
use diem_types::transaction::Transaction;
use diem_types::transaction::TransactionInfo;
use diem_types::write_set::WriteSet;
use libra_backwards_compatibility::version_five::state_snapshot_v5::open_for_read;

/// read snapshot manifest file into object
pub fn load_tx_chunk_manifest(path: &Path) -> anyhow::Result<TransactionBackup> {
let s =
std::fs::read_to_string(path).context(format!("Error: cannot read file at {:?}", path))?;

let map: TransactionBackup = serde_json::from_str(&s)?;

Ok(map)
}

/// An in-memory transaction archive chunk decoded from a backup record file.
///
/// Similar to `LoadedChunk` in
/// diem/storage/backup/backup-cli/src/backup_types/transaction/restore.rs.
///
/// The four vectors below are columnar and OF THE SAME LENGTH: index `i` in
/// each vector refers to the same transaction. For example, a transaction
/// without events still occupies slot `i` in `event_vecs` (as an empty vec).
pub struct TransactionArchiveChunk {
    /// This chunk's entry from the backup manifest.
    pub manifest: TransactionChunk,
    /// The transactions themselves, one per record.
    pub txns: Vec<Transaction>,
    /// Per-transaction execution info, parallel to `txns`.
    pub txn_infos: Vec<TransactionInfo>,
    /// Events emitted by each transaction, parallel to `txns` (empty vec if none).
    pub event_vecs: Vec<Vec<ContractEvent>>,
    /// State write set produced by each transaction, parallel to `txns`.
    pub write_sets: Vec<WriteSet>,
}

pub async fn load_chunk(
archive_path: &Path,
manifest: TransactionChunk,
) -> Result<TransactionArchiveChunk> {
let full_handle = archive_path
.parent()
.expect("could not read archive path")
.join(&manifest.transactions);
let handle_str = full_handle.to_str().unwrap();
assert!(full_handle.exists(), "file does not exist");

let mut file = open_for_read(handle_str)
.await
.map_err(|e| anyhow!("snapshot chunk {:?}, {:?}", &handle_str, e))?;

let mut txns = Vec::new();
let mut txn_infos = Vec::new();
let mut event_vecs = Vec::new();
let mut write_sets = Vec::new();

while let Some(record_bytes) = file.read_record_bytes().await? {
let (txn, txn_info, events, write_set): (_, _, _, WriteSet) =
bcs::from_bytes(&record_bytes)?;
txns.push(txn);
txn_infos.push(txn_info);
event_vecs.push(events);
write_sets.push(write_set);
}

// the chunk is a table implements with vectors,
// they should have the same length
assert!(
txns.len() == txn_infos.len()
&& txn_infos.len() == event_vecs.len()
&& event_vecs.len() == write_sets.len(),
"transactions chunk have different vector length for txs, events, and writesets"
);

// TODO: for purposes of explorer/warehouse do we want to do the full tx restore controller verifications

Ok(TransactionArchiveChunk {
manifest,
txns,
txn_infos,
event_vecs,
write_sets,
})
}
2 changes: 1 addition & 1 deletion src/unzip_temp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Context, Result};
use diem_temppath::TempPath;
use flate2::read::GzDecoder;
use glob::glob;
// use libra_storage::read_tx_chunk::load_tx_chunk_manifest;
// use crate::read_tx_chunk::load_tx_chunk_manifest;
use log::{info, warn};
use std::{
fs::File,
Expand Down
31 changes: 14 additions & 17 deletions src/warehouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
json_rescue_v5_load,
load::{ingest_all, try_load_one_archive},
load_exchange_orders,
neo4j_init::{self, get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV},
neo4j_init::{self, get_credentials_from_env},
scan::{scan_dir_archive, BundleContent, ManifestInfo},
unzip_temp, util,
};
Expand Down Expand Up @@ -294,22 +294,19 @@ pub async fn try_db_connection_pool(cli: &WarehouseCli) -> Result<Graph> {
let db = match get_credentials_from_env() {
Ok((uri, user, password)) => Graph::new(uri, user, password).await?,
Err(_) => {
if cli.db_uri.is_some() && cli.db_username.is_some() && cli.db_password.is_some() {
Graph::new(
cli.db_uri.as_ref().unwrap(),
cli.db_username.as_ref().unwrap(),
cli.db_password.as_ref().unwrap(),
)
.await?
} else {
println!("Must pass DB credentials, either with CLI args or environment variable");
println!("call with --db-uri, --db-user, and --db-password");
println!(
"Alternatively export credentials to env variables: {}, {}, {}",
URI_ENV, USER_ENV, PASS_ENV
);
bail!("could not get a db instance with credentials");
}
let uri = cli
.db_uri
.as_ref()
.expect("Must pass --db-uri or set URI_ENV");
let user = cli
.db_username
.as_ref()
.expect("Must pass --db-user or set USER_ENV");
let password = cli
.db_password
.as_ref()
.expect("Must pass --db-password or set PASS_ENV");
Graph::new(uri, user, password).await?
}
};
Ok(db)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_extract_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ async fn test_extract_tx_from_archive() -> anyhow::Result<()> {
async fn test_extract_v6_tx_from_archive() -> anyhow::Result<()> {
let archive_path = support::fixtures::v6_tx_manifest_fixtures_path();
let list = extract_current_transactions(&archive_path, &FrameworkVersion::V6).await?;

assert!(list.0.len() == 27);
assert!(list.0.len() == 25);
assert!(list.1.len() == 52);

Ok(())
Expand Down
42 changes: 39 additions & 3 deletions tests/test_json_rescue_v5_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn test_load_all_tgz() -> anyhow::Result<()> {

let tx_count = json_rescue_v5_load::single_thread_decompress_extract(&path, &pool).await?;

assert!(tx_count == 13);
assert!(tx_count == 12);

Ok(())
}
Expand All @@ -46,7 +46,7 @@ async fn test_load_entrypoint() -> anyhow::Result<()> {
let path = fixtures::v5_json_tx_path();

let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
assert!(tx_count == 13);
assert!(tx_count == 12);

Ok(())
}
Expand All @@ -68,7 +68,7 @@ async fn test_load_queue() -> anyhow::Result<()> {

let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;

assert!(tx_count == 13);
assert!(tx_count == 12);

let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
assert!(tx_count == 0);
Expand Down Expand Up @@ -116,3 +116,39 @@ async fn test_rescue_v5_parse_set_wallet_tx() -> anyhow::Result<()> {

Ok(())
}

// #[tokio::test]
// fn test_stream() {
// async fn process_files(paths: Vec<&str>) {
// let mut stream = stream::iter(paths)
// .then(|path| async move {
// match read_json_file(path).await {
// Ok(data) => Some(data),
// Err(_) => None,
// }
// })
// .filter_map(|x| async { x })
// .flat_map(|data| stream::iter(data));

// let mut batch: VecDeque<MyStruct> = VecDeque::new();

// while let Some(item) = stream.next().await {
// batch.push_back(item);

// if batch.len() >= 100 {
// // Batch is large enough, process it
// let mut batch_to_process: Vec<MyStruct> = Vec::new();
// while let Some(_) = batch.pop_front() {
// batch_to_process.push(batch.pop_front().unwrap());
// }
// process_batch(batch_to_process).await;
// }
// }

// // Process any remaining items in the batch
// if !batch.is_empty() {
// let mut batch_to_process: Vec<MyStruct> = batch.into();
// process_batch(batch_to_process).await;
// }
// }
// }
Loading
Loading