A.I. Tools

Running a Stable Diffusion Cluster on GCP with tensorflow-serving (Part 2) | by Thushan Ganegedara | Mar, 2023


Creating the artifacts and deploying the model on the cluster

In part 1, we learned how to use terraform to set up and manage our infrastructure conveniently. In this part, we will continue on our journey to deploy a running Stable Diffusion model on the provisioned cluster.

Note: You can follow this tutorial end-to-end even if you’re a free user (as long as you have some of free tier credits left).

All images, unless otherwise noted, are by the author

Github: https://github.com/thushv89/tf-serving-gke

Let’s take a look at what the final result would be.

Some images generated by the deployed Stable Diffusion model

What is Stable Diffusion anyway?

There are five main components that builds up the Stable Diffusion model:

Tokenizer — Tokenizes a given string to a list of tokens (numerical IDs)Text encoder — Takes in the tokenized text and produces a text embeddingDiffusion model — Takes in the text embedding and a latent image (initially noise) as an input and incrementally refine the latent image to encoder more and more useful information (visually pleasing)Decoder — Takes in the final latent image and produces an actual imageImage encoder (used for the in-painting feature — we’ll be ignoring this for this exercise)

The principal ground-shattering idea behind stable diffusion (diffusion models) is,

If you add a bit of noise to an image gradually for many steps, you will end up with an image containing noise. By reversing the order of the process, you can have an input (noise) and a target (original image). Then a model is trained to predict the original image from noise.

All of the above components work cohesively to achieve this idea.

Storing the Stable Diffusion model

Code: https://github.com/thushv89/tf-serving-gke/blob/master/notebooks/savedmodel_stable_diffusion.ipynb

In order to construct a Stable Diffusion model, we’ll be using the keras_cv library, which includes a collection of popular deep learning vision models for image classification, segmentation, generative AI, etc. You can find a tutorial here, which explains how to use the StableDiffusion in keras_cv. You can open up a notebook and play with the model to familiarize yourself.

Our goal here is to save the StableDiffusionmodel in the SavedModel format; the go-to standard for serializing TensorFlow models. One crucial requirement to do this is making sure all operations used are TensorFlow graph compatible. Unfortunately, this is not the case.

The current version of the model uses a TensorFlow graph incompatible tokenizer, so it needs to be brought out of the packaged model and used in a separate step.The current version uses predict_on_batch in order to generate an image, which is not supported by TensorFlow graph building.

Fixing the model

In order to patch up the eager mode StableDiffusion model, we’ll create a new model called StableDiffusionNoTokenizer. Through out this new model, we’ll replace all predict_on_batch() calls with graph compatible __call__() calls. We’ll also be decoupling the tokenization process from the model as the name suggests. Additionally, in the generate_image() function, we’ll be replacing,

