feat: KX-Bridge 0.9.0-beta1 – initiales Release-Repo

This commit is contained in:
2026-04-20 16:07:43 +02:00
commit 9bfd455a7c
14 changed files with 3411 additions and 0 deletions

454
extract_credentials.py Normal file
View File

@@ -0,0 +1,454 @@
"""
extract_credentials.py Extrahiert Anycubic LAN-MQTT-Credentials aus dem RAM
des laufenden AnycubicSlicerNext-Prozesses.
Voraussetzungen:
- AnycubicSlicerNext läuft und ist mit dem Drucker verbunden
- Gleiches Benutzerkonto wie der Slicer-Prozess (kein Admin nötig)
Verwendung:
python3 extract_credentials.py [--write-env] [--env-file ../.env]
Funktionsweise:
1. Prozess "AnycubicSlicer.exe" (Windows) bzw. "AnycubicSlicer" (Linux) finden
2. Speicherseiten des Prozesses durchsuchen (nur r/rw, keine Exec-Pages)
3. Nach MQTT-Credential-Patterns suchen:
Username: user[A-Za-z0-9]{8,12}
Password: [A-Za-z0-9+/]{13,18}
Drucker-IP: d{1,3}.d{1,3}.d{1,3}.d{1,3}
4. Kandidaten nach Plausibilität filtern und ausgeben
5. Optional: .env-Datei schreiben
"""
import argparse
import os
import re
import struct
import sys
import platform
# ---------------------------------------------------------------------------
# Plattform-Erkennung
# ---------------------------------------------------------------------------
IS_WINDOWS = platform.system() == "Windows"
IS_LINUX = platform.system() == "Linux"
# ---------------------------------------------------------------------------
# Pattern
# ---------------------------------------------------------------------------
# Username: "user" + 812 alphanumerische Zeichen (drucker-generiert)
RE_USERNAME = re.compile(rb'user[A-Za-z0-9]{8,12}(?=[^A-Za-z0-9]|$)')
# Password: 1320 alphanumerische Zeichen (kein / da kein RTSP-Pfad)
# Anycubic-Passwörter: gemischte Groß/Klein/Ziffern, kein Slash
RE_PASSWORD = re.compile(rb'[A-Za-z0-9]{13,20}(?=[^A-Za-z0-9]|$)')
# Kontext-Pattern: sucht Passwort das direkt nach "password" im Speicher steht
RE_PASSWORD_CTX = re.compile(rb'(?:password|passwd|Password)\x00{0,4}([A-Za-z0-9]{10,25})(?=[^A-Za-z0-9]|$)', re.IGNORECASE)
# Proximity-Pattern: Username gefolgt von Passwort in naher Umgebung (<512 Bytes)
RE_USER_PASS_PROXIMITY = re.compile(
rb'(user[A-Za-z0-9]{8,12}).{1,512}?([A-Za-z0-9]{13,20})(?=[^A-Za-z0-9]|$)',
re.DOTALL
)
# IPv4-Adresse (kein localhost, kein Broadcast)
RE_IP = re.compile(rb'(?<![.\d])(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?![.\d])')
# mode_id: 5-stellige Zahl (z.B. 20030)
RE_MODE_ID = re.compile(rb'(?<!\d)(2\d{4})(?!\d)')
# device_id: 32 Hex-Zeichen (MD5-Format)
RE_DEVICE_ID = re.compile(rb'[0-9a-f]{32}(?=[^0-9a-f]|$)')
# ---------------------------------------------------------------------------
# Windows Speicher lesen via ctypes / ReadProcessMemory
# ---------------------------------------------------------------------------
def _win_find_pid(name: str) -> "int | None":
"""Findet die PID eines Prozesses anhand des Namens (case-insensitive)."""
import ctypes
import ctypes.wintypes
TH32CS_SNAPPROCESS = 0x00000002
class PROCESSENTRY32(ctypes.Structure):
_fields_ = [
("dwSize", ctypes.wintypes.DWORD),
("cntUsage", ctypes.wintypes.DWORD),
("th32ProcessID", ctypes.wintypes.DWORD),
("th32DefaultHeapID", ctypes.POINTER(ctypes.c_ulong)),
("th32ModuleID", ctypes.wintypes.DWORD),
("cntThreads", ctypes.wintypes.DWORD),
("th32ParentProcessID", ctypes.wintypes.DWORD),
("pcPriClassBase", ctypes.c_long),
("dwFlags", ctypes.wintypes.DWORD),
("szExeFile", ctypes.c_char * 260),
]
snap = ctypes.windll.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
if snap == ctypes.wintypes.HANDLE(-1).value:
return None
entry = PROCESSENTRY32()
entry.dwSize = ctypes.sizeof(PROCESSENTRY32)
try:
if not ctypes.windll.kernel32.Process32First(snap, ctypes.byref(entry)):
return None
while True:
if entry.szExeFile.decode("utf-8", errors="replace").lower() == name.lower():
return entry.th32ProcessID
if not ctypes.windll.kernel32.Process32Next(snap, ctypes.byref(entry)):
break
finally:
ctypes.windll.kernel32.CloseHandle(snap)
return None
def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]":
"""Liest alle lesbaren Speicherseiten eines Windows-Prozesses."""
import ctypes
import ctypes.wintypes
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
MEM_COMMIT = 0x1000
PAGE_NOACCESS = 0x01
PAGE_GUARD = 0x100
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", ctypes.wintypes.DWORD),
("RegionSize", ctypes.c_size_t),
("State", ctypes.wintypes.DWORD),
("Protect", ctypes.wintypes.DWORD),
("Type", ctypes.wintypes.DWORD),
]
k32 = ctypes.windll.kernel32
handle = k32.OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid)
if not handle:
raise PermissionError(f"OpenProcess fehlgeschlagen (PID {pid}): {ctypes.GetLastError()}")
chunks = []
addr = 0
mbi = MEMORY_BASIC_INFORMATION()
try:
while k32.VirtualQueryEx(handle, ctypes.c_void_p(addr),
ctypes.byref(mbi), ctypes.sizeof(mbi)):
skip = (
mbi.State != MEM_COMMIT or
mbi.Protect & PAGE_NOACCESS or
mbi.Protect & PAGE_GUARD or
mbi.RegionSize > 256 * 1024 * 1024 # >256 MB überspringen
)
if not skip:
buf = ctypes.create_string_buffer(mbi.RegionSize)
read = ctypes.c_size_t(0)
if k32.ReadProcessMemory(handle, ctypes.c_void_p(addr),
buf, mbi.RegionSize, ctypes.byref(read)):
chunks.append(bytes(buf[:read.value]))
addr += mbi.RegionSize
if addr >= 0x7FFFFFFFFFFF: # Ende des User-Space (64-bit)
break
finally:
k32.CloseHandle(handle)
return chunks
# ---------------------------------------------------------------------------
# Linux Speicher lesen via /proc/{pid}/mem
# ---------------------------------------------------------------------------
def _linux_find_pid(name: str) -> "int | None":
"""Findet PID anhand des Prozessnamens in /proc."""
for entry in os.listdir("/proc"):
if not entry.isdigit():
continue
try:
cmdline = open(f"/proc/{entry}/cmdline", "rb").read()
if name.lower().encode() in cmdline.lower():
return int(entry)
except (PermissionError, FileNotFoundError):
continue
return None
def _linux_read_memory(pid: int) -> "list[bytes]":
"""Liest lesbare Speichersegmente aus /proc/{pid}/mem."""
chunks = []
maps_path = f"/proc/{pid}/maps"
mem_path = f"/proc/{pid}/mem"
try:
maps = open(maps_path).readlines()
mem = open(mem_path, "rb")
except PermissionError:
raise PermissionError(
f"Kein Zugriff auf /proc/{pid}/mem — "
"Script als gleicher Benutzer wie der Slicer starten."
)
for line in maps:
parts = line.split()
if len(parts) < 2:
continue
perms = parts[1]
if "r" not in perms: # nur lesbare Seiten
continue
if "x" in perms: # Code-Seiten überspringen (keine Strings)
continue
try:
start, end = [int(x, 16) for x in parts[0].split("-")]
except ValueError:
continue
size = end - start
if size > 256 * 1024 * 1024: # >256 MB überspringen
continue
try:
mem.seek(start)
data = mem.read(size)
if data:
chunks.append(data)
except (OSError, ValueError):
continue
mem.close()
return chunks
# ---------------------------------------------------------------------------
# Pattern-Suche
# ---------------------------------------------------------------------------
def _is_valid_ip(ip_bytes: bytes) -> bool:
parts = ip_bytes.split(b".")
if len(parts) != 4:
return False
try:
nums = [int(p) for p in parts]
except ValueError:
return False
if nums[0] in (0, 127, 255):
return False
if nums == [0, 0, 0, 0]:
return False
return all(0 <= n <= 255 for n in nums)
def search_chunks(chunks: "list[bytes]") -> dict:
"""Durchsucht Speicher-Chunks nach Credential-Patterns."""
usernames = {} # value → count
passwords = {}
ips = {}
device_ids = {}
total = len(chunks)
for i, chunk in enumerate(chunks):
if i % 50 == 0 or i == total - 1:
pct = (i + 1) * 100 // total
mb_done = sum(len(c) for c in chunks[:i+1]) / 1024 / 1024
print(f"\r[*] Analysiere ... {pct:3d}% ({mb_done:.0f} MB)", end="", flush=True)
for m in RE_USERNAME.finditer(chunk):
v = m.group().decode("ascii", errors="replace")
usernames[v] = usernames.get(v, 0) + 1
# Proximity: Passwort das innerhalb von 512 Bytes nach einem Username steht
for m in RE_USER_PASS_PROXIMITY.finditer(chunk):
pw = m.group(2).decode("ascii", errors="replace")
has_upper = any(c.isupper() for c in pw)
has_lower = any(c.islower() for c in pw)
has_digit = any(c.isdigit() for c in pw)
if has_upper and has_lower and has_digit:
passwords[pw] = passwords.get(pw, 0) + 500
for m in RE_PASSWORD.finditer(chunk):
v = m.group().decode("ascii", errors="replace")
# Filter: mindestens 2 Großbuchstaben + 2 Kleinbuchstaben + 1 Ziffer
has_upper = sum(1 for c in v if c.isupper()) >= 2
has_lower = sum(1 for c in v if c.islower()) >= 2
has_digit = sum(1 for c in v if c.isdigit()) >= 1
if has_upper and has_lower and has_digit:
passwords[v] = passwords.get(v, 0) + 1
for m in RE_IP.finditer(chunk):
ip = m.group(1)
if _is_valid_ip(ip):
v = ip.decode("ascii")
ips[v] = ips.get(v, 0) + 1
for m in RE_DEVICE_ID.finditer(chunk):
v = m.group().decode("ascii")
device_ids[v] = device_ids.get(v, 0) + 1
print() # Zeilenumbruch nach Fortschrittszeile
# Nach Häufigkeit sortieren, häufigste zuerst
def top(d, n=10):
return sorted(d.items(), key=lambda x: -x[1])[:n]
return {
"usernames": top(usernames),
"passwords": top(passwords),
"ips": top(ips, 10),
"device_ids": top(device_ids),
}
# ---------------------------------------------------------------------------
# Hauptprogramm
# ---------------------------------------------------------------------------
SLICER_NAMES = [
"AnycubicSlicerNext.exe", # Windows
"AnycubicSlicer.exe",
"AnycubicSlicerNext", # Linux
"AnycubicSlicer",
]
def find_slicer_pid() -> "tuple[int, str] | tuple[None, None]":
for name in SLICER_NAMES:
if IS_WINDOWS:
pid = _win_find_pid(name)
else:
pid = _linux_find_pid(name)
if pid:
return pid, name
return None, None
def read_process(pid: int) -> "list[bytes]":
if IS_WINDOWS:
return _win_read_memory(pid)
else:
return _linux_read_memory(pid)
def write_env(results: dict, env_path: str,
username: str, password: str, ip: str,
mode_id: str = "20030", device_id: str = ""):
lines = []
if os.path.isfile(env_path):
with open(env_path) as f:
lines = f.readlines()
def set_val(key, val):
nonlocal lines
for i, line in enumerate(lines):
if line.startswith(f"{key}="):
lines[i] = f"{key}={val}\n"
return
lines.append(f"{key}={val}\n")
set_val("PRINTER_IP", ip)
set_val("MQTT_USERNAME", username)
set_val("MQTT_PASSWORD", password)
if device_id:
set_val("DEVICE_ID", device_id)
if mode_id:
set_val("MODE_ID", mode_id)
with open(env_path, "w") as f:
f.writelines(lines)
print(f"\n✓ Credentials in '{env_path}' gespeichert.")
def main():
parser = argparse.ArgumentParser(
description="Extrahiert MQTT-Credentials aus dem RAM des AnycubicSlicer-Prozesses"
)
parser.add_argument("--write-env", action="store_true",
help="Gefundene Credentials in .env schreiben")
parser.add_argument("--env-file", default=None,
help="Pfad zur .env-Datei (Standard: ../. env relativ zu diesem Script)")
parser.add_argument("--pid", type=int, default=None,
help="Prozess-PID direkt angeben (überspringt Auto-Erkennung)")
parser.add_argument("--verbose", action="store_true",
help="Alle Kandidaten ausgeben, nicht nur den besten")
args = parser.parse_args()
# .env-Pfad bestimmen
if args.env_file:
env_path = args.env_file
else:
env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
env_path = os.path.normpath(env_path)
# Prozess finden
if args.pid:
pid, proc_name = args.pid, f"PID {args.pid}"
else:
print("[*] Suche AnycubicSlicer-Prozess ...")
pid, proc_name = find_slicer_pid()
if not pid:
print("✗ AnycubicSlicer nicht gefunden. Bitte den Slicer starten und "
"mit dem Drucker verbinden, dann erneut ausführen.")
sys.exit(1)
print(f"[*] Prozess gefunden: {proc_name} (PID {pid})")
print(f"[*] Lese Prozess-Speicher ...")
try:
chunks = read_process(pid)
except PermissionError as e:
print(f"✗ Zugriffsfehler: {e}")
sys.exit(1)
total_mb = sum(len(c) for c in chunks) / 1024 / 1024
print(f"[*] {len(chunks)} Speichersegmente gelesen ({total_mb:.1f} MB)")
print(f"[*] Durchsuche nach Credentials ...")
results = search_chunks(chunks)
# Ausgabe
print("\n" + "="*55)
print(" ERGEBNISSE")
print("="*55)
def show(label, items, verbose):
if not items:
print(f" {label:12s} — nicht gefunden")
return items[0][0] if items else ""
best = items[0][0]
print(f" {label:12s} {best} (Treffer: {items[0][1]})")
if verbose and len(items) > 1:
for val, cnt in items[1:]:
print(f" {'':12s} {val} (Treffer: {cnt})")
return best
best_user = show("Username", results["usernames"], args.verbose)
best_pass = show("Password", results["passwords"], args.verbose)
best_device = show("Device-ID", results["device_ids"], args.verbose)
# IP: 192.168.x.x bevorzugen
lan_ips = [(ip, cnt) for ip, cnt in results["ips"]
if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.")]
if not lan_ips:
lan_ips = results["ips"]
best_ip = show("Drucker-IP", lan_ips, args.verbose)
print("="*55)
if not best_user or not best_pass:
print("\n⚠ Keine vollständigen Credentials gefunden.")
print(" Stelle sicher dass der Slicer MIT dem Drucker verbunden ist.")
sys.exit(1)
if args.write_env:
write_env(results, env_path, best_user, best_pass, best_ip,
device_id=best_device)
else:
print(f"\nHinweis: --write-env übergeben um Credentials in '{env_path}' zu speichern.")
if __name__ == "__main__":
main()