Deep-dive · Maker stack (Blender + 3D printing)¶
Phase: 8 (tracker) Versione: maggio 2026
1. AI 3D generation 2026¶
TRELLIS-2 (Microsoft, MIT) — self-hosted¶
CVPR 2025 Spotlight, 4B params, struttura sparsa voxel "O-Voxel" field-free.
| GPU | VRAM | Risoluzione max | Tempo |
|---|---|---|---|
| RTX 3090/4080 | 16 GB | 512³ | ~6 s |
| RTX 4090/A6000 | 24 GB | 1024³ | ~17 s |
| A100/H100 | 40-80 GB | 1536³ | ~60 s |
git clone https://github.com/microsoft/TRELLIS.2.git
cd TRELLIS.2
bash setup.sh --new-env
conda activate trellis2
# GPU senza flash-attn (es. V100):
pip install xformers
export ATTN_BACKEND=xformers
Output: GLB (default OPAQUE), OBJ, PLY, Radiance Fields, 3D Gaussians. Pesi: microsoft/TRELLIS.2-4B su HuggingFace.
SPAR3D (Stability AI) — self-hosted¶
Two-stage: point cloud diffusion + mesh generation conditioned. 0.7s per oggetto, edit point cloud in 0.3s. Licenza Stability AI Community.
TripoSR (VAST AI / Stability) — self-hosted¶
Singola immagine → mesh in <10 s GPU consumer. Approccio feed-forward senza diffusion. Adatto a batch dove velocità > qualità assoluta.
Meshy / CSM — cloud commerciale¶
| Tool | Latenza | Costo | Output |
|---|---|---|---|
| Meshy | ~60s text-to-3D | ~0.10 USD/gen | GLB, OBJ, FBX, STL, USDZ |
| CSM | 30-120s | Su richiesta | GLB, OBJ |
import asyncio
import httpx
async def meshy_text_to_3d(prompt: str, api_key: str) -> str:
headers = {"Authorization": f"Bearer {api_key}"}
async with httpx.AsyncClient() as client:
r = await client.post(
"https://api.meshy.ai/openapi/v2/text-to-3d",
json={"mode": "preview", "prompt": prompt, "art_style": "realistic"},
headers=headers,
)
r.raise_for_status()
task_id = r.json()["result"]
for _ in range(120):
await asyncio.sleep(5)
status = await client.get(
f"https://api.meshy.ai/openapi/v2/text-to-3d/{task_id}", headers=headers
)
data = status.json()
if data["status"] == "SUCCEEDED":
return data["model_urls"]["glb"]
if data["status"] == "FAILED":
raise RuntimeError(f"Meshy failed: {data.get('task_error')}")
raise TimeoutError("Meshy task timeout")
GLB → STL conversion¶
import trimesh
def glb_to_stl(glb_path: str, stl_path: str) -> None:
scene = trimesh.load(glb_path)
if isinstance(scene, trimesh.Scene):
mesh = trimesh.util.concatenate(
[g for g in scene.geometry.values() if isinstance(g, trimesh.Trimesh)]
)
else:
mesh = scene
mesh.export(stl_path)
2. Blender bpy automation¶
Installazione headless¶
Limite architetturale
bpy può essere importato una sola volta per processo. In server long-running, esegui ogni operazione mesh in subprocess dedicato.
Pipeline GLB → repair → STL¶
"""
agents/maker-agent/blender_pipeline.py
Eseguire come subprocess isolato.
"""
import logging
import math
from pathlib import Path
log = logging.getLogger(__name__)
def process_glb_to_stl(
glb_path: str,
stl_path: str,
scale: float = 1.0,
rotation_z_deg: float = 0.0,
) -> None:
import bpy
log.info("Pipeline: %s → %s", glb_path, stl_path)
# 1. Pulizia scena
bpy.ops.wm.read_factory_settings(use_empty=True)
# 2. Import GLB
bpy.ops.import_scene.gltf(filepath=glb_path)
# 3. Seleziona mesh + join
mesh_objects = [o for o in bpy.context.scene.objects if o.type == "MESH"]
if not mesh_objects:
raise ValueError(f"Nessuna mesh in {glb_path}")
bpy.ops.object.select_all(action="DESELECT")
for obj in mesh_objects:
obj.select_set(True)
bpy.context.view_layer.objects.active = mesh_objects[0]
bpy.ops.object.join()
merged = bpy.context.active_object
# 4. Scale + rotate + apply
merged.scale = (scale, scale, scale)
merged.rotation_euler[2] = math.radians(rotation_z_deg)
bpy.ops.object.transform_apply(scale=True, rotation=True, location=False)
# 5. Repair: dedup vertices, normali coerenti, fill holes
bpy.ops.object.mode_set(mode="EDIT")
bpy.ops.mesh.select_all(action="SELECT")
bpy.ops.mesh.remove_doubles(threshold=0.0001)
bpy.ops.mesh.normals_make_consistent(inside=False)
bpy.ops.mesh.fill_holes(sides=4)
bpy.ops.object.mode_set(mode="OBJECT")
# 6. Export STL
Path(stl_path).parent.mkdir(parents=True, exist_ok=True)
bpy.ops.wm.stl_export(filepath=stl_path, export_selected_objects=True, ascii_format=False)
if __name__ == "__main__":
import argparse
p = argparse.ArgumentParser()
p.add_argument("--glb", required=True)
p.add_argument("--stl", required=True)
p.add_argument("--scale", type=float, default=1.0)
p.add_argument("--rotation-z", type=float, default=0.0)
args = p.parse_args()
logging.basicConfig(level=logging.INFO)
process_glb_to_stl(args.glb, args.stl, args.scale, args.rotation_z)
# Invocazione safe da Jarvis
import subprocess, sys
def run_blender_pipeline(glb_path: str, stl_path: str, scale: float = 1.0) -> None:
result = subprocess.run(
[sys.executable, "/opt/jarvis/maker/blender_pipeline.py",
"--glb", glb_path, "--stl", stl_path, "--scale", str(scale)],
capture_output=True, text=True, timeout=120,
)
if result.returncode != 0:
raise RuntimeError(f"Blender error:\n{result.stderr}")
Blender 4.2+ extension system¶
# blender_manifest.toml
schema_version = "1.0.0"
id = "jarvis_maker"
version = "0.1.0"
name = "Jarvis Maker Integration"
maintainer = "open-jarvis"
type = "add-on"
blender_version_min = "4.2.0"
license = ["SPDX:Apache-2.0"]
3. 3D printer control¶
Klipper + Moonraker (stack primario)¶
Moonraker porta default 7125. REST + WebSocket JSON-RPC 2.0.
| Endpoint | Funzione |
|---|---|
GET /printer/info | Info firmware Klipper |
GET /printer/objects/query | Stato (temp, posizione) |
POST /printer/objects/subscribe | Subscribe real-time |
POST /printer/print/start | Avvia stampa |
POST /printer/print/pause | Pausa |
POST /printer/print/resume | Resume |
POST /printer/print/cancel | Cancella |
POST /server/files/upload | Upload G-code |
POST /printer/gcode/script | G-code raw |
"""
agents/maker-agent/moonraker_client.py
Async REST + WebSocket client.
"""
import asyncio
import json
import httpx
import websockets
class MoonrakerClient:
def __init__(self, host: str = "localhost", port: int = 7125):
self._base = f"http://{host}:{port}"
self._ws_url = f"ws://{host}:{port}/websocket"
self._http = httpx.AsyncClient(timeout=30.0)
self._rpc_id = 0
async def get(self, path: str, **params):
r = await self._http.get(f"{self._base}{path}", params=params)
r.raise_for_status()
return r.json()["result"]
async def post(self, path: str, body: dict | None = None):
r = await self._http.post(f"{self._base}{path}", json=body or {})
r.raise_for_status()
return r.json().get("result", {})
async def start_print(self, filename: str):
await self.post("/printer/print/start", {"filename": filename})
async def cancel_print(self):
await self.post("/printer/print/cancel")
async def get_printer_status(self) -> dict:
result = await self.get(
"/printer/objects/query",
objects="print_stats,extruder,heater_bed,toolhead,virtual_sdcard",
)
return result["status"]
async def upload_file(self, local_path: str, remote_name: str) -> str:
with open(local_path, "rb") as f:
r = await self._http.post(
f"{self._base}/server/files/upload",
files={"file": (remote_name, f, "application/octet-stream")},
)
r.raise_for_status()
return r.json()["result"]["item"]["path"]
async def subscribe_and_watch(self, objects: dict, on_update):
async with websockets.connect(self._ws_url) as ws:
self._rpc_id += 1
await ws.send(json.dumps({
"jsonrpc": "2.0",
"method": "printer.objects.subscribe",
"params": {"objects": objects},
"id": self._rpc_id,
}))
async for raw in ws:
msg = json.loads(raw)
if msg.get("id") == self._rpc_id and "result" in msg:
for obj_name, data in msg["result"].get("status", {}).items():
await on_update(obj_name, data)
elif msg.get("method") == "notify_status_update":
for obj_name, data in msg["params"][0].items():
await on_update(obj_name, data)
async def close(self):
await self._http.aclose()
OctoPrint + OctoEverywhere¶
OctoPrint REST porta 5000. Auth via X-Api-Key. OctoEverywhere Gadget AI failure detection (>320K maker), include MCP server github.com/OctoEverywhere/mcp.
Bambu Lab MQTT (porta 8883 TLS)¶
"""
agents/maker-agent/bambu_client.py
MQTT TLS Bambu Lab LAN mode.
"""
import json
import ssl
from dataclasses import dataclass, field
import paho.mqtt.client as mqtt
@dataclass
class BambuClient:
serial: str
ip: str
access_code: str
_state: dict = field(default_factory=dict)
_client: mqtt.Client | None = None
def connect(self):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE # Bambu self-signed cert
self._client = mqtt.Client(client_id="jarvis", protocol=mqtt.MQTTv311)
self._client.username_pw_set("bblp", self.access_code)
self._client.tls_set_context(ctx)
self._client.on_message = self._on_message
self._client.connect(self.ip, 8883, keepalive=60)
self._client.subscribe(f"device/{self.serial}/report")
self._client.loop_start()
def _on_message(self, _c, _u, msg):
payload = json.loads(msg.payload)
if "print" in payload:
self._state.update(payload["print"])
def stop_print(self):
cmd = {"print": {"command": "stop", "sequence_id": "1"}}
self._client.publish(f"device/{self.serial}/request", json.dumps(cmd))
def get_state(self) -> dict:
return dict(self._state)
bambu-moonraker-shim: bridge che emula Moonraker API verso Bambu (P1P, P1S, X1C, A1) per uniformità client.
PrusaLink / Prusa Connect¶
OpenAPI spec: prusa3d/Prusa-Link-Web/spec/openapi.yaml.
class PrusaLinkClient:
def __init__(self, host: str, api_key: str, port: int = 80):
self._base = f"http://{host}:{port}/api/v1"
self._headers = {"X-Api-Key": api_key}
async def start_print(self, path: str):
async with httpx.AsyncClient() as c:
r = await c.post(
f"{self._base}/job",
headers=self._headers,
json={"command": "start", "path": path},
)
r.raise_for_status()
4. mcp-3D-printer-server (unificazione)¶
DMontgomery40/mcp-3D-printer-server unifica Bambu, OctoPrint, Klipper, Duet, Prusa, Creality + operazioni STL (scala, rotazione) + slicing.
"""
agents/maker-agent/mcp_printer_server.py
"""
from mcp.server import FastMCP
from agents.maker.moonraker_client import MoonrakerClient
mcp = FastMCP("jarvis-printer")
_moonraker = MoonrakerClient(host="192.168.1.100", port=7125)
@mcp.tool()
async def printer_start(filename: str) -> str:
"""Avvia stampa file presente sulla stampante."""
await _moonraker.start_print(filename)
return f"Stampa avviata: {filename}"
@mcp.tool()
async def printer_status() -> dict:
"""Stato corrente: temp, progresso, velocità."""
return await _moonraker.get_printer_status()
@mcp.tool()
async def printer_cancel() -> str:
await _moonraker.cancel_print()
return "Stampa cancellata."
Vantaggio: ogni agente LangGraph chiama printer.start, printer.status senza conoscere se dietro c'è Moonraker, OctoPrint, Bambu o Prusa.
5. Slicer automation CLI¶
"""
agents/maker-agent/slicer.py
"""
import subprocess
from dataclasses import dataclass
from pathlib import Path
PRUSA_SLICER_BIN = "/usr/bin/prusa-slicer"
MATERIAL_PROFILES = {
"PLA": "0.20mm QUALITY @MK4",
"PETG": "0.20mm QUALITY PETG @MK4",
"ABS": "0.20mm QUALITY ABS @MK4",
"TPU": "0.30mm DRAFT TPU @MK4",
}
@dataclass(frozen=True)
class SliceResult:
gcode_path: str
estimated_time_s: int
filament_used_g: float
def slice_stl(stl_path: str, output_dir: str, material: str = "PLA",
printer: str = "Original Prusa MK4") -> SliceResult:
if material not in MATERIAL_PROFILES:
raise ValueError(material)
out = Path(output_dir)
out.mkdir(parents=True, exist_ok=True)
gcode = str(out / f"{Path(stl_path).stem}.gcode")
cmd = [
PRUSA_SLICER_BIN, "--export-gcode", "--output", gcode,
"--printer", printer,
"--print-settings", MATERIAL_PROFILES[material],
"--filament-settings", material,
stl_path,
]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if r.returncode != 0:
raise RuntimeError(f"Slicer error:\n{r.stderr}")
return SliceResult(gcode_path=gcode, estimated_time_s=_parse_time(r.stdout),
filament_used_g=_parse_filament(r.stdout))
def _parse_time(out: str) -> int:
for line in out.splitlines():
if "estimated printing time" in line.lower():
parts = line.split("=")[-1].strip()
seconds = 0
for part in parts.split():
if part.endswith("h"):
seconds += int(part[:-1]) * 3600
elif part.endswith("m"):
seconds += int(part[:-1]) * 60
elif part.endswith("s"):
seconds += int(part[:-1])
return seconds
return 0
def _parse_filament(out: str) -> float:
for line in out.splitlines():
if "filament used [g]" in line.lower():
return float(line.split("=")[-1].strip())
return 0.0
6. End-to-end LangGraph workflow¶
Scenario: "stampa un porta-penne esagonale"
"""
agents/maker-agent/print_workflow.py
"""
import asyncio
import sys
import subprocess
from dataclasses import dataclass
from langgraph.graph import END, START, StateGraph
from agents.maker.moonraker_client import MoonrakerClient
from agents.maker.slicer import slice_stl
MOONRAKER_HOST = "192.168.1.100"
WORK_DIR = "/tmp/jarvis_maker"
@dataclass
class PrintState:
user_request: str = ""
glb_path: str = ""
stl_path: str = ""
gcode_path: str = ""
filament_g: float = 0.0
estimated_time_s: int = 0
remote_filename: str = ""
step: str = ""
async def generate_3d_model(state: PrintState) -> PrintState:
try:
glb = await _trellis_generate(state.user_request)
except Exception:
glb = await _meshy_generate(state.user_request)
return PrintState(**{**state.__dict__, "glb_path": glb, "step": "generated"})
async def repair_mesh(state: PrintState) -> PrintState:
stl = f"{WORK_DIR}/output.stl"
r = subprocess.run(
[sys.executable, "/opt/jarvis/maker/blender_pipeline.py",
"--glb", state.glb_path, "--stl", stl, "--scale", "1.0"],
capture_output=True, text=True, timeout=120,
)
if r.returncode != 0:
raise RuntimeError(f"Blender: {r.stderr}")
return PrintState(**{**state.__dict__, "stl_path": stl, "step": "repaired"})
async def slice_model(state: PrintState) -> PrintState:
result = slice_stl(state.stl_path, f"{WORK_DIR}/gcode", material="PLA")
return PrintState(**{
**state.__dict__,
"gcode_path": result.gcode_path,
"filament_g": result.filament_used_g,
"estimated_time_s": result.estimated_time_s,
"step": "sliced",
})
async def send_to_printer(state: PrintState) -> PrintState:
client = MoonrakerClient(host=MOONRAKER_HOST)
remote = "jarvis_print.gcode"
await client.upload_file(state.gcode_path, remote)
await client.start_print(remote)
await client.close()
return PrintState(**{**state.__dict__, "remote_filename": remote, "step": "printing"})
async def monitor_and_notify(state: PrintState) -> PrintState:
client = MoonrakerClient(host=MOONRAKER_HOST)
done = asyncio.Event()
async def on_update(obj_name, data):
if obj_name == "print_stats" and data.get("state") in ("complete", "error", "cancelled"):
done.set()
sub_task = asyncio.create_task(
client.subscribe_and_watch(
objects={"print_stats": ["state", "print_duration", "filament_used"]},
on_update=on_update,
)
)
await done.wait()
sub_task.cancel()
await client.close()
return PrintState(**{**state.__dict__, "step": "complete"})
def build_print_graph():
graph = StateGraph(PrintState)
graph.add_node("generate_3d", generate_3d_model)
graph.add_node("repair_mesh", repair_mesh)
graph.add_node("slice", slice_model)
graph.add_node("send_to_printer", send_to_printer)
graph.add_node("monitor", monitor_and_notify)
graph.add_edge(START, "generate_3d")
graph.add_edge("generate_3d", "repair_mesh")
graph.add_edge("repair_mesh", "slice")
graph.add_edge("slice", "send_to_printer")
graph.add_edge("send_to_printer", "monitor")
graph.add_edge("monitor", END)
return graph.compile()
async def run_print_job(user_request: str) -> PrintState:
return await build_print_graph().ainvoke(PrintState(user_request=user_request))
7. Dependency summary¶
[project.optional-dependencies]
maker = [
"bpy==4.3.0",
"fake-bpy-module-4.3",
"trimesh>=4.4.0",
"websockets>=13.0",
"httpx>=0.27.0",
"paho-mqtt>=2.1.0",
"langgraph>=0.2.0",
"moonraker-api>=2.1.5",
]