以下是一段 Python 代码,可以将指定文件夹内的图片压缩并转换为 WebP 格式(注意:转换成功后会删除原始文件)。

使用前请确保已安装 Pillow(即 PIL,可通过 `pip install Pillow` 安装);os 是 Python 标准库,无需另外安装。

import os
from PIL import Image

# Manually configured parameters and paths
INPUT_FOLDER = "."  # Folder that contains the images; "." means the current directory
QUALITY_LEVEL = 80  # WebP compression quality (1-100; higher means better quality)


def compress_to_webp(input_folder, quality=80):
    """
    Convert every supported image in a folder to WebP and remove the originals.

    Only regular files directly inside ``input_folder`` are considered
    (no recursion). Transparency and animation frames are kept, and an
    existing ``.webp`` file is never overwritten: when the natural output
    name is taken, ``_1``, ``_2``, ... is appended to the base name.

    Args:
        input_folder (str): Path to the folder containing images to compress
        quality (int): Quality level for WebP compression (1-100, higher is better quality)

    Returns:
        None. Progress and per-file errors are reported with ``print``; a file
        that fails to convert is left in place and processing continues.
    """
    # Supported image formats
    supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.gif')

    # Sorted for a deterministic processing order; the isfile() check keeps
    # directories that merely have an image-like name out of the list.
    image_files = sorted(
        name
        for name in os.listdir(input_folder)
        if name.lower().endswith(supported_formats)
        and os.path.isfile(os.path.join(input_folder, name))
    )

    print(f"Found {len(image_files)} images to convert...")

    for filename in image_files:
        input_path = os.path.join(input_folder, filename)

        # Pick an output name that is not on disk yet. Without this,
        # "a.jpg" and "a.png" would both be written to "a.webp" and both
        # originals deleted, losing one of the images.
        base_name = os.path.splitext(filename)[0]
        output_name = f"{base_name}.webp"
        suffix = 0
        while os.path.exists(os.path.join(input_folder, output_name)):
            suffix += 1
            output_name = f"{base_name}_{suffix}.webp"
        output_path = os.path.join(input_folder, output_name)

        try:
            with Image.open(input_path) as img:
                if getattr(img, "is_animated", False):
                    # convert() would return only the current frame, so hand
                    # the multi-frame image to the writer untouched.
                    img.save(output_path, "WEBP", quality=quality, save_all=True)
                else:
                    frame = img
                    if img.mode == "P":
                        # Expand the palette, keeping transparency if present.
                        has_alpha = "transparency" in img.info
                        frame = img.convert("RGBA" if has_alpha else "RGB")
                    # RGBA is saved as-is: WebP stores the alpha channel.
                    frame.save(output_path, "WEBP", quality=quality)

            # Delete only after the `with` block has closed the source file;
            # an open handle blocks deletion on Windows.
            os.remove(input_path)

            print(f"Compressed and converted: {filename} -> {output_name}")

        except Exception as e:
            # Per-file boundary: report the problem and keep going.
            print(f"Error processing {filename}: {e}")

    print("Compression completed!")


def main():
    """Convert the images in INPUT_FOLDER, or report that the folder is missing."""
    if os.path.isdir(INPUT_FOLDER):
        compress_to_webp(INPUT_FOLDER, QUALITY_LEVEL)
    else:
        # The configured path does not point at an existing directory.
        print(f"Error: {INPUT_FOLDER} is not a valid directory")


# Run the conversion only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

以下是多线程版本,使用线程池并行转换多张图片:

import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image
import multiprocessing

# Manually configured parameters and paths
INPUT_FOLDER = "."  # Folder that contains the images; "." means the current directory
QUALITY_LEVEL = 80  # WebP compression quality (1-100; higher means better quality)
AUTO_MAX_WORKERS = True  # Whether to choose the thread count automatically
MANUAL_MAX_WORKERS = 4   # Thread count to use when AUTO_MAX_WORKERS is False


def get_optimal_thread_count():
    """
    Return the default number of worker threads.

    The value is the CPU count reported by ``multiprocessing.cpu_count()``,
    capped at 8 so that a machine with many cores does not spawn an
    excessive number of threads.
    """
    thread_cap = 8
    return min(multiprocessing.cpu_count(), thread_cap)


