""" extract_credentials.py – Extrahiert Anycubic LAN-MQTT-Credentials aus dem RAM des laufenden AnycubicSlicerNext-Prozesses. Voraussetzungen: - AnycubicSlicerNext läuft und ist mit dem Drucker verbunden - Gleiches Benutzerkonto wie der Slicer-Prozess (kein Admin nötig) Verwendung: python3 extract_credentials.py [--write-env] [--env-file ../.env] Funktionsweise: 1. Prozess "AnycubicSlicer.exe" (Windows) bzw. "AnycubicSlicer" (Linux) finden 2. Speicherseiten des Prozesses durchsuchen (nur r/rw, keine Exec-Pages) 3. Nach MQTT-Credential-Patterns suchen: Username: user[A-Za-z0-9]{8,12} Password: [A-Za-z0-9+/]{13,18} Drucker-IP: d{1,3}.d{1,3}.d{1,3}.d{1,3} 4. Kandidaten nach Plausibilität filtern und ausgeben 5. Optional: .env-Datei schreiben """ import argparse import os import re import struct import sys import platform # --------------------------------------------------------------------------- # Plattform-Erkennung # --------------------------------------------------------------------------- IS_WINDOWS = platform.system() == "Windows" IS_LINUX = platform.system() == "Linux" # --------------------------------------------------------------------------- # Pattern # --------------------------------------------------------------------------- # Username: "user" + 8–12 alphanumerische Zeichen (drucker-generiert) RE_USERNAME = re.compile(rb'user[A-Za-z0-9]{8,12}(?=[^A-Za-z0-9]|$)') # Password: 13–20 alphanumerische Zeichen (kein / da kein RTSP-Pfad) # Anycubic-Passwörter: gemischte Groß/Klein/Ziffern, kein Slash RE_PASSWORD = re.compile(rb'[A-Za-z0-9]{13,20}(?=[^A-Za-z0-9]|$)') # Kontext-Pattern: sucht Passwort das direkt nach "password" im Speicher steht RE_PASSWORD_CTX = re.compile(rb'(?:password|passwd|Password)\x00{0,4}([A-Za-z0-9]{10,25})(?=[^A-Za-z0-9]|$)', re.IGNORECASE) # Proximity-Pattern: Username gefolgt von Passwort in naher Umgebung (<512 Bytes) RE_USER_PASS_PROXIMITY = re.compile( rb'(user[A-Za-z0-9]{8,12}).{1,512}?([A-Za-z0-9]{13,20})(?=[^A-Za-z0-9]|$)', re.DOTALL ) # IPv4-Adresse (kein localhost, kein Broadcast) RE_IP = re.compile(rb'(? "int | None": """Findet die PID eines Prozesses anhand des Namens (case-insensitive).""" import ctypes import ctypes.wintypes TH32CS_SNAPPROCESS = 0x00000002 class PROCESSENTRY32(ctypes.Structure): _fields_ = [ ("dwSize", ctypes.wintypes.DWORD), ("cntUsage", ctypes.wintypes.DWORD), ("th32ProcessID", ctypes.wintypes.DWORD), ("th32DefaultHeapID", ctypes.POINTER(ctypes.c_ulong)), ("th32ModuleID", ctypes.wintypes.DWORD), ("cntThreads", ctypes.wintypes.DWORD), ("th32ParentProcessID", ctypes.wintypes.DWORD), ("pcPriClassBase", ctypes.c_long), ("dwFlags", ctypes.wintypes.DWORD), ("szExeFile", ctypes.c_char * 260), ] snap = ctypes.windll.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) if snap == ctypes.wintypes.HANDLE(-1).value: return None entry = PROCESSENTRY32() entry.dwSize = ctypes.sizeof(PROCESSENTRY32) try: if not ctypes.windll.kernel32.Process32First(snap, ctypes.byref(entry)): return None while True: if entry.szExeFile.decode("utf-8", errors="replace").lower() == name.lower(): return entry.th32ProcessID if not ctypes.windll.kernel32.Process32Next(snap, ctypes.byref(entry)): break finally: ctypes.windll.kernel32.CloseHandle(snap) return None def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]": """Liest alle lesbaren Speicherseiten eines Windows-Prozesses.""" import ctypes import ctypes.wintypes PROCESS_VM_READ = 0x0010 PROCESS_QUERY_INFORMATION = 0x0400 MEM_COMMIT = 0x1000 PAGE_NOACCESS = 0x01 PAGE_GUARD = 0x100 class MEMORY_BASIC_INFORMATION(ctypes.Structure): _fields_ = [ ("BaseAddress", ctypes.c_void_p), ("AllocationBase", ctypes.c_void_p), ("AllocationProtect", ctypes.wintypes.DWORD), ("RegionSize", ctypes.c_size_t), ("State", ctypes.wintypes.DWORD), ("Protect", ctypes.wintypes.DWORD), ("Type", ctypes.wintypes.DWORD), ] k32 = ctypes.windll.kernel32 handle = k32.OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid) if not handle: raise PermissionError(f"OpenProcess fehlgeschlagen (PID {pid}): {ctypes.GetLastError()}") chunks = [] addr = 0 mbi = MEMORY_BASIC_INFORMATION() try: while k32.VirtualQueryEx(handle, ctypes.c_void_p(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)): skip = ( mbi.State != MEM_COMMIT or mbi.Protect & PAGE_NOACCESS or mbi.Protect & PAGE_GUARD or mbi.RegionSize > 256 * 1024 * 1024 # >256 MB überspringen ) if not skip: buf = ctypes.create_string_buffer(mbi.RegionSize) read = ctypes.c_size_t(0) if k32.ReadProcessMemory(handle, ctypes.c_void_p(addr), buf, mbi.RegionSize, ctypes.byref(read)): chunks.append(bytes(buf[:read.value])) addr += mbi.RegionSize if addr >= 0x7FFFFFFFFFFF: # Ende des User-Space (64-bit) break finally: k32.CloseHandle(handle) return chunks # --------------------------------------------------------------------------- # Linux – Speicher lesen via /proc/{pid}/mem # --------------------------------------------------------------------------- def _linux_find_pid(name: str) -> "int | None": """Findet PID anhand des Prozessnamens in /proc.""" for entry in os.listdir("/proc"): if not entry.isdigit(): continue try: cmdline = open(f"/proc/{entry}/cmdline", "rb").read() if name.lower().encode() in cmdline.lower(): return int(entry) except (PermissionError, FileNotFoundError): continue return None def _linux_read_memory(pid: int) -> "list[bytes]": """Liest lesbare Speichersegmente aus /proc/{pid}/mem.""" chunks = [] maps_path = f"/proc/{pid}/maps" mem_path = f"/proc/{pid}/mem" try: maps = open(maps_path).readlines() mem = open(mem_path, "rb") except PermissionError: raise PermissionError( f"Kein Zugriff auf /proc/{pid}/mem — " "Script als gleicher Benutzer wie der Slicer starten." ) for line in maps: parts = line.split() if len(parts) < 2: continue perms = parts[1] if "r" not in perms: # nur lesbare Seiten continue if "x" in perms: # Code-Seiten überspringen (keine Strings) continue try: start, end = [int(x, 16) for x in parts[0].split("-")] except ValueError: continue size = end - start if size > 256 * 1024 * 1024: # >256 MB überspringen continue try: mem.seek(start) data = mem.read(size) if data: chunks.append(data) except (OSError, ValueError): continue mem.close() return chunks # --------------------------------------------------------------------------- # Pattern-Suche # --------------------------------------------------------------------------- def _is_valid_ip(ip_bytes: bytes) -> bool: parts = ip_bytes.split(b".") if len(parts) != 4: return False try: nums = [int(p) for p in parts] except ValueError: return False if nums[0] in (0, 127, 255): return False if nums == [0, 0, 0, 0]: return False return all(0 <= n <= 255 for n in nums) def search_chunks(chunks: "list[bytes]") -> dict: """Durchsucht Speicher-Chunks nach Credential-Patterns.""" usernames = {} # value → count passwords = {} ips = {} device_ids = {} total = len(chunks) for i, chunk in enumerate(chunks): if i % 50 == 0 or i == total - 1: pct = (i + 1) * 100 // total mb_done = sum(len(c) for c in chunks[:i+1]) / 1024 / 1024 print(f"\r[*] Analysiere ... {pct:3d}% ({mb_done:.0f} MB)", end="", flush=True) for m in RE_USERNAME.finditer(chunk): v = m.group().decode("ascii", errors="replace") usernames[v] = usernames.get(v, 0) + 1 # Proximity: Passwort das innerhalb von 512 Bytes nach einem Username steht for m in RE_USER_PASS_PROXIMITY.finditer(chunk): pw = m.group(2).decode("ascii", errors="replace") has_upper = any(c.isupper() for c in pw) has_lower = any(c.islower() for c in pw) has_digit = any(c.isdigit() for c in pw) if has_upper and has_lower and has_digit: passwords[pw] = passwords.get(pw, 0) + 500 for m in RE_PASSWORD.finditer(chunk): v = m.group().decode("ascii", errors="replace") # Filter: mindestens 2 Großbuchstaben + 2 Kleinbuchstaben + 1 Ziffer has_upper = sum(1 for c in v if c.isupper()) >= 2 has_lower = sum(1 for c in v if c.islower()) >= 2 has_digit = sum(1 for c in v if c.isdigit()) >= 1 if has_upper and has_lower and has_digit: passwords[v] = passwords.get(v, 0) + 1 for m in RE_IP.finditer(chunk): ip = m.group(1) if _is_valid_ip(ip): v = ip.decode("ascii") ips[v] = ips.get(v, 0) + 1 for m in RE_DEVICE_ID.finditer(chunk): v = m.group().decode("ascii") device_ids[v] = device_ids.get(v, 0) + 1 print() # Zeilenumbruch nach Fortschrittszeile # Nach Häufigkeit sortieren, häufigste zuerst def top(d, n=10): return sorted(d.items(), key=lambda x: -x[1])[:n] return { "usernames": top(usernames), "passwords": top(passwords), "ips": top(ips, 10), "device_ids": top(device_ids), } # --------------------------------------------------------------------------- # Hauptprogramm # --------------------------------------------------------------------------- SLICER_NAMES = [ "AnycubicSlicerNext.exe", # Windows "AnycubicSlicer.exe", "AnycubicSlicerNext", # Linux "AnycubicSlicer", ] def find_slicer_pid() -> "tuple[int, str] | tuple[None, None]": for name in SLICER_NAMES: if IS_WINDOWS: pid = _win_find_pid(name) else: pid = _linux_find_pid(name) if pid: return pid, name return None, None def read_process(pid: int) -> "list[bytes]": if IS_WINDOWS: return _win_read_memory(pid) else: return _linux_read_memory(pid) def write_env(results: dict, env_path: str, username: str, password: str, ip: str, mode_id: str = "20030", device_id: str = ""): lines = [] if os.path.isfile(env_path): with open(env_path) as f: lines = f.readlines() def set_val(key, val): nonlocal lines for i, line in enumerate(lines): if line.startswith(f"{key}="): lines[i] = f"{key}={val}\n" return lines.append(f"{key}={val}\n") set_val("PRINTER_IP", ip) set_val("MQTT_USERNAME", username) set_val("MQTT_PASSWORD", password) if device_id: set_val("DEVICE_ID", device_id) if mode_id: set_val("MODE_ID", mode_id) with open(env_path, "w") as f: f.writelines(lines) print(f"\n✓ Credentials in '{env_path}' gespeichert.") def main(): parser = argparse.ArgumentParser( description="Extrahiert MQTT-Credentials aus dem RAM des AnycubicSlicer-Prozesses" ) parser.add_argument("--write-env", action="store_true", help="Gefundene Credentials in .env schreiben") parser.add_argument("--env-file", default=None, help="Pfad zur .env-Datei (Standard: ../. env relativ zu diesem Script)") parser.add_argument("--pid", type=int, default=None, help="Prozess-PID direkt angeben (überspringt Auto-Erkennung)") parser.add_argument("--verbose", action="store_true", help="Alle Kandidaten ausgeben, nicht nur den besten") args = parser.parse_args() # .env-Pfad bestimmen if args.env_file: env_path = args.env_file else: env_path = os.path.join(os.path.dirname(__file__), "..", ".env") env_path = os.path.normpath(env_path) # Prozess finden if args.pid: pid, proc_name = args.pid, f"PID {args.pid}" else: print("[*] Suche AnycubicSlicer-Prozess ...") pid, proc_name = find_slicer_pid() if not pid: print("✗ AnycubicSlicer nicht gefunden. Bitte den Slicer starten und " "mit dem Drucker verbinden, dann erneut ausführen.") sys.exit(1) print(f"[*] Prozess gefunden: {proc_name} (PID {pid})") print(f"[*] Lese Prozess-Speicher ...") try: chunks = read_process(pid) except PermissionError as e: print(f"✗ Zugriffsfehler: {e}") sys.exit(1) total_mb = sum(len(c) for c in chunks) / 1024 / 1024 print(f"[*] {len(chunks)} Speichersegmente gelesen ({total_mb:.1f} MB)") print(f"[*] Durchsuche nach Credentials ...") results = search_chunks(chunks) # Ausgabe print("\n" + "="*55) print(" ERGEBNISSE") print("="*55) def show(label, items, verbose): if not items: print(f" {label:12s} — nicht gefunden") return items[0][0] if items else "" best = items[0][0] print(f" {label:12s} {best} (Treffer: {items[0][1]})") if verbose and len(items) > 1: for val, cnt in items[1:]: print(f" {'':12s} {val} (Treffer: {cnt})") return best best_user = show("Username", results["usernames"], args.verbose) best_pass = show("Password", results["passwords"], args.verbose) best_device = show("Device-ID", results["device_ids"], args.verbose) # IP: 192.168.x.x bevorzugen lan_ips = [(ip, cnt) for ip, cnt in results["ips"] if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.")] if not lan_ips: lan_ips = results["ips"] best_ip = show("Drucker-IP", lan_ips, args.verbose) print("="*55) if not best_user or not best_pass: print("\n⚠ Keine vollständigen Credentials gefunden.") print(" Stelle sicher dass der Slicer MIT dem Drucker verbunden ist.") sys.exit(1) if args.write_env: write_env(results, env_path, best_user, best_pass, best_ip, device_id=best_device) else: print(f"\nHinweis: --write-env übergeben um Credentials in '{env_path}' zu speichern.") if __name__ == "__main__": main()