release: v0.9.1-beta14
This commit is contained in:
@@ -1,23 +1,23 @@
|
||||
"""
|
||||
extract_credentials.py – Extrahiert Anycubic LAN-MQTT-Credentials aus dem RAM
|
||||
des laufenden AnycubicSlicerNext-Prozesses.
|
||||
extract_credentials.py – Extracts Anycubic LAN-MQTT credentials from the RAM
|
||||
of the running AnycubicSlicerNext process.
|
||||
|
||||
Voraussetzungen:
|
||||
- AnycubicSlicerNext läuft und ist mit dem Drucker verbunden
|
||||
- Gleiches Benutzerkonto wie der Slicer-Prozess (kein Admin nötig)
|
||||
Requirements:
|
||||
- AnycubicSlicerNext is running and connected to the printer
|
||||
- Same user account as the slicer process (no admin required)
|
||||
|
||||
Verwendung:
|
||||
Usage:
|
||||
python3 extract_credentials.py [--write-env] [--env-file ../.env]
|
||||
|
||||
Funktionsweise:
|
||||
1. Prozess "AnycubicSlicer.exe" (Windows) bzw. "AnycubicSlicer" (Linux) finden
|
||||
2. Speicherseiten des Prozesses durchsuchen (nur r/rw, keine Exec-Pages)
|
||||
3. Nach MQTT-Credential-Patterns suchen:
|
||||
How it works:
|
||||
1. Find process "AnycubicSlicer.exe" (Windows) or "AnycubicSlicer" (Linux)
|
||||
2. Scan memory pages of the process (only r/rw, no exec pages)
|
||||
3. Search for MQTT credential patterns:
|
||||
Username: user[A-Za-z0-9]{8,12}
|
||||
Password: [A-Za-z0-9+/]{13,18}
|
||||
Drucker-IP: d{1,3}.d{1,3}.d{1,3}.d{1,3}
|
||||
4. Kandidaten nach Plausibilität filtern und ausgeben
|
||||
5. Optional: .env-Datei schreiben
|
||||
Printer IP: d{1,3}.d{1,3}.d{1,3}.d{1,3}
|
||||
4. Filter candidates by plausibility and print results
|
||||
5. Optionally write .env file
|
||||
"""
|
||||
|
||||
import argparse
|
||||
@@ -28,48 +28,48 @@ import sys
|
||||
import platform
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Plattform-Erkennung
|
||||
# Platform detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
IS_WINDOWS = platform.system() == "Windows"
|
||||
IS_LINUX = platform.system() == "Linux"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pattern
|
||||
# Patterns
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Username: "user" + 8–12 alphanumerische Zeichen (drucker-generiert)
|
||||
# Username: "user" + 8–12 alphanumeric characters (printer-generated)
|
||||
RE_USERNAME = re.compile(rb'user[A-Za-z0-9]{8,12}(?=[^A-Za-z0-9]|$)')
|
||||
|
||||
# Password: 13–20 alphanumerische Zeichen (kein / da kein RTSP-Pfad)
|
||||
# Anycubic-Passwörter: gemischte Groß/Klein/Ziffern, kein Slash
|
||||
# Password: 13–20 alphanumeric characters (no / since no RTSP path)
|
||||
# Anycubic passwords: mixed upper/lower/digits, no slash
|
||||
RE_PASSWORD = re.compile(rb'[A-Za-z0-9]{13,20}(?=[^A-Za-z0-9]|$)')
|
||||
|
||||
# Kontext-Pattern: sucht Passwort das direkt nach "password" im Speicher steht
|
||||
# Context pattern: password directly following "password" in memory
|
||||
RE_PASSWORD_CTX = re.compile(rb'(?:password|passwd|Password)\x00{0,4}([A-Za-z0-9]{10,25})(?=[^A-Za-z0-9]|$)', re.IGNORECASE)
|
||||
|
||||
# Proximity-Pattern: Username gefolgt von Passwort in naher Umgebung (<512 Bytes)
|
||||
# Proximity pattern: username followed by password within close range (<512 bytes)
|
||||
RE_USER_PASS_PROXIMITY = re.compile(
|
||||
rb'(user[A-Za-z0-9]{8,12}).{1,512}?([A-Za-z0-9]{13,20})(?=[^A-Za-z0-9]|$)',
|
||||
re.DOTALL
|
||||
)
|
||||
|
||||
# IPv4-Adresse (kein localhost, kein Broadcast)
|
||||
# IPv4 address (no localhost, no broadcast)
|
||||
RE_IP = re.compile(rb'(?<![.\d])(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?![.\d])')
|
||||
|
||||
# mode_id: 5-stellige Zahl (z.B. 20030)
|
||||
# mode_id: 5-digit number (e.g. 20030)
|
||||
RE_MODE_ID = re.compile(rb'(?<!\d)(2\d{4})(?!\d)')
|
||||
|
||||
# device_id: 32 Hex-Zeichen (MD5-Format)
|
||||
# device_id: 32 hex characters (MD5 format)
|
||||
RE_DEVICE_ID = re.compile(rb'[0-9a-f]{32}(?=[^0-9a-f]|$)')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Windows – Speicher lesen via ctypes / ReadProcessMemory
|
||||
# Windows – read memory via ctypes / ReadProcessMemory
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _win_find_pid(name: str) -> "int | None":
|
||||
"""Findet die PID eines Prozesses anhand des Namens (case-insensitive)."""
|
||||
"""Find the PID of a process by name (case-insensitive)."""
|
||||
import ctypes
|
||||
import ctypes.wintypes
|
||||
|
||||
@@ -110,15 +110,15 @@ def _win_find_pid(name: str) -> "int | None":
|
||||
|
||||
|
||||
def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]":
|
||||
"""Liest alle lesbaren Speicherseiten eines Windows-Prozesses."""
|
||||
"""Read all readable memory pages of a Windows process."""
|
||||
import ctypes
|
||||
import ctypes.wintypes
|
||||
|
||||
PROCESS_VM_READ = 0x0010
|
||||
PROCESS_VM_READ = 0x0010
|
||||
PROCESS_QUERY_INFORMATION = 0x0400
|
||||
MEM_COMMIT = 0x1000
|
||||
PAGE_NOACCESS = 0x01
|
||||
PAGE_GUARD = 0x100
|
||||
MEM_COMMIT = 0x1000
|
||||
PAGE_NOACCESS = 0x01
|
||||
PAGE_GUARD = 0x100
|
||||
|
||||
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
|
||||
_fields_ = [
|
||||
@@ -134,7 +134,7 @@ def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]":
|
||||
k32 = ctypes.windll.kernel32
|
||||
handle = k32.OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid)
|
||||
if not handle:
|
||||
raise PermissionError(f"OpenProcess fehlgeschlagen (PID {pid}): {ctypes.GetLastError()}")
|
||||
raise PermissionError(f"OpenProcess failed (PID {pid}): {ctypes.GetLastError()}")
|
||||
|
||||
chunks = []
|
||||
addr = 0
|
||||
@@ -147,7 +147,7 @@ def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]":
|
||||
mbi.State != MEM_COMMIT or
|
||||
mbi.Protect & PAGE_NOACCESS or
|
||||
mbi.Protect & PAGE_GUARD or
|
||||
mbi.RegionSize > 256 * 1024 * 1024 # >256 MB überspringen
|
||||
mbi.RegionSize > 256 * 1024 * 1024 # skip >256 MB regions
|
||||
)
|
||||
if not skip:
|
||||
buf = ctypes.create_string_buffer(mbi.RegionSize)
|
||||
@@ -156,7 +156,7 @@ def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]":
|
||||
buf, mbi.RegionSize, ctypes.byref(read)):
|
||||
chunks.append(bytes(buf[:read.value]))
|
||||
addr += mbi.RegionSize
|
||||
if addr >= 0x7FFFFFFFFFFF: # Ende des User-Space (64-bit)
|
||||
if addr >= 0x7FFFFFFFFFFF: # end of user space (64-bit)
|
||||
break
|
||||
finally:
|
||||
k32.CloseHandle(handle)
|
||||
@@ -165,11 +165,11 @@ def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]":
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Linux – Speicher lesen via /proc/{pid}/mem
|
||||
# Linux – read memory via /proc/{pid}/mem
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _linux_find_pid(name: str) -> "int | None":
|
||||
"""Findet PID anhand des Prozessnamens in /proc."""
|
||||
"""Find PID by process name in /proc."""
|
||||
for entry in os.listdir("/proc"):
|
||||
if not entry.isdigit():
|
||||
continue
|
||||
@@ -183,7 +183,7 @@ def _linux_find_pid(name: str) -> "int | None":
|
||||
|
||||
|
||||
def _linux_read_memory(pid: int) -> "list[bytes]":
|
||||
"""Liest lesbare Speichersegmente aus /proc/{pid}/mem."""
|
||||
"""Read readable memory segments from /proc/{pid}/mem."""
|
||||
chunks = []
|
||||
maps_path = f"/proc/{pid}/maps"
|
||||
mem_path = f"/proc/{pid}/mem"
|
||||
@@ -193,8 +193,8 @@ def _linux_read_memory(pid: int) -> "list[bytes]":
|
||||
mem = open(mem_path, "rb")
|
||||
except PermissionError:
|
||||
raise PermissionError(
|
||||
f"Kein Zugriff auf /proc/{pid}/mem — "
|
||||
"Script als gleicher Benutzer wie der Slicer starten."
|
||||
f"No access to /proc/{pid}/mem — "
|
||||
"run the script as the same user as the slicer process."
|
||||
)
|
||||
|
||||
for line in maps:
|
||||
@@ -202,16 +202,16 @@ def _linux_read_memory(pid: int) -> "list[bytes]":
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
perms = parts[1]
|
||||
if "r" not in perms: # nur lesbare Seiten
|
||||
if "r" not in perms: # readable pages only
|
||||
continue
|
||||
if "x" in perms: # Code-Seiten überspringen (keine Strings)
|
||||
if "x" in perms: # skip code pages (no strings)
|
||||
continue
|
||||
try:
|
||||
start, end = [int(x, 16) for x in parts[0].split("-")]
|
||||
except ValueError:
|
||||
continue
|
||||
size = end - start
|
||||
if size > 256 * 1024 * 1024: # >256 MB überspringen
|
||||
if size > 256 * 1024 * 1024: # skip >256 MB regions
|
||||
continue
|
||||
try:
|
||||
mem.seek(start)
|
||||
@@ -226,7 +226,7 @@ def _linux_read_memory(pid: int) -> "list[bytes]":
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pattern-Suche
|
||||
# Pattern search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _is_valid_ip(ip_bytes: bytes) -> bool:
|
||||
@@ -245,7 +245,7 @@ def _is_valid_ip(ip_bytes: bytes) -> bool:
|
||||
|
||||
|
||||
def search_chunks(chunks: "list[bytes]") -> dict:
|
||||
"""Durchsucht Speicher-Chunks nach Credential-Patterns."""
|
||||
"""Search memory chunks for credential patterns."""
|
||||
usernames = {} # value → count
|
||||
passwords = {}
|
||||
ips = {}
|
||||
@@ -256,12 +256,12 @@ def search_chunks(chunks: "list[bytes]") -> dict:
|
||||
if i % 50 == 0 or i == total - 1:
|
||||
pct = (i + 1) * 100 // total
|
||||
mb_done = sum(len(c) for c in chunks[:i+1]) / 1024 / 1024
|
||||
print(f"\r[*] Analysiere ... {pct:3d}% ({mb_done:.0f} MB)", end="", flush=True)
|
||||
print(f"\r[*] Scanning ... {pct:3d}% ({mb_done:.0f} MB)", end="", flush=True)
|
||||
for m in RE_USERNAME.finditer(chunk):
|
||||
v = m.group().decode("ascii", errors="replace")
|
||||
usernames[v] = usernames.get(v, 0) + 1
|
||||
|
||||
# Proximity: Passwort das innerhalb von 512 Bytes nach einem Username steht
|
||||
# Proximity: password within 512 bytes after a username
|
||||
for m in RE_USER_PASS_PROXIMITY.finditer(chunk):
|
||||
pw = m.group(2).decode("ascii", errors="replace")
|
||||
has_upper = any(c.isupper() for c in pw)
|
||||
@@ -272,7 +272,7 @@ def search_chunks(chunks: "list[bytes]") -> dict:
|
||||
|
||||
for m in RE_PASSWORD.finditer(chunk):
|
||||
v = m.group().decode("ascii", errors="replace")
|
||||
# Filter: mindestens 2 Großbuchstaben + 2 Kleinbuchstaben + 1 Ziffer
|
||||
# Filter: at least 2 uppercase + 2 lowercase + 1 digit
|
||||
has_upper = sum(1 for c in v if c.isupper()) >= 2
|
||||
has_lower = sum(1 for c in v if c.islower()) >= 2
|
||||
has_digit = sum(1 for c in v if c.isdigit()) >= 1
|
||||
@@ -289,9 +289,9 @@ def search_chunks(chunks: "list[bytes]") -> dict:
|
||||
v = m.group().decode("ascii")
|
||||
device_ids[v] = device_ids.get(v, 0) + 1
|
||||
|
||||
print() # Zeilenumbruch nach Fortschrittszeile
|
||||
print() # newline after progress line
|
||||
|
||||
# Nach Häufigkeit sortieren, häufigste zuerst
|
||||
# Sort by frequency, most frequent first
|
||||
def top(d, n=10):
|
||||
return sorted(d.items(), key=lambda x: -x[1])[:n]
|
||||
|
||||
@@ -304,7 +304,7 @@ def search_chunks(chunks: "list[bytes]") -> dict:
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hauptprogramm
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SLICER_NAMES = [
|
||||
@@ -359,95 +359,95 @@ def write_env(results: dict, env_path: str,
|
||||
|
||||
with open(env_path, "w") as f:
|
||||
f.writelines(lines)
|
||||
print(f"\n✓ Credentials in '{env_path}' gespeichert.")
|
||||
print(f"\n✓ Credentials saved to '{env_path}'.")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Extrahiert MQTT-Credentials aus dem RAM des AnycubicSlicer-Prozesses"
|
||||
description="Extract MQTT credentials from the AnycubicSlicer process RAM"
|
||||
)
|
||||
parser.add_argument("--write-env", action="store_true",
|
||||
help="Gefundene Credentials in .env schreiben")
|
||||
help="Write found credentials to .env file")
|
||||
parser.add_argument("--env-file", default=None,
|
||||
help="Pfad zur .env-Datei (Standard: ../. env relativ zu diesem Script)")
|
||||
help="Path to .env file (default: ../.env relative to this script)")
|
||||
parser.add_argument("--pid", type=int, default=None,
|
||||
help="Prozess-PID direkt angeben (überspringt Auto-Erkennung)")
|
||||
help="Specify process PID directly (skips auto-detection)")
|
||||
parser.add_argument("--verbose", action="store_true",
|
||||
help="Alle Kandidaten ausgeben, nicht nur den besten")
|
||||
help="Show all candidates, not just the best match")
|
||||
args = parser.parse_args()
|
||||
|
||||
# .env-Pfad bestimmen
|
||||
# Determine .env path
|
||||
if args.env_file:
|
||||
env_path = args.env_file
|
||||
else:
|
||||
env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
|
||||
env_path = os.path.normpath(env_path)
|
||||
|
||||
# Prozess finden
|
||||
# Find process
|
||||
if args.pid:
|
||||
pid, proc_name = args.pid, f"PID {args.pid}"
|
||||
else:
|
||||
print("[*] Suche AnycubicSlicer-Prozess ...")
|
||||
print("[*] Searching for AnycubicSlicer process ...")
|
||||
pid, proc_name = find_slicer_pid()
|
||||
if not pid:
|
||||
print("✗ AnycubicSlicer nicht gefunden. Bitte den Slicer starten und "
|
||||
"mit dem Drucker verbinden, dann erneut ausführen.")
|
||||
print("✗ AnycubicSlicer not found. Please start the slicer, connect it "
|
||||
"to the printer, then run this script again.")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"[*] Prozess gefunden: {proc_name} (PID {pid})")
|
||||
print(f"[*] Lese Prozess-Speicher ...")
|
||||
print(f"[*] Process found: {proc_name} (PID {pid})")
|
||||
print(f"[*] Reading process memory ...")
|
||||
|
||||
try:
|
||||
chunks = read_process(pid)
|
||||
except PermissionError as e:
|
||||
print(f"✗ Zugriffsfehler: {e}")
|
||||
print(f"✗ Permission error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
total_mb = sum(len(c) for c in chunks) / 1024 / 1024
|
||||
print(f"[*] {len(chunks)} Speichersegmente gelesen ({total_mb:.1f} MB)")
|
||||
print(f"[*] Durchsuche nach Credentials ...")
|
||||
print(f"[*] {len(chunks)} memory segments read ({total_mb:.1f} MB)")
|
||||
print(f"[*] Searching for credentials ...")
|
||||
|
||||
results = search_chunks(chunks)
|
||||
|
||||
# Ausgabe
|
||||
# Output
|
||||
print("\n" + "="*55)
|
||||
print(" ERGEBNISSE")
|
||||
print(" RESULTS")
|
||||
print("="*55)
|
||||
|
||||
def show(label, items, verbose):
|
||||
if not items:
|
||||
print(f" {label:12s} — nicht gefunden")
|
||||
print(f" {label:12s} — not found")
|
||||
return items[0][0] if items else ""
|
||||
best = items[0][0]
|
||||
print(f" {label:12s} {best} (Treffer: {items[0][1]})")
|
||||
print(f" {label:12s} {best} (matches: {items[0][1]})")
|
||||
if verbose and len(items) > 1:
|
||||
for val, cnt in items[1:]:
|
||||
print(f" {'':12s} {val} (Treffer: {cnt})")
|
||||
print(f" {'':12s} {val} (matches: {cnt})")
|
||||
return best
|
||||
|
||||
best_user = show("Username", results["usernames"], args.verbose)
|
||||
best_pass = show("Password", results["passwords"], args.verbose)
|
||||
best_device = show("Device-ID", results["device_ids"], args.verbose)
|
||||
best_user = show("Username", results["usernames"], args.verbose)
|
||||
best_pass = show("Password", results["passwords"], args.verbose)
|
||||
best_device = show("Device-ID", results["device_ids"], args.verbose)
|
||||
|
||||
# IP: 192.168.x.x bevorzugen
|
||||
# IP: prefer 192.168.x.x
|
||||
lan_ips = [(ip, cnt) for ip, cnt in results["ips"]
|
||||
if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.")]
|
||||
if not lan_ips:
|
||||
lan_ips = results["ips"]
|
||||
best_ip = show("Drucker-IP", lan_ips, args.verbose)
|
||||
best_ip = show("Printer IP", lan_ips, args.verbose)
|
||||
|
||||
print("="*55)
|
||||
|
||||
if not best_user or not best_pass:
|
||||
print("\n⚠ Keine vollständigen Credentials gefunden.")
|
||||
print(" Stelle sicher dass der Slicer MIT dem Drucker verbunden ist.")
|
||||
print("\n⚠ No complete credentials found.")
|
||||
print(" Make sure the slicer is connected to the printer.")
|
||||
sys.exit(1)
|
||||
|
||||
if args.write_env:
|
||||
write_env(results, env_path, best_user, best_pass, best_ip,
|
||||
device_id=best_device)
|
||||
else:
|
||||
print(f"\nHinweis: --write-env übergeben um Credentials in '{env_path}' zu speichern.")
|
||||
print(f"\nTip: pass --write-env to save credentials to '{env_path}'.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user