def process_single_image(input_path, output_path, quality=80):
    """
    Convert one image file to WebP and delete the original on success.

    Transparency is preserved, and for animated inputs every frame is
    written instead of only the first one.

    Args:
        input_path (str): Path of the image to convert
        output_path (str): Path of the WebP file to write
        quality (int): WebP compression quality

    Returns:
        bool: True when the WebP file was written and the original removed,
        False when any step failed (the error is printed, not raised).
    """
    try:
        with Image.open(input_path) as img:
            if getattr(img, "is_animated", False):
                # convert() would return only the current frame, so hand the
                # multi-frame image to the writer untouched.
                img.save(output_path, "WEBP", quality=quality, save_all=True)
            else:
                frame = img
                if img.mode == "P":
                    # Expand the palette, keeping transparency if present.
                    has_alpha = "transparency" in img.info
                    frame = img.convert("RGBA" if has_alpha else "RGB")
                # RGBA is saved as-is: WebP stores the alpha channel.
                frame.save(output_path, "WEBP", quality=quality)

        # Delete only after the `with` block has closed the source file;
        # an open handle blocks deletion on Windows.
        os.remove(input_path)

        print(f"Compressed and converted: {os.path.basename(input_path)} -> {os.path.basename(output_path)}")
        return True

    except Exception as e:
        # Worker boundary: report the failure through the return value so a
        # single bad file cannot abort the whole batch.
        print(f"Error processing {os.path.basename(input_path)}: {e}")
        return False


def compress_to_webp(input_folder, quality=80, max_workers=None):
    """
    Convert every supported image in a folder to WebP using a thread pool.

    Only regular files directly inside ``input_folder`` are considered
    (no recursion). Each file is handed to ``process_single_image``. Every
    task gets its own output path: when the natural ``<name>.webp`` is
    already on disk or already assigned to another task, ``_1``, ``_2``, ...
    is appended to the base name.

    Args:
        input_folder (str): Path to the folder containing images to compress
        quality (int): Quality level for WebP compression (1-100, higher is better quality)
        max_workers (int or None): Maximum number of worker threads (None for auto)

    Returns:
        None. Progress and the final success/failure totals are printed.
    """
    # Pick the thread count automatically when the caller did not give one.
    if max_workers is None:
        max_workers = get_optimal_thread_count()

    print(f"Using {max_workers} threads for processing...")

    # Supported image formats
    supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.gif')

    # Sorted for a deterministic task order; the isfile() check keeps
    # directories that merely have an image-like name out of the list.
    image_files = sorted(
        name
        for name in os.listdir(input_folder)
        if name.lower().endswith(supported_formats)
        and os.path.isfile(os.path.join(input_folder, name))
    )

    print(f"Found {len(image_files)} images to convert...")

    if not image_files:
        print("No images found to process.")
        return

    # Build the task list up front, giving every task a distinct output file.
    # Without this, "a.jpg" and "a.png" would be written to "a.webp" by two
    # threads at once and both originals deleted.
    tasks = []
    reserved_names = set()  # lower-cased, so case-insensitive filesystems are covered
    for filename in image_files:
        base_name = os.path.splitext(filename)[0]
        output_name = f"{base_name}.webp"
        suffix = 0
        while (output_name.lower() in reserved_names
               or os.path.exists(os.path.join(input_folder, output_name))):
            suffix += 1
            output_name = f"{base_name}_{suffix}.webp"
        reserved_names.add(output_name.lower())

        tasks.append((
            os.path.join(input_folder, filename),
            os.path.join(input_folder, output_name),
            quality,
        ))

    # Process images in parallel
    successful_count = 0
    failed_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_image, *task) for task in tasks]

        # Tally results in completion order.
        for future in as_completed(futures):
            if future.result():
                successful_count += 1
            else:
                failed_count += 1

    print(f"Compression completed! Total: {len(tasks)}, Successful: {successful_count}, Failed: {failed_count}")


def main():
    """Validate INPUT_FOLDER, choose the thread count and run the conversion."""
    folder_exists = os.path.isdir(INPUT_FOLDER)
    if not folder_exists:
        print(f"Error: {INPUT_FOLDER} is not a valid directory")
        return

    # Either use the manually configured thread count or detect one.
    if not AUTO_MAX_WORKERS:
        thread_count = MANUAL_MAX_WORKERS
        print(f"Using manual thread count: {thread_count}")
    else:
        thread_count = get_optimal_thread_count()
        print(f"Auto-detecting optimal threads: {thread_count}")

    compress_to_webp(INPUT_FOLDER, QUALITY_LEVEL, thread_count)


# Run the conversion only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()