Unpacking multimodal audio-text data into a specified folder.
Inspecting the data format
import sys
import subprocess

# Auto-install pandas if it is not available
def ensure_package(pkg):
    try:
        __import__(pkg)
    except ImportError:
        print(f"[+] Installing {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

ensure_package("pandas")
ensure_package("pyarrow")  # pandas needs pyarrow to read Parquet

import pandas as pd

df = pd.read_parquet(r"f:\dataset\train-00001-of-00003.parquet")
print("Columns:", df.columns)
print("First 5 rows:")
print(df.head())
# print(df['audio'].iloc[0])
print(df['sentence'].iloc[0])
Output
Columns: Index(['file_id', 'sentence', 'audio', '__index_level_0__'], dtype='object')
First 5 rows:
               file_id  ...  __index_level_0__
0  bur_7712_8367089238  ...                465
1  bur_7447_9491098611  ...               1447
2  bur_4632_0098345295  ...               2501
3  bur_9762_5479387529  ...               1907
4  bur_5189_0472171856  ...               1104

[5 rows x 4 columns]
တစ်ခါတည်း တိုင်လုံးကျော် ကို ဖြဲလိုက် ဟောက်လိုက် ဟိန်းလိုက်ကြတာမှ
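Each entry in the `audio` column is stored as a dict (the extraction script below handles the `bytes`, `array` and `path` variants). A minimal probe of the first row, sketched under that assumption, shows which keys are actually present before committing to one branch:

import pandas as pd

df = pd.read_parquet(r"f:\dataset\train-00001-of-00003.parquet")
audio = df["audio"].iloc[0]

# Expected shape (assumption): {'bytes': b'RIFF...', 'path': 'some/subdir/xxx.wav'}
print(type(audio))
if isinstance(audio, dict):
    for key, value in audio.items():
        if isinstance(value, (bytes, bytearray)):
            print(f"{key}: {len(value)} bytes")
        else:
            print(f"{key}: {value!r}")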
Format-specific extraction
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Extract audio and the matching text from a Parquet file in bulk, preserving the
original file-path structure. The input file and output directory can be passed
as arguments; if omitted, the default paths below are used.
"""
import subprocess
import sys
import os
import argparse

# Default paths; adjust as needed
DEFAULT_PARQUET = r"f:\dataset\train-00001-of-00003.parquet"
DEFAULT_OUTPUT = r"F:\dataset"

# Install a dependency if it is missing
def install_if_missing(pkg):
    try:
        __import__(pkg)
    except ImportError:
        print(f"[+] Installing {pkg}…")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

# Install dependencies
for pkg in ("pyarrow", "pandas", "soundfile", "requests"):
    install_if_missing(pkg)

# Import the required libraries
import pyarrow.parquet as pq
import pandas as pd
import soundfile as sf
import requests

# Argument parsing
parser = argparse.ArgumentParser(
    description="Extract audio and text from Parquet, preserving original structure."
)
parser.add_argument(
    "--parquet", default=DEFAULT_PARQUET,
    help=f"Path to the Parquet file, default: {DEFAULT_PARQUET}"
)
parser.add_argument(
    "--output", default=DEFAULT_OUTPUT,
    help=f"Output root directory, default: {DEFAULT_OUTPUT}"
)
args = parser.parse_args()

parquet_path = args.parquet
out_base_dir = args.output
print(f"📂 Using Parquet: {parquet_path}")
print(f"📂 Output directory: {out_base_dir}")

# Read the Parquet file
try:
    df = pq.read_table(
        parquet_path, columns=["file_id", "sentence", "audio"]
    ).to_pandas()
except Exception as e:
    print(f"❌ Failed to read Parquet: {e}")
    sys.exit(1)

# Iterate over the rows and export the data
for _, row in df.iterrows():
    fid = row["file_id"]
    sentence = row["sentence"]
    audio = row["audio"]

    # Work out the original sub-path from the audio 'path' field
    sub_path = None
    if isinstance(audio, dict) and audio.get("path"):
        orig_path = audio["path"].replace("\\", "/")
        sub_path = os.path.dirname(orig_path)
    target_dir = os.path.join(out_base_dir, sub_path) if sub_path else out_base_dir
    os.makedirs(target_dir, exist_ok=True)

    # Export the WAV file
    wav_path = os.path.join(target_dir, f"{fid}.wav")
    if isinstance(audio, dict) and audio.get("bytes") is not None:
        with open(wav_path, "wb") as f:
            f.write(audio["bytes"])
    elif isinstance(audio, dict) and audio.get("array") is not None:
        sf.write(wav_path, audio["array"], audio["sampling_rate"])
    elif isinstance(audio, dict) and audio.get("path"):
        path = audio["path"]
        if path.startswith("http"):
            data = requests.get(path).content
        else:
            with open(path, "rb") as rf:
                data = rf.read()
        with open(wav_path, "wb") as f:
            f.write(data)
    else:
        print(f"⚠️ Unsupported audio format for {fid}")

    # Export the text
    txt_path = os.path.join(target_dir, f"{fid}.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        f.write(sentence)

print("✅ All files exported with the original directory structure under:", out_base_dir)
Generic dataset extraction
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import subprocess
import os

def ensure_package(pkg):
    try:
        __import__(pkg)
    except ImportError:
        print(f"[+] Installing {pkg}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

for pkg in ("pyarrow", "pandas"):
    ensure_package(pkg)

import pyarrow.parquet as pq
import pandas as pd

DEFAULT_PARQUET = r"f:\dataset\train-00001-of-00003.parquet"
DEFAULT_OUTPUT = r"F:\dataset"
ID_COL = "file_id"  # your own primary-key column

# Helper: derive an extension from a path string (not used in the main loop below)
def get_ext_from_path(path):
    path = str(path)
    if "." in path:
        return "." + path.split(".")[-1].lower()
    return ""

# Guess a reasonable file extension from the text content
def smart_text_ext(val):
    if not isinstance(val, str):
        return ".txt"
    s = val.strip()
    if s.startswith("{") or s.startswith("["):
        return ".json"
    if s.lower().startswith("<html"):
        return ".html"
    if s.lower().startswith("<?xml") or (s.startswith("<") and s.endswith(">")):
        return ".xml"
    if s.startswith("WEBVTT"):
        return ".vtt"
    if s.startswith("[Script Info]"):
        return ".ass"
    if s.count("\n") > 1 and all(x in s for x in ["0:", "1:", "2:"]):
        return ".srt"
    if s.startswith("---") or "# " in s:
        return ".md"
    if "," in s and "\n" in s and not s.startswith("{"):
        return ".csv"
    if s.lower().startswith("%pdf"):
        return ".pdf"
    return ".txt"

parquet_path = DEFAULT_PARQUET
out_dir = DEFAULT_OUTPUT
os.makedirs(out_dir, exist_ok=True)

table = pq.read_table(parquet_path)
df = table.to_pandas()

print("==== Schema ====")
print(table.schema)
print("==== Sample ====")
print(df.head())

fields = df.columns.tolist()

for idx, row in df.iterrows():
    sid = str(row[ID_COL]) if ID_COL in row and not pd.isnull(row[ID_COL]) else str(idx)
    path_refs = {}

    # First export every binary payload (image/audio/video, etc.)
    for fld in fields:
        val = row[fld]
        if isinstance(val, dict) and "bytes" in val and val["bytes"] and "path" in val and val["path"]:
            path_val = val["path"].replace("\\", "/")
            path_refs[fld] = path_val  # kept for pairing with text fields below
            file_path = os.path.join(out_dir, path_val)
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(val["bytes"])

    # Then export text / structured content (txt/json/csv/html, ...)
    for fld in fields:
        val = row[fld]
        if isinstance(val, dict):
            continue
        # Does this field pair with an already exported binary field?
        # If so, keep the same directory and base name.
        pair_found = False
        for k, v in path_refs.items():
            if fld.startswith(k) or k.startswith(fld) or fld in k or k in fld:
                base, _ = os.path.splitext(v)
                ext = smart_text_ext(val)
                file_path = os.path.join(out_dir, base + ext)
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(str(val))
                pair_found = True
                break
        if not pair_found:
            # Other text / numeric fields are stored on their own, one sub-directory per field
            if isinstance(val, str):
                ext = smart_text_ext(val)
                fld_dir = os.path.join(out_dir, fld)
                os.makedirs(fld_dir, exist_ok=True)
                file_path = os.path.join(fld_dir, f"{sid}{ext}")
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(val)
            elif isinstance(val, (int, float, bool)):
                fld_dir = os.path.join(out_dir, fld)
                os.makedirs(fld_dir, exist_ok=True)
                file_path = os.path.join(fld_dir, f"{sid}.txt")
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(str(val))
            elif isinstance(val, (bytes, bytearray)):
                fld_dir = os.path.join(out_dir, fld)
                os.makedirs(fld_dir, exist_ok=True)
                file_path = os.path.join(fld_dir, f"{sid}.bin")
                with open(file_path, "wb") as f:
                    f.write(val)

    if idx % 100 == 0:
        print(f"Processed {idx} rows")

print(f"✅ Everything has been restored and exported to: {out_dir}")