timesteps = tf.range(1, 1000, 1000 // num_steps)alphas, alphas_prev = self._get_initial_alphas(timesteps)progbar = keras.utils.Progbar(len(timesteps))iteration = 0for index, timestep in list(enumerate(timesteps))[::-1]:latent_prev = latent # Set aside the previous latent vectort_emb = self._get_timestep_embedding(timestep, batch_size)unconditional_latent = self.diffusion_model.predict_on_batch([latent, t_emb, unconditional_context])latent = self.diffusion_model.predict_on_batch([latent, t_emb, context])latent = unconditional_latent + unconditional_guidance_scale * (latent – unconditional_latent)a_t, a_prev = alphas[index], alphas_prev[index]pred_x0 = (latent_prev – math.sqrt(1 – a_t) * latent) / math.sqrt(a_t)latent = (latent * math.sqrt(1.0 – a_prev) + math.sqrt(a_prev) * pred_x0)iteration += 1progbar.update(iteration)


latent = self.diffusion_reverse_loop(latent,context=context, unconditional_context=unconditional_context, batch_size=batch_size, unconditional_guidance_scale=unconditional_guidance_scale,num_steps=num_steps,)


@tf.functiondef diffusion_reverse_loop(self, latent, context, unconditional_context, batch_size, unconditional_guidance_scale, num_steps):

index = num_steps -1cond = tf.math.greater(index, -1)timesteps = tf.range(1, 1000, 1000 // num_steps)alphas, alphas_prev = self._get_initial_alphas(timesteps)iter_partial_fn = functools.partial(self._diffusion_reverse_iter, timesteps=timesteps, alphas=alphas, alphas_prev=alphas_prev, context=context, unconditional_context=unconditional_context, batch_size=batch_size, unconditional_guidance_scale=unconditional_guidance_scale, num_steps=num_steps)

latent, index = tf.while_loop(cond=lambda _, i: tf.math.greater(i, -1), body=iter_partial_fn, loop_vars=[latent, index])

return latent

@tf.functiondef _diffusion_reverse_iter(self, latent_prev, index, timesteps, alphas, alphas_prev, context, unconditional_context, batch_size, unconditional_guidance_scale, num_steps):

t_emb = self._get_timestep_embedding(timesteps[index], batch_size)

combined_latent = self.diffusion_model([tf.concat([latent_prev, latent_prev],axis=0), tf.concat([t_emb, t_emb], axis=0), tf.concat([context, unconditional_context], axis=0)], training=False)latent, unconditional_latent = tf.split(combined_latent, 2, axis=0)latent = unconditional_latent + unconditional_guidance_scale * (latent – unconditional_latent)a_t, a_prev = alphas[index], alphas_prev[index]pred_x0 = (latent_prev – tf.math.sqrt(1 – a_t) * latent) / tf.math.sqrt(a_t)latent = latent * tf.math.sqrt(1.0 – a_prev) + tf.math.sqrt(a_prev) * pred_x0index -= 1

return latent, index

Two main changes I’ve done are:

Instead of a Python for loop, I’ve used the tf.while_loop which is more performant in TensorFlow.Combined the two separate calls to the diffusion_model to a single call and later split the outputs.

There are other changes such as replacing various operations with TensorFlow equivalent (e.g. np.clip() -> tf.clip_by_value()), you can compare and contrast the original model, with this version to compare and contrast.

When working with TensorFlow in the graph execution mode, you can use tf.print() statements in order to ensure the validity of the code during execution. Please refer to the Appendix for more information about tf.print().

Once the underlying model is fixed, we can create the following model, which can be executed in graph mode without a hiccup.

class StableDiffusionTFModel(tf.keras.models.Model):

def __init__(self):super().__init__()self.image_width = self.image_height = 384self.model = StableDiffusionNoTokenizer(img_width=self.image_width, img_height=self.image_height, encoded_text_length=None, jit_compile=True)# This forces the model download its components# self.image_encoder is only required for in-painting – we will ignore this functionality in this excerciseself.text_encoder = self.model.text_encoderself.diffusion_model = self.model.diffusion_modelself.decoder = self.model.decoder

self.default_num_steps = tf.constant(40) self.default_batch_size = tf.constant(2)

# These negative prompt tokens are borrowed from the original stable diffusion modelself.default_negative_prompt_tokens = tf.constant([49406, 8159, 267, 83, 3299, 267, 21101, 8893, 3500, 267, 21101, 8893, 4804, 267, 21101, 8893, 1710, 267, 620, 539, 6481, 267, 38626, 267, 12598, 943, 267, 4231, 34886, 267, 4231, 7072, 267, 4231, 5706, 267, 1518, 15630, 267, 561, 6528, 267, 3417, 268, 3272, 267, 1774, 620, 539, 6481, 267, 21977, 267, 2103, 794, 267, 2103, 15376, 267, 38013, 267, 4160, 267, 2505, 2110, 267, 782, 23257, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407, 49407], dtype=tf.int32)

def call(self, inputs):

encoded_text = self.text_encoder([inputs[“tokens”], self.model._get_pos_ids()], training=False)

images = self.model.generate_image(encoded_text, negative_prompt_tokens=inputs.get(“negative_prompt_tokens”, self.default_negative_prompt_tokens),num_steps=inputs.get(“num_steps”, self.default_num_steps), batch_size=inputs.get(“batch_size”, self.default_batch_size))return images

model = StableDiffusionTFModel()

This model takes in the following inputs:

input_tokens: Tokenized representation of the input stringnegative_prompt_tokens: Tokenized representation of the negative prompt (more about negative prompting: here)num_steps: Number of steps to run the diffusion process forbatch_size: Number of images to generate per image

Here’s an example usage of this model:

# Tokenizing the promptstokenizer = SimpleTokenizer()

def generate_tokens(tokenizer, prompt, MAX_PROMPT_LENGTH):

inputs = tokenizer.encode(prompt)if len(inputs) > MAX_PROMPT_LENGTH:raise ValueError(f”Prompt is too long (should be <= {MAX_PROMPT_LENGTH} tokens)”)phrase = tf.concat([inputs, ([49407] * (MAX_PROMPT_LENGTH – len(inputs)))], axis=0)return phrase

tokens = generate_tokens(tokenizer, “a ferrari car with wings”, MAX_PROMPT_LENGTH)

# Invoking the modelall_images = []num_steps = 30tokens = generate_tokens(tokenizer, “a castle in Norway overlooking a glacier, landscape, surrounded by fairies fighting trolls, sunset, high quality”, MAX_PROMPT_LENGTH)neg_tokens = generate_tokens(tokenizer, “ugly, tiling, poorly drawn hands, poorly drawn feet, poorly drawn face, out of frame, mutation, mutated, extra limbs, extra legs, extra arms, disfigured, deformed, cross-eye, body out of frame, blurry, bad art, bad anatomy, blurred, text, watermark, grainy”, MAX_PROMPT_LENGTH)images = model({“tokens”: tokens, “negative_prompt_tokens”: neg_tokens,”num_steps”: tf.constant(num_steps), “batch_size”: tf.constant(1)})

Remember that, I’m (i.e. Free user tier) heavily restricted by quotas on this project.

No GPU quota at allMax 8 N2 CPUs (If you choose N1 CPUs, you can go up to 12)

Therefore, I cannot use any GPU instances or more than 2 n2-standard-4instances. Stable Diffusion models are quite slow therefore we’ll be challenged by latency using CPU instances.

Here are some details about how long it takes under different parameters. The tests were don on a n2-standard-8 machine on Vertex AI workbench.

Image size (num_steps = 40)— 512×512 image: 474s— 384×384 image: 233sbatch_size and num_steps — batch size = 1: 21.6s (num_steps=5), 67.7s (num_steps=20) and 99.5s (num_steps=30)— batch size = 2, 55.6s (num_steps=5), 121.1s (num_steps=20) and 180.2s (num_steps=30)— batch size=4, 21.6s (num_steps=5), 67.7s (num_steps=20) and 99.5s (num_steps=30)

As you can see, increasing the image_size , batch_size , num_steps lead to increased time consumption. Therefore balancing the computational cost with the image quality, we chose the following parameters for our deployed model.

image_size: 384x384num_steps: 30batch_size: 1

Once the model is created, upload the model to the created GCS bucket.

!gsutil -m cp -r ./stable_diffusion_model gs://<project>-bucket/

This will be the data source we’ll be using in order to deploy our model as a prediction service.

Let’s again take a moment to appreciate some images generated by the model, before continuing on to the next section.

Images generated by the deployed model

Code: https://github.com/thushv89/tf-serving-gke/tree/master/infrastrcture

To deploy our model and setup a prediction service, we need 3 configurations:

configmap.yaml — This defines various variables that are required during the deployment. For example this would encompass the location of the SavedModel on GCS (i.e. accessible through the environment variable MODEL_PATH)deployment.yaml — Deployment defines the pod specifications (e.g. CPU) and containers it should be running. In this case, we’ll be running a single container running tensorflow-serving serving the model located at MODEL_PATH .service.yaml — Service is the mechanism with which we expose our tensorflow-serving app running in our pods. For example we can tell it to expose our pod(s) through a load balancer.

The deployment

Let’s first look at the spec of the deployment :

spec:replicas: 1selector:matchLabels:app: stable-diffusiontemplate:metadata:labels:app: stable-diffusionspec:containers:- name: tf-servingimage: “tensorflow/serving:2.11.0″args:- “–model_name=$(MODEL_NAME)”- “–model_base_path=$(MODEL_PATH)”- “–rest_api_timeout_in_ms=720000″envFrom:- configMapRef:name: tfserving-configsimagePullPolicy: IfNotPresentreadinessProbe:httpGet:path: “/v1/models/stable-diffusion”port: 8501scheme: HTTPinitialDelaySeconds: 30periodSeconds: 15failureThreshold: 10ports:- name: httpcontainerPort: 8501protocol: TCP- name: grpccontainerPort: 8500protocol: TCPresources:requests:cpu: “3”memory: “12Gi”

We can make few interesting observations:

We’re only declaring a single replica in the script, scaling will be setup separately in and will be controlled through an autoscaling policyWe provide a selector, which the service will look for in a deployment to ensure it’s serving on the correct deploymentWe expose two ports; 8501 (HTTP traffic) and 8500 (GRPC traffic)We’ll be requesting 3 “CPU time” and 12Gi per container.

Note 1:A node will be typically running other pods necessitated by Kubernetes (i.e. DNS, monitoring, etc.). Therefore such factors need to be taken into account when stipulating compute resources for the pod. You can see that, though we have a 4 CPUs in a node, we only request 3 (you could request fractional CPU resources as well — e.g. 3.5). You can see the allocatable CPU/Memory of each node in GCP (GCP Console → Clusters → Nodes → Click on a node) or using kubectl describe nodes.If your node is unable to fulfill the compute resources you specify Kubernetes will not be able to run the pods and throw an error (e.g. PodUnschedulable).

Note 2: One of the most crucial arguments you need to be careful about is –rest_api_timeout_in_ms=720000 . It takes about 250s to serve up a single request, so here, we’re setting roughly three times that time as the timeout, to account for any enqueued requests, when we send parallel requests. If you set this to a value that’s too small, your requests will timeout before they are complete.

Defining the service

Here we’re defining a LoadBalancer type service, where we will expose the stable-diffusion app through the GCP load balancer. In this approach, you will be provided with the load balancer’s IP address, where the load balancer will route the traffic to the pods coming on to it. Users will be making requests against the IP address of the load balancer.

metadata:name: stable-diffusionnamespace: defaultlabels:app: stable-diffusionspec:type: LoadBalancerports:- port: 8500protocol: TCPname: tf-serving-grpc- port: 8501protocol: TCPname: tf-serving-httpselector:app: stable-diffusion


There’s an imperative topic that we’ve been putting off; scaling up our service. In real world, you may need to serve thousands, millions or even billions of customers. In order to do so, your service needs to be able to scale up/down the number of nodes/pods in the cluster, based on the demand. Fortunately GCP provides a variety of options, from fully managed autoscaling to semi/fully user managed autoscaling. You can learn more about these in this video.

Here we’ll be using a horizontal pod autoscaler (HPA). The horizontal pod autoscaler will scale up the number of pods, based on some threshold you provide (e.g. CPU or memory usage). Here’s an example.

kubectl autoscale deployment stable-diffusion –cpu-percent=60 –min=1 –max=2

Here we’re giving the HPA a minimum of 1, maximum of 2 pods, and asking it to add more pods if the average CPU across current set of pods go above 60%.

Applying the changes

We now got all the building blocks ready to start our service. Simply run the following commands.

gcloud container clusters get-credentials sd-cluster –zone us-central1-c && \kubectl apply -f tf-serving/configmap.yaml && \kubectl apply -f tf-serving/deployment.yaml && \kubectl autoscale deployment stable-diffusion –cpu-percent=60 –min=1 –max=2 && \kubectl apply -f tf-serving/service.yaml

In order to predict you simply need to make a POST request to the correct URL, with a payload containing the input to the model.

Sequential predictions

As the first example, we show how you can make a series of requests one after the other.

def predict_rest(json_data, url):json_response = requests.post(url, data=json_data)response = json.loads(json_response.text)if “predictions” not in response:print(response)rest_outputs = np.array(response[“predictions”])return rest_outputs

url = f”http://{stable_diffusion_service_ip}:8501/v1/models/stable-diffusion:predict”

tokens_list = [generate_tokens(tokenizer, “A wine glass made from lego bricks, rainbow colored liquid being poured into it, hyper realistic, high detail”, MAX_PROMPT_LENGTH).numpy().tolist(),generate_tokens(tokenizer, “A staircase made from color pencils, hyper realistic, high detail”, MAX_PROMPT_LENGTH).numpy().tolist(),generate_tokens(tokenizer, “A ferrari car in the space astronaut driving it, futuristic, hyper realistic, high detail”, MAX_PROMPT_LENGTH).numpy().tolist(),generate_tokens(tokenizer, “a dragon covered with weapons fighting an army, fire, explosions, hyper realistic, high detail”, MAX_PROMPT_LENGTH).numpy().tolist(),generate_tokens(tokenizer, “A sawing girl in a boat, hyper realistic, high detail”, MAX_PROMPT_LENGTH).numpy().tolist(),

]negative_tokens = generate_tokens(tokenizer, “ugly, tiling, poorly drawn hands, poorly drawn feet, poorly drawn face, out of frame, mutation, mutated, extra limbs, extra legs, extra arms, disfigured, deformed, cross-eye, body out of frame, blurry, bad art, bad anatomy, blurred, text, watermark, grainy”, MAX_PROMPT_LENGTH).numpy().tolist()

all_images = []all_data = []for tokens, negative_tokens in zip(tokens_list, [negative_tokens for _ in range(5)]):all_data.append(generate_json_data(tokens, negative_tokens))

all_images = [predict_rest(data, url) for data in all_data]

This took over 1600s when I ran the experiment. As you might imagine, this setup is quite inefficient, and is unable to leverage the cluster’s ability to scale up.

Parallel predictions

You can use Python’s multiprocessing library to make parallel requests, which is more remnant of real-world user requests.

def predict_rest(input_data, url):json_data, sleep_time = input_data[“data”], input_data[“sleep_time”]

# We add a delay to simulate real world user requeststime.sleep(sleep_time)print(“Making a request”)t1 = time.perf_counter()json_response = requests.post(url, data=json_data)response = json.loads(json_response.text)result = np.array([])try: result = np.array(response[“predictions”])except KeyError:print(f”Couldn’t complete the request {response}”)finally:t2 = time.perf_counter() print(f”It took {t2-t1}s to complete a single request”)return result

t1 = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:all_images_gen = executor.map(functools.partial(predict_rest, url=url), [{“data”: data, “sleep_time”: min(i*20, 60)} for i, data in enumerate(all_data)])all_images = [img for img in all_images_gen]

t2 = time.perf_counter() print(f”It took {t2-t1}s to complete {n_requests} requests”)

This ran in 900s. Therefore, we have a ~180% speed up, by scaling up the cluster to a maximum of 2 pods.

Note about setting timeoutsBe careful when setting up the parallel requests. If you send the parallel requests all at once (since this is only 6 requests), they will likely timeout. This is because it takes time to create a new node and initialize a new pod. So if all requests are made instantly, the load balancer might not even have time to see the 2nd node and end up trying to serve all requests to the single node. Your timeout defined above is counted from the time the request is received (i.e. enter the tensorflow-serving queue), not from the time it starts serving the request. So if the request waits too long in the queue that also counts for the timeout.

You can monitor the compute metrics such as the CPU usage and memory consumption on GCP (GCP → Kubernetes Engine → Services & Ingress → Select your service).

Usage graph for sequential requests (top) Usage for parallel requests (bottom)

In this 2 part tutorial, we,

Setup the infrastructure using terraform (an IaaS tool), which mainly consisted of a cluster and a node-pool (Part 1)Deployed a model and created a prediction service to serve user requests using a Stable Diffusion model (Part 2)

We setup this tutorial in a way that it can be run by even a free tier user. We setup a cluster with 2 nodes and created 1 pod per node. Then we made both sequential and parallel predictions and saw that parallel predictions lead to ~180% gains in higher throughput.

Next steps:

Model warmup — tensorflow-serving offers an easy way to warm up a model. You can parse example requests and they will be loaded and sent to the model, prior to serving actual user requests. This will reduce the latency for the initial user requests.Dynamic batching of requests — You can choose to dynamically batch the incoming requests. This will allow the model to make predictions on a batch of inputs than predicting on each input. Given enough memory, this will likely to provide throughput gains, allowing you to serve lot of requests within reasonable time bounds.

Debugging within pods

When I’m trying to get this up and running, a painstaking issue I faced was running into the following wall of brick.

The errors that were shown in the Workloads → Deployment section

and when I go into one of the pods in the deployment, I get a more sensible (still inconspicuous) error. But it was still inadequate to put a finger on what exactly was wrong.

Events produced by a single pod

So I had to find a way to microscopically investigate the root cause. For that I first logged into the pod’s container in question,

kubectl exec –stdin –tty <container name> — /bin/bash

Once I’m in, all I got to do is capitalize on the “everything is a file” paradigm Linux thrives on. In other words, you can simply tap into a file to see the output/error stream of a process. For example, in my case, tensorflow-serving process had the PID 7, therefore, /proc/7/fd/2 gives the error stream of that process.

tail -n 10 /proc/7/fd/2

In here, I was able to see exactly why this wasn’t kick-starting. It was because the container didn’t have the necessary permission to access the GCS bucket specified in MODEL_PATH .

Using tf.print for debugging

As you know, TensorFlow offers two styles of execution; imperative and declarative. Since we use the __call__() to call the model (i.e. self.model(<inputs>), these calls are executed as graph operations. You may know already that graph execution is notoriously difficult to debug, due to obscurities caused by the internal graph. One solution TensorFlow offers is the usage of tf.print statements.

You can place tf.print statements in your model calls, and those print statements are added as operations to the graph, so you can see values of executed tensors, etc. allowing you to debug the code than throwing darts and hoping for the best.

Make sure your tf.print statement is printing an input that appears immediately before the time you want it to be printed. If you add independent/dummy tf.print statements, they will not get embedded in the graph in the correct position. This may give you the deceptive feeling that some computation is happening very quickly, due to the incorrect placement of the graph.

Note about machine types

There are two main types of machines you can use for this exercise; n1 and n2. N2 instances use 3rd generation Xeon processors that are equipped special instruction sets (AVX-512) to speed up operations such as matrix multiplication. Therefore, CPU bound TensorFlow code runs faster on n2 machines than on n1.

I’d like to acknowledge the ML Developer Programs and the team for the GCP credits provided to make this tutorial a success.

Source link

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button
Translate »