Skill
软链接
创建软链接
ln -s /data/cht/ky/datasets/zed_runc /home/cht/ky/datasets
查看软链接
find ~/ky -type l -ls
ls -la ~/ky/datasets
ls -lb 查看软链接
unlink xxxx
rysnc
REMOTE_USER="cht"
REMOTE_HOST="218"
REMOTE_BASE="~/ky"
RSYNC_OPTS="-avz --delete --progress"
rsync $RSYNC_OPTS -e ssh /home/linux/ky/GSFusion/ \
"${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE}/GSFusion/"
rsync $RSYNC_OPTS -e ssh /home/linux/ky/ORB_SLAM3/ \
"${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE}/ORB_SLAM3/"
rsync $RSYNC_OPTS -e ssh /home/linux/ky/Fast-FoundationStereo/ \
"${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE}/Fast-FoundationStereo/"
————————————————————————————————————
普通rsync(incremental)
rsync -avz /path/file.txt cht@218:/home/user/data/
rsync -avz /path/dir/ cht@218:/home/user/data/
rsync -avz --delete --progress cht@218:/home/remote_dir/ /local/path/
(data后面要+斜杠!!)
rsync -avz cht@218:/home/remote_file.txt /local/path/
某些文件不传
rsync -avz --progress \
--exclude 'gsfusion_out/' \
--exclude 'gsfusion.yaml' \
--exclude 'gsfusion_orb_run.yaml' \
--exclude 'pose_orb_gsfusion.txt' \
cht@218:/home/cht/ky/datasets/zed_runj/ ./zed_runj/
压缩文件夹
zip -r mydata.zip ./myfolder
curl
curl -L -o depth_anything_v2_vitl.pth "https://huggingface.co/depth-anything/Depth-Anything-V2-Large/resolve/cbbb86a30ce19b5684b7a05155dc7e6cbc7685b9/depth_anything_v2_vitl.pth"
(-L:跟随跳转(Hugging Face 常会 302 到实际存储地址,不加容易下到 HTML 或失败)。
-o 文件名:保存成指定文件名;当前目录会生成 depth_anything_v2_vitl.pth。
--progress-bar 显示进度条)
下载文件
终端不自动配代理,需要自己设置
wget -e "https_proxy=[http://127.0.0.1](http://127.0.0.1):10808" -c [https://www.simulation.openfields.fr/phocadownload/CloudCompare_20260128_151607.dmg]
git clone
export http_proxy=http://127.0.0.1:10808
export https_proxy=http://127.0.0.1:10808
git clone ....
避免占用满盘的 /tmp 和家目录缓存:
mkdir -p /data/cht/tmp /data/cht/pip-cache
export TMPDIR=/data/cht/tmp
export PIP_CACHE_DIR=/data/cht/pip-cache
diff
diff -qr /home/cht/ky /data/cht/ky
进程状态查询
ps -p 40292,111682,97264 -o pid,stat,cmd
ps -o ppid= -p 40292 看父进程
cuda
Introduction
CUDA(Compute Unified Device Architecture,统一计算设备架构)是由 NVIDIA 推出的一个并行计算平台和编程模型。它允许软件开发者利用 NVIDIA GPU(图形处理器)进行通用计算(GPGPU),从而显著加快计算密集型应用的运行速度。
在 CUDA 出现之前,GPU 主要用于图形渲染。CUDA 的诞生让开发者可以使用类 C/C++ 语言来编写在 GPU 上运行的代码,处理科学计算、人工智能、视频处理等任务
CUDA 的核心架构概念
CUDA 的设计核心是将计算任务拆分为成千上万个小的、可并行执行的任务。
异构计算 (Heterogeneous Computing)
CUDA 编程涉及两个主要角色:
Host(主机): 指 CPU 及其内存。负责控制逻辑、内存分配和数据准备。
Device(设备): 指 GPU 及其显存。负责大规模的并行计算。
编程模型:核函数 (Kernel)
Kernel 是 CUDA 编程的核心。它是在 GPU 上并行执行的函数。当你调用一个 Kernel 时,它不是只运行一次,而是由大量的 线程 (Threads) 同时运行。
CUDA 的层级结构 (Hierarchy)
为了管理数以万计的线程,CUDA 采用了三层逻辑结构:
Thread (线程): 最小的执行单位。
Thread Block (线程块): 一组线程的集合。同一个 Block 内的线程可以进行协作,通过共享内存交换数据。
Grid (网格): 所有的 Thread Block 组成了 Grid。一个 Grid 对应一次 Kernel 的启动。
硬件映射
SP (Streaming Processor): 也称 CUDA Core,是执行单个线程的硬件单元。
SM (Streaming Multiprocessor): 一个 SM 包含多个 SP。一个 Thread Block 会被调度到一个 SM 上执行。
CUDA 的工作流程
编写一个典型的 CUDA 程序通常遵循以下四个步骤:
分配显存: 在 GPU (Device) 上使用
cudaMalloc分配空间。拷贝数据: 将数据从 CPU (Host) 内存拷贝到 GPU 显存 (
cudaMemcpy)。启动 Kernel: 配置 Grid 和 Block 的维度,在 GPU 上并行执行计算任务。
写回结果: 将计算结果从 GPU 拷贝回 CPU 内存,并释放显存。
CUDA 与 CUDA Toolkit
PyTorch 自带的 CUDA (Runtime) —— “家具搬运工”
当你通过 conda install pytorch torchvision pytorch-cuda=12.1 安装时,你得到的是 CUDA Runtime(运行库)。
它的作用: 仅仅是为了让已经写好的程序(预编译好的模型)(
.so或.pyd文件)能在显卡上跑起来。
完整的 CUDA Toolkit —— “家具制造工具箱”
这是 NVIDIA 官方提供的完整开发包。
核心组件:
nvcc(CUDA 编译器)。它的作用: 当你需要把 C++/CUDA 源代码(比如你刚才安装的
diff-gaussian-rasterization)编译成电脑能识别的二进制文件时,必须用到它。
pytorch
PyTorch 的核心支柱
张量 (Tensors)
Tensor 是 PyTorch 中最基础的数据结构。它在数学上是一个多维数组,与 NumPy 的 ndarray 非常相似,但有一个致命的优势:Tensor 可以直接在 GPU 上运行,从而利用 CUDA 加速。
自动求导 (Autograd)
在神经网络中,计算梯度(反向传播)是最复杂的数学部分。PyTorch 的 torch.autograd 模块能够自动记录你在 Tensor 上执行的所有操作,并在你需要时自动计算导数。
你只需定义正向传播(Forward Pass)。
loss.backward()会自动完成复杂的链式法则计算。
动态计算图 (Dynamic Computational Graph)
这是 PyTorch 区别于早期 TensorFlow 的最大特征。
静态图: 必须先定义完整的网络结构,然后才能喂入数据(像建房子,图纸画好才能动工)。
动态图 (Define-by-Run): 图是在代码运行时实时构建的。这意味着你可以使用 Python 的
if分写、for循环来改变网络结构,调试起来就像调试普通 Python 程序一样简单。
核心模块
PyTorch 的标准工作流
准备数据: 将原始数据转换为 PyTorch Tensors。
构建模型: 继承
nn.Module类,在__init__中定义层,在forward中定义数据流向。定义损失与优化器: 选择衡量错误的标准(如交叉熵)和更新权重的方法。
循环训练:
清零梯度:
optimizer.zero_grad()。前向传播: 得到预测值。
计算损失: 比较预测值与真实值的差异。
反向传播:
loss.backward()计算每个参数的梯度。更新参数:
optimizer.step()。
PyTorch 与 CUDA
PyTorch 通过简单的指令就能将计算迁移到显卡上:
检测设备:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")搬运模型:
model.to(device)搬运数据:
data.to(device)
pip install -e 命令全流程(/data/cht/ky/gaus-slam/third_party/diff-gaussian-rasterization-w-depth为例)
解析与入口:pip 怎么决定“怎么装”
解析参数:确认是 本地路径、editable。
读项目元数据:
若有
pyproject.toml:按 PEP 517/518,用里面声明的 build-system(通常是setuptools.build_meta)当构建后端。若只有
setup.py:新版本 pip 仍可能生成/假定最小pyproject.toml或走 legacy,但 editable 在 pip 21+ 常走prepare_metadata_for_build_editable/build_editable这一套。
结论:对你这种带 CUDA 扩展 的包,pip 一定要跑一段“构建逻辑”(至少生成元数据,再编译扩展),不会只拷贝几个
.py就结束。
2. 默认(无 --no-build-isolation):隔离构建环境
创建临时目录,例如
/tmp/pip-build-env-xxxxx。Installing build dependencies:在该临时环境里安装pyproject.toml里[build-system] requires = ...列出的包(如setuptools、wheel,通常没有torch)。启动子进程(你日志里的
pip/_vendor/pyproject_hooks/_in_process/_in_process.py):这个子进程的
sys.path以临时环境为主,看不到你在gaus里装的 torch。
调用构建后端,例如:
get_requires_for_build_editable或prepare_metadata_for_build_wheel/prepare_metadata_for_build_editable
这一步会 执行 setup.py(或 setuptools 从 setup.py 读配置)。你的 setup.py 前几行是:
setup.pyLines 12-13
from setuptools import setup
from torch.utils.cpp_extension import CUDAExtension, BuildExtension因此 一进 setup.py 就要 import torch。隔离环境里 没有 torch → ModuleNotFoundError: No module named 'torch'(你第一次报错)。
此时还没执行 nvcc、也没去 GitHub——失败点就在 解释器 import。
3. 加上 --no-build-isolation 之后
不再用
/tmp/pip-build-env-...里那套隔离 Python 来跑构建钩子(或显著减少隔离)。构建子进程用的
sys.path包含当前已激活环境(如gaus)→import torch成功。继续
prepare_metadata_for_build_editable:仍会执行/分析setup.py,此时会进一步import torch.utils.cpp_extension。torch/utils/cpp_extension.py里有from pkg_resources import packaging(PyTorch 1.12)。若pkg_resources不可用(setuptools 82 等)→No module named 'pkg_resources'(你第二次报错)。
当我们遇到 import pkg_resources 失败时,我们是怎么知道 这个包来自于 setuptools 的,应该如何查阅信息?
当你遇到 ModuleNotFoundError: No module named 'xxx',但运行 pip install xxx 又提示找不到包时,可以采用以下几种方案
访问 pypi.org。

没法直接查到,因为它不是独立的项目:
pkg_resources就像是笔记本电脑自带的“触控板驱动”,它不是独立卖的,而是作为setuptools这个大礼包的一部分打包赠送的。截图里的第 5 个和第 6 个搜索结果透露了其原产地setuptools
如果在一个运行正常的环境中想验证来源,可以这样做:
import pkg_resources print(pkg_resources.__file__)
conda install setuptools 显示已安装,但 import pkg_resources 仍失败,为何能判定是 setuptools 版本过新 导致的
setuptools 新版本不再提供/不再暴露顶层 pkg_resources。
pip install --force-reinstall "setuptools==69.5.1" wheel解释此命令
仍然没有编译 CUDA——还是 纯 Python 导入链 阶段。
4. 元数据准备好之后:build_editable / 编译扩展
元数据阶段通过后,pip 会进入 真正构建(你日志里的 Building editable for diff_gaussian_rasterization):
setup.py里定义的ext_modules:你的项目是CUDAExtension(..., sources=[... .cu, ext.cpp])。cmdclass={'build_ext': BuildExtension}:由 PyTorch 的BuildExtension接管,内部会调nvcc/g++,按TORCH_CUDA_ARCH_LIST、可见 GPU、CUDA_HOME等生成diff_gaussian_rasterization._C这个扩展模块。只有走到这里,才涉及 CUDA 编译、驱动、
libcuda。你后来在_get_cuda_arch_flags或cuInit上的问题,都属于 这一阶段或运行阶段,和前面的torch/pkg_resourcesimport 是不同阶段。editable 的结果:在
site-packages里写入 指向源码目录的链接方式(.pth+ dist-info / 现代 editable 布局),使import diff_gaussian_rasterization使用/data/cht/ky/.../diff_gaussian_rasterization下的包 + 已编译的_C。
报错 IndexError: list index out of range
torch/utils/cpp_extension.py试图获取当前显卡的架构信息(例如sm_86或sm_80),以便告诉编译器该按什么标准编译 CUDA 代码。但是,因为以下原因,PyTorch 没能拿到任何架构信息,导致
arch_list变成了空的[]。当它尝试执行arch_list[-1] += '+PTX'(访问最后一个元素)时,就报了“索引超出范围”的错误。为什么系统检测不到 CUDA?
驱动是否正常: 输入 nvidia-smi。如果报错,说明宿主机显卡驱动有问题。
NVCC 是否安装: 输入 nvcc -V。如果你正在编译 CUDA 插件,必须安装完整的 CUDA Toolkit,而不仅仅是 PyTorch 自带的运行时。
权限问题: 如果你在 Docker 容器或特定的服务器环境下,请确保启动命令中包含了
--gpus all。python -c "import torch; print(torch.cuda.device_count(), torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'no cuda')" 确认 PyTorch 能不能通过驱动调动显卡进行计算。
在 执行上面这个python命令时,未发现显卡,警告 UserWarning: CUDA driver initialization failed, you might not have a CUDA gpu.
这通常意味着 PyTorch 找到了显卡,但打不开驱动的接口。可能是因为Conda 环境里混入了不兼容的 libcuda.so。
复习:动态链接库
在 Linux 中,它们通常以 .so(Shared Object)结尾;在 Windows 中,则是 .dll。
核心逻辑: 以前写程序,所有需要的代码都要塞进同一个文件里(这叫静态链接),导致程序又大又笨。 现在,程序员把常用的功能(比如如何绘图、如何计算数学公式、如何调用显卡)拆出来,做成独立的文件。
“共享”特性:一个
libcuda.so文件可以同时被 PyTorch、TensorFlow 和你的自定义代码使用。内存里只需要存一份,省空间。“动态”特性:程序运行的时候才去“借”这些代码,而不是在编译时就死死地绑在一起。
ldconfig -p | grep libcuda
ldconfig
Configure dynamic linker run-time bindings。
它是一个系统管理工具,用于配置动态链接器的运行绑定。它会扫描
/lib、/usr/lib等目录,并维护一个名为/etc/ld.so.cache的缓存文件,让系统能快速找到共享库(.so文件)。
-p (print)
作用:这个选项告诉
ldconfig打印(列出)当前缓存中所有已知的共享库名称及其对应的文件路径。

调用 CUDA Driver API 进行debug
(gaus) cht@10-71-106-251:/data/cht/ky/gaus-slam/third_party/diff-gaussian-rasterization-w-depth$ python << 'EOF'
> import ctypes
> lib = ctypes.CDLL("libcuda.so.1")
> cuInit = lib.cuInit
> cuInit.argtypes = [ctypes.c_uint]
> cuInit.restype = ctypes.c_int
> err = cuInit(0)
> print("cuInit returned:", err, "(0 = CUDA_SUCCESS)")
> if err != 0:
> # 可选:再打印 device count API
> pass
> EOF
cuInit returned: 3 (0 = CUDA_SUCCESS) python << 'EOF' ... EOF: 作用:它允许你在命令行里直接写多行 Python 代码,然后一次性喂给
python解释器执行,而不需要先创建一个.py文件。'EOF':这是一个标识符,告诉系统:“从这里开始是代码,直到再次看到 EOF 为止”。

3:代表CUDA_ERROR_NOT_INITIALIZED虽然系统里有libcuda.so文件(索引正常),但驱动程序无法与硬件(显卡)正常通信。
检查版本是否一致
(ffs) cht@10-71-106-251:/data/cht/ky/Fast-FoundationStereo$ dpkg -S /lib/x86_64-linux-gnu/libcuda.so.1 2>/dev/null || rpm -qf /lib/x86_64-linux-gnu/libcuda.so.1 2>/dev/null
Please ask your administrator.
(ffs) cht@10-71-106-251:/data/cht/ky/Fast-FoundationStereo$ ls -l /lib/x86_64-linux-gnu/libcuda.so.1
lrwxrwxrwx 1 root root 20 5月 29 2025 /lib/x86_64-linux-gnu/libcuda.so.1 -> libcuda.so.575.57.08
(ffs) cht@10-71-106-251:/data/cht/ky/Fast-FoundationStereo$ cat /proc/driver/nvidia/version
NVRM version: NVIDIA UNIX Open Kernel Module for x86_64 575.57.08 Release Build (dvs-builder@U22-I3-H04-01-5) Sat May 24 07:03:13 UTC 2025
GCC version: gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.2) 在 Linux 系统中,NVIDIA 驱动并不是一个单一的整体,而是分为两个核心部分,它们必须严格匹配才能工作
查看错误日志 dmesg | grep -i "NVRM" | tail -n 20

