diff --git a/bun.lock b/bun.lock index 914a319..210d967 100644 --- a/bun.lock +++ b/bun.lock @@ -7,6 +7,7 @@ "dependencies": { "@types/react": "^19.2.14", "better-sqlite3": "^12.6.2", + "chrono-node": "^2.9.0", "exceljs": "^4.4.0", "fast-xml-parser": "^5.3.7", "ink": "^6.8.0", @@ -22,6 +23,7 @@ "pg": "^8.18.0", "react": "^19.2.4", "string-width": "^8.2.0", + "woothee": "^1.11.1", }, "devDependencies": { "@types/better-sqlite3": "^7.6.13", @@ -383,6 +385,8 @@ "chownr": ["chownr@1.1.4", "", {}, "sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg=="], + "chrono-node": ["chrono-node@2.9.0", "", {}, "sha512-glI4YY2Jy6JII5l3d5FN6rcrIbKSQqKPhWsIRYPK2IK8Mm4Q1ZZFdYIaDqglUNf7gNwG+kWIzTn0omzzE0VkvQ=="], + "cli-boxes": ["cli-boxes@3.0.0", "", {}, "sha512-/lzGpEWL/8PfI0BmBOPRwp0c/wFNX1RdUML3jK/RcSBA9T8mZDdQpqYBKtCFTOfQbwPqWEOpjqW+Fnayc0969g=="], "cli-cursor": ["cli-cursor@4.0.0", "", { "dependencies": { "restore-cursor": "^4.0.0" } }, "sha512-VGtlMu3x/4DOtIUwEkRezxUZ2lBacNJCHash0N0WeZDBS+7Ux1dm3XWAgWYxLJFMMdOeXMHXorshEFhbMSGelg=="], @@ -831,6 +835,8 @@ "widest-line": ["widest-line@6.0.0", "", { "dependencies": { "string-width": "^8.1.0" } }, "sha512-U89AsyEeAsyoF0zVJBkG9zBgekjgjK7yk9sje3F4IQpXBJ10TF6ByLlIfjMhcmHMJgHZI4KHt4rdNfktzxIAMA=="], + "woothee": ["woothee@1.11.1", "", {}, "sha512-KdArM3MsNa5tlSBSL29w9ouy9MXZoFPeUdPVnL4QZH3iyV8HsqnwbWw2YLiXEx2wAh0bM55dnl0+qDE6KHBlhQ=="], + "wrap-ansi": ["wrap-ansi@9.0.2", "", { "dependencies": { "ansi-styles": "^6.2.1", "string-width": "^7.0.0", "strip-ansi": "^7.1.0" } }, "sha512-42AtmgqjV+X1VpdOfyTGOYRi0/zsoLqtXQckTmqTeybT+BDIbM/Guxo7x3pE2vtpr1ok6xRqM9OpBe+Jyoqyww=="], "wrappy": ["wrappy@1.0.2", "", {}, "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="], diff --git a/package.json b/package.json index 228b033..6f66e5b 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "dependencies": { "@types/react": "^19.2.14", "better-sqlite3": "^12.6.2", + "chrono-node": "^2.9.0", "exceljs": "^4.4.0", "fast-xml-parser": "^5.3.7", "ink": "^6.8.0", @@ -47,6 +48,7 @@ "papaparse": "^5.5.3", "pg": "^8.18.0", "react": "^19.2.4", - "string-width": "^8.2.0" + "string-width": "^8.2.0", + "woothee": "^1.11.1" } } diff --git a/src/aggregators/Ord2Bivariate.ts b/src/aggregators/Ord2Bivariate.ts new file mode 100644 index 0000000..71a5538 --- /dev/null +++ b/src/aggregators/Ord2Bivariate.ts @@ -0,0 +1,97 @@ +import type { Aggregator } from "../Aggregator.ts"; +import type { Record } from "../Record.ts"; +import type { JsonValue } from "../types/json.ts"; +import { findKey } from "../KeySpec.ts"; +import { aggregatorRegistry } from "../Aggregator.ts"; + +// [sum1, sumX, sumY, sumXY, sumX2, sumY2] +type Ord2BivState = [number, number, number, number, number, number]; + +/** + * Second-order bivariate statistics aggregator. + * Computes covariance, correlation, and linear regression parameters + * between two fields using a single pass. + * + * Analogous to App::RecordStream::Aggregator::Ord2Bivariate in Perl. + */ +export class Ord2BivariateAggregator implements Aggregator { + fieldX: string; + fieldY: string; + + constructor(fieldX: string, fieldY: string) { + this.fieldX = fieldX; + this.fieldY = fieldY; + } + + initial(): Ord2BivState | null { + return null; + } + + combine(state: Ord2BivState | null, record: Record): Ord2BivState | null { + const vx = findKey(record.dataRef(), this.fieldX, true); + const vy = findKey(record.dataRef(), this.fieldY, true); + if (vx === undefined || vx === null || vy === undefined || vy === null) return state; + const x = Number(vx); + const y = Number(vy); + const mapped: Ord2BivState = [1, x, y, x * y, x * x, y * y]; + if (state === null) return mapped; + return [ + state[0] + mapped[0], + state[1] + mapped[1], + state[2] + mapped[2], + state[3] + mapped[3], + state[4] + mapped[4], + state[5] + mapped[5], + ]; + } + + squish(state: Ord2BivState | null): JsonValue { + if (state === null) return null; + const [n, sumX, sumY, sumXY, sumX2, sumY2] = state; + + const meanX = sumX / n; + const meanY = sumY / n; + + // Covariance: E[XY] - E[X]*E[Y] + const covariance = sumXY / n - meanX * meanY; + + // Variances + const varX = sumX2 / n - meanX * meanX; + const varY = sumY2 / n - meanY * meanY; + + // Correlation: cov / (stdX * stdY) + const denominator = Math.sqrt(varX * varY); + const correlation = denominator > 0 + ? (sumXY * n - sumX * sumY) / Math.sqrt((sumX2 * n - sumX ** 2) * (sumY2 * n - sumY ** 2)) + : null; + + // Linear regression: y = alpha + beta * x + const betaDenom = sumX2 * n - sumX ** 2; + const beta = betaDenom !== 0 ? (sumXY * n - sumX * sumY) / betaDenom : null; + const alpha = beta !== null ? (sumY - beta * sumX) / n : null; + + const result: { [key: string]: JsonValue } = { + count: n, + covariance, + correlation, + }; + + if (alpha !== null && beta !== null) { + result["alpha"] = alpha; + result["beta"] = beta; + } + + return result; + } +} + +aggregatorRegistry.register("ord2biv", { + create: (fieldX: string, fieldY: string) => new Ord2BivariateAggregator(fieldX, fieldY), + argCounts: [2], + shortUsage: "compute second-order bivariate statistics for two fields", + longUsage: + "Usage: ord2biv,,\n" + + " Compute covariance, correlation, and linear regression parameters\n" + + " between two fields.", + aliases: ["ord2bivariate"], +}); diff --git a/src/aggregators/Ord2Univariate.ts b/src/aggregators/Ord2Univariate.ts new file mode 100644 index 0000000..ff78d65 --- /dev/null +++ b/src/aggregators/Ord2Univariate.ts @@ -0,0 +1,82 @@ +import type { Aggregator } from "../Aggregator.ts"; +import type { Record } from "../Record.ts"; +import type { JsonValue } from "../types/json.ts"; +import { findKey } from "../KeySpec.ts"; +import { aggregatorRegistry } from "../Aggregator.ts"; + +// [count, sumX, sumX2, sumX3, sumX4] +type Ord2UniState = [number, number, number, number, number]; + +/** + * Second-order univariate statistics aggregator. + * Computes count, mean, variance, standard deviation, skewness, and kurtosis + * for a single field using a single pass. + * + * Analogous to App::RecordStream::Aggregator::Ord2Univariate in Perl. + */ +export class Ord2UnivariateAggregator implements Aggregator { + field: string; + + constructor(field: string) { + this.field = field; + } + + initial(): Ord2UniState | null { + return null; + } + + combine(state: Ord2UniState | null, record: Record): Ord2UniState | null { + const value = findKey(record.dataRef(), this.field, true); + if (value === undefined || value === null) return state; + const x = Number(value); + const mapped: Ord2UniState = [1, x, x * x, x * x * x, x * x * x * x]; + if (state === null) return mapped; + return [ + state[0] + mapped[0], + state[1] + mapped[1], + state[2] + mapped[2], + state[3] + mapped[3], + state[4] + mapped[4], + ]; + } + + squish(state: Ord2UniState | null): JsonValue { + if (state === null) return null; + const [n, sumX, sumX2, sumX3, sumX4] = state; + + const mean = sumX / n; + const variance = sumX2 / n - mean * mean; + const stddev = Math.sqrt(variance); + + const result: { [key: string]: JsonValue } = { + count: n, + mean, + variance, + stddev, + }; + + // Skewness and kurtosis require variance > 0 + if (variance > 0) { + // E[(X - mean)^3] = E[X^3] - 3*mean*E[X^2] + 2*mean^3 + const m3 = sumX3 / n - 3 * mean * (sumX2 / n) + 2 * mean * mean * mean; + result["skewness"] = m3 / (stddev * stddev * stddev); + + // E[(X - mean)^4] = E[X^4] - 4*mean*E[X^3] + 6*mean^2*E[X^2] - 3*mean^4 + const m4 = sumX4 / n - 4 * mean * (sumX3 / n) + 6 * mean * mean * (sumX2 / n) - 3 * mean * mean * mean * mean; + result["kurtosis"] = m4 / (variance * variance); + } + + return result; + } +} + +aggregatorRegistry.register("ord2uni", { + create: (field: string) => new Ord2UnivariateAggregator(field), + argCounts: [1], + shortUsage: "compute second-order univariate statistics for a field", + longUsage: + "Usage: ord2uni,\n" + + " Compute count, mean, variance, standard deviation, skewness,\n" + + " and kurtosis for the specified field.", + aliases: ["ord2univariate"], +}); diff --git a/src/aggregators/registry.ts b/src/aggregators/registry.ts index c110701..b450037 100644 --- a/src/aggregators/registry.ts +++ b/src/aggregators/registry.ts @@ -33,5 +33,7 @@ import "./FirstRecord.ts"; import "./LastRecord.ts"; import "./RecordForMaximum.ts"; import "./RecordForMinimum.ts"; +import "./Ord2Univariate.ts"; +import "./Ord2Bivariate.ts"; export { aggregatorRegistry, makeAggregators } from "../Aggregator.ts"; diff --git a/src/clumpers/Options.ts b/src/clumpers/Options.ts index 9dd9c48..71880e4 100644 --- a/src/clumpers/Options.ts +++ b/src/clumpers/Options.ts @@ -132,30 +132,38 @@ export class ClumperOptions { keyParts.push(String(val ?? "")); } - const groupKey = keyParts.join("\x1E"); - if (!this.groups) { this.groups = new Map(); } - let cookie = this.groups.get(groupKey); - if (cookie === undefined) { - // Handle LRU eviction if keySize is set and NOT in perfect mode - if (!this.keyPerfect && this.keySize !== null && this.groups.size >= this.keySize) { - const oldestKey = this.groupOrder.shift()!; - const oldCookie = this.groups.get(oldestKey); - if (oldCookie !== undefined) { - this.callback.clumperCallbackEnd(oldCookie); - this.groups.delete(oldestKey); + // In cube mode, generate all 2^N combinations of actual values and "ALL" + const combos = this.keyCube + ? this.cubeKeyValues(keySpecs, keyValues) + : [{ keyValues, keyParts }]; + + for (const combo of combos) { + const groupKey = combo.keyParts.join("\x1E"); + + let cookie = this.groups.get(groupKey); + if (cookie === undefined) { + // Handle LRU eviction if keySize is set and NOT in perfect mode + if (!this.keyPerfect && this.keySize !== null && this.groups.size >= this.keySize) { + const oldestKey = this.groupOrder.shift()!; + const oldCookie = this.groups.get(oldestKey); + if (oldCookie !== undefined) { + this.callback.clumperCallbackEnd(oldCookie); + this.groups.delete(oldestKey); + } } + + cookie = this.callback.clumperCallbackBegin(combo.keyValues); + this.groups.set(groupKey, cookie); + this.groupOrder.push(groupKey); } - cookie = this.callback.clumperCallbackBegin(keyValues); - this.groups.set(groupKey, cookie); - this.groupOrder.push(groupKey); + this.callback.clumperCallbackPushRecord(cookie, record); } - this.callback.clumperCallbackPushRecord(cookie, record); return true; } @@ -183,6 +191,40 @@ export class ClumperOptions { } } + /** + * Generate all 2^N combinations of actual key values and "ALL" for cube mode. + */ + cubeKeyValues( + keySpecs: string[], + keyValues: { [key: string]: JsonValue } + ): Array<{ keyValues: { [key: string]: JsonValue }; keyParts: string[] }> { + const n = keySpecs.length; + const combos: Array<{ keyValues: { [key: string]: JsonValue }; keyParts: string[] }> = []; + + // Iterate all 2^N bitmasks + for (let mask = 0; mask < (1 << n); mask++) { + const comboValues: { [key: string]: JsonValue } = {}; + const comboParts: string[] = []; + + for (let i = 0; i < n; i++) { + const spec = keySpecs[i]!; + if (mask & (1 << i)) { + // Replace this key with "ALL" + comboValues[spec] = "ALL"; + comboParts.push("ALL"); + } else { + // Use actual value + comboValues[spec] = keyValues[spec] ?? null; + comboParts.push(String(keyValues[spec] ?? "")); + } + } + + combos.push({ keyValues: comboValues, keyParts: comboParts }); + } + + return combos; + } + getKeySize(): number | null { return this.keySize; } diff --git a/src/deaggregators/registry.ts b/src/deaggregators/registry.ts new file mode 100644 index 0000000..2cb592d --- /dev/null +++ b/src/deaggregators/registry.ts @@ -0,0 +1,11 @@ +/** + * Deaggregator registry - central place for registering and looking up + * deaggregators by name. + * + * Importing this module ensures all deaggregator implementations are registered. + */ + +// Import all deaggregator implementations to trigger their self-registration +import "./Split.ts"; +import "./Unarray.ts"; +import "./Unhash.ts"; diff --git a/src/operations/input/fromapache.ts b/src/operations/input/fromapache.ts index eca82c4..77140e5 100644 --- a/src/operations/input/fromapache.ts +++ b/src/operations/input/fromapache.ts @@ -1,6 +1,7 @@ import { Operation } from "../../Operation.ts"; import { Record } from "../../Record.ts"; import type { JsonObject } from "../../types/json.ts"; +import woothee from "woothee"; /** * Apache combined/common log format regex patterns. @@ -30,6 +31,7 @@ type ParseMode = "fast" | "strict"; */ export class FromApache extends Operation { mode: ParseMode = "fast"; + useWoothee = false; strictFormats: string[] | null = null; extraArgs: string[] = []; @@ -74,6 +76,9 @@ export class FromApache extends Operation { strictSet = true; strictArg = arg.slice(9); i++; + } else if (arg === "--woothee") { + this.useWoothee = true; + i++; } else if (arg === "--verbose") { i++; } else if (arg === "--help" || arg === "-h") { @@ -176,6 +181,20 @@ export class FromApache extends Operation { return this.parseCombined(line) ?? this.parseCommon(line) ?? this.parseVhostCommon(line); } + applyWoothee(data: JsonObject): void { + if (!this.useWoothee) return; + const agent = data["agent"]; + if (typeof agent !== "string") return; + + const parsed = woothee.parse(agent); + data["ua_name"] = parsed.name; + data["ua_category"] = parsed.category; + data["ua_os"] = parsed.os; + data["ua_version"] = parsed.version; + data["ua_vendor"] = parsed.vendor; + data["ua_os_version"] = parsed.os_version; + } + parseCombined(line: string): Record | null { const m = COMBINED_RE.exec(line); if (!m) return null; @@ -204,6 +223,8 @@ export class FromApache extends Operation { data["proto"] = proto; } + this.applyWoothee(data); + return new Record(data); } @@ -305,6 +326,11 @@ export const documentation: CommandDoc = { description: "Use the strict parser which works relatively slow. It can process any style format logs, with specification about separator, and checker for perfection. It can also process backslash-quoted double-quotes properly.", }, + { + flags: ["--woothee"], + description: + "Parse the user agent field with the Woothee library and add ua_name, ua_category, ua_os, ua_version, ua_vendor, and ua_os_version fields to each record.", + }, { flags: ["--verbose"], description: "Verbose output.", diff --git a/src/operations/input/fromps.ts b/src/operations/input/fromps.ts index 5b1d1ac..5a015eb 100644 --- a/src/operations/input/fromps.ts +++ b/src/operations/input/fromps.ts @@ -1,3 +1,4 @@ +import { readFileSync } from "fs"; import { Operation, type OptionDef } from "../../Operation.ts"; import { Record } from "../../Record.ts"; import type { JsonObject, JsonValue } from "../../types/json.ts"; @@ -11,6 +12,39 @@ export interface ProcessTableSource { getFields(): string[]; } +/** + * Build a UID-to-username map by parsing /etc/passwd. + */ +function buildUidMap(): Map { + const map = new Map(); + try { + const contents = readFileSync("/etc/passwd", "utf-8"); + for (const line of contents.split("\n")) { + if (line.startsWith("#") || line.trim() === "") continue; + const parts = line.split(":"); + if (parts.length >= 3) { + const username = parts[0]!; + const uid = parts[2]!; + map.set(uid, username); + } + } + } catch { + // If /etc/passwd is not readable, return empty map + } + return map; +} + +/** + * Default UID converter using /etc/passwd lookup. + * Returns the username if found, otherwise the original UID value as a string. + */ +function defaultUidConverter(uidMap: Map): (uid: JsonValue) => string { + return (uid: JsonValue) => { + const uidStr = String(uid); + return uidMap.get(uidStr) ?? uidStr; + }; +} + /** * Default process table source that shells out to `ps`. */ @@ -74,6 +108,8 @@ export class FromPs extends Operation { fields: string[] = []; processTable: ProcessTableSource = new PsProcessTable(); uidConverter: ((uid: JsonValue) => string) | null = null; + noUidConvert = false; + uidConverterExplicitlySet = false; acceptRecord(_record: Record): boolean { return true; @@ -99,10 +135,24 @@ export class FromPs extends Operation { }, description: "Fields to output", }, + { + long: "no-uid-convert", + type: "boolean", + handler: () => { + this.noUidConvert = true; + }, + description: "Do not convert UIDs to usernames", + }, ]; this.parseOptions(args, defs); + // Set up default UID converter if not explicitly set and not disabled + if (!this.uidConverterExplicitlySet && !this.noUidConvert) { + const uidMap = buildUidMap(); + this.uidConverter = defaultUidConverter(uidMap); + } + if (this.fields.length === 0) { this.fields = this.processTable.getFields(); } @@ -112,8 +162,9 @@ export class FromPs extends Operation { this.processTable = table; } - setUidConverter(converter: (uid: JsonValue) => string): void { + setUidConverter(converter: ((uid: JsonValue) => string) | null): void { this.uidConverter = converter; + this.uidConverterExplicitlySet = true; } override wantsInput(): boolean { @@ -129,7 +180,7 @@ export class FromPs extends Operation { let value = proc[field]; if (value === undefined) continue; - if (field === "uid" && this.uidConverter) { + if ((field === "uid" || field === "euid") && this.uidConverter) { value = this.uidConverter(value); } @@ -161,6 +212,11 @@ export const documentation: CommandDoc = { description: "Fields to output. May be specified multiple times, may be comma separated. Defaults to all fields.", }, + { + flags: ["--no-uid-convert"], + description: + "Do not convert UIDs to usernames. By default, uid and euid fields are converted to usernames via /etc/passwd.", + }, ], examples: [ { diff --git a/src/operations/input/fromtcpdump.ts b/src/operations/input/fromtcpdump.ts index 916b776..c87168d 100644 --- a/src/operations/input/fromtcpdump.ts +++ b/src/operations/input/fromtcpdump.ts @@ -49,15 +49,14 @@ export class FromTcpdump extends Operation { } dumpPackets(file: string): void { - // Use tcpdump to read pcap file and output verbose text const tcpdumpArgs = [ "tcpdump", "-r", file, - "-nn", // Don't resolve addresses or ports - "-tt", // Print unformatted timestamp - "-v", // Verbose - "-e", // Print link-layer header + "-nn", // Don't resolve addresses or ports + "-tt", // Print unformatted timestamp + "-vv", // Very verbose (includes checksums) + "-e", // Print link-layer header (MAC addresses) ]; if (this.includeData) { @@ -73,75 +72,326 @@ export class FromTcpdump extends Operation { } const output = result.stdout.toString(); - const lines = output.split("\n").filter((l) => l.trim() !== ""); + const lines = output.split("\n"); + // Group lines into packets: a new packet starts with a timestamp, + // continuation lines start with whitespace + const packets: string[][] = []; for (const line of lines) { - const record = this.parsePacketLine(line, file); + if (line.trim() === "") continue; + if (/^\d+\.\d+\s/.test(line)) { + packets.push([line]); + } else if (packets.length > 0) { + packets[packets.length - 1]!.push(line); + } + } + + for (const packetLines of packets) { + const record = this.parsePacket(packetLines, file); if (record) { this.pushRecord(new Record(record)); } } } - parsePacketLine(line: string, file: string): JsonObject | null { + parsePacket(lines: string[], file: string): JsonObject | null { const record: JsonObject = { file }; + const headerLine = lines[0]!; + + // Combine continuation lines, filtering out hex dump lines (from -X) + const detailLines = lines + .slice(1) + .filter((l) => !l.trim().startsWith("0x")) + .map((l) => l.trim()) + .join(" "); // Parse timestamp - const tsMatch = line.match(/^(\d+\.\d+)\s+/); + const tsMatch = headerLine.match(/^(\d+\.\d+)\s+/); if (tsMatch) { record["timestamp"] = tsMatch[1]!; } - // Parse basic IP info - const ipMatch = line.match( - /IP\s+(\S+?)\.(\d+)\s+>\s+(\S+?)\.(\d+):/ + // Parse ethernet MAC addresses from -e output + this.parseEthernet(headerLine, record); + + // Parse frame length from ethernet header: ", length N:" + const frameLenMatch = headerLine.match(/,\s+length\s+(\d+):/); + if (frameLenMatch) { + record["length"] = parseInt(frameLenMatch[1]!, 10); + } + + // Determine packet type and parse protocol details + if ( + headerLine.includes("ethertype IPv4") || + headerLine.includes("ethertype IPv6") + ) { + this.parseIpPacket(headerLine, detailLines, record); + } else if ( + headerLine.includes("ethertype ARP") || + headerLine.includes("ARP") + ) { + record["type"] = "arp"; + this.parseArpDetails(headerLine + " " + detailLines, record); + } else { + record["type"] = "ethernet"; + // Fallback length parsing + if (!record["length"]) { + const lenMatch = headerLine.match(/length\s+(\d+)/); + if (lenMatch) { + record["length"] = parseInt(lenMatch[1]!, 10); + } + } + } + + return record; + } + + parseEthernet(line: string, record: JsonObject): void { + // Format after timestamp: SRC_MAC > DEST_MAC, ethertype ... + const macMatch = line.match( + /^[\d.]+\s+([\da-f]{2}:[\da-f]{2}:[\da-f]{2}:[\da-f]{2}:[\da-f]{2}:[\da-f]{2})\s+>\s+([\da-f]{2}:[\da-f]{2}:[\da-f]{2}:[\da-f]{2}:[\da-f]{2}:[\da-f]{2})/i ); - if (ipMatch) { - record["ip"] = { - src_ip: ipMatch[1]!, - dest_ip: ipMatch[3]!, + if (macMatch) { + record["ethernet"] = { + src_mac: macMatch[1]!, + dest_mac: macMatch[2]!, } as JsonValue; + } + } + + parseIpPacket( + headerLine: string, + detailLines: string, + record: JsonObject + ): void { + const ip: JsonObject = {}; - // Determine protocol - if (line.includes("UDP")) { + // Parse IP header: (tos 0xN, ttl N, id N, offset N, flags [...], proto TYPE (N), length N) + const ipHeaderMatch = headerLine.match( + /\(tos\s+(0x[\da-f]+),\s*ttl\s+(\d+),\s*id\s+(\d+),\s*offset\s+(\d+),\s*flags\s+\[([^\]]*)\],\s*proto\s+(\w+)\s+\((\d+)\),\s*length\s+(\d+)\)/i + ); + if (ipHeaderMatch) { + ip["tos"] = parseInt(ipHeaderMatch[1]!, 16); + ip["ttl"] = parseInt(ipHeaderMatch[2]!, 10); + ip["id"] = parseInt(ipHeaderMatch[3]!, 10); + ip["offset"] = parseInt(ipHeaderMatch[4]!, 10); + ip["proto"] = parseInt(ipHeaderMatch[7]!, 10); + ip["len"] = parseInt(ipHeaderMatch[8]!, 10); + + // Parse IP flags + const flagStr = ipHeaderMatch[5]!; + const ipFlags: JsonObject = {}; + if (flagStr.includes("DF")) ipFlags["dont_fragment"] = 1; + if (flagStr.includes("MF")) ipFlags["more_fragments"] = 1; + if (/\bCE\b/.test(flagStr)) ipFlags["congestion"] = 1; + ip["flags"] = ipFlags; + } + + // Parse IP addresses and ports from detail line + // Format: src_ip.src_port > dest_ip.dest_port: + const addrMatch = detailLines.match( + /(\S+?)\.(\d+)\s+>\s+(\S+?)\.(\d+):/ + ); + if (addrMatch) { + ip["src_ip"] = addrMatch[1]!; + ip["dest_ip"] = addrMatch[3]!; + const srcPort = parseInt(addrMatch[2]!, 10); + const destPort = parseInt(addrMatch[4]!, 10); + record["ip"] = ip as JsonValue; + + // Determine transport protocol + if ( + headerLine.includes("proto UDP") || + detailLines.includes(" UDP,") + ) { record["type"] = "udp"; - record["udp"] = { - src_port: parseInt(ipMatch[2]!, 10), - dest_port: parseInt(ipMatch[4]!, 10), - } as JsonValue; + const udp: JsonObject = { + src_port: srcPort, + dest_port: destPort, + }; + this.parseUdpDetails(detailLines, udp); + record["udp"] = udp as JsonValue; + + if (srcPort === 53 || destPort === 53) { + this.parseDnsDetails(detailLines, record); + } } else { record["type"] = "tcp"; - record["tcp"] = { - src_port: parseInt(ipMatch[2]!, 10), - dest_port: parseInt(ipMatch[4]!, 10), - } as JsonValue; - - // Parse TCP flags - const flagsMatch = line.match(/Flags \[([^\]]*)\]/); - if (flagsMatch) { - const flagStr = flagsMatch[1]!; - const flags: JsonObject = {}; - if (flagStr.includes("S")) flags["SYN"] = 1; - if (flagStr.includes("F")) flags["FIN"] = 1; - if (flagStr.includes("R")) flags["RST"] = 1; - if (flagStr.includes("P")) flags["PSH"] = 1; - if (flagStr.includes(".")) flags["ACK"] = 1; - (record["tcp"] as JsonObject)["flags"] = flags; + const tcp: JsonObject = { + src_port: srcPort, + dest_port: destPort, + }; + this.parseTcpDetails(detailLines, tcp); + record["tcp"] = tcp as JsonValue; + + if (srcPort === 53 || destPort === 53) { + this.parseDnsDetails(detailLines, record); } } - } else if (line.includes("ARP")) { - record["type"] = "arp"; } else { - record["type"] = "ethernet"; + // IP packet without clear port info (e.g. ICMP) + record["ip"] = ip as JsonValue; + record["type"] = "ip"; + } + } + + parseTcpDetails(detail: string, tcp: JsonObject): void { + // TCP flags: Flags [S.], [P.], [SEWU.], etc. + // S=SYN, F=FIN, R=RST, P=PSH, .=ACK, U=URG, E=ECE, W=CWR + const flagsMatch = detail.match(/Flags\s+\[([^\]]*)\]/); + if (flagsMatch) { + const flagStr = flagsMatch[1]!; + const flags: JsonObject = {}; + if (flagStr.includes("S")) flags["SYN"] = 1; + if (flagStr.includes("F")) flags["FIN"] = 1; + if (flagStr.includes("R")) flags["RST"] = 1; + if (flagStr.includes("P")) flags["PSH"] = 1; + if (flagStr.includes(".")) flags["ACK"] = 1; + if (flagStr.includes("U")) flags["URG"] = 1; + if (flagStr.includes("E")) flags["ECE"] = 1; + if (flagStr.includes("W")) flags["CWR"] = 1; + tcp["flags"] = flags; + } + + // seq N or seq N:M + const seqMatch = detail.match(/\bseq\s+(\d+)(?::(\d+))?/); + if (seqMatch) { + tcp["seq"] = parseInt(seqMatch[1]!, 10); + if (seqMatch[2]) { + tcp["seq_end"] = parseInt(seqMatch[2]!, 10); + } + } + + // ack N + const ackMatch = detail.match(/\back\s+(\d+)/); + if (ackMatch) { + tcp["ack"] = parseInt(ackMatch[1]!, 10); + } + + // win N + const winMatch = detail.match(/\bwin\s+(\d+)/); + if (winMatch) { + tcp["win"] = parseInt(winMatch[1]!, 10); + } + + // cksum 0xNNNN + const cksumMatch = detail.match(/\bcksum\s+(0x[\da-f]+)/i); + if (cksumMatch) { + tcp["cksum"] = parseInt(cksumMatch[1]!, 16); + } + + // options [...] + const optMatch = detail.match(/\boptions\s+\[([^\]]*)\]/); + if (optMatch) { + tcp["options"] = optMatch[1]!; } - // Parse length - const lenMatch = line.match(/length\s+(\d+)/); + // TCP payload length at end of line + const lenMatch = detail.match(/\blength\s+(\d+)\s*$/); if (lenMatch) { - record["length"] = parseInt(lenMatch[1]!, 10); + tcp["data_length"] = parseInt(lenMatch[1]!, 10); } + } - return record; + parseUdpDetails(detail: string, udp: JsonObject): void { + // UDP checksum: [bad udp cksum 0xNNNN -> 0xNNNN!] + const cksumMatch = detail.match(/udp cksum\s+(0x[\da-f]+)/i); + if (cksumMatch) { + udp["cksum"] = parseInt(cksumMatch[1]!, 16); + } + + // Data length from trailing (N) — UDP length = data + 8 byte header + const dataLenMatch = detail.match(/\((\d+)\)\s*$/); + if (dataLenMatch) { + const dataLen = parseInt(dataLenMatch[1]!, 10); + udp["len"] = dataLen + 8; + } + } + + parseDnsDetails(detail: string, record: JsonObject): void { + const dns: JsonObject = {}; + + // DNS query: ID[flags] QTYPE? QNAME. (len) + // Example: 3930+ A? blog.benjaminbernard.com. (42) + const queryMatch = detail.match( + /:\s+(?:\[.*?\]\s+)?(\d+)([+*%-]*)\s+(\w+)\?\s+(\S+?)\.\s+\((\d+)\)/ + ); + if (queryMatch) { + dns["id"] = parseInt(queryMatch[1]!, 10); + dns["qr"] = 0; + dns["question"] = [ + { qname: queryMatch[4]!, qtype: queryMatch[3]! }, + ] as JsonValue; + dns["answer"] = [] as JsonValue; + record["dns"] = dns as JsonValue; + return; + } + + // DNS response: ID[flags] q: QTYPE? QNAME. AN/NS/AR answers... + // Example: 3930 q: A? blog.benjaminbernard.com. 6/0/0 blog... A 1.2.3.4, ... + const responseMatch = detail.match( + /:\s+(?:\[.*?\]\s+)?(\d+)([+*%-]*)\s+(?:q:\s+)?(\w+)\?\s+(\S+?)\.\s+(\d+)\/(\d+)\/(\d+)\s+(.*)/ + ); + if (responseMatch) { + dns["id"] = parseInt(responseMatch[1]!, 10); + dns["qr"] = 1; + dns["question"] = [ + { qname: responseMatch[4]!, qtype: responseMatch[3]! }, + ] as JsonValue; + dns["counts"] = { + answer: parseInt(responseMatch[5]!, 10), + authority: parseInt(responseMatch[6]!, 10), + additional: parseInt(responseMatch[7]!, 10), + } as JsonValue; + dns["answer"] = this.parseDnsAnswers(responseMatch[8]!) as JsonValue; + record["dns"] = dns as JsonValue; + } + } + + parseDnsAnswers(answersStr: string): JsonValue[] { + const answers: JsonValue[] = []; + // Remove trailing (len) + const cleaned = answersStr.replace(/\s*\(\d+\)\s*$/, ""); + const parts = cleaned.split(/,\s*/); + for (const part of parts) { + const trimmed = part.trim(); + if (!trimmed) continue; + // Match: name. TYPE data + const rrMatch = trimmed.match(/^(\S+?)\.?\s+(\w+)\s+(.+)$/); + if (rrMatch) { + answers.push({ + name: rrMatch[1]!, + type: rrMatch[2]!, + data: rrMatch[3]!, + }); + } + } + return answers; + } + + parseArpDetails(text: string, record: JsonObject): void { + const arp: JsonObject = {}; + + // ARP Request: Request who-has TARGET_IP tell SENDER_IP + const requestMatch = text.match( + /Request\s+who-has\s+([^\s,]+)\s+tell\s+([^\s,]+)/ + ); + if (requestMatch) { + arp["opcode"] = "ARP_REQUEST"; + arp["target_ip"] = requestMatch[1]!; + arp["sender_ip"] = requestMatch[2]!; + record["arp"] = arp as JsonValue; + return; + } + + // ARP Reply: Reply SENDER_IP is-at SENDER_MAC + const replyMatch = text.match(/Reply\s+([^\s,]+)\s+is-at\s+([^\s,]+)/); + if (replyMatch) { + arp["opcode"] = "ARP_REPLY"; + arp["sender_ip"] = replyMatch[1]!; + arp["sender_mac"] = replyMatch[2]!; + record["arp"] = arp as JsonValue; + } } } diff --git a/src/operations/output/todb.ts b/src/operations/output/todb.ts index 8202a59..bf9f836 100644 --- a/src/operations/output/todb.ts +++ b/src/operations/output/todb.ts @@ -5,7 +5,7 @@ import type { JsonValue } from "../../types/json.ts"; /** * Writes records to a SQL database. - * Supports SQLite (via bun:sqlite), with extensibility for other databases. + * Supports SQLite (via bun:sqlite), PostgreSQL (via pg), and MySQL (via mysql2). * * Analogous to App::RecordStream::Operation::todb in Perl. */ @@ -13,12 +13,19 @@ export class ToDb extends Operation { tableName = "recs"; debug = false; dropTable = false; - fields: Map = new Map(); // field name -> SQL type (0 = default VARCHAR) + fields: Map = new Map(); // field name -> SQL type ("" = default VARCHAR) first = true; dbFile = ""; dbType = "sqlite"; + host: string | null = null; + port: string | null = null; + dbName: string | null = null; + user: string | null = null; + password: string | null = null; // eslint-disable-next-line @typescript-eslint/no-explicit-any -- Database instance type depends on runtime db: { run: (sql: string) => void; close: () => void } | null = null; + // Buffered records for async DB types (pg, mysql) + pendingRecords: Record[] = []; constructor(next?: RecordReceiver) { super(next); @@ -92,42 +99,95 @@ export class ToDb extends Operation { long: "type", type: "string", handler: (v) => { this.dbType = v as string; }, - description: "Database type (sqlite)", + description: "Database type: sqlite (default), pg, mysql", + }, + { + long: "host", + type: "string", + handler: (v) => { this.host = v as string; }, + description: "Hostname for database connection (pg, mysql)", + }, + { + long: "port", + type: "string", + handler: (v) => { this.port = v as string; }, + description: "Port for database connection (pg, mysql)", + }, + { + long: "db", + type: "string", + handler: (v) => { this.dbName = v as string; }, + description: "Database name (pg, mysql)", + }, + { + long: "dbname", + type: "string", + handler: (v) => { this.dbName = v as string; }, + description: "Database name (alias for --db)", + }, + { + long: "user", + type: "string", + handler: (v) => { this.user = v as string; }, + description: "Database user", + }, + { + long: "password", + type: "string", + handler: (v) => { this.password = v as string; }, + description: "Database password", }, ]; this.parseOptions(args, defs); - if (!this.dbFile) { - this.dbFile = ":memory:"; + if (this.dbType === "postgres") { + this.dbType = "pg"; } - this.initDb(); + if (this.dbType === "pg" && !this.dbName) { + throw new Error("--db is required for PostgreSQL databases"); + } - if (this.dropTable) { - try { - this.dbDo(`DROP TABLE IF EXISTS "${this.tableName}"`); - } catch { - // Ignore errors from dropping a non-existent table + if (this.dbType === "mysql" && (!this.host || !this.dbName)) { + throw new Error("--host and --db are required for MySQL databases"); + } + + if (this.dbType === "sqlite") { + if (!this.dbFile) { + this.dbFile = ":memory:"; + } + this.initSqliteDb(); + + if (this.dropTable) { + try { + this.dbDo(`DROP TABLE IF EXISTS "${this.tableName}"`); + } catch { + // Ignore errors from dropping a non-existent table + } } } + // For pg/mysql, connection is established asynchronously in finish() } - initDb(): void { - // Use Bun's built-in SQLite + initSqliteDb(): void { // eslint-disable-next-line @typescript-eslint/no-require-imports -- bun:sqlite is a runtime module const { Database } = require("bun:sqlite") as { Database: new (path: string) => { run: (sql: string) => void; close: () => void } }; this.db = new Database(this.dbFile); } acceptRecord(record: Record): boolean { - if (this.first) { - this.addFields(record); - this.createTable(); - this.first = false; + if (this.dbType === "sqlite") { + if (this.first) { + this.addFields(record); + this.createTable(); + this.first = false; + } + this.addRow(record); + } else { + // Buffer records for async DB types + this.pendingRecords.push(record); } - - this.addRow(record); return true; } @@ -139,7 +199,7 @@ export class ToDb extends Operation { } } - addRow(record: Record): void { + buildInsertSql(record: Record): string { const data = record.dataRef(); const keys = [...this.fields.keys()]; const columns = keys.map((k) => `"${k}"`).join(","); @@ -154,22 +214,40 @@ export class ToDb extends Operation { return `'${strVal.replace(/'/g, "''")}'`; }).join(","); - const sql = `INSERT INTO "${this.tableName}" (${columns}) VALUES (${values})`; + return `INSERT INTO "${this.tableName}" (${columns}) VALUES (${values})`; + } + + addRow(record: Record): void { + const sql = this.buildInsertSql(record); this.dbDo(sql); } - createTable(): void { - const incrementName = this.dbType === "sqlite" ? "AUTOINCREMENT" : "AUTO_INCREMENT"; - let sql = `CREATE TABLE IF NOT EXISTS "${this.tableName}" ( id INTEGER PRIMARY KEY ${incrementName}, `; + buildCreateTableSql(): string { + const incrementName = this.dbType === "sqlite" ? "AUTOINCREMENT" : (this.dbType === "pg" ? "GENERATED ALWAYS AS IDENTITY" : "AUTO_INCREMENT"); + if (this.dbType === "pg") { + let sql = `CREATE TABLE IF NOT EXISTS "${this.tableName}" ( id INTEGER ${incrementName} PRIMARY KEY, `; + const columnDefs: string[] = []; + for (const [name, type] of this.fields) { + const sqlType = type || "VARCHAR(255)"; + columnDefs.push(`"${name}" ${sqlType}`); + } + sql += columnDefs.join(", ") + " )"; + return sql; + } + + let sql = `CREATE TABLE IF NOT EXISTS "${this.tableName}" ( id INTEGER PRIMARY KEY ${incrementName}, `; const columnDefs: string[] = []; for (const [name, type] of this.fields) { const sqlType = type || "VARCHAR(255)"; columnDefs.push(`"${name}" ${sqlType}`); } - sql += columnDefs.join(", ") + " )"; + return sql; + } + createTable(): void { + const sql = this.buildCreateTableSql(); try { this.dbDo(sql); } catch { @@ -185,7 +263,169 @@ export class ToDb extends Operation { } override streamDone(): void { - this.db?.close(); + if (this.dbType === "sqlite") { + this.db?.close(); + } + // For async types, cleanup happens in streamDoneAsync + } + + async streamDoneAsync(): Promise { + switch (this.dbType) { + case "sqlite": + this.db?.close(); + break; + case "pg": + await this.flushPostgres(); + break; + case "mysql": + await this.flushMysql(); + break; + default: + throw new Error( + `Database type '${this.dbType}' is not supported. Supported types: sqlite, pg, mysql` + ); + } + } + + /** + * Override finish to support async database types. + * The dispatcher and executor both call `await op.finish()`, so returning + * a Promise here is correctly awaited at runtime even though the base + * class types this as void. + */ + override finish(): void { + if (this.dbType === "sqlite") { + this.streamDone(); + this.next.finish(); + return; + } + // For async types (pg, mysql), return the Promise so the + // dispatcher's `await op.finish()` waits for it at runtime. + return this.streamDoneAsync().then(() => { + this.next.finish(); + }) as unknown as void; + } + + async flushPostgres(): Promise { + const { Client } = await import("pg"); + + const config: { + database: string; + host?: string; + port?: number; + user?: string; + password?: string; + } = { + database: this.dbName!, + }; + + if (this.host) { + config.host = this.host; + } + if (this.port) { + config.port = parseInt(this.port, 10); + } + if (this.user) { + config.user = this.user; + } + if (this.password) { + config.password = this.password; + } + + const client = new Client(config); + try { + await client.connect(); + + if (this.dropTable) { + const dropSql = `DROP TABLE IF EXISTS "${this.tableName}"`; + if (this.debug) { + this.pushLine("Running: " + dropSql); + } + await client.query(dropSql); + } + + // Determine fields from first record if not specified + if (this.pendingRecords.length > 0 && this.fields.size === 0) { + this.addFields(this.pendingRecords[0]!); + } + + if (this.fields.size > 0) { + const createSql = this.buildCreateTableSql(); + if (this.debug) { + this.pushLine("Running: " + createSql); + } + await client.query(createSql); + } + + for (const record of this.pendingRecords) { + const sql = this.buildInsertSql(record); + if (this.debug) { + this.pushLine("Running: " + sql); + } + await client.query(sql); + } + } finally { + await client.end(); + } + } + + async flushMysql(): Promise { + const mysql = await import("mysql2/promise"); + + const config: { + host: string; + database: string; + port?: number; + user?: string; + password?: string; + } = { + host: this.host!, + database: this.dbName!, + }; + + if (this.port) { + config.port = parseInt(this.port, 10); + } + if (this.user) { + config.user = this.user; + } + if (this.password) { + config.password = this.password; + } + + const connection = await mysql.createConnection(config); + try { + if (this.dropTable) { + const dropSql = `DROP TABLE IF EXISTS "${this.tableName}"`; + if (this.debug) { + this.pushLine("Running: " + dropSql); + } + await connection.execute(dropSql); + } + + // Determine fields from first record if not specified + if (this.pendingRecords.length > 0 && this.fields.size === 0) { + this.addFields(this.pendingRecords[0]!); + } + + if (this.fields.size > 0) { + const createSql = this.buildCreateTableSql(); + if (this.debug) { + this.pushLine("Running: " + createSql); + } + await connection.execute(createSql); + } + + for (const record of this.pendingRecords) { + const sql = this.buildInsertSql(record); + if (this.debug) { + this.pushLine("Running: " + sql); + } + await connection.execute(sql); + } + } finally { + await connection.end(); + } } override doesRecordOutput(): boolean { @@ -196,20 +436,37 @@ export class ToDb extends Operation { return `Usage: recs todb [] Dumps a stream of input records into a database. + Supports SQLite (default), PostgreSQL (--type pg), and MySQL (--type mysql). + PostgreSQL requires the 'pg' package. MySQL requires the 'mysql2' package. + Arguments: --drop Drop the table before create/insert --table Table name (default: recs) --debug Print all executed SQL --key|-k Fields to insert (name or name=SQL_TYPE) --dbfile Database file path (for SQLite) - --type Database type (sqlite) + --type Database type: sqlite (default), pg, mysql + --host Hostname for database connection (pg, mysql) + --port Port for database connection (pg, mysql) + --db Database name (pg, mysql) + --user Database user for authentication + --password Database password for authentication Examples: - # Put all records into the recs table + # Put all records into the recs table (SQLite) recs todb --type sqlite --dbfile testDb --table recs # Specify fields and drop existing table - recs todb --dbfile testDb --drop --key status,description=TEXT --key user`; + recs todb --dbfile testDb --drop --key status,description=TEXT --key user + + # Insert into PostgreSQL via Unix socket + recs todb --type pg --db mydb --table users + + # Insert into PostgreSQL with host and credentials + recs todb --type pg --host db.example.com --port 5432 --db mydb --user admin --password secret + + # Insert into MySQL + recs todb --type mysql --host localhost --db mydb --user root --table orders`; } } @@ -226,7 +483,7 @@ export const documentation: CommandDoc = { category: "output", synopsis: "recs todb [options] [files...]", description: - "Dumps a stream of input records into a database. The record fields you want inserted should have the same keys as the column names in the database, and the records should be key-value pairs. This command will attempt to create the table if it is not already present.", + "Dumps a stream of input records into a database. The record fields you want inserted should have the same keys as the column names in the database, and the records should be key-value pairs. This command will attempt to create the table if it is not already present.\n\nSupports SQLite (default), PostgreSQL (--type pg), and MySQL (--type mysql).\n\nPostgreSQL requires the 'pg' package. MySQL requires the 'mysql2' package.", options: [ { flags: ["--drop"], @@ -261,18 +518,58 @@ export const documentation: CommandDoc = { { flags: ["--type"], argument: "", - description: "Database type (sqlite).", + description: "Database type: sqlite (default), pg, mysql.", + }, + { + flags: ["--host"], + argument: "", + description: + "Hostname for database connection. For pg, omit to use Unix domain socket. Required for mysql.", + }, + { + flags: ["--port"], + argument: "", + description: "Port for database connection (pg, mysql).", + }, + { + flags: ["--db", "--dbname"], + argument: "", + description: "Database name. Required for pg and mysql.", + }, + { + flags: ["--user"], + argument: "", + description: "Database user for authentication.", + }, + { + flags: ["--password"], + argument: "", + description: "Database password for authentication.", }, ], examples: [ { - description: "Put all records into the recs table", + description: "Put all records into the recs table (SQLite)", command: "recs todb --type sqlite --dbfile testDb --table recs", }, { description: "Specify fields and drop existing table", command: "recs todb --dbfile testDb --drop --key status,description=TEXT --key user", }, + { + description: "Insert into PostgreSQL via Unix socket", + command: "recs todb --type pg --db mydb --table users", + }, + { + description: "Insert into PostgreSQL with credentials", + command: + "recs todb --type pg --host db.example.com --port 5432 --db mydb --user admin --password secret", + }, + { + description: "Insert into MySQL", + command: + "recs todb --type mysql --host localhost --db mydb --user root --table orders", + }, ], seeAlso: ["fromdb"], }; diff --git a/src/operations/transform/chain.ts b/src/operations/transform/chain.ts index 713badb..5d3881d 100644 --- a/src/operations/transform/chain.ts +++ b/src/operations/transform/chain.ts @@ -1,4 +1,3 @@ -import { spawnSync } from "node:child_process"; import { Operation } from "../../Operation.ts"; import type { RecordReceiver } from "../../Operation.ts"; import { Record } from "../../Record.ts"; @@ -54,14 +53,51 @@ export function createOperationOrShell( } /** - * A "shell operation" that pipes JSONL through an external command. - * Records are serialized to JSON, piped through the command's stdin/stdout, - * and parsed back to records. Used when chain encounters a non-recs command. + * Receiver wrapper that forwards records/lines but blocks finish propagation. + * ChainOperation uses this between operations so it can manage the finish + * sequence explicitly (required for async shell operations). + */ +class ChainFinishBarrier implements RecordReceiver { + constructor(private target: RecordReceiver) {} + + acceptRecord(record: Record): boolean { + return this.target.acceptRecord(record); + } + + acceptLine(line: string): boolean { + if (this.target.acceptLine) { + return this.target.acceptLine(line); + } + return true; + } + + finish(): void { + // Intentionally does NOT propagate. + // ChainOperation manages the finish sequence. + } +} + +/** + * A "shell operation" that pipes JSONL through an external command using + * streaming I/O. Records are written to the command's stdin as they arrive, + * and stdout is read asynchronously and parsed back to records. + * + * This replaces the previous spawnSync-based approach which buffered the + * entire dataset in memory (with a 100MB limit). The streaming approach + * can handle arbitrarily large datasets. */ class ShellOperation extends Operation { command: string; commandArgs: string[]; - bufferedRecords: Record[] = []; + spawnedProc: { + readonly stdin: { write(data: string): number | Promise; end(): number | Promise }; + readonly stdout: ReadableStream; + readonly stderr: ReadableStream; + readonly exited: Promise; + } | null = null; + outputDone: Promise | null = null; + stderrDone: Promise | null = null; + stdinClosed = false; constructor(next: RecordReceiver, command: string, commandArgs: string[]) { super(next); @@ -73,44 +109,119 @@ class ShellOperation extends Operation { // No-op: initialized via constructor } - acceptRecord(record: Record): boolean { - this.bufferedRecords.push(record); - return true; + ensureProcess(): void { + if (this.spawnedProc) return; + try { + this.spawnedProc = Bun.spawn([this.command, ...this.commandArgs], { + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }); + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + throw new Error(`Shell command '${this.command}' failed: ${msg}`); + } + // Start reading stdout/stderr immediately to avoid pipe buffer deadlocks + this.outputDone = this.readStdout(); + this.stderrDone = this.readStderr(); } - override streamDone(): void { - // Serialize all buffered records to JSONL - const input = this.bufferedRecords.map((r) => r.toString()).join("\n") + "\n"; - - // Spawn the shell command - const result = spawnSync(this.command, this.commandArgs, { - input, - encoding: "utf-8", - shell: false, - maxBuffer: 100 * 1024 * 1024, // 100MB - }); - - if (result.error) { - throw new Error(`Shell command '${this.command}' failed: ${result.error.message}`); - } + async readStdout(): Promise { + const reader = this.spawnedProc!.stdout.getReader(); + const decoder = new TextDecoder(); + let partial = ""; - if (result.stderr && result.stderr.trim()) { - process.stderr.write(result.stderr); + try { + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + partial += decoder.decode(value, { stream: true }); + + const lines = partial.split("\n"); + partial = lines.pop()!; + + for (const line of lines) { + if (line.trim() === "") continue; + try { + this.pushRecord(Record.fromJSON(line)); + } catch { + this.pushLine(line); + } + } + } + } finally { + reader.releaseLock(); } - // Parse output lines back as records - const output = result.stdout ?? ""; - const lines = output.split("\n").filter((line: string) => line.trim() !== ""); - for (const line of lines) { + // Flush remaining decoder state + partial += decoder.decode(); + if (partial.trim()) { try { - const record = Record.fromJSON(line); - this.pushRecord(record); + this.pushRecord(Record.fromJSON(partial)); } catch { - // If the line isn't valid JSON, pass it through as a raw line - this.pushLine(line); + this.pushLine(partial); } } } + + async readStderr(): Promise { + const reader = this.spawnedProc!.stderr.getReader(); + try { + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + process.stderr.write(Buffer.from(value)); + } + } finally { + reader.releaseLock(); + } + } + + acceptRecord(record: Record): boolean { + if (this.stdinClosed) return false; + this.ensureProcess(); + try { + this.spawnedProc!.stdin.write(record.toString() + "\n"); + } catch { + // Process may have exited early (e.g. head -1), stop writing + this.stdinClosed = true; + return false; + } + return true; + } + + override streamDone(): void { + // No-op: finish() handles cleanup + } + + override finish(): void { + if (!this.spawnedProc) { + // No records were sent; no process to wait on + this.next.finish(); + return; + } + + const doFinish = async (): Promise => { + // Close stdin to signal end of input + if (!this.stdinClosed) { + try { + await this.spawnedProc!.stdin.end(); + } catch { + // stdin may already be closed if the process exited early + } + } + + // Wait for stdout and stderr readers to complete + await Promise.all([this.outputDone, this.stderrDone]); + + // Wait for process to exit + await this.spawnedProc!.exited; + + this.next.finish(); + }; + + return doFinish() as unknown as void; + } } /** @@ -198,7 +309,10 @@ export class ChainOperation extends Operation { if (this.dryRun) return; - // Build the operation chain from right to left + // Build the operation chain from right to left. + // Each operation's receiver is wrapped in a ChainFinishBarrier so that + // finish() calls don't cascade automatically. ChainOperation manages the + // finish sequence explicitly (required for async shell operations). let receiver: RecordReceiver = this.next; const ops: Operation[] = []; @@ -207,12 +321,14 @@ export class ChainOperation extends Operation { const name = cmd[0]!; const cmdArgs = cmd.slice(1); + const barrier = new ChainFinishBarrier(receiver); + let op: Operation; if (isKnownRecsOp(name)) { - op = createOperation(name, cmdArgs, receiver); + op = createOperation(name, cmdArgs, barrier); } else { // Shell command: pipe JSONL through it - op = new ShellOperation(receiver, name, cmdArgs); + op = new ShellOperation(barrier, name, cmdArgs); } ops.unshift(op); receiver = op; @@ -303,28 +419,47 @@ export class ChainOperation extends Operation { } override streamDone(): void { + // No-op: finish() handles the complete sequence + } + + override finish(): void { if (this.dryRun) return; + if (this.operations.length === 0) return; + + // If we buffered bulk stdin content, feed it to the first op now + if (this.bulkStdinLines.length > 0) { + const content = this.bulkStdinLines.join("\n") + "\n"; + const first = this.operations[0]!; + const opAny = first as unknown as { [key: string]: unknown }; + if (typeof opAny["parseXml"] === "function") { + (opAny["parseXml"] as (xml: string) => void)(content); + } else if (typeof opAny["parseContent"] === "function") { + (opAny["parseContent"] as (content: string) => void)(content); + } + } - if (this.operations.length > 0) { - // If we buffered bulk stdin content, feed it to the first op now - if (this.bulkStdinLines.length > 0) { - const content = this.bulkStdinLines.join("\n") + "\n"; - const first = this.operations[0]!; - const opAny = first as unknown as { [key: string]: unknown }; - if (typeof opAny["parseXml"] === "function") { - (opAny["parseXml"] as (xml: string) => void)(content); - } else if (typeof opAny["parseContent"] === "function") { - (opAny["parseContent"] as (content: string) => void)(content); - } + // Check if any shell ops need async finish + const hasAsyncOps = this.operations.some(op => op instanceof ShellOperation); + + if (!hasAsyncOps) { + // Pure recs chain: finish each op synchronously. + // FinishBarriers prevent the cascade from propagating automatically. + for (const op of this.operations) { + op.finish(); } - this.operations[0]!.finish(); + this.next.finish(); + return; } - } - override finish(): void { - this.streamDone(); - // The chain's last operation already connects to this.next, - // so finishing the first operation propagates through the chain. + // Chain with shell ops: finish each op with await for async support. + const doFinish = async (): Promise => { + for (const op of this.operations) { + await op.finish(); + } + this.next.finish(); + }; + + return doFinish() as unknown as void; } } diff --git a/src/operations/transform/decollate.ts b/src/operations/transform/decollate.ts index 126cc84..985b718 100644 --- a/src/operations/transform/decollate.ts +++ b/src/operations/transform/decollate.ts @@ -1,10 +1,13 @@ -import { Operation } from "../../Operation.ts"; +import { Operation, HelpExit } from "../../Operation.ts"; import type { OptionDef } from "../../Operation.ts"; import { deaggregatorRegistry } from "../../Deaggregator.ts"; import type { Deaggregator } from "../../Deaggregator.ts"; import { Record } from "../../Record.ts"; import type { JsonValue } from "../../types/json.ts"; +// Ensure all deaggregators are registered +import "../../deaggregators/registry.ts"; + /** * Reverse of collate: takes a single record and produces multiple records * using deaggregators. @@ -38,6 +41,16 @@ export class DecollateOperation extends Operation { }, description: "Deaggregator specification (colon-separated)", }, + { + long: "dldeaggregator", + short: "D", + type: "string", + handler: (v) => { + const spec = v as string; + deaggSpecs.push(spec); + }, + description: "Domain language deaggregator specification", + }, { long: "only", short: "o", @@ -49,10 +62,18 @@ export class DecollateOperation extends Operation { long: "list-deaggregators", type: "boolean", handler: () => { - throw new Error(deaggregatorRegistry.listImplementations()); + throw new HelpExit(deaggregatorRegistry.listImplementations()); }, description: "List available deaggregators", }, + { + long: "show-deaggregator", + type: "string", + handler: (v) => { + throw new HelpExit(deaggregatorRegistry.showImplementation(v as string)); + }, + description: "Show details of a specific deaggregator and exit", + }, ]; this.extraArgs = this.parseOptions(args, defs); @@ -114,6 +135,13 @@ export const documentation: CommandDoc = { description: "Deaggregator specification (colon-separated).", argument: "", }, + { + flags: ["--dldeaggregator", "-D"], + description: + "Domain language deaggregator specification. " + + "Shorthand for specifying deaggregators using the same comma-separated syntax as -d.", + argument: "", + }, { flags: ["--only", "-o"], description: @@ -124,6 +152,11 @@ export const documentation: CommandDoc = { flags: ["--list-deaggregators"], description: "List available deaggregators and exit.", }, + { + flags: ["--show-deaggregator"], + description: "Show details of a specific deaggregator and exit.", + argument: "", + }, ], examples: [ { @@ -134,6 +167,10 @@ export const documentation: CommandDoc = { description: "Decollate and only keep deaggregated fields", command: "recs decollate --only -d 'unarray,items,,item'", }, + { + description: "Expand a hash field into key-value records", + command: "recs decollate -d 'unhash,data,key,value'", + }, ], seeAlso: ["collate"], }; diff --git a/src/operations/transform/eval.ts b/src/operations/transform/eval.ts index 46b8e64..bfeee64 100644 --- a/src/operations/transform/eval.ts +++ b/src/operations/transform/eval.ts @@ -9,9 +9,6 @@ import { createSnippetRunner, isJsLang, langOptionDef } from "../../snippets/ind * Evaluate a JS snippet on each record and output the result as a line. * This is NOT a record stream output—it prints raw text lines. * - * When --lang is set to a non-JS language, the external runner modifies the - * record and the modified record is output as a JSON line. - * * Analogous to App::RecordStream::Operation::eval in Perl. */ export class EvalOperation extends Operation { @@ -81,7 +78,7 @@ export class EvalOperation extends Operation { let result = String(value ?? ""); if (this.chomp) { - result = result.replace(/\n+$/, ""); + result = result.replace(/\n$/, ""); } this.pushLine(result); @@ -97,7 +94,11 @@ export class EvalOperation extends Operation { continue; } if (result.record) { - this.pushLine(JSON.stringify(result.record)); + let text = JSON.stringify(result.record); + if (this.chomp) { + text = text.replace(/\n$/, ""); + } + this.pushLine(text); } } } @@ -118,9 +119,7 @@ export const documentation: CommandDoc = { "Evaluate an expression on each record and print the result as a line " + "of text. This is NOT a record stream output -- it prints raw text lines. " + "The expression is evaluated with r set to the current Record object and " + - "line set to the current line number (starting at 1). " + - "When --lang is used with a non-JS language, the record is modified by " + - "the snippet and output as a JSON line.", + "line set to the current line number (starting at 1).", options: [ { flags: ["--chomp"], diff --git a/src/operations/transform/generate.ts b/src/operations/transform/generate.ts index a4e3621..6b2860b 100644 --- a/src/operations/transform/generate.ts +++ b/src/operations/transform/generate.ts @@ -1,9 +1,10 @@ +import { spawnSync } from "node:child_process"; import { Operation } from "../../Operation.ts"; import type { OptionDef } from "../../Operation.ts"; import { Executor, autoReturn, snippetFromFileOption, executorCommandDocOptions } from "../../Executor.ts"; import { Record } from "../../Record.ts"; import type { JsonObject, JsonValue } from "../../types/json.ts"; -import { setKey } from "../../KeySpec.ts"; +import { findKey, setKey } from "../../KeySpec.ts"; import type { SnippetRunner } from "../../snippets/SnippetRunner.ts"; import { createSnippetRunner, isJsLang, langOptionDef } from "../../snippets/index.ts"; @@ -22,6 +23,8 @@ export class GenerateOperation extends Operation { executor!: Executor; keychain = "_chain"; passthrough = false; + shellMode = false; + shellCommand = ""; lang: string | null = null; runner: SnippetRunner | null = null; bufferedRecords: Record[] = []; @@ -48,6 +51,14 @@ export class GenerateOperation extends Operation { handler: () => { this.passthrough = true; }, description: "Emit input record in addition to generated records", }, + { + long: "shell", + type: "boolean", + handler: () => { this.shellMode = true; }, + description: "Execute the expression as a shell command instead of a code snippet. " + + "Each line of stdout is parsed as a JSON record. " + + "Use {{keyspec}} for template interpolation from the input record.", + }, { long: "expr", short: "e", @@ -73,7 +84,9 @@ export class GenerateOperation extends Operation { this.extraArgs = remaining.slice(1); } - if (this.lang && !isJsLang(this.lang)) { + if (this.shellMode) { + this.shellCommand = expression; + } else if (this.lang && !isJsLang(this.lang)) { this.runner = createSnippetRunner(this.lang); void this.runner.init(expression, { mode: "generate" }); } else { @@ -91,6 +104,11 @@ export class GenerateOperation extends Operation { this.pushRecord(record); } + if (this.shellMode) { + this.executeShellCommand(record); + return true; + } + const result = this.executor.executeCode(record); // Result should be an array of records (or objects) @@ -118,6 +136,62 @@ export class GenerateOperation extends Operation { return true; } + /** + * Interpolate {{keyspec}} placeholders in a shell command using values + * from the given record, then execute the command and parse each line + * of stdout as a JSON record. + */ + executeShellCommand(record: Record): void { + // Interpolate {{keyspec}} with record values + const command = this.shellCommand.replace( + /\{\{(.*?)\}\}/g, + (_match, keyspec: string) => { + const value = findKey(record.dataRef() as JsonObject, "@" + keyspec); + if (value === undefined || value === null) return ""; + return String(value); + } + ); + + const result = spawnSync(command, { + shell: true, + encoding: "utf-8", + maxBuffer: 100 * 1024 * 1024, // 100MB + }); + + if (result.error) { + process.stderr.write(`generate: shell command failed: ${result.error.message}\n`); + return; + } + + if (result.status !== 0) { + const stderr = result.stderr ? result.stderr.trim() : ""; + process.stderr.write( + `generate: shell command exited with status ${result.status}${stderr ? ": " + stderr : ""}\n` + ); + return; + } + + if (result.stderr && result.stderr.trim()) { + process.stderr.write(result.stderr); + } + + const output = result.stdout ?? ""; + const lines = output.split("\n").filter((line: string) => line.trim() !== ""); + for (const line of lines) { + try { + const genRecord = Record.fromJSON(line); + setKey( + genRecord.dataRef() as JsonObject, + this.keychain, + record.toJSON() as JsonValue + ); + this.pushRecord(genRecord); + } catch { + process.stderr.write(`generate: failed to parse JSON from shell output: ${line}\n`); + } + } + } + override streamDone(): void { if (this.runner && this.bufferedRecords.length > 0) { const results = this.runner.executeBatch(this.bufferedRecords); @@ -161,7 +235,10 @@ export const documentation: CommandDoc = { "Execute an expression for each record to generate new records. " + "The expression should return an array of new record objects (or a single " + "record). Each generated record gets a chain link back to the original " + - "input record under the '_chain' key (configurable via --keychain).", + "input record under the '_chain' key (configurable via --keychain). " + + "With --shell, the expression is executed as a shell command and each line " + + "of stdout is parsed as a JSON record. Use {{keyspec}} for template " + + "interpolation from the input record in shell commands.", options: [ { flags: ["--keychain"], @@ -174,6 +251,13 @@ export const documentation: CommandDoc = { flags: ["--passthrough"], description: "Emit the input record in addition to the generated records.", }, + { + flags: ["--shell"], + description: + "Execute the expression as a shell command instead of a code snippet. " + + "Each line of stdout is parsed as a JSON record. " + + "Use {{keyspec}} for template interpolation from the input record.", + }, ...executorCommandDocOptions(), ], examples: [ @@ -183,6 +267,14 @@ export const documentation: CommandDoc = { command: "recs generate 'fetchFeed(r.url).map(item => ({ title: item.title }))'", }, + { + description: "Execute a shell command to generate JSON records", + command: "recs generate --shell 'echo {\"x\": 1}'", + }, + { + description: "Shell command with template interpolation from input record", + command: "recs generate --shell 'echo {\"name\": \"{{name}}\"}'", + }, ], seeAlso: ["xform", "chain"], }; diff --git a/src/operations/transform/multiplex.ts b/src/operations/transform/multiplex.ts index 9bbda71..0884575 100644 --- a/src/operations/transform/multiplex.ts +++ b/src/operations/transform/multiplex.ts @@ -8,7 +8,7 @@ import { Record } from "../../Record.ts"; import { createOperation } from "./chain.ts"; import { findKey } from "../../KeySpec.ts"; import { snippetValuation } from "../../DomainLanguage.ts"; -import type { JsonObject } from "../../types/json.ts"; +import type { JsonObject, JsonValue } from "../../types/json.ts"; /** * ClumperCallback for multiplex: creates a separate operation instance @@ -68,11 +68,11 @@ class MultiplexClumperCallback implements ClumperCallback { const collector = new CollectorReceiver(); const op = createOperation(this.operationName, [...this.operationArgs], collector); const outputFile = this.resolveOutputFile(options); - return { operation: op, collector, outputFile }; + return { operation: op, collector, outputFile, options }; } clumperCallbackPushRecord(cookie: unknown, record: Record): void { - const state = cookie as { operation: Operation; collector: CollectorReceiver; outputFile: string | null }; + const state = cookie as { operation: Operation; collector: CollectorReceiver; outputFile: string | null; options: { [key: string]: unknown } }; if (this.lineKey) { const data = record.dataRef() as JsonObject; @@ -86,9 +86,16 @@ class MultiplexClumperCallback implements ClumperCallback { } clumperCallbackEnd(cookie: unknown): void { - const state = cookie as { operation: Operation; collector: CollectorReceiver; outputFile: string | null }; + const state = cookie as { operation: Operation; collector: CollectorReceiver; outputFile: string | null; options: { [key: string]: unknown } }; state.operation.finish(); + // Merge bucket keys into output records (matching Perl behavior) + for (const record of state.collector.records) { + for (const [key, val] of Object.entries(state.options)) { + record.set(key, val as JsonValue); + } + } + if (state.outputFile) { // Write output to file const dir = dirname(state.outputFile); diff --git a/src/operations/transform/normalizetime.ts b/src/operations/transform/normalizetime.ts index 2491632..84d4feb 100644 --- a/src/operations/transform/normalizetime.ts +++ b/src/operations/transform/normalizetime.ts @@ -3,6 +3,7 @@ import type { OptionDef } from "../../Operation.ts"; import { findKey } from "../../KeySpec.ts"; import { Record } from "../../Record.ts"; import type { JsonObject } from "../../types/json.ts"; +import * as chrono from "chrono-node"; /** * Normalize time fields to specified time bucket thresholds. @@ -88,10 +89,16 @@ export class NormalizeTimeOperation extends Operation { if (this.epoch) { time = Number(value); } else { - // Try to parse the date value as a Date object - const parsed = new Date(String(value)); + const dateStr = String(value); + // Try native Date first (handles ISO 8601, RFC 2822, etc.) + let parsed = new Date(dateStr); if (isNaN(parsed.getTime())) { - throw new Error(`Cannot parse date from key: ${this.key}, value: ${String(value)}`); + // Fall back to chrono-node for natural language and non-standard formats + const chronoParsed = chrono.parseDate(dateStr); + if (!chronoParsed) { + throw new Error(`Cannot parse date from key: ${this.key}, value: ${dateStr}`); + } + parsed = chronoParsed; } time = parsed.getTime() / 1000; // Convert to epoch seconds } @@ -117,33 +124,45 @@ export class NormalizeTimeOperation extends Operation { } } +const DURATION_MULTIPLIERS: { [unit: string]: number } = { + s: 1, sec: 1, second: 1, seconds: 1, + m: 60, min: 60, minute: 60, minutes: 60, + h: 3600, hr: 3600, hour: 3600, hours: 3600, + d: 86400, day: 86400, days: 86400, + w: 604800, week: 604800, weeks: 604800, +}; + /** - * Parse a simple duration string into seconds. - * Supports: "N seconds", "N minutes", "N hours", "N days", "N weeks" + * Parse a duration string into seconds. + * Supports single-unit ("5 minutes") and multi-unit ("3 days 2 hours 30 minutes"). */ function parseDuration(str: string): number { - const match = str.trim().match(/^(\d+(?:\.\d+)?)\s*(\w+)$/); - if (!match) { - throw new Error(`Cannot parse duration: '${str}'`); - } - - const num = parseFloat(match[1]!); - const unit = match[2]!.toLowerCase(); + const trimmed = str.trim(); + + // Match all "number unit" pairs in the string + const pattern = /(\d+(?:\.\d+)?)\s*([a-zA-Z]+)/g; + let match: RegExpExecArray | null; + let totalSeconds = 0; + let matchCount = 0; + + while ((match = pattern.exec(trimmed)) !== null) { + matchCount++; + const num = parseFloat(match[1]!); + const unit = match[2]!.toLowerCase(); + + const mult = DURATION_MULTIPLIERS[unit]; + if (mult === undefined) { + throw new Error(`Unknown duration unit: '${unit}'`); + } - const multipliers: { [unit: string]: number } = { - s: 1, sec: 1, second: 1, seconds: 1, - m: 60, min: 60, minute: 60, minutes: 60, - h: 3600, hr: 3600, hour: 3600, hours: 3600, - d: 86400, day: 86400, days: 86400, - w: 604800, week: 604800, weeks: 604800, - }; + totalSeconds += num * mult; + } - const mult = multipliers[unit]; - if (mult === undefined) { - throw new Error(`Unknown duration unit: '${unit}'`); + if (matchCount === 0) { + throw new Error(`Cannot parse duration: '${str}'`); } - return num * mult; + return totalSeconds; } function normalizeTimeFullHelp(): string { @@ -205,6 +224,9 @@ THRESHOLD: The threshold can be a plain number (seconds) or a duration string: 300, "5 minutes", "1 hour", "1 day", "1 week" + Multi-unit durations are also supported: + "3 days 2 hours", "1 hour 30 minutes", "2d 12h" + Supported units: s/sec/second/seconds, m/min/minute/minutes, h/hr/hour/hours, d/day/days, w/week/weeks @@ -245,7 +267,7 @@ export const documentation: CommandDoc = { flags: ["--threshold", "-n"], description: "Number of seconds in each bucket. May also be a duration string " + - "like '1 week' or '5 minutes'.", + "like '1 week', '5 minutes', or '3 days 2 hours'.", argument: "