From 1b35c1c85879690a0106a45287a3efd7d32d67c6 Mon Sep 17 00:00:00 2001 From: karie <25291562+kariemoorman@users.noreply.github.com> Date: Mon, 13 Apr 2026 23:50:57 -0700 Subject: [PATCH 1/2] fix: hardening --- CHANGELOG.md | 9 + pyproject.toml | 4 +- .../audiostego/core/audio_steganography.py | 11 +- .../imagestego/core/image_steganography.py | 265 ++++++++++++------ tests/test_image_multiformat_coder.py | 4 +- tests/test_image_steganography.py | 8 +- 6 files changed, 198 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df09217..62d4477 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,16 @@ # Changelog ## 0.0.3 + +### Added + - Added test image generation to GH0STB1T imagestego +- Added MCP Server integration, with 10 tools (audio_encode, audio_decode, audio_capacity, audio_analyze, generate_audio_carrier, image_encode, image_decode, image_capacity, image_analyze, generate_image_carrier) + +### Fixed +- Random seeding for LSBStego +- Constant time for audio and image steganography + ## 0.0.2 - 2026-02-05 diff --git a/pyproject.toml b/pyproject.toml index 1bf9d7e..6b71d82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ license = "Apache-2.0" license-files = ["LICENSE"] dependencies = [ - "pycryptodomex>=3.21.0", + "pycryptodome>=3.15.0", "pydub>=0.25.1", "audioop-lts>=0.2.2", "soundfile>=0.12.1", @@ -106,7 +106,7 @@ mcp = [ # ] all = [ - "pycryptodomex>=3.21.0", + "pycryptodome>=3.15.0", "pydub>=0.25.1", "audioop-lts>=0.2.2", "soundfile>=0.12.1", diff --git a/src/ghostbit/audiostego/core/audio_steganography.py b/src/ghostbit/audiostego/core/audio_steganography.py index c0f42f7..d3d8b80 100644 --- a/src/ghostbit/audiostego/core/audio_steganography.py +++ b/src/ghostbit/audiostego/core/audio_steganography.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import os +import hmac import struct import hashlib import logging @@ -655,7 +656,8 @@ def set_key_argon(self, password: str) -> None: verification_plaintext = b"STEGO_VERIFY_2025" nonce = os.urandom(12) aesgcm = AESGCM(bytes(self.aes_key)) - verification_ciphertext = aesgcm.encrypt(nonce, verification_plaintext, None) + key_commitment = hashlib.sha256(bytes(self.aes_key)).digest() + verification_ciphertext = aesgcm.encrypt(nonce, verification_plaintext, key_commitment) self.key_verif_block = bytearray(nonce + verification_ciphertext) logger.debug(f"Secure Argon2id key set (kdf_version={self.kdf_version})") @@ -1043,7 +1045,7 @@ def analyze_stream(self, stream: BinaryIO, dsc_head26_version: str) -> bool: if ( not self.key_verif_block - or bytes(self.key_verif_block) != stored_key_verif + or not hmac.compare_digest(bytes(self.key_verif_block), stored_key_verif) ): logger.info( f"Password required for encrypted content (KDF v{self.kdf_version})" @@ -1079,10 +1081,11 @@ def analyze_stream(self, stream: BinaryIO, dsc_head26_version: str) -> bool: nonce = raw[:12] ciphertext = raw[12:] aesgcm = AESGCM(bytes(self.aes_key)) + key_commitment = hashlib.sha256(bytes(self.aes_key)).digest() try: - plaintext = aesgcm.decrypt(nonce, ciphertext, None) - if plaintext != b"STEGO_VERIFY_2025": + plaintext = aesgcm.decrypt(nonce, ciphertext, key_commitment) + if not hmac.compare_digest(plaintext, b"STEGO_VERIFY_2025"): raise AudioSteganographyException("Invalid password") except Exception: logger.error("Password verification failed") diff --git a/src/ghostbit/imagestego/core/image_steganography.py b/src/ghostbit/imagestego/core/image_steganography.py index bd16d52..ae220bf 100755 --- a/src/ghostbit/imagestego/core/image_steganography.py +++ b/src/ghostbit/imagestego/core/image_steganography.py @@ -7,6 +7,7 @@ import random import secrets import logging +import hashlib from PIL import Image from enum import IntEnum from dataclasses import dataclass @@ -91,7 +92,8 @@ def _encrypt_data(self, data: bytes, password: str) -> Tuple[bytes, bytes, bytes key = self._derive_key(password, salt) aesgcm = AESGCM(key) - ciphertext = aesgcm.encrypt(nonce, data, None) + key_commitment = hashlib.sha256(key).digest() + ciphertext = aesgcm.encrypt(nonce, data, key_commitment) return salt, nonce, ciphertext @@ -101,9 +103,10 @@ def _decrypt_data( """Decrypt data using AES-256-GCM""" key = self._derive_key(password, salt) aesgcm = AESGCM(key) + key_commitment = hashlib.sha256(key).digest() try: - plaintext = aesgcm.decrypt(nonce, ciphertext, None) + plaintext = aesgcm.decrypt(nonce, ciphertext, key_commitment) return plaintext except Exception: raise ImageSteganographyException( @@ -304,17 +307,87 @@ def parse_payload( class LSBStego(BaseStego): """LSB steganography for lossless formats""" - def __init__(self): - self.key = 43 + SEED_SIZE = 32 # 256 bits + SEED_PIXELS = 86 # ceil(256 / 3 channels) - def get_capacity(self, cover_path: str) -> int: + def _write_seed_to_pixels(self, pixels: Any, width: int, height: int, seed: bytes) -> None: + """Write shuffle seed into the first SEED_PIXELS pixels sequentially (LSB of RGB).""" + bits = [] + for byte in seed: + for i in range(8): + bits.append((byte >> (7 - i)) & 1) + + bit_idx = 0 + for pixel_idx in range(self.SEED_PIXELS): + x = pixel_idx % width + y = pixel_idx // width + r, g, b, a = pixels[x, y] + if bit_idx < len(bits): + r = (r & 0xFE) | bits[bit_idx] + bit_idx += 1 + if bit_idx < len(bits): + g = (g & 0xFE) | bits[bit_idx] + bit_idx += 1 + if bit_idx < len(bits): + b = (b & 0xFE) | bits[bit_idx] + bit_idx += 1 + pixels[x, y] = (r, g, b, a) + + def _read_seed_from_pixels(self, pixels: Any, width: int, height: int) -> bytes: + """Read shuffle seed from the first SEED_PIXELS pixels sequentially (LSB of RGB).""" + bits: list[int] = [] + total_bits = self.SEED_SIZE * 8 + + for pixel_idx in range(self.SEED_PIXELS): + x = pixel_idx % width + y = pixel_idx // width + r, g, b, a = pixels[x, y] + if len(bits) < total_bits: + bits.append(r & 1) + if len(bits) < total_bits: + bits.append(g & 1) + if len(bits) < total_bits: + bits.append(b & 1) + + seed_bytes = bytearray() + for i in range(0, len(bits), 8): + byte = 0 + for j in range(8): + if i + j < len(bits): + byte = (byte << 1) | bits[i + j] + seed_bytes.append(byte) + return bytes(seed_bytes) + + def _get_shuffled_coords(self, width: int, height: int, seed: bytes) -> list: + """Get deterministically shuffled coordinates, excluding seed-reserved pixels.""" + seed_positions = set() + for pixel_idx in range(self.SEED_PIXELS): + seed_positions.add((pixel_idx % width, pixel_idx // width)) + + coords = [ + (x, y) for y in range(height) for x in range(width) + if (x, y) not in seed_positions + ] + + seed_int = int.from_bytes(hashlib.sha256(seed).digest(), "big") + rng = random.Random(seed_int) + rng.shuffle(coords) + return coords + + def get_capacity(self, cover_path_or_image: Union[str, Image.Image]) -> int: """Calculate maximum bytes that can be hidden using LSB (RGB channels only)""" logger.debug("Calculating LSB capacity") try: - with Image.open(cover_path).convert("RGB") as img: + if isinstance(cover_path_or_image, Image.Image): + img = cover_path_or_image.convert("RGB") width, height = img.size channels = len(img.getbands()) - return (width * height * min(channels, 3)) // 8 + return ((width * height - self.SEED_PIXELS) * min(channels, 3)) // 8 + else: + with Image.open(cover_path_or_image).convert("RGB") as img: + width, height = img.size + channels = len(img.getbands()) + return ((width * height - self.SEED_PIXELS) * min(channels, 3)) // 8 except Exception as e: logger.error("Failed to calculate LSB capacity") raise ImageSteganographyException(f"Failed to calculate LSB capacity: {e}") @@ -356,105 +429,119 @@ def bit_generator(data_bytes): except Exception as e: raise ImageSteganographyException(f"Failed to encode image: {e}") - def encode(self, cover_path: str, data: bytes) -> Image.Image: - """Embed data using random LSB""" + def encode(self, cover_path_or_image: Union[str, Image.Image], data: bytes) -> Image.Image: + """Embed data using random LSB. Accepts file path or PIL Image.""" try: - logger.info(f"Opening cover image: {cover_path}") - with Image.open(cover_path) as image: - img: Image.Image = image.convert("RGBA") - width, height = img.size - pixels: Any = img.load() - logger.debug(f"Image size: {width}x{height}, mode: {img.mode}") + if isinstance(cover_path_or_image, Image.Image): + logger.info("Encoding from PIL Image object") + img: Image.Image = cover_path_or_image.convert("RGBA") + else: + logger.info(f"Opening cover image: {cover_path_or_image}") + img = Image.open(cover_path_or_image).convert("RGBA") - total_bits = len(data) * 8 - logger.info(f"Embedding {len(data)} bytes ({total_bits} bits)") + width, height = img.size + pixels: Any = img.load() + logger.debug(f"Image size: {width}x{height}, mode: {img.mode}") - def bit_generator(data_bytes): - for byte in data_bytes: - for i in range(8): - yield (byte >> (7 - i)) & 1 + total_bits = len(data) * 8 + logger.info(f"Embedding {len(data)} bytes ({total_bits} bits)") - logger.debug("Converting data to a bit generator") - bits = bit_generator(data) + def bit_generator(data_bytes): + for byte in data_bytes: + for i in range(8): + yield (byte >> (7 - i)) & 1 - logger.debug("Generating shuffled coordinates") - coords = [(x, y) for y in range(height) for x in range(width)] - random.seed(self.key) - random.shuffle(coords) - logger.debug(f"Shuffled {len(coords)} pixel coordinates with key") - - logger.debug("Embedding bits into pixels") - bit_count = 0 - for idx, (x, y) in enumerate(coords): - r, g, b, a = pixels[x, y] - for channel_name, channel_value in zip(["R", "G", "B"], [r, g, b]): - try: - bit = next(bits) - if channel_name == "R": - r = (r & 0xFE) | bit - elif channel_name == "G": - g = (g & 0xFE) | bit - else: # B - b = (b & 0xFE) | bit - bit_count += 1 - except StopIteration: - break - pixels[x, y] = (r, g, b, a) + logger.debug("Converting data to a bit generator") + bits = bit_generator(data) + + logger.debug("Generating and embedding shuffle seed") + seed = secrets.token_bytes(self.SEED_SIZE) + self._write_seed_to_pixels(pixels, width, height, seed) + + logger.debug("Generating shuffled coordinates") + coords = self._get_shuffled_coords(width, height, seed) + logger.debug(f"Shuffled {len(coords)} pixel coordinates with seed") - if bit_count >= total_bits: - logger.debug(f"All bits embedded at pixel index {idx}") + logger.debug("Embedding bits into pixels") + bit_count = 0 + for idx, (x, y) in enumerate(coords): + r, g, b, a = pixels[x, y] + for channel_name, channel_value in zip(["R", "G", "B"], [r, g, b]): + try: + bit = next(bits) + if channel_name == "R": + r = (r & 0xFE) | bit + elif channel_name == "G": + g = (g & 0xFE) | bit + else: # B + b = (b & 0xFE) | bit + bit_count += 1 + except StopIteration: break + pixels[x, y] = (r, g, b, a) - logger.info(f"Finished embedding {bit_count} bits into image") - return img + if bit_count >= total_bits: + logger.debug(f"All bits embedded at pixel index {idx}") + break + + logger.info(f"Finished embedding {bit_count} bits into image") + return img except Exception as e: logger.error(f"Failed to encode image: {e}", exc_info=True) raise ImageSteganographyException(f"Failed to encode image: {e}") - def decode(self, stego_image: str, data_length: int) -> bytes: - """Decode data hidden with random LSB embedding""" + def decode(self, stego_image: Union[str, Image.Image], data_length: int) -> bytes: + """Decode data hidden with random LSB embedding. Accepts file path or PIL Image.""" try: - logger.info(f"Opening stego image: {stego_image}") - with Image.open(stego_image) as image: - img: Image.Image = image.convert("RGBA") - width, height = img.size - logger.debug(f"Image size: {width}x{height}, mode: {img.mode}") + if isinstance(stego_image, Image.Image): + logger.info("Decoding from PIL Image object") + img: Image.Image = stego_image.convert("RGBA") + else: + logger.info(f"Opening stego image: {stego_image}") + img = Image.open(stego_image).convert("RGBA") + + width, height = img.size + logger.debug(f"Image size: {width}x{height}, mode: {img.mode}") + + pixels: Any = img.load() + total_bits = data_length * 8 + logger.info(f"Expecting {total_bits} bits ({data_length} bytes)") + + seed = self._read_seed_from_pixels(pixels, width, height) + logger.debug("Read shuffle seed from image") + coords = self._get_shuffled_coords(width, height, seed) + logger.debug(f"Shuffled {len(coords)} pixel coordinates with seed") + + bits: list[int] = [] + for idx, (x, y) in enumerate(coords): + r, g, b, a = pixels[x, y] + if len(bits) < total_bits: + bits.append(r & 1) + if len(bits) < total_bits: + bits.append(g & 1) + if len(bits) < total_bits: + bits.append(b & 1) + if len(bits) >= total_bits: + logger.debug( + f"Collected all {total_bits} bits at pixel index {idx}" + ) + break - pixels: Any = img.load() - total_bits = data_length * 8 - logger.info(f"Expecting {total_bits} bits ({data_length} bytes)") - - coords = [(x, y) for y in range(height) for x in range(width)] - random.seed(self.key) - random.shuffle(coords) - logger.debug(f"Shuffled {len(coords)} pixel coordinates with key") - - bits: list[int] = [] - for idx, (x, y) in enumerate(coords): - r, g, b, a = pixels[x, y] - if len(bits) < total_bits: - bits.append(r & 1) - if len(bits) < total_bits: - bits.append(g & 1) - if len(bits) < total_bits: - bits.append(b & 1) - if len(bits) >= total_bits: - logger.debug( - f"Collected all {total_bits} bits at pixel index {idx}" - ) - break + data_bytes = bytearray() + for i in range(0, len(bits), 8): + byte = 0 + for j in range(8): + if i + j < len(bits): + byte = (byte << 1) | bits[i + j] + data_bytes.append(byte) + logger.info(f"Decoded {len(data_bytes)} bytes from image") - data_bytes = bytearray() - for i in range(0, len(bits), 8): - byte = 0 - for j in range(8): - if i + j < len(bits): - byte = (byte << 1) | bits[i + j] - data_bytes.append(byte) - logger.info(f"Decoded {len(data_bytes)} bytes from image") + return bytes(data_bytes) - return bytes(data_bytes) + except Exception as e: + logger.error(f"Failed to decode image: {e}", exc_info=True) + raise ImageSteganographyException(f"Failed to decode image: {e}") except Exception as e: logger.error(f"Failed to decode image: {e}", exc_info=True) diff --git a/tests/test_image_multiformat_coder.py b/tests/test_image_multiformat_coder.py index 1255f38..ab48924 100644 --- a/tests/test_image_multiformat_coder.py +++ b/tests/test_image_multiformat_coder.py @@ -176,7 +176,7 @@ def test_calculate_capacity_png(self, coder, tmp_path): assert isinstance(result, dict) assert result["format"] == "PNG" assert result["algorithm"] == "LSB" - assert result["capacity_bytes"] == 3750 + assert result["capacity_bytes"] == 3717 assert result["capacity_kb"] == pytest.approx(3.662, rel=0.01) assert result["capacity_mb"] == pytest.approx(0.00357, rel=0.01) @@ -217,7 +217,7 @@ def test_calculate_capacity_large_image(self, coder, tmp_path): result = coder.calculate_capacity(str(path)) - assert result["capacity_bytes"] == 375000 + assert result["capacity_bytes"] == 374967 def test_calculate_capacity_nonexistent_file(self, coder): """Test capacity calculation with nonexistent file""" diff --git a/tests/test_image_steganography.py b/tests/test_image_steganography.py index f100708..7287c9d 100644 --- a/tests/test_image_steganography.py +++ b/tests/test_image_steganography.py @@ -318,7 +318,7 @@ def test_get_capacity(self, lsb_stego, test_image): capacity = lsb_stego.get_capacity(test_image) # 100x100 pixels * 3 channels / 8 bits = 3750 bytes - assert capacity == 3750 + assert capacity == 3717 def test_get_capacity_large_image(self, lsb_stego, tmp_path): """Test capacity with larger image""" @@ -327,7 +327,7 @@ def test_get_capacity_large_image(self, lsb_stego, tmp_path): img.save(path) capacity = lsb_stego.get_capacity(str(path)) - assert capacity == 375000 # 1000*1000*3/8 + assert capacity == 374967 # 1000*1000*3/8 def test_encode_small_data(self, lsb_stego, test_image): """Test encoding small amount of data""" @@ -431,10 +431,6 @@ def test_decode_invalid_image(self, lsb_stego, tmp_path): with pytest.raises(ImageSteganographyException): lsb_stego.decode(str(invalid_file), 100) - def test_key_consistency(self, lsb_stego): - """Test that the key attribute is set correctly""" - assert hasattr(lsb_stego, 'key') - assert lsb_stego.key == 43 class TestPaletteStego: From 14588d2f8619de92b794fff787585f0deffd0992 Mon Sep 17 00:00:00 2001 From: karie <25291562+kariemoorman@users.noreply.github.com> Date: Tue, 14 Apr 2026 00:03:01 -0700 Subject: [PATCH 2/2] chore: add Union --- src/ghostbit/imagestego/core/image_steganography.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ghostbit/imagestego/core/image_steganography.py b/src/ghostbit/imagestego/core/image_steganography.py index ae220bf..dd69a4d 100755 --- a/src/ghostbit/imagestego/core/image_steganography.py +++ b/src/ghostbit/imagestego/core/image_steganography.py @@ -11,7 +11,7 @@ from PIL import Image from enum import IntEnum from dataclasses import dataclass -from typing import List, Optional, Tuple, Any +from typing import List, Optional, Tuple, Union, Any from cryptography.hazmat.primitives.kdf.argon2 import Argon2id from cryptography.hazmat.primitives.ciphers.aead import AESGCM