NV_ERR_NO_MEMORY(0x51)。这说明驱动程序在尝试初始化时,想要申请一部分系统内存(RAM)来作为管理缓存,但操作系统内核拒绝了这一请求
numpy : 入门到入土
核心数据结构:ndarray (多维数组)
NumPy 的灵魂是
ndarray对象。相比于 Python 自带的list,NumPy 数组的元素类型必须相同,这使得它在内存中连续存储,计算速度极快。
import numpy as np
# 从列表创建一维/二维数组
arr1 = np.array([1, 2, 3])
arr2 = np.array([[1, 2], [3, 4]])
# 常用内置创建函数
zeros_arr = np.zeros((3, 3)) # 创建 3x3 全 0 数组
ones_arr = np.ones((2, 4)) # 创建 2x4 全 1 数组
range_arr = np.arange(0, 10, 2) # 类似 range(),结果为 [0, 2, 4, 6, 8]
lin_arr = np.linspace(0, 1, 5) # 0到1之间等距生成5个数 [0., 0.25, 0.5, 0.75, 1.]数组属性查看
arr = np.array([[1, 2, 3], [4, 5, 6]])
print(arr.shape) # 输出 (2, 3) -> 表示2行3列
print(arr.ndim) # 输出 2 -> 数组维度(二维)
print(arr.size) # 输出 6 -> 元素总个数
print(arr.dtype) # 输出 int64 (或 int32) -> 数据类型索引与切片 (Indexing & Slicing) ★
arr = np.arange(10) # [0 1 2 3 4 5 6 7 8 9]
# 基础切片
print(arr[2:5]) # 输出 [2 3 4]
# 多维数组索引
arr2d = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print(arr2d[1, 2]) # 输出 6 (第2行,第3列)
print(arr2d[:, 1]) # 输出 [2 5 8] (提取所有行的第2列)
# 布尔索引 (非常常用于数据过滤)
print(arr[arr > 5]) # 输出 [6 7 8 9]形状操作 (Shape Manipulation)
arr = np.arange(12) # 12个元素的1维数组
# 改变形状 (不改变数据本身)
reshaped = arr.reshape(3, 4) # 变成 3行4列 的二维数组
# 展平数组
flattened = reshaped.flatten() # 重新变回1维数组
# 数组拼接
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6]])
np.concatenate((a, b.T), axis=1) # 沿列方向拼接数学运算与广播机制 (Broadcasting)
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
# 元素级运算
print(a + b) # [5 7 9]
print(a * b) # [4 10 18]
# 统计聚合
print(a.sum()) # 6
print(a.mean()) # 2.0 (平均值)
print(a.max()) # 3
# 广播机制 (Broadcasting)
# 当数组形状不同时,NumPy 会自动扩展它们以进行计算
print(a + 10) # [11 12 13] (10被"广播"到了每一个元素上)command
ffs
CUDA_VISIBLE_DEVICES=1
python scripts/zed_ffs_record_tum.py --out_dir ~/ky/datasets/zed_runb --max_frames n
python scripts/zed_stereo_record_tum.py --out_dir ~/ky/datasets/zed_runb --max_frames n
python scripts/zed_stereo_record_tum.py --svo_record --svo_preview --max_frames 3000 --out_dir ~/ky/datasets/zed_runc --target_fps 6
python scripts/run_ffs_offline.py ~/ky/datasets/zed_runc --zfar 12
ORB_ZED_ROBUST_ORB=1 bash ~/ky/Fast-FoundationStereo/scripts/run_orb_slam3_zed_tum.sh ~/ky/datasets/zed_runc
GSFUSION_POSE_MODE=orb bash ~/ky_mirror/Fast-FoundationStereo/scripts/run_gsfusion_on_tum.sh ~/ky_mirror/datasets/zed_runh
GSFUSION_ORB_START_LINE=266 GSFUSION_POSE_MODE=orb bash ~/ky/Fast-FoundationStereo/scripts/run_gsfusion_on_tum.sh ~/ky/datasets/zed_runf
————————————————————————————————————————————
guas
SKIP_FFS=1 USE_MP=1 bash ~/ky/Fast-FoundationStereo/scripts/run_gaus_slam_zed.sh ~/ky/datasets/zed_runc
CUDA_VISIBLE_DEVICES=1 && SKIP_FFS=1 bash ~/ky_mirror/Fast-FoundationStereo/scripts/run_gaus_slam_zed.sh ~/ky_mirror/datasets/zed_runh
export GAUS_LOCALMAP_FRAMES=24 && export GAUS_NUM_BA_ITER=80 && export GAUS_NUM_TRACK_ITER=80 && SKIP_FFS=1 bash ~/ky/Fast-FoundationStereo/scripts/run_gaus_slam_zed.sh ~/ky/datasets/zed_runf 2>&1 | tee ~/ky/datasets/zed_runf/gaus_slam_run.log
export CUDA_VISIBLE_DEVICES=2 && export PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:64 && export GAUS_LOCALMAP_FRAMES=16 && export GAUS_NUM_BA_ITER=60 && export GAUS_NUM_TRACK_ITER=60 && export GAUS_NUM_COVIS_SUBMAPS=12 && GAUS_FRAME_STRIDE=1 SKIP_FFS=1 bash ~/ky/Fast-FoundationStereo/scripts/run_gaus_slam_zed.sh ~/ky/datasets/zed_runf 2>&1 | tee ~/ky/datasets/zed_runf/gaus_slam_run.log
cd ~/ky_mirror/gaus-slam
python scripts/convert_gaussians_ply_for_supersplat.py \
output/zed_chunked_run1/part000/save/gaussians.ply \
-o output/zed_chunked_run1/part000/save/gaussians_supersplat.ply
python scripts/chunked_gaus_zed.py --dataset ~/ky_mirror/datasets/zed_runf --out-root output/zed_chunke
d_run1 --chunk-size 400 --overlap 80
python scripts/downsample_gaussians_ply.py /home/cht/ky_mirror/gaus-slam/output/zed_chunked_run1/merged_gaussians.ply -o /home/cht/ky_mirror/gaus-slam/output/zed_chunked_run1/huge_5pct.ply --fraction 0.3 --seed 0
________________________________________________________________________________
datagen
cd ~/cht/SimpleProc && conda activate datagen && CUDA_VISIBLE_DEVICES=2 python datagen/run_local_scene_generation.py --output-dir output/stereo_batch --batch-start 51 --batch-end 100 --stereo-baseline 0.1 --batch-render
cd ~/cht/SimpleProc/infinigen && CUDA_VISIBLE_DEVICES=1 ../blender/blender -b -P ../datagen/render_stereo_video.py -- --input /home/cht/cht/SimpleProc/output/stereo_batch/scene_0/scene.blend --output /home/cht/cht/SimpleProc/output/stereo_video_run0 --frames 360 --arc_degrees 360 --orbit_axis Z --total_pixels $((576*768)) --save_cams 1 --save_depth 1
--frames 360 调的更高可以使得采样频率变高
cd ~/cht/SimpleProc/infinigen && CUDA_VISIBLE_DEVICES=1 ../blender/blender -b -P ../datagen/render_stereo_video.py -- --input /home/cht/cht/SimpleProc/output/stereo_batch/scene_0/scene.blend --output /home/cht/cht/SimpleProc/output/stereo_video_run1 --frames 360 --arc_degrees 540 --orbit_axis Z --total_pixels $((576*768)) --save_cams 1 --save_depth 1 --orbit_radius_factor 1.5 --random_motion_deg 10 --random_motion_smooth 25 --motion_seed 42 --wobble_deg 7 --wobble_cycles 3
--orbit_radius_factor(默认 1.0) 在挂到 StereoVideoOrbit 父物体之后,把两个 camrig 相对枢轴的 局部位移统一乘以该系数。大于 1 时等价于绕着场景枢轴退远一圈
--random_motion_deg(默认 0)
在主轴轨道之上叠加 平滑随机欧拉角,XYZ 三路独立噪声,峰值约为设定度数(帧间用滑动平均平滑)。
--random_motion_smooth(默认 21) 平滑窗口越大,抖动越「缓」、越不像抽搐
--arc_degrees:仍可加大总转角(例如 540、720**)做多圈或更大弧长
--wobble_deg:在与主轴垂直的平面内做 正弦摆动(近似椭圆 tilt)。
--wobble_cycles:整条序列里摆动的周期数(例如3)。
ffmpeg -y -framerate 30 -i /home/cht/cht/SimpleProc/output/stereo_video_run0/left/%06d.png -framerate 30 -i /home/cht/cht/SimpleProc/output/stereo_video_run0/right/%06d.png -filter_complex "[0:v][1:v]hstack=inputs=2[v]" -map "[v]" -c:v libx264 -pix_fmt yuv420p /home/cht/cht/SimpleProc/output/stereo_video_run0/stereo_sbs.mp4
————————————
infinigen
python -m infinigen.datagen.manage_jobs --output_folder outputs/hello_stereo --overwrite --num_scenes 1 --specific_seed 0 --configs desert.gin simple.gin extras/stereo_training.gin --pipeline_configs local_256GB.gin stereo.gin blender_gt.gin --pipeline_overrides LocalScheduleHandler.use_gpu=True
(deset_gin 可以换成 snowy_mountain.gin、forest.gin、mountain.gin、plain.gin、river.gin、coast.gin、cliff.gin、canyon.gin、cave.gin、arctic.gin、kelp_forest.gin、under_water.gin、coral_reef.gin)
双目单图
cd ~/cht/infinigen && python -m infinigen.datagen.manage_jobs --output_folder outputs/stereo_video_rrt --overwrite --num_scenes 1 --specific_seed 0 --configs desert.gin simple.gin extras/stereo_training.gin extras/rrt_cam_nature.gin --pipeline_configs local_256GB.gin stereo_video.gin blender_gt.gin --pipeline_overrides LocalScheduleHandler.use_gpu=True iterate_scene_tasks.frame_range=[1,120] iterate_scene_tasks.cam_block_size=24 --overrides fine_terrain.mesher_backend="OcMesher"
双目动态图像系列
________________________________________________________________________
pi3x
cd /home/cht/cht/Pi3 && PYTORCH_CUDA_ALLOC_CONF=expandable_segm
ents:True python scripts/waft_stereo_pi3x_baseline.py --left_dir /mnt/nas/share/home/cht/ky_mirror/datasets
/zed_runi/rgb --right_dir /mnt/nas/share/home/cht/ky_mirror/datasets/zed_runi/right --waft_root /home/cht/c
ht/WAFT-Stereo --waft_config configs/SynLarge/DAv2L-5.yaml --waft_ckpt /home/cht/cht/WAFT-Stereo/ckpts/SynL
arge/DAv2L-5.pth --baseline_m 0.54 --fx_pixel 536.7 --max_frames 1045 --pixel_limit 120000 --pi3x_ckpt /hom
e/cht/cht/Pi3/ckpts/Pi3X/model.safetensors --out_dir /home/cht/cht/Pi3/out_vo --vo_fusion --vo_chunk_size 6
0 --vo_overlap 20
DS=/home/cht/ky_mirror/datasets/zed_runj
PI3=/mnt/nas/share/home/cht/cht/Pi3/out_vo
mv "$DS/depth" "$DS/depth_ffs_backup"
mkdir -p "$DS/depth"
for i in $(seq 0 1044); do
src=$(printf "$PI3/depth_%03d.png" "$i")
dst=$(printf "$DS/depth/%06d.png" "$i")
ln -sf "$src" "$dst"
done
CUDA_VISIBLE_DEVICES=5 GSFUSION_POSE_MODE=orb GSFUSION_MAX_FRAMES=1045 \
bash ~/ky_mirror/Fast-FoundationStereo/scripts/run_gsfusion_on_tum.sh ~/ky_mirror/datasets/zed_runj orb
Bash语法
Shebang
#!/usr/bin/env bash告诉系统 请使用
/bin/bash这个程序来解释并运行接下来的内容
变量 (Variables)
注意:赋值时
=两边不能有空格。
name="Gemini"
echo "Hello, $name"条件判断 (Conditionals)
Bash 的括号非常讲究,建议使用
[[ ... ]]。
if [[ $name == "Gemini" ]]; then
echo "识别成功"
else
echo "未知用户"
fi"$( ... )" 和 "${ ... }"
"$( ... )"
它的作用是:执行括号内的命令,并把命令打印出来的结果拿回来使用。
双引号""
作用: 防止路径中包含空格。
后果: 如果你的文件夹叫
My Projects(带空格),没有引号的话,Bash 会把空间切断,导致脚本报错“找不到 My 文件夹”。加上引号后,整个路径会被视为一个整体。
${ ... }:参数扩展防止歧义(边界定界)
name="data"
echo "$name_file" # 错误:系统会去找名为 $name_file 的变量
echo "${name}_file" # 正确:打印 data_file
默认值处理
# 如果 FFS_ROOT 没设置,就使用后面的默认路径
ROOT="${FFS_ROOT:-/default/path}"
ffs 和 guas-slam相关脚本 Fast-FoundationStereo/scripts/run_gaus_slam_zed.sh为例
set -euo pipefail
-e(errexit): 只要任何一个命令返回非零值(即出错),脚本立即退出。-u(nounset): 如果使用了未定义的变量,报错并退出。防止因为拼写错误(比如把$RESULT写成$RESUL)导致意想不到的后果。-o pipefail: 只要管道命令(如A | B | C)中任何一个环节失败,整个管道就被视为失败。默认情况下,Bash 只看最后一个命令的成功与否。
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
${BASH_SOURCE[0]}永远能拿到脚本文件本身的路径dirname 去掉路径中的文件名,只保留目录部分
cd ... && pwd—— 转换为绝对路径
"${FFS_ROOT:-$SCRIPT_DIR/..}"
:- 这是核心操作符,意思是:“如果左边的变量未设置或为空,则使用右边的值”
DS_INPUT="${1:?用法: $0 /path/to/zed_dataset}"
(核心作用是:“如果用户运行脚本时没给参数,直接报错并自毁。”)
:? 这是一个 “必填检查” 操作符。
如果
$1存在且不为空,它就返回 $1 的值。如果
$1为空或未设置,它会把后面的字符串作为错误信息打印到标准错误流(stderr),并立即终止脚本(退出状态码为 1)。
文件存在性校验
if [[ ! -f "$DS/meta.json" ]]; then
echo "错误: 缺少 $DS/meta.json" >&2
exit 1
fiif [[ ... ]]; then ... fi 是条件判断,[[...]]这样更稳健
-f:这是一个文件测试操作符,检查路径是否为普通文件(如果路径是一个目录或者不存在,则为假)。>&2
默认情况下,
echo的输出去往“标准输出 (stdout)”。在 Linux 规范中,报错信息应该通过“标准错误 (stderr)”通道发出。这样即使你把脚本的正常运行结果保存到文件里(例如 ./script.sh > output.txt),错误提示依然能直接显示在你的屏幕上,而不会混进结果文件里。
[[ -z "$(ls -A "$DS/depth" 2>/dev/null || true)" ]]
ls -A:列出目录下所有文件,但不包括.和..(当前目录和父目录)。2>/dev/null:如果文件夹不存在,
ls会报错。这行代码把错误信息扔掉,不显示在屏幕上。|| true:这是一个安全锁。防止 ls 报错导致整个脚本因为 set -e 而崩溃。$( ... ):获取命令执行的输出。-z:检查字符串是否为空。
结论:如果文件夹存在但里面没东西,该条件为“真”。
Scal3R 对 VGGT的利用
入口: run.py , run_inference
和上面我写的框架类似,run.py 组装好 request 和 config ,然后给 run_inference 进行最终的 command执行
辅助函数 load_data()
收集图片路径 (Image Collection)
if recorder is not None:
recorder.record('collect_images.begin', input_dir=args.input_dir, image_patterns=args.image_patterns)
image_paths = collect_image_paths(args.input_dir, args.image_patterns)
if args.max_images > 0:
image_paths = image_paths[:args.max_images]
# ... recorder 打点与 maybe_stop_after ...
功能: 根据传入的
input_dir和image_patterns(如*.png,*.jpg),扫描目标文件夹,获取所有符合条件的图片绝对路径。截断: 如果配置了
max_images(大于 0),则截取前 N 张图片,常用于快速测试或截断过长的序列。探针 (Recorder): 在开始和结束时记录状态(图片总数、首尾图片路径),并支持在收集完成后提前终止程序(
maybe_stop_after),方便调试。
加载与预处理图片 (Load & Preprocess)
# ... recorder 打点 ...
sequence, height, width = load_and_preprocess_images(
image_paths,
dataset_cfg,
preprocess_workers=args.preprocess_workers,
)
# ... recorder 打点与 maybe_stop_after ...
功能: 调用
load_and_preprocess_images,使用多线程(由preprocess_workers控制)并行读取图片。处理内容: 根据
dataset_cfg中的规则,这通常包括调整图片尺寸(Resize)、中心裁剪(Center Crop)或颜色归一化等操作。返回: 返回预处理好的图像序列(sequence),以及统一后的高度(height)和宽度(width)。
计算分块参数 (Chunking Math)
这是该函数中最核心的逻辑之一,用于将长序列拆分成模型可处理的小块,这也是 VGGT-Long 和 Scal3R 解决显存限制的关键策略 。
n_samples = len(sequence)
block_size, overlap_size = args.block_size, args.overlap_size
assert block_size > overlap_size, f'[ERROR] block_size {block_size} must be larger than overlap_size {overlap_size}'
n_srcs = block_size
n_blocks = (n_samples - overlap_size + (n_srcs - overlap_size) - 1) // (n_srcs - overlap_size)
if n_blocks == 0 or n_samples <= block_size:
block_size = n_samples
n_srcs = n_samples
n_blocks = 1
防御性编程: 确保块的大小(
block_size)严格大于重叠的大小(overlap_size),否则分块逻辑会陷入死循环或计算错误。计算总块数 (
n_blocks): 这是一个经典的滑动窗口计算公式。序列总长度为
n_samples。除了第一个块,后续每个块向前步进的距离是 (block_size - overlap_size)。
公式
(n_samples - overlap_size + step - 1) // step相当于向上取整,确保即使最后剩下的图片不足一个完整的block_size,也会被分配到一个新的块中。
极短序列兜底: 如果总图片数
n_samples还不如一个block_size大,那么就只划分 1 个块(n_blocks = 1),块的大小等于图片总数。默认 一个块 含 60张图
backend.py --> main()
@log_exceptions(logger, "Unhandled exception in backend")
def main():作用: 这是一个 Python 装饰器。由于
backend.py是被主进程(run.py)拉起的独立子进程,如果它在跑 GPU 算力时因为某些底层错误(比如显存溢出、张量维度不对)突然崩溃,直接抛出系统级的 Traceback 会非常难看,且容易导致父进程假死。优雅兜底: 这个装饰器会像一张网一样兜住 main() 函数里所有未被捕获的崩溃异常,用
logger把它工工整整地记录到日志文件里,然后再安全退出。
args = parse_args()
args.config = str(resolve_release_path(args.config))
args.result_dir = str(resolve_release_path(args.result_dir or get_default_output_dir("run")))
args.runtime_dir = str(resolve_release_path(args.runtime_dir) if args.runtime_dir else join(args.result_dir, 'runtime'))
if args.checkpoint:
args.checkpoint = str(resolve_release_path(args.checkpoint))
if args.loop_ckpt:
args.loop_ckpt = str(resolve_release_path(args.loop_ckpt))
if args.offload_dir:
args.offload_dir = str(resolve_release_path(args.offload_dir))
if args.probe_dir:
args.probe_dir = str(resolve_release_path(args.probe_dir))子进程 接受 参数,全换成绝对路径
device = torch.device(args.device)将传入的设备字符串(比如
cuda、cuda:0或cpu)转化为 PyTorch 能够识别的硬件对象。在此之后,所有的模型、张量(Tensor)都会认准这个device进行计算。
sampler, dataset_cfg = build_sampler_from_config(args.config, device, args.checkpoint)sampler是已加载权重、在指定设备上 eval 的 Scal3R 模型;dataset_cfg是与配置里 dataloader/val 对齐的、带默认补全的图像/相机预处理参数字典,供后续和训练时数据管线一致地预处理输入。
amp_enabled = bool(args.test_use_amp and device.type == 'cuda')
amp_dtype = torch.bfloat16 if device.type == 'cuda' and torch.cuda.get_device_capability(device)[0] >= 8 else torch.float16AMP 的意义: 开启混合精度后,PyTorch 会自动把一部分计算降级到 16 位浮点数。这不仅能将显存占用减半,还能在较新的显卡上让计算速度翻倍。
with torch.no_grad():
with torch.amp.autocast('cuda', enabled=amp_enabled, dtype=amp_dtype):
output = forward(sampler, batches, args, recorder=recorder)在训练模型时,PyTorch 会悄悄记录所有的计算步骤以便反向传播算梯度,这会吃掉海量的显存。因为我们现在是在做推理(Inference),加上这句,就彻底掐断了梯度记录机制
让刚才配置好的混合精度策略(AMP)在
forward运行期间全局生效。巨大的数据块被送进
forward函数(也就是送进了 VGGT 骨干网络和后续的各种 Decoder),吐出了一堆局部的相机位姿、深度图和 3D 点云(存放在output中)
processed, output, batches, indices, visualize = post_process(
output,
batches,
...
alignment='sim3_wet',
)神经网络跑出来的 output 并不是直接可用的。为什么? 因为在长序列拆分策略中,网络是“逐块(Chunk)”处理的。它算出来的 3D 坐标全是局部的(Local)。第一个块认为自己是世界中心,第二个块也认为自己是世界中心。
对齐与缝合:
post_process函数就是做几何对齐的。它利用相邻数据块之间的重叠区(Overlap),把所有局部的 3D 点云和相机轨迹,通过 Sim(3) 变换(旋转、平移、缩放)强制拼接到同一个全局绝对坐标系下。消除漂移: 如果之前检测到了闭环(
n_blocks_loop > 0),它还会在这里触发位姿图优化(PGO),把长时间累积的漂移误差狠狠地拉回正轨。
辅助函数 store_agg_state()
backend.py --> forward()
output = [None for _ in range(len(batches))]
batch0 = materialize_payload(batches[0])
B, S = batch0.meta.rgb.shape[:2]
H, W = batch0.meta.H[0].item(), batch0.meta.W[0].item()
N = len(batches)
del batch0
assert B == 1, f'[ERROR] this implementation only supports B=1 for sequential inference, got B={B}.'
探路兵
batch0:程序首先把第一个数据块(batch0)从硬盘或内存深处“具现化(materialize)”出来。提取关键维度:从
batch0的元数据(meta)中,提取出张量的核心维度:B: Batch size(批次大小),这里断言必须等于 1,因为目前做的是单序列的顺序推理。S: Sequence length(序列长度),也就是一个数据块里包含了几张图片(即block_size)。H,W: 图片的高和宽。N: 总数据块的数量。
agg_state_refs = [None for _ in range(len(batches))]
dpt_state_refs = [dict() for _ in range(len(batches))]
dpt_layer_set = set(model.agg_regator.intermediate_layer_idx)深度学习模型(特别是像 Transformer 这种基于序列的模型)在逐层、逐块处理数据时,需要极其精细的状态管理 (State Management)。
agg_state_refs(Aggregator States):这是一个列表,用来存放每个数据块在进入特征聚合网络(Aggregator)时的中间状态。dpt_state_refs(Depth/Point States):用来存放需要跨层传递的特征(往往是用于最后生成深度图或点云的中间层输出)。某层 forward_layer 若需要 temp_output,在 persist_dpt_state 里按 层号 写入
对每个 batch 各建一个「自己的」空字典
agg = attention 主干还在算时要带着跑的状态;dpt = 从指定层「抽出来」给头网络做 dense 预测的特征字典。
dpt_layer_set:从模型的配置中读取,记录“在第几层我们需要把特征提取出来,放进上面的dpt_state_refs里存着”。set 是 Python 的 集合 类型:无序、元素不重复,适合做 「某元素在不在集合里」 的 O(1) 平均查找。
为什么叫
_refs(References)? 因为如果你的数据量非常大,这些中间状态(巨大的张量)可能会爆掉显存。所以代码后续可能会采用一种叫“显存卸载(Offloading)”的技术——把这些状态临时写到硬盘上,内存里只存一个“引用(Reference)”。当需要用的时候,再通过这个引用把它从硬盘里读出来
for b, batch_ref in enumerate(batches):
batch = materialize_payload(batch_ref)
rgb = rearrange(to_cuda(batch.meta.rgb, args.device), 'b n (h w) c -> b n c h w', h=H, w=W)enumerate(batches) 遍历列表 batches,每次给出 (下标, 元素)。
具现化 (
materialize_payload): 为了省内存,前面的阶段可能已经把打包好的数据块卸载(Offload)到硬盘上了,内存里只有一个很小的引用(batch_ref)。这句话就是把真正几十兆的图片数据从硬盘瞬间拉回内存。上显卡 (
to_cuda): 把数据真正送入 GPU 的显存。张量整形 (
rearrange): 这是一句极其优雅的einops代码。图片数据在预处理阶段往往被展平了(h w 混在一起)。这句代码像变形金刚一样,把扁平的数据重新组装成标准的 5 维视频张量格式:B(批次),N(帧数),C(通道数 RGB),H(高度),W(宽度),以符合 3D 卷积或 Vision Transformer 的输入标准。来自
einops.rearrange:用 带名字的维度模式 描述 输入形状 → 输出形状;模式里的
h、w是 符号名,必须和 实际张量尺寸 对上。这里H、W是前面从 batch 里算好的 整型高宽,传给rearrange
agg_state = model.agg_regator.prepare(rgb)
agg_state_refs[b] = store_agg_state(agg_state, args, b)
...
del batch, rgb, agg_state
if should_release_runtime_state(args):
release_memory(args.device)Dino v2 提取特征
存储并换出 (
store_agg_state): 算出来的特征(agg_state)极大,如果把所有数据块的特征都堆在显存里,显卡秒爆。这个函数会把算好的特征安全地存起来(如果配置了--streaming_state,它甚至会把特征写到硬盘上),并返回一个极小的引用。阅后即焚 (
del): 毫不留情地利用 Python 的del关键字,手动删掉刚刚用完的原始图片张量(rgb)、载荷(batch)和特征(agg_state)。倒垃圾 (
release_memory): 光del还没用,显存通常会有碎片。这里底层通常调用了torch.cuda.empty_cache(),强制 PyTorch 把显存归还给操作系统,保证下一个数据块进来时,显存是绝对干净和充足的。
for j in range(model.agg_regator.aa_block_num):
forward_intermediate_layers = collect_intermediate_layers(model, j)
need_forward_outputs = len(forward_intermediate_layers) > 0
...
for b, _ in enumerate(batches):先遍历层 再遍历块
传统思维: 通常我们会把一个数据块(Block)完整地跑完网络的所有层(Layer 0 到 Layer 24),然后再跑下一个数据块。
Scal3R 的特殊设计(Layer-First): 这里是先把所有的数据块都跑完第 j 层,然后再让所有数据块一起进入第 j+1 层。
为什么要这样反直觉? 因为 Scal3R 架构中包含 TTT(测试时训练)。TTT 需要在某一层(比如第 11 层)收集**整个长视频(所有块)**的梯度,计算出一个全局更新的权重,然后再应用给所有块。如果不采用这种“齐头并进”的遍历方式,就无法实现跨块的信息同步。
aa.block_num : 整条交替注意力堆栈一共要调用多少次 forward_layer
collect_intermedia_layers
函数做的事:在 [start_index, start_index+1, …, start_index+aa_block_size-1] 里,筛出 落在 intermediate_layer_idx 集合里的那些层号,返回 list[int]。
为 DPT 分配/填充这些层输出 在做 3D 重建或深度图预测时,不仅需要网络最后一层的高级语义特征,还需要中间层的底层纹理特征
如果 函数为真的话,就在 下面的 batches用 temp_output接住 这一层的输出
agg_state = materialize_payload(agg_state_refs[b])
updated_state = model.agg_regator.forward_layer(
index=j, output=temp_output, **to_cuda(agg_state, args.device),
)提取状态: 把第 b 个数据块在上一层(j-1)算好的状态(
agg_state),从内存或硬盘里具现化并拉回 GPU(to_cuda)。进入注意力层: 调用
forward_layer。在这一层里,视频帧之间会进行密集的 Attention 计算,理解物体在不同视角下的遮挡和运动。算完后,吐出updated_state准备给下一层用。用在「函数调用」里,表示:把右边那个「字典类对象」拆成一组关键字参数; 若:
d = to_cuda(agg_state, args.device) # 假设得到 {"tokens": t, "pos": p, "B": B, ...}则:
forward_layer(index=j, output=temp_output, **d)等价于(键名要合法、且不能与已有参数冲突):
forward_layer(index=j, output=temp_output, tokens=t, pos=p, B=B, ...)也就是:把 agg_state 里各字段当作 forward_layer 的具名参数传进去。
agg_state_refs[b] = store_agg_state(updated_state, args, b)
if temp_output is not None:
persist_dpt_state(temp_output, dpt_state_refs[b], args, b)
del agg_state, updated_state, temp_output
if should_release_runtime_state(args):
release_memory(args.device)存储并卸载 (
store_agg_state&persist_dpt_state): 刚刚算出来的updated_state(本层状态)和temp_output(中间层特征)加起来可能好几 GB。程序绝不允许它们一直待在显卡里。它会立刻把这些张量踢回内存(甚至写入硬盘),在显存里只保留一个极小的“引用指针”。
with torch.amp.autocast('cuda', enabled=False):
if (model.agg_regator.frame_use_ttt or model.agg_regator.global_use_ttt) and j in model.agg_regator.ttt_layer_idx:
apply_ttt(model, agg_state_refs, dpt_state_refs, args, j, dpt_layer_set)autocast('cuda', enabled=False)
就在几行代码之前,我们开启了 test_use_amp(混合精度加速)。为什么到了这里,又要用 enabled=False 强行把它关掉?
致命的梯度消失: 前向推理(Forward)时,张量的数值比较大,用 16 位浮点数(FP16/BF16)算又快又省显存。但是,TTT 的本质是在推理时做了一次微型的反向传播(Backpropagation)。
物理极限: 梯度的数值往往极其微小(比如 $10^{-6}$ 级别)。如果继续用 16 位浮点数来算梯度,这些微小的数值会直接“下溢”变成 0(Gradient Underflow),导致模型学不到任何东西,甚至直接崩溃报 NaN。所以,在这里必须强行切回 32 位最高精度(FP32),这叫**“拿速度换稳定性”**。
apply_ttt 修改什么?
agg_state_refs(每个 block)
agg_state_refs[block_index] = store_agg_state(updated_state, args, block_index)updated_state 来自 ttt_apply,里面的 tokens 会 加上 该层用 更新后的 (w_0,w_1,w_2) 跑 global_blocks[layer_index].ttt(...) 的输出(见 aggregator.ttt_apply 里 tokens += ...)。
因此:每个 block 的 aggregator 状态里的 tokens(以及打包进 dict 的其它字段)被换成 TTT apply 之后的新状态;若开 streaming,则是 换磁盘引用
dpt_state_refs
当 layer_index in dpt_layer_set 时构造 temp_output,ttt_apply 里若 index in self.intermediate_layer_idx 会 改写 output[index](把 原 DPT 特征 与 新 tokens 视图 再 concat)

