diff --git a/devha/__init__.py b/devha/__init__.py index 3dc1f76..8c0d5d5 100644 --- a/devha/__init__.py +++ b/devha/__init__.py @@ -1 +1 @@ -__version__ = "0.1.0" +__version__ = "2.0.0" diff --git a/devha/cli.py b/devha/cli.py index e42cc15..60051c8 100644 --- a/devha/cli.py +++ b/devha/cli.py @@ -1,7 +1,9 @@ -"""Main Typer application for devha.""" +"""Main Typer application for devha — Hacking Studio v2.0.""" from __future__ import annotations +import subprocess +import sys from typing import Annotated import typer @@ -20,18 +22,27 @@ harvest, headers, ping, + wifilab, + passlab, + packetlab, ) +from devha.commands.devscanner import devscanner app = typer.Typer( name="devha", - help="[cyan]Developer & Hacking CLI[/cyan] — ethical hacking and developer toolkit.", + help="[cyan]devha Hacking Studio v2.0[/cyan] — ethical hacking & developer toolkit.", rich_markup_mode="rich", - no_args_is_help=True, + no_args_is_help=False, add_completion=True, ) -# Register sub-apps -app.add_typer(cipher.app, name="cipher", help="Classical cipher encode / decode / crack.") +# ─── Sub-apps (grouped commands) ───────────────────────────────────────────── +app.add_typer(cipher.app, name="cipher", help="🔐 Classical & modern ciphers.") +app.add_typer(wifilab.app, name="wifilab", help="📡 WiFi Lab — scan, map devices, test own AP.") +app.add_typer(passlab.app, name="passlab", help="🔑 Password Lab — hash, crack, generate.") +app.add_typer(packetlab.app, name="packetlab", help="📦 Packet Lab — capture, ARP scan, builder.") + +# ─── Single commands ────────────────────────────────────────────────────────── app.command("portscan")(portscan.portscan) app.command("username")(username.username) app.command("wifi")(wifi.wifi) @@ -41,11 +52,49 @@ app.command("harvest")(harvest.harvest) app.command("headers")(headers.headers) app.command("ping")(ping.ping) +app.command("devscanner")(devscanner) + + +# ─── Studio TUI ─────────────────────────────────────────────────────────────── + +@app.command("studio") +def studio( + no_fx: Annotated[bool, typer.Option("--no-fx", help="Skip boot animation.")] = False, +) -> None: + """ + 🎮 Open the interactive Hacking Studio TUI menu. + + Navigate with [1-9] keys. All tools available in one interface. + """ + if not no_fx: + from devha.fx import hacker_boot + hacker_boot() + + from devha.studio import run_studio + module = run_studio() + # Map module name → CLI command to run + _module_map = { + "network": ["devha", "devscanner", "--help"], + "wifi": ["devha", "wifilab", "--help"], + "osint": ["devha", "username", "--help"], + "cipher": ["devha", "cipher", "--help"], + "web": ["devha", "dirscan", "--help"], + "password": ["devha", "passlab", "--help"], + "packets": ["devha", "packetlab", "--help"], + "headers": ["devha", "headers", "--help"], + "ping": ["devha", "ping", "--help"], + } + + if module and module in _module_map: + subprocess.run(_module_map[module]) + + +# ─── Version + main callback ───────────────────────────────────────────────── def _version_callback(value: bool) -> None: if value: - console.print(f"[cyan]devha[/cyan] version [bold]{__version__}[/bold]") + console.print(f"[cyan]devha[/cyan] Hacking Studio version [bold]{__version__}[/bold]") raise typer.Exit() @@ -57,7 +106,37 @@ def main( typer.Option("--version", "-V", callback=_version_callback, is_eager=True, help="Show version."), ] = False, no_banner: Annotated[bool, typer.Option("--no-banner", help="Skip the ASCII banner.")] = False, + studio_mode: Annotated[bool, typer.Option("--studio", "-s", help="Launch interactive Studio TUI.")] = False, ) -> None: - """[cyan bold]devha[/cyan bold] — Developer & Hacking CLI.""" - if not no_banner and ctx.invoked_subcommand is not None: + """[cyan bold]devha[/cyan bold] Hacking Studio — Developer & Hacking CLI v2.0.""" + if studio_mode: + ctx.invoke(studio) + return + + if ctx.invoked_subcommand is None: + # No subcommand → show studio TUI + from devha.fx import hacker_boot + if not no_banner: + hacker_boot() + from devha.studio import run_studio + module = run_studio() + if module: + _launch_module(module) + elif not no_banner: print_banner() + + +def _launch_module(module: str) -> None: + _map = { + "network": ["devha", "devscanner", "--help"], + "wifi": ["devha", "wifilab", "--help"], + "osint": ["devha", "username", "--help"], + "cipher": ["devha", "cipher", "--help"], + "web": ["devha", "dirscan", "--help"], + "password": ["devha", "passlab", "--help"], + "packets": ["devha", "packetlab", "--help"], + "headers": ["devha", "headers", "--help"], + "ping": ["devha", "ping", "--help"], + } + if module in _map: + subprocess.run(_map[module]) diff --git a/devha/commands/cipher.py b/devha/commands/cipher.py index 27c130f..e82552f 100644 --- a/devha/commands/cipher.py +++ b/devha/commands/cipher.py @@ -1,14 +1,15 @@ -"""Cipher command: encode, decode, crack classical ciphers.""" +"""Cipher command: encode, decode, crack classical ciphers + modern encodings.""" from __future__ import annotations +import base64 import json from typing import Annotated import typer from rich.text import Text -from devha.ui import console, make_table, print_panel, error +from devha.ui import console, make_table, print_panel, error, info app = typer.Typer(help="Classical cipher [encode / decode / crack].", rich_markup_mode="rich") @@ -94,7 +95,6 @@ def vigenere_decode(text: str, key: str) -> str: def _readability_score(text: str) -> float: - """Score 0-100 based on English letter frequency match.""" letters = [ch.lower() for ch in text if ch.isalpha()] if not letters: return 0.0 @@ -103,6 +103,67 @@ def _readability_score(text: str) -> float: return round(score, 2) +# ─── Modern encodings ───────────────────────────────────────────────────────── + +def base64_encode(text: str) -> str: + return base64.b64encode(text.encode()).decode() + +def base64_decode(text: str) -> str: + return base64.b64decode(text.encode()).decode(errors="replace") + +def hex_encode(text: str) -> str: + return text.encode().hex() + +def hex_decode(text: str) -> str: + try: + return bytes.fromhex(text).decode(errors="replace") + except ValueError: + return "Invalid hex string" + +def rot47_encode(text: str) -> str: + result = [] + for ch in text: + n = ord(ch) + if 33 <= n <= 126: + result.append(chr(33 + (n - 33 + 47) % 94)) + else: + result.append(ch) + return "".join(result) + +def binary_encode(text: str) -> str: + return " ".join(format(ord(c), "08b") for c in text) + +def binary_decode(text: str) -> str: + try: + return "".join(chr(int(b, 2)) for b in text.split()) + except ValueError: + return "Invalid binary string" + +def morse_encode(text: str) -> str: + _MORSE = { + "A": ".-", "B": "-...", "C": "-.-.", "D": "-..", "E": ".", "F": "..-.", + "G": "--.", "H": "....", "I": "..", "J": ".---", "K": "-.-", "L": ".-..", + "M": "--", "N": "-.", "O": "---", "P": ".--.", "Q": "--.-", "R": ".-.", + "S": "...", "T": "-", "U": "..-", "V": "...-", "W": ".--", "X": "-..-", + "Y": "-.--", "Z": "--..", "0": "-----", "1": ".----", "2": "..---", + "3": "...--", "4": "....-", "5": ".....", "6": "-....", "7": "--...", + "8": "---..", "9": "----.", " ": "/", + } + return " ".join(_MORSE.get(c.upper(), "?") for c in text) + +def morse_decode(text: str) -> str: + _MORSE_REV = { + ".-": "A", "-...": "B", "-.-.": "C", "-..": "D", ".": "E", "..-.": "F", + "--.": "G", "....": "H", "..": "I", ".---": "J", "-.-": "K", ".-..": "L", + "--": "M", "-.": "N", "---": "O", ".--.": "P", "--.-": "Q", ".-.": "R", + "...": "S", "-": "T", "..-": "U", "...-": "V", ".--": "W", "-..-": "X", + "-.--": "Y", "--..": "Z", "-----": "0", ".----": "1", "..---": "2", + "...--": "3", "....-": "4", ".....": "5", "-....": "6", "--...": "7", + "---..": "8", "----.": "9", "/": " ", + } + return "".join(_MORSE_REV.get(c, "?") for c in text.split()) + + # ─── Typer sub-commands ─────────────────────────────────────────────────────── @@ -194,6 +255,8 @@ def _apply_cipher(text: str, cipher_type: str, key: str, mode: str) -> str: return caesar_encode(text, shift) if mode == "encode" else caesar_decode(text, shift) elif ct == "rot13": return rot13_encode(text) + elif ct == "rot47": + return rot47_encode(text) elif ct == "atbash": return atbash_encode(text) elif ct == "vigenere": @@ -201,9 +264,42 @@ def _apply_cipher(text: str, cipher_type: str, key: str, mode: str) -> str: error("Vigenere key must be alphabetic.") raise typer.Exit(1) return vigenere_encode(text, key) if mode == "encode" else vigenere_decode(text, key) + elif ct == "base64": + return base64_encode(text) if mode == "encode" else base64_decode(text) + elif ct == "hex": + return hex_encode(text) if mode == "encode" else hex_decode(text) + elif ct == "binary": + return binary_encode(text) if mode == "encode" else binary_decode(text) + elif ct == "morse": + return morse_encode(text) if mode == "encode" else morse_decode(text) else: - error(f"Unknown cipher: {cipher_type}. Choose: caesar, vigenere, rot13, atbash") + error(f"Unknown cipher: {cipher_type}. Choose: caesar, vigenere, rot13, rot47, atbash, base64, hex, binary, morse") raise typer.Exit(1) except ValueError: error(f"Invalid key '{key}' for {cipher_type}.") raise typer.Exit(1) + + +@app.command("all") +def encode_all( + text: Annotated[str, typer.Argument(help="Text to encode in all formats.")], + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """Encode text in ALL available formats at once.""" + results = { + "ROT13": rot13_encode(text), + "ROT47": rot47_encode(text), + "Caesar+13": caesar_encode(text, 13), + "Atbash": atbash_encode(text), + "Base64": base64_encode(text), + "Hex": hex_encode(text), + "Binary": binary_encode(text), + "Morse": morse_encode(text), + } + if json_out: + console.print_json(json.dumps({"input": text, "encodings": results})) + return + table = make_table("FORMAT", "OUTPUT", title=f"🔐 All encodings of '{text[:30]}'") + for fmt, out in results.items(): + table.add_row(f"[cyan]{fmt}[/cyan]", out[:80]) + console.print(table) diff --git a/devha/commands/devscanner.py b/devha/commands/devscanner.py new file mode 100644 index 0000000..85877a9 --- /dev/null +++ b/devha/commands/devscanner.py @@ -0,0 +1,211 @@ +"""Device Scanner — advanced port scan with OS detection and CVE lookup.""" + +from __future__ import annotations + +import asyncio +import json +import socket +import concurrent.futures +from typing import Annotated + +import httpx +import typer +from rich.progress import Progress, SpinnerColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn + +from devha.ui import console, make_table, print_panel, info, warn, error, success +from devha.ethics import ethics_check + +# Common service banners / signatures +_SERVICE_PROBES = { + 21: b"", + 22: b"", + 25: b"", + 80: b"HEAD / HTTP/1.0\r\n\r\n", + 443: b"", + 3306: b"", + 5432: b"", + 6379: b"PING\r\n", + 27017: b"", +} + +_OS_TTLS = { + (60, 70): "Linux", + (120, 135): "Windows", + (250, 260): "macOS / BSD", + (50, 60): "Linux (low TTL)", +} + +_CVE_API = "https://services.nvd.nist.gov/rest/json/cves/2.0" + + +def _detect_os_by_ttl(ttl: int) -> str: + for (lo, hi), os_name in _OS_TTLS.items(): + if lo <= ttl <= hi: + return os_name + return "Unknown" + + +def _grab_banner(host: str, port: int, timeout: float = 2.0) -> str: + try: + probe = _SERVICE_PROBES.get(port, b"") + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(timeout) + s.connect((host, port)) + if probe: + s.sendall(probe) + banner = s.recv(1024) + return banner.decode("utf-8", errors="ignore").strip()[:120] + except Exception: + return "" + + +def _scan_port(host: str, port: int, timeout: float) -> dict | None: + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(timeout) + if s.connect_ex((host, port)) == 0: + try: + service = socket.getservbyport(port) + except OSError: + service = "unknown" + banner = _grab_banner(host, port, timeout) + return {"port": port, "service": service, "banner": banner} + except (socket.gaierror, OSError): + pass + return None + + +def _parse_port_range(ports: str) -> list[int]: + result = [] + for part in ports.split(","): + part = part.strip() + if "-" in part: + lo, hi = part.split("-", 1) + result.extend(range(int(lo), int(hi) + 1)) + else: + result.append(int(part)) + return result + + +async def _fetch_cves(keyword: str) -> list[dict]: + """Fetch CVEs from NVD NIST for a service keyword.""" + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get(_CVE_API, params={"keywordSearch": keyword, "resultsPerPage": 5}) + if resp.status_code == 200: + data = resp.json() + cves = [] + for item in data.get("vulnerabilities", []): + cve = item.get("cve", {}) + cve_id = cve.get("id", "?") + descs = cve.get("descriptions", []) + desc = next((d["value"] for d in descs if d["lang"] == "en"), "")[:100] + score = "?" + metrics = cve.get("metrics", {}) + for v in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]: + if v in metrics and metrics[v]: + score = metrics[v][0].get("cvssData", {}).get("baseScore", "?") + break + cves.append({"id": cve_id, "score": score, "desc": desc}) + return cves + except Exception: + pass + return [] + + +def devscanner( + target: Annotated[str, typer.Argument(help="Target IP or hostname.")], + ports: Annotated[str, typer.Option("--ports", "-p", help="Port range, e.g. 1-1024 or 22,80,443.")] = "1-1024", + threads: Annotated[int, typer.Option("--threads", help="Concurrent threads.")] = 150, + timeout: Annotated[float, typer.Option("--timeout", help="Socket timeout.")] = 0.8, + cve_lookup: Annotated[bool, typer.Option("--cve", help="Lookup CVEs for found services (requires internet).")] = False, + yes: Annotated[bool, typer.Option("--yes", "-y", help="Skip ethics confirmation.")] = False, + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Advanced port scanner with banner grabbing, OS detection, and CVE lookup. + + Example: + devha devscanner 192.168.1.1 + devha devscanner scanme.nmap.org --cve + devha devscanner 10.0.0.1 --ports 1-65535 --threads 300 + """ + ethics_check(target, yes=yes) + + try: + ip = socket.gethostbyname(target) + except socket.gaierror: + error(f"Cannot resolve: {target}") + raise typer.Exit(1) + + port_list = _parse_port_range(ports) + open_ports: list[dict] = [] + + info(f"Scanning [cyan]{target}[/cyan] ({ip}) ports=[cyan]{ports}[/cyan] threads=[cyan]{threads}[/cyan]\n") + + with Progress(SpinnerColumn(), "[cyan]Scanning...", BarColumn(), TaskProgressColumn(), TimeElapsedColumn(), console=console) as prog: + task = prog.add_task("scan", total=len(port_list)) + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as ex: + futures = {ex.submit(_scan_port, ip, p, timeout): p for p in port_list} + for fut in concurrent.futures.as_completed(futures): + result = fut.result() + if result: + open_ports.append(result) + prog.advance(task) + + open_ports.sort(key=lambda x: x["port"]) + + # OS detection via ICMP TTL + os_guess = "Unknown" + try: + import os as _os + if _os.geteuid() == 0: + from scapy.all import IP, ICMP, sr1, conf as sc # type: ignore[import] + sc.verb = 0 + r = sr1(IP(dst=ip) / ICMP(), timeout=2) + if r: + os_guess = _detect_os_by_ttl(r.ttl) + except Exception: + pass + + # CVE lookup + cve_data: dict[str, list] = {} + if cve_lookup and open_ports: + info("Looking up CVEs for found services...") + services = list({p["service"] for p in open_ports if p["service"] != "unknown"}) + cve_data = {svc: asyncio.run(_fetch_cves(svc)) for svc in services[:5]} + + if json_out: + console.print_json(json.dumps({"target": target, "ip": ip, "os_guess": os_guess, "open_ports": open_ports, "cves": cve_data})) + return + + # Main results table + table = make_table("PORT", "SERVICE", "BANNER", title=f"🔌 Open ports on {target} ({ip})") + for p in open_ports: + banner_short = p["banner"][:60] + "…" if len(p["banner"]) > 60 else p["banner"] + table.add_row( + f"[cyan]{p['port']}[/cyan]", + f"[bright_green]{p['service']}[/bright_green]", + f"[dim]{banner_short}[/dim]", + ) + console.print(table) + + if os_guess != "Unknown": + console.print(f"\n[blue]OS guess:[/blue] [cyan]{os_guess}[/cyan] (based on TTL)") + + console.print(f"[bright_green]Found {len(open_ports)} open port(s).[/bright_green] [dim]⚡ Stay ethical[/dim]\n") + + # CVE results + if cve_data: + for svc, cves in cve_data.items(): + if cves: + cve_table = make_table("CVE ID", "SCORE", "DESCRIPTION", title=f"🔴 CVEs for {svc}") + for cve in cves: + score = float(cve["score"]) if str(cve["score"]).replace(".", "").isdigit() else 0 + score_style = "bright_red" if score >= 7 else "yellow" if score >= 4 else "bright_green" + cve_table.add_row( + f"[cyan]{cve['id']}[/cyan]", + f"[{score_style}]{cve['score']}[/{score_style}]", + cve["desc"], + ) + console.print(cve_table) diff --git a/devha/commands/packetlab.py b/devha/commands/packetlab.py new file mode 100644 index 0000000..354f713 --- /dev/null +++ b/devha/commands/packetlab.py @@ -0,0 +1,255 @@ +"""Packet Lab — live capture, ARP scan, TCP builder (your network only).""" + +from __future__ import annotations + +import json +import os +import socket +import time +from collections import defaultdict +from typing import Annotated + +import typer +from rich.live import Live +from rich.table import Table +from rich import box + +from devha.ui import console, make_table, print_panel, info, warn, error, success + +app = typer.Typer(help="📦 Packet Lab — capture, ARP scan, and build packets on your own network.") + + +def _need_root() -> None: + if os.geteuid() != 0: + error("Packet Lab requires root. Run: sudo devha packetlab ") + raise typer.Exit(1) + + +def _get_scapy(): + try: + from scapy.all import ( # type: ignore[import] + sniff, ARP, IP, TCP, UDP, ICMP, Ether, Raw, + get_if_list, conf as scapy_conf, + ) + scapy_conf.verb = 0 + return sniff, ARP, IP, TCP, UDP, ICMP, Ether, Raw, get_if_list + except ImportError: + error("Scapy not installed.") + raise typer.Exit(1) + + +# ─── Commands ────────────────────────────────────────────────────────────────── + +@app.command("capture") +def capture( + iface: Annotated[str, typer.Option("--iface", "-i", help="Network interface to capture on.")] = "", + count: Annotated[int, typer.Option("--count", "-c", help="Number of packets to capture (0 = infinite).")] = 50, + proto_filter: Annotated[str, typer.Option("--filter", "-f", help="Protocol filter: tcp, udp, icmp, arp, all.")] = "all", + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Capture live packets on your network interface. + + Only captures on your own network. Shows src, dst, protocol, size. + + Example: + sudo devha packetlab capture --count 20 + sudo devha packetlab capture --filter tcp --iface eth0 + """ + _need_root() + sniff, ARP, IP, TCP, UDP, ICMP, Ether, Raw, get_if_list = _get_scapy() + + bpf = "" if proto_filter == "all" else proto_filter + iface_arg = iface or None + + captured = [] + + def packet_handler(pkt): + rec = {"time": time.strftime("%H:%M:%S"), "proto": "?", "src": "?", "dst": "?", "len": len(pkt), "info": ""} + if pkt.haslayer(IP): + rec["src"] = pkt[IP].src + rec["dst"] = pkt[IP].dst + if pkt.haslayer(TCP): + rec["proto"] = "TCP" + rec["info"] = f"port {pkt[TCP].sport}→{pkt[TCP].dport} [{_tcp_flags(pkt[TCP].flags)}]" + elif pkt.haslayer(UDP): + rec["proto"] = "UDP" + rec["info"] = f"port {pkt[UDP].sport}→{pkt[UDP].dport}" + elif pkt.haslayer(ICMP): + rec["proto"] = "ICMP" + rec["info"] = f"type={pkt[ICMP].type}" + elif pkt.haslayer(ARP): + rec["proto"] = "ARP" + rec["src"] = pkt[ARP].psrc + rec["dst"] = pkt[ARP].pdst + rec["info"] = "who-has" if pkt[ARP].op == 1 else "is-at" + else: + rec["proto"] = pkt.name + captured.append(rec) + + info(f"Capturing [cyan]{count if count else '∞'}[/cyan] packets on [cyan]{iface or 'default'}[/cyan] filter=[cyan]{proto_filter}[/cyan]") + info("Press [bold]Ctrl+C[/bold] to stop early.\n") + + try: + sniff(iface=iface_arg, filter=bpf, count=count or 0, prn=packet_handler, store=False) + except KeyboardInterrupt: + pass + + if json_out: + console.print_json(json.dumps({"packets": captured})) + return + + table = make_table("TIME", "PROTO", "SRC", "DST", "LEN", "INFO", title=f"📦 Captured {len(captured)} packets") + proto_colors = {"TCP": "cyan", "UDP": "blue", "ICMP": "yellow", "ARP": "magenta"} + for r in captured: + c = proto_colors.get(r["proto"], "white") + table.add_row( + f"[dim]{r['time']}[/dim]", + f"[{c}]{r['proto']}[/{c}]", + r["src"], r["dst"], + str(r["len"]), + f"[dim]{r['info']}[/dim]", + ) + console.print(table) + + +def _tcp_flags(flags) -> str: + names = {0x01: "FIN", 0x02: "SYN", 0x04: "RST", 0x08: "PSH", 0x10: "ACK", 0x20: "URG"} + return "+".join(v for k, v in names.items() if int(flags) & k) or str(flags) + + +@app.command("arp-scan") +def arp_scan( + subnet: Annotated[str, typer.Option("--subnet", "-s", help="Subnet, e.g. 192.168.1.0/24")] = "", + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + ARP scan to discover all devices on your local subnet. + + Example: + sudo devha packetlab arp-scan + sudo devha packetlab arp-scan --subnet 10.0.0.0/24 + """ + _need_root() + sniff, ARP, IP, TCP, UDP, ICMP, Ether, Raw, get_if_list = _get_scapy() + from scapy.all import srp # type: ignore[import] + + if not subnet: + hostname = socket.gethostname() + local_ip = socket.gethostbyname(hostname) + subnet = ".".join(local_ip.split(".")[:3]) + ".0/24" + + info(f"ARP scanning [cyan]{subnet}[/cyan]...") + + arp_req = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=subnet) + answered, _ = srp(arp_req, timeout=3, retry=1, verbose=False) + + devices = [] + for _, recv in answered: + try: + hostname = socket.gethostbyaddr(recv.psrc)[0] + except socket.herror: + hostname = "?" + devices.append({"ip": recv.psrc, "mac": recv.hwsrc, "hostname": hostname}) + + devices.sort(key=lambda x: [int(p) for p in x["ip"].split(".")]) + + if json_out: + console.print_json(json.dumps({"subnet": subnet, "devices": devices})) + return + + table = make_table("IP ADDRESS", "MAC ADDRESS", "HOSTNAME", title=f"🖥️ ARP Scan — {subnet}") + for d in devices: + table.add_row(f"[cyan]{d['ip']}[/cyan]", f"[dim]{d['mac']}[/dim]", d["hostname"]) + console.print(table) + console.print(f"\n[bright_green]Found {len(devices)} device(s).[/bright_green]") + + +@app.command("build") +def build_packet( + dst: Annotated[str, typer.Argument(help="Destination IP (your own test target).")], + proto: Annotated[str, typer.Option("--proto", "-p", help="Protocol: tcp, udp, icmp.")] = "icmp", + dport: Annotated[int, typer.Option("--dport", help="Destination port (TCP/UDP).")] = 80, + payload: Annotated[str, typer.Option("--payload", help="Payload string.")] = "devha-test", + count: Annotated[int, typer.Option("--count", "-n", help="Number of packets.")] = 1, +) -> None: + """ + Build and send a custom packet to your own test system. + + Educational — shows what raw packets look like. + + Example: + sudo devha packetlab build 127.0.0.1 --proto icmp + sudo devha packetlab build 192.168.1.1 --proto tcp --dport 80 + """ + _need_root() + sniff, ARP, IP, TCP, UDP, ICMP, Ether, Raw, get_if_list = _get_scapy() + from scapy.all import send # type: ignore[import] + + warn("⚡ Sending to your own system only — educational packet builder") + + if proto == "icmp": + pkt = IP(dst=dst) / ICMP() / Raw(load=payload.encode()) + elif proto == "tcp": + pkt = IP(dst=dst) / TCP(dport=dport, flags="S") / Raw(load=payload.encode()) + elif proto == "udp": + pkt = IP(dst=dst) / UDP(dport=dport) / Raw(load=payload.encode()) + else: + error(f"Unknown protocol: {proto}") + raise typer.Exit(1) + + info(f"Packet summary: [cyan]{pkt.summary()}[/cyan]") + console.print() + console.print(pkt.show(dump=True)) + + send(pkt, count=count, verbose=False) + success(f"Sent {count} x {proto.upper()} packet(s) to {dst}.") + + +@app.command("stats") +def network_stats( + iface: Annotated[str, typer.Option("--iface", "-i", help="Interface to monitor.")] = "", + duration: Annotated[int, typer.Option("--duration", "-d", help="Monitoring duration in seconds.")] = 10, +) -> None: + """ + Live traffic statistics — count packets by protocol. + + Example: + sudo devha packetlab stats --duration 15 + """ + _need_root() + sniff, ARP, IP, TCP, UDP, ICMP, Ether, Raw, get_if_list = _get_scapy() + + counts: dict[str, int] = defaultdict(int) + bytes_: dict[str, int] = defaultdict(int) + start = time.time() + + def handler(pkt): + if pkt.haslayer(TCP): + k = "TCP" + elif pkt.haslayer(UDP): + k = "UDP" + elif pkt.haslayer(ICMP): + k = "ICMP" + elif pkt.haslayer(ARP): + k = "ARP" + else: + k = "Other" + counts[k] += 1 + bytes_[k] += len(pkt) + + info(f"Monitoring traffic for [cyan]{duration}s[/cyan]...") + sniff(iface=iface or None, prn=handler, timeout=duration, store=False) + + table = make_table("PROTOCOL", "PACKETS", "BYTES", "% TRAFFIC", title=f"📊 Traffic Stats ({duration}s)") + total = max(sum(counts.values()), 1) + for proto in sorted(counts, key=lambda k: -counts[k]): + pct = counts[proto] / total * 100 + bar = "█" * int(pct / 5) + "░" * (20 - int(pct / 5)) + table.add_row( + f"[cyan]{proto}[/cyan]", + str(counts[proto]), + f"{bytes_[proto]:,} B", + f"{bar} {pct:.1f}%", + ) + console.print(table) diff --git a/devha/commands/passlab.py b/devha/commands/passlab.py new file mode 100644 index 0000000..eb7d547 --- /dev/null +++ b/devha/commands/passlab.py @@ -0,0 +1,280 @@ +"""Password Lab — hash identification, cracking, strength meter, generator.""" + +from __future__ import annotations + +import hashlib +import json +import random +import re +import string +import struct +from typing import Annotated + +import typer +from rich.progress import Progress, SpinnerColumn, BarColumn, TaskProgressColumn + +from devha.ui import console, make_table, print_panel, info, warn, error, success + +app = typer.Typer(help="🔑 Password Lab — hash ID, crack, strength meter, generator.") + +# ─── Hash patterns ───────────────────────────────────────────────────────────── + +_HASH_PATTERNS = [ + (r"^[a-f0-9]{32}$", "MD5"), + (r"^[a-f0-9]{40}$", "SHA-1"), + (r"^[a-f0-9]{56}$", "SHA-224"), + (r"^[a-f0-9]{64}$", "SHA-256"), + (r"^[a-f0-9]{96}$", "SHA-384"), + (r"^[a-f0-9]{128}$", "SHA-512"), + (r"^\$2[ayb]\$.{56}$", "bcrypt"), + (r"^\$1\$.{1,8}\$.{22}$", "MD5-crypt"), + (r"^\$5\$.*\$.{43}$", "SHA-256-crypt"), + (r"^\$6\$.*\$.{86}$", "SHA-512-crypt"), + (r"^[a-f0-9]{8}:[a-f0-9]{40}$", "SHA-1+salt"), + (r"^[A-Za-z0-9+/]{24}$", "Base64 (possible MD5)"), + (r"^[A-Z]{13}$", "ROT13"), + (r"^[a-f0-9]{16}$", "MySQL 3.x / DES"), + (r"^\*[A-F0-9]{40}$", "MySQL 4.1+"), + (r"^[a-zA-Z0-9]{13}$", "DES-crypt"), +] + +_HASH_FUNCS = { + "MD5": lambda p: hashlib.md5(p).hexdigest(), + "SHA-1": lambda p: hashlib.sha1(p).hexdigest(), + "SHA-224": lambda p: hashlib.sha224(p).hexdigest(), + "SHA-256": lambda p: hashlib.sha256(p).hexdigest(), + "SHA-384": lambda p: hashlib.sha384(p).hexdigest(), + "SHA-512": lambda p: hashlib.sha512(p).hexdigest(), +} + +_COMMON_PASSWORDS = [ + "password", "123456", "password123", "admin", "letmein", "qwerty", + "monkey", "dragon", "master", "abc123", "iloveyou", "1234567890", + "sunshine", "princess", "welcome", "shadow", "superman", "michael", + "football", "baseball", "trustno1", "passw0rd", "p@ssword", "hunter2", +] + + +# ─── Commands ────────────────────────────────────────────────────────────────── + +@app.command("identify") +def identify_hash( + hash_val: Annotated[str, typer.Argument(help="Hash string to identify.")], + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Identify the type of a hash string. + + Example: + devha passlab identify 5f4dcc3b5aa765d61d8327deb882cf99 + """ + matches = [] + for pattern, name in _HASH_PATTERNS: + if re.match(pattern, hash_val, re.IGNORECASE): + matches.append(name) + + if json_out: + console.print_json(json.dumps({"hash": hash_val, "possible_types": matches})) + return + + if not matches: + warn(f"Could not identify hash: [cyan]{hash_val}[/cyan]") + return + + table = make_table("POSSIBLE HASH TYPE", "LENGTH", title="🔍 Hash Identification") + for m in matches: + table.add_row(f"[bright_green]{m}[/bright_green]", str(len(hash_val))) + console.print(table) + + +@app.command("crack") +def crack_hash( + hash_val: Annotated[str, typer.Argument(help="Hash to crack.")], + wordlist: Annotated[str, typer.Option("--wordlist", "-w", help="Path to wordlist file (one password per line).")] = "", + hash_type: Annotated[str, typer.Option("--type", "-t", help="Hash type: md5, sha1, sha256, sha512 (auto-detect if not set).")] = "", + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Crack a hash using a wordlist or built-in common passwords. + + Example: + devha passlab crack 5f4dcc3b5aa765d61d8327deb882cf99 + devha passlab crack --wordlist /usr/share/wordlists/rockyou.txt + """ + # Auto-detect type + if not hash_type: + for pattern, name in _HASH_PATTERNS: + if re.match(pattern, hash_val, re.IGNORECASE) and name in _HASH_FUNCS: + hash_type = name + break + + if not hash_type or hash_type.upper() not in _HASH_FUNCS: + # Try all supported types + candidates = [t for t in _HASH_FUNCS if t != "bcrypt"] + else: + candidates = [hash_type.upper()] + + # Load wordlist + if wordlist: + try: + with open(wordlist, encoding="utf-8", errors="ignore") as f: + words = [line.strip() for line in f if line.strip()] + except FileNotFoundError: + error(f"Wordlist not found: {wordlist}") + raise typer.Exit(1) + else: + words = list(_COMMON_PASSWORDS) + warn(f"No wordlist specified — using {len(words)} common passwords. Use --wordlist for better results.") + + info(f"Cracking [cyan]{hash_val[:20]}...[/cyan] with {len(words)} words...") + + found = None + with Progress(SpinnerColumn(), "[cyan]Cracking...", BarColumn(), TaskProgressColumn(), console=console) as prog: + task = prog.add_task("crack", total=len(words) * len(candidates)) + for word in words: + for htype in candidates: + h = _HASH_FUNCS[htype](word.encode()) + if h.lower() == hash_val.lower(): + found = (word, htype) + prog.update(task, completed=prog.tasks[task].total) + break + if found: + break + prog.advance(task, len(candidates)) + + if json_out: + console.print_json(json.dumps({"hash": hash_val, "found": found[0] if found else None, "type": found[1] if found else None})) + return + + if found: + success(f"CRACKED! Hash type: [cyan]{found[1]}[/cyan]") + print_panel(f"[bold bright_green]Password: {found[0]}[/bold bright_green]", title="💥 Cracked!", style="bright_green") + else: + warn("Not found in wordlist. Try a larger wordlist like rockyou.txt") + + +@app.command("strength") +def check_strength( + password: Annotated[str, typer.Argument(help="Password to check.")], + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Check password strength with a detailed score breakdown. + + Example: + devha passlab strength "MyP@ssw0rd!" + """ + score = 0 + checks = [] + + def chk(name: str, passed: bool, points: int, tip: str = "") -> None: + nonlocal score + if passed: + score += points + checks.append({"name": name, "passed": passed, "points": points if passed else 0, "tip": tip}) + + chk("Length ≥ 8", len(password) >= 8, 10, "Use at least 8 characters") + chk("Length ≥ 12", len(password) >= 12, 10, "Use at least 12 characters") + chk("Length ≥ 16", len(password) >= 16, 10, "Use at least 16 characters") + chk("Uppercase", any(c.isupper() for c in password), 15, "Add uppercase letters (A-Z)") + chk("Lowercase", any(c.islower() for c in password), 15, "Add lowercase letters (a-z)") + chk("Digits", any(c.isdigit() for c in password), 15, "Add numbers (0-9)") + chk("Symbols", any(c in string.punctuation for c in password), 20, "Add symbols (!@#$%)") + chk("Not common", password.lower() not in _COMMON_PASSWORDS, 15, "Avoid common passwords") + + max_score = sum(c["points"] + (c["points"] if not c["passed"] else 0) for c in checks) + pct = int(score / 85 * 100) + + if pct >= 80: + label, style = "STRONG 💪", "bright_green" + elif pct >= 55: + label, style = "MEDIUM ⚠️", "yellow" + else: + label, style = "WEAK 💀", "bright_red" + + if json_out: + console.print_json(json.dumps({"password": "*" * len(password), "score": score, "percent": pct, "rating": label, "checks": checks})) + return + + table = make_table("CHECK", "RESULT", "POINTS", title="🔑 Password Strength") + for c in checks: + status = "[bright_green]✔[/bright_green]" if c["passed"] else "[bright_red]✘[/bright_red]" + pts = f"[bright_green]+{c['points']}[/bright_green]" if c["passed"] else f"[dim]0[/dim]" + tip = f"[dim] — {c['tip']}[/dim]" if not c["passed"] and c["tip"] else "" + table.add_row(c["name"] + tip, status, pts) + console.print(table) + + bar = "█" * int(pct / 5) + "░" * (20 - int(pct / 5)) + console.print(f"\n[{style}] {bar} {pct}% {label}[/{style}]") + + +@app.command("generate") +def generate_password( + length: Annotated[int, typer.Option("--length", "-l", help="Password length.")] = 20, + count: Annotated[int, typer.Option("--count", "-n", help="Number of passwords to generate.")] = 5, + no_symbols: Annotated[bool, typer.Option("--no-symbols", help="Exclude special characters.")] = False, + memorable: Annotated[bool, typer.Option("--memorable", help="Generate word-based passphrase.")] = False, + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Generate strong random passwords or passphrases. + + Example: + devha passlab generate --length 24 --count 3 + devha passlab generate --memorable + """ + _WORDS = [ + "correct", "horse", "battery", "staple", "purple", "monkey", + "dishwasher", "rainbow", "quantum", "laser", "cyber", "pixel", + "shadow", "thunder", "falcon", "vector", "matrix", "ninja", + "cosmic", "eclipse", "phoenix", "dragon", "titan", "nova", + ] + + passwords = [] + charset = string.ascii_letters + string.digits + ("" if no_symbols else "!@#$%^&*-_=+") + + for _ in range(count): + if memorable: + pw = "-".join(random.choices(_WORDS, k=4)) + str(random.randint(10, 99)) + else: + pw = "".join(random.choices(charset, k=length)) + passwords.append(pw) + + if json_out: + console.print_json(json.dumps({"passwords": passwords})) + return + + table = make_table("#", "PASSWORD", "LENGTH", title="🎲 Generated Passwords") + for i, pw in enumerate(passwords, 1): + table.add_row(str(i), f"[bright_green]{pw}[/bright_green]", str(len(pw))) + console.print(table) + console.print("[dim]Tip: use a password manager to store these.[/dim]") + + +@app.command("hash") +def hash_text( + text: Annotated[str, typer.Argument(help="Text to hash.")], + algo: Annotated[str, typer.Option("--algo", "-a", help="Algorithm: md5, sha1, sha256, sha512, all.")] = "all", + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Hash a string with common algorithms. + + Example: + devha passlab hash "hello world" --algo sha256 + devha passlab hash "password" --algo all + """ + algos = list(_HASH_FUNCS.keys()) if algo == "all" else [algo.upper()] + results = {} + for a in algos: + if a in _HASH_FUNCS: + results[a] = _HASH_FUNCS[a](text.encode()) + + if json_out: + console.print_json(json.dumps({"input": text, "hashes": results})) + return + + table = make_table("ALGORITHM", "HASH", title=f"🔒 Hashes of '{text[:30]}'") + for a, h in results.items(): + table.add_row(f"[cyan]{a}[/cyan]", h) + console.print(table) diff --git a/devha/commands/wifilab.py b/devha/commands/wifilab.py new file mode 100644 index 0000000..77b7cca --- /dev/null +++ b/devha/commands/wifilab.py @@ -0,0 +1,337 @@ +"""WiFi Lab — scan, map devices, test own network security.""" + +from __future__ import annotations + +import json +import os +import platform +import re +import socket +import subprocess +import time +from typing import Annotated + +import typer +from rich.progress import Progress, SpinnerColumn, BarColumn, TimeElapsedColumn +from rich.text import Text + +from devha.ui import console, make_table, print_panel, info, warn, error, success + +app = typer.Typer(help="📡 WiFi Lab — scan networks, map devices, test your own AP security.") + + +# ─── Helpers ───────────────────────────────────────────────────────────────── + +def _run(cmd: list[str], timeout: int = 15) -> str: + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) + return r.stdout + r.stderr + except (FileNotFoundError, subprocess.TimeoutExpired): + return "" + + +def _get_interface() -> str: + out = _run(["iwconfig"]) + for line in out.splitlines(): + if "IEEE 802.11" in line or "ESSID" in line: + return line.split()[0] + out2 = _run(["nmcli", "-t", "-f", "DEVICE,TYPE", "device"]) + for line in out2.splitlines(): + if "wifi" in line.lower(): + return line.split(":")[0] + return "wlan0" + + +def _resolve_mac(ip: str) -> str: + """Try to get MAC from ARP table.""" + try: + out = _run(["arp", "-n", ip]) + m = re.search(r"(([0-9a-fA-F]{2}[:\-]){5}[0-9a-fA-F]{2})", out) + return m.group(0) if m else "?" + except Exception: + return "?" + + +def _vendor_from_mac(mac: str) -> str: + """Simple OUI lookup — first 3 octets.""" + oui_map = { + "00:50:56": "VMware", "00:0c:29": "VMware", "08:00:27": "VirtualBox", + "b8:27:eb": "Raspberry Pi", "dc:a6:32": "Raspberry Pi", "e4:5f:01": "Raspberry Pi", + "00:11:22": "Cimsys", "fc:fb:fb": "Cisco", "00:1a:2b": "Cisco", + "ac:37:43": "Apple", "f8:ff:c2": "Apple", "3c:15:c2": "Apple", + "00:50:f2": "Microsoft", "28:d2:44": "Samsung", "98:01:a7": "Apple", + } + prefix = mac[:8].lower().replace("-", ":") + return oui_map.get(prefix, "Unknown") + + +# ─── Commands ───────────────────────────────────────────────────────────────── + +@app.command("scan") +def scan_networks( + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """Scan nearby WiFi networks and show signal strength.""" + info("Scanning WiFi networks...") + + os_name = platform.system() + networks = [] + + if os_name == "Linux": + out = _run(["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY,CHAN,BSSID", "device", "wifi", "list"]) + for line in out.strip().splitlines(): + parts = line.split(":") + if len(parts) >= 3: + networks.append({ + "ssid": parts[0] or "", + "signal": parts[1], + "security": parts[2] or "Open", + "channel": parts[3] if len(parts) > 3 else "?", + "bssid": parts[4] if len(parts) > 4 else "?", + }) + elif os_name == "Darwin": + airport = "/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport" + out = _run([airport, "-s"]) + for line in out.strip().splitlines()[1:]: + p = line.split() + if len(p) >= 5: + networks.append({"ssid": p[0], "signal": p[2], "security": p[-1], "channel": p[3], "bssid": p[1]}) + elif os_name == "Windows": + out = _run(["netsh", "wlan", "show", "networks", "mode=Bssid"]) + current: dict = {} + for line in out.splitlines(): + line = line.strip() + if line.startswith("SSID") and "BSSID" not in line: + if current: + networks.append(current) + current = {"ssid": line.split(":", 1)[-1].strip(), "signal": "?", "security": "?", "channel": "?", "bssid": "?"} + elif "Signal" in line and current: + current["signal"] = line.split(":", 1)[-1].strip() + elif "Authentication" in line and current: + current["security"] = line.split(":", 1)[-1].strip() + elif "BSSID" in line and current: + current["bssid"] = line.split(":", 1)[-1].strip() + if current: + networks.append(current) + + networks.sort(key=lambda x: -int(x["signal"].replace("%", "").strip()) if x["signal"].replace("%", "").strip().lstrip("-").isdigit() else 0) + + if json_out: + console.print_json(json.dumps(networks)) + return + + table = make_table("SSID", "SIGNAL", "SECURITY", "CHANNEL", "BSSID", title="📡 WiFi Networks") + for n in networks: + sig = n["signal"] + sec = n["security"] + sec_style = "bright_red bold" if sec == "Open" else "bright_green" if "WPA3" in sec else "yellow" + bar = _signal_bar(sig) + table.add_row( + f"[cyan]{n['ssid']}[/cyan]", + f"[dim]{bar}[/dim] {sig}", + f"[{sec_style}]{sec}[/{sec_style}]", + n["channel"], + f"[dim]{n['bssid']}[/dim]", + ) + console.print(table) + console.print(f"\n[dim]⚡ {len(networks)} networks found — read-only scan[/dim]") + + +def _signal_bar(sig: str) -> str: + try: + val = int(sig.replace("%", "").strip()) + val = val if val > 0 else 100 + val # dBm to percentage + blocks = int(val / 20) + return "█" * blocks + "░" * (5 - blocks) + except Exception: + return "░░░░░" + + +@app.command("devices") +def scan_devices( + subnet: Annotated[str, typer.Option("--subnet", "-s", help="Subnet to scan, e.g. 192.168.1.0/24")] = "", + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Discover all devices on your local network using ARP. + + Only works on your own network. Requires root/admin. + + Example: + sudo devha wifilab devices + sudo devha wifilab devices --subnet 192.168.0.0/24 + """ + if os.geteuid() != 0: + error("Device scan requires root. Run: sudo devha wifilab devices") + raise typer.Exit(1) + + try: + from scapy.all import ARP, Ether, srp, conf as scapy_conf # type: ignore[import] + scapy_conf.verb = 0 + except ImportError: + error("Scapy not installed.") + raise typer.Exit(1) + + if not subnet: + # Auto-detect local subnet + hostname = socket.gethostname() + local_ip = socket.gethostbyname(hostname) + subnet = ".".join(local_ip.split(".")[:3]) + ".0/24" + + info(f"ARP scanning [cyan]{subnet}[/cyan]...") + + arp = ARP(pdst=subnet) + ether = Ether(dst="ff:ff:ff:ff:ff:ff") + packet = ether / arp + + with Progress(SpinnerColumn(), "[cyan]Scanning...", TimeElapsedColumn(), console=console) as p: + p.add_task("scan", total=None) + answered, _ = srp(packet, timeout=3, retry=1) + + devices = [] + for _, recv in answered: + mac = recv.hwsrc + ip = recv.psrc + vendor = _vendor_from_mac(mac) + try: + hostname = socket.gethostbyaddr(ip)[0] + except socket.herror: + hostname = "?" + devices.append({"ip": ip, "mac": mac, "vendor": vendor, "hostname": hostname}) + + devices.sort(key=lambda x: [int(p) for p in x["ip"].split(".")]) + + if json_out: + console.print_json(json.dumps({"subnet": subnet, "devices": devices})) + return + + table = make_table("IP", "MAC", "VENDOR", "HOSTNAME", title=f"🖥️ Devices on {subnet}") + for d in devices: + table.add_row( + f"[cyan]{d['ip']}[/cyan]", + f"[dim]{d['mac']}[/dim]", + f"[yellow]{d['vendor']}[/yellow]", + d["hostname"], + ) + console.print(table) + console.print(f"\n[bright_green]Found {len(devices)} device(s) on {subnet}.[/bright_green]") + console.print("[dim]⚡ ARP scan — your network only[/dim]") + + +@app.command("security") +def security_check( + target: Annotated[str, typer.Argument(help="Router IP, e.g. 192.168.1.1")] = "192.168.1.1", + json_out: Annotated[bool, typer.Option("--json")] = False, +) -> None: + """ + Check your router/AP for common security weaknesses. + + Tests: open ports, Telnet, default HTTP login page, WPS indicator. + + Example: + devha wifilab security 192.168.1.1 + """ + import socket as _socket + info(f"Security checking [cyan]{target}[/cyan]...") + + checks = [] + + # Check common router ports + dangerous_ports = { + 23: ("Telnet", "HIGH — unencrypted admin access"), + 80: ("HTTP admin", "MEDIUM — check if default password is set"), + 443: ("HTTPS admin", "LOW — encrypted, but change default password"), + 8080: ("HTTP alt", "MEDIUM — alternative HTTP admin panel"), + 8443: ("HTTPS alt", "LOW"), + 21: ("FTP", "HIGH — unencrypted file access"), + 22: ("SSH", "LOW — encrypted, ensure strong password"), + } + + for port, (svc, risk) in dangerous_ports.items(): + try: + with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as s: + s.settimeout(1.0) + open_ = s.connect_ex((target, port)) == 0 + except OSError: + open_ = False + if open_: + checks.append({"port": port, "service": svc, "risk": risk, "status": "OPEN"}) + + # Try HTTP login page detection + try: + import httpx + r = httpx.get(f"http://{target}", timeout=3, follow_redirects=True) + body = r.text.lower() + if any(w in body for w in ["password", "username", "login", "admin"]): + checks.append({"port": 80, "service": "HTTP Login Page", "risk": "MEDIUM — check default credentials", "status": "DETECTED"}) + except Exception: + pass + + if json_out: + console.print_json(json.dumps({"target": target, "findings": checks})) + return + + if not checks: + success(f"No common vulnerabilities found on {target}") + return + + table = make_table("PORT", "SERVICE", "STATUS", "RISK", title=f"🛡️ Security Check — {target}") + for c in checks: + risk = c["risk"] + risk_style = "bright_red" if "HIGH" in risk else "yellow" if "MEDIUM" in risk else "dim" + table.add_row( + f"[cyan]{c['port']}[/cyan]", + c["service"], + f"[bright_red]{c['status']}[/bright_red]", + f"[{risk_style}]{risk}[/{risk_style}]", + ) + console.print(table) + console.print("\n[dim]⚡ Tested against your own router only — change defaults immediately[/dim]") + + +@app.command("deauth-test") +def deauth_test( + bssid: Annotated[str, typer.Argument(help="Your own AP BSSID (MAC address), e.g. AA:BB:CC:DD:EE:FF")], + client: Annotated[str, typer.Option("--client", "-c", help="Client MAC to test (default: broadcast)")] = "ff:ff:ff:ff:ff:ff", + count: Annotated[int, typer.Option("--count", "-n", help="Number of deauth frames to send.")] = 5, + iface: Annotated[str, typer.Option("--iface", "-i", help="Wireless interface in monitor mode.")] = "wlan0mon", +) -> None: + """ + Send deauth frames to YOUR OWN access point to test its resilience. + + ⚠️ Only use on your own AP. Requires monitor mode + root. + Set up monitor mode first: sudo airmon-ng start wlan0 + + Example: + sudo devha wifilab deauth-test AA:BB:CC:DD:EE:FF --iface wlan0mon + """ + if os.geteuid() != 0: + error("Deauth test requires root: sudo devha wifilab deauth-test ...") + raise typer.Exit(1) + + warn("⚡ Deauth test — YOUR OWN AP only. Illegal on others' networks.") + + try: + from scapy.all import RadioTap, Dot11, Dot11Deauth, sendp, conf as scapy_conf # type: ignore[import] + scapy_conf.verb = 0 + except ImportError: + error("Scapy not installed.") + raise typer.Exit(1) + + info(f"Sending {count} deauth frames to [cyan]{bssid}[/cyan] via [cyan]{iface}[/cyan]...") + + pkt = ( + RadioTap() + / Dot11(addr1=client, addr2=bssid, addr3=bssid) + / Dot11Deauth(reason=7) + ) + + with Progress(SpinnerColumn(), "[cyan]Sending deauth frames...", BarColumn(), TimeElapsedColumn(), console=console) as p: + task = p.add_task("send", total=count) + for i in range(count): + sendp(pkt, iface=iface, count=1, inter=0.1) + p.advance(task) + + success(f"Sent {count} deauth frames to {bssid}.") + console.print("[dim]If your own devices disconnected — your AP responded correctly to deauth.[/dim]") + console.print("[dim]Enable 802.11w (Management Frame Protection) on your router to block this.[/dim]") diff --git a/devha/fx.py b/devha/fx.py new file mode 100644 index 0000000..44a22bc --- /dev/null +++ b/devha/fx.py @@ -0,0 +1,113 @@ +"""Movie-hacker visual effects — glitch text, typing animation, matrix.""" + +from __future__ import annotations + +import random +import sys +import time + +from rich.console import Console +from rich.live import Live +from rich.text import Text + +console = Console() + +_GLITCH_CHARS = "!@#$%^&*<>?/|\\[]{}~`ハミヒーウシナモニ01" +_MATRIX_CHARS = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ0123456789" + + +def glitch_print(text: str, iterations: int = 6, delay: float = 0.06) -> None: + """Print text with a glitch effect that resolves to the real text.""" + chars = list(text) + resolved = [False] * len(chars) + + with Live(console=console, refresh_per_second=20) as live: + for _ in range(iterations): + display = Text() + for i, ch in enumerate(chars): + if resolved[i] or ch == " ": + display.append(ch, "bold bright_green" if resolved[i] else "") + else: + display.append(random.choice(_GLITCH_CHARS), "bold bright_red") + live.update(display) + time.sleep(delay) + + # Resolve random characters each round + unresolve = [i for i, r in enumerate(resolved) if not r and chars[i] != " "] + for i in random.sample(unresolve, min(len(unresolve), max(1, len(unresolve) // 3))): + resolved[i] = True + + # Final resolved text + live.update(Text(text, style="bold bright_green")) + + +def typing_print(text: str, delay: float = 0.03, style: str = "bright_green") -> None: + """Print text character by character like a movie hacker.""" + for ch in text: + console.print(ch, end="", style=style) + sys.stdout.flush() + jitter = delay + random.uniform(-delay * 0.3, delay * 0.3) + time.sleep(max(0.005, jitter)) + console.print() + + +def matrix_splash(duration: float = 2.0, width: int = 60, height: int = 15) -> None: + """Display a matrix rain splash screen.""" + columns = [[random.choice(_MATRIX_CHARS) for _ in range(height)] for _ in range(width)] + offsets = [random.randint(0, height) for _ in range(width)] + speeds = [random.uniform(0.5, 2.0) for _ in range(width)] + ticks = [0.0] * width + + start = time.time() + with Live(console=console, refresh_per_second=15) as live: + while time.time() - start < duration: + now = time.time() - start + lines = [] + for row in range(height): + line = Text() + for col in range(width): + ch_row = (offsets[col] + row) % height + ch = columns[col][ch_row] + dist = (row - offsets[col]) % height + if dist == 0: + line.append(ch, "bold bright_white") + elif dist < 3: + line.append(ch, "bold bright_green") + elif dist < 8: + line.append(ch, "green") + else: + line.append(ch, "dim green") + lines.append(line) + + # Scroll columns + for col in range(width): + ticks[col] += 0.07 + if ticks[col] >= 1.0 / speeds[col]: + ticks[col] = 0 + offsets[col] = (offsets[col] + 1) % height + columns[col][random.randint(0, height - 1)] = random.choice(_MATRIX_CHARS) + + combined = Text("\n").join(lines) + live.update(combined) + time.sleep(0.05) + + +def hacker_boot(target: str = "") -> None: + """Cinematic boot sequence — movie-style hacker startup.""" + lines = [ + "[dim cyan]Initializing devha Hacking Studio...[/dim cyan]", + "[dim cyan]Loading modules: portscan · wifi · osint · crypto · passlab · packetlab[/dim cyan]", + f"[dim cyan]Target: [bold]{target or 'interactive mode'}[/bold][/dim cyan]", + "[bright_green][ OK ] All systems nominal[/bright_green]", + "[bright_green][ OK ] Ethics module loaded — own systems only[/bright_green]", + ] + + matrix_splash(1.2, width=70, height=8) + + for line in lines: + console.print(line) + time.sleep(0.15) + + time.sleep(0.2) + glitch_print(" DEVHA HACKING STUDIO v2.0 ", iterations=8, delay=0.05) + time.sleep(0.3) diff --git a/devha/studio.py b/devha/studio.py new file mode 100644 index 0000000..c83e803 --- /dev/null +++ b/devha/studio.py @@ -0,0 +1,221 @@ +"""devha Hacking Studio — Textual TUI main menu.""" + +from __future__ import annotations + +import asyncio +import random +import time +from typing import Callable + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Container, Horizontal, Vertical +from textual.reactive import reactive +from textual.widgets import Footer, Header, Label, Static, RichLog + +# ─── Matrix rain effect ─────────────────────────────────────────────────────── + +_MATRIX_CHARS = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ01アイウエオカキクケコ" + +_MENU_ITEMS = [ + ("1", "🔌", "Network Scanner", "Port scan · OS detect · CVE lookup"), + ("2", "📡", "WiFi Lab", "Scan · Device map · Security test"), + ("3", "👤", "OSINT Suite", "Username · Harvest · Subdomains"), + ("4", "🔐", "Crypto & Cipher", "Encode · Decode · Crack · Steg"), + ("5", "🕸️ ", "Web Recon", "Dirscan · Headers · Crawl"), + ("6", "🔑", "Password Lab", "Hash ID · Crack · Generator"), + ("7", "📦", "Packet Lab", "Capture · ARP scan · Builder"), + ("8", "🛡️ ", "Headers Audit", "Security score · Missing headers"), + ("9", "🏓", "Ping & Trace", "ICMP · Traceroute · Latency"), +] + +_BANNER = """[bold cyan] + ██████╗ ███████╗██╗ ██╗██╗ ██╗ █████╗ + ██╔══██╗██╔════╝██║ ██║██║ ██║██╔══██╗ + ██║ ██║█████╗ ██║ ██║███████║███████║ + ██║ ██║██╔══╝ ╚██╗ ██╔╝██╔══██║██╔══██║ + ██████╔╝███████╗ ╚████╔╝ ██║ ██║██║ ██║ + ╚═════╝ ╚══════╝ ╚═══╝ ╚═╝ ╚═╝╚═╝ ╚═╝[/bold cyan] +[dim cyan] Hacking Studio v2.0 · Ethical use only · Own systems only[/dim cyan]""" + + +class MatrixColumn(Static): + """Single column of falling matrix characters.""" + + DEFAULT_CSS = """ + MatrixColumn { + width: 1; + color: green; + text-opacity: 60%; + } + """ + + chars: reactive[str] = reactive("") + + def on_mount(self) -> None: + self.col_chars = [random.choice(_MATRIX_CHARS) for _ in range(40)] + self.offset = random.randint(0, 30) + self.speed = random.uniform(0.05, 0.15) + self.set_interval(self.speed, self._tick) + + def _tick(self) -> None: + self.offset = (self.offset + 1) % 40 + self.col_chars[random.randint(0, 39)] = random.choice(_MATRIX_CHARS) + self.chars = "\n".join(self.col_chars) + + def render(self) -> Text: + t = Text() + chars = self.col_chars + for i, ch in enumerate(chars): + dist = (i - self.offset) % len(chars) + if dist == 0: + t.append(ch, "bold bright_white") + elif dist < 4: + t.append(ch, "bold bright_green") + elif dist < 10: + t.append(ch, "green") + else: + t.append(ch, "dim green") + return t + + +class MenuCard(Static): + """A single menu item card.""" + + DEFAULT_CSS = """ + MenuCard { + border: round #1a1a2e; + padding: 0 1; + margin: 0 0 1 0; + background: #0a0a0f; + color: #aaaaaa; + } + MenuCard:hover { + border: round cyan; + background: #0d1f2d; + color: bright_white; + } + MenuCard.selected { + border: round bright_cyan; + background: #0d2137; + color: bright_white; + } + """ + + def __init__(self, key: str, icon: str, title: str, desc: str, **kwargs): + super().__init__(**kwargs) + self.key = key + self.icon = icon + self.title = title + self.desc = desc + + def render(self) -> Text: + t = Text() + t.append(f" [{self.key}] ", "bold cyan") + t.append(f"{self.icon} ", "") + t.append(self.title, "bold bright_white") + t.append(f" — {self.desc}", "dim") + return t + + +class DevhaStudio(App): + """devha Hacking Studio — main TUI application.""" + + CSS = """ + Screen { + background: #050508; + layers: matrix menu; + } + + #matrix-bg { + layer: matrix; + layout: horizontal; + width: 100%; + height: 100%; + opacity: 15%; + overflow: hidden; + } + + #main-container { + layer: menu; + align: center middle; + width: 100%; + height: 100%; + } + + #center-panel { + width: 70; + height: auto; + background: #050508; + border: double cyan; + padding: 1 2; + } + + #banner { + width: 100%; + text-align: center; + margin: 0 0 1 0; + } + + #ethics-note { + color: dim yellow; + text-align: center; + margin: 0 0 1 0; + } + + #menu-list { + width: 100%; + height: auto; + } + + Footer { + background: #050508; + color: cyan; + } + """ + + BINDINGS = [ + Binding("1", "launch('network')", "Network"), + Binding("2", "launch('wifi')", "WiFi Lab"), + Binding("3", "launch('osint')", "OSINT"), + Binding("4", "launch('cipher')", "Crypto"), + Binding("5", "launch('web')", "Web"), + Binding("6", "launch('password')", "Passwords"), + Binding("7", "launch('packets')", "Packets"), + Binding("8", "launch('headers')", "Headers"), + Binding("9", "launch('ping')", "Ping"), + Binding("q", "quit", "Quit"), + ] + + TITLE = "devha Hacking Studio" + + def compose(self) -> ComposeResult: + # Matrix background + with Horizontal(id="matrix-bg"): + for _ in range(80): + yield MatrixColumn() + + # Main menu + with Container(id="main-container"): + with Vertical(id="center-panel"): + yield Static(_BANNER, id="banner") + yield Static( + "⚡ Own systems only · Stay legal · Be ethical", + id="ethics-note", + ) + with Vertical(id="menu-list"): + for key, icon, title, desc in _MENU_ITEMS: + yield MenuCard(key, icon, title, desc) + + yield Footer() + + def action_launch(self, module: str) -> None: + """Launch a module — exit TUI and run the command.""" + self.exit(result=module) + + +def run_studio() -> str | None: + """Run the TUI and return the selected module name.""" + app = DevhaStudio() + return app.run() diff --git a/pyproject.toml b/pyproject.toml index a6cd17c..67fd96f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "devha" -version = "0.1.0" -description = "Developer & Hacking CLI — an ethical hacking and developer toolkit" +version = "2.0.0" +description = "devha Hacking Studio — all-in-one ethical hacking and developer toolkit" authors = ["devha contributors "] license = "MIT" readme = "README.md"