Commit 2a934cec authored by raojy's avatar raojy
Browse files

first

parent 4b618aa3
{
"id": "a92af27a-0106-4c6f-9d1c-f9783b652f44",
"revision": 0,
"last_node_id": 5,
"last_link_id": 3,
"nodes": [
{
"id": 1,
"type": "SenseNovaU1LocalLoader",
"pos": [
-535.8793125610355,
146.00520475769034
],
"size": [
504,
432
],
"flags": {},
"order": 0,
"mode": 0,
"inputs": [],
"outputs": [
{
"name": "u1_model",
"type": "SENSENOVA_U1_LOCAL_MODEL",
"links": [
1
]
},
{
"name": "model_info_json",
"type": "STRING",
"links": null
}
],
"properties": {
"Node name for S&R": "SenseNovaU1LocalLoader"
},
"widgets_values": [
"sensenova/SenseNova-U1-8B-MoT",
"",
"cuda",
"bfloat16",
"auto",
"none",
"",
"full",
""
]
},
{
"id": 2,
"type": "SenseNovaU1LocalTextToImage",
"pos": [
2.00001409912079,
146.00000553894034
],
"size": [
565.953125,
887.203125
],
"flags": {},
"order": 1,
"mode": 0,
"inputs": [
{
"name": "u1_model",
"type": "SENSENOVA_U1_LOCAL_MODEL",
"link": 1
}
],
"outputs": [
{
"name": "images",
"type": "IMAGE",
"links": [
2
]
},
{
"name": "text",
"type": "STRING",
"links": null
},
{
"name": "think_text",
"type": "STRING",
"links": [
3
]
},
{
"name": "metadata_json",
"type": "STRING",
"links": null
}
],
"properties": {
"Node name for S&R": "SenseNovaU1LocalTextToImage"
},
"widgets_values": [
"这张信息图的标题是“SenseNova-U1”,采用现代极简科技矩阵风格。整体布局为水平三列网格结构,背景是带有极浅银灰色细密点阵的哑光纯白高级纸张纹理,画面长宽比为16:9。\\n\\n排版采用严谨的视觉层级:主标题使用粗体无衬线黑体字,正文使用清晰的现代等宽字体。配色方案极其克制,以纯白色为底,深炭黑为主视觉文字和边框,浅石板灰用于背景色块和次要信息区分,图标采用精致的银灰色线框绘制。\\n\\n在画面正上方居中位置,使用醒目的深炭黑粗体字排布着大标题“SenseNova-U1”。标题正下方是浅石板灰色的等宽字体副标题“新一代端到端统一多模态大模型家族”。\\n\\n画面主体分为左、中、右三个相等的垂直信息区块,区块之间通过充足的负空间进行物理隔离。\\n\\n左侧区块的主题是概述。顶部有一个银灰色线框绘制的、由放大镜和齿轮交织的图标,旁边是粗体小标题“Overview”。该区块内从上到下垂直排列着三个要点:第一个要点旁边是一个代表文档与照片重叠的极简图标,紧跟着文字“多模态模型家族,统一文本/图像理解和生成”。向下是由两个相连的同心圆组成的架构图标,配有文字“基于NEO-Unify架构(端到端统一理解和生成)”。最下方是一个带有斜线划掉的眼睛和漏斗形状的图标,明确指示文本“无需视觉编码器(VE)和变分自编码器(VAE)”。\\n\\n中间区块展示模型矩阵。顶部是一个包含两个分支节点的树状网络图标,旁边是粗体小标题“两个模型规格”。区块内分为上下两个包裹在浅石板灰色极细边框内的卡片。上方的卡片内画着一个代表高密度的实心几何立方体图标,大字标注“SenseNova-U1-8B-MoT”,下方是等宽字体说明“8B MoT 密集主干模型”。下方的卡片内画着一个带有闪电符号的网状发光大脑图标,大字标注“SenseNova-U1-A3B-MoT”,下方是等宽字体说明“A3B MoT 混合专家(MoE)主干模型”。在这两个独立卡片的正下方,左侧放置一个笑脸轮廓图标搭配文字“将在HF等平台公开”,右侧放置一个带有折角的书面报告图标搭配文字“将发布技术报告”。\\n\\n右侧区块呈现核心优势。顶部是一个代表巅峰的上升阶梯折线图图标,旁边是粗体小标题“Highlights”。该区块内部垂直分布着四个带有浅石板灰底色的长方形色块,每个色块内部左侧对应一个具体的图标,右侧为文字。第一个色块内是一个无缝相连的莫比乌斯环图标,配文“原生统一架构,无VE和VAE”。第二个色块内是一个顶端带有星星的奖杯图标,配文“单一统一模型在理解和生成任务上均达到SOTA性能”。第三个色块内是代表文本行与拍立得照片交替穿插的图标,配文“强大的原生交错推理能力(模型原生生成图像进行推理)”。最后一个色块内是一个被切分出一小块的硬币与详细饼状图结合的图标,配文“能生成复杂信息图表,性价比出色”。",
"2720x1536|16:9",
4,
"none",
3,
0,
1,
50,
1,
42,
false,
false
]
},
{
"id": 3,
"type": "PreviewImage",
"pos": [
680.5872600708007,
135.8365686645507
],
"size": [
614.59375,
393.609375
],
"flags": {},
"order": 2,
"mode": 0,
"inputs": [
{
"name": "images",
"type": "IMAGE",
"link": 2
}
],
"outputs": [],
"properties": {
"Node name for S&R": "PreviewImage"
},
"widgets_values": []
},
{
"id": 4,
"type": "PreviewAny",
"pos": [
684.4660397338865,
621.6779303283691
],
"size": [
616.296875,
241
],
"flags": {},
"order": 3,
"mode": 0,
"inputs": [
{
"name": "source",
"type": "*",
"link": 3
}
],
"outputs": [],
"properties": {
"Node name for S&R": "PreviewAny"
},
"widgets_values": [
null,
null,
null
]
}
],
"links": [
[
1,
1,
0,
2,
0,
"SENSENOVA_U1_LOCAL_MODEL"
],
[
2,
2,
0,
3,
0,
"IMAGE"
],
[
3,
2,
2,
4,
0,
"STRING"
]
],
"groups": [],
"config": {},
"extra": {
"workflowRendererVersion": "Vue",
"ds": {
"scale": 0.7513148009015777,
"offset": [
736.3823906250004,
23.45817187500008
]
},
"frontendVersion": "1.39.19",
"VHS_latentpreview": false,
"VHS_latentpreviewrate": 0,
"VHS_MetadataImage": true,
"VHS_KeepIntermediate": true
},
"version": 0.4
}
from __future__ import annotations
import base64
import binascii
from io import BytesIO
from urllib.parse import urlparse
import numpy as np
from PIL import Image
# Largest raw image payload, in bytes, that image_from_bytes() accepts
# before refusing to hand the data to Pillow (50 MiB).
MAX_IMAGE_BYTES = 50 * 1024 * 1024
def is_http_url(value: str) -> bool:
    """Return True when ``value`` is an absolute ``http``/``https`` URL.

    A URL qualifies only if it has one of those two schemes *and* a
    non-empty network location. Malformed input that ``urlparse`` rejects
    (for example an unbalanced IPv6 bracket such as ``http://[::1``) is
    reported as ``False`` instead of propagating ``ValueError``, so callers
    can use this as a plain predicate.
    """
    try:
        parsed = urlparse(value)
    except ValueError:
        # urlparse raises on syntactically invalid netlocs; not a usable URL.
        return False
    return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
def strip_data_url(value: str) -> str:
    """Drop the ``data:<...>,`` header from a data URL, if there is one.

    Strings that do not start with ``data:`` or that contain no comma are
    returned unchanged; otherwise everything after the first comma is
    returned.
    """
    if not value.startswith("data:"):
        return value
    _header, comma, payload = value.partition(",")
    return payload if comma else value
def is_image_data_url(value: str) -> bool:
    """Return True for strings shaped like a base64 ``data:image/...`` URL.

    Only the ``data:image/`` prefix and the presence of a ``;base64,``
    marker are checked; the payload itself is not validated.
    """
    if not value.startswith("data:image/"):
        return False
    return value.find(";base64,") != -1
def is_supported_vision_image_url(value: str) -> bool:
    """Return True when ``value`` is an http(s) URL or a base64 image data URL."""
    if is_http_url(value):
        return True
    return is_image_data_url(value)