ttt_update
for b, dpt_state in enumerate(dpt_state_refs):
dpt_state[-1] = dpt_state[model.agg_regator.depth - 1]
remove_payload(agg_state_refs[b])
agg_state_refs[b] = None
``````````````````````````````````````````etc...
clear_dpt_state(dpt_state_refs[b])
del batch, block_output, cam_map
del cam_maps, xyz_map, xyz_cnf, dpt_map, dpt_cnf, rgb_feats, rgb
if should_release_runtime_state(args):
release_memory(args.device)
if recorder is not None:
recorder.record(
f'decoder_block_{b:02d}.done',
block_index=int(b),
offload_outputs=bool(args.offload_outputs),
)
maybe_stop_after(f'decoder_block_{b:02d}.done', args, recorder)
pbar.update()
pbar.close()
if recorder is not None:
recorder.record('decoder.done', n_blocks=int(N))
maybe_stop_after('decoder.done', args, recorder)
return output
执行三大解码头等等
cam_maps = model.cam_decoder(rgb_feats)
xyz_map, xyz_cnf = model.xyz_decoder(
rgb_feats, images=rgb, patch_start_idx=model.agg_regator.patch_start_idx,
)
dpt_map, dpt_cnf = model.dpt_decoder(
rgb_feats, images=rgb, patch_start_idx=model.agg_regator.patch_start_idx,
)aggregator.py --> forward_layer()
def forward_layer(
self,
index: int,
tokens: torch.Tensor,
B: int, S: int, P: int, C: int,
pos: torch.Tensor = None,
cameras: torch.Tensor = None,
camera_dropout: bool = False,
output: dict = None,
):index: 当前正在处理的注意力层的索引(Layer Index)。tokens: 输入的特征张量。B, S, P, C: 描述张量形状的四个核心维度:B (Batch): 批次大小。
S (Sequence): 序列长度(视频帧数或图像组的数量)。
P (Patch): 每张图像被划分的 Patch(块)数量。
C (Channel): 特征的通道维度。
pos: 旋转位置编码(Rotary Position Embedding, RoPE),用于注入空间位置信息。cameras/camera_dropout: 相机内参/外参的先验特征,以及是否应用 Dropout 以防止过拟合。output: 一个字典,用于暂存需要传递给下游解码器(如深度图预测)的中间层特征。
# Perform one layer of attention based on the aa_order
# NOTE: only pure original transformer block here, no TTT is enabled
for attn_type in self.aa_order:代码会遍历
self.aa_order(在初始化中默认为["frame", "global"])
if attn_type == "frame":
tokens, _, frame_intermediates = self._process_frame_attention(
tokens, B, S, P, C, index,
pos=pos, cam=cameras, cam_drop=camera_dropout,
enable_ttt=False,
)触发条件:当遍历到
"frame"时执行。行为:调用
_process_frame_attention方法。该方法会将tokens的形状视作 (B*S, P, C),这意味着注意力机制只在同一帧的 P 个 Patch 之间进行计算。目的:提取和强化单张图像内部的局部纹理和空间几何细节。
返回值:更新后的
tokens,以及提取出的帧内中间特征frame_intermediates。
elif attn_type == "global":
tokens, _, global_intermediates = self._process_global_attention(
tokens, B, S, P, C, index,
pos=pos, cam=cameras, cam_drop=camera_dropout,
enable_ttt=False,
output=output,
)触发条件:当遍历到
"global"时执行。行为:调用
_process_global_attention方法。该方法通常会将tokens的形状视作 (B, S*P, C),让注意力机制跨越时间维度,在所有 S 帧的所有 Patch 之间进行交互。目的:建立跨帧的匹配关系和全局 3D 几何一致性(类似传统 SfM 中的特征点匹配与三角测量)。
返回值:再次更新的
tokens,以及提取出的全局中间特征global_intermediates。
backend.py --> apply_ttt
def apply_ttt(model, agg_state_refs, dpt_state_refs, args, layer_index: int, dpt_layer_set: set[int]):
ttt_order_grad = deepcopy(model.ttt_order[0:1])
ttt_order_grad = [order._replace(use_cached=False)._replace(cache_last=False) for order in ttt_order_grad]
ttt_order_apply = deepcopy(model.ttt_order[-1:])
ttt_order_apply = [order._replace(use_cached=False)._replace(cache_last=False) for order in ttt_order_apply]
w0_grad_sum, w1_grad_sum, w2_grad_sum = None, None, None
shared_tokens = None何为 deepcopy?
model.ttt_order[0:1]:取列表ttt_order的 第 0 个元素,但保持 类型仍是 list(长度为 1),而不是单个元素。deepcopy(...):得到 一份全新的 list,且里面的TTTOperator(或类似 dataclass/namedtuple) 若是 可变嵌套结构,也会被 递归复制;之后对ttt_order_grad里对象的 原地修改(例如_replace、改字段)不会 改到model.ttt_order里那份。newdict = dict(lastdict) 浅拷贝,里面元素是 list 的话会被同步修改
ttt_order 哪来的?
ttt_order 是 Scal3R 模型上的一个 实例属性,在 Scal3R.__init__ 里赋值,来源是 default_ttt_order()。

即:长度为 2 的列表,元素类型是 TTTOperator(在 scal3r/utils/ttt_utils.py 里定义)。语义上是 两阶段默认策略:先 只 update、不 apply,再 只 apply、不 update(具体由 ttt_utils 里读这些字段)
backend.py 中 有 scal3r 的 初始化?
backend.py里没有直接写Scal3R(...)这种「在文件里 new 一个Scal3R」的初始化。模型是在
main()里通过build_sampler_from_config间接建出来的:sampler, dataset_cfg = build_sampler_from_config(args.config, device, args.checkpoint)
_replace 返回一个新实例,只改你传入的字段,其余字段从旧
order拷贝。order._replace(use_cached=False)关闭了所有的缓存. 因为现在是在做跨 Block 的大循环累加,如果不关闭缓存,显存会随着处理的 Block 数量线性爆炸。这是一种极端的显存保护机制
w0_grad_sum, w1_grad_sum, w2_grad_sum = None, None, None
shared_tokens = Nonew0_grad_sum等:TTT 模块内部有三个需要更新的权重矩阵。这里初始化为None,准备在后续的循环中,把每个 Block 算出来的梯度“累加”到这些变量上(即 Gradient Accumulation)。这使得模型能在逻辑上“一次性”看到整个长视频的全局梯度,而物理上却是一块一块处理的。shared_tokens:用于暂存更新权重时需要的共享 Token 引用。
Aggregator 类核心方法 ttt_gradient。
在 Scal3R 的设计中,TTT 分为三个标准步骤:计算梯度(Gradient) -> 更新权重(Update) -> 应用特征(Apply)
第一步:在前向推理时,计算出用于微调模型权重的梯度,但不直接修改权重
""" Only global attention is used for TTT gradient computation """
assert self.global_use_ttt, "Global TTT must be enabled for compute_ttt_gradient"
assert not self.training, "Model must not be in training mode for compute_ttt_gradient"为什么只用全局注意力? 注释明确指出,只有全局(跨帧)注意力会被用于计算 TTT 的梯度。因为 TTT 的目的是让模型在当前视频序列中找到最佳的多视角几何一致性,这必须依赖跨帧的信息交互。
两个
assert断言:必须在配置中开启了全局 TTT (
global_use_ttt)。模型必须处于评估/推理模式 (
not self.training)。TTT 是“测试时”训练,如果在标准训练阶段调用它,会导致计算图和梯度更新的严重混乱。
# Deal with shape things
if tokens.shape != (B, S * P, C):
tokens = tokens.view(B, S, P, C).view(B, S * P, C)
if pos is not None and pos.shape != (B, S * P, 2):
pos = pos.view(B, S, P, 2).view(B, S * P, 2)它将
tokens的形状强制转换为(B, S * P, C)。
# Deal with TTT parameters
ttt_cache, ttt_fastw, ttt_steps = self._get_ttt_state(self.global_ttt_caches[index])模型需要知道当前层(
index)的 TTT 进度。通过访问self.global_ttt_caches,提取出三个核心变量:ttt_cache:历史缓存,用于加速梯度的自回归计算。ttt_fastw:当前的**快速权重(Fast Weights)**状态。ttt_steps:当前已经进行了多少步 TTT 迭代。
# Compute TTT inner loop update gradients, no actual update
w0_grad, w1_grad, w2_grad = self.global_blocks[index].ttt.gradient(
tokens, pos, ttt_order, ttt_cache, ttt_fastw, ttt_steps, self.block_token,
batch_size=B, S=S, P=P, C=C, patch_start_idx=self.patch_start_idx,
)
return w0_grad, w1_grad, w2_grad调用内部梯度函数:将整理好的数据和状态,传入当前层对应的
global_blocks[index].ttt.gradient方法中。返回梯度字典:计算完成后,返回
w0_grad, w1_grad, w2_grad。这些是针对 TTT 模块内部三个不同权重矩阵(通常对应于输入投影、隐藏层变换和输出投影)的梯度值。分离设计的精妙之处:请注意注释
no actual update。这个函数只负责“算”,不负责“改”。这种解耦设计使得系统可以将多个数据块(Batches)的梯度汇总(Gradient Accumulation)后,再统一执行更新(由ttt_update方法负责),这对于处理因显存限制而切分的长视频序列尤为重要。
aggregator.py --> processframe_attention()
def _process_frame_attention(
self, tokens, B, S, P, C, frame_idx,
pos=None, cam=None, cam_drop=False,
ttt_order=None, enable_ttt=True,
):
# If needed, reshape tokens or positions:
if tokens.shape != (B * S, P, C):
tokens = tokens.view(B, S, P, C).view(B * S, P, C).view(...):在 不拷贝数据(张量需 内存连续,否则可能要先 .contiguous())的前提下,只改「形状」解释方式
reshape功能类似可以接受不连续,若不连续 → 往往会 先拷贝成连续再 view,一般能成功,但可能 多一次拷贝。
if self.frame_use_ttt:
ttt_cache, ttt_fastw, ttt_steps = self._get_ttt_state(self.frame_ttt_caches[frame_idx])
else:
ttt_cache = None
ttt_fastw = tuple()
ttt_steps = 0代码首先检查是否启用了帧内 TTT (
self.frame_use_ttt)。如果启用,调用
_get_ttt_state从缓存self.frame_ttt_caches中提取当前层(frame_idx)的 TTT 状态。ttt_cache:用于加速 TTT 计算的缓存数据。ttt_fastw(Fast Weights):这是 TTT 的灵魂!它代表“快速权重”。与模型训练好后固定不变的“慢速权重”不同,快速权重是在推理当前序列时临时计算并叠加在基础权重上的。ttt_steps:记录当前 TTT 优化已经执行了多少步。
# by default, self.aa_block_size=1, which processes one block at a time
for _ in range(self.aa_block_size):
tokens = self.frame_blocks[frame_idx](
tokens, pos, cam, cam_drop,
ttt_order, ttt_cache, ttt_fastw, ttt_steps, self.block_token, enable_ttt, B, S, P, C, self.patch_start_idx,
)
frame_idx += 1
intermediates.append(tokens.view(B, S, P, C))self.aa_block_size:控制每次交替注意力(Alternating Attention)连续执行多少个同类型的 Transformer 块。注释中说明默认值为 1,即“算一层帧内,就算一层全局”,以此交替。self.frame_blocks[frame_idx](...):这是真正执行计算的地方。它调用了具体的 Transformer Block。输入参数极其丰富:除了常规的
tokens和位置编码pos,它还将刚刚准备好的ttt_fastw(快速权重)和控制 TTT 流程的ttt_order传了进去。结合上文讲到的张量形状重塑,此时模型正在并行的处理每一帧图像,提取其内部的局部特征。
收集结果:
frame_idx += 1:指针下移,为下一层做准备。intermediates.append(...):将计算完的tokens重新变回 4D 形状 (B, S, P, C),并存入intermediates列表中。这些中间结果后续会被提供给深度图解码器(DPT Head)使用。
DPT 出特征
scal3r/pipelines/backend.py → forward
初始化
dpt_state_refs = [dict() for _ in range(len(batches))],每个 block 一个 「层号 → 特征」 容器。dpt_layer_set = set(model.agg_regator.intermediate_layer_idx):哪些全局层要参与 DPT / TTT 里对output的读写。
Aggregator 阶段
对每个宏层
j:collect_intermediate_layers(model, j)(同文件):判断本步是否要向output里塞中间特征。model.agg_regator.forward_layer(..., output=temp_output)若 temp_output 非空:
persist_dpt_state(temp_output, dpt_state_refs[b], args, b)(offload_utils)→ 把本 block 本步产生的 各层 concat 特征 写入 dpt_state_refs[b][layer_id](可 offload)。
若该层要做 TTT:
apply_ttt(...)里会persist_dpt_state更新 对应层 在 dpt_state_refs 里的张量(与ttt_apply里改写的output[index]一致)。
Aggregator 结束后
整理传递给解码器的最终特征,并无情地清理掉不再需要的聚合器状态,以释放极为宝贵的显存或内存
for b, dpt_state in enumerate(dpt_state_refs): dpt_state[-1] = dpt_state[model.agg_regator.depth - 1] remove_payload(agg_state_refs[b]) agg_state_refs[b] = Nonedpt_state是一个字典,键(Key)是层号,值(Value)是那一层输出的特征张量。model.agg_regator.depth - 1代表聚合器的最后一层的绝对索引(例如,如果是 24 层的网络,最后一层就是索引 23)。 下游的解码器(比如相机预测头)通常会默认通过键-1来直接获取网络最深层、语义最丰富的全局特征remove_payload(...):因为聚合器(Aggregator)的前向传播和 TTT 权重更新已经彻底结束,这些庞大的状态数据已经完成了历史使命。remove_payload会在底层清理这些数据,如果它们之前被offload到了磁盘或内存,这里会直接将相关文件或缓存抹除。= None:解除 Python 变量对该对象的引用。这样一来,Python 的垃圾回收机制(GC)就会立即介入,将内存完全回收。
Decoder阶段
rgb = rearrange(batch.meta.rgb, 'b n (h w) c -> b n c h w', h=H, w=W).to(args.device) cam_maps = model.cam_decoder(rgb_feats) xyz_map, xyz_cnf = model.xyz_decoder( rgb_feats, images=rgb, patch_start_idx=model.agg_regator.patch_start_idx, ) dpt_map, dpt_cnf = model.dpt_decoder( rgb_feats, images=rgb, patch_start_idx=model.agg_regator.patch_start_idx, )rgb重排: 使用einops.rearrange将原始的 RGB 图像从展平的块状态(h w)还原为标准的 2D 卷积输入形状:(批次 Batch, 帧数 N, 通道 C, 高度 H, 宽度 W),并放入 GPU。相机解码头 (
cam_decoder):仅利用提取到的特征序列 rgb_feats,预测相机的内参和外参(位姿信息)。3D 坐标解码头 (
xyz_decoder):结合深层特征 rgb_feats 和原始图像 rgb,预测场景中每个像素在世界坐标系下的 3D 坐标 (xyz_map),同时输出模型对该预测的置信度 (xyz_cnf)。深度解码头 (
dpt_decoder):原理与xyz_decoder类似,但输出的是每个像素对应的深度图 (dpt_map) 和相应的深度置信度 (dpt_cnf)。
相机预测头(CameraHead)

