"""Walk the ROOT directory and export a listing with media attributes (duration / resolution) to a UTF-8 CSV file."""
import os, sys, csv, json, subprocess, shutil, time
from pathlib import Path
from datetime import datetime
# ======= Configuration =======
# Directory tree that main() scans recursively.
ROOT = r"D:\test"
# Destination CSV file; main() creates its parent directory if it is missing.
OUT_CSV = r"D:\list.csv"
INCLUDE_DIRS = False # When True, folders are written to the listing too (without media attributes)
# ====================
# --- Auto-install dependencies ---
def ensure(pkg, import_name=None):
    """Make sure a package is importable, installing it with pip when it is not.

    Args:
        pkg: Distribution name handed to ``pip install`` (e.g. ``"Pillow"``).
        import_name: Module name to try importing when it differs from the
            distribution name (e.g. ``"PIL"``). Falls back to ``pkg``.

    Raises:
        subprocess.CalledProcessError: If the pip command exits with a
            non-zero status.
    """
    import importlib

    name = import_name or pkg
    try:
        importlib.import_module(name)
    except ImportError:
        print(f"[+] 正在安装 {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", pkg])
        # The import system caches directory listings; without this, a package
        # installed a moment ago may be invisible to the import that follows.
        importlib.invalidate_caches()
