Files
KX-Bridge-Release/extract_credentials.py

455 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
extract_credentials.py Extrahiert Anycubic LAN-MQTT-Credentials aus dem RAM
des laufenden AnycubicSlicerNext-Prozesses.
Voraussetzungen:
- AnycubicSlicerNext läuft und ist mit dem Drucker verbunden
- Gleiches Benutzerkonto wie der Slicer-Prozess (kein Admin nötig)
Verwendung:
python3 extract_credentials.py [--write-env] [--env-file ../.env]
Funktionsweise:
1. Prozess "AnycubicSlicer.exe" (Windows) bzw. "AnycubicSlicer" (Linux) finden
2. Speicherseiten des Prozesses durchsuchen (nur r/rw, keine Exec-Pages)
3. Nach MQTT-Credential-Patterns suchen:
Username: user[A-Za-z0-9]{8,12}
Password: [A-Za-z0-9+/]{13,18}
Drucker-IP: d{1,3}.d{1,3}.d{1,3}.d{1,3}
4. Kandidaten nach Plausibilität filtern und ausgeben
5. Optional: .env-Datei schreiben
"""
import argparse
import os
import re
import struct
import sys
import platform
# ---------------------------------------------------------------------------
# Plattform-Erkennung
# ---------------------------------------------------------------------------
IS_WINDOWS = platform.system() == "Windows"
IS_LINUX = platform.system() == "Linux"
# ---------------------------------------------------------------------------
# Pattern
# ---------------------------------------------------------------------------
# Username: "user" + 812 alphanumerische Zeichen (drucker-generiert)
RE_USERNAME = re.compile(rb'user[A-Za-z0-9]{8,12}(?=[^A-Za-z0-9]|$)')
# Password: 1320 alphanumerische Zeichen (kein / da kein RTSP-Pfad)
# Anycubic-Passwörter: gemischte Groß/Klein/Ziffern, kein Slash
RE_PASSWORD = re.compile(rb'[A-Za-z0-9]{13,20}(?=[^A-Za-z0-9]|$)')
# Kontext-Pattern: sucht Passwort das direkt nach "password" im Speicher steht
RE_PASSWORD_CTX = re.compile(rb'(?:password|passwd|Password)\x00{0,4}([A-Za-z0-9]{10,25})(?=[^A-Za-z0-9]|$)', re.IGNORECASE)
# Proximity-Pattern: Username gefolgt von Passwort in naher Umgebung (<512 Bytes)
RE_USER_PASS_PROXIMITY = re.compile(
rb'(user[A-Za-z0-9]{8,12}).{1,512}?([A-Za-z0-9]{13,20})(?=[^A-Za-z0-9]|$)',
re.DOTALL
)
# IPv4-Adresse (kein localhost, kein Broadcast)
RE_IP = re.compile(rb'(?<![.\d])(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?![.\d])')
# mode_id: 5-stellige Zahl (z.B. 20030)
RE_MODE_ID = re.compile(rb'(?<!\d)(2\d{4})(?!\d)')
# device_id: 32 Hex-Zeichen (MD5-Format)
RE_DEVICE_ID = re.compile(rb'[0-9a-f]{32}(?=[^0-9a-f]|$)')
# ---------------------------------------------------------------------------
# Windows Speicher lesen via ctypes / ReadProcessMemory
# ---------------------------------------------------------------------------
def _win_find_pid(name: str) -> "int | None":
"""Findet die PID eines Prozesses anhand des Namens (case-insensitive)."""
import ctypes
import ctypes.wintypes
TH32CS_SNAPPROCESS = 0x00000002
class PROCESSENTRY32(ctypes.Structure):
_fields_ = [
("dwSize", ctypes.wintypes.DWORD),
("cntUsage", ctypes.wintypes.DWORD),
("th32ProcessID", ctypes.wintypes.DWORD),
("th32DefaultHeapID", ctypes.POINTER(ctypes.c_ulong)),
("th32ModuleID", ctypes.wintypes.DWORD),
("cntThreads", ctypes.wintypes.DWORD),
("th32ParentProcessID", ctypes.wintypes.DWORD),
("pcPriClassBase", ctypes.c_long),
("dwFlags", ctypes.wintypes.DWORD),
("szExeFile", ctypes.c_char * 260),
]
snap = ctypes.windll.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
if snap == ctypes.wintypes.HANDLE(-1).value:
return None
entry = PROCESSENTRY32()
entry.dwSize = ctypes.sizeof(PROCESSENTRY32)
try:
if not ctypes.windll.kernel32.Process32First(snap, ctypes.byref(entry)):
return None
while True:
if entry.szExeFile.decode("utf-8", errors="replace").lower() == name.lower():
return entry.th32ProcessID
if not ctypes.windll.kernel32.Process32Next(snap, ctypes.byref(entry)):
break
finally:
ctypes.windll.kernel32.CloseHandle(snap)
return None
def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]":
"""Liest alle lesbaren Speicherseiten eines Windows-Prozesses."""
import ctypes
import ctypes.wintypes
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
MEM_COMMIT = 0x1000
PAGE_NOACCESS = 0x01
PAGE_GUARD = 0x100
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", ctypes.wintypes.DWORD),
("RegionSize", ctypes.c_size_t),
("State", ctypes.wintypes.DWORD),
("Protect", ctypes.wintypes.DWORD),
("Type", ctypes.wintypes.DWORD),
]
k32 = ctypes.windll.kernel32
handle = k32.OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid)
if not handle:
raise PermissionError(f"OpenProcess fehlgeschlagen (PID {pid}): {ctypes.GetLastError()}")
chunks = []
addr = 0
mbi = MEMORY_BASIC_INFORMATION()
try:
while k32.VirtualQueryEx(handle, ctypes.c_void_p(addr),
ctypes.byref(mbi), ctypes.sizeof(mbi)):
skip = (
mbi.State != MEM_COMMIT or
mbi.Protect & PAGE_NOACCESS or
mbi.Protect & PAGE_GUARD or
mbi.RegionSize > 256 * 1024 * 1024 # >256 MB überspringen
)
if not skip:
buf = ctypes.create_string_buffer(mbi.RegionSize)
read = ctypes.c_size_t(0)
if k32.ReadProcessMemory(handle, ctypes.c_void_p(addr),
buf, mbi.RegionSize, ctypes.byref(read)):
chunks.append(bytes(buf[:read.value]))
addr += mbi.RegionSize
if addr >= 0x7FFFFFFFFFFF: # Ende des User-Space (64-bit)
break
finally:
k32.CloseHandle(handle)
return chunks
# ---------------------------------------------------------------------------
# Linux Speicher lesen via /proc/{pid}/mem
# ---------------------------------------------------------------------------
def _linux_find_pid(name: str) -> "int | None":
"""Findet PID anhand des Prozessnamens in /proc."""
for entry in os.listdir("/proc"):
if not entry.isdigit():
continue
try:
cmdline = open(f"/proc/{entry}/cmdline", "rb").read()
if name.lower().encode() in cmdline.lower():
return int(entry)
except (PermissionError, FileNotFoundError):
continue
return None
def _linux_read_memory(pid: int) -> "list[bytes]":
"""Liest lesbare Speichersegmente aus /proc/{pid}/mem."""
chunks = []
maps_path = f"/proc/{pid}/maps"
mem_path = f"/proc/{pid}/mem"
try:
maps = open(maps_path).readlines()
mem = open(mem_path, "rb")
except PermissionError:
raise PermissionError(
f"Kein Zugriff auf /proc/{pid}/mem — "
"Script als gleicher Benutzer wie der Slicer starten."
)
for line in maps:
parts = line.split()
if len(parts) < 2:
continue
perms = parts[1]
if "r" not in perms: # nur lesbare Seiten
continue
if "x" in perms: # Code-Seiten überspringen (keine Strings)
continue
try:
start, end = [int(x, 16) for x in parts[0].split("-")]
except ValueError:
continue
size = end - start
if size > 256 * 1024 * 1024: # >256 MB überspringen
continue
try:
mem.seek(start)
data = mem.read(size)
if data:
chunks.append(data)
except (OSError, ValueError):
continue
mem.close()
return chunks
# ---------------------------------------------------------------------------
# Pattern-Suche
# ---------------------------------------------------------------------------
def _is_valid_ip(ip_bytes: bytes) -> bool:
parts = ip_bytes.split(b".")
if len(parts) != 4:
return False
try:
nums = [int(p) for p in parts]
except ValueError:
return False
if nums[0] in (0, 127, 255):
return False
if nums == [0, 0, 0, 0]:
return False
return all(0 <= n <= 255 for n in nums)
def search_chunks(chunks: "list[bytes]") -> dict:
"""Durchsucht Speicher-Chunks nach Credential-Patterns."""
usernames = {} # value → count
passwords = {}
ips = {}
device_ids = {}
total = len(chunks)
for i, chunk in enumerate(chunks):
if i % 50 == 0 or i == total - 1:
pct = (i + 1) * 100 // total
mb_done = sum(len(c) for c in chunks[:i+1]) / 1024 / 1024
print(f"\r[*] Analysiere ... {pct:3d}% ({mb_done:.0f} MB)", end="", flush=True)
for m in RE_USERNAME.finditer(chunk):
v = m.group().decode("ascii", errors="replace")
usernames[v] = usernames.get(v, 0) + 1
# Proximity: Passwort das innerhalb von 512 Bytes nach einem Username steht
for m in RE_USER_PASS_PROXIMITY.finditer(chunk):
pw = m.group(2).decode("ascii", errors="replace")
has_upper = any(c.isupper() for c in pw)
has_lower = any(c.islower() for c in pw)
has_digit = any(c.isdigit() for c in pw)
if has_upper and has_lower and has_digit:
passwords[pw] = passwords.get(pw, 0) + 500
for m in RE_PASSWORD.finditer(chunk):
v = m.group().decode("ascii", errors="replace")
# Filter: mindestens 2 Großbuchstaben + 2 Kleinbuchstaben + 1 Ziffer
has_upper = sum(1 for c in v if c.isupper()) >= 2
has_lower = sum(1 for c in v if c.islower()) >= 2
has_digit = sum(1 for c in v if c.isdigit()) >= 1
if has_upper and has_lower and has_digit:
passwords[v] = passwords.get(v, 0) + 1
for m in RE_IP.finditer(chunk):
ip = m.group(1)
if _is_valid_ip(ip):
v = ip.decode("ascii")
ips[v] = ips.get(v, 0) + 1
for m in RE_DEVICE_ID.finditer(chunk):
v = m.group().decode("ascii")
device_ids[v] = device_ids.get(v, 0) + 1
print() # Zeilenumbruch nach Fortschrittszeile
# Nach Häufigkeit sortieren, häufigste zuerst
def top(d, n=10):
return sorted(d.items(), key=lambda x: -x[1])[:n]
return {
"usernames": top(usernames),
"passwords": top(passwords),
"ips": top(ips, 10),
"device_ids": top(device_ids),
}
# ---------------------------------------------------------------------------
# Hauptprogramm
# ---------------------------------------------------------------------------
SLICER_NAMES = [
"AnycubicSlicerNext.exe", # Windows
"AnycubicSlicer.exe",
"AnycubicSlicerNext", # Linux
"AnycubicSlicer",
]
def find_slicer_pid() -> "tuple[int, str] | tuple[None, None]":
for name in SLICER_NAMES:
if IS_WINDOWS:
pid = _win_find_pid(name)
else:
pid = _linux_find_pid(name)
if pid:
return pid, name
return None, None
def read_process(pid: int) -> "list[bytes]":
if IS_WINDOWS:
return _win_read_memory(pid)
else:
return _linux_read_memory(pid)
def write_env(results: dict, env_path: str,
username: str, password: str, ip: str,
mode_id: str = "20030", device_id: str = ""):
lines = []
if os.path.isfile(env_path):
with open(env_path) as f:
lines = f.readlines()
def set_val(key, val):
nonlocal lines
for i, line in enumerate(lines):
if line.startswith(f"{key}="):
lines[i] = f"{key}={val}\n"
return
lines.append(f"{key}={val}\n")
set_val("PRINTER_IP", ip)
set_val("MQTT_USERNAME", username)
set_val("MQTT_PASSWORD", password)
if device_id:
set_val("DEVICE_ID", device_id)
if mode_id:
set_val("MODE_ID", mode_id)
with open(env_path, "w") as f:
f.writelines(lines)
print(f"\n✓ Credentials in '{env_path}' gespeichert.")
def main():
parser = argparse.ArgumentParser(
description="Extrahiert MQTT-Credentials aus dem RAM des AnycubicSlicer-Prozesses"
)
parser.add_argument("--write-env", action="store_true",
help="Gefundene Credentials in .env schreiben")
parser.add_argument("--env-file", default=None,
help="Pfad zur .env-Datei (Standard: ../. env relativ zu diesem Script)")
parser.add_argument("--pid", type=int, default=None,
help="Prozess-PID direkt angeben (überspringt Auto-Erkennung)")
parser.add_argument("--verbose", action="store_true",
help="Alle Kandidaten ausgeben, nicht nur den besten")
args = parser.parse_args()
# .env-Pfad bestimmen
if args.env_file:
env_path = args.env_file
else:
env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
env_path = os.path.normpath(env_path)
# Prozess finden
if args.pid:
pid, proc_name = args.pid, f"PID {args.pid}"
else:
print("[*] Suche AnycubicSlicer-Prozess ...")
pid, proc_name = find_slicer_pid()
if not pid:
print("✗ AnycubicSlicer nicht gefunden. Bitte den Slicer starten und "
"mit dem Drucker verbinden, dann erneut ausführen.")
sys.exit(1)
print(f"[*] Prozess gefunden: {proc_name} (PID {pid})")
print(f"[*] Lese Prozess-Speicher ...")
try:
chunks = read_process(pid)
except PermissionError as e:
print(f"✗ Zugriffsfehler: {e}")
sys.exit(1)
total_mb = sum(len(c) for c in chunks) / 1024 / 1024
print(f"[*] {len(chunks)} Speichersegmente gelesen ({total_mb:.1f} MB)")
print(f"[*] Durchsuche nach Credentials ...")
results = search_chunks(chunks)
# Ausgabe
print("\n" + "="*55)
print(" ERGEBNISSE")
print("="*55)
def show(label, items, verbose):
if not items:
print(f" {label:12s} — nicht gefunden")
return items[0][0] if items else ""
best = items[0][0]
print(f" {label:12s} {best} (Treffer: {items[0][1]})")
if verbose and len(items) > 1:
for val, cnt in items[1:]:
print(f" {'':12s} {val} (Treffer: {cnt})")
return best
best_user = show("Username", results["usernames"], args.verbose)
best_pass = show("Password", results["passwords"], args.verbose)
best_device = show("Device-ID", results["device_ids"], args.verbose)
# IP: 192.168.x.x bevorzugen
lan_ips = [(ip, cnt) for ip, cnt in results["ips"]
if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.")]
if not lan_ips:
lan_ips = results["ips"]
best_ip = show("Drucker-IP", lan_ips, args.verbose)
print("="*55)
if not best_user or not best_pass:
print("\n⚠ Keine vollständigen Credentials gefunden.")
print(" Stelle sicher dass der Slicer MIT dem Drucker verbunden ist.")
sys.exit(1)
if args.write_env:
write_env(results, env_path, best_user, best_pass, best_ip,
device_id=best_device)
else:
print(f"\nHinweis: --write-env übergeben um Credentials in '{env_path}' zu speichern.")
if __name__ == "__main__":
main()