多模态数据

解压多模态音频文本的数据到指定文件夹。

查看数据格式

import sys
import subprocess

# 自动安装 pandas(如未安装)
def ensure_package(pkg):
    """Make sure *pkg* is importable, pip-installing it on first use."""
    try:
        __import__(pkg)
        return  # already available, nothing to do
    except ImportError:
        print(f"[+] 正在自动安装 {pkg} ...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

# Install the dependencies on demand, then peek at the parquet contents.
ensure_package("pandas")
ensure_package("pyarrow")  # pandas needs pyarrow to read parquet files

import pandas as pd
# NOTE: hard-coded input path — adjust before running on another machine.
df = pd.read_parquet(r"f:\dataset\train-00001-of-00003.parquet")
print("字段名:", df.columns)
print("前5行:")
print(df.head())
#print(df['audio'].iloc[0])
print(df['sentence'].iloc[0])  # show the transcript of the first row

输出内容

字段名: Index(['file_id', 'sentence', 'audio', '__index_level_0__'], dtype='object')
前5行:
               file_id  ... __index_level_0__
0  bur_7712_8367089238  ...               465
1  bur_7447_9491098611  ...              1447
2  bur_4632_0098345295  ...              2501
3  bur_9762_5479387529  ...              1907
4  bur_5189_0472171856  ...              1104

[5 rows x 4 columns]
တစ်ခါတည်း တိုင်လုံးကျော် ကို ဖြဲလိုက် ဟောက်လိုက် ဟိန်းလိုက်ကြတာမှ

指定格式数据解压

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Batch-extract audio and matching transcripts from a Parquet file, recreating
the original file-path structure on disk.

The input file and output directory may be passed on the command line; when
omitted, the default paths defined below are used.
"""
import subprocess
import sys
import os
import io
import argparse

# Default paths; adjust as needed.
DEFAULT_PARQUET = r"f:\dataset\train-00001-of-00003.parquet"
DEFAULT_OUTPUT  = r"F:\dataset"

# 安装依赖函数
def install_if_missing(pkg):
    """pip-install *pkg* unless it can already be imported."""
    try:
        __import__(pkg)
        return  # importable — no install needed
    except ImportError:
        print(f"[+] Installing {pkg}…")
    subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

# Install runtime dependencies on first use.
for pkg in ("pyarrow", "pandas", "soundfile", "requests"):
    install_if_missing(pkg)

# Imports must come after the on-demand installation above.
import pyarrow.parquet as pq
import pandas as pd
import soundfile as sf
import requests

# CLI arguments (both optional; defaults are defined at the top of the file).
parser = argparse.ArgumentParser(
    description="Extract audio and text from Parquet, preserving original structure."
)
parser.add_argument(
    "--parquet", default=DEFAULT_PARQUET,
    help=f"Parquet 文件路径,默认: {DEFAULT_PARQUET}"
)
parser.add_argument(
    "--output", default=DEFAULT_OUTPUT,
    help=f"输出根目录,默认: {DEFAULT_OUTPUT}"
)
args = parser.parse_args()

parquet_path = args.parquet
out_base_dir = args.output

print(f"📂 使用 Parquet: {parquet_path}")
print(f"📂 输出目录: {out_base_dir}")

# Read only the columns we need; exit non-zero if the file is unreadable.
try:
    df = pq.read_table(
        parquet_path, columns=["file_id", "sentence", "audio"]
    ).to_pandas()
except Exception as e:
    print(f"❌ 无法读取 Parquet: {e}")
    sys.exit(1)

# Export one .wav + one .txt per row.
for _, row in df.iterrows():
    fid      = row["file_id"]
    sentence = row["sentence"]
    audio    = row["audio"]

    # Mirror the original sub-directory layout when the audio dict carries
    # a relative path; otherwise drop files directly into the output root.
    sub_path = None
    if isinstance(audio, dict) and audio.get("path"):
        orig_path = audio["path"].replace("\\", "/")
        sub_path = os.path.dirname(orig_path)
    target_dir = os.path.join(out_base_dir, sub_path) if sub_path else out_base_dir
    os.makedirs(target_dir, exist_ok=True)

    # Export WAV. HF-datasets-style audio dicts may carry raw bytes, a
    # decoded array + sampling rate, or only a path/URL — try in that order.
    wav_path = os.path.join(target_dir, f"{fid}.wav")
    if isinstance(audio, dict) and audio.get("bytes") is not None:
        with open(wav_path, "wb") as f:
            f.write(audio["bytes"])
    elif isinstance(audio, dict) and audio.get("array") is not None:
        sf.write(wav_path, audio["array"], audio["sampling_rate"])
    elif isinstance(audio, dict) and audio.get("path"):
        path = audio["path"]
        if path.startswith("http"):
            # BUGFIX: the original request had no timeout (could hang the
            # whole export forever) and ignored HTTP errors (an error page
            # would have been saved as the .wav payload).
            resp = requests.get(path, timeout=60)
            resp.raise_for_status()
            data = resp.content
        else:
            with open(path, "rb") as rf:
                data = rf.read()
        with open(wav_path, "wb") as f:
            f.write(data)
    else:
        print(f"⚠️ Unsupported audio format for {fid}")

    # Export transcript. BUGFIX: coerce to str so a NaN/None sentence does
    # not crash the run — file.write() accepts only str in text mode.
    txt_path = os.path.join(target_dir, f"{fid}.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        f.write(sentence if isinstance(sentence, str) else str(sentence))

通用数据集解压

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import subprocess
import os

def ensure_package(pkg):
    """Import *pkg*; when that fails, install it with pip."""
    importable = True
    try:
        __import__(pkg)
    except ImportError:
        importable = False
    if not importable:
        print(f"[+] Installing {pkg}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

# Install the dependencies on demand before importing them.
for pkg in ("pyarrow", "pandas"):
    ensure_package(pkg)

import pyarrow.parquet as pq
import pandas as pd

# Default input parquet and output directory; edit to taste.
DEFAULT_PARQUET = r"f:\dataset\train-00001-of-00003.parquet"
DEFAULT_OUTPUT  = r"F:\dataset"
ID_COL = "file_id"  # primary-key column used to name per-row output files

def get_ext_from_path(path):
    """Return the lower-cased extension of *path* (including the dot).

    Returns "" when the final path component has no extension.

    BUGFIX: the original ``"." + path.split(".")[-1]`` mis-reported an
    "extension" whenever a *directory* name contained a dot — e.g.
    "dir.v1/readme" yielded ".v1/readme". ``os.path.splitext`` only looks
    at the last component, so that case now correctly returns "".
    """
    _, ext = os.path.splitext(str(path))
    return ext.lower()

def smart_text_ext(val):
    """Guess a file extension for a text payload by sniffing its content.

    Non-strings always map to ".txt". Checks run in priority order; the
    first match wins, falling back to ".txt".
    """
    if not isinstance(val, str):
        return ".txt"
    s = val.strip()
    low = s.lower()
    # BUGFIX: the ASS header must be tested BEFORE the generic "[" JSON
    # check — in the original ordering this branch was unreachable because
    # "[Script Info]" matched s.startswith("[") and returned ".json".
    if s.startswith("[Script Info]"):
        return ".ass"
    if s.startswith("{") or s.startswith("["):
        return ".json"
    if low.startswith("<html"):
        return ".html"
    if low.startswith("<?xml") or (s.startswith("<") and s.endswith(">")):
        return ".xml"
    if s.startswith("WEBVTT"):
        return ".vtt"
    # Heuristic: multi-line content containing timestamp-ish markers.
    if s.count("\n") > 1 and all(x in s for x in ("0:", "1:", "2:")):
        return ".srt"
    if s.startswith("---") or "# " in s:
        return ".md"
    if "," in s and "\n" in s and not s.startswith("{"):
        return ".csv"
    if low.startswith("%pdf"):
        return ".pdf"
    return ".txt"

# --- Main: dump every row of the parquet back out as individual files. ---
parquet_path = DEFAULT_PARQUET
out_dir = DEFAULT_OUTPUT
os.makedirs(out_dir, exist_ok=True)

# Load the whole table and show its schema plus a small sample.
table = pq.read_table(parquet_path)
df = table.to_pandas()
print("==== 字段结构 ====")
print(table.schema)
print("==== 示例 ====")
print(df.head())

fields = df.columns.tolist()

for idx, row in df.iterrows():
    # Row id: prefer the ID_COL value, fall back to the dataframe index.
    sid = str(row[ID_COL]) if ID_COL in row and not pd.isnull(row[ID_COL]) else str(idx)
    path_refs = {}
    # Pass 1: export every binary payload (image/audio/video-style dicts
    # carrying both "bytes" and "path"), preserving the stored sub-path.
    for fld in fields:
        val = row[fld]
        if isinstance(val, dict) and "bytes" in val and val["bytes"] and "path" in val and val["path"]:
            path_val = val["path"].replace("\\", "/")
            path_refs[fld] = path_val  # remembered for pairing text fields below
            file_path = os.path.join(out_dir, path_val)
            # NOTE(review): os.makedirs("") raises — this assumes every
            # stored path has a directory component; confirm for your data.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(val["bytes"])
    # Pass 2: export text / structured / scalar fields (txt/json/csv/html...).
    for fld in fields:
        val = row[fld]
        if isinstance(val, dict): continue
        # Pair a text field with a similarly-named binary field so both land
        # in the same directory under the same base name.
        pair_found = False
        for k, v in path_refs.items():
            # NOTE(review): very loose match — any prefix/substring overlap
            # between the two field names counts as a pair.
            if fld.startswith(k) or k.startswith(fld) or fld in k or k in fld:
                base, _ = os.path.splitext(v)
                ext = smart_text_ext(val)
                file_path = os.path.join(out_dir, base + ext)
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(str(val))
                pair_found = True
                break
        if not pair_found:
            # Unpaired fields each get their own directory named after fld.
            if isinstance(val, str):
                ext = smart_text_ext(val)
                fld_dir = os.path.join(out_dir, fld)
                os.makedirs(fld_dir, exist_ok=True)
                file_path = os.path.join(fld_dir, f"{sid}{ext}")
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(val)
            elif isinstance(val, (int, float, bool)):
                fld_dir = os.path.join(out_dir, fld)
                os.makedirs(fld_dir, exist_ok=True)
                file_path = os.path.join(fld_dir, f"{sid}.txt")
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(str(val))
            elif isinstance(val, (bytes, bytearray)):
                fld_dir = os.path.join(out_dir, fld)
                os.makedirs(fld_dir, exist_ok=True)
                file_path = os.path.join(fld_dir, f"{sid}.bin")
                with open(file_path, "wb") as f:
                    f.write(val)
    # Lightweight progress indicator every 100 rows.
    if idx % 100 == 0:
        print(f"已处理 {idx} 条")

print(f"✅ 所有内容已批量还原导出到:{out_dir}")