merge dev, making 2.3.0
papuSpartan committed Oct 27, 2024
2 parents 65b0eb7 + 6179dee commit 8fd65eb
Showing 9 changed files with 240 additions and 161 deletions.
17 changes: 15 additions & 2 deletions CHANGELOG.md
@@ -1,10 +1,23 @@
# Change Log
Formatting: [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), [Semantic Versioning](https://semver.org/spec/v2.0.0.html)

## [2.3.0] - 2024-10-26

### Added
- Compatibility for some extensions which mostly only do postprocessing (e.g. Adetailer)
- Separate toggle state for img2img tab so txt2img can be enabled and t2i disabled or vice versa

### Changed
- Status tab will now automatically refresh
- Main toggle is now in the form of an InputAccordion

### Fixed
- An issue affecting controlnet and inpainting
- Toggle state sometimes desyncing when the page was refreshed

## [2.2.2] - 2024-8-30

### Fixed

- Unavailable state sometimes being ignored

## [2.2.1] - 2024-5-16
@@ -82,4 +95,4 @@ Formatting: [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), [Semantic
- Worker randomly disconnecting when under high load due to handling a previous request

### Removed
- Certain superfluous warnings in logs related to third party extensions
- Certain superfluous warnings in logs related to third party extensions
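The changelog entry "Main toggle is now in the form of an InputAccordion" refers to the webui's checkbox-backed accordion component. Below is a minimal sketch of such a toggle, assuming the stock `modules.ui_components.InputAccordion` API; the label, elem_id, and inner control are illustrative, not the extension's actual UI code (which lives in scripts/spartan/ui.py and is not shown here).

```python
import gradio as gr
from modules.ui_components import InputAccordion  # webui's checkbox-backed accordion

def build_main_toggle():
    # InputAccordion acts like a gr.Checkbox whose value doubles as the
    # accordion's enabled state, so one component can drive the extension's
    # on/off setting while grouping its options visually.
    # Label, elem_id, and the inner checkbox are illustrative assumptions.
    with InputAccordion(True, label="Distributed", elem_id="distributed-main-toggle") as enabled:
        verify_remotes = gr.Checkbox(label="Verify remote certificates", value=True)
    return enabled, verify_remotes
```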
21 changes: 20 additions & 1 deletion javascript/distributed.js
@@ -1,4 +1,23 @@

function confirm_restart_workers(_) {
    return confirm('Restart remote workers?')
}
}

// live updates
function update() {
    try {
        let currentTab = get_uiCurrentTabContent()
        let buttons = document.querySelectorAll('#distributed-refresh-status')
        for(let i = 0; i < buttons.length; i++) {
            if(currentTab.contains(buttons[i])) {
                buttons[i].click()
                break
            }
        }
    } catch (e) {
        if (!(e instanceof TypeError)) {
            throw e
        }
    }
}
setInterval(update, 1500)
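The new update() loop polls every 1.5 seconds and clicks whichever hidden `#distributed-refresh-status` button sits in the currently visible tab, which is what makes the Status tab refresh automatically. A hedged sketch of the Gradio-side button such a loop assumes; the callback and layout are illustrative, and the extension's real wiring in scripts/spartan/ui.py is not shown in this diff.

```python
import gradio as gr

def build_status_tab(world):
    # Illustrative sketch only: a hidden button with the elem_id the
    # JavaScript timer looks for. Each tab (txt2img/img2img) can carry its
    # own copy; the timer only clicks the one inside the active tab.
    with gr.Tab("Status"):
        status = gr.Textbox(label="Worker status", interactive=False)
        refresh = gr.Button(visible=False, elem_id="distributed-refresh-status")
        # Clicking the hidden button re-reads worker state into the textbox.
        refresh.click(fn=lambda: str(world), inputs=[], outputs=[status])
    return status
```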
195 changes: 94 additions & 101 deletions scripts/distributed.py
@@ -13,6 +13,7 @@
from threading import Thread
from typing import List
import gradio
from torchvision.transforms import ToTensor
import urllib3
from PIL import Image
from modules import processing
@@ -24,7 +25,7 @@
from scripts.spartan.control_net import pack_control_net
from scripts.spartan.shared import logger
from scripts.spartan.ui import UI
from scripts.spartan.world import World, State
from scripts.spartan.world import World, State, Job

old_sigint_handler = signal.getsignal(signal.SIGINT)
old_sigterm_handler = signal.getsignal(signal.SIGTERM)
@@ -61,7 +62,7 @@ def show(self, is_img2img):
return scripts.AlwaysVisible

def ui(self, is_img2img):
extension_ui = UI(world=self.world)
extension_ui = UI(world=self.world, is_img2img=is_img2img)
# root, api_exposed = extension_ui.create_ui()
components = extension_ui.create_ui()

@@ -71,77 +72,61 @@ def ui(self, is_img2img):
# return some components that should be exposed to the api
return components

def add_to_gallery(self, processed, p):
"""adds generated images to the image gallery after waiting for all workers to finish"""
def api_to_internal(self, job) -> ([], [], [], [], []):
# takes worker response received from api and returns parsed objects in internal sdwui format. E.g. all_seeds

def processed_inject_image(image, info_index, save_path_override=None, grid=False, response=None):
image_params: json = response['parameters']
image_info_post: json = json.loads(response["info"]) # image info known after processing
num_response_images = image_params["batch_size"] * image_params["n_iter"]

seed = None
subseed = None
negative_prompt = None
pos_prompt = None
image_params: json = job.worker.response['parameters']
image_info_post: json = json.loads(job.worker.response["info"]) # image info known after processing
all_seeds, all_subseeds, all_negative_prompts, all_prompts, images = [], [], [], [], []

for i in range(len(job.worker.response["images"])):
try:
if num_response_images > 1:
seed = image_info_post['all_seeds'][info_index]
subseed = image_info_post['all_subseeds'][info_index]
negative_prompt = image_info_post['all_negative_prompts'][info_index]
pos_prompt = image_info_post['all_prompts'][info_index]
else:
seed = image_info_post['seed']
subseed = image_info_post['subseed']
negative_prompt = image_info_post['negative_prompt']
pos_prompt = image_info_post['prompt']
if image_params["batch_size"] * image_params["n_iter"] > 1:
all_seeds.append(image_info_post['all_seeds'][i])
all_subseeds.append(image_info_post['all_subseeds'][i])
all_negative_prompts.append(image_info_post['all_negative_prompts'][i])
all_prompts.append(image_info_post['all_prompts'][i])
else: # only a single image received
all_seeds.append(image_info_post['seed'])
all_subseeds.append(image_info_post['subseed'])
all_negative_prompts.append(image_info_post['negative_prompt'])
all_prompts.append(image_info_post['prompt'])
except IndexError:
# like with controlnet masks, there isn't always full post-gen info, so we use the first images'
logger.debug(f"Image at index {i} for '{job.worker.label}' was missing some post-generation data")
processed_inject_image(image=image, info_index=0, response=response)
return

processed.all_seeds.append(seed)
processed.all_subseeds.append(subseed)
processed.all_negative_prompts.append(negative_prompt)
processed.all_prompts.append(pos_prompt)
processed.images.append(image) # actual received image

# generate info-text string
# modules.ui_common -> update_generation_info renders to html below gallery
images_per_batch = p.n_iter * p.batch_size
# zero-indexed position of image in total batch (so including master results)
true_image_pos = len(processed.images) - 1
num_remote_images = images_per_batch * p.batch_size
if p.n_iter > 1: # if splitting by batch count
num_remote_images *= p.n_iter - 1
# # like with controlnet masks, there isn't always full post-gen info, so we use the first images'
# logger.debug(f"Image at index {info_index} for '{job.worker.label}' was missing some post-generation data")
# self.processed_inject_image(image=image, info_index=0, job=job, p=p)
# return
logger.critical(f"Image at index {i} for '{job.worker.label}' was missing some post-generation data")
continue

logger.debug(f"image {true_image_pos + 1}/{self.world.p.batch_size * p.n_iter}, "
f"info-index: {info_index}")
# parse image
image_bytes = base64.b64decode(job.worker.response["images"][i])
image = Image.open(io.BytesIO(image_bytes))
transform = ToTensor()
images.append(transform(image))

if self.world.thin_client_mode:
p.all_negative_prompts = processed.all_negative_prompts
return all_seeds, all_subseeds, all_negative_prompts, all_prompts, images

try:
info_text = image_info_post['infotexts'][i]
except IndexError:
if not grid:
logger.warning(f"image {true_image_pos + 1} was missing info-text")
info_text = processed.infotexts[0]
info_text += f", Worker Label: {job.worker.label}"
processed.infotexts.append(info_text)

# automatically save received image to local disk if desired
if cmd_opts.distributed_remotes_autosave:
save_image(
image=image,
path=p.outpath_samples if save_path_override is None else save_path_override,
basename="",
seed=seed,
prompt=pos_prompt,
info=info_text,
extension=opts.samples_format
)
def inject_job(self, job: Job, p, pp):
"""Adds the work completed by one Job via its worker response to the processing and postprocessing objects"""
all_seeds, all_subseeds, all_negative_prompts, all_prompts, images = self.api_to_internal(job)

p.seeds.extend(all_seeds)
p.subseeds.extend(all_subseeds)
p.negative_prompts.extend(all_negative_prompts)
p.prompts.extend(all_prompts)

num_local = self.world.p.n_iter * self.world.p.batch_size + (opts.return_grid - self.world.thin_client_mode)
num_injected = len(pp.images) - self.world.p.batch_size
for i, image in enumerate(images):
# modules.ui_common -> update_generation_info renders to html below gallery
gallery_index = num_local + num_injected + i # zero-indexed point of image in total gallery
job.gallery_map.append(gallery_index) # so we know where to edit infotext
pp.images.append(image)
logger.debug(f"image {gallery_index + 1 + self.world.thin_client_mode}/{self.world.num_gallery()}")

def update_gallery(self, pp, p):
"""adds all remotely generated images to the image gallery after waiting for all workers to finish"""

# get master ipm by estimating based on worker speed
master_elapsed = time.time() - self.master_start
@@ -158,8 +143,7 @@ def processed_inject_image(image, info_index, save_path_override=None, grid=Fals
logger.debug("all worker request threads returned")
webui_state.textinfo = "Distributed - injecting images"

# some worker which we know has a good response that we can use for generating the grid
donor_worker = None
received_images = False
for job in self.world.jobs:
if job.worker.response is None or job.batch_size < 1 or job.worker.master:
continue
@@ -170,8 +154,7 @@ def processed_inject_image(image, info_index, save_path_override=None, grid=Fals
if (job.batch_size * p.n_iter) < len(images):
logger.debug(f"requested {job.batch_size} image(s) from '{job.worker.label}', got {len(images)}")

if donor_worker is None:
donor_worker = job.worker
received_images = True
except KeyError:
if job.batch_size > 0:
logger.warning(f"Worker '{job.worker.label}' had no images")
@@ -185,41 +168,27 @@ def processed_inject_image(image, info_index, save_path_override=None, grid=Fals
logger.exception(e)
continue

# visibly add work from workers to the image gallery
for i in range(0, len(images)):
image_bytes = base64.b64decode(images[i])
image = Image.open(io.BytesIO(image_bytes))
# adding the images in
self.inject_job(job, p, pp)

# inject image
processed_inject_image(image=image, info_index=i, response=job.worker.response)

if donor_worker is None:
# TODO fix controlnet masks returned via api having no generation info
if received_images is False:
logger.critical("couldn't collect any responses, the extension will have no effect")
return

# generate and inject grid
if opts.return_grid and len(processed.images) > 1:
grid = image_grid(processed.images, len(processed.images))
processed_inject_image(
image=grid,
info_index=0,
save_path_override=p.outpath_grids,
grid=True,
response=donor_worker.response
)

# cleanup after we're done using all the responses
for worker in self.world.get_workers():
worker.response = None

p.batch_size = len(processed.images)
p.batch_size = len(pp.images)
webui_state.textinfo = ""
return

# p's type is
# "modules.processing.StableDiffusionProcessing*"
def before_process(self, p, *args):
if not self.world.enabled:
logger.debug("extension is disabled")
is_img2img = getattr(p, 'init_images', False)
if is_img2img and self.world.enabled_i2i is False:
logger.debug("extension is disabled for i2i")
return
elif not is_img2img and self.world.enabled is False:
logger.debug("extension is disabled for t2i")
return
self.world.update(p)

@@ -234,6 +203,14 @@ def before_process(self, p, *args):
continue
title = script.title()

if title == "ADetailer":
adetailer_args = p.script_args[script.args_from:script.args_to]

# InputAccordion main toggle, skip img2img toggle
if adetailer_args[0] and adetailer_args[1]:
logger.debug(f"adetailer is skipping img2img, returning control to wui")
return

# check for supported scripts
if title == "ControlNet":
# grab all controlnet units
@@ -346,18 +323,34 @@ def before_process(self, p, *args):
p.batch_size = self.world.master_job().batch_size
self.master_start = time.time()

# generate images assigned to local machine
p.do_not_save_grid = True # don't generate grid from master as we are doing this later.
self.runs_since_init += 1
return

def postprocess(self, p, processed, *args):
if not self.world.enabled:
def postprocess_batch_list(self, p, pp, *args, **kwargs):
if not self.world.thin_client_mode and p.n_iter != kwargs['batch_number'] + 1: # skip if not the final batch
return

is_img2img = getattr(p, 'init_images', False)
if is_img2img and self.world.enabled_i2i is False:
return
elif not is_img2img and self.world.enabled is False:
return

if self.master_start is not None:
self.add_to_gallery(p=p, processed=processed)
self.update_gallery(p=p, pp=pp)


def postprocess(self, p, processed, *args):
for job in self.world.jobs:
if job.worker.response is not None:
for i, v in enumerate(job.gallery_map):
infotext = json.loads(job.worker.response['info'])['infotexts'][i]
infotext += f", Worker Label: {job.worker.label}"
processed.infotexts[v] = infotext

# cleanup
for worker in self.world.get_workers():
worker.response = None
# restore process_images_inner if it was monkey-patched
processing.process_images_inner = self.original_process_images_inner
# save any dangling state to prevent load_config in next iteration overwriting it
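Most of this rewrite moves image injection out of the old add_to_gallery path and into the webui's postprocess_batch_list hook, which is why worker images are now decoded and converted with torchvision's ToTensor before being appended to pp.images. A minimal, standalone sketch of that decode path follows; the helper name is illustrative, and the tensor layout described is the assumption this change appears to rely on.

```python
import base64
import io

from PIL import Image
from torchvision.transforms import ToTensor

def decode_api_image(b64_png: str):
    """Decode one base64-encoded image from a worker's API response (illustrative helper)."""
    image = Image.open(io.BytesIO(base64.b64decode(b64_png)))
    # ToTensor maps HWC uint8 [0, 255] to CHW float32 [0.0, 1.0], matching
    # the tensors the webui keeps in pp.images during postprocess_batch_list.
    return ToTensor()(image)
```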
11 changes: 9 additions & 2 deletions scripts/spartan/control_net.py
@@ -1,9 +1,12 @@
# https://github.com/Mikubill/sd-webui-controlnet/wiki/API#examples-1

import copy
from PIL import Image
from modules.api.api import encode_pil_to_base64
from scripts.spartan.shared import logger
import numpy as np
import json
import enum


def np_to_b64(image: np.ndarray):
@@ -58,10 +61,14 @@ def pack_control_net(cn_units) -> dict:
unit['mask'] = mask_b64 # mikubill
unit['mask_image'] = mask_b64 # forge


# serialize all enums
for k in unit.keys():
if isinstance(unit[k], enum.Enum):
unit[k] = unit[k].value

# avoid returning duplicate detection maps since master should return the same one
unit['save_detected_map'] = False
# remove anything unserializable
del unit['input_mode']

try:
json.dumps(controlnet)
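The added loop replaces every Enum-valued ControlNet unit field with its underlying value before the json.dumps check, since Python's default JSON encoder rejects Enum members. A standalone illustration of that failure mode, using a stand-in enum rather than ControlNet's actual classes:

```python
import enum
import json

class ResizeMode(enum.Enum):
    # Stand-in enum for illustration; not ControlNet's real type.
    INNER_FIT = "Crop and Resize"

unit = {"module": "canny", "resize_mode": ResizeMode.INNER_FIT}

# json.dumps(unit) would raise TypeError here: Enum members aren't serializable.
for key in unit:
    if isinstance(unit[key], enum.Enum):
        unit[key] = unit[key].value

print(json.dumps(unit))  # {"module": "canny", "resize_mode": "Crop and Resize"}
```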
7 changes: 4 additions & 3 deletions scripts/spartan/pmodels.py
@@ -29,17 +29,18 @@ class Worker_Model(BaseModel):
default=False
)
state: Optional[Any] = Field(default=1, description="The last known state of this worker")
user: Optional[str] = Field(description="The username to be used when authenticating with this worker")
password: Optional[str] = Field(description="The password to be used when authenticating with this worker")
user: Optional[str] = Field(description="The username to be used when authenticating with this worker", default=None)
password: Optional[str] = Field(description="The password to be used when authenticating with this worker", default=None)
pixel_cap: Optional[int] = Field(default=-1, description="Max amount of pixels to allow one worker to handle at the same time. -1 means there is no limit")

class ConfigModel(BaseModel):
workers: List[Dict[str, Worker_Model]]
benchmark_payload: Dict = Field(
benchmark_payload: Benchmark_Payload = Field(
default=Benchmark_Payload,
description='the payload used when benchmarking a node'
)
job_timeout: Optional[int] = Field(default=3)
enabled: Optional[bool] = Field(description="Whether the extension as a whole should be active or disabled", default=True)
enabled_i2i: Optional[bool] = Field(description="Same as above but for image to image", default=True)
complement_production: Optional[bool] = Field(description="Whether to generate complementary images to prevent under-utilizing hardware", default=True)
step_scaling: Optional[bool] = Field(description="Whether to downscale requested steps in order to meet time constraints", default=False)
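The user and password fields gain an explicit default=None. A brief illustration of why that matters, assuming Pydantic v2 semantics, where an Optional annotation alone no longer makes a field optional:

```python
from typing import Optional
from pydantic import BaseModel, Field, ValidationError

class Strict(BaseModel):
    # No default: under Pydantic v2 this field is still required despite Optional.
    user: Optional[str] = Field(description="username")

class Relaxed(BaseModel):
    user: Optional[str] = Field(description="username", default=None)

try:
    Strict()
except ValidationError as err:
    print(err.errors()[0]["type"])  # 'missing'

print(Relaxed())  # user=None -- a config entry without credentials validates
```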