以下是一段python 代码,可以用来将特定文件夹内的图片压缩成webp 格式
使用前请确保安装过os和PIL包
import os
from PIL import Image
# 手动指定参数和路径
INPUT_FOLDER = "." # 图片所在的文件夹路径,当前目录用"."表示
QUALITY_LEVEL = 80 # WebP 压缩质量 (1-100, 数值越高质量越好)
def compress_to_webp(input_folder, quality=80):
"""
Compresses all images in the input folder to WebP format and removes the original files.
Args:
input_folder (str): Path to the folder containing images to compress
quality (int): Quality level for WebP compression (1-100, higher is better quality)
"""
# Supported image formats
supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.gif')
# Get all image files in the folder
image_files = []
for filename in os.listdir(input_folder):
if filename.lower().endswith(supported_formats):
image_files.append(filename)
print(f"Found {len(image_files)} images to convert...")
for filename in image_files:
input_path = os.path.join(input_folder, filename)
# Create output path with .webp extension
base_name = os.path.splitext(filename)[0]
output_path = os.path.join(input_folder, f"{base_name}.webp")
try:
# Open and compress the image
with Image.open(input_path) as img:
# Convert RGBA to RGB if necessary (for compatibility)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
# Save as WebP with specified quality
img.save(output_path, "WEBP", quality=quality, optimize=True)
# Remove the original file after successful conversion
os.remove(input_path)
print(f"Compressed and converted: {filename} -> {base_name}.webp")
except Exception as e:
print(f"Error processing {filename}: {str(e)}")
print("Compression completed!")
def main():
# 检查指定的文件夹是否存在
if not os.path.isdir(INPUT_FOLDER):
print(f"Error: {INPUT_FOLDER} is not a valid directory")
return
compress_to_webp(INPUT_FOLDER, QUALITY_LEVEL)
if __name__ == "__main__":
main()
多线程版本:
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image
import multiprocessing
# 手动指定参数和路径
INPUT_FOLDER = "." # 图片所在的文件夹路径,当前目录用"."表示
QUALITY_LEVEL = 80 # WebP 压缩质量 (1-100, 数值越高质量越好)
AUTO_MAX_WORKERS = True # 是否自动设置线程数
MANUAL_MAX_WORKERS = 4 # 手动设置的线程数(当 AUTO_MAX_WORKERS 为 False 时使用)
def get_optimal_thread_count():
"""
获取最优线程数
返回逻辑 CPU 核心数,但不超过 8 个(避免过多线程开销)
"""
cpu_count = multiprocessing.cpu_count()
# 通常使用 CPU 核心数,但不超过 8 个线程(对于 I/O 密集型任务)
optimal_count = min(cpu_count, 8)
return optimal_count
def process_single_image(input_path, output_path, quality=80):
"""
处理单个图片文件
Args:
input_path (str): 输入图片路径
output_path (str): 输出 WebP 路径
quality (int): WebP 压缩质量
"""
try:
# Open and compress the image
with Image.open(input_path) as img:
# Convert RGBA to RGB if necessary (for compatibility)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
# Save as WebP with specified quality
img.save(output_path, "WEBP", quality=quality, optimize=True)
# Remove the original file after successful conversion
os.remove(input_path)
print(f"Compressed and converted: {os.path.basename(input_path)} -> {os.path.basename(output_path)}")
return True
except Exception as e:
print(f"Error processing {os.path.basename(input_path)}: {str(e)}")
return False
def compress_to_webp(input_folder, quality=80, max_workers=None):
"""
Compresses all images in the input folder to WebP format using multiple threads
and removes the original files.
Args:
input_folder (str): Path to the folder containing images to compress
quality (int): Quality level for WebP compression (1-100, higher is better quality)
max_workers (int or None): Maximum number of worker threads (None for auto)
"""
# 自动计算线程数
if max_workers is None:
max_workers = get_optimal_thread_count()
print(f"Using {max_workers} threads for processing...")
# Supported image formats
supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.gif')
# Get all image files in the folder
image_files = []
for filename in os.listdir(input_folder):
if filename.lower().endswith(supported_formats):
image_files.append(filename)
print(f"Found {len(image_files)} images to convert...")
if not image_files:
print("No images found to process.")
return
# Prepare tasks
tasks = []
for filename in image_files:
input_path = os.path.join(input_folder, filename)
# Create output path with .webp extension
base_name = os.path.splitext(filename)[0]
output_path = os.path.join(input_folder, f"{base_name}.webp")
tasks.append((input_path, output_path, quality))
# Process images in parallel
successful_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_task = {executor.submit(process_single_image, task[0], task[1], task[2]): task
for task in tasks}
# Collect results as they complete
for future in as_completed(future_to_task):
success = future.result()
if success:
successful_count += 1
else:
failed_count += 1
print(f"Compression completed! Total: {len(tasks)}, Successful: {successful_count}, Failed: {failed_count}")
def main():
# 检查指定的文件夹是否存在
if not os.path.isdir(INPUT_FOLDER):
print(f"Error: {INPUT_FOLDER} is not a valid directory")
return
# 确定使用的线程数
if AUTO_MAX_WORKERS:
thread_count = get_optimal_thread_count()
print(f"Auto-detecting optimal threads: {thread_count}")
else:
thread_count = MANUAL_MAX_WORKERS
print(f"Using manual thread count: {thread_count}")
compress_to_webp(INPUT_FOLDER, QUALITY_LEVEL, thread_count)
if __name__ == "__main__":
main()