关键网络组件
该模块在 __init__ 中初始化了几个专门负责不同任务的子网络:
干线网络 (
self.trunk):由多个 Transformer 块(Block)组成的序列(深度默认为 4)。它负责在特征空间中处理和加工相机 Token。空位姿嵌入 (
self.empty_pose_tokens&self.embed_pose):因为预测是迭代的,第一次预测时没有“上一次的位姿”,所以系统学习了一个可优化的零向量作为初始状态。调制生成器 (
self.poseLN_modulation):一个包含 SiLU 激活的 MLP。它接收当前的位姿状态,输出用于特征调制的三个向量:shift(偏移)、scale(缩放)和gate(门控)。预测输出层 (
self.pose_branch):一个将高维特征(如 2048 维)降维映射到目标参数维度(如 9 维)的 MLP。

坐标 和 预测深度
在初始化阶段,
DPTHead搭建了特征重构和融合所需的各个组件:在
DPTHead中,处理这“多层数据”的核心逻辑是构建特征金字塔并进行自顶向下的多尺度融合。它并没有把所有层的数据一股脑全用上,而是经过了精心的挑选和加工。具体来说,
DPTHead对待这多层数据的处理流程可以分为四个关键步骤:1. 精准抽取与过滤 (Select & Filter)
模型并不需要骨干网络每一层的特征,而是挑选了几个具有代表性的“中间层”。
抽层:通过
intermediate_layer_idx(默认通常是[4, 11, 17, 23]),模型只提取这 4 层的特征。这 4 层分别代表了从浅层(高分辨率纹理)到深层(低分辨率全局语义)的阶梯型信息。过滤特殊 Token:提取出来的特征包含了
camera_token、register_token等辅助 Token。代码通过切片x[:, :, patch_start_idx:]将这些非图像特征剔除,只保留纯粹的图像 Patch 特征。
2. 空间重构与特征金字塔化 (Reshape & Pyramid)
ViT 骨干网络输出的是一维的序列(Sequence),必须将其还原为二维的图像结构。
序列转图像:利用
reshape和permute操作,将一维的 Token 序列重新拼接成二维的特征图形状(B*S, C, patch_h, patch_w)。通道投影 (
projects):通过一组 1x1 的卷积层,将这 4 层特征统一映射到预设的通道维度(例如[256, 512, 1024, 1024])。分辨率调整 (
resize_layers):为了后续能够对齐融合,这 4 层特征经过了不同的卷积/反卷积处理,形成了分辨率由大到小的特征金字塔:浅层特征(第 4 层):使用步长为 4 的反卷积进行大幅放大。
中层特征(第 11 层):使用步长为 2 的反卷积进行中幅放大。
中深特征(第 17 层):保持不变(Identity)。
深层特征(第 23 层):使用步长为 2 的卷积进行缩小。
反卷积

3. 倒序残差融合 (Reverse Residual Fusion /
scratch_forward)这是 DPT 架构的灵魂所在,也是它如何“糅合”这 4 层数据的关键。模型采用的是由深到浅、逐级上采样融合的策略:
第一步:取出最深层(第 23 层,语义最强但分辨率最低)的特征。
第二步:将其上采样(放大尺寸),并与次深层(第 17 层)的特征在
refinenet3(特征融合块)中相加,完成第一次融合。第三步:将融合后的结果再次上采样,与第 11 层的特征在
refinenet2中相加。第四步:重复此过程,直到与最浅层(第 4 层,细节最丰富)的特征在
refinenet1中完成最终融合。
巧妙的内存管理:在代码实现中,每完成一次两层的融合,就会立刻使用
del删掉前置层的特征(例如del layer_4_rn, layer_4),从而在处理多层庞大数据时极限节省 GPU 显存。4. 插值对齐与最终预测 (Interpolate & Predict)
经过彻底融合后的特征图,包含了深层的全局几何理解和浅层的局部边缘细节。
代码最后使用
custom_interpolate(双线性插值)将这张特征图强制放大回原始图像的绝对物理分辨率(H x W)。最后送入
output_conv2,将其压缩成目标维度(如深度图维数为 2,3D 坐标维数为 4),从而输出最终的密集几何预测。
总结来说,
DPTHead将多层数据视作不同层级的“积木”:最底层的积木提供大局观(如整体的房间结构),最高层的积木提供边缘细节(如桌子的轮廓)。通过提取、重构尺寸并逐级相加,完美地融合了这些积木,最终生成了高精度的 3D 重建结果。1
Fast-FoundationStereo
run_demo.py
parser = argparse.ArgumentParser()
parser.add_argument('--model_dir', default=f'{code_dir}/../weights/23-36-37/model_best_bp2_serialize.pth', type=str)
parser.add_argument('--left_file', default=f'{code_dir}/../demo_data/left.png', type=str)
parser.add_argument('--right_file', default=f'{code_dir}/../demo_data/right.png', type=str)
parser.add_argument('--intrinsic_file', default=f'{code_dir}/../demo_data/K.txt', type=str, help='camera intrinsic matrix and baseline file')
parser.add_argument('--out_dir', default='/home/bowen/debug/stereo_output', type=str)
parser.add_argument('--remove_invisible', default=1, type=int)
parser.add_argument('--denoise_cloud', default=0, type=int)
parser.add_argument('--denoise_nb_points', type=int, default=30, help='number of points to consider for radius outlier removal')
parser.add_argument('--denoise_radius', type=float, default=0.03, help='radius to use for outlier removal')
parser.add_argument('--scale', default=1, type=float)
parser.add_argument('--hiera', default=0, type=int)
parser.add_argument('--get_pc', type=int, default=1, help='save point cloud output')
parser.add_argument('--valid_iters', type=int, default=8, help='number of flow-field updates during forward pass')
parser.add_argument('--max_disp', type=int, default=192, help='maximum disparity')
parser.add_argument('--zfar', type=float, default=100, help="max depth to include in point cloud")
args = parser.parse_args()A. 基础输入/输出 (I/O)
--model_dir:预训练权重文件的路径。默认指向的是23-36-37这个模型。--left_file/--right_file:左右双目图像。--intrinsic_file:相机内参和基线(Baseline)。划重点:没有这个文件,模型就只能输出视差(像素单位),没法计算真实的深度(米单位)。--out_dir:保存结果的地方。
B. 算法性能调优 (Performance)
--valid_iters(默认 8):GRU 的迭代次数。次数越多越准,但越慢。--max_disp(默认 192):最大搜索视差。如果物体离镜头极近,需要调大。--scale(默认 1):图像缩放。如果显存不够或者想更快,可以设为 0.5。--hiera:是否开启层级(Hierarchical)推理,通常用于处理高分辨率图像。
C. 点云处理 (Post-processing)
--get_pc:是否生成 3D 点云。--remove_invisible:是否剔除左图看不到(由于遮挡或视场差异)的区域。--denoise_cloud:是否对点云进行去噪。点云由于匹配误差经常会有“毛刺”,开启后会调用 Open3D 进行滤波。--zfar:远端裁剪距离。超过这个深度的点(比如背景里的远山)会被丢弃。
model = torch.load(args.model_dir, map_location='cpu', weights_only=False)
model.args.valid_iters = args.valid_iters
model.args.max_disp = args.max_disp
model.cuda().eval()
scale = args.scalemodel.cuda() 这个方法的作用是将模型的所有参数(weights)和缓冲区(buffers)从系统的内存(CPU)移动到显卡的显存(GPU)中。它的含义: 告诉程序:“接下来的所有矩阵运算,请调用 NVIDIA 显卡的 CUDA 核心来跑,不要用 CPU。”
将模型设置为评估/推理模式。
Batch Normalization (BN):在训练时,BN 使用当前 Batch 的均值和方差;在
eval()模式下,它会固定使用训练期间累计的全局滚动平均值。Dropout:在训练时,它会随机丢弃一部分神经元;在
eval()模式下,它会关闭丢弃行为,让所有神经元都参与计算。
因为 .cuda() 和 .eval() 都会返回模型对象本身,所以你可以像“套娃”一样连着写; 可链式调用
img0 = imageio.imread(args.left_file)
img1 = imageio.imread(args.right_file)
if len(img0.shape)==2:
img0 = np.tile(img0[...,None], (1,1,3))
img1 = np.tile(img1[...,None], (1,1,3))
img0 = img0[...,:3]
img1 = img1[...,:3]
H,W = img0.shape[:2]解释在把图片塞给神经网络之前,必须确保它们的格式、通道数和维度是完全一致的。
1.2 行 , 使用
imageio库将图片文件读入为 NumPy 数组。如果
shape的长度是 2(只有 H 和 W),说明这是一张灰度图。np.tile(img0[..., None], (1, 1, 3))(像 EdgeNext 或 ViT 这样的深度学习骨干网络,通常要求输入必须是 3 通道的(RGB)。即使图片本身没颜色,也要伪造出 3 个通道。)...:前面所有轴原样保留,等价于 img0[:, :](对 2D 就是整幅图)。None,表示「在这里插入一个长度为 1 的新轴」。np.tile(a, reps):把a按reps在每个维度上重复拼接(不是广播成更大形状再算别的,就是沿轴堆叠副本)。
img0[...,:3]怎么读...(省略号)
表示「前面所有轴都取全范围」,等价于在...的位置补上足够多的:。若
img0形状是(H, W, 3),则 img0[...,:3] 等价于 img0[:,:,:3],结果仍是(H, W, 3)(若本来就是 3 通道,相当于不变)。若形状是
(H, W, 4)(例如 RGBA),则等价于img0[:,:,:3],丢掉第 4 通道,得到(H, W, 3)。
img0 = cv2.resize(img0, fx=scale, fy=scale, dsize=None)
img1 = cv2.resize(img1, dsize=(img0.shape[1], img0.shape[0]))
H,W = img0.shape[:2]
img0_ori = img0.copy()
img1_ori = img1.copy()
logging.info(f"img0: {img0.shape}")
imageio.imwrite(f'{args.out_dir}/left.png', img0)
imageio.imwrite(f'{args.out_dir}/right.png', img1)
img0 = torch.as_tensor(img0).cuda().float()[None].permute(0,3,1,2)
img1 = torch.as_tensor(img1).cuda().float()[None].permute(0,3,1,2)
padder = InputPadder(img0.shape, divis_by=32, force_square=False)
img0, img1 = padder.pad(img0, img1)解释尺度缩放 (Resizing)
fx=scale, fy=scale:按照你之前设置的args.scale(比如 0.5)进行等比例缩放。缩放是为了在推理速度和精度之间找平衡。备份原始图:
img0_ori = img0.copy()。这很重要,因为后续计算点云时,需要用这张图的颜色来给点云“上色”。
张量化与格式转换 (To Tensor & Permute)
torch.as_tensor(img0):将 NumPy 数组转换为 PyTorch 张量。.cuda():送入显存。.float():转换成浮点型(神经网络不吃uint8整数)。[None]:增加“批次”(Batch)维度。形状从 (H, W, C) 变成 (1, H, W, C)。.permute(0,3,1,2):最关键的一步。OpenCV 的顺序是 H, W, C(高度、宽度、通道),而 PyTorch 的卷积层要求顺序是 N, C, H, W(批次、通道、高度、宽度)。
边界填充 (Input Padding)
为什么要补齐? 深度学习模型(如 FFS 使用的 EdgeNext)通常包含多次下采样(Pooling/Stride Conv)。如果模型下采样 5 次,那么输入尺寸必须是 25 = 32 的倍数。
举例:如果缩放后的图像是 400 * 300,它不能被 32 整除。
InputPadder会自动计算,将其填充(补黑边或复制边界)到 416 * 320。
logging.info(f"Start forward, 1st time run can be slow due to compilation")
with torch.amp.autocast('cuda', enabled=True, dtype=AMP_DTYPE):
if not args.hiera:
disp = model.forward(img0, img1, iters=args.valid_iters, test_mode=True, optimize_build_volume='pytorch1')
else:
disp = model.run_hierachical(img0, img1, iters=args.valid_iters, test_mode=True, small_ratio=0.5)
logging.info("forward done")
disp = padder.unpad(disp.float())
disp = disp.data.cpu().numpy().reshape(H,W).clip(0, None)混合精度加速
推理策略:常规 vs. 层级
A. 常规模式 (
model.forward)这是标准流程,直接在当前分辨率下进行立体匹配。
iters: 之前提到的 GRU 迭代次数,决定了优化的精细度。test_mode=True: 告诉模型:“我现在是在考试,直接给我最终答案(最后的视差图)就好,不需要把中间过程的草稿纸(中间迭代结果)传回来。”optimize_build_volume: 指定构建成本体积的算法实现。
B. 层级模式 (
model.run_hierachical)这是处理高分辨率图像的利器。
原理:先将图片缩小(
small_ratio=0.5)跑一次,得到一个粗略的深度分布;然后利用这个“粗略答案”作为指引,在原图分辨率下进行精细匹配。优点:能更好地处理巨大的视差(Disparity),提高对大片平滑区域的匹配稳定性。
格式转换
padder.unpad(...): 还记得之前为了凑 32 的倍数补的“黑边”吗?这一步会精准地把它们切掉,恢复到原始的 H*W 尺寸。.float(): 强制转回全精度,避免后续计算出现舍入误差。.cpu().numpy(): 把数据从 GPU 搬回 CPU 内存,并转成我们熟悉的 NumPy 格式。.reshape(H, W): 将形状从 (1, 1, H, W) 压缩为二维的 (H, W) 矩阵。.clip(0, None): 这是一个物理常识过滤。视差(Disparity)不可能是负数,这行代码确保所有负值(通常是模型的一点噪声)都被修正为 0。
cmap = None
min_val = None
max_val = None
vis = vis_disparity(disp, min_val=min_val, max_val=max_val, cmap=cmap, color_map=cv2.COLORMAP_TURBO)
vis = np.concatenate([img0_ori, img1_ori, vis], axis=1)
imageio.imwrite(f'{args.out_dir}/disp_vis.png', vis)
s = 1280/vis.shape[1]
resized_vis = cv2.resize(vis, (int(vis.shape[1]*s), int(vis.shape[0]*s)))
cv2.imshow('disp', resized_vis[:,:,::-1])
cv2.waitKey(0)解释将模型计算出的“冷冰冰”的数值矩阵(视差图)转化成直观的彩色深度图,并将其与原始照片拼在一起显示出来
五维代价体积的 3维卷积 的具体过程
数据的形状:5 维张量 (5D Tensor)

3D 卷积的“立方体”滑动过程