def decode_base64_image(value: str) -> Image.Image:
    """Decode a base64 string (optionally a data URL) into an RGB PIL image.

    Raises:
        RuntimeError: if the payload is not strictly valid base64, or if
            ``image_from_bytes`` rejects the decoded bytes.
    """
    try:
        payload = strip_data_url(value)
        raw = base64.b64decode(payload, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise RuntimeError("Image response contains invalid base64 data.") from exc
    return image_from_bytes(raw)
def image_from_bytes(data: bytes) -> Image.Image:
    """Open encoded image bytes with Pillow and return an RGB copy.

    Raises:
        RuntimeError: if ``data`` exceeds ``MAX_IMAGE_BYTES`` or Pillow
            cannot open/convert it.
    """
    too_large = len(data) > MAX_IMAGE_BYTES
    if too_large:
        raise RuntimeError("Image response is larger than the 50MB safety limit.")
    try:
        with Image.open(BytesIO(data)) as opened:
            rgb = opened.convert("RGB")
    except Exception as exc:
        raise RuntimeError("Image response could not be decoded by Pillow.") from exc
    return rgb
def pil_to_comfy_image(image: Image.Image):
    """Convert a PIL image into a float32 torch tensor with a leading batch axis.

    The image is converted to RGB, scaled from 0..255 to 0..1 and given a
    batch dimension of size one via ``unsqueeze(0)``.

    Raises:
        RuntimeError: if PyTorch cannot be imported.
    """
    try:
        import torch
    except ImportError as exc:
        raise RuntimeError("PyTorch is required by ComfyUI to output IMAGE tensors.") from exc

    rgb = image.convert("RGB")
    pixels = np.array(rgb, dtype=np.float32, copy=True)
    pixels = np.ascontiguousarray(pixels / 255.0)
    batched = torch.from_numpy(pixels).unsqueeze(0)
    return batched.contiguous().float()
def image_bytes_to_comfy_image(data: bytes):
    """Decode encoded image bytes and convert the result with ``pil_to_comfy_image``."""
    decoded = image_from_bytes(data)
    return pil_to_comfy_image(decoded)
def comfy_image_info(image) -> str:
    """Summarise an image-like object as a single diagnostic string.

    Reports shape, dtype, device, contiguity and min..max range. Any
    attribute the object lacks, and the range when ``min()``/``max()``
    fail for any reason, is rendered as ``<unknown>``.
    """
    placeholder = "<unknown>"

    if hasattr(image, "shape"):
        shape = tuple(image.shape)
    else:
        shape = placeholder
    dtype = getattr(image, "dtype", placeholder)
    device = getattr(image, "device", placeholder)
    if hasattr(image, "is_contiguous"):
        contiguous = image.is_contiguous()
    else:
        contiguous = placeholder

    try:
        lowest = float(image.min())
        highest = float(image.max())
    except Exception:
        value_range = placeholder
    else:
        value_range = f"{lowest:.6f}..{highest:.6f}"

    fields = (
        f"shape={shape}",
        f"dtype={dtype}",
        f"device={device}",
        f"contiguous={contiguous}",
        f"range={value_range}",
    )
    return "; ".join(fields)
def comfy_image_to_pil(image) -> Image.Image:
    """Convert one image (or the first image of a batch) to an RGB PIL image.

    Accepts anything ``np.asarray`` understands; objects exposing
    ``detach`` are first moved to CPU and converted to NumPy. A 4-D input
    is treated as a batch and only its first element is used. Values are
    clipped to 0..1 and quantised to uint8.

    Raises:
        RuntimeError: for an empty batch, or when the array is not 3-D
            with a last axis of size 3 or 4.
    """
    source = image.detach().cpu().numpy() if hasattr(image, "detach") else image
    pixels = np.asarray(source)

    if pixels.ndim == 4:
        if pixels.shape[0] < 1:
            raise RuntimeError("ComfyUI IMAGE batch is empty.")
        pixels = pixels[0]

    well_formed = pixels.ndim == 3 and pixels.shape[-1] in {3, 4}
    if not well_formed:
        raise RuntimeError("ComfyUI IMAGE must have shape [B,H,W,C] or [H,W,C].")

    quantized = (np.clip(pixels, 0.0, 1.0) * 255.0).round().astype(np.uint8)
    return Image.fromarray(quantized).convert("RGB")
def comfy_batch_to_pil_images(image) -> list[Image.Image]:
    """Convert every image of a batch to an RGB PIL image.

    A 3-D input is promoted to a batch of one. Values are clipped to 0..1
    and quantised to uint8 before conversion.

    Raises:
        RuntimeError: when the (promoted) array is not 4-D with a last
            axis of size 3 or 4.
    """
    source = image.detach().cpu().numpy() if hasattr(image, "detach") else image
    batch = np.asarray(source)
    if batch.ndim == 3:
        batch = batch[None, ...]

    well_formed = batch.ndim == 4 and batch.shape[-1] in {3, 4}
    if not well_formed:
        raise RuntimeError("ComfyUI IMAGE batch must have shape [B,H,W,C].")

    quantized = (np.clip(batch, 0.0, 1.0) * 255.0).round().astype(np.uint8)
    converted = []
    for frame in quantized:
        converted.append(Image.fromarray(frame).convert("RGB"))
    return converted
def pil_to_png_data_url(image: Image.Image) -> str:
    """Encode a PIL image as a ``data:image/png;base64,...`` URL (RGB, PNG)."""
    sink = BytesIO()
    rgb = image.convert("RGB")
    rgb.save(sink, format="PNG")
    payload = base64.b64encode(sink.getvalue()).decode("ascii")
    return "data:image/png;base64," + payload
def comfy_image_to_png_data_url(image) -> str:
    """Convert an image via ``comfy_image_to_pil`` and encode it as a PNG data URL."""
    as_pil = comfy_image_to_pil(image)
    return pil_to_png_data_url(as_pil)
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import sys
from pathlib import Path
# Default value of --name: the directory created under <ComfyUI>/custom_nodes.
DEFAULT_NODE_NAME = "ComfyUI-SenseNova-U1"
def _link_or_junction(source: Path, target: Path) -> str:
"""Create a directory symlink, falling back to an NTFS junction on Windows.
Windows blocks `os.symlink` unless the user is an administrator or
Developer Mode is enabled (WinError 1314). A directory junction
(`mklink /J`) provides the same `Path.resolve()`-followable semantics
without any privilege; ComfyUI loads through it and the loader's
auto-discovery still finds the monorepo source.
"""
try:
os.symlink(source, target, target_is_directory=True)
return "Linked"
except OSError as exc:
if sys.platform != "win32":
raise
try:
subprocess.check_call(
["cmd", "/c", "mklink", "/J", str(target), str(source)],
stdout=subprocess.DEVNULL,
)
return "Junctioned"
except (subprocess.CalledProcessError, FileNotFoundError) as junc_exc:
raise SystemExit(
f"Could not create a symlink at {target}: {exc}\n"
f"Falling back to `mklink /J` also failed: {junc_exc}\n"
"Re-run with --copy, enable Windows Developer Mode "
"(Settings → For Developers), or run this script as Administrator."
) from exc
def parse_args() -> argparse.Namespace:
    """Parse the installer's command-line options from ``sys.argv``.

    ``--comfyui`` is required; ``--name`` defaults to ``DEFAULT_NODE_NAME``;
    ``--copy``, ``--install-deps`` and ``--force`` are boolean switches.
    """
    parser = argparse.ArgumentParser(description="Install the SenseNova-U1 ComfyUI app.")
    parser.add_argument(
        "--comfyui",
        required=True,
        help="Path to the ComfyUI checkout that contains the custom_nodes directory.",
    )
    parser.add_argument(
        "--name",
        default=DEFAULT_NODE_NAME,
        help=f"Directory name under ComfyUI/custom_nodes (default: {DEFAULT_NODE_NAME}).",
    )

    # The remaining options are plain on/off switches, registered in order.
    switches = (
        ("--copy", "Copy files instead of creating a symlink."),
        ("--install-deps", "Run pip install -r requirements.txt with the current Python."),
        ("--force", "Replace an existing symlink or directory at the target path."),
    )
    for flag, description in switches:
        parser.add_argument(flag, action="store_true", help=description)

    return parser.parse_args()
def _is_directory_junction(path: Path) -> bool:
    """Return True when ``path`` is a Windows directory junction.

    Uses ``os.path.isjunction`` where available (Python 3.12+); on older
    interpreters it inspects the reparse tag reported by ``os.lstat``.
    Always False on non-Windows platforms when ``isjunction`` is missing.
    """
    isjunction = getattr(os.path, "isjunction", None)
    if isjunction is not None:
        return bool(isjunction(path))
    if sys.platform != "win32":
        return False
    import stat

    try:
        reparse_tag = getattr(os.lstat(path), "st_reparse_tag", 0)
    except OSError:
        return False
    return reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT


def _remove_existing_target(target: Path) -> None:
    """Delete whatever currently occupies ``target`` (link, junction, file or tree)."""
    if target.is_symlink() or target.is_file():
        target.unlink()
    elif _is_directory_junction(target):
        # shutil.rmtree refuses junctions; os.rmdir removes the junction
        # itself without touching the directory it points at.
        os.rmdir(target)
    else:
        shutil.rmtree(target)


def main() -> None:
    """Install this app directory into ``<ComfyUI>/custom_nodes``.

    Steps:
      1. Resolve the app directory (this file's parent), the repository
         root two levels above it, and the ComfyUI ``custom_nodes`` target.
      2. Refuse to overwrite an existing target unless ``--force`` is set;
         with ``--force`` the existing symlink, junction, file or directory
         is removed first.
      3. Copy the app (``--copy``) or link it via ``_link_or_junction``.
      4. Print guidance about ``SENSENOVA_U1_SRC`` and, depending on
         ``--install-deps``, either run ``pip install -r requirements.txt``
         or print the commands to run manually.

    Raises:
        SystemExit: ``custom_nodes`` is missing, or the target exists and
            ``--force`` was not given.
    """
    args = parse_args()
    app_dir = Path(__file__).resolve().parent
    repo_dir = app_dir.parents[1]
    comfyui_dir = Path(args.comfyui).expanduser().resolve()
    custom_nodes = comfyui_dir / "custom_nodes"
    target = custom_nodes / args.name
    if not custom_nodes.is_dir():
        raise SystemExit(f"ComfyUI custom_nodes directory not found: {custom_nodes}")
    # lexists() also reports dangling symlinks and dangling junctions, which
    # would otherwise make the link/copy step fail with "file exists".
    if os.path.lexists(target):
        if not args.force:
            raise SystemExit(
                f"Target already exists: {target}\nRe-run with --force to replace it, or choose another --name."
            )
        _remove_existing_target(target)
    if args.copy:
        shutil.copytree(app_dir, target, ignore=shutil.ignore_patterns("__pycache__"))
        action = "Copied"
    else:
        action = _link_or_junction(app_dir, target)
    print(f"{action} SenseNova-U1 ComfyUI app:")
    print(f"  {target} -> {app_dir}")
    if not args.copy:
        # Default symlink (or Windows junction) mode: local_pipeline.py's
        # default_source_path() resolves __file__ through the link back to
        # this monorepo and discovers <repo>/src automatically. No env var
        # needed for local inference.
        print(f"\n{action} mode: SENSENOVA_U1_SRC auto-resolves to")
        print(f"  {repo_dir / 'src'}")
        print("via local_pipeline.default_source_path(), because the loader")
        print("file is linked back into this checkout. Moving or renaming")
        print("the monorepo breaks that link — re-run install.py afterwards.")
    else:
        # --copy mode: files live under <ComfyUI>/custom_nodes/, no symlink
        # to follow, so the user must point SENSENOVA_U1_SRC explicitly.
        print("\nCopy mode: auto-discovery is disabled (no symlink to follow).")
        print(f"Set SENSENOVA_U1_SRC={repo_dir / 'src'}")
        print("in the ComfyUI launch environment, or fill the loader node's")
        print("`sensenova_u1_src` input.")
    if args.install_deps:
        requirements = app_dir / "requirements.txt"
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", str(requirements)])
        print("\nFor local inference, also install the SenseNova-U1 runtime in the ComfyUI Python environment:")
        print(f"  {sys.executable} -m pip install -e {repo_dir}")
        print("  Restart ComfyUI.")
    else:
        print("\nNext steps:")
        print(f"  {sys.executable} -m pip install -r {app_dir / 'requirements.txt'}")
        print(f"  {sys.executable} -m pip install -e {repo_dir}  # for local inference")
        print("  Restart ComfyUI.")
# Run the installer only when this file is executed as a script.
if __name__ == "__main__":
main()
from __future__ import annotations
import contextlib
import json
import logging
import math
import os
import sys
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import numpy as np
from PIL import Image
try:
from .image_utils import comfy_image_to_pil, pil_to_comfy_image
except ImportError: # pragma: no cover - supports direct imports during tests
from image_utils import comfy_image_to_pil, pil_to_comfy_image
LOGGER = logging.getLogger(__name__)
def _vram_snapshot(label: str, *, device: str = "cuda", reset_peak: bool = False) -> None:
    """Log allocated / reserved / peak CUDA memory (in GiB) tagged with ``label``.

    Diagnostic helper: it returns silently when CUDA is unavailable, and
    any exception (including a missing ``torch``) is logged at DEBUG level
    rather than raised. When ``reset_peak`` is true the peak-memory
    statistics for ``device`` are reset after logging.
    """
    try:
        import torch

        if not torch.cuda.is_available():
            return

        gib = 1024**3
        target = torch.device(device)
        allocated_gib = torch.cuda.memory_allocated(target) / gib
        reserved_gib = torch.cuda.memory_reserved(target) / gib
        peak_gib = torch.cuda.max_memory_allocated(target) / gib
        LOGGER.info(
            "[vram] %-44s | alloc=%6.2f GiB reserved=%6.2f GiB peak=%6.2f GiB",
            label,
            allocated_gib,
            reserved_gib,
            peak_gib,
        )
        if reset_peak:
            torch.cuda.reset_peak_memory_stats(target)
    except Exception as exc:  # pragma: no cover - diagnostic only
        LOGGER.debug("vram snapshot %r failed: %s", label, exc)
@contextmanager
def _progress_hook(model: Any, total_steps: int):
"""Temporarily wrap ``model.unpatchify`` so each call advances a
ComfyUI :class:`ProgressBar`.
``unpatchify`` is invoked exactly once at the end of every sampling
step in t2i / it2i / interleave generation, so it is a precise and
non-invasive progress signal that does not require modifying the
model code. If ``comfy.utils.ProgressBar`` is unavailable (e.g. tests
outside ComfyUI), we still install the wrapper and emit a log line
with the final step count so users get feedback on the terminal.
"""
pbar = None
try:
from comfy.utils import ProgressBar # type: ignore[import-not-found]
pbar = ProgressBar(max(1, int(total_steps)))
except Exception: # pragma: no cover - ComfyUI runtime not present
pbar = None
if not hasattr(model, "unpatchify"):
yield
return
original = model.unpatchify
counter = {"n": 0}
def wrapped(*args, **kwargs):
out = original(*args, **kwargs)
counter["n"] += 1
if pbar is not None:
try:
pbar.update(1)
except Exception:
pass
_vram_snapshot(f"sampling step {counter['n']}/{total_steps}")
# Log a heartbeat at every multiple of total_steps so users can see
# multi-image interleave progress past the saturated bar.
if total_steps and counter["n"] % total_steps == 0:
LOGGER.info(
"SenseNova U1 sampling: image #%d ready (%d steps).",
counter["n"] // total_steps,
total_steps,
)
return out
model.unpatchify = wrapped
try:
yield
finally:
try:
del model.unpatchify # restore the class-level binding
except AttributeError:
try:
model.unpatchify = original
except Exception:
pass
if counter["n"]:
LOGGER.info(
"SenseNova U1 sampling: %d step(s) completed (target=%d).",
counter["n"],
total_steps,
)
# Custom type identifiers exchanged between nodes (presumably ComfyUI socket
# type names — confirm against the node definitions).
LOCAL_MODEL_TYPE = "SENSENOVA_U1_LOCAL_MODEL"
INTERLEAVE_RESULT_TYPE = "SENSENOVA_INTERLEAVE_RESULT"
# Generation defaults.
DEFAULT_SEED = 42
# Empty means "no explicit source checkout"; see default_source_path().
DEFAULT_SOURCE_PATH = ""
DEFAULT_TARGET_PIXELS = 2048 * 2048
DEFAULT_IMAGE_PATCH_SIZE = 32
# Default system prompt for interleave generation; describes the think /
# non-think output conventions the model is asked to follow.
DEFAULT_INTERLEAVE_SYSTEM_MESSAGE = (
"You are a multimodal assistant capable of reasoning with both text and images. "
"You support two modes:\n\n"
"Think Mode: When reasoning is needed, you MUST start with a <think></think> block "
"and place all reasoning inside it. You MUST interleave text with generated images "
"using tags like <image1>, <image2>. Images can ONLY be generated between <think> and "
"</think>, and may be referenced in the final answer.\n\n"
"Non-Think Mode: When no reasoning is needed, directly provide the answer without reasoning. "
"Do not use tags like <image1>, <image2>; present any images naturally alongside the text.\n\n"
"After the think block, always provide a concise, user-facing final answer. "
"The answer may include text, images, or both. Match the user's language in both reasoning "
"and the final answer."
)
# Aspect-ratio label -> (width, height) for text-to-image.
T2I_RESOLUTIONS: dict[str, tuple[int, int]] = {
"1:1": (2048, 2048),
"16:9": (2720, 1536),
"9:16": (1536, 2720),
"3:2": (2496, 1664),
"2:3": (1664, 2496),
"4:3": (2368, 1760),
"3:4": (1760, 2368),
"1:2": (1440, 2880),
"2:1": (2880, 1440),
"1:3": (1152, 3456),
"3:1": (3456, 1152),
}
# Aspect-ratio label -> (width, height) for interleave generation.
INTERLEAVE_RESOLUTIONS: dict[str, tuple[int, int]] = {
"1:1": (1536, 1536),
"16:9": (2048, 1152),
"9:16": (1152, 2048),
"3:2": (1888, 1248),
"2:3": (1248, 1888),
"4:3": (1760, 1312),
"3:4": (1312, 1760),
"1:2": (1088, 2144),
"2:1": (2144, 1088),
"1:3": (864, 2592),
"3:1": (2592, 864),
}
# Option strings of the form "<width>x<height>|<ratio>", the format that
# parse_resolution_option() splits back into (width, height).
T2I_RESOLUTION_OPTIONS = tuple(f"{width}x{height}|{ratio}" for ratio, (width, height) in T2I_RESOLUTIONS.items())
INTERLEAVE_RESOLUTION_OPTIONS = tuple(
f"{width}x{height}|{ratio}" for ratio, (width, height) in INTERLEAVE_RESOLUTIONS.items()
)
# Allowed values for the corresponding loader / sampler options.
DTYPE_OPTIONS = ("bfloat16", "float16", "float32")
CFG_NORM_OPTIONS = ("none", "global", "channel", "cfg_zero_star")
ATTN_BACKEND_OPTIONS = ("auto", "flash", "sdpa")
# "none" is translated to ``device_map=None`` by SenseNovaU1LocalModel.
DEVICE_MAP_OPTIONS = ("none", "auto", "balanced", "balanced_low_0", "sequential")
VRAM_MODE_OPTIONS = ("full", "low", "balanced")
DEFAULT_VRAM_MODE = "full"
# vram_mode -> prefetch_count (the underlying knob on the layer-offload wrapper)
# 0 = no offload, 1 = synchronous, >=2 = async prefetch this many layers ahead.
# Absolute VRAM is workload-dependent (KV cache grows with image/text count in
# interleave mode), so modes describe the *mechanism*, not a fixed budget.
_VRAM_MODE_TO_PREFETCH: dict[str, int] = {
"full": 0, # no offload, whole model on GPU
"low": 1, # sync per-layer offload, smallest weight footprint, slowest
"balanced": 2, # async prefetch, overlaps H2D with compute
}
# Attribute path handed to the offload helpers to locate the layers to offload.
DEFAULT_LAYERS_ATTR = "language_model.model.layers"
# Per-channel mean / std that _denorm() uses to undo input normalisation.
_NORM_MEAN = (0.5, 0.5, 0.5)
_NORM_STD = (0.5, 0.5, 0.5)
@dataclass
class LocalGenerationResult:
    """Bundle of everything one local generation call hands back to a node."""

    # Generated images, already converted for ComfyUI by the pipeline helpers.
    images: Any
    # Generated text; the text-to-image and editing paths leave this empty.
    text: str
    # Reasoning text, or "" when none was produced.
    think_text: str
    # Run description; serialised with json.dumps by the *_to_tuple helpers.
    metadata: dict[str, Any]
    # Structured interleave output; only the interleave path fills this in.
    interleave_result: dict[str, Any] | None = None
class SenseNovaU1LocalModel:
    """A locally loaded SenseNova-U1 model plus its tokenizer.

    Construction loads the model through ``sensenova_u1``'s
    ``load_model_and_tokenizer``; the three public generation methods
    (``text_to_image``, ``edit_image``, ``interleave``) run inference and
    return a :class:`LocalGenerationResult`.
    """

    def __init__(
        self,
        *,
        model_path: str,
        sensenova_u1_src: str = "",
        device: str = "cuda",
        dtype: str = "bfloat16",
        attn_backend: str = "auto",
        device_map: str = "none",
        max_memory: str = "",
        gguf_checkpoint: str = "",
        vram_mode: str = DEFAULT_VRAM_MODE,
    ) -> None:
        """Validate the options and load the model and tokenizer.

        Args:
            model_path: Local path or model identifier; must not be blank.
            sensenova_u1_src: Optional source checkout to put on ``sys.path``
                while importing/loading ``sensenova_u1``.
            device: Device string passed to the loader.
            dtype: One of ``DTYPE_OPTIONS``.
            attn_backend: One of ``ATTN_BACKEND_OPTIONS``.
            device_map: ``"none"`` disables it; any other value is passed on.
            max_memory: Passed to the loader, or ``None`` when empty.
            gguf_checkpoint: Optional GGUF checkpoint; blank means none.
            vram_mode: One of ``VRAM_MODE_OPTIONS``; selects the layer
                offload prefetch count.

        Raises:
            RuntimeError: on a blank ``model_path``, an unsupported
                ``vram_mode`` / ``attn_backend`` / ``dtype``, a missing
                dependency, or ``gguf_checkpoint`` combined with a
                ``device_map``.
        """
        if not model_path.strip():
            raise RuntimeError("Local model_path cannot be empty.")
        if vram_mode not in _VRAM_MODE_TO_PREFETCH:
            raise RuntimeError(f"Unsupported vram_mode={vram_mode!r}. Choose one of {VRAM_MODE_OPTIONS}.")
        prefetch_count = _VRAM_MODE_TO_PREFETCH[vram_mode]
        injected_path = _maybe_add_source_path(sensenova_u1_src)
        try:
            model_path = _resolve_local_model_path(model_path)
            torch = _import_torch()
            sensenova_u1, load_model_and_tokenizer, _ = _import_sensenova_u1()
            if attn_backend not in ATTN_BACKEND_OPTIONS:
                raise RuntimeError(f"Unsupported attention backend: {attn_backend}")
            sensenova_u1.set_attn_backend(attn_backend)
            torch_dtype = _resolve_dtype(torch, dtype)
            normalized_device_map = None if device_map == "none" else device_map
            normalized_gguf = gguf_checkpoint.strip() or None
            offloading = prefetch_count > 0
            if offloading and normalized_device_map:
                LOGGER.warning(
                    "SenseNova U1 loader: vram_mode=%r overrides device_map=%r "
                    "(layer offload is incompatible with accelerate placement).",
                    vram_mode,
                    normalized_device_map,
                )
                normalized_device_map = None
            if normalized_gguf and normalized_device_map:
                # diffusers' GGUF quantizer skips accelerate sharding — let the user know.
                raise RuntimeError("gguf_checkpoint cannot be combined with a device_map; pick one.")
            self.device = device
            self.dtype = dtype
            self.model_path = model_path
            self.attn_backend = attn_backend
            self.gguf_checkpoint = normalized_gguf or ""
            self.vram_mode = vram_mode
            self.prefetch_count = int(prefetch_count)
            self.effective_attn_backend = sensenova_u1.effective_attn_backend()
            _vram_snapshot(f"loader: pre-load (vram_mode={vram_mode})", device=device, reset_peak=True)
            self.model, self.tokenizer = load_model_and_tokenizer(
                model_path,
                dtype=torch_dtype,
                device=device,
                device_map=normalized_device_map,
                max_memory=max_memory or None,
                gguf_checkpoint=normalized_gguf,
                for_offload=offloading,
            )
            _vram_snapshot(f"loader: post-load (for_offload={offloading})", device=device)
        finally:
            # Undo the sys.path injection on failure too; otherwise a failed
            # load would leave the entry behind for the rest of the process.
            _maybe_remove_source_path(injected_path)

    @property
    def info(self) -> dict[str, Any]:
        """Loader settings as a plain dict; merged into every result's metadata."""
        return {
            "model_path": self.model_path,
            "device": self.device,
            "dtype": self.dtype,
            "attn_backend": self.attn_backend,
            "effective_attn_backend": self.effective_attn_backend,
            "gguf_checkpoint": self.gguf_checkpoint,
            "vram_mode": self.vram_mode,
            "prefetch_count": self.prefetch_count,
        }

    def _offload_ctx(self):
        """Return a context manager that yields the model wrapped for layer
        offload, or a no-op nullcontext yielding ``self.model`` when offload
        is disabled (``prefetch_count == 0``)."""
        if self.prefetch_count == 0:
            return contextlib.nullcontext(self.model)
        torch = _import_torch()
        from sensenova_u1.utils import offload_layers_async, offload_layers_sync

        target = torch.device(self.device)
        if self.prefetch_count == 1:
            inner = offload_layers_sync(self.model, DEFAULT_LAYERS_ATTR, target)
        else:
            inner = offload_layers_async(self.model, DEFAULT_LAYERS_ATTR, target, prefetch_count=self.prefetch_count)
        return self._instrumented_offload_ctx(inner)

    @contextmanager
    def _instrumented_offload_ctx(self, inner):
        """Wrap the offload context manager so we can snapshot VRAM at the
        boundaries that matter when diagnosing leaks across repeated runs in
        ComfyUI: just before the wrapper is built, just after, and on the way
        out (after teardown + ``model.to('cpu')`` + ``empty_cache``).
        """
        _vram_snapshot(
            f"offload_ctx: enter (prefetch_count={self.prefetch_count}, vram_mode={self.vram_mode})",
            device=self.device,
            reset_peak=True,
        )
        try:
            with inner as offloaded:
                _vram_snapshot("offload_ctx: wrapper ready", device=self.device)
                yield offloaded
                _vram_snapshot("offload_ctx: forward done (pre-teardown)", device=self.device)
        finally:
            _vram_snapshot("offload_ctx: exit (post-teardown+empty_cache)", device=self.device)

    def text_to_image(
        self,
        *,
        prompt: str,
        width: int,
        height: int,
        cfg_scale: float,
        cfg_norm: str,
        timestep_shift: float,
        cfg_interval: tuple[float, float],
        num_steps: int,
        batch_size: int,
        seed: int,
        think_mode: bool,
    ) -> LocalGenerationResult:
        """Generate images from ``prompt`` via the model's ``t2i_generate``.

        With ``think_mode`` the model call returns ``(tensor, think_text)``;
        otherwise only the tensor, and ``think_text`` is left empty. The
        result's ``text`` is always empty.

        Raises:
            RuntimeError: if ``prompt`` is blank.
        """
        if not prompt.strip():
            raise RuntimeError("Text-to-image prompt cannot be empty.")
        _check_cfg_interval(cfg_interval)
        torch = _import_torch()
        with (
            torch.inference_mode(),
            self._offload_ctx() as offloaded,
            _progress_hook(self.model, num_steps),
        ):
            out = offloaded.t2i_generate(
                self.tokenizer,
                prompt,
                image_size=(width, height),
                cfg_scale=cfg_scale,
                cfg_norm=cfg_norm,
                timestep_shift=timestep_shift,
                cfg_interval=cfg_interval,
                num_steps=num_steps,
                batch_size=batch_size,
                seed=seed,
                think_mode=think_mode,
            )
        if think_mode:
            tensor, think_text = out
        else:
            tensor = out
            think_text = ""
        return LocalGenerationResult(
            images=_batch_tensor_to_comfy_image(tensor),
            text="",
            think_text=think_text,
            metadata={
                **self.info,
                "task": "text-to-image",
                "width": width,
                "height": height,
                "seed": seed,
                "batch_size": batch_size,
                "num_steps": num_steps,
                "think_mode": think_mode,
            },
        )

    def edit_image(
        self,
        *,
        prompt: str,
        input_image: Any,
        width: int | None,
        height: int | None,
        target_pixels: int,
        cfg_scale: float,
        img_cfg_scale: float,
        cfg_norm: str,
        timestep_shift: float,
        cfg_interval: tuple[float, float],
        num_steps: int,
        batch_size: int,
        seed: int,
        think_mode: bool,
    ) -> LocalGenerationResult:
        """Edit ``input_image`` according to ``prompt`` via ``it2i_generate``.

        The input is converted to PIL and resized to the ``target_pixels``
        budget; the output size comes from ``_resolve_edit_size``. As in
        ``text_to_image``, ``think_text`` is only populated in think mode.

        Raises:
            RuntimeError: if ``prompt`` is blank or ``cfg_norm`` is
                ``"cfg_zero_star"``.
        """
        if not prompt.strip():
            raise RuntimeError("Image editing prompt cannot be empty.")
        if cfg_norm == "cfg_zero_star":
            raise RuntimeError("cfg_zero_star is only supported for local text-to-image.")
        pil_image = comfy_image_to_pil(input_image)
        # Match the Terminal pipeline by upsampling small inputs to the same
        # pixel budget before they hit the model; otherwise edits on sub-2K
        # images come out noticeably softer than `examples/editing/inference.py`.
        pil_image = _resize_input_to_budget(pil_image, target_pixels)
        out_width, out_height = _resolve_edit_size(
            pil_image,
            width=width,
            height=height,
            target_pixels=target_pixels,
        )
        _check_cfg_interval(cfg_interval)
        torch = _import_torch()
        with (
            torch.inference_mode(),
            self._offload_ctx() as offloaded,
            _progress_hook(self.model, num_steps),
        ):
            out = offloaded.it2i_generate(
                self.tokenizer,
                prompt,
                [pil_image],
                image_size=(out_width, out_height),
                cfg_scale=cfg_scale,
                img_cfg_scale=img_cfg_scale,
                cfg_norm=cfg_norm,
                timestep_shift=timestep_shift,
                cfg_interval=cfg_interval,
                num_steps=num_steps,
                batch_size=batch_size,
                seed=seed,
                think_mode=think_mode,
            )
        if think_mode:
            tensor, think_text = out
        else:
            tensor = out
            think_text = ""
        return LocalGenerationResult(
            images=_batch_tensor_to_comfy_image(tensor),
            text="",
            think_text=think_text,
            metadata={
                **self.info,
                "task": "image-editing",
                "width": out_width,
                "height": out_height,
                "seed": seed,
                "batch_size": batch_size,
                "num_steps": num_steps,
                "target_pixels": target_pixels,
                "think_mode": think_mode,
            },
        )

    def interleave(
        self,
        *,
        prompt: str,
        input_image: Any | None,
        width: int,
        height: int,
        cfg_scale: float,
        img_cfg_scale: float,
        timestep_shift: float,
        cfg_interval: tuple[float, float],
        num_steps: int,
        seed: int,
        think_mode: bool,
        system_message: str,
    ) -> LocalGenerationResult:
        """Run interleaved text/image generation via ``interleave_gen``.

        When ``input_image`` is given, ``width``/``height`` are replaced by
        the ``smart_resize`` result for that image. If the model returns no
        images, a single 1x1 black placeholder is put in ``images`` while
        ``num_output_images`` in the metadata stays 0.

        Raises:
            RuntimeError: if ``prompt`` is blank.
        """
        if not prompt.strip():
            raise RuntimeError("Interleave prompt cannot be empty.")
        _, _, smart_resize = _import_sensenova_u1()
        input_images: list[Image.Image] = []
        if input_image is not None:
            pil_image = comfy_image_to_pil(input_image)
            resized_height, resized_width = smart_resize(pil_image.height, pil_image.width)
            width, height = resized_width, resized_height
            input_images.append(pil_image)
        _check_cfg_interval(cfg_interval)
        torch = _import_torch()
        # Interleave can emit multiple images, each running num_steps sampling
        # steps. The bar saturates at the first image; subsequent images are
        # tracked via LOGGER (one line per completed image).
        with (
            torch.inference_mode(),
            self._offload_ctx() as offloaded,
            _progress_hook(self.model, num_steps),
        ):
            text, image_tensors = offloaded.interleave_gen(
                self.tokenizer,
                prompt,
                images=input_images,
                image_size=(width, height),
                cfg_scale=cfg_scale,
                img_cfg_scale=img_cfg_scale,
                timestep_shift=timestep_shift,
                cfg_interval=cfg_interval,
                num_steps=num_steps,
                system_message=system_message,
                think_mode=think_mode,
                seed=seed,
            )
        images = [_single_tensor_to_pil(tensor) for tensor in image_tensors]
        metadata = {
            **self.info,
            "task": "interleave",
            "width": width,
            "height": height,
            "seed": seed,
            "num_steps": num_steps,
            "think_mode": think_mode,
            "num_output_images": len(image_tensors),
        }
        interleave_result = build_interleave_result(
            text=text,
            num_images=len(images),
            metadata=metadata,
        )
        if not images:
            images = [Image.new("RGB", (1, 1), (0, 0, 0))]
        return LocalGenerationResult(
            images=_pil_images_to_comfy_batch(images),
            text=text,
            think_text=interleave_result["think_text"],
            metadata=metadata,
            interleave_result=interleave_result,
        )
def default_device() -> str:
    """Best-available accelerator string for the ComfyUI loader's default UI value.

    Prefers CUDA > XPU > CPU. ``torch`` is imported lazily so this module
    stays importable without it; in that case ``"cuda"`` is returned. The
    XPU probe is skipped on torch builds that have no ``torch.xpu``
    submodule instead of raising ``AttributeError``.
    """
    try:
        import torch
    except ImportError:
        return "cuda"
    if torch.cuda.is_available():
        return "cuda"
    # torch.xpu is not present on every torch build.
    xpu = getattr(torch, "xpu", None)
    if xpu is not None and xpu.is_available():
        return "xpu"
    return "cpu"
def default_source_path() -> str:
    """Pick the default ``sensenova_u1`` source directory for the loader.

    Resolution order:
      1. A non-blank ``SENSENOVA_U1_SRC`` environment variable (stripped).
      2. ``<repo>/src`` when this file, after resolving symlinks, lives at
         ``<repo>/apps/comfyui/`` and that ``src`` directory exists.
      3. ``DEFAULT_SOURCE_PATH``.
    """
    override = os.environ.get("SENSENOVA_U1_SRC", "").strip()
    if override:
        return override

    # resolve() follows symlinks, so a linked install leads back to the
    # original checkout location.
    app_dir = Path(__file__).resolve().parent
    apps_dir = app_dir.parent
    if app_dir.name == "comfyui" and apps_dir.name == "apps":
        candidate = apps_dir.parent / "src"
        if candidate.is_dir():
            return str(candidate)
    return DEFAULT_SOURCE_PATH
def parse_resolution_option(value: str) -> tuple[int, int]:
    """Parse ``"<width>x<height>|<ratio>"`` into ``(width, height)`` integers.

    Everything from the first ``|`` onward is ignored. Raises ``ValueError``
    when the size part is not two ``x``-separated integers.
    """
    dimensions, _sep, _ratio = value.partition("|")
    raw_width, raw_height = dimensions.strip().split("x", 1)
    return int(raw_width), int(raw_height)
def output_to_tuple(result: LocalGenerationResult) -> tuple[Any, str, str, str]:
    """Flatten a result into ``(images, text, think_text, metadata_json)``.

    The metadata is serialised with ``ensure_ascii=False`` so non-ASCII
    characters are kept as-is.
    """
    metadata_json = json.dumps(result.metadata, ensure_ascii=False)
    return (result.images, result.text, result.think_text, metadata_json)
def interleave_output_to_tuple(result: LocalGenerationResult) -> tuple[Any, str, str, str, dict[str, Any]]:
    """Flatten a result into ``(images, text, think_text, metadata_json, interleave_result)``.

    When the result carries no (or an empty) ``interleave_result``, one is
    built from the result's text and metadata with ``num_images=0``.
    """
    structured = result.interleave_result
    if not structured:
        structured = build_interleave_result(
            text=result.text,
            num_images=0,
            metadata=result.metadata,
        )
    metadata_json = json.dumps(result.metadata, ensure_ascii=False)
    return (result.images, result.text, result.think_text, metadata_json, structured)
def build_interleave_result(
    *,
    text: str,
    num_images: int,
    metadata: dict[str, Any],
) -> dict[str, Any]:
    """Assemble the structured interleave payload for ``text``.

    The text is parsed into ordered parts; ``think_text`` is the think
    parts joined with a blank line. The raw text, image count and metadata
    are passed through unchanged under a ``version`` of 1.
    """
    parts = _parse_interleave_parts(text, num_images)
    think_segments = [part["text"] for part in parts if part["type"] == "think"]
    payload: dict[str, Any] = {"version": 1}
    payload["parts"] = parts
    payload["text"] = text
    payload["think_text"] = "\n\n".join(think_segments)
    payload["num_images"] = num_images
    payload["metadata"] = metadata
    return payload
def interleave_result_to_markdown(result: dict[str, Any], *, include_think: bool = True) -> str:
    """Render the ``parts`` of an interleave result as Markdown.

    Think parts become a ``<details>`` block (omitted when
    ``include_think`` is false), text parts are emitted stripped, and image
    parts become ``[image:<index>]`` placeholders. Blocks are separated by
    a blank line; parts of any other type are ignored.
    """
    blocks: list[str] = []
    for part in result.get("parts", []):
        kind = part.get("type")
        if kind == "think" and include_think:
            blocks.append("<details><summary>think</summary>")
            blocks.append(str(part.get("text", "")))
            blocks.append("</details>")
        elif kind == "text":
            body = str(part.get("text", "")).strip()
            if body:
                blocks.append(body)
        elif kind == "image":
            blocks.append(f"[image:{int(part.get('index', 0))}]")
    # An empty think body must not leave an empty block behind.
    return "\n\n".join(block for block in blocks if block != "")
def _parse_interleave_parts(text: str, num_images: int) -> list[dict[str, Any]]:
parts: list[dict[str, Any]] = []
image_index = 0
chunks = text.split("<image>")
num_image_tags = len(chunks) - 1
for index, chunk in enumerate(chunks):
_append_text_and_think_parts(parts, chunk)
if index < num_image_tags:
if image_index < num_images:
parts.append({"type": "image", "index": image_index})
else:
parts.append({"type": "image", "index": image_index, "missing": True})
image_index += 1
while image_index < num_images:
parts.append({"type": "image", "index": image_index})
image_index += 1
return parts
def _append_text_and_think_parts(parts: list[dict[str, Any]], chunk: str) -> None:
remaining = chunk
while remaining:
start = remaining.find("<think>")
if start < 0:
_append_text_part(parts, remaining)
return
_append_text_part(parts, remaining[:start])
after_start = start + len("<think>")
end = remaining.find("</think>", after_start)
if end < 0:
think_text = remaining[after_start:]
remaining = ""
else:
think_text = remaining[after_start:end]
remaining = remaining[end + len("</think>") :]
if think_text.strip():
parts.append({"type": "think", "text": think_text.strip()})
def _append_text_part(parts: list[dict[str, Any]], text: str) -> None:
if text.strip():
parts.append({"type": "text", "text": text.strip()})
def _maybe_add_source_path(source_path: str) -> list[str]:
"""Inject source_path into sys.path for this session only; returns the
injected path so _maybe_remove_source_path can undo it."""
source_path = source_path.strip()
if not source_path:
source_path = default_source_path()
if not source_path:
return []
path = Path(source_path).expanduser()
if path.name != "src" and (path / "src").is_dir():
path = path / "src"
path_str = str(path)
if path.is_dir() and path_str not in sys.path:
sys.path.insert(0, path_str)
return [path_str]
return []
def _maybe_remove_source_path(injected: list[str]) -> None:
"""Remove paths injected by _maybe_add_source_path, keeping any the user
may have added independently."""
for p in injected:
if p in sys.path:
sys.path.remove(p)
def _import_torch():
try:
import torch
except ImportError as exc:
raise RuntimeError("Local SenseNova-U1 inference requires PyTorch in ComfyUI.") from exc
return torch
def _import_sensenova_u1():
try:
import sensenova_u1
from sensenova_u1.models.neo_unify.utils import smart_resize
from sensenova_u1.utils import load_model_and_tokenizer
except ImportError as exc:
raise RuntimeError(
"Local SenseNova-U1 inference requires the `sensenova_u1` package. "
"Install it into the ComfyUI Python environment, e.g.:\n"
" pip install 'sensenova-u1 @ git+https://github.com/OpenSenseNova/SenseNova-U1'\n"
"Or, for monorepo development, set SENSENOVA_U1_SRC=/path/to/SenseNova-U1/src "
"(or fill the loader's `sensenova_u1_src` input)."
) from exc
return sensenova_u1, load_model_and_tokenizer, smart_resize
def _resolve_local_model_path(model_path: str) -> str:
if Path(model_path).exists():
return model_path
try:
from huggingface_hub import snapshot_download
return snapshot_download(model_path, local_files_only=True)
except Exception:
return model_path
def _resolve_dtype(torch, dtype: str):
try:
return {
"bfloat16": torch.bfloat16,
"float16": torch.float16,
"float32": torch.float32,
}[dtype]
except KeyError as exc:
raise RuntimeError(f"Unsupported dtype: {dtype}") from exc
def _denorm(x):
    """Undo mean/std normalisation on ``x`` and clamp the result to ``[0, 1]``.

    ``_NORM_MEAN`` / ``_NORM_STD`` are reshaped to ``(1, 3, 1, 1)`` so they
    broadcast over ``x``; presumably ``x`` is a 3-channel NCHW batch — confirm
    against callers.
    """
    torch = _import_torch()
    like_x = {"device": x.device, "dtype": x.dtype}
    mean = torch.tensor(_NORM_MEAN, **like_x).view(1, 3, 1, 1)
    std = torch.tensor(_NORM_STD, **like_x).view(1, 3, 1, 1)
    restored = x * std + mean
    return restored.clamp(0, 1)
def _single_tensor_to_pil(tensor) -> Image.Image:
    """Convert one image tensor to a PIL image.

    A 3-dimensional tensor gets a leading batch axis first; the first image of
    the converted batch is returned.
    """
    batch = tensor.unsqueeze(0) if tensor.ndim == 3 else tensor
    converted = _tensor_batch_to_pil(batch)
    return converted[0]
def _tensor_batch_to_pil(batch) -> list[Image.Image]:
    """Convert a batch of normalised image tensors into a list of RGB PIL images.

    The batch is cast to float, de-normalised with ``_denorm``, permuted from
    axis order (0, 1, 2, 3) to (0, 2, 3, 1), scaled to 0-255 and rounded to uint8.
    """
    channels_last = _denorm(batch.float()).permute(0, 2, 3, 1)
    pixels = (channels_last.cpu().numpy() * 255.0).round().astype(np.uint8)
    pil_images = []
    for frame in pixels:
        pil_images.append(Image.fromarray(frame).convert("RGB"))
    return pil_images
def _batch_tensor_to_comfy_image(batch):
    """Turn a model output batch into a ComfyUI image batch by going through PIL images."""
    return _pil_images_to_comfy_batch(_tensor_batch_to_pil(batch))
def _pil_images_to_comfy_batch(images: list[Image.Image]):
    """Convert each PIL image with ``pil_to_comfy_image`` and concatenate along dim 0."""
    torch = _import_torch()
    converted = []
    for pil_image in images:
        converted.append(pil_to_comfy_image(pil_image))
    return torch.cat(converted, dim=0)
def _resize_input_to_budget(image: Image.Image, target_pixels: int) -> Image.Image:
    """Rescale the source image to the ``target_pixels`` budget.

    Mirrors the terminal pipeline (`examples/editing/inference.py`): the new
    size comes from ``smart_resize`` with ``min_pixels == max_pixels ==
    target_pixels`` and ``factor=DEFAULT_IMAGE_PATCH_SIZE``, and the resampling
    filter is LANCZOS. Without this step a small input (e.g. 1024x1024) would be
    passed to the model as-is, costing visible detail.

    The input object itself is returned when the computed size equals its
    current size.
    """
    _, _, smart_resize = _import_sensenova_u1()
    new_height, new_width = smart_resize(
        height=image.height,
        width=image.width,
        factor=DEFAULT_IMAGE_PATCH_SIZE,
        min_pixels=target_pixels,
        max_pixels=target_pixels,
    )
    target_size = (new_width, new_height)
    if target_size != image.size:
        return image.resize(target_size, Image.LANCZOS)
    return image
def _resolve_edit_size(
image: Image.Image,
*,
width: int | None,
height: int | None,
target_pixels: int,
) -> tuple[int, int]:
if width is not None or height is not None:
if width is None or height is None:
raise RuntimeError("width and height must be provided together.")
_check_grid_divisible(width, height)
return width, height
_, _, smart_resize = _import_sensenova_u1()
resized_height, resized_width = smart_resize(
height=image.height,
width=image.width,
factor=DEFAULT_IMAGE_PATCH_SIZE,
min_pixels=target_pixels,
max_pixels=target_pixels,
)
return resized_width, resized_height
def _check_grid_divisible(width: int, height: int) -> None:
    """Raise ``RuntimeError`` unless both sides are multiples of ``DEFAULT_IMAGE_PATCH_SIZE``."""
    misaligned = any(side % DEFAULT_IMAGE_PATCH_SIZE for side in (width, height))
    if misaligned:
        raise RuntimeError(
            f"Output resolution ({width}x{height}) must be a multiple of {DEFAULT_IMAGE_PATCH_SIZE} on both axes."
        )
def _check_cfg_interval(cfg_interval: tuple[float, float]) -> None:
lo, hi = cfg_interval
if not 0.0 <= lo <= hi <= 1.0:
raise RuntimeError("cfg_interval must satisfy 0.0 <= start <= end <= 1.0.")
def target_pixels_from_megapixels(megapixels: float) -> int:
    """Convert a megapixel budget to a pixel count.

    The result is ``floor(megapixels * 1_000_000)``, but never less than one
    ``DEFAULT_IMAGE_PATCH_SIZE`` x ``DEFAULT_IMAGE_PATCH_SIZE`` patch.
    """
    requested = math.floor(megapixels * 1_000_000)
    one_patch = DEFAULT_IMAGE_PATCH_SIZE * DEFAULT_IMAGE_PATCH_SIZE
    return max(one_patch, requested)
from __future__ import annotations
import hashlib
import json
import logging
import tempfile
import uuid
from pathlib import Path
from typing import Any
from comfy_api.latest import ComfyExtension, io
try:
from .api_client import (
CHAT_MODELS,
IMAGE_MODELS,
IMAGE_SIZE_OPTIONS,
VISION_MODELS,
SenseNovaClient,
)
from .image_utils import (
comfy_batch_to_pil_images,
comfy_image_info,
comfy_image_to_png_data_url,
image_bytes_to_comfy_image,
)
from .local_pipeline import (
ATTN_BACKEND_OPTIONS,
CFG_NORM_OPTIONS,
DEFAULT_INTERLEAVE_SYSTEM_MESSAGE,
DEFAULT_SEED,
DEFAULT_VRAM_MODE,
DEVICE_MAP_OPTIONS,
DTYPE_OPTIONS,
INTERLEAVE_RESOLUTION_OPTIONS,
INTERLEAVE_RESULT_TYPE,
LOCAL_MODEL_TYPE,
T2I_RESOLUTION_OPTIONS,
VRAM_MODE_OPTIONS,
SenseNovaU1LocalModel,
default_device,
default_source_path,
interleave_output_to_tuple,
interleave_result_to_markdown,
output_to_tuple,
parse_resolution_option,
target_pixels_from_megapixels,
)
from .prompt_utils import load_prompt_template
except ImportError: # pragma: no cover - supports direct imports during tests
from api_client import (
CHAT_MODELS,
IMAGE_MODELS,
IMAGE_SIZE_OPTIONS,
VISION_MODELS,
SenseNovaClient,
)
from image_utils import (
comfy_batch_to_pil_images,
comfy_image_info,
comfy_image_to_png_data_url,
image_bytes_to_comfy_image,
)
from local_pipeline import (
ATTN_BACKEND_OPTIONS,
CFG_NORM_OPTIONS,
DEFAULT_INTERLEAVE_SYSTEM_MESSAGE,
DEFAULT_SEED,
DEFAULT_VRAM_MODE,
DEVICE_MAP_OPTIONS,
DTYPE_OPTIONS,
INTERLEAVE_RESOLUTION_OPTIONS,
INTERLEAVE_RESULT_TYPE,
LOCAL_MODEL_TYPE,
T2I_RESOLUTION_OPTIONS,
VRAM_MODE_OPTIONS,
SenseNovaU1LocalModel,
default_device,
default_source_path,
interleave_output_to_tuple,
interleave_result_to_markdown,
output_to_tuple,
parse_resolution_option,
target_pixels_from_megapixels,
)
from prompt_utils import load_prompt_template
# Menu category used by the API-backed nodes; the local-inference nodes use a
# "/Local" sub-category derived from it.
CATEGORY = "SenseNova"
LOCAL_CATEGORY = f"{CATEGORY}/Local"
# Default system prompt for both vision nodes (URL and image input).
VISION_SYSTEM_PROMPT = "You are a careful vision assistant. Describe only visible details."
# File name passed to load_prompt_template() for the prompt-builder node's default system prompt.
BUILDER_PROMPT_TEMPLATE = "builder_prompt.txt"
LOGGER = logging.getLogger(__name__)
# Custom socket types used to pass the loaded model and the interleave result between nodes.
LocalModelIO = io.Custom(LOCAL_MODEL_TYPE)
InterleaveResultIO = io.Custom(INTERLEAVE_RESULT_TYPE)
# folder_paths folder names searched, in this order, for .gguf checkpoints.
_GGUF_FOLDER_CANDIDATES: tuple[str, ...] = ("gguf", "diffusion_models")
def _list_gguf_options() -> list[str]:
"""Combo options for SenseNovaU1LocalLoader.gguf_checkpoint.
Always starts with an empty string (= no GGUF, load via safetensors), then
every `.gguf` filename found under any registered folder in
``_GGUF_FOLDER_CANDIDATES`` (`gguf` for the dedicated layout, plus the
stock ComfyUI `diffusion_models` folder where ComfyUI-GGUF style packs
live). Returns just ``[""]`` when folder_paths is unavailable or no
matching files exist, so the schema still loads cleanly outside ComfyUI.
"""
found: set[str] = set()
try:
import folder_paths
for folder in _GGUF_FOLDER_CANDIDATES:
try:
files = folder_paths.get_filename_list(folder)
except Exception:
continue
for f in files:
if f.lower().endswith(".gguf"):
found.add(f)
except Exception:
pass
return ["", *sorted(found)]
def _resolve_gguf_choice(value: str) -> str:
"""Map a Combo selection back to an absolute path.
Searches the configured folders in order; the first registered folder
that contains the file wins. If the value isn't a registered filename
(e.g. workflow JSON edited to point at a literal path), it is returned
unchanged so SenseNovaU1LocalModel can treat it as an absolute path.
"""
if not value:
return ""
try:
import folder_paths
for folder in _GGUF_FOLDER_CANDIDATES:
try:
full = folder_paths.get_full_path(folder, value)
except Exception:
continue
if full:
return full
except Exception:
pass
return value
# Process-wide cache of loaded local models, keyed by the tuple of loader
# settings built in SenseNovaU1LocalLoader.execute. Entries are dropped by
# _evict_model_cache().
_LOCAL_MODEL_CACHE: dict[tuple, SenseNovaU1LocalModel] = {}
def _evict_model_cache(keep_key: tuple | None = None) -> None:
    """Drop every cached model except ``keep_key`` and hand its memory back.

    For each evicted entry the ``model`` and ``tokenizer`` attributes are
    deleted (errors ignored), then a GC pass runs and the accelerator caching
    allocators are emptied. All cleanup is best effort: a failure while
    releasing memory never propagates to the caller.

    Args:
        keep_key: cache key to leave untouched; ``None`` evicts everything.
    """
    to_evict = [k for k in _LOCAL_MODEL_CACHE if k != keep_key]
    for k in to_evict:
        old = _LOCAL_MODEL_CACHE.pop(k)
        for attr in ("model", "tokenizer"):
            try:
                delattr(old, attr)
            except Exception:
                pass
        del old
    if not to_evict:
        return
    # Force a GC pass *before* empty_cache so any tensors waiting on
    # cyclic refs / lingering hooks actually drop their device memory back
    # to the caching allocator. Without this, empty_cache() can't reclaim
    # the old model's memory and the next load OOMs partway through inference.
    try:
        import gc

        import torch

        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.synchronize()
            torch.cuda.empty_cache()
            # Old model may have been CPU-pinned (vram_mode != "full");
            # release the pinned host blocks too.
            if hasattr(torch._C, "_host_emptyCache"):
                torch._C._host_emptyCache()
        # The loader also accepts 'xpu' devices; release that allocator as well
        # so an evicted XPU model does not keep its memory cached. Older torch
        # builds have no `torch.xpu`, hence the getattr guard.
        xpu = getattr(torch, "xpu", None)
        if xpu is not None and xpu.is_available():
            xpu.synchronize()
            xpu.empty_cache()
    except Exception:
        pass
    LOGGER.info("SenseNova U1 loader: evicted %d cached model(s) from VRAM.", len(to_evict))
class SenseNovaChat(io.ComfyNode):
    """Node that sends a text prompt plus system prompt through ``SenseNovaClient.chat``."""

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the prompt widgets, the request controls and three string outputs."""
        prompt_widgets = [
            io.String.Input("text", multiline=True, default=""),
            io.String.Input(
                "system_prompt",
                multiline=True,
                default="You are a helpful assistant. Answer clearly and concisely.",
            ),
        ]
        request_widgets = [
            io.Combo.Input("model", options=list(CHAT_MODELS), default=CHAT_MODELS[0]),
            io.Float.Input("temperature", default=0.7, min=0.0, max=2.0, step=0.1),
            io.Float.Input("top_p", default=1.0, min=0.0, max=1.0, step=0.05),
            io.Int.Input("max_tokens", default=2048, min=1, max=65536),
            io.Int.Input("timeout", default=120, min=10, max=600),
        ]
        string_outputs = [
            io.String.Output(display_name=label) for label in ("text", "usage_json", "raw_json")
        ]
        return io.Schema(
            node_id="SenseNovaChat",
            display_name="SenseNova Chat",
            category=CATEGORY,
            inputs=prompt_widgets + request_widgets,
            outputs=string_outputs,
        )

    @classmethod
    def execute(
        cls,
        text: str,
        system_prompt: str,
        model: str,
        temperature: float,
        top_p: float,
        max_tokens: int,
        timeout: int,
    ) -> io.NodeOutput:
        """Run one chat request and return the reply text plus usage and raw response as JSON strings."""
        request = {
            "text": text,
            "system_prompt": system_prompt,
            "model": model,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens,
            "timeout": timeout,
        }
        reply = SenseNovaClient.from_env().chat(**request)
        answer = reply.text
        # ensure_ascii=False keeps non-ASCII characters readable in the JSON outputs.
        usage_json = json.dumps(reply.usage, ensure_ascii=False)
        raw_json = json.dumps(reply.raw, ensure_ascii=False)
        return io.NodeOutput(answer, usage_json, raw_json)
class SenseNovaImageGenerate(io.ComfyNode):
    """Node that generates an image through ``SenseNovaClient.generate_image``."""

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare prompt/model/size/timeout inputs and the image plus four string outputs."""
        inputs = [
            io.String.Input("prompt", multiline=True, default=""),
            io.Combo.Input("model", options=list(IMAGE_MODELS), default=IMAGE_MODELS[0]),
            io.Combo.Input("size", options=list(IMAGE_SIZE_OPTIONS), default=IMAGE_SIZE_OPTIONS[0]),
            io.Int.Input("timeout", default=300, min=30, max=900),
        ]
        outputs = [io.Image.Output(display_name="images")]
        for label in ("image_base64", "image_url", "raw_json", "image_info"):
            outputs.append(io.String.Output(display_name=label))
        return io.Schema(
            node_id="SenseNovaImageGenerate",
            display_name="SenseNova Image Generate",
            category=CATEGORY,
            inputs=inputs,
            outputs=outputs,
        )

    @classmethod
    def execute(cls, prompt: str, model: str, size: str, timeout: int) -> io.NodeOutput:
        """Generate one image, log a short summary, and return it with its metadata strings."""
        api = SenseNovaClient.from_env()
        generated = api.generate_image(prompt=prompt, model=model, size=size, timeout=timeout)
        tensor = image_bytes_to_comfy_image(generated.image_bytes)
        summary = comfy_image_info(tensor)
        # Only whether a URL was returned is logged, not the URL itself.
        LOGGER.info(
            "SenseNova image generated: bytes=%s; url=%s; %s",
            len(generated.image_bytes),
            bool(generated.image_url),
            summary,
        )
        raw_json = json.dumps(generated.raw, ensure_ascii=False)
        return io.NodeOutput(tensor, generated.image_base64, generated.image_url, raw_json, summary)
class SenseNovaPromptBuilder(io.ComfyNode):
    """Node that rewrites a prompt by sending it through ``SenseNovaClient.chat``.

    The default system prompt is loaded from the ``BUILDER_PROMPT_TEMPLATE`` file.
    """

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the prompt widgets, the request controls and three string outputs."""
        builder_system_prompt = load_prompt_template(BUILDER_PROMPT_TEMPLATE)
        prompt_widgets = [
            io.String.Input("prompt", multiline=True, default=""),
            io.String.Input("system_prompt", multiline=True, default=builder_system_prompt),
        ]
        request_widgets = [
            io.Combo.Input("model", options=list(CHAT_MODELS), default=CHAT_MODELS[0]),
            io.Float.Input("temperature", default=0.3, min=0.0, max=2.0, step=0.1),
            io.Float.Input("top_p", default=1.0, min=0.0, max=1.0, step=0.05),
            io.Int.Input("max_tokens", default=2048, min=1, max=65536),
            io.Int.Input("timeout", default=120, min=10, max=600),
        ]
        string_outputs = [
            io.String.Output(display_name=label) for label in ("prompt", "usage_json", "raw_json")
        ]
        return io.Schema(
            node_id="SenseNovaPromptBuilder",
            display_name="SenseNova Prompt Builder",
            category=CATEGORY,
            inputs=prompt_widgets + request_widgets,
            outputs=string_outputs,
        )

    @classmethod
    def execute(
        cls,
        prompt: str,
        system_prompt: str,
        model: str,
        temperature: float,
        top_p: float,
        max_tokens: int,
        timeout: int,
    ) -> io.NodeOutput:
        """Send ``prompt`` as the chat text and return the reply, usage JSON and raw JSON."""
        request = {
            "text": prompt,
            "system_prompt": system_prompt,
            "model": model,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens,
            "timeout": timeout,
        }
        reply = SenseNovaClient.from_env().chat(**request)
        built_prompt = reply.text
        usage_json = json.dumps(reply.usage, ensure_ascii=False)
        raw_json = json.dumps(reply.raw, ensure_ascii=False)
        return io.NodeOutput(built_prompt, usage_json, raw_json)
class SenseNovaVisionURL(io.ComfyNode):
    """Node that asks a vision model about an image referenced by URL."""

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the URL and prompt widgets, the request controls and three string outputs."""
        prompt_widgets = [
            io.String.Input("image_url", default=""),
            io.String.Input("prompt", multiline=True, default="Describe this image."),
            io.String.Input("system_prompt", multiline=True, default=VISION_SYSTEM_PROMPT),
        ]
        request_widgets = [
            io.Combo.Input("model", options=list(VISION_MODELS), default=VISION_MODELS[0]),
            io.Float.Input("temperature", default=0.2, min=0.0, max=2.0, step=0.1),
            io.Float.Input("top_p", default=1.0, min=0.0, max=1.0, step=0.05),
            io.Int.Input("max_tokens", default=2048, min=1, max=65536),
            io.Int.Input("timeout", default=120, min=10, max=600),
        ]
        string_outputs = [
            io.String.Output(display_name=label) for label in ("text", "usage_json", "raw_json")
        ]
        return io.Schema(
            node_id="SenseNovaVisionURL",
            display_name="SenseNova Vision URL",
            category=CATEGORY,
            inputs=prompt_widgets + request_widgets,
            outputs=string_outputs,
        )

    @classmethod
    def execute(
        cls,
        image_url: str,
        prompt: str,
        system_prompt: str,
        model: str,
        temperature: float,
        top_p: float,
        max_tokens: int,
        timeout: int,
    ) -> io.NodeOutput:
        """Run ``vision_chat`` on the URL and return the reply, usage JSON and raw JSON."""
        request = {
            "image_url": image_url,
            "prompt": prompt,
            "system_prompt": system_prompt,
            "model": model,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens,
            "timeout": timeout,
        }
        reply = SenseNovaClient.from_env().vision_chat(**request)
        answer = reply.text
        usage_json = json.dumps(reply.usage, ensure_ascii=False)
        raw_json = json.dumps(reply.raw, ensure_ascii=False)
        return io.NodeOutput(answer, usage_json, raw_json)
class SenseNovaVisionImage(io.ComfyNode):
    """Node that asks a vision model about a ComfyUI image.

    The image is converted with ``comfy_image_to_png_data_url`` and sent in the
    ``image_url`` field of ``vision_chat``.
    """

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the image and prompt inputs, the request controls and three string outputs."""
        prompt_widgets = [
            io.Image.Input("image"),
            io.String.Input("prompt", multiline=True, default="Describe this image."),
            io.String.Input("system_prompt", multiline=True, default=VISION_SYSTEM_PROMPT),
        ]
        request_widgets = [
            io.Combo.Input("model", options=list(VISION_MODELS), default=VISION_MODELS[0]),
            io.Float.Input("temperature", default=0.2, min=0.0, max=2.0, step=0.1),
            io.Float.Input("top_p", default=1.0, min=0.0, max=1.0, step=0.05),
            io.Int.Input("max_tokens", default=2048, min=1, max=65536),
            io.Int.Input("timeout", default=120, min=10, max=600),
        ]
        string_outputs = [
            io.String.Output(display_name=label) for label in ("text", "usage_json", "raw_json")
        ]
        return io.Schema(
            node_id="SenseNovaVisionImage",
            display_name="SenseNova Vision Image",
            category=CATEGORY,
            inputs=prompt_widgets + request_widgets,
            outputs=string_outputs,
        )

    @classmethod
    def execute(
        cls,
        image,
        prompt: str,
        system_prompt: str,
        model: str,
        temperature: float,
        top_p: float,
        max_tokens: int,
        timeout: int,
    ) -> io.NodeOutput:
        """Run ``vision_chat`` on the image and return the reply, usage JSON and raw JSON."""
        api = SenseNovaClient.from_env()
        data_url = comfy_image_to_png_data_url(image)
        reply = api.vision_chat(
            image_url=data_url,
            prompt=prompt,
            system_prompt=system_prompt,
            model=model,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            timeout=timeout,
        )
        answer = reply.text
        usage_json = json.dumps(reply.usage, ensure_ascii=False)
        raw_json = json.dumps(reply.raw, ensure_ascii=False)
        return io.NodeOutput(answer, usage_json, raw_json)
def _loader_cache_key(
    model_path: str,
    sensenova_u1_src: str,
    device: str,
    dtype: str,
    attn_backend: str,
    device_map: str,
    max_memory: str,
    vram_mode: str,
    gguf_checkpoint: str,
) -> tuple:
    """Build the tuple that identifies one loader configuration.

    Free-text fields are whitespace-stripped and the GGUF selection is resolved
    with ``_resolve_gguf_choice``. This is the single source of the key used for
    both the node fingerprint and the model cache, so the two cannot drift.
    The resolved GGUF path is the last element.
    """
    return (
        model_path.strip(),
        sensenova_u1_src.strip(),
        device.strip(),
        dtype,
        attn_backend,
        device_map,
        max_memory.strip(),
        vram_mode,
        _resolve_gguf_choice(gguf_checkpoint.strip()),
    )


class SenseNovaU1LocalLoader(io.ComfyNode):
    """Node that loads (or reuses) a local SenseNova-U1 model.

    Loaded models are kept in ``_LOCAL_MODEL_CACHE``; loading a configuration
    that is not cached first evicts every other cached model.
    """

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the loader widgets and the model / info-JSON outputs."""
        return io.Schema(
            node_id="SenseNovaU1LocalLoader",
            display_name="SenseNova U1 Local Loader",
            category=LOCAL_CATEGORY,
            inputs=[
                io.String.Input(
                    "model_path",
                    default="sensenova/SenseNova-U1-8B-MoT",
                    tooltip="HuggingFace model id or local checkpoint directory.",
                ),
                io.String.Input(
                    "sensenova_u1_src",
                    default=default_source_path(),
                    tooltip="Optional SenseNova-U1 source checkout or src directory.",
                ),
                io.String.Input(
                    "device",
                    default=default_device(),
                    tooltip="Compute device, e.g. 'cuda', 'cuda:0', 'xpu', 'xpu:0', 'cpu'. Defaults to the best available accelerator.",
                ),
                io.Combo.Input("dtype", options=list(DTYPE_OPTIONS), default="bfloat16"),
                io.Combo.Input("attn_backend", options=list(ATTN_BACKEND_OPTIONS), default="auto"),
                io.Combo.Input(
                    "device_map",
                    options=list(DEVICE_MAP_OPTIONS),
                    default="none",
                    tooltip=(
                        "Multi-GPU sharding via accelerate. 'none' = single device "
                        "(default). auto/balanced/balanced_low_0/sequential split layers "
                        "across all visible GPUs. For *single-GPU VRAM reduction* use "
                        "vram_mode instead — they are mutually exclusive."
                    ),
                ),
                io.String.Input(
                    "max_memory",
                    default="",
                    tooltip=(
                        "Per-device memory budget for device_map (e.g. 0=20GiB,1=20GiB,cpu=64GiB). "
                        "Only relevant when device_map != 'none'."
                    ),
                ),
                io.Combo.Input(
                    "vram_mode",
                    options=list(VRAM_MODE_OPTIONS),
                    default=DEFAULT_VRAM_MODE,
                    tooltip=(
                        "Single-GPU layer-offload mode (controls weight residency only; "
                        "activations / KV cache grow with workload — especially in interleave "
                        "mode where each generated image enlarges the cache).\n"
                        " full — no offload, whole model on GPU, fastest (default)\n"
                        " low — synchronous per-layer CPU<->GPU swap, smallest weight\n"
                        " footprint, slowest\n"
                        " balanced — async prefetch, overlaps H2D with compute, faster than low\n"
                        "Anything other than 'full' forces device_map='none' (use device_map "
                        "for multi-GPU sharding instead)."
                    ),
                ),
                io.Combo.Input(
                    "gguf_checkpoint",
                    options=_list_gguf_options(),
                    default="",
                    tooltip=(
                        "Optional .gguf quantized checkpoint, picked from "
                        "`<comfyui>/models/gguf/` or `<comfyui>/models/diffusion_models/`. "
                        "Empty (default) loads safetensors via from_pretrained. When set, weights "
                        "are loaded via the diffusers GGUF quantizer; device_map must be 'none'. "
                        "Requires the [gguf] extra (gguf>=0.10.0, diffusers>=0.30.0). Restart "
                        "ComfyUI to refresh the list after dropping new files into either folder."
                    ),
                ),
            ],
            outputs=[
                LocalModelIO.Output(display_name="u1_model"),
                io.String.Output(display_name="model_info_json"),
            ],
        )

    @classmethod
    def fingerprint_inputs(
        cls,
        model_path: str,
        sensenova_u1_src: str,
        device: str,
        dtype: str,
        attn_backend: str,
        device_map: str,
        max_memory: str,
        vram_mode: str,
        gguf_checkpoint: str,
    ) -> str:
        """Return the SHA-256 hex digest of the loader's cache key for these inputs."""
        key = _loader_cache_key(
            model_path,
            sensenova_u1_src,
            device,
            dtype,
            attn_backend,
            device_map,
            max_memory,
            vram_mode,
            gguf_checkpoint,
        )
        return hashlib.sha256(str(key).encode()).hexdigest()

    @classmethod
    def execute(
        cls,
        model_path: str,
        sensenova_u1_src: str,
        device: str,
        dtype: str,
        attn_backend: str,
        device_map: str,
        max_memory: str,
        vram_mode: str,
        gguf_checkpoint: str,
    ) -> io.NodeOutput:
        """Return the cached model for this configuration, loading it first if needed.

        Outputs the model object and its ``info`` serialised as JSON.
        """
        cache_key = _loader_cache_key(
            model_path,
            sensenova_u1_src,
            device,
            dtype,
            attn_backend,
            device_map,
            max_memory,
            vram_mode,
            gguf_checkpoint,
        )
        # Last key element is the GGUF selection after _resolve_gguf_choice.
        resolved_gguf = cache_key[-1]
        if cache_key not in _LOCAL_MODEL_CACHE:
            # Free every previously cached model before the new one is loaded.
            _evict_model_cache()
            if resolved_gguf:
                LOGGER.info(
                    "SenseNova U1 loader: loading %s with GGUF checkpoint %s",
                    model_path,
                    resolved_gguf,
                )
            else:
                LOGGER.info("SenseNova U1 loader: loading model from %s", model_path)
            _LOCAL_MODEL_CACHE[cache_key] = SenseNovaU1LocalModel(
                model_path=model_path,
                sensenova_u1_src=sensenova_u1_src,
                device=device,
                dtype=dtype,
                attn_backend=attn_backend,
                device_map=device_map,
                max_memory=max_memory,
                gguf_checkpoint=resolved_gguf,
                vram_mode=vram_mode,
            )
        else:
            LOGGER.info("SenseNova U1 loader: reusing cached model for %s", model_path)
        model = _LOCAL_MODEL_CACHE[cache_key]
        return io.NodeOutput(model, json.dumps(model.info, ensure_ascii=False))
class SenseNovaU1LocalTextToImage(io.ComfyNode):
    """Node that runs ``text_to_image`` on a loaded local SenseNova-U1 model."""

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the model socket, generation widgets and image/text outputs."""
        guidance_widgets = [
            io.Float.Input("cfg_scale", default=4.0, min=0.0, max=20.0, step=0.1),
            io.Combo.Input("cfg_norm", options=list(CFG_NORM_OPTIONS), default="none"),
            io.Float.Input("timestep_shift", default=3.0, min=0.0, max=20.0, step=0.1),
            io.Float.Input("cfg_interval_start", default=0.0, min=0.0, max=1.0, step=0.05),
            io.Float.Input("cfg_interval_end", default=1.0, min=0.0, max=1.0, step=0.05),
        ]
        run_widgets = [
            io.Int.Input("num_steps", default=50, min=1, max=200),
            io.Int.Input("batch_size", default=1, min=1, max=16),
            io.Int.Input("seed", default=DEFAULT_SEED, min=0, max=2**31 - 1),
            io.Boolean.Input("think_mode", default=False),
        ]
        inputs = [
            LocalModelIO.Input("u1_model"),
            io.String.Input("prompt", multiline=True, default=""),
            io.Combo.Input(
                "resolution",
                options=list(T2I_RESOLUTION_OPTIONS),
                default=T2I_RESOLUTION_OPTIONS[0],
            ),
            *guidance_widgets,
            *run_widgets,
        ]
        outputs = [io.Image.Output(display_name="images")]
        for label in ("text", "think_text", "metadata_json"):
            outputs.append(io.String.Output(display_name=label))
        return io.Schema(
            node_id="SenseNovaU1LocalTextToImage",
            display_name="SenseNova U1 Local Text to Image",
            category=LOCAL_CATEGORY,
            inputs=inputs,
            outputs=outputs,
        )

    @classmethod
    def execute(
        cls,
        u1_model: SenseNovaU1LocalModel,
        prompt: str,
        resolution: str,
        cfg_scale: float,
        cfg_norm: str,
        timestep_shift: float,
        cfg_interval_start: float,
        cfg_interval_end: float,
        num_steps: int,
        batch_size: int,
        seed: int,
        think_mode: bool,
    ) -> io.NodeOutput:
        """Generate images at the selected resolution and unpack the result into the node outputs."""
        width, height = parse_resolution_option(resolution)
        cfg_interval = (cfg_interval_start, cfg_interval_end)
        generated = u1_model.text_to_image(
            prompt=prompt,
            width=width,
            height=height,
            cfg_scale=cfg_scale,
            cfg_norm=cfg_norm,
            timestep_shift=timestep_shift,
            cfg_interval=cfg_interval,
            num_steps=num_steps,
            batch_size=batch_size,
            seed=seed,
            think_mode=think_mode,
        )
        LOGGER.info("SenseNova U1 local T2I generated: %s", comfy_image_info(generated.images))
        node_values = output_to_tuple(generated)
        return io.NodeOutput(*node_values)
class SenseNovaU1LocalImageEdit(io.ComfyNode):
    """Node that runs ``edit_image`` on a loaded local SenseNova-U1 model."""

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the model/image sockets, sizing and guidance widgets, and the outputs."""
        sizing_widgets = [
            io.Boolean.Input("auto_size", default=True),
            io.Int.Input("width", default=2048, min=32, max=8192, step=32),
            io.Int.Input("height", default=2048, min=32, max=8192, step=32),
            io.Float.Input(
                "target_megapixels",
                default=4.194304,
                min=0.25,
                max=32.0,
                step=0.25,
            ),
        ]
        guidance_widgets = [
            io.Float.Input("cfg_scale", default=4.0, min=0.0, max=20.0, step=0.1),
            io.Float.Input("img_cfg_scale", default=1.0, min=0.0, max=20.0, step=0.1),
            # The last entry of CFG_NORM_OPTIONS is not offered for editing.
            io.Combo.Input("cfg_norm", options=list(CFG_NORM_OPTIONS[:-1]), default="none"),
            io.Float.Input("timestep_shift", default=3.0, min=0.0, max=20.0, step=0.1),
            io.Float.Input("cfg_interval_start", default=0.0, min=0.0, max=1.0, step=0.05),
            io.Float.Input("cfg_interval_end", default=1.0, min=0.0, max=1.0, step=0.05),
        ]
        run_widgets = [
            io.Int.Input("num_steps", default=50, min=1, max=200),
            io.Int.Input("batch_size", default=1, min=1, max=16),
            io.Int.Input("seed", default=DEFAULT_SEED, min=0, max=2**31 - 1),
            io.Boolean.Input("think_mode", default=False, optional=True),
        ]
        inputs = [
            LocalModelIO.Input("u1_model"),
            io.Image.Input("image"),
            io.String.Input("prompt", multiline=True, default=""),
            *sizing_widgets,
            *guidance_widgets,
            *run_widgets,
        ]
        outputs = [io.Image.Output(display_name="images")]
        for label in ("text", "think_text", "metadata_json"):
            outputs.append(io.String.Output(display_name=label))
        return io.Schema(
            node_id="SenseNovaU1LocalImageEdit",
            display_name="SenseNova U1 Local Image Edit",
            category=LOCAL_CATEGORY,
            inputs=inputs,
            outputs=outputs,
        )

    @classmethod
    def execute(
        cls,
        u1_model: SenseNovaU1LocalModel,
        image,
        prompt: str,
        auto_size: bool,
        width: int,
        height: int,
        target_megapixels: float,
        cfg_scale: float,
        img_cfg_scale: float,
        cfg_norm: str,
        timestep_shift: float,
        cfg_interval_start: float,
        cfg_interval_end: float,
        num_steps: int,
        batch_size: int,
        seed: int,
        think_mode: bool = False,
    ) -> io.NodeOutput:
        """Edit ``image`` according to ``prompt`` and unpack the result into the node outputs.

        With ``auto_size`` enabled the explicit width/height widgets are ignored
        (passed as ``None``) and only the megapixel budget is forwarded.
        """
        if auto_size:
            explicit_width, explicit_height = None, None
        else:
            explicit_width, explicit_height = width, height
        pixel_budget = target_pixels_from_megapixels(target_megapixels)
        edited = u1_model.edit_image(
            prompt=prompt,
            input_image=image,
            width=explicit_width,
            height=explicit_height,
            target_pixels=pixel_budget,
            cfg_scale=cfg_scale,
            img_cfg_scale=img_cfg_scale,
            cfg_norm=cfg_norm,
            timestep_shift=timestep_shift,
            cfg_interval=(cfg_interval_start, cfg_interval_end),
            num_steps=num_steps,
            batch_size=batch_size,
            seed=seed,
            think_mode=think_mode,
        )
        LOGGER.info("SenseNova U1 local edit generated: %s", comfy_image_info(edited.images))
        node_values = output_to_tuple(edited)
        return io.NodeOutput(*node_values)
class SenseNovaU1LocalInterleave(io.ComfyNode):
    """Node that runs ``interleave`` on a loaded local SenseNova-U1 model.

    Besides the image/text outputs it emits an ``interleave_result`` value for
    the preview node.
    """

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the model socket, prompt/guidance widgets, optional image input and outputs."""
        prompt_widgets = [
            io.String.Input("prompt", multiline=True, default=""),
            io.Combo.Input(
                "resolution",
                options=list(INTERLEAVE_RESOLUTION_OPTIONS),
                default=INTERLEAVE_RESOLUTION_OPTIONS[1],
            ),
            io.String.Input(
                "system_message",
                multiline=True,
                default=DEFAULT_INTERLEAVE_SYSTEM_MESSAGE,
            ),
        ]
        guidance_widgets = [
            io.Float.Input("cfg_scale", default=4.0, min=0.0, max=20.0, step=0.1),
            io.Float.Input("img_cfg_scale", default=1.0, min=0.0, max=20.0, step=0.1),
            io.Float.Input("timestep_shift", default=3.0, min=0.0, max=20.0, step=0.1),
            io.Float.Input("cfg_interval_start", default=0.0, min=0.0, max=1.0, step=0.05),
            io.Float.Input("cfg_interval_end", default=1.0, min=0.0, max=1.0, step=0.05),
        ]
        run_widgets = [
            io.Int.Input("num_steps", default=50, min=1, max=200),
            io.Int.Input("seed", default=DEFAULT_SEED, min=0, max=2**31 - 1),
            io.Boolean.Input("think_mode", default=True),
            io.Image.Input("image", optional=True),
        ]
        outputs = [io.Image.Output(display_name="images")]
        for label in ("text", "think_text", "metadata_json"):
            outputs.append(io.String.Output(display_name=label))
        outputs.append(InterleaveResultIO.Output(display_name="interleave_result"))
        return io.Schema(
            node_id="SenseNovaU1LocalInterleave",
            display_name="SenseNova U1 Local Interleave",
            category=LOCAL_CATEGORY,
            inputs=[LocalModelIO.Input("u1_model"), *prompt_widgets, *guidance_widgets, *run_widgets],
            outputs=outputs,
        )

    @classmethod
    def execute(
        cls,
        u1_model: SenseNovaU1LocalModel,
        prompt: str,
        resolution: str,
        system_message: str,
        cfg_scale: float,
        img_cfg_scale: float,
        timestep_shift: float,
        cfg_interval_start: float,
        cfg_interval_end: float,
        num_steps: int,
        seed: int,
        think_mode: bool,
        image=None,
    ) -> io.NodeOutput:
        """Run interleaved generation and unpack the result into the node outputs.

        ``image`` is optional and is forwarded as ``input_image`` (``None`` when
        the socket is not connected).
        """
        width, height = parse_resolution_option(resolution)
        cfg_interval = (cfg_interval_start, cfg_interval_end)
        generated = u1_model.interleave(
            prompt=prompt,
            input_image=image,
            width=width,
            height=height,
            cfg_scale=cfg_scale,
            img_cfg_scale=img_cfg_scale,
            timestep_shift=timestep_shift,
            cfg_interval=cfg_interval,
            num_steps=num_steps,
            seed=seed,
            think_mode=think_mode,
            system_message=system_message,
        )
        LOGGER.info("SenseNova U1 local interleave generated: %s", comfy_image_info(generated.images))
        node_values = interleave_output_to_tuple(generated)
        return io.NodeOutput(*node_values)
class SenseNovaInterleavePreview(io.ComfyNode):
    """Output node that renders an interleave result as markdown plus structured UI parts."""

    @classmethod
    def define_schema(cls) -> io.Schema:
        """Declare the interleave-result socket, the think toggle, optional images and the markdown output."""
        inputs = [
            InterleaveResultIO.Input("interleave_result"),
            io.Boolean.Input("include_think", default=False),
            io.Image.Input("images", optional=True),
        ]
        return io.Schema(
            node_id="SenseNovaInterleavePreview",
            display_name="SenseNova Interleave Preview",
            category=LOCAL_CATEGORY,
            is_output_node=True,
            inputs=inputs,
            outputs=[io.String.Output(display_name="markdown")],
        )

    @classmethod
    def execute(
        cls,
        interleave_result: dict,
        include_think: bool,
        images=None,
    ) -> io.NodeOutput:
        """Build the markdown string and the ordered ``parts`` payload for the frontend.

        Image parts whose index has no saved preview file (including the case
        where ``images`` is not connected) are emitted with ``"missing": True``.
        Think parts are dropped unless ``include_think`` is set.
        """
        markdown = interleave_result_to_markdown(interleave_result, include_think=include_think)
        saved_images: list[dict[str, str]] = [] if images is None else _save_preview_images(images)
        textual_kinds = ("text", "think") if include_think else ("text",)
        # Structured parts let the frontend render text and images in their
        # original interleaved order instead of stacking them.
        parts_payload: list[dict[str, Any]] = []
        for part in interleave_result.get("parts", []):
            kind = part.get("type")
            if kind in textual_kinds:
                body = str(part.get("text", "")).strip()
                if body:
                    parts_payload.append({"type": kind, "text": body})
            elif kind == "image":
                idx = int(part.get("index", 0))
                saved = saved_images[idx] if 0 <= idx < len(saved_images) else None
                entry: dict[str, Any] = {"type": "image", "index": idx}
                if saved is None:
                    entry["missing"] = True
                else:
                    entry["filename"] = saved.get("filename", "")
                    entry["subfolder"] = saved.get("subfolder", "")
                    entry["image_type"] = saved.get("type", "temp")
                parts_payload.append(entry)
        # The custom `parts` field is consumed by web/sensenova_interleave_preview.js;
        # `text` mirrors the legacy v1 ui shape.
        ui_payload = {"text": [markdown], "parts": parts_payload}
        return io.NodeOutput(markdown, ui=ui_payload)
def _save_preview_images(images) -> list[dict[str, str]]:
    """Write each image of the batch as a PNG and describe the saved files.

    Files go to ComfyUI's temp directory when ``folder_paths`` is importable;
    otherwise to ``<system temp>/sensenova_comfyui_preview``, where earlier
    ``sensenova_interleave_*.png`` files are removed first.

    Returns:
        One ``{"filename", "subfolder", "type"}`` dict per saved image, in batch order.
    """
    try:
        import folder_paths

        target_dir = Path(folder_paths.get_temp_directory())
        comfy_managed = True
    except Exception:
        target_dir = Path(tempfile.gettempdir()) / "sensenova_comfyui_preview"
        comfy_managed = False
    target_dir.mkdir(parents=True, exist_ok=True)
    if not comfy_managed:
        # Only the private fallback directory is cleaned here.
        for leftover in target_dir.glob("sensenova_interleave_*.png"):
            try:
                leftover.unlink()
            except OSError:
                pass
    records: list[dict[str, str]] = []
    for position, pil_image in enumerate(comfy_batch_to_pil_images(images)):
        # Random hex keeps names unique across runs; the suffix keeps batch order visible.
        name = f"sensenova_interleave_{uuid.uuid4().hex}_{position:03d}.png"
        pil_image.save(target_dir / name, format="PNG")
        records.append({"filename": name, "subfolder": "", "type": "temp"})
    return records
class SenseNovaExtension(ComfyExtension):
    """ComfyUI extension object that exposes every SenseNova node class."""

    async def get_node_list(self) -> list[type[io.ComfyNode]]:
        """Return the node classes: API-backed nodes first, then the local-inference nodes."""
        api_nodes = [
            SenseNovaChat,
            SenseNovaImageGenerate,
            SenseNovaPromptBuilder,
            SenseNovaVisionURL,
            SenseNovaVisionImage,
        ]
        local_nodes = [
            SenseNovaU1LocalLoader,
            SenseNovaU1LocalTextToImage,
            SenseNovaU1LocalImageEdit,
            SenseNovaU1LocalInterleave,
            SenseNovaInterleavePreview,
        ]
        return [*api_nodes, *local_nodes]
async def comfy_entrypoint() -> SenseNovaExtension:
    """Create and return the ``SenseNovaExtension`` instance for this package."""
    extension = SenseNovaExtension()
    return extension
from __future__ import annotations
from pathlib import Path
# Directory named "prompts" that sits next to this module; prompt templates are read from it.
PROMPTS_DIR = Path(__file__).resolve().parent / "prompts"
def load_prompt_template(filename: str) -> str:
    """Load a prompt template stored under ``PROMPTS_DIR``.

    Args:
        filename: Name of the template file, joined onto ``PROMPTS_DIR``.

    Returns:
        The file content decoded as UTF-8 with leading and trailing
        whitespace removed.

    Raises:
        RuntimeError: If the file does not exist; the original
            ``FileNotFoundError`` is attached as the cause.
    """
    template_path = PROMPTS_DIR / filename
    try:
        raw_text = template_path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise RuntimeError(f"Prompt template not found: {template_path}") from exc
    return raw_text.strip()
You are a world-renowned "Senior Visual Information Architect" and "AI Image Prompt Engineering Expert." You specialize in transforming fragmented or chaotic [Raw Information] into highly structured, professional Infographic Generation Prompts. Your work is defined by rigorous visual logic, precise spatial organization, and a high density of useful information.
# Task
Reconstruct the user’s [Raw Information] into a comprehensive visual synthesis prompt (approx. 400-600 words). Your objective is to guide large image models (e.g., Gemini, Midjourney, DALL-E 3) to render an information-dense infographic featuring advanced typography, a vivid visual style, and perfect structural clarity based solely on your textual description.
# Step-by-Step Methodology
1. **Content Expansion & Textualization**: Analyze the [Raw Information] to extract its core intent.
- Detailing: Extract every entity, number, color, and phrase from the [Raw Information]. Do not summarize.
- Categorization: Define sub-categories with distinct visual markers.
- Density Enrichment: If the input is brief, supplement it with professional annotations, sub-headings, body text and "Pro-tips" or "Key Insights" related to the topic to maximize the "information load".
2. **Adaptive Structural Analysis**:
- User-Defined Priority: First, check if the user has provided specific layout instructions (e.g., "three-column grid," "horizontal timeline"). If present, strictly follow these instructions.
- Logic-Driven Inference: If no layout is specified, analyze the [Raw Information] for its underlying logic (chronological, hierarchical, process-oriented, or comparative) and design a spatial architecture that best serves that logic.
3. **Style Tonal Setting**: If no specific style is provided, assign a unique aesthetic that complements the content (e.g., French hand-drawn collage, modern minimalist matrix, or industrial technical blueprint).
4. **Data Preservation & Encoding**: Ensure all numbers, dates, and proper nouns are 100% preserved. Convert these into explicit visual labels, charts, or callouts within the prompt. Detect the language of the [Raw Information] and use it for 100% of the output. If input is Chinese, output Chinese. If input is English, output English. No mixing.
# Strict Constraints
1. **Strict Language Parity**: Maintain absolute language consistency. If the [Raw Information] is in Chinese, the entire output must be in Chinese; if in English, the output must be in English. No code-switching.
2. **Fidelity to [Raw Information]**: You are prohibited from omitting any proper nouns, dates, colors, or specific values provided in the input.
3. **The "Zero Nonsense" Rule**: STRICTLY FORBIDDEN to include introductory, summary, or meta-commentary text (e.g., "Here is the refined prompt..."). Do not explain design choices or justify element omissions (e.g., do not mention "implied flow"). Start the response immediately with the visual description.
4. **Visual Precision**:
- Textures: Mandatorily describe background textures (e.g., off-white aged paper, light gray grid, or black halftone shadows).
- Typography: Explicitly specify font styles for different hierarchies (e.g., bold serif for titles, condensed mono-space for technical data).
5. **Text Rendering Protocol**:
- Quotes for Content: Every piece of text intended to appear in the image MUST be enclosed in quotes.
- No Quotes for Style: NEVER use quotation marks for descriptions of [Style Description], [Layout Structure], colors or any non-textual elements.
6. **Relational Arrow Logic**: Minimize the use of arrows. Rely on spatial proximity or alignment to imply connectivity. If arrows are requested, avoid generic orientations like "horizontal." Instead, specify their precise starting point and target destination.
7. **Semantic Icon Correspondence (CRITICAL)**: You must specifically describe the visual content of every icon to ensure it matches the quoted text. (e.g., "Next to the text 'Apple' is a detailed illustration of a red delicious apple with a green leaf.") Do not use generic terms like "an icon" or "a graphic" without specifying what it is.
8. **No Hexadecimal Codes**: Never use codes like #xxxx. Use descriptive color names (e.g., sage green, deep navy blue, terracotta).
# Output Format (If the [Raw Information] is in Chinese, please translate the following content into Chinese. If the [Raw Information] is in English, please keep the following content in English.)
The theme of the infographic is [Subject Name] (or 此信息图的主题是: [Subject Name]), [Style Description]. The overall layout is [Layout Structure], with a background of [Background Details].
Provide a smooth and fluent description of the prompts for generating professional infographics. The title is: "Subject Name", [Description of elements or icons in the infographic], [Position], and embed the text information within it, enclosed in quotes.
---
Please receive the user's [Raw Information] and directly output the restructured professional image generation prompt:
# Metadata read by `Comfy-Org/publish-node-action` when this directory is
# published to https://registry.comfy.org as the standalone repository
# `OpenSenseNova/ComfyUI-SenseNova-U1`. The SenseNova-U1 monorepo's build
# (hatchling) ignores this file — see ../../pyproject.toml.
#
# Intentionally minimal: no [tool.ruff] / [tool.pytest] / [build-system]
# sections, so the monorepo's root configuration continues to apply to files
# under this directory.
[project]
name = "ComfyUI-SenseNova-U1"
version = "0.1.4"
description = "SenseNova-U1 custom nodes for ComfyUI (API + local inference)."
readme = "README.md"
license = { file = "LICENSE" }
requires-python = ">=3.10"
authors = [{ name = "OpenSenseNova" }]
classifiers = [
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
]
dependencies = [
"httpx",
"numpy",
"pillow",
"python-dotenv",
# Tarball URL (not git+https) so pip skips submodule init; see
# requirements.txt for the full rationale. Bump to /tags/vX.Y.Z.tar.gz
# before each official Comfy Registry publish for reproducibility.
"sensenova-u1 @ https://github.com/OpenSenseNova/SenseNova-U1/archive/refs/heads/main.tar.gz",
]
[project.urls]
Homepage = "https://github.com/OpenSenseNova/ComfyUI-SenseNova-U1"
Repository = "https://github.com/OpenSenseNova/ComfyUI-SenseNova-U1"
Source = "https://github.com/OpenSenseNova/SenseNova-U1/tree/main/apps/comfyui"
[tool.comfy]
PublisherId = "sensenova"
DisplayName = "ComfyUI-SenseNova-U1"
Icon = ""
httpx
numpy
pillow
python-dotenv
# Local inference path. Use a GitHub-generated tarball, not git+https, so pip
# doesn't run `git submodule update --init --recursive` — that pulls
# evaluation/easi/* (hundreds of MB of benchmarking subrepos) which ComfyUI
# users never need. GitHub's archive tarball already strips submodule trees.
# Monorepo developers can `pip install -e .` from the repo root instead and
# skip this line via `pip install -r requirements.txt --no-deps`.
sensenova-u1 @ https://github.com/OpenSenseNova/SenseNova-U1/archive/refs/heads/main.tar.gz
import { app } from "../../scripts/app.js";
import { api } from "../../scripts/api.js";
// SenseNova Interleave Preview renders text and images in their original
// interleaved order on the node. The backend pushes a structured
// `ui.parts` array; we map each entry to a DOM node here.
const STYLE_ID = "sensenova-interleave-preview-styles";
const STYLE_CSS = `
.sn-interleave {
padding: 8px;
box-sizing: border-box;
overflow: auto;
font-family: ui-sans-serif, system-ui, -apple-system, "Segoe UI", sans-serif;
font-size: 13px;
line-height: 1.5;
color: var(--input-text, #ddd);
background: var(--comfy-input-bg, #1e1e1e);
border: 1px solid var(--border-color, #333);
border-radius: 6px;
word-break: break-word;
}
.sn-interleave > * { margin: 0 0 8px 0; }
.sn-interleave-text { white-space: pre-wrap; }
.sn-interleave-think {
padding: 6px 8px;
border-left: 3px solid var(--node-selected-color, #6c757d);
background: var(--comfy-menu-bg, #2a2a2a);
color: var(--descrip-text, #aaa);
font-style: italic;
white-space: pre-wrap;
}
.sn-interleave-think summary {
cursor: pointer;
font-style: normal;
font-weight: 600;
}
.sn-interleave-think > div { margin-top: 4px; }
.sn-interleave-image { text-align: center; }
.sn-interleave-image img {
max-width: 100%;
max-height: 480px;
border-radius: 4px;
border: 1px solid var(--border-color, #333);
}
.sn-interleave-placeholder {
color: var(--descrip-text, #888);
font-style: italic;
}
`;
/**
 * Inject the preview stylesheet into <head> once.
 * Does nothing when an element with id STYLE_ID already exists.
 */
function ensureStyles() {
  const existing = document.getElementById(STYLE_ID);
  if (!existing) {
    const styleEl = Object.assign(document.createElement("style"), {
      id: STYLE_ID,
      textContent: STYLE_CSS,
    });
    document.head.appendChild(styleEl);
  }
}
/**
 * Build the `/view` URL for an image part.
 *
 * @param {{filename?: string, image_type?: string, subfolder?: string}} part
 *   Image entry; missing fields fall back to "" (filename, subfolder) or
 *   "temp" (image_type).
 * @returns {string} The URL, passed through `api.apiURL` when that exists.
 */
function buildImageUrl(part) {
  const query = new URLSearchParams();
  query.set("filename", part.filename || "");
  query.set("type", part.image_type || "temp");
  query.set("subfolder", part.subfolder || "");
  // Random token so every generated URL is distinct (cache busting).
  query.set("rand", Math.random().toString(36).slice(2));

  const viewPath = `/view?${query}`;
  if (api?.apiURL) {
    return api.apiURL(viewPath);
  }
  return viewPath;
}
// Maps a part `type` to a function that builds the DOM node for that part.
const RENDERERS = {
  // Plain text block.
  text: (part) =>
    Object.assign(document.createElement("div"), {
      className: "sn-interleave-text",
      textContent: part.text || "",
    }),

  // Collapsible <details> element labelled "think" that holds the text.
  think: (part) => {
    const details = Object.assign(document.createElement("details"), {
      className: "sn-interleave-think",
    });
    details.appendChild(
      Object.assign(document.createElement("summary"), { textContent: "think" }),
    );
    details.appendChild(
      Object.assign(document.createElement("div"), { textContent: part.text || "" }),
    );
    return details;
  },

  // Inline image, or a textual placeholder when the image is unavailable.
  image: (part) => {
    const wrapper = Object.assign(document.createElement("div"), {
      className: "sn-interleave-image",
    });
    const unavailable = part.missing || !part.filename;
    const child = unavailable
      ? Object.assign(document.createElement("span"), {
          className: "sn-interleave-placeholder",
          textContent: `[image:${part.index} missing]`,
        })
      : Object.assign(document.createElement("img"), {
          alt: `image ${part.index}`,
          src: buildImageUrl(part),
        });
    wrapper.appendChild(child);
    return wrapper;
  },
};
/**
 * Replace the content of `container` with the rendered `parts`.
 *
 * @param {HTMLElement} container Element whose content is replaced.
 * @param {Array<object>|null|undefined} parts Entries carrying a `type`
 *   that names a renderer in RENDERERS. An empty or missing list renders
 *   a "(no interleaved output)" placeholder instead.
 *
 * Entries that are not objects, or whose `type` is not one of RENDERERS'
 * own keys, are skipped.
 */
function renderParts(container, parts) {
  container.innerHTML = "";
  if (!parts?.length) {
    const empty = document.createElement("div");
    empty.className = "sn-interleave-placeholder";
    empty.textContent = "(no interleaved output)";
    container.appendChild(empty);
    return;
  }
  for (const part of parts) {
    // A null or primitive entry would throw on property access below.
    if (part === null || typeof part !== "object") continue;
    // Own-property lookup only: a type such as "constructor" or "toString"
    // must not resolve to an inherited Object.prototype function, whose
    // return value is not a DOM node and would make appendChild throw.
    if (!Object.prototype.hasOwnProperty.call(RENDERERS, part.type)) continue;
    container.appendChild(RENDERERS[part.type](part));
  }
}
// Registers the extension that turns SenseNovaInterleavePreview nodes into an
// inline viewer for the `parts` array carried by the execution message.
app.registerExtension({
  name: "sensenova.interleave_preview",

  async beforeRegisterNodeDef(nodeType, nodeData) {
    // Every other node type is left untouched.
    if (nodeData?.name !== "SenseNovaInterleavePreview") return;
    ensureStyles();

    const proto = nodeType.prototype;
    const previousOnNodeCreated = proto.onNodeCreated;
    const previousOnExecuted = proto.onExecuted;

    // Wrap onNodeCreated: run the previous handler, then attach the preview widget.
    proto.onNodeCreated = function () {
      const result = previousOnNodeCreated?.apply(this, arguments);

      const root = Object.assign(document.createElement("div"), {
        className: "sn-interleave",
      });
      root.appendChild(
        Object.assign(document.createElement("div"), {
          className: "sn-interleave-placeholder",
          textContent: "Interleave preview output will appear here after the workflow runs.",
        }),
      );

      const widgetOptions = { serialize: false, hideOnZoom: false };
      this.addDOMWidget?.("preview", "interleave_preview", root, widgetOptions);
      this._snContainer = root;
      // Suppress ComfyUI's default node-header image strip; images are rendered inline.
      this.imgs = [];
      return result;
    };

    // Wrap onExecuted: run the previous handler, then re-render from message.parts.
    proto.onExecuted = function (message) {
      previousOnExecuted?.apply(this, arguments);

      const target = this._snContainer;
      if (!target) return;

      const parts = Array.isArray(message?.parts) ? message.parts : [];
      renderParts(target, parts);
      this.imgs = [];
      this.setDirtyCanvas?.(true, true);
    };
  },
});
Place demo case images referenced by `README.md` / `README_CN.md` in this folder.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment