|
|
|
@ -118,8 +118,6 @@ class PromptServer():
|
|
|
|
|
type_dir = folder_paths.get_input_directory()
|
|
|
|
|
elif dir_type == "input":
|
|
|
|
|
type_dir = folder_paths.get_input_directory()
|
|
|
|
|
elif dir_type == "clipspace":
|
|
|
|
|
type_dir = folder_paths.get_clipspace_directory()
|
|
|
|
|
elif dir_type == "temp":
|
|
|
|
|
type_dir = folder_paths.get_temp_directory()
|
|
|
|
|
elif dir_type == "output":
|
|
|
|
@ -127,73 +125,63 @@ class PromptServer():
|
|
|
|
|
|
|
|
|
|
return type_dir
|
|
|
|
|
|
|
|
|
|
@routes.post("/upload/image")
|
|
|
|
|
async def upload_image(request):
|
|
|
|
|
post = await request.post()
|
|
|
|
|
def image_upload(post, image_save_function=None):
|
|
|
|
|
image = post.get("image")
|
|
|
|
|
|
|
|
|
|
upload_dir = get_dir_by_type(post.get("type"))
|
|
|
|
|
|
|
|
|
|
if not os.path.exists(upload_dir):
|
|
|
|
|
os.makedirs(upload_dir)
|
|
|
|
|
image_upload_type = post.get("type")
|
|
|
|
|
upload_dir = get_dir_by_type(image_upload_type)
|
|
|
|
|
|
|
|
|
|
if image and image.file:
|
|
|
|
|
filename = image.filename
|
|
|
|
|
if not filename:
|
|
|
|
|
return web.Response(status=400)
|
|
|
|
|
|
|
|
|
|
subfolder = post.get("subfolder", "")
|
|
|
|
|
full_output_folder = os.path.join(upload_dir, os.path.normpath(subfolder))
|
|
|
|
|
|
|
|
|
|
if os.path.commonpath((upload_dir, os.path.abspath(full_output_folder))) != upload_dir:
|
|
|
|
|
return web.Response(status=400)
|
|
|
|
|
|
|
|
|
|
if not os.path.exists(full_output_folder):
|
|
|
|
|
os.makedirs(full_output_folder)
|
|
|
|
|
|
|
|
|
|
split = os.path.splitext(filename)
|
|
|
|
|
filepath = os.path.join(full_output_folder, filename)
|
|
|
|
|
|
|
|
|
|
i = 1
|
|
|
|
|
while os.path.exists(os.path.join(upload_dir, filename)):
|
|
|
|
|
while os.path.exists(filepath):
|
|
|
|
|
filename = f"{split[0]} ({i}){split[1]}"
|
|
|
|
|
i += 1
|
|
|
|
|
|
|
|
|
|
filepath = os.path.join(upload_dir, filename)
|
|
|
|
|
if image_save_function is not None:
|
|
|
|
|
image_save_function(image, post, filepath)
|
|
|
|
|
else:
|
|
|
|
|
with open(filepath, "wb") as f:
|
|
|
|
|
f.write(image.file.read())
|
|
|
|
|
|
|
|
|
|
with open(filepath, "wb") as f:
|
|
|
|
|
f.write(image.file.read())
|
|
|
|
|
|
|
|
|
|
return web.json_response({"name" : filename})
|
|
|
|
|
return web.json_response({"name" : filename, "subfolder": subfolder, "type": image_upload_type})
|
|
|
|
|
else:
|
|
|
|
|
return web.Response(status=400)
|
|
|
|
|
|
|
|
|
|
@routes.post("/upload/image")
|
|
|
|
|
async def upload_image(request):
|
|
|
|
|
post = await request.post()
|
|
|
|
|
return image_upload(post)
|
|
|
|
|
|
|
|
|
|
@routes.post("/upload/mask")
|
|
|
|
|
async def upload_mask(request):
|
|
|
|
|
post = await request.post()
|
|
|
|
|
image = post.get("image")
|
|
|
|
|
original_image = post.get("original_image")
|
|
|
|
|
|
|
|
|
|
upload_dir = get_dir_by_type(post.get("type"))
|
|
|
|
|
|
|
|
|
|
if not os.path.exists(upload_dir):
|
|
|
|
|
os.makedirs(upload_dir)
|
|
|
|
|
|
|
|
|
|
if image and image.file:
|
|
|
|
|
filename = image.filename
|
|
|
|
|
if not filename:
|
|
|
|
|
return web.Response(status=400)
|
|
|
|
|
|
|
|
|
|
split = os.path.splitext(filename)
|
|
|
|
|
i = 1
|
|
|
|
|
while os.path.exists(os.path.join(upload_dir, filename)):
|
|
|
|
|
filename = f"{split[0]} ({i}){split[1]}"
|
|
|
|
|
i += 1
|
|
|
|
|
|
|
|
|
|
filepath = os.path.join(upload_dir, filename)
|
|
|
|
|
|
|
|
|
|
original_pil = Image.open(original_image.file).convert('RGBA')
|
|
|
|
|
def image_save_function(image, post, filepath):
|
|
|
|
|
original_pil = Image.open(post.get("original_image").file).convert('RGBA')
|
|
|
|
|
mask_pil = Image.open(image.file).convert('RGBA')
|
|
|
|
|
|
|
|
|
|
# alpha copy
|
|
|
|
|
new_alpha = mask_pil.getchannel('A')
|
|
|
|
|
original_pil.putalpha(new_alpha)
|
|
|
|
|
|
|
|
|
|
original_pil.save(filepath)
|
|
|
|
|
|
|
|
|
|
return web.json_response({"name": filename})
|
|
|
|
|
else:
|
|
|
|
|
return web.Response(status=400)
|
|
|
|
|
|
|
|
|
|
return image_upload(post, image_save_function)
|
|
|
|
|
|
|
|
|
|
@routes.get("/view")
|
|
|
|
|
async def view_image(request):
|
|
|
|
@ -201,10 +189,6 @@ class PromptServer():
|
|
|
|
|
filename = request.rel_url.query["filename"]
|
|
|
|
|
filename,output_dir = folder_paths.annotated_filepath(filename)
|
|
|
|
|
|
|
|
|
|
if request.rel_url.query.get("type", "input") and filename.startswith("clipspace/"):
|
|
|
|
|
output_dir = folder_paths.get_clipspace_directory()
|
|
|
|
|
filename = filename[10:]
|
|
|
|
|
|
|
|
|
|
# validation for security: prevent accessing arbitrary path
|
|
|
|
|
if filename[0] == '/' or '..' in filename:
|
|
|
|
|
return web.Response(status=400)
|
|
|
|
|