foundation_stereo.py --> "外层" forward()
def forward(self, image1, image2, iters=12, test_mode=False, low_memory=False, init_disp=None, profile=False, optimize_build_volume='pytorch1'):
""" Estimate disparity between pair of frames """
B,C,H,W = image1.shape
low_memory = low_memory or (self.args.get('low_memory', False))
image1 = normalize_image(image1)
image2 = normalize_image(image2)
with torch.amp.autocast('cuda', enabled=self.args.mixed_precision, dtype=U.AMP_DTYPE):
out = self.feature(torch.cat([image1, image2], dim=0))
features_left = [o[:B] for o in out]
features_right = [o[B:] for o in out]
stem_2x = self.stem_2(image1)图像标准化 (
normalize_image)将图像像素值从 [0, 255] 转化到符合 ImageNet 统计分布的范围(通常是均值为 0,方差为 1 左右)
torch.cat([image1, image2], dim=0)
torch.cat(tensors, dim=d):在第 d 维上,把多个张量首尾相接拼成一条更长的轴(其它维尺寸必须一致)。dim=0:在**第 0 维(batch 维)上拼接。(torch.cat([image1, image2], dim=0)→(2B, C, H, W))self.feature(...):把拼好的张量送进同一个特征网络(通常是 ResNet/DINO 等),得到多尺度特征列表。这样做的合理性
左右共用同一套
self.feature、同一组权重:左、右应经过完全相同的编码;拆成两次self.feature(image1)和self.feature(image2)在数学上等价于一次feature(cat([image1, image2], dim=0))再切开(在典型批量归一化推理/固定统计下行为一致;训练时若 BN 依赖 batch 统计,大 batch 会略有差异,但立体匹配里常见做法是共享 backbone,这样写是标准技巧)。
if optimize_build_volume=='pytorch1':
gwc_volume = build_gwc_volume_optimized_pytorch1(features_left[0], features_right[0], self.args.max_disp//4, self.cv_group, normalize=self.args.normalize)
elif optimize_build_volume=='triton':
gwc_volume = build_gwc_volume_triton(features_left[0], features_right[0], self.args.max_disp//4, self.cv_group, normalize=self.args.normalize)
else:
raise RuntimeError(f"Invalid optimize_build_volume: {optimize_build_volume}")
left_tmp = self.proj_cmb(features_left[0])
right_tmp = self.proj_cmb(features_right[0])
concat_volume = build_concat_volume_optimized_pytorch1(left_tmp, right_tmp, maxdisp=self.args.max_disp//4)
del left_tmp, right_tmp
comb_volume = torch.cat([gwc_volume, concat_volume], dim=1)
del concat_volume, gwc_volume解释构建代价空间(Cost Volume)
构建 GWC 体积 (分组相关性 - 算相似度)
GWC (Group-wise Correlation) 是一种计算左右图特征“内积”的方法。它把特征通道分成几组(
cv_group),然后在每个给定的视差(d ∈ [0,max_disp//4])下,计算左图和右图特征的相似度。数值越大,说明越匹配。为什么是
max_disp//4? 因为输入的特征图 features_left[0] 分辨率已经是原图的 1/4(这一缩放是在 self.feature(Feature.forward)里完成的),所以搜索范围也要相应缩小到 1/4
构建 Concat 体积 (特征拼接 - 存全局语义)
self.proj_cmb
为什么需要这一层?
d_out[0] 较大:直接用它建 concat volume 会让 3D 卷积(
corr_stem等) 的输入通道变胖,需对特征进行降维/压缩。1×1 投影:把参与 concat 分支 的特征压到固定 12 维,使 concat 代价体固定为 24 通道,和网络其它超参(如
volume_dim、corr_stem)对齐。
定义 self.proj_cmb = nn.Conv2d(self.feature.d_out[0], self.concat_channel//2, kernel_size=1, padding=0)
类型:1×1 卷积(nn.Conv2d,kernel_size=1),只做逐像素线性变换
输入通道:self.feature.d_out[0],即 Feature 输出的第一档特征 features_*[0] 的通道数(EdgeNeXt+FPN 里最“细”、分辨率最高的那层,约为 1/4 尺度(通道数最多))
输出通道:
self.concat_channel // 2= 24 // 2 = 12。左右 共用同一个 proj_cmb,保证左右特征在同一低维空间里
concat_volume = build_concat_volume_optimized_pytorch1(left_tmp, right_tmp, maxdisp=self.args.max_disp//4)
为什么要用 Concat? GWC 算的是“相对相似度”,这会导致一个问题:两个全黑的像素(比如阴影),它们算出来的相似度也很高。模型会犯迷糊。
输入是 已对齐 的左右特征(本项目里
proj_cmb之后的 1/4 尺度):refimg_fea:(参考)图特征(B, C, H, W)

comb_volume = torch.cat([gwc_volume, concat_volume], dim=1)
将“算相似度”的 GWC 体积和“存语义”的 Concat 体积在通道维度(
dim=1)合并,形成一个终极的 组合成本体积 (comb_volume)。这让模型既具备精确的局部匹配能力,又具备全局的语义推理能力
信息融合、降维压缩和初步去噪
comb_volume = self.corr_stem(comb_volume)其中,定义为
self.corr_stem = nn.Sequential(
nn.Conv3d(self.proj_cmb.out_channels*2+self.cv_group, volume_dim, kernel_size=1),
BasicConv(volume_dim, volume_dim, kernel_size=3, padding=1, is_3d=True),
ResnetBasicBlock3D(volume_dim, volume_dim, kernel_size=3, stride=1, padding=1),
ResnetBasicBlock3D(volume_dim, volume_dim, kernel_size=3, stride=1, padding=1),
)corr_stem的任务就是对这个巨大的 4D 张量(通道、视差、高度、宽度)进行信息融合、降维压缩和初步去噪nn.Conv3d
proj_cmb.out_channels*2:这是左图特征和右图特征拼接(Concat)后的通道数kernel_size=1的作用:在 3D 空间中,1x1x1 卷积不改变空间和视差的分辨率,它的唯一作用是跨通道混合信息。它把相似度分数(GWC)和语义特征(Concat)揉捏在一起。降维 (
volume_dim):3D 卷积极其消耗显存!这里强行把臃肿的输入通道数压缩到一个固定的、较小的值(volume_dim,通常在 Fast 架构中是 28 或 32)。这不仅提取了精华,还是 FFS 能实现实时推理的核心显存优化手段。、
BasicConv(volume_dim, volume_dim, kernel_size=3, padding=1, is_3d=True)
3D 视野:它在 (H, W, Disparity) 三个维度上同时滑动一个 3x3x3 的立方体窗口(一共有五维 B, H, W, C, D)。
在 H 和 W 维度上,它能平滑图像的噪点。
在 Disparity 维度上,它能平滑匹配概率。如果视差为 10 的概率很高,那么它会“顺便”让视差为 9 和 11 的概率也稍微提高,让深度分布变得连续。
图像特征引导的注意力机制(Feature-Guided Attention)
comb_volume = self.corr_feature_att(comb_volume, features_left[0])这句代码
comb_volume = self.corr_feature_att(comb_volume, features_left[0])是立体匹配算法中一个非常经典且极其有效的设计,被称为 “图像特征引导的注意力机制(Feature-Guided Attention)”。在经历上一层
corr_stem的 3D 卷积处理后,comb_volume已经是一个包含了丰富匹配信息的 4D 空间,但它有一个致命的弱点:它容易“忘本”。在构建这个 3D 空间时,原始图像的结构信息(哪里是边缘,哪里是平滑的墙面)被淡化了。这句代码的任务,就是把左图(参考图)的 2D 图像特征作为“向导”,重新注入到 3D 匹配空间中。
类定义
class FeatureAtt(nn.Module):
def __init__(self, cv_chan, feat_chan):
super(FeatureAtt, self).__init__()
self.feat_att = nn.Sequential(
BasicConv(feat_chan, feat_chan//2, kernel_size=1, stride=1, padding=0),
nn.Conv2d(feat_chan//2, cv_chan, 1)
)
def forward(self, cv, feat):
'''
@cv: cost volume (B,C,D,H,W)
@feat: (B,C,H,W)
'''
feat_att = self.feat_att(feat).unsqueeze(2) #(B,C,1,H,W)
cv = torch.sigmoid(feat_att)*cv
return cv在 PyTorch 里,tensor.unsqueeze(dim) 表示:在维度下标 dim 的位置插入一个长度为 1 的新轴,不改变实际数据,只改「形状怎么解释」。
输入
feat:也就是传入的features_left[0]。它是骨干网络提取的左图 1/4 分辨率特征图(未做代价体积的原图)(包含形状、纹理、颜色分布等信息),形状是(B, C_feat, H, W)。降维映射:通过两个 1x1 卷积(先压缩到一半通道,再映射到和代价体积一样的通道数
cv_chan),网络从左图中提取出一张 “注意力权重图”。此时它的形状是(B, C_cv, H, W)。广播(Broadcasting):在随后的乘法中,这个权重图会沿着
D维度复制(广播)。意思是:对于左图上的某一个像素 (H, W),无论它在右图的哪个视差 D 位置进行匹配,都共享同一个注意力权重。
Sigmoid激活:把刚才算出来的注意力权重压缩到0 到 1之间。相当于生成了一张“滤网”。
* cv(逐元素相乘):将滤网盖在庞大的 3D 代价体积上。
代价聚合(Cost Aggregation)
comb_volume = self.cost_agg(comb_volume, features_left)在立体匹配中,这一步被称为代价聚合(Cost Aggregation)。它使用的是一个经典的 3D 沙漏网络(Hourglass / U-Net 架构)。
如果说前面的代码是“在局部找相似”,那么这段代码的任务就是“联系全局上下文,修正局部错误”。比如:一面纯白的墙,局部看哪里都一样(匹配概率乱七八糟),沙漏网络通过“看大局”,依靠墙边缘的深度,把整面墙的深度推导出来。
def forward(self, x, features):
conv1 = self.conv1(x)
conv1 = self.feature_att_8(conv1, features[1])
conv2 = self.conv2(conv1)
conv2 = self.feature_att_16(conv2, features[2])
conv3 = self.conv3(conv2)
conv3 = self.feature_att_32(conv3, features[3])
if not hasattr(self, 'post32_to_16') or self.post32_to_16 is None:
conv3_up = self.conv3_up(conv3)
conv2 = torch.cat((conv3_up, conv2), dim=1)
conv2 = self.agg_0(conv2)
conv2 = self.feature_att_up_16(conv2, features[2])
else:
conv2 = self.post32_to_16(conv2, conv3, features[2])
if not hasattr(self, 'post16_to_8') or self.post16_to_8 is None:
conv2_up = self.conv2_up(conv2)
conv1 = torch.cat((conv2_up, conv1), dim=1)
conv1 = self.agg_1(conv1)
conv1 = self.feature_att_up_8(conv1, features[1])
else:
conv1 = self.post16_to_8(conv1, conv2, features[1])
conv = self.conv1_up(conv1)
if not hasattr(self, 'post8_to_4') or self.post8_to_4 is None:
x = self.conv_patch(x)
x = self.atts["4"](x)
x = F.interpolate(x, scale_factor=4, mode='trilinear', align_corners=False)
conv = conv + x
conv = self.conv_out(conv)
else:
conv = self.post8_to_4(x, conv)
return conv
A. 下采样(Encoder:获取全局视野)
输入的
comb_volume是 1/4 原图分辨率。conv1-->conv2-->conv3:利用stride=2的 3D 卷积,将空间分辨率依次压缩到 1/8、1/16、1/32。物理意义:随着分辨率越来越小,网络“一眼”能看到的实际图像范围越来越大(感受野变大)。在 1/32 的极小分辨率下,网络能理解诸如“这是一整块天空”或“这是一辆完整的车”这样的全局语义,从而解决大面积无纹理区域的匹配难题。
分辨率是什么?
数字图像是由无数个色彩点组成的方阵,这些点被称为“像素”。当你改变分辨率时,你最直接改变的是这个方阵的行数和列数。
改变分辨率并不是简单的“变大变小”,而是一个数学计算过程。当你强制改变分辨率时,计算机会面临以下问题:
缩小图像: 必须丢弃一部分像素。为了不让图像失真,计算机会对邻近像素进行加权平均。
放大图像: 需要“凭空”创造出原本不存在的像素。
B. 上采样与跳跃连接(Decoder:恢复高清边缘)
conv3_up-->conv2_up-->conv1_up:利用 3D 反卷积(Deconv),把分辨率一步步从 1/32 还原回 1/4。torch.cat(跳跃连接):在还原的过程中,如果只用极度压缩的信息,物体的边缘会糊成一团。因此,代码使用了经典的 U-Net 技巧:比如把反卷积得到的 1/16 特征,和下采样时保留下来的 1/16 原汁原味特征(conv2)拼接在一起,再通过agg_0融合。这保证了既有全局大局观,又有局部清晰度。内部经典操作:先 kernel_size = 1 融合 通道C,然后 经过 三维卷积,视差上的跨度尤其大 (kernel_disp=17)
FFS 优化细节
在普通的模型里,图像特征只在最开始注入一次。但在 FFS 的沙漏网络里 它在每一个分辨率层级(1/8, 1/16, 1/32),无论是下楼还是上楼,都把对应分辨率的左图 2D 特征(
features[1], features[2], features[3])拿过来相乘过滤一遍!conv1 = self.feature_att_8(conv1, features[1]) conv2 = self.feature_att_16(conv2, features[2]) conv3 = self.feature_att_32(conv3, features[3]) ... conv2 = self.feature_att_up_16(conv2, features[2])
非对称卷积减负 (Conv3dNormActReduced)
传统的 3D 卷积在处理代价体积(Cost Volume)时极其缓慢,因为它需要同时在三个维度(视差 D、高度 H、宽度 W)上滑动机法。
Conv3dNormActReduced的核心思想是 “空间与视差的因子分解(Factorization)”def forward(self, x): # x 形状: (B, C, D, H, W) x = self.conv1(x) # 负责平面 H, W x = self.conv2(x) # 负责纵深 D return x
深度在物理世界中通常是连续变化的。长卷积实际上是在 D 轴上施加了一个平滑先验。
抗噪: 17 个位置的上下文信息被强制融合。这意味着某个位置想要成为“赢家”,不仅自己概率要高,还得看周围兄弟节点的支持。
抹掉假峰: 孤立的噪声峰值因为缺乏周围权重的支持,在经过这种长距离的加权平均后,会被周围的低概率值“稀释”掉。
视差 Transformer 旁路
conv = self.conv1_up(conv1) if not hasattr(self, 'post8_to_4') or self.post8_to_4 is None: x = self.conv_patch(x) x = self.atts["4"](x) x = F.interpolate(x, scale_factor=4, mode='trilinear', align_corners=False) conv = conv + x conv = self.conv_out(conv) else: conv = self.post8_to_4(x, conv)self.conv_patch = nn.Sequential( nn.Conv3d(in_channels, in_channels, kernel_size=4, stride=4, padding=0, groups=in_channels), nn.BatchNorm3d(in_channels), )self.atts = nn.ModuleDict({ "4": CostVolumeDisparityAttention(d_model=in_channels, nhead=4, dim_feedforward=in_channels, norm_first=False, num_transformer=4, max_len=self.cfg['max_disp']//16), })在代码实现中,
hourglass的主线任务是通过一系列 3D 反卷积(conv1_up,conv2_up等)将压缩的特征还原。“旁路”逻辑如下:
输入分流:它直接取沙漏网络的输入
x(即原始的 1/4 分辨率代价体积)。并行处理:它不经过复杂的沙漏下采样/上采样,而是通过一个轻量级的 conv_patch 快速下采样,然后进入
CostVolumeDisparityAttention(视差注意力模块)。ormer 只在这个维度上结果融合:处理完的结果经过插值还原后,直接通过
conv = conv + x加回到主线特征中。
这就像在主干道旁边修了一条高架快车道,专门处理主干道(卷积层)处理不好的特殊信息。
核心机制:针对“视差轴”的注意力

为什么要专门处理“视差轴”?
在复杂的场景下(如重复纹理、半透明物体、细小前景),代价值在 D 轴上往往会出现多个峰值。
卷积的局限:3D 卷积只能通过局部的加权平均来平滑峰值,容易被错误的强假峰(False Positive)带偏。
Transformer 的优势:它具有全局感受野。通过注意力机制,它能分析整个视差概率分布的形状。如果它发现远处的视差分布更符合全局几何逻辑,它可以利用注意力权重直接加强远处的真正峰值,并抑制近处的假峰。
for i in range(len(self.sa)): x = self.sa[i](x, window_size=window_size)自注意力(Self-Attention):在这个像素的深度轴上,视差 10 处的特征会去“观察”视差 50、视差 100 处的特征。 (空间维度 (H, W) 和批次 B 合并了)
解决多峰歧义:在面对重复纹理(比如一排长得一模一样的栅栏)或透明反光物体时,代价体积在 D 轴上往往会算出好几个高分(假峰值)。3D 卷积由于视野有限(只看周围几个点),很容易选错。
降维打击:Transformer 拥有全局感受野。它能一次性统观所有视差的得分分布,结合自身的 C 通道语义,瞬间判断出:“虽然视差 50 和 100 得分都很高,但根据全局语义逻辑,视差 50 才是真身,100 是玻璃反光产生的倒影”。于是,它会通过注意力权重,狠狠地压制视差 100 的得分,提拔视差 50。
对于官方的 nn.Transformer ,输入为 (N, L, C);
N(Batch/独立样本数):这部分数据是完全平行、互不干扰的,L(Sequence Length/序列长度):Transformer 只在这个维度上计算注意力(互相观察)。所以这里 B * H * W 是独立的
为什么要加上 BatchNorm?
nn.BatchNorm3d(C) 的实现原理

为什么 conv_patch 需要加 BatchNorm?
A. 为 Transformer 准备“标准化”输入
conv_patch之后紧接着就是CostVolumeDisparityAttention(基于 Transformer 的自注意力模块)。注意力敏感性:Transformer 内部的
Softmax函数对输入数值的绝对大小非常敏感。如果输入值过大,Softmax会进入饱和区,导致梯度消失。标准化作用:BN 确保了送入 Transformer 的“Token”特征分布稳定,使得注意力权重的计算更加平滑且有效
B. 沙漏主路(
conv)经过了多层BasicConv处理,每一层内部通常都带有 BN。逻辑一致性:为了让两路特征在相加(Element-wise sum)时不会因为量级差异太大(例如一路是 1.0 左右,另一路是 100.0 左右)而导致其中一路被“淹没”,旁路也必须使用 BN 来保证两者的特征分布处于相似的统计区间。
为什么 BasicConv 中也要加 BatchNorm?
痛点:在训练时,前一层权重的微小更新,会导致输出的数据分布发生变化。这种变化经过几十层网络的传递和放大,会让后面的网络层无所适从,导致训练极度不稳定。
BN 的作用:它像一个“稳压器”,强行把每一次卷积输出的特征分布拉回到均值为 0,方差为 1 的标准状态。这确保了无论网络多深,每一层接收到的数据都是可预期的,从而防止了梯度的消失或爆炸。
转化 2D 视差图
logits = self.classifier(comb_volume).squeeze(1)
prob = F.softmax(logits, dim=1)
if init_disp is None:
init_disp = disparity_regression(prob, self.args.max_disp//4) 解释self.classifier
self.classifier = nn.Sequential( BasicConv(volume_dim, volume_dim//2, kernel_size=3, padding=1, is_3d=True), ResnetBasicBlock3D(volume_dim//2, volume_dim//2, kernel_size=3, stride=1, padding=1), nn.Conv3d(volume_dim//2, 1, kernel_size=7, padding=3), )先将通道数降维到一半,再经过残差块
最后经过 Conv3d, 其通道数变为 1, 给出一个具体的打分;用巨大的
kernel_size=7
在视差维度(
dim=1,即 D 轴)上应用 Softmax 函数。disparity_regression

效果:假设视差 10 的概率是 0.6,视差 11 的概率是 0.4。如果是硬取最大,视差就是 10。但在 Soft-Argmax 下,视差会是 10 0.6 + 11 0.4 = 10.4。
视差微调(Refinement)
cnet_list = self.cnet(features_left[0], features_left[1], features_left[2])
cnet_list = list(cnet_list)
net_list = [torch.tanh(x[0]) for x in cnet_list]
inp_list = [torch.relu(x[1]) for x in cnet_list]
inp_list = [self.cam(x) * x for x in inp_list]
att = [self.sam(x) for x in inp_list]cnet
self.cnet = ContextNetSharedBackbone(args, c04=self.feature.d_out[0], c08=self.feature.d_out[1], c16=self.feature.d_out[2], output_dim=[args.hidden_dims, context_dims])class ContextNetSharedBackbone(nn.Module): def __init__(self, args, c04, c08, c16, output_dim=[(128,128,128), (128,128,128)], norm_fn='batch', downsample=3): super().__init__() self.args = args self.conv04 = nn.ModuleList([ nn.Conv2d(c04, output_dim[0][0], kernel_size=3, padding=1), nn.Conv2d(c04, output_dim[1][0], kernel_size=3, padding=1), ]) def forward(self, x4, x8, x16): outputs04 = [] for i in range(len(self.conv04)): outputs04.append(self.conv04[i](x4)) return (outputs04,)这是这段代码的核心。它包含了两个并行的 3 * 3 卷积层。它们都接收 1/4 分辨率的特征(
c04),但输出不同的维度:第一个卷积 (
output_dim[0][0]):负责生成 GRU 的初始隐藏状态(Hidden State)。对应之前参数args.hidden_dims。第二个卷积 (
output_dim[1][0]):负责生成 GRU 的上下文输入(Context Feature)。对应之前参数context_dimshidden_dims 是配置里的一个 列表/元组,表示 迭代更新视差(GRU / update_block 一路)里「隐状态」在各层的通道宽度
context_dims:本质就是 args.hidden_dims 的别名
(第一个卷积被训练成了“记忆提取器”,第二个卷积被训练成了“当前线索提取器”。)

加上逗号后,再加上(), 返回一个元组
net_list = [torch.tanh(x[0]) for x in cnet_list]
提取 x[0], 即给即将上场的 GRU 准备的初始隐藏状态(Hidden State)
torch.tanh
双曲正切函数的数学公式如下:

具有以下三个极其关键的特性:
严格的边界 [-1, 1]:无论输入的数值有多极端(比如输入 10000),输出也只会被无限压缩逼近 1;如果输入 -10000,输出就无限逼近 -1。
零中心化(Zero-centered):当输入 x = 0 时,输出 y = 0。并且它在原点附近是高度对称的(中心对称的奇函数)。
非线性变形:在靠近 0 的区域(大约 [-2, 2]),它近似于一条直线(线性),这意味着特征能原汁原味地传递
在此使用,可以 避免梯度爆炸

BN 不能放在循环神经网络中 。 在 GRU 中,第 1 次迭代的记忆分布和第 15 次迭代的记忆分布是截然不同的。 如果你为所有迭代步骤共享同一个均值和方差,这在数学上是完全错误的,因为记忆在不断演化。
可以用layernorm,都能在不污染他人记忆的前提下,稳定自身的数值分布. 但这里为了效率,直接用 函数映射了
线性整流函数(Rectified Linear Unit)
通道注意力机制(Channel Attention Mechanism)
inp_list = [self.cam(x) * x for x in inp_list]self.cam(x)的任务是输出一个 (B, C, 1, 1) 的权重向量,也就是给每一个特征通道打一个 0 到 1 之间的分数. 然后将 inp_list 乘以权重
空间注意力机制(Spatial Attention Mechanism)
avg_out = torch.mean(x, dim=1, keepdim=True) max_out, _ = torch.max(x, dim=1, keepdim=True)动作:在通道维度 (
dim=1) 上进行操作。对于图像上的任意一个坐标点 (h, w),它背后本来有 C 个特征数值。torch.mean(平均值):把这 C 个数值加起来求平均。物理意义:计算这个像素点上“所有特征激活的平均强度”。torch.max(最大值):找出这 C 个数值里最大的那个。物理意义:寻找这个像素点上“最强烈的单一显著性特征”。结果:原本厚厚的特征张量 (B, C, H, W),被无情地拍扁成了两张单通道的灰度图 (B, 1, H, W)。
拼接与大视野审视 (7 * 7 卷积)
return self.sigmoid(x)
通过
Sigmoid函数,把卷积出来的数值强行压缩到 0 到 1 之间。最终产出:一张黑白分明的 2D 空间热力图(Heatmap)。数值越接近 1(越亮),代表这个像素点在立体匹配中越关键、越容易出错(比如复杂的遮挡边缘、树叶交错处);数值越接近 0(越暗),代表这个区域越平坦简单(比如一大片纯白色的墙)
准备 GRU 迭代微调
geo_fn = Combined_Geo_Encoding_Volume(features_left[0].to(self.dtype), features_right[0].to(self.dtype), comb_volume.to(self.dtype), num_levels=self.args.corr_levels)
b, c, h, w = features_left[0].shape
coords = torch.arange(w, dtype=torch.float, device=init_disp.device).reshape(1,1,w,1).repeat(b, h, 1, 1)
disp = init_disp.to(self.dtype)
disp_preds = []Combined_Geo_Encoding_Volume: 将左图特征、右图特征以及之前辛苦算好的代价体积(
comb_volume)打包,构建一个geo_fn(几何编码体积函数)在初始化伊始,它会调用静态方法
corr处理左右图的初始特征图(init_fmap1和init_fmap2):全对相关性计算:利用torch.einsum计算左图每个像素与右图同水平线上所有像素的内积。结果形状:生成的init_corr形状为 (B, H, W1, 1, W2),代表了原始的、未经神经网络进一步加工的几何匹配强度。处理原始相关性矩阵:将
init_corr变形为 (B* H * W, 1, 1, W2); 将输入的geo_volume(形状为 (B, C, D, H, W))重排并变形为 (B * H * W, C [init = 1], 1, D)。self.geo_volume_pyramid.append(geo_volume) self.init_corr_pyramid.append(init_corr) for _ in range(self.num_levels-1): geo_volume = F.avg_pool2d(geo_volume, [1,2], stride=[1,2]) self.geo_volume_pyramid.append(geo_volume) for _ in range(self.num_levels-1): init_corr = F.avg_pool2d(init_corr, [1,2], stride=[1,2]) self.init_corr_pyramid.append(init_corr)标准的 2D 池化接收的格式是
(Batch, Channels, Height, Width)。当我们对形状为(N, C, 1, D)的张量应用F.avg_pool2d(..., [1,2], stride=[1,2])时,在 Height (高度) 维度上:核大小是1,步长也是1。因为高度本身就是 1,所以它在这个维度上什么都没做。在 Width (视差 D) 维度上:核大小是2,步长也是2。这意味着它会两两一组,且不重叠地框住相邻的数据。
获取当前工作分辨率(通常是原图的 1/4)下的批次大小 b、通道数 c、高度 h 和宽度 w。
构建基准 X 坐标网格
torch.arange(w):生成一个从 0 到 w-1 的数列(比如 0, 1, 2... 79)。.reshape(1,1,w,1)和.repeat(b, h, 1, 1):把这根一维的线,沿着高度方向(h)和批次方向(b)复制,铺满整个屏幕。最终得到形状为
(b, h, w, 1)的张量。对于图像上的任意一点 (y, x),这个张量里存的值就是它的绝对横坐标 x。
将上一阶段 Soft-Argmax 算出来的初稿视差图(
init_disp),正式赋值给工作变量disp。初始化一个空列表。物理意义(深层监督):在接下来的每一次 GRU 循环(比如迭代 12 次)结束后,都会把当前更新好的
disp存进这个列表里在训练阶段,模型不会只拿着最后一次的结果去算误差(Loss),而是会把这 12 次所有的中间结果都拿去和真实深度图(Ground Truth)算误差,这被称为深层监督(Deep Supervision)。它能迫使网络在初期的几次迭代中就快速向正确答案靠拢,大幅加速模型收敛,并防止梯度消失
ConvGRU
门控
如果没有门控,神经网络就像一个没有记忆滤网的漏斗,旧的信息很快会被新信息冲刷掉(这就是 RNN 梯度消失的原因)。
门控机制通常由两个部分组成:一个 Sigmoid 函数 和 一个逐元素乘法
在具体的模型里,这些“门”各司其职
遗忘门/重置门(Forget/Reset Gate): “断舍离”。决定哪些旧的垃圾记忆该扔掉了。
更新门/输入门(Update/Input Gate): “记笔记”。决定哪些新的信息值得被存入大脑。
输出门(Output Gate): “深思熟虑”。决定当前大脑里的所有信息,哪些应该拿出来作为当前的结论。
门控机制的出现,本质上是为了解决长程依赖问题。
在传统的神经网络中,信息传递是一层压一层,传递路径越长,梯度就越容易在连乘中变成 0(梯度消失)。门控机制通过“加法更新”而非单纯的“乘法堆叠”,为梯度提供了一条“高速公路”,使得模型能够学习到几百甚至上千个时间步之前的关键信息。
重置门
重置门 rt 的计算方式与其它门类似,都是基于当前输入 xt 和上一个时刻的隐藏状态 ht-1

ht-1 表示上一时刻的隐藏状态,xt 表示现在输入给模型的数据,Wr 是可学习的权重矩阵,矩阵乘法,它将高维的输入映射到一个特征空间,算出每一个记忆维度的“相关性得分”。
Sigmoid 函数,它的唯一任务是把计算结果压缩到 (0, 1) 之间 ; 结果趋近于 1:代表“完全保留”,门是大开的。结果趋近于 0:代表“完全抹除”,门是关闭的
rt 是一个与隐藏状态维度相同的向量,是一个信号 . 如果你看到 rt 接近 0:说明模型认为之前的记忆ht-1 跟当前的任务没关系,直接“重置”掉,


重置门最关键的一步发生在计算 候选隐藏状态 的时候。公式如下
更新门
GRU

ConvGRU
为什么要从 GRU 升级到 ConvGRU?
在标准的 GRU 中,输入 Xt 和隐藏状态 Ht-1 必须被拉平成一维向量。
后果: 图像中像素与像素之间的空间位置关系(比如左边和右边是相连的)被彻底破坏了。
ConvGRU 的对策: 保持输入是三维张量 (Channels, Height, Width),用一个滑动窗口(卷积核)去扫描。这样,模型在记住时间的同时,也能记住物体的形状和位置。
迭代微调循环(Refinement Loop)
for itr in range(iters):
disp = disp.detach()
geo_feat = geo_fn(disp, coords, dx=self.dx, low_memory=low_memory)
with torch.amp.autocast('cuda', enabled=self.args.mixed_precision, dtype=U.AMP_DTYPE):
net_list, mask_feat_4, delta_disp = self.update_block(net_list, inp_list, geo_feat.to(self.dtype), disp, att)
disp = disp + delta_disp.to(self.dtype)
if test_mode and itr < iters-1:
continue
# upsample predictions
disp_up = self.upsample_disp(disp.to(self.dtype), mask_feat_4.to(self.dtype), stem_2x.to(self.dtype))
disp_preds.append(disp_up)
geo_fn
在 Python 中,并不是只有函数(
function)才能加括号运行。如果一个类定义了__call__方法,那么这个类的实例就会变成一个可调用对象。当你写下
obj(args)时,Python 解释器在底层会自动将其转化为obj.__call__(args)。def __call__(self, disp, coords, dx, low_memory=True): b, _, h, w = disp.shape out_pyramid = [] for i in range(self.num_levels): with torch.profiler.record_function(f"make disp_lvl {i}"): geo_volume = self.geo_volume_pyramid[i] x0 = dx + disp.view(b*h*w, 1, 1, 1) / 2**i获取当前视差图
disp的尺寸。遍历之前在
__init__中准备好的几何特征金字塔。 粗对齐 + 精对齐(这里 self.geo_volume_pyramid 是根据comb_volume这个聚合完的特征 制作的 金字塔 )
(disp是 将 c 归一化,然后对不同视差评分加权平均(d化为1)的结果) (这时候数值的意思不是C,而是视差应该是多少)
(dx = torch.arange(-r, r+1, requires_grad=False, dtype=torch.int8).reshape(1, 1, 2*r+1, 1))
with torch.profiler.record_function(f"bilinear_sampler geo_volume {i}"): if low_memory: geo_volume = bilinear_sampler1d(geo_volume, x0, mode='bilinear', align_corners=True) else: y0 = torch.zeros_like(x0) disp_lvl = torch.cat([x0,y0], dim=-1) geo_volume = bilinear_sampler(geo_volume, disp_lvl, low_memory=low_memory) geo_volume = geo_volume.view(b, h, w, -1) #(b, h, h, 3x3xC)神经网络生成的代价体积(Cost Volume)是离散的像素点,但 GRU 预测的视差(Disparity)通常是连续的浮点数(例如 10.42)。
bilinear_sampler的任务就是:当你给出一个不落在整数坐标上的视差值时,通过“四点取样”帮你算出一个合理的、平滑的特征值(算一个 猜出来的C)在
geo_volume这一行数据中,10.4 并不是一个真实存在的索引。bilinear_sampler会立刻锁定它周围最靠近的整数索引提供梯度 (Gradient Flow): 这是最关键的一点。如果直接“取整”采样(Nearest Neighbor),函数是阶跃的,导数为 0,模型没法训练。 双线性采样是线性可导的。 当 GRU 微调视差时,采样出的
geo_feat会随之发生连续变化。这让 Loss 产生的梯度能顺着这条“线性电缆”反向传播,告诉 GRU 应该往哪个方向继续修正。

geo_volume = bilinear_sampler(geo_volume, disp_lvl, low_memory=low_memory) 的 意思是 用 disp_lvl 这个 左图中每个像素对应的深度值{D}以及其余 dx 个 在 {D}上做偏移的 一系列 {D+x1, D+x2....} , 作为索引,去插值好的 geo_volume 中取出 匹配特征值
with torch.profiler.record_function(f"make init_coords_lvl {i}"): init_corr = self.init_corr_pyramid[i] # (B*H*W, 1, 1, W2) init_x0 = coords.view(b*h*w, 1, 1, 1)/2**i - disp.view(b*h*w, 1, 1, 1) / 2**i + dx # X on right image变量解释
coords(绝对横坐标) , 代表了左图像素的原始物理位置 (x_left)。在代码中,它是由torch.arange(w)生成并铺满全图的,表示图像中每一个点在水平方向上的绝对索引。dx(局部偏移量):它代表了以预测点为中心的搜索窗口半径。它是一个相对位移向量(例如[-4, -3, ..., 0, ..., 4]),用于在当前预测值附近进行微调探测。此时 将 init_corr 形状 为 (B* H * W, 1, 1, W2). 是全对相关性矩阵(All-pairs Correlation), 即同一个高度上两张图片的每一个像素都会做运算.
coords - disp的结果就是该左图像素点在右图中对应的预测横坐标。+ dx:局部搜索偏移。
这让
init_x0不仅仅指向右图的一个点,而是指向以预测点为中心的 9 个邻居点。
. 为什么要用
coords - disp?(对比geo_volume)这是理解这段代码最难的一点:
对于
geo_volume:它的维度本身就是“视差轴”。你只需要告诉它“我要看视差为 10 的地方”,所以采样坐标只需要disp + dx。对于
init_corr:它是全对相关性矩阵(All-pairs Correlation)。它的最后一个维度 W_2 代表的是右图的列索引。如果你想知道某个视差下的匹配好不好,你必须亲自算出这个视差对应右图的哪个位置(即 x_left - d),然后去右图那个位置“抠”特征。
out_pyramid.append(geo_volume) out_pyramid.append(init_corr) with torch.profiler.record_function(f"make out_pyramid"): out_pyramid = torch.cat(out_pyramid, dim=-1) return out_pyramid.permute(0, 3, 1, 2) #(B,C,H,W)在
for循环内部,每一层级采样出的geo_volume和init_corr都会被存入out_pyramid列表动作:将列表中所有的张量沿着最后一个维度(通道维度)拼接起来。
维度计算:
每个采样结果的形状是 (B, H, W, 9)(假设采样半径 r=4,即
dx有 9 个点)。如果
num_levels=2,拼接后的总通道数就是:9 (geo0) + 9 (corr0) + 9 (geo1}) + 9 (corr1}) = 36。
with torch.amp.autocast('cuda', enabled=self.args.mixed_precision, dtype=U.AMP_DTYPE): net_list, mask_feat_4, delta_disp = self.update_block(net_list, inp_list, geo_feat.to(self.dtype), disp, att)self.update_block(通常是一个 ConvGRU)
update.py
以scal3r为例完成练习工程
https://gemini.google.com/app/314af1ac43abaf2b?hl=zh
第 1 步:目录与「发布根」
framework

最外层的
shell_practice/是整个项目的工作区根文件夹,包含了配置文件、说明文档以及实际的代码包。pyproject.toml: Python 项目的现代构建配置文件。用于定义项目的元数据、依赖项以及打包配置,这是标准化 Python 工程的基础。README.md: 项目的说明文档
这个目录专门用于存放与 Scal3R 对齐的 YAML 配置文件。它也是 path.py 中寻找“发布根 (Release Root)”时的关键定位标志。
base.yaml: 基础配置文件(占位)。按照您的第 2 步规划,它未来将包含顶层的data和model等公共默认配置项。models/demo.yaml: 模型特定的配置文件(占位)。用于练习特定模型的参数配置以及多层 YAML 的合并逻辑。
Python 核心代码包 (
shell_practice/)这是内部真正的 Python 模块包,包含了工程的所有业务逻辑和运行脚本。
__init__.py: 空文件,用于标识shell_practice是一个标准的 Python 包。run.py: 项目的 CLI 执行入口。它对应您要求的支持python -m shell_practice.run指令,并将在后续步骤中承担解析参数、加载配置和调用推理核心的任务。
引擎与工具模块 (
shell_practice/engine/)这个子目录用于存放项目底层的通用逻辑和工具类代码。
__init__.py: 标识engine是一个 Python 子包。path.py: 路径解析核心模块。根据您的第 1 步要求,这里实现了两个关键函数:get_release_root()(通过__file__向上查找包含configs/文件夹的绝对路径)以及resolve_release_path(path)(基于发布根智能拼接或直接返回绝对路径)。
__init__.py
from shell_practice import __version__时,Python 实际上是直接去shell_practice/__init__.py这个文件中寻找__version__这个变量的在 Python 中,
__init__.py是一个特殊的文件。它的存在告诉 Python:“请把当前这个文件夹(shell_practice)当作一个正规的代码包(Package)来看待。” 当你从包的顶层导入任何东西时(比如from shell_practice import xxx),Python 会最先、且默认去执行__init__.py里面的代码。所以,定义在里面的__version__变量自然就能被外界直接拿到了。
__version__ = "0.1.0"提供一个包级别的
__version__属性是 Python 社区的一个强烈共识(源自早期的 PEP 396 规范)。 这使得用户或开发者可以在不查看源代码、也不翻阅pyproject.toml的情况下,直接在代码或交互式终端里快速确认当前包的版本:
>>> import shell_practice
>>> shell_practice.__version__
'0.1.0'__init__.py 父文件调用子文件的函数
main.pyutils/(子文件夹)__init__.py(通常是个空文件,告诉 Python 这是一个包)network.py
main.py 可以如下
# 方式 1
from utils import network
network.get_data()
# 方式 2
from utils.network import get_data
get_data()包级 re-export

init.py:
from .scal3r import Scal3R, build_sampler_from_config
__all__ = ["Scal3R", "build_sampler_from_config"]
这时候, backend.py 可以
from scal3r.models import build_sampler_from_config, 而不必from scal3r.models.scal3rimport build_sampler_from_config__all_ 这个可以删去
pyproject.toml
[build-system](构建系统声明)
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project](项目核心元数据)
[project]
name = "shell-practice"
version = "0.1.0"
description = "Scal3R-style engineering shell practice (no model)"
readme = "README.md"
requires-python = ">=3.10"
dependencies = []
[project.scripts](命令行入口配置)
[project.scripts]
shell-practice = "shell_practice.run:main"
[tool.setuptools.packages.find](包查找规则)
[tool.setuptools.packages.find]
where = ["."]
include = ["shell_practice*"]
path.py
from __future__ import annotations
from shell_practice import __version__
from shell_practice.engine.path import get_release_root, resolve_release_path__future__ : Python 解释器自带的内置模块(Built-in Module)。Python 在执行
import语句时,会按照特定的优先级顺序去查找包。内置模块拥有最高的查找优先级__future__是一个特殊模块,用于让当前版本的 Python 提前使用未来版本的新特性。在这里,annotations的作用是让代码里的类型提示(Type Hints)变成字符串来延迟解析,从而避免复杂的循环引用问题,并提升程序的启动速度。
为什么能找到 shell_practice?
Python 寻找外部包依赖于一个名叫
sys.path的环境变量列表。Python 会按顺序遍历这个列表里的目录,看里面有没有叫shell_practice的文件夹且包含__init__.py在你的这个工程中,能找到它通常是因为以下两种情况之一:
情况 A:基于当前工作目录(你目前大概率是这种) 当你在终端中处于项目的根目录(例如
/home/linux/218/shell_practice/,也就是pyproject.toml所在的目录),并运行python -m shell_practice.run时,Python 会自动将你当前所在的目录(项目根目录)偷偷塞进sys.path的第一个位置。因此,Python 顺理成章地在当前目录下看到了名为shell_practice的文件夹,并成功将其识别为一个包。情况 B:基于包管理器安装(Editable Install) 如果你在这个项目根目录下执行过类似 pip install -e . 的命令,
pip就会读取你的pyproject.toml文件,然后在你 Python 环境的全局包目录(site-packages)里建一个“快捷方式”(通常是一个.pth文件),指向你电脑上的这个代码文件夹。这样,无论你在终端的哪个目录下运行代码,Python 都能顺着网线找到它。
def get_release_root() -> Path:
"""这个函数的目的是找到整个项目的根目录(即 shell_practice/ 最外层的那个文件夹),它返回的是一个绝对路径。"""
here = Path(__file__).resolve()
for parent in (here.parent, *here.parents):
configs = parent / "configs"
if configs.is_dir():
return parent
raise RuntimeError(
"Could not find release root: no parent directory contains a `configs/` folder. "
"Run from the installed `shell_practice` package layout where repo root has configs/."
)
def resolve_release_path(path: str | Path) -> Path:
"""转化为一个绝对安全的绝对路径。"""
p = Path(path)
if p.is_absolute():
return p.resolve()
return (get_release_root() / p).resolve()
魔法变量
__file__:它代表当前这行代码所在文件的路径(也就是path.py的位置)。resolve():将其转化为绝对路径* 解包
here.parents(eg. ['/home/user/project/engine', '/home/user/project', '/home/user', '/home', '/']) 是一个列表,而 here.parent (eg. /home/user/project/engine) , 当我们需要将 列表中的元素和here.parent 拼成一个元组的时候,就要解包
for parent in here.parents 不需要解包,因为for...in... 本身可以遍历可迭代对象, 而 (here.parent, here.parents) 不行
path: str | Path
Python 3.10 版本引入的类型注解(Type Hinting)新语法,专门用来表示“联合类型”(Union Type)。
简单来说,
path: str | Path的翻译就是:参数path可以是一个字符串 (str),或者 (|) 是一个pathlib.Path对象。两者皆可接受。
pip install -e . -q
生成了什么?
项目根目录里(仓库中)
*.egg-info/(你这里是shell_practice.egg-info/)
里面是 包元数据:包名、版本、PKG-INFO、SOURCES.txt、依赖、entry_points.txt等。
当前 Python 环境的 site-packages 里
会有一个指向你项目的 可编辑安装记录(具体形式随 setuptools/pip 版本略有不同):例如
.pth文件、或.dist-info/ 链接,让 import shell_practice 能找到你的源码路径。
发生了什么?
import shell_practice、python -m shell_practice.run会从你克隆/编辑的那份源码加载。shell-practice控制台命令(若在pyproject.toml里[project.scripts]配了)会指向shell_practice.run:main,也是在当前环境里可用。
完成 第 2 步:YAML 与 data / model 嵌套

pyproject.toml
dependencies = [
"PyYAML>=6",
]Python 标准库自带了解析 JSON 的能力,但不包含解析 YAML 的内置包。因为你在第 2 步中要求实现
load_yaml并且要处理复杂的configs/base.yaml等文件,所以必须引入第三方库PyYAM
[project.optional-dependencies]
dev = ["pytest>=7"]变化点:这是一个全新增加的区块,专门用来定义“可选依赖(Optional Dependencies)”。
为什么修改:你在第 2 步中编写了
test_config.py来验证复杂的配置合并逻辑(比如子配置如何覆盖父配置)。pytest就是用来运行这些测试的框架。工程最佳实践:
为什么不把
pytest直接写进上面的dependencies里?因为如果别人只是想运行你的工具(生产环境),他们根本不需要安装测试工具。把它们混在一起会让项目变得臃肿。放在 optional-dependencies 下的 dev(代表 development,开发环境)组里,是一种非常优雅的工程规范。
实际影响(如何使用):
普通安装:运行
pip install -e .,只会安装PyYAML。开发者安装:如果要在本地跑测试,你需要带上
[dev]标签,运行命令改为 pip install -e ".[dev]" (注意引号)。这样pip才会把pytest一并安装上。
base.yaml
# 与 Scal3R configs/base.yaml 对齐的 data / model 嵌套(练习用数值可改)。
release:
name: shell_practice_release_workspace
model:
name: demo
data:
image_patterns: "*.png,*.jpg,*.jpeg,*.bmp"
max_images: -1
preprocess_workers: 32
block_size: 60
overlap_size: 30
loop_size: 20
use_loop: 1
use_xyz_align: 0
save_dpt: 1
save_xyz: 1
loop_ckpt: ""
streaming_state: 0
offload_batches: 0
offload_outputs: 0
cleanup_offload: 1
它的设计哲学是:把所有可能用到的参数都在这里赋一个默认值。这样,后续其他特定的配置文件(比如 demo.yaml)只需要继承它,然后覆盖掉自己关心的那几个参数即可,极大减少了代码冗余。
release.name:shell_practice_release_workspace当前工程运行空间的命名,通常用于在生成输出结果时,作为根文件夹的名称或日志标签。
model.name:demo指定默认使用的模型或流水线名称。在真实工程中,Python 代码通常会根据这个字段去动态实例化对应的模型类(比如 if cfg.model.name == 'demo': return DemoModel())。
文件读取与并行控制:
image_patterns:*.png,*.jpg,*.jpeg,*.bmp(指定引擎去目录里找哪些格式的图片)。max_images:-1(限制最大处理张数。-1通常在程序中代表“不限制,处理所有找到的图片”)。preprocess_workers:32(预处理的并发数。使用 32 个线程或进程来加速图片的读取、缩放等前期工作)。
切片与块处理逻辑(核心算法参数):
block_size:60overlap_size:30解释:处理几千张图片时显存肯定会爆。工程通常会把图片分成一个个“块”(Block)来处理。这里的意思是:每 60 张图片作为一个批次丢进模型,为了保证前后块拼接顺畅,相邻的两个块要保留 30 张图片的重叠部分(Overlap)。
高级算法开关:
loop_size:20,use_loop:1,loop_ckpt:""解释:这大概率对应 3D 重建中的**“闭环检测”(Loop Closure)**。
use_loop: 1表示开启,loop_size控制检测的帧间距或范围,loop_ckpt则用于指定专门用于闭环的额外模型权重路径(这里为空字符串,表示不使用或使用默认)。
use_xyz_align:0解释:是否启用基于三维坐标(XYZ)的对齐优化(0 表示默认关闭)。
落盘与保存(I/O):
save_dpt:1(开启保存深度图 Depth Map)。save_xyz:1(开启保存三维点云数据)。
显存管理与卸载(Offload):
streaming_state:0,offload_batches:0,offload_outputs:0,cleanup_offload:1解释:这是现代大规模推理框架的标准配置。当 GPU 显存不够时,程序会把暂时用不到的张量(Tensors)“卸载”(Offload)到系统内存(CPU RAM)甚至硬盘上。这里默认全部关闭(
0),但一旦前面的任务跑完了,cleanup_offload: 1保证会及时清理掉这些临时文件或显存碎片。
base.yaml
configs:
- configs/base.yaml
这是什么: 这是一个特殊的语法指令。当你运行程序,把这个
demo.yaml传给底层解析引擎(也就是你在第 2 步写的config.py)时,引擎看到这个configs字段会立刻停下来。它的动作: 引擎会顺着这个路径,先把
configs/base.yaml里的所有内容(比如那二十几个data处理参数)完完整整地“抄”一份到内存里,作为初始地基。
model:
name: demo
checkpoint: data/checkpoints/demo_placeholder.pt
它的动作: 在引擎把
base.yaml的地基打好之后,它接着往下读。当它读到这个model字典时,它会把这里面的值与地基里的值进行合并(Merge)。
io.py
load_yaml 函数
"""最小 IO:仅 YAML(对齐 Scal3R engine.io 的 load 用途)。"""
from __future__ import annotations
from pathlib import Path
from typing import Any
def load_yaml(path: str | Path) -> Any:
import yaml
p = Path(path)
with p.open("r", encoding="utf-8") as f:
return yaml.safe_load(f)
safe_load 的作用: 它是被阉割过的安全版本,只允许解析基础的数据结构(字典、列表、数字、字符串等),把潜在的执行漏洞完全堵死了。在现代 Python 开发中,读取不受信任的 YAML 文件必须使用 safe_load
config.py
经典的设计模式:分层继承式配置引擎。
它的核心使命是将零散的多个 YAML 文件,通过继承关系(configs 列表)和覆盖逻辑,最终“揉”成一个完整的、可以直接指导程序运行的 Python 字典。
from __future__ import annotations
from os import PathLike
from pathlib import Path
from typing import Any
from shell_practice.engine import io
from shell_practice.engine.path import get_release_root
def _resolve_config_path(config_ref: str | PathLike[str], parent_dir: str) -> str:
ref = Path(str(config_ref)).expanduser()
if ref.is_absolute() and ref.is_file():
return str(ref.resolve())
parent = Path(parent_dir)
for root in (parent, get_release_root()):
candidate = (root / ref).resolve() if not ref.is_absolute() else ref.resolve()
if candidate.is_file():
return str(candidate)
raise FileNotFoundError(f"Could not resolve config path: {config_ref}")from os import PathLike作用: 导入一个类型协议。它代表了**“任何长得像路径的东西”**(比如纯字符串,或者
pathlib.Path对象)。用于给函数参数做类型标注,告诉调用者:“传字符串或 Path 对象都可以”。
str | PathLike[str] 也可以写成 str | Path , PathLike[str]可以识别自定义的path
绝对路径
expanduser()会把路径里的~符号自动展开成当前用户的家目录(比如/home/user/)。如果用户传进来的本身就是一个绝对路径(比如
/data/config.yaml),并且这个文件真实存在(is_file()),那就直接返回它的规范化绝对路径,不需要再费劲找了
def _merge_dicts(base: Any, update: Any) -> Any:
if isinstance(base, dict) and isinstance(update, dict):
merged = dict(base)
for key, value in update.items():
if isinstance(value, dict):
value = dict(value)
if value.pop("_delete_", False):
merged[key] = value
continue
if key in merged:
merged[key] = _merge_dicts(merged[key], value)
else:
merged[key] = value
return merged
return update字典深度合并(Deep Merge)。
原生的 Python 字典 .update() 方法只能做“浅合并”(如果遇到同名字典,新字典会直接把老字典整个拍死替换掉)。而这段代码会像剥洋葱一样,一层一层深入进去,只覆盖最小粒度的变动。
isinstance
isinstance(3, int) # True
isinstance("hi", str) # True
isinstance([], list) # True
isinstance([], (list, tuple)) # True(是 list 或 tuple 之一)
merged = dict(base) 这一步非常关键。它创建了父配置的一个副本,所有的合并操作都在这个副本上进行,绝不修改原本传入的字典对象
update的 格式
update = {
"data": {"block_size": 8},
"model": {"name": "demo"},
}
update.items()的「格式」dict.items()返回的是 「键值对」的可迭代视图,每次迭代得到(key, value)二元组
value.pop("_delete_", False)
若字典里 有 键
"_delete_":删掉这个键,并返回它原来的值(value[_delete_])(例如True/False/ 别的真值假值)。若 没有 这个键:不报错,返回默认值
False。
if key in merged:表示:合并结果里已经有这个键(来自原来的
base)。这时不能把
value直接整块覆盖掉(否则深层嵌套会丢base里同键下其它子键),所以要 递归合并:merged[key] = mergedicts(merged[key], value)
def load_config(config_path: str | PathLike[str]) -> dict[str, Any]:
"""
加载 YAML:支持顶层 ``configs:`` 列表引用更多 YAML,按顺序深合并,当前文件最后生效。
写入 ``__config_path__`` 为本函数**入口**解析后的绝对路径字符串。
"""
resolved = _resolve_config_path(str(config_path), str(Path.cwd().resolve()))
current = dict(_load_yaml_root(resolved))
base_refs = current.pop("configs", [])
if isinstance(base_refs, (str, PathLike)):
base_refs = [base_refs]
merged: dict[str, Any] = {}
for base_ref in base_refs:
base_cfg = load_config(_resolve_config_path(str(base_ref), str(Path(resolved).parent)))
merged = _merge_dicts(merged, dict(base_cfg))
merged = _merge_dicts(merged, current)
merged["__config_path__"] = resolved
_validate_required(merged)
return mergedcurrent = dict(_load_yaml_root(resolved))
去找到目标配置(比如
demo.yaml)的绝对路径。
base_cfg = load_config(_resolve_config_path(str(base_ref), str(Path(resolved).parent)))
递归 如果
base.yaml里面还继承了super_base.yaml,程序就会继续无限往下钻,直到找到一个没有任何继承的最底层文
run.py
parser = argparse.ArgumentParser(description="shell_practice release entry")
parser.add_argument(
"--config",
type=str,
default="configs/models/demo.yaml",
help="Config path relative to release root or absolute.",
)
args = parser.parse_args()这是什么:
argparse是 Python 官方提供的一款非常强大的命令行参数解析工具。它的作用: 它让你的 Python 脚本不再是“写死”的。用户在终端运行时,可以通过传入
--config参数来动态指定要跑哪个配置。默认值兜底:
default="configs/models/demo.yaml"这个设计很贴心。如果你在终端什么都不传,直接敲python -m shell_practice.run,它就会默认去跑 demo.yaml。如果你以后新建了一个test.yaml,你只需敲 ...run --config configs/test.yaml 就能无缝切换。args 的类型是 argparse.Namespace:一个简单命名空间对象,每个命令行参数对应 一个同名属性(--xxx-yyy 会变成 args.xxx_yyy)。
import pytest
在 pytest 中,你只需要用 Python 原生的 assert 关键字:
def test_math():
x = 1 + 1
assert x == 2 # 判断相等
assert "a" in "apple" # 判断包含
assert not False # 判断布尔值跑全量测试:
pytest(自动扫描当前目录下所有的 test_.py 文件并执行)*
跑指定文件:
pytest tests/test_config.py跑指定的某一个函数:
pytest tests/test_config.py::test_delete_merge
完成 第 3 步:InferenceRequest + run_inference 外形
在像 Scal3R 或大型深度学习框架中,通常不会把所有代码都在一个 Python 进程里跑完,而是采用**“主进程准备数据与计划 -> 拉起独立子进程执行”**的模式,以防止显存泄漏或实现多卡并行。
framework
shell_practice/
├── README.md
├── pyproject.toml
├── .gitignore
├── configs/
│ ├── base.yaml
│ └── models/
│ └── demo.yaml
├── shell_practice/ # Python 包
│ ├── __init__.py
│ ├── run.py # CLI:配置摘要 + 可选 run_inference
│ ├── dataloaders/
│ │ ├── __init__.py
│ │ └── datasets/
│ │ ├── __init__.py
│ │ └── image_folder_dataset.py
│ ├── engine/
│ │ ├── __init__.py
│ │ ├── path.py # get_release_root, resolve_release_path
│ │ ├── io.py # load_yaml
│ │ └── config.py # load_config 链式合并
│ ├── pipelines/
│ │ ├── __init__.py
│ │ ├── inference.py # InferenceRequest, run_inference
│ │ └── backend.py # 占位子进程入口
│ └── utils/
│ ├── __init__.py
│ └── data_utils.py # ensure_dir, write_json, write_lines
└── tests/
├── test_config.py
└── test_inference.pyimport json
from collections.abc import Iterable
def write_lines(path: str | Path, lines: Iterable[str]) -> None:
...
★data_utils.py
def ensure_dir(path: str | Path) -> str:
相当于 Linux 终端里的
mkdir -p命令
def write_json(path: str | Path, payload: dict[str, Any]) -> None:
把 Python 字典安全地保存为
.json文件。ensure_dir(str(p.parent)) 这一行是这个函数的灵魂。很多新手在写文件时,如果父文件夹不存在,
open()函数会直接抛出FileNotFoundError导致程序崩溃。这里在写文件前,先强制把文件所在的父目录建好,彻底断绝了这种低级 Bug。
def write_lines(path: str | Path, lines: Iterable[str]) -> None:
把一个列表里的字符串按行写入到文本文件中。在工程里,它用来生成
image_manifest.txt(图片绝对路径清单)
★image_folder_dataset.py
去指定的文件夹里,把所有符合要求的图片找出来,排好队,并报告它们的绝对位置
resolved = sorted({abspath(expanduser(path)) for path in images})
集合去重 ({...}):注意这里用的是大括号
{},这在 Python 里代表集合推导式(Set Comprehension)。集合天生自带去重功能。如果在某些极端系统下glob返回了重复的路径,这一步会瞬间把重复项剔除。强制排序 (sorted(...)):这是最致命的一步! * 为什么一定要排序?在像 Scal3R 这样的三维重建或 SLAM 工程中,输入的图片通常是从视频里抽出来的连续帧(比如
001.jpg,002.jpg,003.jpg)。glob找文件的顺序是随机的(依赖于操作系统的底层文件系统)。如果不排序,程序可能先读第 1 帧,再读第 80 帧,这会导致计算相机轨迹时直接崩溃。sorted保证了时间序列的绝对连贯。
@dataclass
专门用于那些主要用来存储数据的类
传统写法
class Point:
def __init__(self, x: int, y: int):
self.x = x
self.y = y
# 痛点 1:代码啰嗦。x 和 y 都要写 3 遍!
# 痛点 2:打印出来是毫无意义的内存地址
p1 = Point(10, 20)
print(p1) # 输出类似:<__main__.Point object at 0x7f8b9c...>
# 痛点 3:无法直接比较值
p2 = Point(10, 20)
print(p1 == p2) # 输出 False,因为它们在内存中的地址不同,即使数值一样加入这个装饰器后
from dataclasses import dataclass
@dataclass
class Point:
x: int
y: int非常自然地给变量设置默认值
max_images: int | None (这个量允许是 int,也允许是 None)
若不传、不显式赋值,就用 None
@dataclass
class ImageFolderDataset:
input_dir: str
image_patterns: tuple[str, ...]
max_images: int | None = None # 直接赋默认值注意规则:带有默认值的字段,必须放在没有默认值的字段后面。
如果你想把一个列表(
list)或字典(dict)作为默认值,千万不要直接写[]或{}。因为在 Python 中,类的默认可变对象会被所有实例共享,导致改了一个,其他的全跟着变
from dataclasses import dataclass, field
@dataclass
class Student:
name: str
# 错误写法:scores: list = []
# 正确写法:使用 field 每次生成一个新列表
scores: list = field(default_factory=list)image_patterns: tuple[str, ...]
元素全是 str、长度任意的元组
from glob import glob
它允许你使用通配符(如
*或?)在操作系统的文件系统里进行“模糊搜索”。
images.extend 和 images.append 区别
images = ["a.png"]
images.append(["b.png", "c.png"]) # images -> ["a.png", ["b.png", "c.png"]] 多了一层列表
images = ["a.png"]
images.extend(["b.png", "c.png"]) # images -> ["a.png", "b.png", "c.png"] 拆开接进去append(x):把x当成一个整体 追加一项。extend(iterable):对iterable里每一项 各追加一项(相当于多次append,但一次完成)。
★inference.py
def _get_nested(config: dict[str, Any], *keys: str, default: Any = None) -> Any:
current: Any = config
for key in keys:
if not isinstance(current, dict) or key not in current:
return default
current = current[key]
return current为了防止程序因为“找不到键”或“数据类型不对”而突然崩溃。(如果出错 立刻返回默认值 )
*keys(不定长参数):可以传入任意数量的键,像剥洋葱一样指定路径。比如传"data", "advanced", "block_size"
data_cfg = _get_nested(config, "data", default={}) or {}
model_cfg = _get_nested(config, "model", default={}) or {}
release_root = str(get_release_root())A or B
先算
A。若
A在布尔语境下为真(truthy),整个表达式结果就是 A,不会去算B。若
A为假(falsy),整个表达式结果是B。
dataset = ImageFolderDataset(
input_dir=request.input_dir,
image_patterns=tuple(
pattern.strip()# 去空格
for pattern in data_cfg.get("image_patterns", "*.png,*.jpg,*.jpeg,*.bmp").split(",")#按,切分,并成列表
if pattern.strip() # 去掉空白后若是空字符串,不要这一项
),
max_images=request.max_images,
)
image_paths = dataset.list_images()字典内置方法 dict.get() ; 第一个参数是键名,第二个是可选的默认值
tuple(pattern.strip() for pattern in ... if pattern.strip())
把生成器一次性消费掉,得到 不可变元组,满足类型 tuple[str, ...]
manifest_path = join(runtime_dir, "image_manifest.txt")
plan_path = join(runtime_dir, "run_plan.json")
write_lines(manifest_path, [str(path) for path in image_paths])规划路径: 提前把任务清单(
manifest)和作战计划(plan)的文件名定好,统统放进runtime_dir里,不污染用户的最终输出目录。落盘图片路径: 将刚才收集到的
image_paths(里面全是Path对象,所以要用str(path)转成字符串)按行写入image_manifest.txt。
command: list[str] = [
sys.executable,
"-m",
backend_module,
"--config",
str(resolve_release_path(request.config_path)),
"--input_dir",
str(request.input_dir),
"--result_dir",
str(output_dir),
"--runtime_dir",
str(runtime_dir),
"--image_patterns",
str(data_cfg.get("image_patterns", "*.png,*.jpg,*.jpeg,*.bmp")),
"--preprocess_workers",
str(request.preprocess_workers or data_cfg.get("preprocess_workers", 32)),
"--block_size",
str(request.block_size or data_cfg.get("block_size", 60)),
"--overlap_size",
str(request.overlap_size or data_cfg.get("overlap_size", 30)),
"--loop_size",
str(data_cfg.get("loop_size", 20)),
"--use_loop",
str(request.use_loop if request.use_loop is not None else data_cfg.get("use_loop", 1)),
"--use_xyz_align",
str(request.use_xyz_align if request.use_xyz_align is not None else data_cfg.get("use_xyz_align", 0)),
"--save_dpt",
str(request.save_dpt if request.save_dpt is not None else data_cfg.get("save_dpt", 1)),
"--save_xyz",
str(request.save_xyz if request.save_xyz is not None else data_cfg.get("save_xyz", 1)),
"--streaming_state",
str(request.streaming_state if request.streaming_state is not None else data_cfg.get("streaming_state", 0)),
"--offload_batches",
str(request.offload_batches if request.offload_batches is not None else data_cfg.get("offload_batches", 0)),
"--offload_outputs",
str(request.offload_outputs if request.offload_outputs is not None else data_cfg.get("offload_outputs", 0)),
"--cleanup_offload",
str(request.cleanup_offload if request.cleanup_offload is not None else data_cfg.get("cleanup_offload", 1)),
]解释它的核心任务是:把主进程手里掌握的所有线索(用户刚敲的命令、YAML 里的配置、硬盘上的绝对路径),序列化成一条极长的、可以直接在终端里执行的命令行字符串(就像你在终端里手动敲
python -m backend --config xxx --block_size 60 ...一样)。sys.executable的妙用: 它获取的是当前主进程正在使用的那个 Python 解释器的绝对路径(比如/home/user/miniconda/envs/myenv/bin/python)。这 100% 保证了主进程和子进程处于同一个虚拟环境中,绝不会串台
参数拼接 str(request.block_size or data_cfg.get("block_size", 60))
最高优先级:
request.block_size。代表用户刚才在终端里强行指定的(比如--block_size 99)。如果有,直接用它。第二优先级:
data_cfg.get("block_size")。如果用户终端没传(为None),程序就会触发or逻辑,去 YAML 配置文件(data_cfg)里找。最低优先级(保底):
60。如果连 YAML 配置文件里都忘了写这个参数,程序就用框架写死的绝对默认值60。
代码分成了两拨。上面一拨用的是短小精悍的
or(比如request.overlap_size or ...),但到了下面这一拨,突然变得极其啰嗦str(request.use_xyz_align if request.use_xyz_align is not None else data_cfg.get("use_xyz_align", 0))
为什么不能写成
request.use_xyz_align or data_cfg.get(...)?因为
use_xyz_align是一个开关参数,它的值是0或1。在 Python 中,数字
0会被判定为False(假值)用 is not None else
if max_align_points_per_frame is not None:
command.extend(["--max_align_points_per_frame", str(max_align_points_per_frame)])
if request.checkpoint:
command.extend(["--checkpoint", str(request.checkpoint)])
if request.device:
command.extend(["--device", str(request.device)])选填部分, 如无也不会有默认值
payload: dict[str, Any] = {
"status": "ready_to_execute" if not request.dry_run else "dry_run",
...
"image_count": len(image_paths),
"first_image": str(image_paths[0]) if image_paths else "",
...
"block_size": request.block_size or data_cfg.get("block_size", 60),
...
"command": command,
}
write_json(plan_path, payload)payload字典 (备份保存)在拉起子进程之前,主控程序把这次任务所有最终敲定的参数(包括合并后的参数、第一张和最后一张图片的路径、图片总数,以及最后拼出来的那个极长的
command列表),统统打包成了一个巨大的字典,并写进了runtime_dir下的run_plan.json文件中
if not request.dry_run:
subprocess.run(command, check=True, cwd=release_root)check=True(严格报错): 这是一个极其关键的防御参数。如果子进程(backend.py)在运行过程中崩溃了(比如显存爆了退出码不是 0),主进程看到check=True会立刻跟着抛出CalledProcessError异常并停止,而不是假装无事发生继续往下跑。cwd=release_root(锚定大本营): 强制要求子进程在启动时,把它的“当前工作目录”(Current Working Directory)锁定在整个工程的根目录下。
import subprocess
subprocess是 Python 标准库中极其强大且常用的模块。它的核心使命非常明确:允许你在 Python 代码中启动并控制其他外部程序或系统命令subprocess.run() , 它的特点是阻塞式的
import subprocess
# 推荐写法:把命令拆成列表的形式
command = ["echo", "Hello World!"]
# 执行命令。主进程会在这里等待它执行完毕
subprocess.run(command)import sys
sys.executable:获取当前 Python 的绝对路径它返回的是当前正在运行这段代码的 Python 解释器的物理路径(例如
/usr/bin/python3或~/miniconda3/envs/scal3r/bin/python)。工程应用: ```python command = [sys.executable, "-m", backend_module, ...] subprocess.run(command)
sys.argv:获取原始命令行参数列表假如你在终端运行:
python run.py --config demo.yaml那么在代码里打印 sys.argv,结果会是:['run.py', '--config', 'demo.yaml']
sys.path:模块搜索路径这是一个列表,包含了许多目录路径
当你写
import yaml或from shell_practice import run时,Python 解释器其实是一个“路痴”。它只能乖乖地拿着包名,按照sys.path列表里的目录顺序,一个一个文件夹去找。如果找遍了列表里的所有目录都没找到,它就会抛出ModuleNotFoundError。这就是为什么我们在开发时,经常需要通过 pip install -e . 把本地目录悄悄塞进 sys.path 的原因。
★backend.py
接受父进程传给ta的一切参数~
★test_inference.py
def test_run_inference_dry_run_writes_plan_and_manifest_no_subprocess(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.chdir(tmp_path)
root = Path(__file__).resolve().parents[1]
cfg = _demo_config(root)
tmp_path: Path pytest 内置 fixture:提供一个 临时目录 的 pathlib.Path,每个测试 独立、结束后 可清理。类型注解写成 Path 方便 IDE/类型检查。
monkeypatch: pytest.MonkeyPatch pytest 内置 fixture:得到一个 MonkeyPatch 对象,用来在测试期间 临时改环境/模块属性/.chdir 等,测试结束 自动恢复。
monkeypatch.chdir(tmp_path) 把进程的 当前工作目录(cwd) 改成 tmp_path 指向的那个临时目录。
def test_run_inference_executes_backend_stub(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.chdir(tmp_path)
root = Path(__file__).resolve().parents[1]
cfg = _demo_config(root)root = Path(__file__).resolve().parents[1]
__file__ : 当前模块对应的源文件路径(字符串)
转成
Path对象.parents : 所有上级目录组成的序列:
parents[0]= 上一级,parents[1]= 上两级,以此类推
第 4 步:python -m ...run CLI(对齐 scal3r.run)
framework
shell_practice/
├── README.md
├── pyproject.toml
├── .gitignore
├── configs/
│ ├── base.yaml
│ └── models/
│ └── demo.yaml
├── shell_practice/
│ ├── __init__.py
│ ├── run.py # CLI(对齐 scal3r.run)
│ ├── dataloaders/
│ │ ├── __init__.py
│ │ └── datasets/
│ │ ├── __init__.py
│ │ └── image_folder_dataset.py
│ ├── engine/
│ │ ├── __init__.py
│ │ ├── path.py # 发布根、resolve、get_default_output_dir 等
│ │ ├── io.py
│ │ └── config.py
│ ├── pipelines/
│ │ ├── __init__.py
│ │ ├── inference.py
│ │ └── backend.py
│ └── utils/
│ ├── __init__.py
│ └── data_utils.py
└── tests/
├── test_config.py
├── test_inference.py
└── test_run.pyshell_practice/
├── README.md
├── pyproject.toml
├── .gitignore
├── configs/
│ ├── base.yaml
│ └── models/
│ └── demo.yaml
├── shell_practice/
│ ├── __init__.py
│ ├── run.py # CLI(对齐 scal3r.run)
│ ├── dataloaders/
│ │ ├── __init__.py
│ │ └── datasets/
│ │ ├── __init__.py
│ │ └── image_folder_dataset.py
│ ├── engine/
│ │ ├── __init__.py
│ │ ├── path.py # 发布根、resolve、get_default_output_dir 等
│ │ ├── io.py
│ │ └── config.py
│ ├── pipelines/
│ │ ├── __init__.py
│ │ ├── inference.py
│ │ └── backend.py
│ └── utils/
│ ├── __init__.py
│ └── data_utils.py
└── tests/
├── test_config.py
├── test_inference.py
└── test_run.py★run.py
run.py 不写第二套推理逻辑,只负责把命令行翻译成对 inference.py 的一次调用 ; 因为 run_inference不直接读取命令行参数
import logging
basic
import logging
# 1. 基础配置:设置门槛为 INFO
logging.basicConfig(level=logging.INFO)
# 2. 开始输出
logging.debug("这是一句废话,门槛是INFO,所以我不会被打印出来")
logging.info("程序启动了!")
logging.warning("注意,内存快满了!")
logging.error("报错:找不到文件!")格式化
真实的日志绝不仅仅是一句话,它还需要带上时间戳、文件来源和严重级别
import logging
# 配置日志的“长相” (format)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s", # 时间 | 级别 | 具体内容
datefmt="%Y-%m-%d %H:%M:%S" # 时间的格式
)
logging.info("开始处理数据")
logging.error("处理失败")
# 屏幕输出会变成这样:
# 2026-04-23 10:05:12 | INFO | 开始处理数据
# 2026-04-23 10:05:12 | ERROR | 处理失败getLogger
为什么要“获取一个 Logger”? 当你的工程有几十个 Python 文件时(比如
inference.py,backend.py都有日志),如果大家都用全局的logging.info(),排错时你会完全不知道这句话是哪个文件打印的。
logger = logging.getLogger("shell_practice.run")保存到硬盘
logging.basicConfig(
filename="runtime/app.log", # 写入文件而不是屏幕
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s"
)ZED+FFS+GSFusion experiment
相机性质
焦距
物理定义:它是一段距离

像素焦距

决定视场角 (FOV)


很远 的 物体 发过来的光是平行的,这是一种理想化;汇聚到一个点,这实际上也是一种理想化;实际上感光片和这个点的大小相同,能拍到的最大物体经过镜面到达焦点是的大小和感光片相同;这样的说法对吗?
有一定偏差
内参矩阵
内参矩阵标准形式

参数物理含义

从3D到2D

计算机的矩阵形式

在 GSFusion 代码中的应用

三维和四维空间旋转
1. 基石——从二维到三维的跃迁




2. 灵魂——旋转矩阵的数学特性


3. 进阶——任意轴的 3D 旋转



内在旋转 vs. 外在旋转

神奇的结论: 绕固定轴执行 XYZ 旋转的结果,在数学上等同于绕随动轴执行 ZYX 旋转。这就是为什么不同的软件(如 Unity, Blender, SolidWorks)虽然定义的旋转顺序不同,但最终能达成一致。
4. 齐次坐标与 4x4 矩阵



进阶应用 —— 矩阵的拆解与逆转
假设你拿到了一个标准的仿射变换矩阵 M:



GSFusion main method
组成 map(Map 是 三维体素地图的入口:在固定 世界系(或地图系) 下的一块空间里,用 八叉树(Octree) 存每个体素里的 场数据(TSDF、占用等)以及可选的 颜色 / 语义。)
模板
template<Field FldT, Colour ColB, Semantics SemB, Res ResT, int BlockSize>
class Map<se::Data<FldT, ColB, SemB>, ResT, BlockSize> {
public:
typedef Data<FldT, ColB, SemB> DataType;
typedef DataConfig<FldT, ColB, SemB> DataConfigType;
typedef se::Octree<DataType, ResT, BlockSize> OctreeType;

// Representation enums
enum class Field { TSDF, Occupancy };
enum class Colour { On = true, Off = false };
/** \brief The enum used to select the semantic class set used in mapping.
* The value of each enumerator is the number of semantic classes including the background.
*/
enum class Semantics : int { Off = 0, NYUv2 = classes::nyuv2.size(), MP3D = classes::mp3d.size(), COCO = classes::coco.size(), Replica = classes::replica.size() };
// Other enums
enum class Res { Single, Multi };Map 的构造 (Line 601)
se::TSDFColMap<se::Res::Single> map(config.map, config.data);TSDFColData 和TSDFColDataConfig 的区别是什么?
TSDFColData=Data<Field::TSDF, Colour::On, Semantics::Off>TSDFColDataConfig=DataConfig<Field::TSDF, Colour::On, Semantics::Off>

TSDFColMap是Map<TSDFColData, ResT, BlockSize>的别名;TSDFColData固定,不可通过该别名替换。只有
ResT和BlockSize有模板默认值;在实例化类型时可以不写(用默认),也可以显式写TSDFColMap<Res::Multi, 16>这类。构造时只传
config.map、config.data(等),不传ResT/BlockSize
template<se::Res ResT = se::Res::Single, int BlockSize = 8>
using TSDFColMap = Map<TSDFColData, ResT, BlockSize>;
构造用的是
MapConfig/DataConfig(来自 YAML 的map、data),解决的是 地图多大、分辨率、截断、权重 等运行期配置,不改变ResT、BlockSize这两个编译期模板实参。
那为什么 @map_impl.hpp (30-31) 这里 Field FldT, Colour ColB, Semantics SemB又能写在template里呢?


如何构造?
template<Field FldT, Colour ColB, Semantics SemB, Res ResT, int BlockSize>
Map<Data<FldT, ColB, SemB>, ResT, BlockSize>::Map(const MapConfig& map_config, const DataConfig<FldT, ColB, SemB>& data_config) :
octree_ptr_(new Octree<DataType, ResT, BlockSize>(std::ceil(map_config.dim.maxCoeff() / map_config.res))),
resolution_(map_config.res),
dimension_(Eigen::Vector3f::Constant(octree_ptr_->getSize() * resolution_)),
T_MW_(map_config.T_MW),
T_WM_(se::math::to_inverse_transformation(T_MW_)),
lb_M_(Eigen::Vector3f::Zero()),
ub_M_(dimension_),
data_config_(data_config)
{
const Eigen::Vector3f t_MW = se::math::to_translation(T_MW_);
if (t_MW.x() < 0 || t_MW.x() >= dimension_.x() || t_MW.y() < 0 || t_MW.y() >= dimension_.y() || t_MW.z() < 0 || t_MW.z() >= dimension_.z()) {
std::cout << "World origin is outside the map" << std::endl;
}
}config.map

config.data
octree_ptr_(new Octree<...>(std::ceil(...))) 在算什么?
这是 初始化列表 里给
octree_ptr_赋值:在堆上new一棵Octree,交给shared_ptr管理。
Octree里还会再处理一次(至少2×BlockSize,并向上取成 2 的幂),得到最终的立方体边长size_
创建针孔相机数学模型
构造 PinholeCamera 时具体干了什么?参考 GSFusion/src/sensor/pinhole_camera.cpp
se::PinholeCamera::PinholeCamera(const PinholeCameraConfig& c) : se::SensorBase<se::PinholeCamera>(c), model(c.width, c.height, c.fx, c.fy, c.cx, c.cy, _distortion), scaled_pixel(1 / c.fx)
{
left_hand_frame = (c.fx < 0.0f) ^ (c.fy < 0.0f);
computeFrustumVertices();
computeFrustumNormals();
// ... asserts ...
horizontal_fov = 2.0f * atanf(c.width / (2.0f * c.fx));
vertical_fov = 2.0f * atanf(c.height / (2.0f * c.fy));
}这段代码详细过程
偏差

std::vector<gs::Camera> gs_cam_list
坐标转换
Eigen::Matrix4f T_WB = Eigen::Matrix4f::Identity(); //< Body to world transformation
//这里先设成单位阵,相当于「还没读位姿」时的占位
Eigen::Matrix4f T_BS = sensor.T_BS; //< Sensor to body transformation
//传感器相对机体的外参,若没写,就用单位阵
Eigen::Matrix4f T_WS = T_WB * T_BS; //< Sensor to world transformation
//相机坐标系到世界坐标系的变换世界坐标系下的坐标 = M_ 世界坐标系下的机体 * M_机体坐标系下的相机 * 相机坐标系下的坐标
ZED + SLAM + FFS + GSFusion
command
python scripts/zed_ffs_record_tum.py --out_dir ~/ky/datasets/zed_runb --max_frames n
python scripts/zed_stereo_record_tum.py --out_dir ~/ky/datasets/zed_runb --max_frames n
python scripts/zed_stereo_record_tum.py --svo_record --svo_preview --max_frames 3000 --out_dir ~/ky/datasets/zed_runc
ORB_ZED_ROBUST_ORB=1 bash ~/ky/Fast-FoundationStereo/scripts/run_orb_slam3_zed_tum.sh ~/ky/datasets/zed_runc
GSFUSION_POSE_MODE=orb \
bash ~/ky/Fast-FoundationStereo/scripts/run_gsfusion_on_tum.sh ~/ky/datasets/zed_runc
————————————————————————————————————————————
SKIP_FFS=1 bash /home/linux/ky/Fast-FoundationStereo/scripts/run_gaus_slam_zed.sh /home/linux/ky/datasets/zed_runc
ffs 接口
img0 = torch.as_tensor(left_np).cuda().float()[None].permute(0, 3, 1, 2)
img1 = torch.as_tensor(right_np).cuda().float()[None].permute(0, 3, 1, 2)
padder = InputPadder(img0.shape, divis_by=32, force_square=False)
img0_pad, img1_pad = padder.pad(img0, img1)NumPy → GPU,NHWC → NCHW
left_np/right_np:前面从 ZED 取图并转成 RGB 的数组,形状一般是(H, W, 3),dtype 多为uint8。.float():转成float32,和后面model.forward的输入一致。[None]:在维度 0 上扩一维,得到 batch,形状(1, H, W, 3)。.permute(0, 3, 1, 2):从 NHWC 换成 PyTorch 卷积常用的 NCHW:(1, 3, H, W)。
InputPadder 补到32的倍数
Foundation Stereo 里有多层下采样/上采样,通常要求 高、宽能被某个数整除(这里用
divis_by=32)。InputPadder(在core/utils/utils.py)会算出在高度、宽度上各补多少像素,使H、W变为 32 的倍数;force_square=False表示不强行成正方形,只按当前矩形补。
with torch.amp.autocast("cuda", enabled=True, dtype=AMP_DTYPE):
if not args.hiera:
disp = model.forward(
img0_pad,
img1_pad,
iters=args.valid_iters,
test_mode=True,
optimize_build_volume="pytorch1",
)
else:
disp = model.run_hierachical(
img0_pad,
img1_pad,
iters=args.valid_iters,
test_mode=True,
small_ratio=0.5,
)在 CUDA 上开启 自动混合精度(AMP):适合在 FP16 里算的算子会用半精度,减少显存、加快推理
分支一:not args.hiera
img0_pad/img1_pad:左图、右图(已 padding),模型内部会做归一化等预处理test_mode=True:在foundation_stereo.py里表示推理模式:只在最后一次迭代上做上采样并返回最终视差,中间结果不全部保留,省算力。optimize_build_volume="pytorch1":构造代价体(cost volume)时用 纯 PyTorch 优化实现
分支二:args.hiera
由粗到精策略——先把图像按
small_ratio=0.5缩小,在全图上跑一次forward得到粗视差;上采样到原尺寸后作为init_disp,再在原分辨率上再跑一次forward细化。典型用途:视差范围大或希望 coarse 阶段先稳住匹配时更有利;代价是 推理要跑两遍(粗 + 细),延迟一般会更高。
disp = padder.unpad(disp.float())
disp_np = disp.detach().cpu().numpy().reshape(H, W)disp.float()前面在
autocast里推理时,很多算子是半精度;.float()转成 FP32。
padder.unpad(...)推理前用
InputPadder按divis_by=32对左右图做了 replicate padding,使H、W能被 32 整除。unpad会按记录下来的self._pad把多出来的边裁掉,空间尺寸回到 原始left_np的 H×W
detach()从计算图里拆开,不需要反传,节省图、避免误用梯度。
reshape(H, W)
把
(1,1,H,W)摊成(H, W)
depth_m = np.zeros_like(disp_save, dtype=np.float32)
valid = np.isfinite(disp_save) & (disp_save > 0)
depth_m[valid] = K_scaled[0, 0] * baseline / disp_save[valid]
valid &= np.isfinite(depth_m) & (depth_m > 0) & (depth_m <= args.zfar)先按视差图同样大小建一张深度图,初值全 0;只有后面被判为
valid的像素才会被写入真实深度valid
np.isfinite(disp_save):排除inf、nandisp_save > 0:视差必须为正。在本脚本约定下,深度公式是
Z = f × baseline / d,需要d > 0才能得到有限、物理合理的正值深度;d ≤ 0没有采用或表示匹配无效。
depth
K_scaled[0, 0]
左相机在当前分辨率/缩放下的
fx(像素单位)。
baseline:左右光心间距(米),来自 ZED 标定disp_save[valid]:视差(像素),与fx、图像尺度一致。depth_m <= args.zfar:去掉过远的点
cv2.imwrite(str(out_dir / rgb_file), cv2.cvtColor(left_np, cv2.COLOR_RGB2BGR))
cv2.imwrite(str(out_dir / dep_file), depth_u16)
frgb.write(f"{ts:.9f} {rgb_file}\n")
fdep.write(f"{ts:.9f} {dep_file}\n")
把左目 RGB、FFS 深度各写一张 PNG,并在
rgb.txt/depth.txt里各记一行「时间戳 + 相对路径」,形成标准 TUM 关联列表
SLAM 接口
write_associations
def write_associations(dataset: Path, out_name: str = "associations.txt") -> Path:
rgb_rows = read_tum_index(dataset / "rgb.txt")
dep_rows = read_tum_index(dataset / "depth.txt")
if len(rgb_rows) != len(dep_rows):
raise SystemExit(
f"rgb.txt ({len(rgb_rows)} 行) 与 depth.txt ({len(dep_rows)} 行) 行数不一致,请先对齐序列。"
)
out = dataset / out_name
lines: list[str] = []
for (tr, sr), (td, dd) in zip(rgb_rows, dep_rows):
# rgbd_tum LoadImages: t_rgb path_rgb t_depth path_depth
lines.append(f"{tr:.9f} {sr} {td:.9f} {dd}\n")
out.write_text("".join(lines), encoding="utf-8")
return outlines: list[str] = []
lines:变量名。: list[str]:类型注解(PEP 585 写法),表示「字符串的列表」。= []:初始化为空列表。
for (tr, sr), (td, dd) in zip(rgb_rows, dep_rows):
这是嵌套元组拆包(nested unpacking):
左边两个括号对应 zip 出来的两个二元组,再各自拆成两个变量
str.join(iterable):把lines里所有字符串首尾相接。此方法在 main函数里被调用一次
写 orb_slam3_rgbd.yaml
def write_orb_yaml(
dataset: Path,
meta_path: Path | None,
out_name: str = "orb_slam3_rgbd.yaml",
*,
orb_n_features: int = ORB_DEFAULT_N_FEATURES,
orb_ini_th_fast: int = ORB_DEFAULT_INI_TH_FAST,
orb_min_th_fast: int = ORB_DEFAULT_MIN_TH_FAST,
) -> Path:
mp = meta_path or (dataset / "meta.json")
if not mp.is_file():
raise SystemExit(f"缺少 {mp},无法生成 ORB 标定 yaml(请先完整录制一集)。")
meta = json.loads(mp.read_text(encoding="utf-8"))
K = meta["K_scaled"]
fx, fy = float(K[0][0]), float(K[1][1])
cx, cy = float(K[0][2]), float(K[1][2])
w, h = int(meta["width"]), int(meta["height"])
depth_scale = float(meta.get("tum_depth_scale", 5000.0))
baseline = float(meta.get("baseline_m", 0.12))
# ORB-SLAM3 Settings.cc 要求 Camera.fps 为整数(readParameter<int>)
fps = int(round(float(meta.get("fps", 30))))
# 左目 retrieve 一般为矫正后图像,畸变近似 0;若你改用未矫正流,需填 ZED 标定畸变系数。
text = f"""%YAML:1.0先读 meta.json(由ffs创建),从内参矩阵取针孔参数,读深度比例与基线,拼进
orb_slam3_rgbd.yaml
脚本 Fast-FoundationStereo/scripts/run_orb_slam3_zed_tum.sh
python3 "${SCRIPT_DIR}/orb_slam3_zed_tum.py" "${DATASET}" "$@"
cd "${DATASET}"
# rgbd_tum 将轨迹写到当前工作目录
exec "${RGBD_TUM}" "${VOCAB}" "${DATASET}/orb_slam3_rgbd.yaml" "${DATASET}" "${DATASET}/associations.txt"在跑 SLAM 之前(或并行准备文件),根据该数据集生成/更新
associations.txt和orb_slam3_rgbd.yaml把当前 shell 的工作目录切到数据集根目录
用
rgbd_tum可执行文件替换当前 shell 进程(ORB_SLAM3/Examples/RGB-D/rgbd_tum.cc)

















