Qwen-Image-2512-Pixel-Art-LoRA 批量生成与自动化脚本编写Python实战你是不是也遇到过这种情况想用Qwen-Image-2512-Pixel-Art-LoRA这个像素风模型做一套游戏角色图或者给几十个商品生成像素风格的宣传图结果发现一张一张手动操作不仅效率低还特别容易出错。光是复制粘贴提示词、点生成、保存图片一套流程下来半天时间就没了。我之前给一个独立游戏项目做美术资源需要生成上百张不同职业、不同装备的像素角色图就差点被这个重复劳动给逼疯。后来实在受不了就用Python写了个自动化脚本把整个流程串了起来。从那以后批量生成图片就变成了“一键启动坐等收图”的轻松活儿。今天我就把这个方法分享给你不管你是要生成几十张还是几百张图片都能用这个脚本大幅提升效率。咱们不搞那些虚的直接上干货从怎么读取批量提示词到怎么调用模型再到怎么自动保存图片一步步带你用Python把整个流程自动化。1. 准备工作环境与思路梳理在动手写代码之前咱们先把“家伙事儿”准备好再把整个流程想清楚。这样写起代码来才不会手忙脚乱。首先你得确保你的电脑上已经装好了Python。我建议用Python 3.8或以上的版本比较稳定。然后咱们需要通过pip安装几个必要的库。打开你的命令行终端Windows上是CMD或PowerShellMac或Linux上是Terminal输入下面这行命令pip install requests pandas简单解释一下这两个库是干嘛的requests这是Python里用来发送网络请求的“神器”咱们调用模型的API全靠它。pandas处理表格数据比如CSV、Excel特别方便咱们用来读取批量提示词文件。接下来你得确认你的Qwen-Image-2512-Pixel-Art-LoRA模型服务已经启动并且运行正常。通常这类模型会提供一个HTTP API接口比如http://localhost:8000/v1/images/generations这样的地址。你需要知道这个接口的完整URL以及它需要的参数格式。一般来说文档里会说明需要发送一个JSON数据里面包含prompt提示词、model模型名这些信息。最后咱们来梳理一下整个自动化脚本要干哪些事心里有个路线图读取任务从一个文件比如CSV或JSON里把一大堆提示词读进来。发送请求把这些提示词一个个或者一批批地发送给模型的API。处理结果拿到API返回的图片数据把它保存成电脑上的图片文件。记录日志谁成功生成了谁失败了失败的原因是什么都得记下来方便事后查看和重试。应对错误网络偶尔会抽风API也可能暂时忙不过来脚本得能处理这些意外比如等一会儿再重试。思路清晰了咱们就正式开始写代码。2. 从文件读取批量提示词批量生产的核心是“批量”第一步就是怎么高效地把一堆提示词交给程序。我最推荐用CSV文件因为它用Excel或者记事本都能轻松编辑特别直观。假设你有一个prompts.csv文件内容是这样的id,prompt,output_name 1,a pixel art warrior holding a sword,warrior_sword.png 2,a pixel art mage casting a fireball,mage_fireball.png 3,a pixel art rogue hiding in shadows,rogue_shadow.png 4,a pixel art healer with glowing staff,healer_staff.png第一列是序号第二列是具体的提示词第三列是你想给生成的图片起的名字。用pandas读取这个文件非常简单import pandas as pd def load_prompts_from_csv(file_path): 从CSV文件加载提示词和对应的输出文件名。 参数: file_path (str): CSV文件的路径。 返回: list: 一个列表里面每个元素都是一个字典包含id, prompt, output_name。 try: df pd.read_csv(file_path) # 确保必要的列存在 required_columns [prompt, output_name] for col in required_columns: if col not in df.columns: raise ValueError(fCSV文件中缺少必要的列: {col}) # 如果没有id列就自动生成一个 if id not in df.columns: df[id] range(1, len(df) 1) # 将DataFrame转换为字典列表方便后续处理 tasks df.to_dict(records) print(f成功从 {file_path} 加载了 {len(tasks)} 个任务。) return tasks except FileNotFoundError: print(f错误找不到文件 {file_path}) return [] except Exception as e: print(f读取CSV文件时发生错误: {e}) return [] # 使用示例 tasks load_prompts_from_csv(prompts.csv) for task in tasks[:2]: # 打印前两个任务看看 print(task)运行这段代码它会打印出类似这样的内容说明数据已经成功读进来了成功从 prompts.csv 加载了 4 个任务。 {id: 1, prompt: a pixel art warrior holding a sword, output_name: warrior_sword.png} {id: 2, prompt: a pixel art mage casting a fireball, output_name: mage_fireball.png}有的朋友可能更喜欢用JSON格式它的结构更灵活。比如一个prompts.json文件[ { id: 1, prompt: a pixel art warrior holding a sword, output_name: warrior_sword.png }, { id: 2, prompt: a pixel art mage casting a fireball, output_name: mage_fireball.png } ]读取JSON文件的代码也很简单import json def load_prompts_from_json(file_path): 从JSON文件加载提示词任务。 try: with open(file_path, r, encodingutf-8) as f: tasks json.load(f) print(f成功从 {file_path} 加载了 {len(tasks)} 个任务。) return tasks except FileNotFoundError: print(f错误找不到文件 {file_path}) return [] except json.JSONDecodeError: print(f错误{file_path} 不是有效的JSON格式) return [] # 使用示例 tasks load_prompts_from_json(prompts.json)两种方式都可以看你个人习惯。我个人觉得CSV在批量编辑时更方便一些。3. 核心功能调用API与保存图片数据准备好了接下来就是最核心的一步跟模型API“对话”拿到图片并存下来。这里我们会写一个函数专门处理单个生成任务。首先你需要知道你的模型API的具体地址和它期望的请求格式。我们假设API地址是http://localhost:8000/v1/images/generations它接受一个JSON请求里面至少要有prompt和model字段返回的JSON里有一个data字段里面是图片的Base64编码字符串或者直接是一个图片URL。我们先写一个处理单个任务的函数import requests import base64 import os from datetime import datetime def generate_and_save_image(api_url, prompt, output_path, model_nameQwen-Image-2512-Pixel-Art-LoRA, timeout30): 调用图像生成API并将生成的图片保存到本地。 参数: api_url (str): 模型API的完整地址。 prompt (str): 图像描述提示词。 output_path (str): 图片保存的完整路径包括文件名。 model_name (str): 要使用的模型名称。 timeout (int): 请求超时时间秒。 返回: tuple: (是否成功, 错误信息或成功信息) # 1. 准备请求数据 payload { prompt: prompt, model: model_name, # 你可能还需要其他参数如size, num_images等请根据你的API文档调整 size: 1024x1024, num_images: 1 } headers { Content-Type: application/json } try: # 2. 发送POST请求 print(f正在生成: {prompt[:50]}...) # 只打印前50个字符避免刷屏 response requests.post(api_url, jsonpayload, headersheaders, timeouttimeout) # 3. 检查响应状态 response.raise_for_status() # 如果状态码不是200会抛出HTTPError异常 result response.json() # 4. 解析响应获取图片数据 # 注意这里需要根据你的API实际返回结构进行调整 # 常见情况1返回的是Base64字符串 if data in result and len(result[data]) 0: image_data result[data][0].get(b64_json) # 假设字段名是b64_json if image_data: # 解码Base64并保存为图片 image_bytes base64.b64decode(image_data) with open(output_path, wb) as f: f.write(image_bytes) return True, f图片已保存至: {output_path} # 常见情况2返回的是图片URL image_url result[data][0].get(url) if image_url: # 从URL下载图片 img_response requests.get(image_url, timeouttimeout) img_response.raise_for_status() with open(output_path, wb) as f: f.write(img_response.content) return True, f图片已从URL下载并保存至: {output_path} # 如果以上都不是说明返回结构可能不符合预期 return False, fAPI返回了成功状态但无法解析图片数据。响应内容: {result} except requests.exceptions.Timeout: return False, 请求超时可能是网络问题或API响应过慢。 except requests.exceptions.HTTPError as e: return False, fHTTP错误: {e} except requests.exceptions.RequestException as e: return False, f网络请求异常: {e} except Exception as e: return False, f处理过程中发生未知错误: {e}这个函数干了这么几件事构造请求、发送请求、检查是否成功、解析返回的图片数据可能是Base64码也可能是个网址最后把图片数据写到本地文件里。里面加了详细的错误处理这样万一出问题我们能知道大概是什么原因。4. 效率提升使用并发处理如果任务列表里有几十上百个提示词一个个按顺序处理太慢了。这时候就该“并发”出场了。我们可以用Python的concurrent.futures模块里的ThreadPoolExecutor它允许我们同时发起多个网络请求等于是让多个任务“齐头并进”。这里有个重要的概念叫“并发数”也就是同时能进行多少个任务。这个数不是越大越好设得太大会把你自己的电脑或者模型服务器压垮。一般根据你的网络和服务器性能设置在5到20之间比较稳妥。下面我们写一个批量处理的函数它负责创建线程池把任务分配下去from concurrent.futures import ThreadPoolExecutor, as_completed import time def batch_generate_images(tasks, api_url, output_dirgenerated_images, max_workers5): 批量生成图片使用线程池提高效率。 参数: tasks (list): 任务列表每个任务是一个包含id, prompt, output_name的字典。 api_url (str): 模型API地址。 output_dir (str): 图片输出目录。 max_workers (int): 最大并发线程数。 返回: dict: 包含成功和失败任务统计的字典。 # 创建输出目录如果不存在 os.makedirs(output_dir, exist_okTrue) # 准备日志记录 log_file os.path.join(output_dir, fgeneration_log_{datetime.now().strftime(%Y%m%d_%H%M%S)}.txt) results { total: len(tasks), success: 0, failed: 0, failed_tasks: [] # 记录失败的任务详情方便重试 } print(f开始批量生成共 {len(tasks)} 个任务并发数: {max_workers}) print(f日志将保存至: {log_file}) # 定义一个内部函数用于在线程中执行单个任务 def process_single_task(task): task_id task[id] prompt task[prompt] output_name task.get(output_name, fimage_{task_id}.png) output_path os.path.join(output_dir, output_name) success, message generate_and_save_image(api_url, prompt, output_path) # 记录日志 log_entry f任务ID: {task_id} | 提示词: {prompt[:30]}... | 状态: {成功 if success else 失败} | 信息: {message}\n with open(log_file, a, encodingutf-8) as f: f.write(log_entry) return task_id, success, message # 使用线程池并发执行 start_time time.time() with ThreadPoolExecutor(max_workersmax_workers) as executor: # 提交所有任务到线程池 future_to_task {executor.submit(process_single_task, task): task for task in tasks} # 遍历已完成的任务 for future in as_completed(future_to_task): task future_to_task[future] try: task_id, success, message future.result() if success: results[success] 1 print(f✓ 任务 {task_id} 完成) else: results[failed] 1 results[failed_tasks].append(task) print(f✗ 任务 {task_id} 失败: {message}) except Exception as e: results[failed] 1 results[failed_tasks].append(task) print(f✗ 任务 {task[id]} 执行过程中异常: {e}) end_time time.time() # 打印最终统计 print(f\n{*50}) print(f批量生成完成) print(f总耗时: {end_time - start_time:.2f} 秒) print(f总计: {results[total]} | 成功: {results[success]} | 失败: {results[failed]}) print(f详细日志见: {log_file}) if results[failed_tasks]: print(f\n失败的任务ID: {[t[id] for t in results[failed_tasks]]}) # 可以选择将失败的任务保存到单独的文件以便重试 retry_file os.path.join(output_dir, failed_tasks.json) with open(retry_file, w, encodingutf-8) as f: json.dump(results[failed_tasks], f, indent2, ensure_asciiFalse) print(f失败任务列表已保存至: {retry_file}) return results这个函数是咱们脚本的“大脑”。它创建线程池把每个提示词生成任务丢进去并行执行。每完成一个就记录一下成功还是失败并把结果写到日志文件里。全部完成后会给你一份清晰的统计报告告诉你花了多少时间成了几个败了几个。失败的任务还会单独存成一个JSON文件方便你之后排查问题或者重试。5. 让脚本更健壮错误重试与日志网络世界充满不确定性偶尔一次请求失败很正常。一个健壮的脚本应该能应对这种小意外。我们可以给单个任务加上重试机制比如失败后等几秒再试一次最多试三次。修改一下我们之前写的generate_and_save_image函数给它包装一个带重试功能的外壳def generate_with_retry(api_url, prompt, output_path, max_retries3, retry_delay5, **kwargs): 带重试机制的图像生成函数。 参数: max_retries (int): 最大重试次数。 retry_delay (int): 重试前等待的秒数。 **kwargs: 传递给generate_and_save_image的其他参数。 for attempt in range(max_retries): success, message generate_and_save_image(api_url, prompt, output_path, **kwargs) if success: return True, message else: if attempt max_retries - 1: # 如果不是最后一次重试 print(f 第{attempt 1}次尝试失败{retry_delay}秒后重试... 错误: {message}) time.sleep(retry_delay) else: print(f 第{attempt 1}次尝试失败已达最大重试次数。) return False, f经过{max_retries}次重试后仍然失败。最后错误: {message}然后在process_single_task这个内部函数里把调用generate_and_save_image改成调用generate_with_retry就行了。关于日志我们在batch_generate_images函数里已经实现了基本的文件日志。这个日志文件会记录每个任务的ID、提示词片段、状态和详细信息。有了它就算你关掉脚本第二天回来也能知道昨天哪些生成了哪些没生成为什么没生成。6. 完整脚本与使用示例好了我们把上面所有的代码块组合起来形成一个完整的、可以直接运行的脚本。我还会加一个简单的命令行参数解析让你不用改代码就能指定任务文件和输出目录。创建一个新文件比如叫batch_pixel_art.py把下面的代码全部复制进去#!/usr/bin/env python3 Qwen-Image-2512-Pixel-Art-LoRA 批量图像生成脚本 支持从CSV/JSON读取提示词并发调用API自动保存图片并记录日志。 import argparse import json import os import time import base64 from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed import pandas as pd import requests # ---------- 第一部分数据加载 ---------- def load_prompts_from_csv(file_path): 从CSV文件加载提示词任务。 try: df pd.read_csv(file_path) required_columns [prompt, output_name] for col in required_columns: if col not in df.columns: raise ValueError(fCSV文件中缺少必要的列: {col}) if id not in df.columns: df[id] range(1, len(df) 1) tasks df.to_dict(records) print(f成功从 {file_path} 加载了 {len(tasks)} 个任务。) return tasks except FileNotFoundError: print(f错误找不到文件 {file_path}) return [] except Exception as e: print(f读取CSV文件时发生错误: {e}) return [] def load_prompts_from_json(file_path): 从JSON文件加载提示词任务。 try: with open(file_path, r, encodingutf-8) as f: tasks json.load(f) print(f成功从 {file_path} 加载了 {len(tasks)} 个任务。) return tasks except FileNotFoundError: print(f错误找不到文件 {file_path}) return [] except json.JSONDecodeError: print(f错误{file_path} 不是有效的JSON格式) return [] # ---------- 第二部分核心生成函数带重试---------- def generate_and_save_image(api_url, prompt, output_path, model_nameQwen-Image-2512-Pixel-Art-LoRA, timeout30): 调用图像生成API并保存图片。 payload { prompt: prompt, model: model_name, size: 1024x1024, num_images: 1 } headers {Content-Type: application/json} try: print(f正在生成: {prompt[:50]}...) response requests.post(api_url, jsonpayload, headersheaders, timeouttimeout) response.raise_for_status() result response.json() if data in result and len(result[data]) 0: # 尝试Base64格式 image_data result[data][0].get(b64_json) if image_data: image_bytes base64.b64decode(image_data) with open(output_path, wb) as f: f.write(image_bytes) return True, f图片已保存至: {output_path} # 尝试URL格式 image_url result[data][0].get(url) if image_url: img_response requests.get(image_url, timeouttimeout) img_response.raise_for_status() with open(output_path, wb) as f: f.write(img_response.content) return True, f图片已从URL下载并保存至: {output_path} return False, fAPI返回了成功状态但无法解析图片数据。 except requests.exceptions.Timeout: return False, 请求超时。 except requests.exceptions.HTTPError as e: return False, fHTTP错误: {e} except requests.exceptions.RequestException as e: return False, f网络请求异常: {e} except Exception as e: return False, f处理过程中发生未知错误: {e} def generate_with_retry(api_url, prompt, output_path, max_retries3, retry_delay5, **kwargs): 带重试机制的图像生成。 for attempt in range(max_retries): success, message generate_and_save_image(api_url, prompt, output_path, **kwargs) if success: return True, message else: if attempt max_retries - 1: print(f 第{attempt 1}次尝试失败{retry_delay}秒后重试...) time.sleep(retry_delay) return False, f经过{max_retries}次重试后仍然失败。 # ---------- 第三部分批量并发处理 ---------- def batch_generate_images(tasks, api_url, output_dirgenerated_images, max_workers5): 批量生成图片的主函数。 os.makedirs(output_dir, exist_okTrue) log_file os.path.join(output_dir, fgeneration_log_{datetime.now().strftime(%Y%m%d_%H%M%S)}.txt) results {total: len(tasks), success: 0, failed: 0, failed_tasks: []} print(f开始批量生成共 {len(tasks)} 个任务并发数: {max_workers}) print(f日志将保存至: {log_file}) def process_single_task(task): task_id task[id] prompt task[prompt] output_name task.get(output_name, fimage_{task_id}.png) output_path os.path.join(output_dir, output_name) success, message generate_with_retry(api_url, prompt, output_path) log_entry f任务ID: {task_id} | 提示词: {prompt[:30]}... | 状态: {成功 if success else 失败} | 信息: {message}\n with open(log_file, a, encodingutf-8) as f: f.write(log_entry) return task_id, success, message start_time time.time() with ThreadPoolExecutor(max_workersmax_workers) as executor: future_to_task {executor.submit(process_single_task, task): task for task in tasks} for future in as_completed(future_to_task): task future_to_task[future] try: task_id, success, message future.result() if success: results[success] 1 print(f✓ 任务 {task_id} 完成) else: results[failed] 1 results[failed_tasks].append(task) print(f✗ 任务 {task_id} 失败: {message}) except Exception as e: results[failed] 1 results[failed_tasks].append(task) print(f✗ 任务 {task[id]} 执行过程中异常: {e}) end_time time.time() # 输出统计报告 print(f\n{*50}) print(f批量生成完成) print(f总耗时: {end_time - start_time:.2f} 秒) print(f总计: {results[total]} | 成功: {results[success]} | 失败: {results[failed]}) print(f详细日志见: {log_file}) if results[failed_tasks]: retry_file os.path.join(output_dir, failed_tasks_for_retry.json) with open(retry_file, w, encodingutf-8) as f: json.dump(results[failed_tasks], f, indent2, ensure_asciiFalse) print(f失败任务列表已保存至: {retry_file}可用于重试。) return results # ---------- 第四部分主程序入口 ---------- def main(): parser argparse.ArgumentParser(description批量生成像素艺术图片) parser.add_argument(--input, -i, requiredTrue, help输入文件路径 (CSV 或 JSON)) parser.add_argument(--api, -a, defaulthttp://localhost:8000/v1/images/generations, help模型API地址) parser.add_argument(--output, -o, defaultgenerated_images, help图片输出目录) parser.add_argument(--workers, -w, typeint, default5, help并发线程数 (默认: 5)) args parser.parse_args() # 根据文件扩展名选择加载方式 if args.input.endswith(.csv): tasks load_prompts_from_csv(args.input) elif args.input.endswith(.json): tasks load_prompts_from_csv(args.input) else: print(错误输入文件必须是CSV (.csv) 或 JSON (.json) 格式。) return if not tasks: print(没有加载到任何任务程序退出。) return print(fAPI地址: {args.api}) print(f输出目录: {args.output}) print(f并发数: {args.workers}) print(- * 50) # 开始批量生成 batch_generate_images(tasks, args.api, args.output, args.workers) if __name__ __main__: main()如何使用这个脚本准备任务文件创建一个prompts.csv文件格式就像前面例子那样。确保模型服务运行启动你的Qwen-Image-2512-Pixel-Art-LoRA服务并记下API地址默认是http://localhost:8000/v1/images/generations如果不是请修改--api参数。运行脚本打开命令行进入脚本所在目录运行python batch_pixel_art.py --input prompts.csv --output my_pixel_arts --workers 8--input: 指定你的任务文件。--output: 指定一个文件夹名字生成的图片都会放在里面。--workers: 指定并发数比如8表示同时处理8个任务。运行起来后你就能看到终端里刷刷地打印任务进度成功打勾✓失败打叉✗。全部完成后去my_pixel_arts文件夹里你所有的像素画就都在那儿了旁边还有一个日志文件告诉你生成过程的详情。7. 总结与后续优化思路整个脚本跑下来你会发现批量生成图片一下子变得轻松多了。不用再守在电脑前重复操作把任务列表准备好运行脚本然后就可以去忙别的事情。这个脚本的核心价值就是把重复、枯燥的劳动自动化把时间还给你让你能更专注于创意和提示词的打磨上。当然这个脚本只是一个起点你可以根据自己的需求对它进行“魔改”。比如你可能想根据不同的提示词让模型生成不同尺寸的图片那就在任务文件里加一列size然后在生成函数里读取这个参数。或者你想在生成每张图片后自动给它加个水印那就在保存图片之后调用一个图像处理库比如PIL来处理一下。还有一点可以优化的是任务管理。如果任务量非常大成千上万张你可能需要考虑把任务队列和状态持久化比如用数据库来记录这样即使脚本中途意外停止重启后也能从断点继续而不是从头再来。不过对于绝大多数日常批量生成的需求今天咱们写的这个脚本已经足够强大了。它解决了从读取、请求、并发、错误处理到日志记录的全流程问题。希望这个实战教程能帮你把Qwen-Image-2512-Pixel-Art-LoRA这个好工具用得更加得心应手真正释放出AI批量创作的威力。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。