[--json]
```
-```
-$ devha headers https://example.com
-
-┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
-┃ HEADER ┃ VALUE ┃
-┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
-│ Content-Type │ text/html; charset=UTF-8 │
-│ Strict-Transport-Security │ max-age=63072000 │
-│ X-Frame-Options │ DENY │
-└────────────────────────────┴──────────────────────────────────────┘
-
-╭─ Security Score: 4/6 ⚠️ ──────────────────────────────╮
-│ ✅ Strict-Transport-Security │
-│ ✅ X-Content-Type-Options │
-│ ✅ X-Frame-Options │
-│ ✅ Referrer-Policy │
-│ ❌ Content-Security-Policy (missing) │
-│ ❌ Permissions-Policy (missing) │
-╰────────────────────────────────────────────────────────╯
-```
+Fetches all response headers and audits the presence of 6 critical security headers:
+
+- `Content-Security-Policy`
+- `Strict-Transport-Security`
+- `X-Content-Type-Options`
+- `X-Frame-Options`
+- `Referrer-Policy`
+- `Permissions-Policy`
+
+Outputs a `★★★★★☆` score with explanations for missing headers.
+
🏓 ping — Educational packet-level ICMP
```bash
-devha ping [--count 4] [--show-packet]
+sudo devha ping [--count 4] [--show-packet] [--timeout 2]
```
-Built on Scapy. Shows you what an ICMP packet actually *looks like* — perfect for learning networking.
+Uses Scapy to send raw ICMP packets and shows TTL, RTT, and size. `--show-packet` displays the full packet summary for learning purposes.
+
+> ⚠️ Requires root/admin on most systems.
-> Requires root/admin privileges on most systems.
---
## ⚙️ Configuration
-`devha` reads optional defaults from `~/.config/devha/config.toml`:
+devha reads optional defaults from `~/.config/devha/config.toml`:
```toml
[defaults]
@@ -311,10 +297,10 @@ timeout = 5.0
user_agent = "devha/0.1.0"
[colors]
-banner = "cyan"
+banner = "cyan"
success = "bright_green"
warning = "yellow"
-error = "bright_red"
+error = "bright_red"
```
You can also use `devha --no-banner` to hide the ASCII banner for cleaner output in CI/scripting contexts.
@@ -323,43 +309,44 @@ You can also use `devha --no-banner` to hide the ASCII banner for cleaner output
## ⚖️ Ethical Use
-> **`devha` is a learning tool. Use it to understand networks and security — not to break things.**
+> devha is a **learning tool**. Use it to understand networks and security — not to break things.
+
+### ✅ Allowed
-✅ **Allowed**
- Your own systems and networks
-- Legal practice ranges: `scanme.nmap.org`, [HackTheBox](https://hackthebox.com), [TryHackMe](https://tryhackme.com), [PicoCTF](https://picoctf.org)
+- Legal practice ranges: `scanme.nmap.org`, HackTheBox, TryHackMe, PicoCTF
- Targets where you have **explicit written permission** (bug bounties, pentesting contracts)
- Public APIs that openly allow it (GitHub, crt.sh, etc.)
-❌ **Not allowed**
+### ❌ Not allowed
+
- Scanning, crawling, or harvesting systems you don't own without permission
-- Any activity that violates [the Computer Fraud and Abuse Act (US)](https://www.law.cornell.edu/uscode/text/18/1030), [Computer Misuse Act (UK)](https://www.legislation.gov.uk/ukpga/1990/18/contents), [Wet computercriminaliteit (NL)](https://wetten.overheid.nl/), or your local equivalent
+- Any activity that violates the Computer Fraud and Abuse Act (US), Computer Misuse Act (UK), Wet computercriminaliteit (NL), or your local equivalent
-Every active-scan command in `devha` shows a confirmation prompt before sending traffic. **You waive your right to claim ignorance the moment you press `y`.** Don't be that person.
+Every active-scan command in devha shows a **confirmation prompt** before sending traffic. You waive your right to claim ignorance the moment you press `y`. Don't be that person.
-The maintainers are not responsible for misuse. Be smart, be legal, be kind.
+*The maintainers are not responsible for misuse. Be smart, be legal, be kind.*
---
## 🤝 Contributing
Contributions are very welcome — especially:
-- 🌐 New sites for the `username` checker (just edit `devha/data/sites.json`)
-- 📝 Better wordlists for `subdomains` and `dirscan`
-- 🎨 New ciphers (Playfair, Hill, Enigma?)
-- 🐛 Bug fixes & test coverage
-- 📖 Translations of the README
-Quick start:
+- 🌐 **New sites** for the username checker (just edit `devha/data/sites.json`)
+- 📝 **Better wordlists** for subdomains and dirscan
+- 🎨 **New ciphers** (Playfair, Hill, Enigma?)
+- 🐛 **Bug fixes & test coverage**
+- 📖 **Translations** of the README
+
```bash
-git clone https://github.com/YOUR-USERNAME/devha.git
+git clone https://github.com/waldex451/devha.git
cd devha
poetry install
poetry run pytest
-poetry run pre-commit install
```
-See [CONTRIBUTING.md](CONTRIBUTING.md) for details. By contributing, you agree to the [Code of Conduct](CODE_OF_CONDUCT.md).
+See [CONTRIBUTING.md](CONTRIBUTING.md) for details.
---
@@ -370,37 +357,37 @@ See [CONTRIBUTING.md](CONTRIBUTING.md) for details. By contributing, you agree t
- [ ] Shell completions (bash, zsh, fish, PowerShell)
- [ ] More cipher types (Enigma, Playfair, Hill)
- [ ] Plugin system for community-contributed commands
-- [ ] Integration with Have-I-Been-Pwned for `username` checks
+- [ ] Integration with Have-I-Been-Pwned for username checks
-Vote for features in [Discussions](https://github.com/YOUR-USERNAME/devha/discussions) or open an [issue](https://github.com/YOUR-USERNAME/devha/issues).
+Vote for features in [Discussions](https://github.com/waldex451/devha/discussions) or open an issue.
---
## 🙏 Acknowledgements
-`devha` stands on the shoulders of giants:
+devha stands on the shoulders of giants:
- 🦸 [Sherlock](https://github.com/sherlock-project/sherlock) — for proving one CLI can have 60k+ stars
- 🌐 [Sublist3r](https://github.com/aboul3la/Sublist3r) — subdomain enum done right
- 📁 [dirsearch](https://github.com/maurosoria/dirsearch) — directory discovery
- 🕷️ [Photon](https://github.com/s0md3v/Photon) — fast crawler
- 🌾 [theHarvester](https://github.com/laramies/theHarvester) — OSINT classic
-- 📦 [Scapy](https://github.com/secdev/scapy) — packet magic in Python
+- 📦 [Scapy](https://scapy.net/) — packet magic in Python
- 🎨 [Rich](https://github.com/Textualize/rich) — for making Python terminals beautiful
-- ⌨️ [Typer](https://github.com/tiangolo/typer) — for the cleanest CLI framework around
+- ⌨️ [Typer](https://typer.tiangolo.com/) — for the cleanest CLI framework around
---
## 📜 License
-MIT © [YOUR-NAME](https://github.com/YOUR-USERNAME) — see [LICENSE](LICENSE).
+MIT © waldex451 — see [LICENSE](LICENSE).
---
-**⭐ If `devha` saved you a few `pip install`s, consider giving it a star — it really helps!**
+⭐ **If devha saved you a few `pip install`s, consider giving it a star — it really helps!**
-Made with 🐍 and a healthy obsession with terminal aesthetics.
+*Made with 🐍 and a healthy obsession with terminal aesthetics.*
diff --git a/devha/__init__.py b/devha/__init__.py
new file mode 100644
index 0000000..3dc1f76
--- /dev/null
+++ b/devha/__init__.py
@@ -0,0 +1 @@
+__version__ = "0.1.0"
diff --git a/devha/__main__.py b/devha/__main__.py
new file mode 100644
index 0000000..b831c35
--- /dev/null
+++ b/devha/__main__.py
@@ -0,0 +1,4 @@
+from devha.cli import app
+
+if __name__ == "__main__":
+ app()
diff --git a/devha/cli.py b/devha/cli.py
new file mode 100644
index 0000000..58751d8
--- /dev/null
+++ b/devha/cli.py
@@ -0,0 +1,275 @@
+"""Main Typer application for devha — Hacking Studio v2.0."""
+
+from __future__ import annotations
+
+import subprocess
+import sys
+from typing import Annotated
+
+import typer
+
+from devha import __version__
+from devha.ui import print_banner, console
+
+from devha.commands import (
+ portscan,
+ username,
+ wifi,
+ cipher,
+ subdomains,
+ dirscan,
+ crawl,
+ harvest,
+ headers,
+ ping,
+ wifilab,
+ passlab,
+ packetlab,
+)
+from devha.commands.devscanner import devscanner
+from devha.commands.dnsrecon import dnsrecon
+from devha.commands.sslcheck import sslcheck
+from devha.commands.ipinfo import ipinfo
+from devha.commands.webvuln import webvuln
+from devha.commands.whois_cmd import whois_lookup
+
+app = typer.Typer(
+ name="devha",
+ help="[cyan]devha Hacking Studio v2.0[/cyan] — ethical hacking & developer toolkit.",
+ rich_markup_mode="rich",
+ no_args_is_help=False,
+ add_completion=True,
+)
+
+# ─── Sub-apps ─────────────────────────────────────────────────────────────────
+app.add_typer(cipher.app, name="cipher", help="🔐 Classical & modern ciphers.")
+app.add_typer(wifilab.app, name="wifilab", help="📡 WiFi Lab — scan, map devices, test own AP.")
+app.add_typer(passlab.app, name="passlab", help="🔑 Password Lab — hash, crack, generate.")
+app.add_typer(packetlab.app, name="packetlab", help="📦 Packet Lab — capture, ARP scan, builder.")
+
+# ─── Single commands ──────────────────────────────────────────────────────────
+app.command("portscan")(portscan.portscan)
+app.command("username")(username.username)
+app.command("wifi")(wifi.wifi)
+app.command("subdomains")(subdomains.subdomains)
+app.command("dirscan")(dirscan.dirscan)
+app.command("crawl")(crawl.crawl)
+app.command("harvest")(harvest.harvest)
+app.command("headers")(headers.headers)
+app.command("ping")(ping.ping)
+app.command("devscanner")(devscanner)
+app.command("dnsrecon")(dnsrecon)
+app.command("sslcheck")(sslcheck)
+app.command("ipinfo")(ipinfo)
+app.command("webvuln")(webvuln)
+app.command("whois")(whois_lookup)
+
+
+# ─── Studio TUI ───────────────────────────────────────────────────────────────
+
+@app.command("studio")
+def studio(
+ no_fx: Annotated[bool, typer.Option("--no-fx", help="Skip boot animation.")] = False,
+) -> None:
+ """🎮 Open the interactive Hacking Studio TUI menu."""
+ if not no_fx:
+ from devha.fx import hacker_boot
+ hacker_boot()
+ from devha.studio import run_studio
+ module = run_studio()
+ if module:
+ _launch_module(module)
+
+
+# ─── Version + main callback ─────────────────────────────────────────────────
+
+def _version_callback(value: bool) -> None:
+ if value:
+ console.print(f"[cyan]devha[/cyan] Hacking Studio version [bold]{__version__}[/bold]")
+ raise typer.Exit()
+
+
+@app.callback(invoke_without_command=True)
+def main(
+ ctx: typer.Context,
+ version: Annotated[bool, typer.Option("--version", "-V", callback=_version_callback, is_eager=True, help="Show version.")] = False,
+ no_banner: Annotated[bool, typer.Option("--no-banner", help="Skip the ASCII banner.")] = False,
+ studio_mode: Annotated[bool, typer.Option("--studio", "-s", help="Launch interactive Studio TUI.")] = False,
+) -> None:
+ """[cyan bold]devha[/cyan bold] Hacking Studio — Developer & Hacking CLI v2.0."""
+ if studio_mode:
+ ctx.invoke(studio)
+ return
+ if ctx.invoked_subcommand is None:
+ from devha.fx import hacker_boot
+ if not no_banner:
+ hacker_boot()
+ from devha.studio import run_studio
+ module = run_studio()
+ if module:
+ _launch_module(module)
+ elif not no_banner:
+ print_banner()
+
+
+# ─── Interactive launcher ─────────────────────────────────────────────────────
+
+def _ask(prompt: str) -> str:
+ try:
+ return input(f" {prompt}: ").strip()
+ except (EOFError, KeyboardInterrupt):
+ return ""
+
+
+def _run(*args: str) -> None:
+ subprocess.run(["devha", *args])
+
+
+def _launch_module(module: str) -> None: # noqa: C901
+ console.print()
+
+ if module == "network":
+ console.print("[bold cyan]── Network & Port Scanner ──[/bold cyan]")
+ host = _ask("Target host or IP (e.g. scanme.nmap.org)")
+ if host:
+ _run("devscanner", host)
+
+ elif module == "web":
+ console.print("[bold cyan]── Web Recon ──[/bold cyan]")
+ console.print(" [1] Directory scan — find hidden paths\n [2] Crawl — extract links, emails, API keys\n [3] Security headers audit\n [4] Vulnerability scan — exposed files, misconfigs")
+ choice = _ask("Choose (1/2/3/4)")
+ url = _ask("URL (e.g. https://example.com)")
+ if not url:
+ return
+ if not url.startswith("http"):
+ url = "https://" + url
+ if choice == "1":
+ _run("dirscan", url)
+ elif choice == "2":
+ _run("crawl", url)
+ elif choice == "3":
+ _run("headers", url)
+ elif choice == "4":
+ _run("webvuln", url)
+
+ elif module == "dns":
+ console.print("[bold cyan]── DNS, SSL & WHOIS ──[/bold cyan]")
+ console.print(" [1] DNS Recon — all records, zone transfer, SPF/DMARC\n [2] SSL/TLS Inspector — certificate, expiry, protocol\n [3] WHOIS — registrar, creation date, nameservers")
+ choice = _ask("Choose (1/2/3)")
+ if choice == "1":
+ domain = _ask("Domain (e.g. example.com)")
+ if domain:
+ _run("dnsrecon", domain)
+ elif choice == "2":
+ host = _ask("Hostname (e.g. github.com)")
+ if host:
+ _run("sslcheck", host)
+ elif choice == "3":
+ target = _ask("Domain or IP")
+ if target:
+ _run("whois", target)
+
+ elif module == "osint":
+ console.print("[bold cyan]── OSINT Suite ──[/bold cyan]")
+ console.print(" [1] Username — check 50+ social platforms\n [2] Domain harvest — emails, links, tech stack\n [3] Subdomain discovery\n [4] IP Intelligence — location, ISP, VPN detection")
+ choice = _ask("Choose (1/2/3/4)")
+ if choice == "1":
+ name = _ask("Username")
+ if name:
+ _run("username", name)
+ elif choice == "2":
+ domain = _ask("Domain (e.g. example.com)")
+ if domain:
+ _run("harvest", domain)
+ elif choice == "3":
+ domain = _ask("Domain (e.g. example.com)")
+ if domain:
+ _run("subdomains", domain)
+ elif choice == "4":
+ target = _ask("IP address or hostname")
+ if target:
+ _run("ipinfo", target)
+
+ elif module == "cipher":
+ console.print("[bold cyan]── Crypto & Cipher ──[/bold cyan]")
+ console.print(" [1] Encode text\n [2] Decode text\n [3] Crack caesar cipher\n [4] Show all encodings at once")
+ choice = _ask("Choose (1/2/3/4)")
+ if choice in ("1", "2"):
+ text = _ask("Text")
+ if not text:
+ return
+ console.print(" Types: caesar rot13 rot47 atbash vigenere base64 hex binary morse")
+ cipher_type = _ask("Cipher type") or "caesar"
+ op = "encode" if choice == "1" else "decode"
+ if cipher_type == "caesar":
+ key = _ask("Shift (default 13)") or "13"
+ _run("cipher", op, text, "--type", cipher_type, "--key", key)
+ elif cipher_type == "vigenere":
+ key = _ask("Key word")
+ if key:
+ _run("cipher", op, text, "--type", cipher_type, "--key", key)
+ else:
+ _run("cipher", op, text, "--type", cipher_type)
+ elif choice == "3":
+ text = _ask("Ciphertext to crack")
+ if text:
+ _run("cipher", "crack", text)
+ elif choice == "4":
+ text = _ask("Text")
+ if text:
+ _run("cipher", "all", text)
+
+ elif module == "password":
+ console.print("[bold cyan]── Password Lab ──[/bold cyan]")
+ console.print(" [1] Generate strong passwords\n [2] Hash a password\n [3] Check password strength\n [4] Identify hash type\n [5] Crack a hash")
+ choice = _ask("Choose (1/2/3/4/5)")
+ if choice == "1":
+ _run("passlab", "generate")
+ elif choice == "2":
+ pw = _ask("Password to hash")
+ if pw:
+ _run("passlab", "hash", pw)
+ elif choice == "3":
+ pw = _ask("Password to check")
+ if pw:
+ _run("passlab", "strength", pw)
+ elif choice == "4":
+ h = _ask("Hash string")
+ if h:
+ _run("passlab", "identify", h)
+ elif choice == "5":
+ h = _ask("Hash to crack")
+ if h:
+ _run("passlab", "crack", h)
+
+ elif module == "packets":
+ console.print("[bold cyan]── Packet Lab ──[/bold cyan]")
+ console.print(" [1] ARP scan — find all devices on network (needs sudo)\n [2] Capture live packets (needs sudo)\n [3] Traffic stats (needs sudo)")
+ choice = _ask("Choose (1/2/3)")
+ if choice == "1":
+ console.print(" [dim]Run: sudo devha packetlab arp-scan[/dim]")
+ _run("packetlab", "arp-scan")
+ elif choice == "2":
+ iface = _ask("Interface (e.g. en0, eth0)") or "eth0"
+ count = _ask("Number of packets (default 20)") or "20"
+ _run("packetlab", "capture", "--iface", iface, "--count", count)
+ elif choice == "3":
+ _run("packetlab", "stats")
+
+ elif module == "wifi":
+ console.print("[bold cyan]── WiFi Lab ──[/bold cyan]")
+ console.print(" [1] Scan nearby WiFi networks\n [2] Find devices on my network (needs sudo)\n [3] Check my router for security issues")
+ choice = _ask("Choose (1/2/3)")
+ if choice == "1":
+ _run("wifilab", "scan")
+ elif choice == "2":
+ _run("wifilab", "devices")
+ elif choice == "3":
+ host = _ask("Router IP (default 192.168.1.1)") or "192.168.1.1"
+ _run("wifilab", "security", host)
+
+ elif module == "ping":
+ console.print("[bold cyan]── Ping ──[/bold cyan]")
+ host = _ask("Hostname or IP (e.g. 8.8.8.8)")
+ if host:
+ _run("ping", host)
diff --git a/devha/commands/__init__.py b/devha/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/devha/commands/_cipher_tui.py b/devha/commands/_cipher_tui.py
new file mode 100644
index 0000000..63acb5e
--- /dev/null
+++ b/devha/commands/_cipher_tui.py
@@ -0,0 +1,71 @@
+"""Textual TUI for live cipher encode/decode."""
+
+from __future__ import annotations
+
+from textual.app import App, ComposeResult
+from textual.containers import Vertical, Horizontal
+from textual.widgets import Header, Footer, Input, Select, Label, Static
+from textual.reactive import reactive
+
+from devha.commands.cipher import caesar_encode, rot13_encode, atbash_encode, vigenere_encode
+
+CIPHERS = ["caesar", "rot13", "atbash", "vigenere"]
+
+
+class CipherTUI(App):
+ """Live cipher encode/decode TUI."""
+
+ CSS = """
+ Screen { background: #0d0d0d; }
+ Label { color: cyan; margin: 1 0 0 1; }
+ Input { margin: 0 1; }
+ Static#output { color: bright_green; margin: 1; padding: 1; border: round cyan; height: 5; }
+ """
+
+ TITLE = "devha — Cipher TUI"
+ BINDINGS = [("q", "quit", "Quit"), ("ctrl+c", "quit", "Quit")]
+
+ cipher_type: reactive[str] = reactive("caesar")
+ key_value: reactive[str] = reactive("13")
+
+ def compose(self) -> ComposeResult:
+ yield Header()
+ with Vertical():
+ yield Label("Cipher type:")
+ yield Select([(c, c) for c in CIPHERS], value="caesar", id="cipher_select")
+ yield Label("Key (shift for caesar, phrase for vigenere):")
+ yield Input(placeholder="13", id="key_input")
+ yield Label("Plaintext:")
+ yield Input(placeholder="Type something...", id="plain_input")
+ yield Label("Output:")
+ yield Static("", id="output")
+ yield Footer()
+
+ def on_select_changed(self, event: Select.Changed) -> None:
+ self.cipher_type = str(event.value)
+ self._update_output()
+
+ def on_input_changed(self, event: Input.Changed) -> None:
+ if event.input.id == "key_input":
+ self.key_value = event.value
+ self._update_output()
+
+ def _update_output(self) -> None:
+ plain_input = self.query_one("#plain_input", Input)
+ text = plain_input.value
+ ct = self.cipher_type
+ key = self.key_value or "13"
+ try:
+ if ct == "caesar":
+ result = caesar_encode(text, int(key) if key.lstrip("-").isdigit() else 13)
+ elif ct == "rot13":
+ result = rot13_encode(text)
+ elif ct == "atbash":
+ result = atbash_encode(text)
+ elif ct == "vigenere":
+ result = vigenere_encode(text, key if key.isalpha() else "key")
+ else:
+ result = text
+ except Exception:
+ result = "⚠ Invalid input"
+ self.query_one("#output", Static).update(result)
diff --git a/devha/commands/cipher.py b/devha/commands/cipher.py
new file mode 100644
index 0000000..27c130f
--- /dev/null
+++ b/devha/commands/cipher.py
@@ -0,0 +1,209 @@
+"""Cipher command: encode, decode, crack classical ciphers."""
+
+from __future__ import annotations
+
+import json
+from typing import Annotated
+
+import typer
+from rich.text import Text
+
+from devha.ui import console, make_table, print_panel, error
+
+app = typer.Typer(help="Classical cipher [encode / decode / crack].", rich_markup_mode="rich")
+
+# English letter frequency for readability scoring
+_EN_FREQ = {
+ "e": 12.70, "t": 9.06, "a": 8.17, "o": 7.51, "i": 6.97,
+ "n": 6.75, "s": 6.33, "h": 6.09, "r": 5.99, "d": 4.25,
+ "l": 4.03, "c": 2.78, "u": 2.76, "m": 2.41, "w": 2.36,
+ "f": 2.23, "g": 2.02, "y": 1.97, "p": 1.93, "b": 1.29,
+ "v": 0.98, "k": 0.77, "j": 0.15, "x": 0.15, "q": 0.10, "z": 0.07,
+}
+
+
+# ─── Core cipher implementations ─────────────────────────────────────────────
+
+
+def caesar_encode(text: str, shift: int) -> str:
+ result = []
+ for ch in text:
+ if ch.isalpha():
+ base = ord("A") if ch.isupper() else ord("a")
+ result.append(chr((ord(ch) - base + shift) % 26 + base))
+ else:
+ result.append(ch)
+ return "".join(result)
+
+
+def caesar_decode(text: str, shift: int) -> str:
+ return caesar_encode(text, -shift)
+
+
+def rot13_encode(text: str) -> str:
+ return caesar_encode(text, 13)
+
+
+def rot13_decode(text: str) -> str:
+ return rot13_encode(text)
+
+
+def atbash_encode(text: str) -> str:
+ result = []
+ for ch in text:
+ if ch.isalpha():
+ base = ord("A") if ch.isupper() else ord("a")
+ result.append(chr(base + 25 - (ord(ch) - base)))
+ else:
+ result.append(ch)
+ return "".join(result)
+
+
+def atbash_decode(text: str) -> str:
+ return atbash_encode(text)
+
+
+def vigenere_encode(text: str, key: str) -> str:
+ key = key.lower()
+ result = []
+ ki = 0
+ for ch in text:
+ if ch.isalpha():
+ shift = ord(key[ki % len(key)]) - ord("a")
+ base = ord("A") if ch.isupper() else ord("a")
+ result.append(chr((ord(ch) - base + shift) % 26 + base))
+ ki += 1
+ else:
+ result.append(ch)
+ return "".join(result)
+
+
+def vigenere_decode(text: str, key: str) -> str:
+ key = key.lower()
+ result = []
+ ki = 0
+ for ch in text:
+ if ch.isalpha():
+ shift = ord(key[ki % len(key)]) - ord("a")
+ base = ord("A") if ch.isupper() else ord("a")
+ result.append(chr((ord(ch) - base - shift) % 26 + base))
+ ki += 1
+ else:
+ result.append(ch)
+ return "".join(result)
+
+
+def _readability_score(text: str) -> float:
+ """Score 0-100 based on English letter frequency match."""
+ letters = [ch.lower() for ch in text if ch.isalpha()]
+ if not letters:
+ return 0.0
+ total = len(letters)
+ score = sum(_EN_FREQ.get(ch, 0) * (letters.count(ch) / total) for ch in set(letters))
+ return round(score, 2)
+
+
+# ─── Typer sub-commands ───────────────────────────────────────────────────────
+
+
+@app.command("encode")
+def encode(
+ text: Annotated[str, typer.Argument(help="Text to encode.")],
+ cipher_type: Annotated[str, typer.Option("--type", "-t", help="[caesar|vigenere|rot13|atbash]")] = "caesar",
+ key: Annotated[str, typer.Option("--key", "-k", help="Key (shift for caesar, phrase for vigenere).")] = "13",
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """Encode text using a classical cipher."""
+ result = _apply_cipher(text, cipher_type, key, mode="encode")
+ if json_out:
+ console.print_json(json.dumps({"cipher": cipher_type, "input": text, "output": result}))
+ else:
+ print_panel(f"[bright_green]{result}[/bright_green]", title=f"Encoded ({cipher_type})")
+
+
+@app.command("decode")
+def decode(
+ text: Annotated[str, typer.Argument(help="Text to decode.")],
+ cipher_type: Annotated[str, typer.Option("--type", "-t", help="[caesar|vigenere|rot13|atbash]")] = "caesar",
+ key: Annotated[str, typer.Option("--key", "-k", help="Key.")] = "13",
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """Decode text using a classical cipher."""
+ result = _apply_cipher(text, cipher_type, key, mode="decode")
+ if json_out:
+ console.print_json(json.dumps({"cipher": cipher_type, "input": text, "output": result}))
+ else:
+ print_panel(f"[bright_green]{result}[/bright_green]", title=f"Decoded ({cipher_type})")
+
+
+@app.command("crack")
+def crack(
+ text: Annotated[str, typer.Argument(help="Ciphertext to crack.")],
+ cipher_type: Annotated[str, typer.Option("--type", "-t", help="Cipher type to crack (currently: caesar).")] = "caesar",
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """Try all possible keys and rank by readability score."""
+ if cipher_type.lower() != "caesar":
+ error("Only Caesar cracking is supported.")
+ raise typer.Exit(1)
+
+ results = []
+ for shift in range(1, 26):
+ decoded = caesar_decode(text, shift)
+ score = _readability_score(decoded)
+ results.append({"shift": shift, "text": decoded, "score": score})
+
+ results.sort(key=lambda x: x["score"], reverse=True)
+
+ if json_out:
+ console.print_json(json.dumps(results))
+ return
+
+ table = make_table("SHIFT", "SCORE", "PLAINTEXT", title="Caesar Crack — All 25 Shifts")
+ for r in results:
+ score_style = "bright_green" if r["score"] == results[0]["score"] else "white"
+ table.add_row(
+ str(r["shift"]),
+ f"[{score_style}]{r['score']}[/{score_style}]",
+ r["text"][:80],
+ )
+ console.print(table)
+ console.print(f"\n[bold]Best guess (shift={results[0]['shift']}):[/bold] [bright_green]{results[0]['text']}[/bright_green]")
+
+
+@app.command("tui")
+def tui() -> None:
+ """Open an interactive TUI for live cipher encode/decode."""
+ try:
+ from devha.commands._cipher_tui import CipherTUI
+ app_tui = CipherTUI()
+ app_tui.run()
+ except ImportError:
+ error("Textual is not installed. Run: pip install textual")
+ raise typer.Exit(1)
+
+
+# ─── Helpers ─────────────────────────────────────────────────────────────────
+
+
+def _apply_cipher(text: str, cipher_type: str, key: str, mode: str) -> str:
+ ct = cipher_type.lower()
+ try:
+ if ct == "caesar":
+ shift = int(key)
+ return caesar_encode(text, shift) if mode == "encode" else caesar_decode(text, shift)
+ elif ct == "rot13":
+ return rot13_encode(text)
+ elif ct == "atbash":
+ return atbash_encode(text)
+ elif ct == "vigenere":
+ if not key.isalpha():
+ error("Vigenere key must be alphabetic.")
+ raise typer.Exit(1)
+ return vigenere_encode(text, key) if mode == "encode" else vigenere_decode(text, key)
+ else:
+ error(f"Unknown cipher: {cipher_type}. Choose: caesar, vigenere, rot13, atbash")
+ raise typer.Exit(1)
+ except ValueError:
+ error(f"Invalid key '{key}' for {cipher_type}.")
+ raise typer.Exit(1)
diff --git a/devha/commands/crawl.py b/devha/commands/crawl.py
new file mode 100644
index 0000000..19f0670
--- /dev/null
+++ b/devha/commands/crawl.py
@@ -0,0 +1,161 @@
+"""Crawl command — website crawler that extracts emails, links, and potential secrets."""
+
+from __future__ import annotations
+
+import json
+import re
+import urllib.robotparser
+from collections import defaultdict
+from typing import Annotated
+from urllib.parse import urljoin, urlparse
+
+import httpx
+import typer
+from bs4 import BeautifulSoup
+from rich.text import Text
+
+from devha.ui import console, print_panel, warn, info, error
+from devha.ethics import ethics_check
+
+# Patterns
+_EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
+_PHONE_RE = re.compile(r"(?:\+?\d[\d\s\-().]{7,}\d)")
+_API_KEY_RE = re.compile(r"['\"]([A-Za-z0-9_\-]{32,64})['\"]")
+_SOCIAL_DOMAINS = {"twitter.com", "x.com", "linkedin.com", "facebook.com", "instagram.com",
+ "github.com", "youtube.com", "tiktok.com"}
+
+
+def _same_origin(base: str, href: str) -> bool:
+ b = urlparse(base)
+ h = urlparse(href)
+ return h.netloc == "" or h.netloc == b.netloc
+
+
+def _can_fetch(rp: urllib.robotparser.RobotFileParser, url: str) -> bool:
+ return rp.can_fetch("*", url)
+
+
+def _load_robots(base_url: str, client: httpx.Client) -> urllib.robotparser.RobotFileParser:
+ rp = urllib.robotparser.RobotFileParser()
+ robots_url = urljoin(base_url, "/robots.txt")
+ try:
+ resp = client.get(robots_url, timeout=5.0)
+ rp.parse(resp.text.splitlines())
+ except httpx.RequestError:
+ pass
+ return rp
+
+
+def crawl(
+ url: Annotated[str, typer.Argument(help="Target URL to crawl.")],
+ depth: Annotated[int, typer.Option("--depth", "-d", help="Maximum crawl depth.")] = 2,
+ ignore_robots: Annotated[bool, typer.Option("--ignore-robots", help="Ignore robots.txt (use responsibly!).")] = False,
+ yes: Annotated[bool, typer.Option("--yes", "-y", help="Skip ethics confirmation.")] = False,
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+ timeout: Annotated[float, typer.Option("--timeout", help="Request timeout in seconds.")] = 10.0,
+) -> None:
+ """
+ Crawl a website and extract emails, links, phone numbers, and API key patterns.
+
+ Examples:
+ devha crawl https://example.com
+ devha crawl https://target.local --depth 3
+ """
+ if not url.startswith(("http://", "https://")):
+ url = "https://" + url
+
+ host = urlparse(url).netloc
+ ethics_check(host, yes=yes)
+
+ if ignore_robots:
+ warn("--ignore-robots active. Ensure you have permission to crawl this site.")
+
+ info(f"Crawling [cyan]{url}[/cyan] depth=[cyan]{depth}[/cyan]")
+
+ found: dict[str, set[str]] = defaultdict(set)
+ visited: set[str] = set()
+ queue = [(url, 0)]
+
+ headers = {"User-Agent": "devha/0.1 crawler (educational)"}
+
+ with httpx.Client(headers=headers, timeout=timeout, follow_redirects=True) as client:
+ rp = _load_robots(url, client) if not ignore_robots else None
+
+ while queue:
+ current_url, current_depth = queue.pop(0)
+ if current_url in visited or current_depth > depth:
+ continue
+ visited.add(current_url)
+
+ if rp and not _can_fetch(rp, current_url):
+ warn(f"robots.txt blocks: {current_url}")
+ continue
+
+ try:
+ resp = client.get(current_url)
+ content_type = resp.headers.get("content-type", "")
+ except httpx.RequestError:
+ continue
+
+ if "text/html" in content_type:
+ soup = BeautifulSoup(resp.text, "html.parser")
+ text = resp.text
+
+ # Extract emails
+ for email in _EMAIL_RE.findall(text):
+ found["emails"].add(email)
+
+ # Extract phone numbers (basic)
+ for phone in _PHONE_RE.findall(text):
+ cleaned = phone.strip()
+ if len(cleaned) >= 9:
+ found["phones"].add(cleaned)
+
+ # Extract links
+ for tag in soup.find_all("a", href=True):
+ href = urljoin(current_url, tag["href"])
+ if href.startswith("http"):
+ parsed = urlparse(href)
+ if parsed.netloc in _SOCIAL_DOMAINS:
+ found["social"].add(href)
+ elif not _same_origin(url, href):
+ found["external_links"].add(href)
+ elif current_depth < depth:
+ queue.append((href, current_depth + 1))
+
+ elif "javascript" in content_type or current_url.endswith(".js"):
+ text = resp.text
+ for key in _API_KEY_RE.findall(text):
+ found["potential_keys"].add(key)
+
+ # Convert sets to sorted lists for output
+ result_dict = {k: sorted(v) for k, v in found.items()}
+
+ if json_out:
+ console.print_json(json.dumps({"url": url, "pages_crawled": len(visited), "results": result_dict}))
+ return
+
+ console.print(f"\n[blue]Crawled[/blue] [cyan]{len(visited)}[/cyan] page(s)\n")
+
+ def _render_panel(title: str, items: list[str], style: str = "cyan") -> None:
+ if not items:
+ return
+ content = Text()
+ for item in items[:50]:
+ content.append(f" • {item}\n")
+ if len(items) > 50:
+ content.append(f" … and {len(items) - 50} more\n", style="dim")
+ print_panel(content, title=f"{title} ({len(items)})", style=style)
+
+ _render_panel("Emails", result_dict.get("emails", []), "bright_green")
+ _render_panel("Phone Numbers", result_dict.get("phones", []), "blue")
+ _render_panel("Social Links", result_dict.get("social", []), "magenta")
+ _render_panel("External Links", result_dict.get("external_links", []), "cyan")
+
+ keys = result_dict.get("potential_keys", [])
+ if keys:
+ warn("Potential API key patterns found (may be false positives):")
+ _render_panel("Potential API Keys", keys, "yellow")
+
+ if not any(result_dict.values()):
+ console.print("[yellow]Nothing extracted.[/yellow]")
diff --git a/devha/commands/dirscan.py b/devha/commands/dirscan.py
new file mode 100644
index 0000000..74c2cec
--- /dev/null
+++ b/devha/commands/dirscan.py
@@ -0,0 +1,139 @@
+"""Dirscan command — directory/path brute-force scanner."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+from importlib.resources import files
+from typing import Annotated
+from urllib.parse import urljoin
+
+import httpx
+import typer
+from rich.progress import Progress, SpinnerColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn
+
+from devha.ui import console, make_table, error, warn, info
+from devha.ethics import ethics_check
+
+_INTERESTING_CODES = {200, 201, 204, 301, 302, 307, 308, 401, 403, 405}
+
+
+async def _check_path(
+ client: httpx.AsyncClient,
+ base_url: str,
+ path: str,
+ semaphore: asyncio.Semaphore,
+ rate_limiter: list[float],
+ rate: float,
+) -> dict | None:
+ async with semaphore:
+ # Simple rate limiting
+ now = time.time()
+ if rate_limiter and (now - rate_limiter[-1]) < (1.0 / rate):
+ await asyncio.sleep((1.0 / rate) - (now - rate_limiter[-1]))
+ rate_limiter.append(time.time())
+
+ url = urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))
+ try:
+ resp = await client.head(url, timeout=5.0, follow_redirects=False)
+ if resp.status_code in _INTERESTING_CODES:
+ content_length = resp.headers.get("content-length", "-")
+ return {
+ "status": resp.status_code,
+ "size": content_length,
+ "path": "/" + path.lstrip("/"),
+ "url": url,
+ }
+ except httpx.RequestError:
+ pass
+ return None
+
+
+async def _run_scan(
+ base_url: str,
+ paths: list[str],
+ threads: int,
+ rate: float,
+ user_agent: str,
+) -> list[dict]:
+ semaphore = asyncio.Semaphore(threads)
+ rate_limiter: list[float] = []
+ headers = {"User-Agent": user_agent}
+
+ async with httpx.AsyncClient(headers=headers, follow_redirects=False) as client:
+ tasks = [_check_path(client, base_url, p, semaphore, rate_limiter, rate) for p in paths]
+ results = await asyncio.gather(*tasks)
+
+ return [r for r in results if r is not None]
+
+
+def dirscan(
+ url: Annotated[str, typer.Argument(help="Target URL (e.g. https://example.com).")],
+ threads: Annotated[int, typer.Option("--threads", help="Concurrent threads.")] = 50,
+ extensions: Annotated[str, typer.Option("--extensions", "-e", help="Extra extensions to append (e.g. php,html,txt).")] = "",
+ user_agent: Annotated[str, typer.Option("--user-agent", "-A", help="Custom User-Agent.")] = "devha/0.1 dirscan",
+ rate: Annotated[float, typer.Option("--rate", "-r", help="Max requests per second.")] = 10.0,
+ yes: Annotated[bool, typer.Option("--yes", "-y", help="Skip ethics confirmation.")] = False,
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """
+ Brute-force common paths on a web server.
+
+ Examples:
+ devha dirscan https://example.com
+ devha dirscan https://target.local --extensions php,html --threads 20
+ """
+ if not url.startswith(("http://", "https://")):
+ url = "https://" + url
+
+ from urllib.parse import urlparse
+ host = urlparse(url).netloc
+ ethics_check(host, yes=yes)
+
+ # Load paths
+ paths_data = files("devha.data").joinpath("common_paths.txt").read_text(encoding="utf-8")
+ base_paths = [p.strip() for p in paths_data.splitlines() if p.strip()]
+
+ # Expand with extensions
+ all_paths = list(base_paths)
+ if extensions:
+ exts = [e.strip().lstrip(".") for e in extensions.split(",") if e.strip()]
+ for path in base_paths:
+ if "." not in path.split("/")[-1]:
+ for ext in exts:
+ all_paths.append(f"{path}.{ext}")
+
+ info(f"Scanning [cyan]{url}[/cyan] with {len(all_paths)} paths "
+ f"[blue]threads[/blue]=[cyan]{threads}[/cyan] "
+ f"[blue]rate[/blue]=[cyan]{rate}/s[/cyan]\n")
+
+ results = asyncio.run(_run_scan(url, all_paths, threads, rate, user_agent))
+ results.sort(key=lambda x: x["status"])
+
+ if json_out:
+ console.print_json(json.dumps({"url": url, "results": results}))
+ return
+
+ if not results:
+ console.print(f"[yellow]Nothing interesting found at {url}.[/yellow]")
+ return
+
+ table = make_table("STATUS", "SIZE", "PATH", title=f"Dirscan results — {url}")
+ for r in results:
+ code = r["status"]
+ if code == 200:
+ code_style = "bright_green"
+ elif code in (301, 302, 307, 308):
+ code_style = "cyan"
+ elif code in (401, 403):
+ code_style = "yellow"
+ else:
+ code_style = "white"
+ table.add_row(
+ f"[{code_style}]{code}[/{code_style}]",
+ str(r["size"]),
+ f"[{code_style}]{r['path']}[/{code_style}]",
+ )
+ console.print(table)
+ console.print(f"\n[bright_green]Found {len(results)} interesting path(s).[/bright_green]")
diff --git a/devha/commands/dnsrecon.py b/devha/commands/dnsrecon.py
new file mode 100644
index 0000000..b99ad06
--- /dev/null
+++ b/devha/commands/dnsrecon.py
@@ -0,0 +1,102 @@
+"""DNS Reconnaissance — query all record types, zone transfer, SPF/DMARC."""
+
+from __future__ import annotations
+
+import json
+from typing import Annotated
+
+import typer
+
+from devha.ui import console, make_table, info, warn, error, success
+
+app = typer.Typer(help="🔍 DNS Recon — query records, zone transfer, email security.")
+
+
+def dnsrecon(
+ domain: Annotated[str, typer.Argument(help="Domain to query (e.g. example.com).")],
+ record_type: Annotated[str, typer.Option("--type", "-t", help="Record type: A,MX,NS,TXT,CNAME,SOA,AAAA or 'all'.")] = "all",
+ json_out: Annotated[bool, typer.Option("--json")] = False,
+) -> None:
+ """
+ DNS Reconnaissance — queries A, MX, NS, TXT, CNAME, SOA records.
+ Also tests zone transfer and analyses SPF/DMARC email security.
+
+ Example:
+ devha dnsrecon example.com
+ devha dnsrecon example.com --type MX
+ """
+ try:
+ import dns.resolver
+ import dns.zone
+ import dns.query
+ import dns.exception
+ except ImportError:
+ error("dnspython not installed. Run: pip install dnspython")
+ raise typer.Exit(1)
+
+ info(f"DNS Recon on [cyan]{domain}[/cyan]...\n")
+
+ results: dict[str, list[str]] = {}
+ record_types = ["A", "AAAA", "MX", "NS", "TXT", "CNAME", "SOA"] if record_type == "all" else [record_type.upper()]
+
+ for rtype in record_types:
+ try:
+ answers = dns.resolver.resolve(domain, rtype)
+ results[rtype] = [r.to_text() for r in answers]
+ except Exception:
+ results[rtype] = []
+
+ # Zone transfer attempt on each NS
+ zone_results: dict[str, list[str] | None] = {}
+ for ns in results.get("NS", [])[:3]:
+ ns_clean = ns.rstrip(".")
+ try:
+ zone = dns.zone.from_xfr(dns.query.xfr(ns_clean, domain, timeout=5))
+ zone_results[ns_clean] = [str(n) for n in zone.nodes.keys()][:20]
+ except Exception:
+ zone_results[ns_clean] = None
+
+ # SPF / DMARC
+ spf = next((r for r in results.get("TXT", []) if "v=spf1" in r.lower()), "")
+ dmarc = ""
+ try:
+ import dns.resolver as _r
+ dmarc_ans = _r.resolve(f"_dmarc.{domain}", "TXT")
+ dmarc = " ".join(r.to_text() for r in dmarc_ans)
+ except Exception:
+ pass
+
+ if json_out:
+ console.print_json(json.dumps({"domain": domain, "records": results, "zone_transfer": zone_results, "spf": spf, "dmarc": dmarc}))
+ return
+
+ # Print records
+ for rtype, records in results.items():
+ if records:
+ table = make_table("TYPE", "VALUE", title=f"DNS {rtype} — {domain}")
+ style_map = {"A": "cyan", "AAAA": "cyan", "MX": "bright_green", "NS": "yellow", "TXT": "magenta", "CNAME": "blue", "SOA": "dim"}
+ for r in records:
+ s = style_map.get(rtype, "white")
+ table.add_row(f"[{s}]{rtype}[/{s}]", r[:100])
+ console.print(table)
+
+ # Security analysis
+ console.print("\n[bold cyan]── Email Security ──[/bold cyan]")
+ if spf:
+ console.print(f" [bright_green]✔ SPF:[/bright_green] {spf[:80]}")
+ else:
+ console.print(" [bright_red]✘ SPF: Not configured — email spoofing possible![/bright_red]")
+
+ if dmarc:
+ console.print(f" [bright_green]✔ DMARC:[/bright_green] {dmarc[:80]}")
+ else:
+ console.print(" [bright_red]✘ DMARC: Not configured — phishing risk![/bright_red]")
+
+ # Zone transfer results
+ console.print("\n[bold cyan]── Zone Transfer ──[/bold cyan]")
+ for ns, names in zone_results.items():
+ if names:
+ warn(f"⚠️ ZONE TRANSFER SUCCEEDED on {ns}! Misconfiguration!")
+ console.print(f" Records: {', '.join(names[:8])}{'...' if len(names) > 8 else ''}")
+ else:
+ console.print(f" [dim]Blocked on {ns} ✔[/dim]")
diff --git a/devha/commands/harvest.py b/devha/commands/harvest.py
new file mode 100644
index 0000000..4ddd394
--- /dev/null
+++ b/devha/commands/harvest.py
@@ -0,0 +1,129 @@
+"""Harvest command — gather public info about a domain (mini-theHarvester)."""
+
+from __future__ import annotations
+
+import json
+import re
+from typing import Annotated
+from urllib.parse import quote_plus
+
+import httpx
+import typer
+from bs4 import BeautifulSoup
+from rich.text import Text
+
+from devha.ui import console, print_panel, warn, info, error
+
+_EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
+_CRTSH = "https://crt.sh/?q=%25.{}&output=json"
+_DDG_URL = "https://html.duckduckgo.com/html/?q={}"
+
+
+def _ddg_search(query: str, client: httpx.Client) -> str:
+ headers = {
+ "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0",
+ "Accept": "text/html",
+ }
+ try:
+ resp = client.post(
+ "https://html.duckduckgo.com/html/",
+ data={"q": query},
+ headers=headers,
+ timeout=15.0,
+ )
+ return resp.text
+ except httpx.RequestError:
+ return ""
+
+
+def _harvest_emails(domain: str, client: httpx.Client) -> set[str]:
+ emails: set[str] = set()
+ html = _ddg_search(f'site:{domain} "@{domain}"', client)
+ if html:
+ emails.update(e for e in _EMAIL_RE.findall(html) if e.endswith(f"@{domain}"))
+ html2 = _ddg_search(f'"{domain}" email OR contact', client)
+ if html2:
+ emails.update(e for e in _EMAIL_RE.findall(html2) if domain in e)
+ return emails
+
+
+def _harvest_subdomains_crt(domain: str, client: httpx.Client) -> set[str]:
+ subs: set[str] = set()
+ try:
+ resp = client.get(_CRTSH.format(domain), timeout=15.0)
+ if resp.status_code == 200:
+ for entry in resp.json():
+ for name in entry.get("name_value", "").splitlines():
+ name = name.strip().lstrip("*.")
+ if name.endswith(domain):
+ subs.add(name)
+ except (httpx.RequestError, ValueError):
+ pass
+ return subs
+
+
+def _harvest_names(domain: str, client: httpx.Client) -> set[str]:
+ names: set[str] = set()
+ html = _ddg_search(f'site:linkedin.com "{domain}"', client)
+ if not html:
+ return names
+ soup = BeautifulSoup(html, "html.parser")
+ for result in soup.find_all("a", class_="result__a"):
+ text = result.get_text(strip=True)
+ # LinkedIn results often: "Name | Title | Company"
+ if "|" in text:
+ name_part = text.split("|")[0].strip()
+ if 3 < len(name_part) < 50 and " " in name_part:
+ names.add(name_part)
+ return names
+
+
+def harvest(
+ domain: Annotated[str, typer.Argument(help="Target domain (e.g. example.com).")],
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+ timeout: Annotated[float, typer.Option("--timeout", help="Request timeout in seconds.")] = 15.0,
+) -> None:
+ """
+ Collect publicly available info about a domain.
+
+ Gathers emails, subdomains, and employee names from public sources only.
+ This tool only uses public information — use responsibly.
+
+ Examples:
+ devha harvest example.com
+ devha harvest github.com --json
+ """
+ warn("This tool only collects [bold]publicly available[/bold] information. Use responsibly.")
+ info(f"Harvesting public info for [cyan]{domain}[/cyan]...\n")
+
+ with httpx.Client(follow_redirects=True) as client:
+ emails = _harvest_emails(domain, client)
+ subs = _harvest_subdomains_crt(domain, client)
+ names = _harvest_names(domain, client)
+
+ if json_out:
+ console.print_json(json.dumps({
+ "domain": domain,
+ "emails": sorted(emails),
+ "subdomains": sorted(subs),
+ "names": sorted(names),
+ }))
+ return
+
+ def _panel(title: str, items: set[str], style: str = "cyan") -> None:
+ if not items:
+ console.print(f"[dim]{title}: nothing found[/dim]")
+ return
+ content = Text()
+ for item in sorted(items)[:100]:
+ content.append(f" • {item}\n")
+ print_panel(content, title=f"{title} ({len(items)})", style=style)
+
+ _panel("Emails", emails, "bright_green")
+ _panel("Subdomains", subs, "blue")
+ _panel("Employee Names (LinkedIn snippets)", names, "magenta")
+
+ console.print(
+ "\n[dim]⚠ All data sourced from public internet (DuckDuckGo, crt.sh). "
+ "No credentials or private systems accessed.[/dim]"
+ )
diff --git a/devha/commands/headers.py b/devha/commands/headers.py
new file mode 100644
index 0000000..dedb894
--- /dev/null
+++ b/devha/commands/headers.py
@@ -0,0 +1,103 @@
+"""Headers command — HTTP header inspector and security audit."""
+
+from __future__ import annotations
+
+import json
+from typing import Annotated
+
+import httpx
+import typer
+from rich.text import Text
+
+from devha.ui import console, make_table, print_panel, error, info
+
+_SECURITY_HEADERS = [
+ "content-security-policy",
+ "strict-transport-security",
+ "x-content-type-options",
+ "x-frame-options",
+ "referrer-policy",
+ "permissions-policy",
+]
+
+_HEADER_DESCRIPTIONS = {
+ "content-security-policy": "Prevents XSS and data injection attacks",
+ "strict-transport-security": "Forces HTTPS connections (HSTS)",
+ "x-content-type-options": "Prevents MIME-type sniffing",
+ "x-frame-options": "Prevents clickjacking attacks",
+ "referrer-policy": "Controls referrer information leakage",
+ "permissions-policy": "Controls browser feature access",
+}
+
+
+def headers(
+ url: Annotated[str, typer.Argument(help="Target URL (include https://).")],
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+ timeout: Annotated[float, typer.Option("--timeout", help="Request timeout in seconds.")] = 10.0,
+) -> None:
+ """
+ Inspect HTTP headers and audit security headers.
+
+ Examples:
+ devha headers https://example.com
+ devha headers https://github.com --json
+ """
+ if not url.startswith(("http://", "https://")):
+ url = "https://" + url
+
+ info(f"Fetching headers from [cyan]{url}[/cyan]...")
+
+ try:
+ with httpx.Client(follow_redirects=True, timeout=timeout) as client:
+ resp = client.get(url)
+ except httpx.RequestError as exc:
+ error(f"Request failed: {exc}")
+ raise typer.Exit(1)
+
+ all_headers = dict(resp.headers)
+ lower_headers = {k.lower(): v for k, v in all_headers.items()}
+
+ present = [h for h in _SECURITY_HEADERS if h in lower_headers]
+ missing = [h for h in _SECURITY_HEADERS if h not in lower_headers]
+ score = len(present)
+ total = len(_SECURITY_HEADERS)
+
+ if json_out:
+ console.print_json(json.dumps({
+ "url": url,
+ "status_code": resp.status_code,
+ "headers": all_headers,
+ "security_score": f"{score}/{total}",
+ "present": present,
+ "missing": missing,
+ }))
+ return
+
+ # All headers table
+ hdr_table = make_table("HEADER", "VALUE", title=f"Response Headers — {url}")
+ for k, v in all_headers.items():
+ display_val = v if len(v) <= 80 else v[:77] + "..."
+ style = "bright_green" if k.lower() in _SECURITY_HEADERS else "white"
+ hdr_table.add_row(f"[{style}]{k}[/{style}]", display_val)
+ console.print(hdr_table)
+
+ # Security audit panel
+ score_color = "bright_green" if score >= 5 else "yellow" if score >= 3 else "bright_red"
+ score_text = Text()
+ score_text.append(f"Security Score: {score}/{total} ", style="bold")
+ score_text.append("★" * score + "☆" * (total - score), style=score_color)
+ score_text.append("\n")
+
+ if present:
+ score_text.append("\n✔ Present:\n", style="bright_green bold")
+ for h in present:
+ score_text.append(f" • {h}\n", style="bright_green")
+
+ if missing:
+ score_text.append("\n✘ Missing:\n", style="bright_red bold")
+ for h in missing:
+ desc = _HEADER_DESCRIPTIONS.get(h, "")
+ score_text.append(f" • {h}", style="bright_red")
+ score_text.append(f" — {desc}\n", style="dim")
+
+ print_panel(score_text, title="Security Audit", style=score_color)
diff --git a/devha/commands/ipinfo.py b/devha/commands/ipinfo.py
new file mode 100644
index 0000000..82d2a51
--- /dev/null
+++ b/devha/commands/ipinfo.py
@@ -0,0 +1,76 @@
+"""IP Intelligence — geolocation, ASN, ISP, proxy/VPN detection."""
+
+from __future__ import annotations
+
+import json
+import socket
+from typing import Annotated
+
+import httpx
+import typer
+
+from devha.ui import console, make_table, info, error
+
+
+def ipinfo(
+ target: Annotated[str, typer.Argument(help="IP address or hostname.")],
+ json_out: Annotated[bool, typer.Option("--json")] = False,
+) -> None:
+ """
+ IP / Domain Intelligence — location, ISP, ASN, proxy/VPN detection.
+
+ Uses ip-api.com (free, no API key needed).
+
+ Example:
+ devha ipinfo 8.8.8.8
+ devha ipinfo github.com
+ """
+ try:
+ ip = socket.gethostbyname(target)
+ except socket.gaierror:
+ error(f"Cannot resolve: {target}")
+ raise typer.Exit(1)
+
+ if ip != target:
+ info(f"Resolved [cyan]{target}[/cyan] → [cyan]{ip}[/cyan]")
+
+ info(f"Looking up [cyan]{ip}[/cyan]...")
+
+ try:
+ resp = httpx.get(
+ f"http://ip-api.com/json/{ip}",
+ params={"fields": "status,message,country,countryCode,regionName,city,zip,lat,lon,timezone,isp,org,as,proxy,hosting,query"},
+ timeout=10,
+ )
+ data = resp.json()
+ except Exception as e:
+ error(f"API error: {e}")
+ raise typer.Exit(1)
+
+ if data.get("status") != "success":
+ error(f"Lookup failed: {data.get('message', 'unknown')}")
+ raise typer.Exit(1)
+
+ if json_out:
+ console.print_json(json.dumps(data))
+ return
+
+ code = data.get("countryCode", "")
+ flag = (chr(0x1F1E6 + ord(code[0]) - 65) + chr(0x1F1E6 + ord(code[1]) - 65)) if len(code) == 2 else "🌐"
+
+ table = make_table("FIELD", "VALUE", title=f"🌍 IP Intelligence — {data.get('query', ip)}")
+ table.add_row("[cyan]IP Address[/cyan]", data.get("query", ip))
+ table.add_row("[cyan]Country[/cyan]", f"{flag} {data.get('country','?')} ({code})")
+ table.add_row("[cyan]Region[/cyan]", data.get("regionName", "?"))
+ table.add_row("[cyan]City[/cyan]", data.get("city", "?"))
+ table.add_row("[cyan]Coordinates[/cyan]", f"{data.get('lat','?')}, {data.get('lon','?')}")
+ table.add_row("[cyan]Timezone[/cyan]", data.get("timezone", "?"))
+ table.add_row("[cyan]ISP[/cyan]", data.get("isp", "?"))
+ table.add_row("[cyan]Organization[/cyan]", data.get("org", "?"))
+ table.add_row("[cyan]ASN[/cyan]", data.get("as", "?"))
+
+ proxy = data.get("proxy", False)
+ hosting = data.get("hosting", False)
+ table.add_row("[cyan]Proxy / VPN[/cyan]", f"[bright_red]YES — anonymizer[/bright_red]" if proxy else "[bright_green]No[/bright_green]")
+ table.add_row("[cyan]Datacenter[/cyan]", f"[yellow]YES — hosting/DC IP[/yellow]" if hosting else "[bright_green]No[/bright_green]")
+ console.print(table)
diff --git a/devha/commands/ping.py b/devha/commands/ping.py
new file mode 100644
index 0000000..05687d0
--- /dev/null
+++ b/devha/commands/ping.py
@@ -0,0 +1,88 @@
+"""Ping command — uses system ping, no root required."""
+
+from __future__ import annotations
+
+import json
+import platform
+import re
+import subprocess
+from typing import Annotated
+
+import typer
+
+from devha.ui import console, print_panel, error, info
+
+
+def ping(
+ host: Annotated[str, typer.Argument(help="Target hostname or IP.")],
+ count: Annotated[int, typer.Option("--count", "-c", help="Number of pings.")] = 4,
+ timeout: Annotated[float, typer.Option("--timeout", help="Timeout per ping in seconds.")] = 2.0,
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """
+ ICMP ping — shows RTT, TTL, and packet loss.
+
+ Examples:
+ devha ping 8.8.8.8
+ devha ping example.com --count 10
+ """
+ info(f"Pinging [cyan]{host}[/cyan] ({count} packets)...\n")
+
+ system = platform.system()
+ if system == "Darwin":
+ cmd = ["ping", "-c", str(count), "-W", str(int(timeout * 1000)), host]
+ elif system == "Windows":
+ cmd = ["ping", "-n", str(count), "-w", str(int(timeout * 1000)), host]
+ else:
+ cmd = ["ping", "-c", str(count), "-W", str(int(timeout)), host]
+
+ try:
+ result = subprocess.run(cmd, capture_output=True, text=True)
+ output = result.stdout + result.stderr
+ except FileNotFoundError:
+ error("'ping' command not found on this system.")
+ raise typer.Exit(1)
+
+ # Parse individual replies
+ pattern = re.compile(r"icmp_seq=?(\d+).*?ttl=(\d+).*?time=([\d.]+)\s*ms", re.IGNORECASE)
+ results = []
+ rtts = []
+
+ for line in output.splitlines():
+ m = pattern.search(line)
+ if m:
+ seq = int(m.group(1))
+ ttl = int(m.group(2))
+ rtt = float(m.group(3))
+ rtts.append(rtt)
+ results.append({"seq": seq, "status": "reply", "rtt_ms": rtt, "ttl": ttl})
+ console.print(
+ f" [{seq}] [bright_green]Reply[/bright_green] "
+ f"ttl={ttl} rtt=[cyan]{rtt:.2f}ms[/cyan]"
+ )
+ elif re.search(r"(request timeout|no route|unreachable|timed out)", line, re.IGNORECASE):
+ seq = len(results) + 1
+ results.append({"seq": seq, "status": "timeout", "rtt_ms": None, "ttl": None})
+ console.print(f" [{seq}] [yellow]Request timeout[/yellow]")
+
+ if not results:
+ console.print(output)
+
+ if json_out:
+ console.print_json(json.dumps({"host": host, "results": results}))
+ return
+
+ sent = count
+ received = len(rtts)
+ loss = round((sent - received) / sent * 100) if sent else 0
+ avg_rtt = round(sum(rtts) / len(rtts), 2) if rtts else 0
+ min_rtt = round(min(rtts), 2) if rtts else 0
+ max_rtt = round(max(rtts), 2) if rtts else 0
+
+ summary = (
+ f"[bold]Sent:[/bold] {sent} "
+ f"[bold]Received:[/bold] [bright_green]{received}[/bright_green] "
+ f"[bold]Loss:[/bold] [{'bright_red' if loss > 0 else 'bright_green'}]{loss}%[/{'bright_red' if loss > 0 else 'bright_green'}] "
+ f"[bold]RTT:[/bold] min={min_rtt}ms avg={avg_rtt}ms max={max_rtt}ms"
+ )
+ print_panel(summary, title=f"Ping Summary — {host}")
diff --git a/devha/commands/portscan.py b/devha/commands/portscan.py
new file mode 100644
index 0000000..2026376
--- /dev/null
+++ b/devha/commands/portscan.py
@@ -0,0 +1,115 @@
+"""Portscan command — mini nmap using sockets and threading."""
+
+from __future__ import annotations
+
+import json
+import socket
+import concurrent.futures
+from typing import Annotated
+
+import typer
+from rich.progress import Progress, SpinnerColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn
+
+from devha.ui import console, make_table, error
+from devha.ethics import ethics_check
+
+
+def _parse_port_range(ports: str) -> list[int]:
+ result: list[int] = []
+ for part in ports.split(","):
+ part = part.strip()
+ if "-" in part:
+ lo, hi = part.split("-", 1)
+ result.extend(range(int(lo), int(hi) + 1))
+ else:
+ result.append(int(part))
+ return result
+
+
+def _get_service(port: int) -> str:
+ try:
+ return socket.getservbyport(port)
+ except OSError:
+ return "unknown"
+
+
+def _scan_port(host: str, port: int, timeout: float) -> dict | None:
+ try:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ s.settimeout(timeout)
+ result = s.connect_ex((host, port))
+ if result == 0:
+ return {"port": port, "status": "open", "service": _get_service(port)}
+ except (socket.gaierror, OSError):
+ pass
+ return None
+
+
+def portscan(
+ target: Annotated[str, typer.Argument(help="Target IP address or hostname.")],
+ ports: Annotated[str, typer.Option("--ports", "-p", help="Port range, e.g. 1-1024 or 22,80,443.")] = "1-1024",
+ threads: Annotated[int, typer.Option("--threads", help="Number of concurrent threads.")] = 100,
+ timeout: Annotated[float, typer.Option("--timeout", help="Socket timeout in seconds.")] = 1.0,
+ yes: Annotated[bool, typer.Option("--yes", "-y", help="Skip ethics confirmation.")] = False,
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """
+ Scan open ports on a host.
+
+ Examples:
+ devha portscan 192.168.1.1
+ devha portscan scanme.nmap.org --ports 1-1000 --threads 200
+ devha portscan 10.0.0.1 --ports 22,80,443,8080 --yes
+ """
+ ethics_check(target, yes=yes)
+
+ try:
+ resolved = socket.gethostbyname(target)
+ except socket.gaierror:
+ error(f"Cannot resolve host: {target}")
+ raise typer.Exit(1)
+
+ port_list = _parse_port_range(ports)
+ open_ports: list[dict] = []
+
+ console.print(f"[blue]Scanning[/blue] [cyan]{target}[/cyan] ({resolved}) "
+ f"[blue]ports[/blue] [cyan]{ports}[/cyan] "
+ f"[blue]threads[/blue] [cyan]{threads}[/cyan]\n")
+
+ with Progress(
+ SpinnerColumn(),
+ "[progress.description]{task.description}",
+ BarColumn(),
+ TaskProgressColumn(),
+ TimeElapsedColumn(),
+ console=console,
+ ) as progress:
+ task = progress.add_task("[cyan]Scanning...", total=len(port_list))
+
+ with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
+ futures = {executor.submit(_scan_port, resolved, p, timeout): p for p in port_list}
+ for future in concurrent.futures.as_completed(futures):
+ result = future.result()
+ if result:
+ open_ports.append(result)
+ progress.advance(task)
+
+ open_ports.sort(key=lambda x: x["port"])
+
+ if json_out:
+ console.print_json(json.dumps({"target": target, "resolved": resolved, "open_ports": open_ports}))
+ return
+
+ if not open_ports:
+ console.print(f"\n[yellow]No open ports found on {target} in range {ports}.[/yellow]")
+ return
+
+ table = make_table("PORT", "STATUS", "SERVICE", title=f"Open ports on {target}")
+ for p in open_ports:
+ table.add_row(
+ f"[cyan]{p['port']}[/cyan]",
+ "[bright_green]OPEN[/bright_green]",
+ p["service"],
+ )
+ console.print(table)
+ console.print(f"\n[bright_green]Found {len(open_ports)} open port(s).[/bright_green]")
diff --git a/devha/commands/sslcheck.py b/devha/commands/sslcheck.py
new file mode 100644
index 0000000..6f01bfa
--- /dev/null
+++ b/devha/commands/sslcheck.py
@@ -0,0 +1,114 @@
+"""SSL/TLS Certificate Inspector."""
+
+from __future__ import annotations
+
+import datetime
+import json
+import socket
+import ssl
+from typing import Annotated
+
+import typer
+
+from devha.ui import console, make_table, info, warn, error, success
+
+
+def sslcheck(
+ host: Annotated[str, typer.Argument(help="Hostname (e.g. github.com).")],
+ port: Annotated[int, typer.Option("--port", "-p")] = 443,
+ json_out: Annotated[bool, typer.Option("--json")] = False,
+) -> None:
+ """
+ SSL/TLS certificate inspector — expiry, issuer, SANs, protocol, ciphers.
+
+ Example:
+ devha sslcheck github.com
+ devha sslcheck expired.badssl.com
+ """
+ host = host.replace("https://", "").replace("http://", "").split("/")[0]
+ info(f"Checking SSL/TLS on [cyan]{host}:{port}[/cyan]...")
+
+ cert = proto = cipher = None
+ verified = True
+
+ ctx = ssl.create_default_context()
+ try:
+ with socket.create_connection((host, port), timeout=10) as sock:
+ with ctx.wrap_socket(sock, server_hostname=host) as ssock:
+ cert = ssock.getpeercert()
+ proto = ssock.version()
+ cipher = ssock.cipher()
+ except ssl.SSLCertVerificationError as e:
+ verified = False
+ warn(f"Certificate verification failed: {e}")
+ ctx2 = ssl.create_default_context()
+ ctx2.check_hostname = False
+ ctx2.verify_mode = ssl.CERT_NONE
+ try:
+ with socket.create_connection((host, port), timeout=10) as sock:
+ with ctx2.wrap_socket(sock, server_hostname=host) as ssock:
+ cert = ssock.getpeercert()
+ proto = ssock.version()
+ cipher = ssock.cipher()
+ except Exception as e2:
+ error(f"Cannot connect: {e2}")
+ raise typer.Exit(1)
+ except Exception as e:
+ error(f"Cannot connect to {host}:{port} — {e}")
+ raise typer.Exit(1)
+
+ if not cert:
+ error("No certificate returned.")
+ raise typer.Exit(1)
+
+ subject = dict(x[0] for x in cert.get("subject", []))
+ issuer = dict(x[0] for x in cert.get("issuer", []))
+ not_after = cert.get("notAfter", "")
+ not_before = cert.get("notBefore", "")
+ sans = [v for t, v in cert.get("subjectAltName", []) if t == "DNS"]
+
+ expiry = datetime.datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
+ days_left = (expiry - datetime.datetime.utcnow()).days
+
+ if json_out:
+ console.print_json(json.dumps({
+ "host": host, "port": port, "verified": verified,
+ "subject": subject, "issuer": issuer,
+ "not_before": not_before, "not_after": not_after,
+ "days_until_expiry": days_left, "sans": sans,
+ "protocol": proto, "cipher": cipher[0] if cipher else None,
+ }))
+ return
+
+ table = make_table("FIELD", "VALUE", title=f"🔒 SSL Certificate — {host}")
+ table.add_row("[cyan]Common Name[/cyan]", subject.get("commonName", "?"))
+ table.add_row("[cyan]Organization[/cyan]", subject.get("organizationName", "?"))
+ table.add_row("[cyan]Issuer[/cyan]", issuer.get("organizationName", "?"))
+ table.add_row("[cyan]Valid From[/cyan]", not_before)
+
+ exp_s = "bright_red" if days_left < 14 else "yellow" if days_left < 30 else "bright_green"
+ table.add_row("[cyan]Expires[/cyan]", f"[{exp_s}]{not_after} ({days_left}d left)[/{exp_s}]")
+
+ proto_s = "bright_green" if proto in ("TLSv1.3", "TLSv1.2") else "bright_red"
+ table.add_row("[cyan]Protocol[/cyan]", f"[{proto_s}]{proto}[/{proto_s}]")
+ table.add_row("[cyan]Cipher[/cyan]", cipher[0] if cipher else "?")
+ table.add_row("[cyan]SANs[/cyan]", ", ".join(sans[:5]) + ("…" if len(sans) > 5 else ""))
+ table.add_row("[cyan]Verified[/cyan]", "[bright_green]Yes[/bright_green]" if verified else "[bright_red]No — self-signed or invalid[/bright_red]")
+ console.print(table)
+
+ console.print("\n[bold]Security Verdict:[/bold]")
+ if days_left < 0:
+ console.print(" [bright_red]✘ EXPIRED![/bright_red]")
+ elif days_left < 14:
+ console.print(f" [bright_red]⚠ Expiring in {days_left} days![/bright_red]")
+ elif days_left < 30:
+ console.print(f" [yellow]⚠ Expiring soon ({days_left} days)[/yellow]")
+ else:
+ console.print(f" [bright_green]✔ Valid — {days_left} days remaining[/bright_green]")
+
+ if proto == "TLSv1.3":
+ console.print(" [bright_green]✔ TLS 1.3 — best available[/bright_green]")
+ elif proto == "TLSv1.2":
+ console.print(" [bright_green]✔ TLS 1.2 — good[/bright_green]")
+ else:
+ console.print(f" [bright_red]✘ {proto} — deprecated and vulnerable![/bright_red]")
diff --git a/devha/commands/subdomains.py b/devha/commands/subdomains.py
new file mode 100644
index 0000000..55cde52
--- /dev/null
+++ b/devha/commands/subdomains.py
@@ -0,0 +1,142 @@
+"""Subdomains command — find subdomains via wordlist, crt.sh, and HackerTarget."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import socket
+from importlib.resources import files
+from typing import Annotated
+
+import httpx
+import typer
+
+from devha.ui import console, make_table, info, warn, error
+
+_HACKERTARGET = "https://api.hackertarget.com/hostsearch/?q={}"
+_CRTSH = "https://crt.sh/?q=%25.{}&output=json"
+
+
+def _dns_resolve(hostname: str) -> str | None:
+ try:
+ return socket.gethostbyname(hostname)
+ except socket.gaierror:
+ return None
+
+
+async def _wordlist_scan(domain: str, client: httpx.AsyncClient) -> list[dict]:
+ wordlist_data = files("devha.data").joinpath("subdomains.txt").read_text(encoding="utf-8")
+ words = [w.strip() for w in wordlist_data.splitlines() if w.strip()]
+ results = []
+
+ async def check(word: str) -> dict | None:
+ hostname = f"{word}.{domain}"
+ ip = await asyncio.get_event_loop().run_in_executor(None, _dns_resolve, hostname)
+ if ip:
+ return {"subdomain": hostname, "ip": ip, "method": "wordlist"}
+ return None
+
+ tasks = [check(w) for w in words]
+ found = await asyncio.gather(*tasks)
+ results.extend(r for r in found if r)
+ return results
+
+
+async def _crtsh_scan(domain: str, client: httpx.AsyncClient) -> list[dict]:
+ results = []
+ try:
+ resp = await client.get(_CRTSH.format(domain), timeout=15.0)
+ if resp.status_code == 200:
+ entries = resp.json()
+ seen: set[str] = set()
+ for entry in entries:
+ name = entry.get("name_value", "")
+ for sub in name.splitlines():
+ sub = sub.strip().lstrip("*.")
+ if sub.endswith(domain) and sub not in seen:
+ seen.add(sub)
+ ip = await asyncio.get_event_loop().run_in_executor(None, _dns_resolve, sub)
+ results.append({"subdomain": sub, "ip": ip or "N/A", "method": "crt.sh"})
+ except (httpx.RequestError, ValueError):
+ warn("crt.sh request failed — skipping.")
+ return results
+
+
+async def _hackertarget_scan(domain: str, client: httpx.AsyncClient) -> list[dict]:
+ results = []
+ try:
+ resp = await client.get(_HACKERTARGET.format(domain), timeout=15.0)
+ if resp.status_code == 200 and "error" not in resp.text.lower()[:20]:
+ for line in resp.text.strip().splitlines():
+ if "," in line:
+ sub, ip = line.split(",", 1)
+ results.append({"subdomain": sub.strip(), "ip": ip.strip(), "method": "hackertarget"})
+ except httpx.RequestError:
+ warn("HackerTarget request failed — skipping.")
+ return results
+
+
+async def _run_all(domain: str, method: str) -> list[dict]:
+ async with httpx.AsyncClient() as client:
+ tasks = []
+ if method in ("all", "wordlist"):
+ tasks.append(_wordlist_scan(domain, client))
+ if method in ("all", "crt"):
+ tasks.append(_crtsh_scan(domain, client))
+ if method in ("all", "hackertarget"):
+ tasks.append(_hackertarget_scan(domain, client))
+
+ all_results: list[dict] = []
+ for coro_results in await asyncio.gather(*tasks):
+ all_results.extend(coro_results)
+
+ # Deduplicate by subdomain
+ seen: set[str] = set()
+ deduped = []
+ for r in all_results:
+ if r["subdomain"] not in seen:
+ seen.add(r["subdomain"])
+ deduped.append(r)
+ return sorted(deduped, key=lambda x: x["subdomain"])
+
+
+def subdomains(
+ domain: Annotated[str, typer.Argument(help="Target domain (e.g. example.com).")],
+ method: Annotated[str, typer.Option("--method", "-m", help="[wordlist|crt|hackertarget|all]")] = "all",
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """
+ Discover subdomains via wordlist brute-force, crt.sh, and HackerTarget.
+
+ Examples:
+ devha subdomains example.com
+ devha subdomains example.com --method crt
+ devha subdomains tesla.com --method wordlist
+ """
+ valid_methods = {"all", "wordlist", "crt", "hackertarget"}
+ if method not in valid_methods:
+ error(f"Unknown method '{method}'. Choose: {', '.join(valid_methods)}")
+ raise typer.Exit(1)
+
+ info(f"Scanning subdomains for [cyan]{domain}[/cyan] using method=[cyan]{method}[/cyan]...")
+
+ results = asyncio.run(_run_all(domain, method))
+
+ if json_out:
+ console.print_json(json.dumps({"domain": domain, "results": results}))
+ return
+
+ if not results:
+ console.print(f"[yellow]No subdomains found for {domain}.[/yellow]")
+ return
+
+ table = make_table("SUBDOMAIN", "IP", "METHOD", title=f"Subdomains of {domain}")
+ for r in results:
+ method_color = {"wordlist": "cyan", "crt.sh": "blue", "hackertarget": "magenta"}.get(r["method"], "white")
+ table.add_row(
+ f"[bright_green]{r['subdomain']}[/bright_green]",
+ r["ip"],
+ f"[{method_color}]{r['method']}[/{method_color}]",
+ )
+ console.print(table)
+ console.print(f"\n[bright_green]Found {len(results)} subdomain(s).[/bright_green]")
diff --git a/devha/commands/username.py b/devha/commands/username.py
new file mode 100644
index 0000000..e582e67
--- /dev/null
+++ b/devha/commands/username.py
@@ -0,0 +1,104 @@
+"""Username command — check username existence on 50+ platforms."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+from importlib.resources import files
+from typing import Annotated
+
+import httpx
+import typer
+
+from devha.ui import console, make_table, info
+
+
+def _load_sites() -> dict:
+ data = files("devha.data").joinpath("sites.json").read_text(encoding="utf-8")
+ return json.loads(data)
+
+
+async def _check_site(
+ client: httpx.AsyncClient,
+ site_name: str,
+ template: dict,
+ username: str,
+ timeout: float,
+) -> dict:
+ url = template["url"].replace("{}", username)
+ error_code = template.get("error_code", 404)
+ try:
+ resp = await client.get(url, timeout=timeout, follow_redirects=True)
+ if resp.status_code == error_code:
+ status = "not_found"
+ elif resp.status_code < 400:
+ status = "found"
+ else:
+ status = f"error_{resp.status_code}"
+ except httpx.TimeoutException:
+ status = "timeout"
+ except httpx.RequestError:
+ status = "error"
+ return {"site": site_name, "status": status, "url": url}
+
+
+async def _run_checks(username: str, sites: dict, site_filter: list[str], timeout: float) -> list[dict]:
+ if site_filter:
+ sites = {k: v for k, v in sites.items() if k.lower() in [s.lower() for s in site_filter]}
+
+ headers = {
+ "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0"
+ }
+ async with httpx.AsyncClient(headers=headers) as client:
+ tasks = [_check_site(client, name, tmpl, username, timeout) for name, tmpl in sites.items()]
+ return await asyncio.gather(*tasks)
+
+
+def username(
+ name: Annotated[str, typer.Argument(help="Username to search for.")],
+ sites: Annotated[str, typer.Option("--sites", "-s", help="Comma-separated site names (default: all).")] = "",
+ timeout: Annotated[float, typer.Option("--timeout", help="Request timeout in seconds.")] = 5.0,
+ found_only: Annotated[bool, typer.Option("--found", help="Show only found accounts.")] = False,
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """
+ Check if a username exists on 50+ platforms.
+
+ Examples:
+ devha username torvalds
+ devha username coolkid42 --sites github,reddit,twitter
+ devha username hacker --found
+ """
+ all_sites = _load_sites()
+ site_filter = [s.strip() for s in sites.split(",") if s.strip()] if sites else []
+
+ info(f"Checking username [cyan]{name}[/cyan] across {len(site_filter) or len(all_sites)} sites...")
+
+ results = asyncio.run(_run_checks(name, all_sites, site_filter, timeout))
+ results.sort(key=lambda x: (x["status"] != "found", x["site"].lower()))
+
+ if found_only:
+ results = [r for r in results if r["status"] == "found"]
+
+ if json_out:
+ console.print_json(json.dumps({"username": name, "results": results}))
+ return
+
+ table = make_table("SITE", "STATUS", "URL", title=f"Username: {name}")
+ for r in results:
+ st = r["status"]
+ if st == "found":
+ status_str = "[bright_green]✔ FOUND[/bright_green]"
+ elif st == "not_found":
+ status_str = "[red]✘ NOT FOUND[/red]"
+ elif st == "timeout":
+ status_str = "[yellow]⚠ TIMEOUT[/yellow]"
+ else:
+ status_str = f"[yellow]⚠ {st.upper()}[/yellow]"
+
+ url_str = f"[cyan]{r['url']}[/cyan]" if st == "found" else r["url"]
+ table.add_row(r["site"], status_str, url_str)
+
+ console.print(table)
+ found_count = sum(1 for r in results if r["status"] == "found")
+ console.print(f"\n[bright_green]Found on {found_count}[/bright_green] / {len(results)} platform(s).")
diff --git a/devha/commands/webvuln.py b/devha/commands/webvuln.py
new file mode 100644
index 0000000..e6ec94b
--- /dev/null
+++ b/devha/commands/webvuln.py
@@ -0,0 +1,170 @@
+"""Web Vulnerability Scanner — exposed files, misconfigs, security headers."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+from typing import Annotated
+from urllib.parse import urljoin, urlparse
+
+import httpx
+import typer
+
+from devha.ui import console, make_table, info, warn, error, success
+
+
+def webvuln(
+ url: Annotated[str, typer.Argument(help="Target URL (only scan sites you own).")],
+ json_out: Annotated[bool, typer.Option("--json")] = False,
+) -> None:
+ """
+ Web vulnerability scanner — exposed files, CORS, headers, info leaks.
+
+ Checks: .env, .git, phpinfo, backup files, security headers,
+ server version disclosure, CORS wildcard, directory listing.
+
+ ⚠ Only scan sites you own or have explicit permission to test!
+
+ Example:
+ devha webvuln https://example.com
+ """
+ if not url.startswith("http"):
+ url = "https://" + url
+
+ info(f"Scanning [cyan]{url}[/cyan]...")
+ warn("⚡ Only scan sites you own or have explicit permission to test!")
+ console.print()
+
+ findings = asyncio.run(_run_checks(url))
+
+ if json_out:
+ console.print_json(json.dumps({"url": url, "findings": findings}))
+ return
+
+ if not findings:
+ success("No issues found.")
+ return
+
+ order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2, "INFO": 3}
+ findings.sort(key=lambda f: order.get(f["severity"], 9))
+
+ table = make_table("SEVERITY", "FINDING", "DETAIL", title=f"🕸️ Web Vuln Scan — {url}")
+ styles = {"HIGH": "bright_red bold", "MEDIUM": "yellow", "LOW": "cyan", "INFO": "dim"}
+ for f in findings:
+ s = styles.get(f["severity"], "white")
+ table.add_row(f"[{s}]{f['severity']}[/{s}]", f["name"], f["detail"][:80])
+ console.print(table)
+
+ counts = {k: sum(1 for f in findings if f["severity"] == k) for k in ("HIGH", "MEDIUM", "LOW", "INFO")}
+ console.print(
+ f"\n[bright_red bold]HIGH: {counts['HIGH']}[/bright_red bold] "
+ f"[yellow]MEDIUM: {counts['MEDIUM']}[/yellow] "
+ f"[cyan]LOW: {counts['LOW']}[/cyan] "
+ f"[dim]INFO: {counts['INFO']}[/dim]"
+ )
+
+
+async def _run_checks(url: str) -> list[dict]:
+ findings: list[dict] = []
+
+ async with httpx.AsyncClient(
+ timeout=10,
+ follow_redirects=True,
+ headers={"User-Agent": "Mozilla/5.0 (devha security scanner — authorized use only)"},
+ ) as client:
+ try:
+ resp = await client.get(url)
+ except Exception as e:
+ error(f"Cannot reach {url}: {e}")
+ return []
+
+ headers = {k.lower(): v for k, v in resp.headers.items()}
+ body = resp.text.lower()
+
+ # Security headers
+ missing_headers = [
+ ("strict-transport-security", "MEDIUM", "Missing HSTS — HTTPS not enforced"),
+ ("content-security-policy", "MEDIUM", "Missing CSP — XSS protection absent"),
+ ("x-frame-options", "MEDIUM", "Missing X-Frame-Options — clickjacking risk"),
+ ("x-content-type-options", "LOW", "Missing X-Content-Type-Options"),
+ ("referrer-policy", "LOW", "Missing Referrer-Policy"),
+ ("permissions-policy", "LOW", "Missing Permissions-Policy"),
+ ]
+ for h, sev, desc in missing_headers:
+ if h not in headers:
+ findings.append({"severity": sev, "name": f"Missing: {h}", "detail": desc})
+
+ # Server version disclosure
+ server = headers.get("server", "")
+ if server and any(v in server.lower() for v in ["apache/", "nginx/", "iis/", "lighttpd/"]):
+ findings.append({"severity": "LOW", "name": "Server Version Exposed", "detail": f"Server: {server}"})
+
+ if headers.get("x-powered-by"):
+ findings.append({"severity": "LOW", "name": "X-Powered-By Exposed", "detail": headers["x-powered-by"]})
+
+ # CORS
+ cors = headers.get("access-control-allow-origin", "")
+ if cors == "*":
+ findings.append({"severity": "MEDIUM", "name": "Wildcard CORS", "detail": "Any origin can read responses — possible data theft"})
+
+ # Directory listing
+ if "index of" in body or "directory listing for" in body:
+ findings.append({"severity": "HIGH", "name": "Directory Listing Enabled", "detail": "Server exposes file/directory listings"})
+
+ # Error disclosure
+ for pattern in ["stack trace", "sql syntax", "mysql_query", "pg_query", "ora-0", "exception in thread", "traceback (most recent"]:
+ if pattern in body:
+ findings.append({"severity": "MEDIUM", "name": "Error Info Disclosure", "detail": f"Pattern '{pattern}' found in response"})
+ break
+
+ # robots.txt info
+ base = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
+ try:
+ r = await client.get(f"{base}/robots.txt")
+ if r.status_code == 200:
+ disallowed = [l.split(":",1)[1].strip() for l in r.text.splitlines() if l.lower().startswith("disallow:") and l.split(":",1)[1].strip() not in ("/", "")]
+ if disallowed:
+ findings.append({"severity": "INFO", "name": "robots.txt reveals paths", "detail": f"Disallowed: {', '.join(disallowed[:5])}"})
+ except Exception:
+ pass
+
+ # Sensitive files
+ sensitive = [
+ (".env", "HIGH", "Environment file — may contain passwords/API keys"),
+ (".git/config", "HIGH", "Git repo exposed — source code accessible"),
+ (".git/HEAD", "HIGH", "Git repo exposed — source code accessible"),
+ ("wp-config.php.bak", "HIGH", "WordPress config backup"),
+ ("config.php.bak", "HIGH", "PHP config backup"),
+ ("backup.zip", "HIGH", "Backup archive accessible"),
+ ("backup.tar.gz", "HIGH", "Backup archive accessible"),
+ ("database.sql", "HIGH", "SQL dump accessible"),
+ ("phpinfo.php", "MEDIUM", "PHP info page — server config exposed"),
+ (".htaccess", "MEDIUM", "htaccess file readable"),
+ ("server-status", "MEDIUM", "Apache server-status exposed"),
+ ("admin/", "INFO", "Admin panel path accessible"),
+ ("administrator/", "INFO", "Admin panel path accessible"),
+ ("phpmyadmin/", "MEDIUM", "phpMyAdmin exposed"),
+ ("test.php", "LOW", "Test file accessible"),
+ ("readme.txt", "LOW", "README exposed — version info"),
+ ("README.md", "LOW", "README exposed — version info"),
+ ("CHANGELOG.md", "LOW", "CHANGELOG — version info"),
+ ("composer.json", "LOW", "composer.json — dependency info"),
+ ("package.json", "LOW", "package.json — dependency info"),
+ (".DS_Store", "LOW", ".DS_Store — macOS metadata, dir structure leak"),
+ ]
+
+ tasks = [_probe(client, base, path, sev, desc) for path, sev, desc in sensitive]
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+ findings.extend(r for r in results if isinstance(r, dict))
+
+ return findings
+
+
+async def _probe(client: httpx.AsyncClient, base: str, path: str, severity: str, desc: str) -> dict | None:
+ try:
+ r = await client.get(urljoin(base + "/", path))
+ if r.status_code == 200 and len(r.content) > 10:
+ return {"severity": severity, "name": f"Exposed: /{path}", "detail": desc}
+ except Exception:
+ pass
+ return None
diff --git a/devha/commands/whois_cmd.py b/devha/commands/whois_cmd.py
new file mode 100644
index 0000000..990ce43
--- /dev/null
+++ b/devha/commands/whois_cmd.py
@@ -0,0 +1,105 @@
+"""WHOIS / RDAP lookup — domain registration info."""
+
+from __future__ import annotations
+
+import json
+import subprocess
+from typing import Annotated
+
+import httpx
+import typer
+
+from devha.ui import console, make_table, info, warn, error
+
+
+def whois_lookup(
+ target: Annotated[str, typer.Argument(help="Domain or IP (e.g. example.com, 8.8.8.8).")],
+ json_out: Annotated[bool, typer.Option("--json")] = False,
+) -> None:
+ """
+ WHOIS / RDAP lookup — registrar, creation date, expiry, nameservers.
+
+ Example:
+ devha whois google.com
+ devha whois 8.8.8.8
+ """
+ target = target.lstrip("www.").strip()
+ info(f"WHOIS lookup for [cyan]{target}[/cyan]...")
+
+ data = _rdap_lookup(target)
+
+ if data:
+ if json_out:
+ console.print_json(json.dumps(data))
+ return
+ table = make_table("FIELD", "VALUE", title=f"🔎 WHOIS — {target}")
+ for k, v in data.items():
+ if v:
+ table.add_row(f"[cyan]{k}[/cyan]", str(v)[:120])
+ console.print(table)
+ else:
+ _system_whois(target)
+
+
+def _rdap_lookup(domain: str) -> dict | None:
+ try:
+ resp = httpx.get(f"https://rdap.org/domain/{domain}", timeout=10, follow_redirects=True)
+ if resp.status_code != 200:
+ return None
+ raw = resp.json()
+ except Exception:
+ return None
+
+ data: dict[str, str] = {}
+ data["Domain"] = raw.get("ldhName", domain).upper()
+ data["Status"] = ", ".join(raw.get("status", []))
+
+ for event in raw.get("events", []):
+ action = event.get("eventAction", "")
+ date = event.get("eventDate", "")[:10]
+ if action == "registration":
+ data["Created"] = date
+ elif action == "expiration":
+ data["Expires"] = date
+ elif action == "last changed":
+ data["Updated"] = date
+
+ for entity in raw.get("entities", []):
+ roles = entity.get("roles", [])
+ vcard = entity.get("vcardArray", [])
+ name = ""
+ if isinstance(vcard, list) and len(vcard) > 1:
+ for field in vcard[1]:
+ if isinstance(field, list) and len(field) >= 4 and field[0] == "fn":
+ name = str(field[3])
+ if "registrar" in roles and name:
+ data["Registrar"] = name
+ elif "registrant" in roles and name:
+ data["Registrant"] = name
+
+ ns_list = [n.get("ldhName", "") for n in raw.get("nameservers", [])]
+ data["Nameservers"] = ", ".join(ns_list[:6])
+
+ return {k: v for k, v in data.items() if v}
+
+
+def _system_whois(target: str) -> None:
+ try:
+ result = subprocess.run(["whois", target], capture_output=True, text=True, timeout=15)
+ output = result.stdout or result.stderr
+ if not output:
+ error("No whois output returned.")
+ return
+ keep_keys = ["domain", "registrar", "created", "creat", "expir", "updated", "nameserver", "status", "registrant", "dnssec", "org", "netrange", "cidr", "country"]
+ useful = []
+ for line in output.splitlines():
+ ls = line.strip()
+ if not ls or ls.startswith("%") or ls.startswith("#") or ls.startswith(">>>"):
+ continue
+ if any(k in ls.lower() for k in keep_keys):
+ useful.append(ls)
+ console.print("\n".join(useful[:35]) if useful else output[:1500])
+ except FileNotFoundError:
+ error("'whois' command not available. Install: brew install whois")
+ except Exception as e:
+ error(f"whois error: {e}")
diff --git a/devha/commands/wifi.py b/devha/commands/wifi.py
new file mode 100644
index 0000000..0fae143
--- /dev/null
+++ b/devha/commands/wifi.py
@@ -0,0 +1,162 @@
+"""Wifi command — list nearby Wi-Fi networks (read-only)."""
+
+from __future__ import annotations
+
+import json
+import platform
+import re
+import subprocess
+from typing import Annotated
+
+import typer
+
+from devha.ui import console, make_table, error, warn, info
+
+
+def _run(cmd: list[str]) -> str:
+ try:
+ result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
+ return result.stdout
+ except (FileNotFoundError, subprocess.TimeoutExpired):
+ return ""
+
+
+def _parse_linux() -> list[dict]:
+ """Try nmcli first, fall back to iwlist."""
+ networks = []
+
+ # nmcli approach
+ out = _run(["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY,CHAN,BSSID", "device", "wifi", "list"])
+ if out.strip():
+ for line in out.strip().splitlines():
+ parts = line.split(":")
+ if len(parts) >= 4:
+ networks.append({
+ "ssid": parts[0] or "",
+ "signal": parts[1],
+ "security": parts[2] or "Open",
+ "channel": parts[3] if len(parts) > 3 else "?",
+ })
+ return networks
+
+ # iwlist fallback
+ out = _run(["iwlist", "scan"])
+ if not out:
+ return []
+
+ cells = re.split(r"Cell \d+", out)
+ for cell in cells[1:]:
+ ssid_m = re.search(r'ESSID:"([^"]*)"', cell)
+ signal_m = re.search(r"Signal level[=:](-?\d+)", cell)
+ enc_m = re.search(r"Encryption key:(on|off)", cell, re.IGNORECASE)
+ chan_m = re.search(r"Channel[:\s]+(\d+)", cell)
+ networks.append({
+ "ssid": ssid_m.group(1) if ssid_m else "",
+ "signal": signal_m.group(1) if signal_m else "?",
+ "security": "WPA/WEP" if enc_m and enc_m.group(1).lower() == "on" else "Open",
+ "channel": chan_m.group(1) if chan_m else "?",
+ })
+ return networks
+
+
+def _parse_macos() -> list[dict]:
+ airport = (
+ "/System/Library/PrivateFrameworks/Apple80211.framework"
+ "/Versions/Current/Resources/airport"
+ )
+ out = _run([airport, "-s"])
+ if not out:
+ return []
+
+ networks = []
+ for line in out.strip().splitlines()[1:]:
+ parts = line.split()
+ if len(parts) >= 5:
+ networks.append({
+ "ssid": parts[0],
+ "signal": parts[2],
+ "security": parts[-1] if len(parts) > 4 else "?",
+ "channel": parts[3] if len(parts) > 3 else "?",
+ })
+ return networks
+
+
+def _parse_windows() -> list[dict]:
+ out = _run(["netsh", "wlan", "show", "networks", "mode=Bssid"])
+ if not out:
+ return []
+
+ networks = []
+ current: dict = {}
+ for line in out.splitlines():
+ line = line.strip()
+ if line.startswith("SSID") and "BSSID" not in line:
+ if current:
+ networks.append(current)
+ current = {"ssid": line.split(":", 1)[-1].strip(), "signal": "?", "security": "?", "channel": "?"}
+ elif "Signal" in line and current:
+ current["signal"] = line.split(":", 1)[-1].strip()
+ elif "Authentication" in line and current:
+ current["security"] = line.split(":", 1)[-1].strip()
+ elif "Channel" in line and current:
+ current["channel"] = line.split(":", 1)[-1].strip()
+ if current:
+ networks.append(current)
+ return networks
+
+
+def _signal_sort_key(net: dict) -> int:
+ try:
+ return -int(net["signal"].replace("%", "").strip())
+ except (ValueError, AttributeError):
+ return 0
+
+
+def wifi(
+ json_out: Annotated[bool, typer.Option("--json", help="Output as JSON.")] = False,
+) -> None:
+ """
+ List nearby Wi-Fi networks (read-only — does not connect or crack).
+
+ Examples:
+ devha wifi
+ devha wifi --json
+ """
+ os_name = platform.system()
+ info(f"Scanning Wi-Fi networks on [cyan]{os_name}[/cyan]...")
+
+ if os_name == "Linux":
+ networks = _parse_linux()
+ elif os_name == "Darwin":
+ networks = _parse_macos()
+ elif os_name == "Windows":
+ networks = _parse_windows()
+ else:
+ error(f"Unsupported OS: {os_name}")
+ raise typer.Exit(1)
+
+ if not networks:
+ warn("No networks found. Are Wi-Fi adapters available and enabled?")
+ warn("Linux users: try running with sudo, or check 'nmcli' / 'iwlist' availability.")
+ raise typer.Exit(0)
+
+ networks.sort(key=_signal_sort_key)
+
+ if json_out:
+ console.print_json(json.dumps({"os": os_name, "networks": networks}))
+ return
+
+ table = make_table("SSID", "SIGNAL", "SECURITY", "CHANNEL", title="Nearby Wi-Fi Networks")
+ for net in networks:
+ ssid = net["ssid"] or "[dim][/dim]"
+ security = net["security"]
+ sec_style = "bright_green" if security == "Open" else "yellow"
+ table.add_row(
+ f"[cyan]{ssid}[/cyan]",
+ str(net["signal"]),
+ f"[{sec_style}]{security}[/{sec_style}]",
+ str(net["channel"]),
+ )
+ console.print(table)
+ console.print(f"\n[dim]Read-only scan — {len(networks)} network(s) found. "
+ "devha does not connect or crack networks.[/dim]")
diff --git a/devha/data/common_paths.txt b/devha/data/common_paths.txt
new file mode 100644
index 0000000..89dfa4d
--- /dev/null
+++ b/devha/data/common_paths.txt
@@ -0,0 +1,188 @@
+admin
+admin/
+admin/login
+admin/index.php
+administrator
+administrator/
+login
+login.php
+login.html
+signin
+signup
+register
+logout
+dashboard
+panel
+control
+manage
+wp-admin
+wp-login.php
+wp-content
+wp-includes
+wp-json
+xmlrpc.php
+joomla
+drupal
+magento
+shopify
+phpmyadmin
+phpmyadmin/
+pma/
+myadmin/
+mysql/
+dbadmin/
+sqlitemanager/
+robots.txt
+sitemap.xml
+sitemap.xml.gz
+.htaccess
+.htpasswd
+.git
+.git/HEAD
+.git/config
+.git/index
+.svn
+.svn/entries
+.env
+.env.local
+.env.production
+.env.backup
+.DS_Store
+web.config
+web.config.bak
+web.config.orig
+config.php
+config.php.bak
+configuration.php
+config.yml
+config.yaml
+settings.py
+settings.php
+database.php
+db.php
+connect.php
+connection.php
+backup
+backup.zip
+backup.tar.gz
+backup.sql
+backup.sql.gz
+dump.sql
+database.sql
+data.sql
+db_backup.sql
+site.tar.gz
+www.zip
+html.zip
+public_html.zip
+api
+api/v1
+api/v2
+api/v1/users
+api/v1/admin
+api/docs
+api/swagger
+swagger.json
+swagger.yaml
+openapi.json
+openapi.yaml
+graphql
+graphiql
+v1
+v2
+v3
+status
+health
+health/
+healthcheck
+ping
+info
+debug
+trace
+metrics
+prometheus/metrics
+actuator
+actuator/health
+actuator/info
+actuator/env
+server-status
+server-info
+test
+test.php
+test.html
+info.php
+phpinfo.php
+readme.txt
+README.txt
+README.md
+CHANGELOG
+LICENSE
+TODO
+index.php
+index.html
+index.asp
+index.aspx
+default.asp
+default.aspx
+home.php
+main.php
+old
+old/
+backup_old
+_old
+upload
+uploads
+upload/
+uploads/
+files/
+documents/
+images/
+img/
+media/
+static/
+assets/
+css/
+js/
+fonts/
+vendor/
+bower_components/
+node_modules/
+tmp/
+temp/
+cache/
+logs/
+log/
+error.log
+access.log
+debug.log
+setup.php
+install.php
+install/
+installer/
+update.php
+upgrade.php
+user
+users
+user/list
+users/list
+profile
+account
+accounts
+password
+forgot-password
+reset-password
+change-password
+oauth
+oauth/authorize
+oauth/token
+auth
+auth/login
+sso
+saml
+oidc
+.well-known/security.txt
+security.txt
+crossdomain.xml
+clientaccesspolicy.xml
+apple-app-site-association
+assetlinks.json
diff --git a/devha/data/sites.json b/devha/data/sites.json
new file mode 100644
index 0000000..2291acf
--- /dev/null
+++ b/devha/data/sites.json
@@ -0,0 +1,57 @@
+{
+ "GitHub": {"url": "https://github.com/{}", "error_code": 404},
+ "GitLab": {"url": "https://gitlab.com/{}", "error_code": 404},
+ "Reddit": {"url": "https://www.reddit.com/user/{}", "error_code": 404},
+ "Twitter": {"url": "https://twitter.com/{}", "error_code": 404},
+ "Instagram": {"url": "https://www.instagram.com/{}/", "error_code": 404},
+ "Twitch": {"url": "https://www.twitch.tv/{}", "error_code": 404},
+ "YouTube": {"url": "https://www.youtube.com/@{}", "error_code": 404},
+ "Steam": {"url": "https://steamcommunity.com/id/{}", "error_code": 404},
+ "Roblox": {"url": "https://www.roblox.com/user.aspx?username={}", "error_code": 404},
+ "Spotify": {"url": "https://open.spotify.com/user/{}", "error_code": 404},
+ "Pinterest": {"url": "https://www.pinterest.com/{}/", "error_code": 404},
+ "Medium": {"url": "https://medium.com/@{}", "error_code": 404},
+ "DEV.to": {"url": "https://dev.to/{}", "error_code": 404},
+ "Hacker News": {"url": "https://news.ycombinator.com/user?id={}", "error_code": 404},
+ "Stack Overflow": {"url": "https://stackoverflow.com/users/{}}", "error_code": 404},
+ "Replit": {"url": "https://replit.com/@{}", "error_code": 404},
+ "Keybase": {"url": "https://keybase.io/{}", "error_code": 404},
+ "Pastebin": {"url": "https://pastebin.com/u/{}", "error_code": 404},
+ "Gravatar": {"url": "https://en.gravatar.com/{}", "error_code": 404},
+ "Flickr": {"url": "https://www.flickr.com/people/{}", "error_code": 404},
+ "SoundCloud": {"url": "https://soundcloud.com/{}", "error_code": 404},
+ "Bandcamp": {"url": "https://{}.bandcamp.com", "error_code": 404},
+ "Vimeo": {"url": "https://vimeo.com/{}", "error_code": 404},
+ "Tumblr": {"url": "https://{}.tumblr.com", "error_code": 404},
+ "WordPress": {"url": "https://{}.wordpress.com", "error_code": 404},
+ "Blogger": {"url": "https://{}.blogspot.com", "error_code": 404},
+ "Substack": {"url": "https://{}.substack.com", "error_code": 404},
+ "Hashnode": {"url": "https://hashnode.com/@{}", "error_code": 404},
+ "CodePen": {"url": "https://codepen.io/{}", "error_code": 404},
+ "JSFiddle": {"url": "https://jsfiddle.net/user/{}/", "error_code": 404},
+ "Dribbble": {"url": "https://dribbble.com/{}", "error_code": 404},
+ "Behance": {"url": "https://www.behance.net/{}", "error_code": 404},
+ "Figma": {"url": "https://www.figma.com/@{}", "error_code": 404},
+ "Trello": {"url": "https://trello.com/{}", "error_code": 404},
+ "Notion": {"url": "https://www.notion.so/@{}", "error_code": 404},
+ "DockerHub": {"url": "https://hub.docker.com/u/{}", "error_code": 404},
+ "npm": {"url": "https://www.npmjs.com/~{}", "error_code": 404},
+ "PyPI": {"url": "https://pypi.org/user/{}/", "error_code": 404},
+ "Crates.io": {"url": "https://crates.io/users/{}", "error_code": 404},
+ "RubyGems": {"url": "https://rubygems.org/profiles/{}", "error_code": 404},
+ "HackerOne": {"url": "https://hackerone.com/{}", "error_code": 404},
+ "Bugcrowd": {"url": "https://bugcrowd.com/{}", "error_code": 404},
+ "TryHackMe": {"url": "https://tryhackme.com/p/{}", "error_code": 404},
+ "HackTheBox": {"url": "https://app.hackthebox.com/profile/{}", "error_code": 404},
+ "LeetCode": {"url": "https://leetcode.com/{}/", "error_code": 404},
+ "Codeforces": {"url": "https://codeforces.com/profile/{}", "error_code": 404},
+ "HackerRank": {"url": "https://www.hackerrank.com/{}", "error_code": 404},
+ "Kaggle": {"url": "https://www.kaggle.com/{}", "error_code": 404},
+ "ORCID": {"url": "https://orcid.org/search/search?searchQuery={}", "error_code": 404},
+ "Linktree": {"url": "https://linktr.ee/{}", "error_code": 404},
+ "Ko-fi": {"url": "https://ko-fi.com/{}", "error_code": 404},
+ "Patreon": {"url": "https://www.patreon.com/{}", "error_code": 404},
+ "Buy Me a Coffee": {"url": "https://www.buymeacoffee.com/{}", "error_code": 404},
+ "ProductHunt": {"url": "https://www.producthunt.com/@{}", "error_code": 404},
+ "AngelList": {"url": "https://angel.co/{}", "error_code": 404}
+}
diff --git a/devha/data/subdomains.txt b/devha/data/subdomains.txt
new file mode 100644
index 0000000..4ee6ff3
--- /dev/null
+++ b/devha/data/subdomains.txt
@@ -0,0 +1,201 @@
+www
+mail
+ftp
+localhost
+webmail
+smtp
+pop
+ns1
+ns2
+hosting
+blog
+dev
+staging
+api
+admin
+portal
+vpn
+cdn
+app
+secure
+m
+mobile
+shop
+store
+test
+beta
+alpha
+demo
+help
+support
+docs
+status
+static
+assets
+img
+images
+media
+files
+download
+upload
+s3
+login
+auth
+sso
+oauth
+dashboard
+panel
+control
+manage
+manager
+cpanel
+whm
+webdisk
+autodiscover
+autoconfig
+imap
+pop3
+exchange
+owa
+remote
+rdp
+ssh
+git
+svn
+hg
+repo
+ci
+jenkins
+gitlab
+bitbucket
+jira
+confluence
+wiki
+forum
+community
+news
+events
+calendar
+mail2
+email
+webmin
+phpmyadmin
+mysql
+db
+database
+sql
+redis
+mongo
+elastic
+search
+solr
+api2
+v1
+v2
+v3
+rest
+graphql
+socket
+ws
+wss
+chat
+stream
+video
+audio
+cdn2
+cdn3
+cache
+proxy
+lb
+load
+haproxy
+nginx
+apache
+node
+python
+ruby
+php
+java
+dotnet
+go
+rust
+microservice
+gateway
+payment
+billing
+invoice
+checkout
+cart
+order
+product
+catalog
+inventory
+warehouse
+shipping
+tracking
+analytics
+metrics
+monitoring
+logs
+logging
+kibana
+grafana
+prometheus
+alertmanager
+vault
+consul
+etcd
+kubernetes
+k8s
+docker
+registry
+nexus
+sonarqube
+artifactory
+jenkins2
+teamcity
+travis
+circle
+drone
+deploy
+release
+staging2
+prod
+production
+sandbox
+qa
+uat
+pre-prod
+preprod
+preview
+review
+internal
+intranet
+extranet
+partner
+vendor
+supplier
+customer
+client
+affiliate
+reseller
+agent
+crm
+erp
+hr
+hris
+payroll
+accounting
+finance
+legal
+compliance
+security
+infosec
+siem
+soc
+backup
+archive
+disaster
+recovery
+dr
+failover
diff --git a/devha/ethics.py b/devha/ethics.py
new file mode 100644
index 0000000..fd33e96
--- /dev/null
+++ b/devha/ethics.py
@@ -0,0 +1,33 @@
+"""Ethical-use warnings and confirmation helpers."""
+
+from __future__ import annotations
+
+import typer
+from rich.panel import Panel
+from rich.text import Text
+
+from devha.ui import console, warn
+
+_WHITELIST = {"localhost", "127.0.0.1", "::1", "scanme.nmap.org"}
+
+WARNING_TEXT = """[yellow bold]ETHICAL WARNING[/yellow bold]
+
+This scan sends traffic to an external host.
+Only run this against:
+
+ • Systems you own
+ • Systems for which you have [bold]explicit written permission[/bold]
+ • Legal test environments ([cyan]scanme.nmap.org[/cyan], HackTheBox, TryHackMe)
+
+Unauthorised scanning may be illegal in your jurisdiction."""
+
+
+def ethics_check(target: str, *, yes: bool = False) -> None:
+ """Show ethics warning and ask for confirmation unless bypassed."""
+ if yes or target.lower() in _WHITELIST:
+ return
+
+ console.print(Panel(Text.from_markup(WARNING_TEXT), border_style="yellow", title="⚠ devha"))
+ confirmed = typer.confirm("Continue?", default=False)
+ if not confirmed:
+ raise typer.Abort()
diff --git a/devha/studio.py b/devha/studio.py
new file mode 100644
index 0000000..83b9706
--- /dev/null
+++ b/devha/studio.py
@@ -0,0 +1,221 @@
+"""devha Hacking Studio — Textual TUI main menu."""
+
+from __future__ import annotations
+
+import asyncio
+import random
+import time
+from typing import Callable
+
+from rich.text import Text
+from textual.app import App, ComposeResult
+from textual.binding import Binding
+from textual.containers import Container, Horizontal, Vertical
+from textual.reactive import reactive
+from textual.widgets import Footer, Header, Label, Static, RichLog
+
+# ─── Matrix rain effect ───────────────────────────────────────────────────────
+
+_MATRIX_CHARS = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ01アイウエオカキクケコ"
+
+_MENU_ITEMS = [
+ ("1", "🔌", "Network Scanner", "Port scan · OS detect · Banner grab · CVE lookup"),
+ ("2", "🌐", "Web Recon", "Dirscan · Crawl · Headers · Vuln scan"),
+ ("3", "🔍", "DNS / SSL / WHOIS","DNS recon · SSL cert · WHOIS · Zone transfer"),
+ ("4", "👤", "OSINT Suite", "Username · Harvest · Subdomains · IP Intel"),
+ ("5", "🔐", "Crypto & Cipher", "Encode · Decode · Crack · All formats"),
+ ("6", "🔑", "Password Lab", "Hash · Crack · Strength · Generate"),
+ ("7", "📦", "Packet Lab", "Capture · ARP scan · Builder · Stats"),
+ ("8", "📡", "WiFi Lab", "Scan · Device map · Router security"),
+ ("9", "🏓", "Ping & Trace", "ICMP ping · RTT · Packet loss"),
+]
+
+_BANNER = """[bold cyan]
+ ██████╗ ███████╗██╗ ██╗██║ ██╗ █████╗
+ ██╔══██╗██╔════╝██║ ██║██║ ██║██╔══██╗
+ ██║ ██║█████╗ ██║ ██║███████║███████║
+ ██║ ██║██╔══╝ ╚██╗ ██╔╝██╔══██║██╔══██║
+ ██████╔╝███████╗ ╚████╔╝ ██║ ██║██║ ██║
+ ╚═════╝ ╚══════╝ ╚═══╝ ╚═╝ ╚═╝╚═╝ ╚═╝[/bold cyan]
+[dim cyan] Hacking Studio v2.0 · Ethical use only · Own systems only[/dim cyan]"""
+
+
+class MatrixColumn(Static):
+ """Single column of falling matrix characters."""
+
+ DEFAULT_CSS = """
+ MatrixColumn {
+ width: 1;
+ color: green;
+ text-opacity: 60%;
+ }
+ """
+
+ chars: reactive[str] = reactive("")
+
+ def on_mount(self) -> None:
+ self.col_chars = [random.choice(_MATRIX_CHARS) for _ in range(40)]
+ self._scroll = random.randint(0, 30)
+ self.speed = random.uniform(0.05, 0.15)
+ self.set_interval(self.speed, self._tick)
+
+ def _tick(self) -> None:
+ self._scroll = (self._scroll + 1) % 40
+ self.col_chars[random.randint(0, 39)] = random.choice(_MATRIX_CHARS)
+ self.chars = "\n".join(self.col_chars)
+
+ def render(self) -> Text:
+ t = Text()
+ chars = self.col_chars
+ for i, ch in enumerate(chars):
+ dist = (i - self._scroll) % len(chars)
+ if dist == 0:
+ t.append(ch, "bold #ffffff")
+ elif dist < 4:
+ t.append(ch, "bold #00ff00")
+ elif dist < 10:
+ t.append(ch, "green")
+ else:
+ t.append(ch, "dim green")
+ return t
+
+
+class MenuCard(Static):
+ """A single menu item card."""
+
+ DEFAULT_CSS = """
+ MenuCard {
+ border: round #1a1a2e;
+ padding: 0 1;
+ margin: 0 0 1 0;
+ background: #0a0a0f;
+ color: #aaaaaa;
+ }
+ MenuCard:hover {
+ border: round cyan;
+ background: #0d1f2d;
+ color: #ffffff;
+ }
+ MenuCard.selected {
+ border: round #00ffff;
+ background: #0d2137;
+ color: #ffffff;
+ }
+ """
+
+ def __init__(self, key: str, icon: str, title: str, desc: str, **kwargs):
+ super().__init__(**kwargs)
+ self.key = key
+ self.icon = icon
+ self.title = title
+ self.desc = desc
+
+ def render(self) -> Text:
+ t = Text()
+ t.append(f" [{self.key}] ", "bold cyan")
+ t.append(f"{self.icon} ", "")
+ t.append(self.title, "bold #ffffff")
+ t.append(f" — {self.desc}", "dim")
+ return t
+
+
+class DevhaStudio(App):
+ """devha Hacking Studio — main TUI application."""
+
+ CSS = """
+ Screen {
+ background: #050508;
+ layers: matrix menu;
+ }
+
+ #matrix-bg {
+ layer: matrix;
+ layout: horizontal;
+ width: 100%;
+ height: 100%;
+ opacity: 15%;
+ overflow: hidden;
+ }
+
+ #main-container {
+ layer: menu;
+ align: center middle;
+ width: 100%;
+ height: 100%;
+ }
+
+ #center-panel {
+ width: 70;
+ height: auto;
+ background: #050508;
+ border: double cyan;
+ padding: 1 2;
+ }
+
+ #banner {
+ width: 100%;
+ text-align: center;
+ margin: 0 0 1 0;
+ }
+
+ #ethics-note {
+ color: #aaaa00;
+ text-align: center;
+ margin: 0 0 1 0;
+ }
+
+ #menu-list {
+ width: 100%;
+ height: auto;
+ }
+
+ Footer {
+ background: #050508;
+ color: cyan;
+ }
+ """
+
+ BINDINGS = [
+ Binding("1", "launch('network')", "Network"),
+ Binding("2", "launch('web')", "Web"),
+ Binding("3", "launch('dns')", "DNS/SSL"),
+ Binding("4", "launch('osint')", "OSINT"),
+ Binding("5", "launch('cipher')", "Crypto"),
+ Binding("6", "launch('password')", "Passwords"),
+ Binding("7", "launch('packets')", "Packets"),
+ Binding("8", "launch('wifi')", "WiFi"),
+ Binding("9", "launch('ping')", "Ping"),
+ Binding("q", "quit", "Quit"),
+ ]
+
+ TITLE = "devha Hacking Studio"
+
+ def compose(self) -> ComposeResult:
+ # Matrix background
+ with Horizontal(id="matrix-bg"):
+ for _ in range(80):
+ yield MatrixColumn()
+
+ # Main menu
+ with Container(id="main-container"):
+ with Vertical(id="center-panel"):
+ yield Static(_BANNER, id="banner")
+ yield Static(
+ "⚡ Own systems only · Stay legal · Be ethical",
+ id="ethics-note",
+ )
+ with Vertical(id="menu-list"):
+ for key, icon, title, desc in _MENU_ITEMS:
+ yield MenuCard(key, icon, title, desc)
+
+ yield Footer()
+
+ def action_launch(self, module: str) -> None:
+ """Launch a module — exit TUI and run the command."""
+ self.exit(result=module)
+
+
+def run_studio() -> str | None:
+ """Run the TUI and return the selected module name."""
+ app = DevhaStudio()
+ return app.run()
diff --git a/devha/ui.py b/devha/ui.py
new file mode 100644
index 0000000..2b38e00
--- /dev/null
+++ b/devha/ui.py
@@ -0,0 +1,72 @@
+"""Shared Rich UI helpers for devha."""
+
+from __future__ import annotations
+
+import sys
+from typing import Any
+
+from rich.console import Console
+from rich.panel import Panel
+from rich.table import Table
+from rich.text import Text
+from rich import box
+
+console = Console()
+err_console = Console(stderr=True)
+
+
+BANNER = r"""
+ ╔══════════════════════════════════════════╗
+ ║ ██████╗ ███████╗██╗ ██╗██╗ ██╗ █████╗ ║
+ ║ ██╔══██╗██╔════╝██║ ██║██║ ██║██╔══██╗ ║
+ ║ ██║ ██║█████╗ ██║ ██║███████║███████║ ║
+ ║ ██║ ██║██╔══╝ ╚██╗ ██╔╝██╔══██║██╔══██║ ║
+ ║ ██████╔╝███████╗ ╚████╔╝ ██║ ██║██║ ██║ ║
+ ║ ╚═════╝ ╚══════╝ ╚═══╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ║
+ ║ ║
+ ║ Developer & Hacking CLI v0.1.0 ║
+ ║ ⚠ For ethical use only ║
+ ╚══════════════════════════════════════════╝
+"""
+
+
+def print_banner() -> None:
+ console.print(Text(BANNER, style="cyan dim"))
+
+
+def make_table(*columns: str, title: str | None = None) -> Table:
+ table = Table(
+ title=title,
+ box=box.ROUNDED,
+ border_style="cyan",
+ header_style="bold magenta",
+ show_lines=False,
+ )
+ for col in columns:
+ table.add_column(col)
+ return table
+
+
+def print_panel(content: Any, title: str = "", style: str = "cyan") -> None:
+ console.print(Panel(content, title=title, border_style=style, expand=False))
+
+
+def success(msg: str) -> None:
+ console.print(f"[bright_green]✔[/bright_green] {msg}")
+
+
+def warn(msg: str) -> None:
+ console.print(f"[yellow]⚠[/yellow] {msg}")
+
+
+def error(msg: str) -> None:
+ err_console.print(f"[bright_red]✘[/bright_red] {msg}")
+
+
+def info(msg: str) -> None:
+ console.print(f"[blue]ℹ[/blue] {msg}")
+
+
+def fatal(msg: str, code: int = 1) -> None:
+ error(msg)
+ sys.exit(code)
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..a6cd17c
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,66 @@
+[tool.poetry]
+name = "devha"
+version = "0.1.0"
+description = "Developer & Hacking CLI — an ethical hacking and developer toolkit"
+authors = ["devha contributors "]
+license = "MIT"
+readme = "README.md"
+homepage = "https://github.com/waldex451/devha"
+repository = "https://github.com/waldex451/devha"
+keywords = ["hacking", "security", "cli", "ethical", "pentesting", "developer"]
+classifiers = [
+ "Development Status :: 3 - Alpha",
+ "Environment :: Console",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: MIT License",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Topic :: Security",
+ "Topic :: Utilities",
+]
+packages = [{include = "devha"}]
+
+[tool.poetry.scripts]
+devha = "devha.cli:app"
+
+[tool.poetry.dependencies]
+python = "^3.10"
+typer = {extras = ["all"], version = "^0.12.0"}
+rich = "^13.7.0"
+httpx = "^0.27.0"
+dnspython = "^2.6.0"
+beautifulsoup4 = "^4.12.0"
+scapy = "^2.5.0"
+pyfiglet = "^1.0.2"
+textual = "^0.61.0"
+click = ">=8.0,<8.2"
+
+[tool.poetry.group.dev.dependencies]
+pytest = "^8.0.0"
+pytest-asyncio = "^0.23.0"
+pytest-cov = "^5.0.0"
+ruff = "^0.4.0"
+black = "^24.0.0"
+
+[tool.black]
+line-length = 100
+target-version = ["py310", "py311", "py312"]
+
+[tool.ruff]
+line-length = 100
+target-version = "py310"
+
+[tool.ruff.lint]
+select = ["E", "F", "W", "I"]
+ignore = ["E501"]
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["tests"]
+addopts = "--cov=devha --cov-report=term-missing"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_cipher.py b/tests/test_cipher.py
new file mode 100644
index 0000000..400683c
--- /dev/null
+++ b/tests/test_cipher.py
@@ -0,0 +1,132 @@
+"""Tests for cipher module."""
+
+import pytest
+from devha.commands.cipher import (
+ caesar_encode,
+ caesar_decode,
+ rot13_encode,
+ rot13_decode,
+ atbash_encode,
+ atbash_decode,
+ vigenere_encode,
+ vigenere_decode,
+ _readability_score,
+ _apply_cipher,
+)
+
+
+class TestCaesar:
+ def test_encode_basic(self):
+ assert caesar_encode("ABC", 3) == "DEF"
+
+ def test_encode_wrap(self):
+ assert caesar_encode("XYZ", 3) == "ABC"
+
+ def test_encode_lowercase(self):
+ assert caesar_encode("abc", 1) == "bcd"
+
+ def test_encode_preserves_non_alpha(self):
+ assert caesar_encode("Hello, World!", 13) == "Uryyb, Jbeyq!"
+
+ def test_decode_reverses_encode(self):
+ original = "The quick brown fox"
+ assert caesar_decode(caesar_encode(original, 7), 7) == original
+
+ def test_decode_shift_13(self):
+ assert caesar_decode("Uryyb", 13) == "Hello"
+
+ def test_zero_shift_identity(self):
+ text = "Hello"
+ assert caesar_encode(text, 0) == text
+
+ def test_full_alphabet_cycle(self):
+ assert caesar_encode("A", 26) == "A"
+
+
+class TestRot13:
+ def test_encode(self):
+ assert rot13_encode("Hello") == "Uryyb"
+
+ def test_decode_is_encode(self):
+ assert rot13_decode("Uryyb") == "Hello"
+
+ def test_involution(self):
+ text = "Testing ROT13!"
+ assert rot13_encode(rot13_encode(text)) == text
+
+ def test_numbers_unchanged(self):
+ assert rot13_encode("abc123") == "nop123"
+
+
+class TestAtbash:
+ def test_encode_a_z(self):
+ assert atbash_encode("A") == "Z"
+ assert atbash_encode("Z") == "A"
+
+ def test_encode_word(self):
+ assert atbash_encode("ABC") == "ZYX"
+
+ def test_involution(self):
+ text = "Hello World"
+ assert atbash_decode(atbash_encode(text)) == text
+
+ def test_preserves_case(self):
+ assert atbash_encode("a") == "z"
+ assert atbash_encode("A") == "Z"
+
+ def test_preserves_spaces(self):
+ assert atbash_encode("A B") == "Z Y"
+
+
+class TestVigenere:
+ def test_encode_basic(self):
+ result = vigenere_encode("HELLO", "KEY")
+ assert result == "RIJVS"
+
+ def test_decode_reverses_encode(self):
+ original = "Hello World"
+ encoded = vigenere_encode(original, "secret")
+ assert vigenere_decode(encoded, "secret") == original
+
+ def test_key_wraps_around(self):
+ text = "AAAAAA"
+ key = "AB"
+ encoded = vigenere_encode(text, key)
+ assert encoded == "ABABAB"
+
+ def test_preserves_non_alpha(self):
+ encoded = vigenere_encode("Hello, World!", "key")
+ assert "," in encoded
+ assert " " in encoded
+ assert "!" in encoded
+
+
+class TestReadabilityScore:
+ def test_english_text_high_score(self):
+ score = _readability_score("the quick brown fox jumps over the lazy dog")
+ assert score > 4.5
+
+ def test_random_garbage_low_score(self):
+ score = _readability_score("ZZZZQQQXXX")
+ assert score < 3.0
+
+ def test_empty_string(self):
+ assert _readability_score("") == 0.0
+
+
+class TestApplyCipher:
+ def test_caesar_encode(self):
+ result = _apply_cipher("Hello", "caesar", "3", "encode")
+ assert result == "Khoor"
+
+ def test_rot13(self):
+ result = _apply_cipher("Hello", "rot13", "0", "encode")
+ assert result == "Uryyb"
+
+ def test_atbash(self):
+ result = _apply_cipher("ABC", "atbash", "0", "encode")
+ assert result == "ZYX"
+
+ def test_vigenere(self):
+ result = _apply_cipher("HELLO", "vigenere", "KEY", "encode")
+ assert result == "RIJVS"
diff --git a/tests/test_portscan.py b/tests/test_portscan.py
new file mode 100644
index 0000000..b095158
--- /dev/null
+++ b/tests/test_portscan.py
@@ -0,0 +1,76 @@
+"""Tests for portscan module."""
+
+import socket
+from unittest.mock import patch, MagicMock
+
+import pytest
+from devha.commands.portscan import _parse_port_range, _scan_port, _get_service
+
+
+class TestParsePortRange:
+ def test_single_range(self):
+ assert _parse_port_range("1-5") == [1, 2, 3, 4, 5]
+
+ def test_single_port(self):
+ assert _parse_port_range("80") == [80]
+
+ def test_comma_separated(self):
+ assert _parse_port_range("22,80,443") == [22, 80, 443]
+
+ def test_mixed(self):
+ result = _parse_port_range("22,80-82,443")
+ assert result == [22, 80, 81, 82, 443]
+
+ def test_single_port_range(self):
+ assert _parse_port_range("100-100") == [100]
+
+
+class TestGetService:
+ def test_known_port(self):
+ # Port 80 is universally "http"
+ assert _get_service(80) == "http"
+
+ def test_ssh_port(self):
+ assert _get_service(22) == "ssh"
+
+ def test_unknown_port_returns_unknown(self):
+ # Very high port unlikely to be in the services DB
+ result = _get_service(65432)
+ assert result == "unknown"
+
+
+class TestScanPort:
+ def test_open_port_returns_dict(self):
+ with patch("socket.socket") as mock_socket_cls:
+ mock_sock = MagicMock()
+ mock_sock.__enter__ = MagicMock(return_value=mock_sock)
+ mock_sock.__exit__ = MagicMock(return_value=False)
+ mock_sock.connect_ex.return_value = 0
+ mock_socket_cls.return_value = mock_sock
+
+ result = _scan_port("127.0.0.1", 80, 1.0)
+ assert result is not None
+ assert result["port"] == 80
+ assert result["status"] == "open"
+
+ def test_closed_port_returns_none(self):
+ with patch("socket.socket") as mock_socket_cls:
+ mock_sock = MagicMock()
+ mock_sock.__enter__ = MagicMock(return_value=mock_sock)
+ mock_sock.__exit__ = MagicMock(return_value=False)
+ mock_sock.connect_ex.return_value = 111 # connection refused
+ mock_socket_cls.return_value = mock_sock
+
+ result = _scan_port("127.0.0.1", 9999, 1.0)
+ assert result is None
+
+ def test_socket_error_returns_none(self):
+ with patch("socket.socket") as mock_socket_cls:
+ mock_sock = MagicMock()
+ mock_sock.__enter__ = MagicMock(return_value=mock_sock)
+ mock_sock.__exit__ = MagicMock(return_value=False)
+ mock_sock.connect_ex.side_effect = OSError("network error")
+ mock_socket_cls.return_value = mock_sock
+
+ result = _scan_port("127.0.0.1", 80, 1.0)
+ assert result is None
diff --git a/tests/test_username.py b/tests/test_username.py
new file mode 100644
index 0000000..ce2f5df
--- /dev/null
+++ b/tests/test_username.py
@@ -0,0 +1,115 @@
+"""Tests for username module."""
+
+import json
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+import httpx
+
+from devha.commands.username import _load_sites, _check_site
+
+
+class TestLoadSites:
+ def test_returns_dict(self):
+ sites = _load_sites()
+ assert isinstance(sites, dict)
+
+ def test_has_required_sites(self):
+ sites = _load_sites()
+ required = ["GitHub", "Reddit", "Twitter", "Instagram"]
+ for site in required:
+ assert site in sites, f"{site} missing from sites.json"
+
+ def test_site_has_url_and_error_code(self):
+ sites = _load_sites()
+ for name, data in sites.items():
+ assert "url" in data, f"{name} missing 'url'"
+ assert "error_code" in data, f"{name} missing 'error_code'"
+ assert "{}" in data["url"], f"{name} url missing {{}} placeholder"
+
+ def test_minimum_count(self):
+ sites = _load_sites()
+ assert len(sites) >= 50
+
+
+class TestCheckSite:
+ @pytest.mark.asyncio
+ async def test_found_returns_found(self):
+ mock_resp = MagicMock()
+ mock_resp.status_code = 200
+
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_resp)
+
+ result = await _check_site(
+ mock_client,
+ "GitHub",
+ {"url": "https://github.com/{}", "error_code": 404},
+ "torvalds",
+ 5.0,
+ )
+ assert result["status"] == "found"
+ assert result["site"] == "GitHub"
+ assert "torvalds" in result["url"]
+
+ @pytest.mark.asyncio
+ async def test_not_found_on_error_code(self):
+ mock_resp = MagicMock()
+ mock_resp.status_code = 404
+
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_resp)
+
+ result = await _check_site(
+ mock_client,
+ "GitHub",
+ {"url": "https://github.com/{}", "error_code": 404},
+ "thisuserdoesnotexist99999",
+ 5.0,
+ )
+ assert result["status"] == "not_found"
+
+ @pytest.mark.asyncio
+ async def test_timeout_returns_timeout(self):
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(side_effect=httpx.TimeoutException("timeout"))
+
+ result = await _check_site(
+ mock_client,
+ "GitHub",
+ {"url": "https://github.com/{}", "error_code": 404},
+ "user",
+ 5.0,
+ )
+ assert result["status"] == "timeout"
+
+ @pytest.mark.asyncio
+ async def test_request_error_returns_error(self):
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(side_effect=httpx.RequestError("connection error"))
+
+ result = await _check_site(
+ mock_client,
+ "GitHub",
+ {"url": "https://github.com/{}", "error_code": 404},
+ "user",
+ 5.0,
+ )
+ assert result["status"] == "error"
+
+ @pytest.mark.asyncio
+ async def test_url_substitution(self):
+ mock_resp = MagicMock()
+ mock_resp.status_code = 200
+
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_resp)
+
+ result = await _check_site(
+ mock_client,
+ "TestSite",
+ {"url": "https://example.com/user/{}/profile", "error_code": 404},
+ "myuser",
+ 5.0,
+ )
+ assert result["url"] == "https://example.com/user/myuser/profile"