# Install any missing third-party packages first, then import them.
# Each ensure() call pairs the pip distribution name with its import name.
ensure("Pillow", "PIL")
ensure("mutagen")
ensure("opencv-python", "cv2")
# Image -> get_image_size, MutagenFile -> get_audio_duration, cv2 -> probe_video_opencv.
from PIL import Image
from mutagen import File as MutagenFile
import cv2
# --- Media kinds and their file extensions (lower-case, with the leading dot) ---
IMAGE_EXT = set(".jpg .jpeg .png .webp .bmp .tif .tiff .gif".split())
AUDIO_EXT = set(
    ".mp3 .m4a .aac .flac .wav .ogg .opus .wma .aiff .ape".split()
)
VIDEO_EXT = set(
    ".mp4 .mkv .mov .avi .wmv .flv .webm .m4v .ts .mts .m2ts .3gp".split()
)
# --- Helper: is ffprobe available? ---
def has_ffprobe():
    """Report whether an ``ffprobe`` executable can be found and run.

    Returns:
        bool: True when ``ffprobe`` is located on PATH and ``ffprobe -version``
        exits with status 0; False when it is not found or the call fails.
    """
    located = shutil.which("ffprobe")
    if located:
        try:
            subprocess.run(
                [located, "-version"],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except Exception:
            # Any failure to launch or a non-zero exit means "not usable".
            return False
        return True
    return False
# Checked once at import time; get_video_meta() consults this flag for every video.
FFPROBE_OK = has_ffprobe()
def _parse_duration(value):
    """Convert an ffprobe duration field to float seconds, or None if absent/unparsable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def probe_video_ffprobe(path: str):
    """Read a video's resolution and duration (in seconds) using ffprobe.

    A single ffprobe invocation requests the first video stream's width,
    height and duration together with the container-level duration. The
    stream duration is preferred; the container duration is used when the
    stream one is missing or cannot be parsed.

    Args:
        path: Path of the media file to inspect.

    Returns:
        tuple: ``(width, height, duration_sec)``. Individual elements are None
        when ffprobe does not report them. ``(None, None, None)`` is returned
        when ffprobe cannot be run, exits with an error, produces unreadable
        output, or reports no video stream.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        # Stream and container sections are fetched in one call so that a
        # missing stream duration does not need a second process.
        "-show_entries", "stream=width,height,duration:format=duration",
        "-of", "json",
        path,
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(proc.stdout or "{}")
        streams = data.get("streams") or []
        if not streams:
            return (None, None, None)
        stream = streams[0]
        width = stream.get("width")
        height = stream.get("height")
        # Some containers only carry the duration at the format level.
        duration = _parse_duration(stream.get("duration"))
        if duration is None:
            duration = _parse_duration((data.get("format") or {}).get("duration"))
        return (
            int(width) if width else None,
            int(height) if height else None,
            duration,
        )
    except Exception:
        # Deliberately best-effort: the caller falls back to another prober.
        return (None, None, None)
def probe_video_opencv(path: str):
    """Read a video's resolution and an estimated duration using OpenCV.

    The duration is derived as frame count divided by frames per second, so
    it is only available when both values are reported as positive.

    Args:
        path: Path of the video file to open.

    Returns:
        tuple: ``(width, height, duration_sec)``. A width or height of zero is
        reported as None; the duration is None when it cannot be estimated.
        ``(None, None, None)`` when the file cannot be opened.
    """
    capture = cv2.VideoCapture(path)
    if not capture.isOpened():
        return (None, None, None)
    try:
        width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_rate = capture.get(cv2.CAP_PROP_FPS)
        frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
        can_estimate = bool(
            frame_rate and frame_rate > 0 and frame_total and frame_total > 0
        )
        seconds = float(frame_total) / float(frame_rate) if can_estimate else None
        return (width or None, height or None, seconds)
    finally:
        # Always free the capture handle once it has been opened.
        capture.release()
def get_video_meta(path: str):
    """Return ``(width, height, duration_sec)`` for a video file.

    ffprobe is tried first when it is available. Its answer is accepted when
    it yields either a full resolution or a duration; otherwise the OpenCV
    prober is used instead.

    Args:
        path: Path of the video file.

    Returns:
        tuple: ``(width, height, duration_sec)``; elements may be None.
    """
    if not FFPROBE_OK:
        return probe_video_opencv(path)
    width, height, seconds = probe_video_ffprobe(path)
    has_resolution = bool(width and height)
    if has_resolution or seconds is not None:
        return width, height, seconds
    # Fall back when ffprobe produced nothing useful.
    return probe_video_opencv(path)
def get_audio_duration(path: str):
    """Return the length of an audio file in seconds, or None if unknown.

    Args:
        path: Path of the audio file.

    Returns:
        float | None: ``info.length`` as reported by mutagen; None when the
        file type is not recognised, the length is missing or zero, or
        reading the file raises.
    """
    try:
        audio = MutagenFile(path)
        # Compare against None explicitly: mutagen file objects behave like a
        # mapping of their tags, so a file with no tags is falsy even though
        # its stream info (and length) is perfectly valid.
        if audio is not None:
            info = getattr(audio, "info", None)
            if info is not None:
                length = getattr(info, "length", None)
                return float(length) if length else None
    except Exception:
        # Best-effort: an unreadable or corrupt file simply has no duration.
        return None
    return None
def get_image_size(path: str):
    """Return an image's ``(width, height)`` in pixels via Pillow.

    Args:
        path: Path of the image file.

    Returns:
        tuple: ``(width, height)``, or ``(None, None)`` when the file cannot
        be opened or read as an image.
    """
    dimensions = (None, None)
    try:
        with Image.open(path) as picture:
            dimensions = (picture.width, picture.height)
    except Exception:
        dimensions = (None, None)
    return dimensions
def fmt_iso(ts: float):
    """Format a POSIX timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time.

    Args:
        ts: Seconds since the epoch.

    Returns:
        str: The formatted timestamp, or an empty string when the value
        cannot be converted.
    """
    try:
        moment = datetime.fromtimestamp(ts)
        return f"{moment:%Y-%m-%d %H:%M:%S}"
    except Exception:
        return ""
def classify(ext: str):
    """Map a file extension to a media kind.

    Args:
        ext: File suffix including the leading dot; case is ignored.

    Returns:
        str: ``"image"``, ``"audio"`` or ``"video"`` when the extension is in
        the matching extension set, otherwise ``"other"``.
    """
    suffix = ext.lower()
    # Checked in the same order as the original chain: image, audio, video.
    lookup = (
        ("image", IMAGE_EXT),
        ("audio", AUDIO_EXT),
        ("video", VIDEO_EXT),
    )
    for label, known in lookup:
        if suffix in known:
            return label
    return "other"
def walk_files(root: str):
    """Yield the path of every file under ``root``, recursively.

    For each directory visited, its files are yielded first; when
    ``INCLUDE_DIRS`` is true its immediate sub-directories are yielded too.

    Args:
        root: Directory to start walking from.

    Yields:
        str: Paths built by joining the visited directory with each entry name.
    """
    for folder, subfolders, files in os.walk(root):
        yield from (os.path.join(folder, name) for name in files)
        if not INCLUDE_DIRS:
            continue
        yield from (os.path.join(folder, name) for name in subfolders)
def main():
    """Scan ROOT and write one CSV row per entry to OUT_CSV.

    Columns: path, type, width, height, resolution, duration_sec, size_bytes,
    mtime. Images get width/height, audio gets a duration, video gets both
    where available. Directories (only produced when INCLUDE_DIRS is on) are
    written with type ``dir`` and no media attributes. If processing an entry
    raises, a row of type ``error`` is written with the message in the last
    column so the problem can be traced later.
    """
    root = str(Path(ROOT))
    out = str(Path(OUT_CSV))
    Path(out).parent.mkdir(parents=True, exist_ok=True)

    header = [
        "path", "type", "width", "height",
        "resolution", "duration_sec", "size_bytes", "mtime",
    ]
    written = 0
    with open(out, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        for entry in walk_files(root):
            try:
                entry_path = Path(entry)
                info = entry_path.stat()
                if entry_path.is_dir():
                    writer.writerow(
                        [entry, "dir", "", "", "", "", info.st_size, fmt_iso(info.st_mtime)]
                    )
                    continue

                kind = classify(entry_path.suffix)
                width = height = duration = ""
                if kind == "image":
                    img_w, img_h = get_image_size(entry)
                    # Only record a size when both dimensions are known.
                    if img_w and img_h:
                        width, height = str(img_w), str(img_h)
                elif kind == "audio":
                    seconds = get_audio_duration(entry)
                    if seconds is not None:
                        duration = f"{seconds:.3f}"
                elif kind == "video":
                    vid_w, vid_h, seconds = get_video_meta(entry)
                    width = str(vid_w) if vid_w else ""
                    height = str(vid_h) if vid_h else ""
                    if seconds is not None:
                        duration = f"{seconds:.3f}"

                resolution = f"{width}x{height}" if width and height else ""
                writer.writerow(
                    [
                        entry, kind, width, height, resolution, duration,
                        info.st_size, fmt_iso(info.st_mtime),
                    ]
                )
                written += 1
                # Progress report every 500 file rows.
                if written % 500 == 0:
                    print(f"[+] 已写入 {written} 条...")
            except Exception as exc:
                # Still emit a row on failure so the entry can be investigated.
                writer.writerow([entry, "error", "", "", "", "", "", f"ERROR: {exc}"])
    print(f"[OK] 导出完成 -> {out}")


if __name__ == "__main__":
    main()
# ❤️ Please credit the source when reposting this article. Thank you! ❤️