|
|
|
@ -1,6 +1,7 @@
|
|
|
|
|
#Original code can be found on: https://github.com/XLabs-AI/x-flux/blob/main/src/flux/controlnet.py
|
|
|
|
|
|
|
|
|
|
import torch
|
|
|
|
|
import math
|
|
|
|
|
from torch import Tensor, nn
|
|
|
|
|
from einops import rearrange, repeat
|
|
|
|
|
|
|
|
|
@ -13,34 +14,38 @@ import comfy.ldm.common_dit
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ControlNetFlux(Flux):
|
|
|
|
|
def __init__(self, image_model=None, dtype=None, device=None, operations=None, **kwargs):
|
|
|
|
|
def __init__(self, latent_input=False, image_model=None, dtype=None, device=None, operations=None, **kwargs):
|
|
|
|
|
super().__init__(final_layer=False, dtype=dtype, device=device, operations=operations, **kwargs)
|
|
|
|
|
|
|
|
|
|
self.main_model_double = 19
|
|
|
|
|
self.main_model_single = 38
|
|
|
|
|
# add ControlNet blocks
|
|
|
|
|
self.controlnet_blocks = nn.ModuleList([])
|
|
|
|
|
for _ in range(self.params.depth):
|
|
|
|
|
controlnet_block = operations.Linear(self.hidden_size, self.hidden_size, dtype=dtype, device=device)
|
|
|
|
|
# controlnet_block = zero_module(controlnet_block)
|
|
|
|
|
self.controlnet_blocks.append(controlnet_block)
|
|
|
|
|
self.pos_embed_input = operations.Linear(self.in_channels, self.hidden_size, bias=True, dtype=dtype, device=device)
|
|
|
|
|
self.gradient_checkpointing = False
|
|
|
|
|
self.input_hint_block = nn.Sequential(
|
|
|
|
|
operations.Conv2d(3, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device)
|
|
|
|
|
)
|
|
|
|
|
self.latent_input = latent_input
|
|
|
|
|
self.pos_embed_input = operations.Linear(self.in_channels, self.hidden_size, bias=True, dtype=dtype, device=device)
|
|
|
|
|
if not self.latent_input:
|
|
|
|
|
self.input_hint_block = nn.Sequential(
|
|
|
|
|
operations.Conv2d(3, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, stride=2, dtype=dtype, device=device),
|
|
|
|
|
nn.SiLU(),
|
|
|
|
|
operations.Conv2d(16, 16, 3, padding=1, dtype=dtype, device=device)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def forward_orig(
|
|
|
|
|
self,
|
|
|
|
@ -58,8 +63,10 @@ class ControlNetFlux(Flux):
|
|
|
|
|
|
|
|
|
|
# running on sequences img
|
|
|
|
|
img = self.img_in(img)
|
|
|
|
|
controlnet_cond = self.input_hint_block(controlnet_cond)
|
|
|
|
|
controlnet_cond = rearrange(controlnet_cond, "b c (h ph) (w pw) -> b (h w) (c ph pw)", ph=2, pw=2)
|
|
|
|
|
if not self.latent_input:
|
|
|
|
|
controlnet_cond = self.input_hint_block(controlnet_cond)
|
|
|
|
|
controlnet_cond = rearrange(controlnet_cond, "b c (h ph) (w pw) -> b (h w) (c ph pw)", ph=2, pw=2)
|
|
|
|
|
|
|
|
|
|
controlnet_cond = self.pos_embed_input(controlnet_cond)
|
|
|
|
|
img = img + controlnet_cond
|
|
|
|
|
vec = self.time_in(timestep_embedding(timesteps, 256))
|
|
|
|
@ -82,13 +89,25 @@ class ControlNetFlux(Flux):
|
|
|
|
|
block_res_sample = controlnet_block(block_res_sample)
|
|
|
|
|
controlnet_block_res_samples = controlnet_block_res_samples + (block_res_sample,)
|
|
|
|
|
|
|
|
|
|
return {"input": (controlnet_block_res_samples * 10)[:19]}
|
|
|
|
|
|
|
|
|
|
repeat = math.ceil(self.main_model_double / len(controlnet_block_res_samples))
|
|
|
|
|
if self.latent_input:
|
|
|
|
|
out_input = ()
|
|
|
|
|
for x in controlnet_block_res_samples:
|
|
|
|
|
out_input += (x,) * repeat
|
|
|
|
|
else:
|
|
|
|
|
out_input = (controlnet_block_res_samples * repeat)
|
|
|
|
|
return {"input": out_input[:self.main_model_double]}
|
|
|
|
|
|
|
|
|
|
def forward(self, x, timesteps, context, y, guidance=None, hint=None, **kwargs):
|
|
|
|
|
hint = hint * 2.0 - 1.0
|
|
|
|
|
patch_size = 2
|
|
|
|
|
if self.latent_input:
|
|
|
|
|
hint = comfy.ldm.common_dit.pad_to_patch_size(hint, (patch_size, patch_size))
|
|
|
|
|
hint = rearrange(hint, "b c (h ph) (w pw) -> b (h w) (c ph pw)", ph=patch_size, pw=patch_size)
|
|
|
|
|
else:
|
|
|
|
|
hint = hint * 2.0 - 1.0
|
|
|
|
|
|
|
|
|
|
bs, c, h, w = x.shape
|
|
|
|
|
patch_size = 2
|
|
|
|
|
x = comfy.ldm.common_dit.pad_to_patch_size(x, (patch_size, patch_size))
|
|
|
|
|
|
|
|
|
|
img = rearrange(x, "b c (h ph) (w pw) -> b (h w) (c ph pw)", ph=patch_size, pw=patch_size)
|