TensorFlow
TensorFlow is a framework for machine and deep
learning. It provides official Python and C++ language bindings, but this
article focuses on the Python API. It supports CPU and GPU execution for
single-node and multi-node systems.
General
To learn how to use TensorFlow, take a look at the official guide and tutorials.
Setup
For the general setup of the Python environment, consult the instructions in Machine and Deep Learning Frameworks. TensorFlow can then be installed via pip or conda; more information can be found in the official TensorFlow installation guide.
The native TensorFlow package only supports NVIDIA GPUs, but AMD provides a ROCm-enabled version for use with AMD GPUs [1].
NOTE: Often, HPC centers also provide optimized, pre-defined Apptainer or Docker containers that already contain all required packages. In that case, users do not need to create their own separate virtual environment but can conveniently use the container for execution.
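After installation, whether in a virtual environment or a container, a quick sanity check is to confirm that TensorFlow imports and sees the expected GPUs. The following is a minimal sketch; the helper name tensorflow_setup_report is hypothetical, and it only relies on tf.__version__ and tf.config.list_physical_devices. It also runs where TensorFlow is not installed:

```python
import importlib.util


def tensorflow_setup_report():
    """Summarize the local TensorFlow installation (hypothetical helper)."""
    # Check for the package without importing it, so this also works where TF is absent
    if importlib.util.find_spec("tensorflow") is None:
        return {"installed": False, "version": None, "gpus": []}

    import tensorflow as tf

    # Physical GPUs visible to this process; an empty list means CPU-only execution
    gpus = [device.name for device in tf.config.list_physical_devices("GPU")]
    return {"installed": True, "version": tf.__version__, "gpus": gpus}


if __name__ == "__main__":
    print(tensorflow_setup_report())
```

Run inside a job on a GPU node, it should list one entry per visible GPU; an empty list indicates that TensorFlow will fall back to the CPU.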
Building models
Machine-learning models can be built from scratch or with high-level APIs like Keras. Both approaches offer many ways to implement a wide range of machine-learning algorithms and models. To avoid duplicating existing guides, this article does not show how to build models in TensorFlow and instead recommends the official tutorials.
Model training
Training a model with TensorFlow involves multiple steps:
- decide on the type of training: single device or distributed, CPU or GPU
- define or load the model to be trained
- load and pre-process the dataset
- if applicable, choose and set up the distribution strategy
These steps are explained in detail in the following example, which trains a neural network for image classification on the MNIST dataset of handwritten digits. The example assumes familiarity with the concepts of machine learning and neural networks.
Defining a model
The following code defines a neural network using Keras.
import tensorflow as tf
model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(28, 28)),
    tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10)  # logits for the 10 digit classes
])
# Single-device training, so the learning rate is not scaled
opt = tf.keras.optimizers.Adadelta(0.015)
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=opt,
    metrics=['accuracy'])
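To make the architecture concrete, its number of trainable parameters can be worked out by hand, assuming the Conv2D defaults of stride 1 and 'valid' padding: the 3×3 convolution shrinks the 28×28 input to 26×26, and each dense layer has inputs × units weights plus one bias per unit. The small stdlib-only sketch below (all helper names are hypothetical) does this arithmetic; its total should match what model.summary() reports for the model above:

```python
def conv2d_params(kernel, in_channels, filters):
    # kernel x kernel weights per input channel and filter, plus one bias per filter
    return kernel * kernel * in_channels * filters + filters


def dense_params(inputs, units):
    # full weight matrix plus one bias per unit
    return inputs * units + units


def mnist_model_params(height=28, width=28, kernel=3, filters=32, hidden=128, classes=10):
    # 'valid' padding removes kernel - 1 rows and columns
    out_h, out_w = height - kernel + 1, width - kernel + 1
    flattened = out_h * out_w * filters  # size of the Flatten output
    return (
        conv2d_params(kernel, 1, filters)  # Reshape adds a single input channel
        + dense_params(flattened, hidden)
        + dense_params(hidden, classes)
    )


print(mnist_model_params())  # 2770634
```

Almost all of the roughly 2.77 million parameters sit in the first Dense layer (21,632 × 128 weights), since Reshape and Flatten have no parameters.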
Loading the dataset
Now a dataset needs to be loaded. In this case, this means loading the MNIST dataset, scaling the pixel values to the range [0, 1], shuffling the samples, repeating the dataset indefinitely and packing the samples into batches of 64.
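import numpy as np
import tensorflow as tf
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# Scale pixel values from [0, 255] to [0, 1] and use integer class labels
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
# Shuffle all 60,000 samples, repeat indefinitely and batch into groups of 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(64)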
Training the model
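The model is then trained for 10 epochs by calling the high-level Keras fit function on the model, which removes the need to write a custom training loop for simpler models. Because the dataset repeats indefinitely, steps_per_epoch determines how many batches make up one epoch.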
model.fit(train_dataset, epochs=10, steps_per_epoch=70)
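Since .repeat() makes the dataset infinite, an "epoch" here is whatever steps_per_epoch says it is. A quick calculation in plain Python (the helper name is hypothetical) compares the samples seen per epoch with the 70 steps used above against the number of steps a full pass over the 60,000 training images would take:

```python
import math


def steps_for_full_pass(num_samples, batch_size):
    # Without .repeat(), a final partial batch would count as one more step
    return math.ceil(num_samples / batch_size)


samples_per_epoch = 70 * 64  # steps_per_epoch * batch size used above
print(samples_per_epoch)               # 4480
print(steps_for_full_pass(60000, 64))  # 938
```

With steps_per_epoch=70, each epoch therefore covers only about 7.5% of MNIST; a value of around 938 would make one Keras epoch roughly correspond to one pass over the data.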
Distributed training
Depending on the system and the available hardware, distributed training may require more considerations than simple single-device training.
Distributed training can be enabled by different modules and libraries:
- tf.distribute.Strategy: TensorFlow's native distributed training API
- Horovod: a distributed training library supported by various ML/DL frameworks
These libraries/modules set up the distributed environment and handle calls to the communication libraries. Additionally, it is necessary to decide on the required type of distributed parallelism. As mentioned in the overview article, different parallelization strategies exist for different use cases. tf.distribute.Strategy only supports data parallelism, while Horovod supports data and basic model parallelism. Other strategies might require additional libraries.
Please find more information in the TensorFlow distribute article.
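In data parallelism, every worker holds a full copy of the model and processes its own batch per step, so the effective (global) batch size grows with the number of workers. A common heuristic, which the Horovod example below applies through 0.015 * hvd.size(), is to scale the learning rate linearly with the number of workers. The following plain-Python sketch (the function name is hypothetical) shows this arithmetic; the linear scaling is a rule of thumb that may need additional tuning, not a guarantee:

```python
def data_parallel_settings(local_batch_size, num_workers, base_lr):
    """Return (global batch size, linearly scaled learning rate); the scaling is a heuristic."""
    # Every worker processes its own local batch per step
    global_batch_size = local_batch_size * num_workers
    # Linear scaling rule: larger global batches get a proportionally larger learning rate
    scaled_lr = base_lr * num_workers
    return global_batch_size, scaled_lr


for workers in (1, 4, 8):
    global_batch, lr = data_parallel_settings(64, workers, 0.015)
    print(f"{workers} workers: global batch {global_batch}, learning rate {lr:.3f}")
```

With tf.distribute.MirroredStrategy, for example, the dataset passed to model.fit is batched with the global batch size, which is then split across the replicas.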
The following example makes use of data parallelism to speed up training:
Native distributed training
Using TensorFlow's native distributed training requires some additional configuration. The code below works with NVIDIA GPUs on one or more nodes. On a single node with multiple GPUs, MirroredStrategy is used for distributed training; on multiple nodes, MultiWorkerMirroredStrategy is used instead.
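import os
import tensorflow as tf
# Read the distributed setup from the environment (see "Setting up the environment")
world_size = int(os.environ.get('WORLD_SIZE', 1))
world_rank = 0
local_rank = 0
if world_size > 1:
    world_rank = int(os.environ['RANK'])
    local_rank = int(os.environ['LOCAL_RANK'])
local_batch_size = 64
l_gpu_devices = tf.config.list_physical_devices('GPU')
strategy = None
if world_size == 1 and len(l_gpu_devices) > 0:
    # for single host use multi GPU training via MirroredStrategy
    strategy = tf.distribute.MirroredStrategy()
elif world_size > 1:
    if len(l_gpu_devices) > 0:
        # one GPU per process, selected by the node-local rank
        tf.config.set_visible_devices(l_gpu_devices[local_rank], 'GPU')
        strategy = tf.distribute.MultiWorkerMirroredStrategy(
            communication_options=tf.distribute.experimental.CommunicationOptions(
                implementation=tf.distribute.experimental.CommunicationImplementation.NCCL))
    else:
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
else:
    # single process without GPUs: default (single-device) strategy
    strategy = tf.distribute.get_strategy()
# The dataset is batched with the global batch size, which the strategy splits across replicas
global_batch_size = local_batch_size * strategy.num_replicas_in_sync
with strategy.scope():
    # Model and optimizer variables must be created inside the strategy scope
    model = ...  # Define model
    optimizer = ...  # Set optimizer
    model.compile(...)  # Compile with loss, optimizer and metrics
dataset = ...  # Load dataset, batched with global_batch_size
# batch_size must not be passed to fit() when the input is a batched tf.data.Dataset
model.fit(
    dataset,
    steps_per_epoch=70,
    epochs=10
)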
Setting up the environment
Setting up the distributed environment requires a few extra steps: the world size as well as the global and local rank must be set correctly on every worker. A script for a SLURM environment that is also compatible with Apptainer could look like the following:
#!/bin/bash
# for regular environment
export RANK=${SLURM_PROCID}
export LOCAL_RANK=${SLURM_LOCALID}
export WORLD_SIZE=${SLURM_NTASKS}
# make variables also available inside the Apptainer container
export APPTAINERENV_RANK=${SLURM_PROCID}
export APPTAINERENV_LOCAL_RANK=${SLURM_LOCALID}
export APPTAINERENV_WORLD_SIZE=${SLURM_NTASKS}
# make additional SLURM variables available to container
export APPTAINERENV_SLURM_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK}
export APPTAINERENV_SLURM_NTASKS_PER_NODE=${SLURM_NTASKS_PER_NODE}
export APPTAINERENV_SLURM_NNODES=${SLURM_NNODES}
export APPTAINERENV_SLURM_JOB_NODELIST=${SLURM_JOB_NODELIST}
export APPTAINERENV_R_WLM_ABAQUSHOSTLIST="${R_WLM_ABAQUSHOSTLIST}"
This has to be run on every worker before invoking the training script.
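On the Python side, the training script reads these variables back. The sketch below is an assumption rather than part of the setup above: the function names are hypothetical, and the fallback to the raw SLURM variables is an addition. It returns the world size and ranks with the same single-process defaults as the native training code:

```python
import os


def _first_int(env, names, default=None):
    # Return the first variable from `names` that is set, converted to int
    for name in names:
        if name in env:
            return int(env[name])
    if default is None:
        raise KeyError(f"none of {names} is set")
    return default


def read_distributed_env(environ=None):
    """Return (world_size, world_rank, local_rank) for the current process."""
    env = os.environ if environ is None else environ
    # Prefer the exported variables, fall back to SLURM's own (assumption)
    world_size = _first_int(env, ("WORLD_SIZE", "SLURM_NTASKS"), default=1)
    if world_size == 1:
        return 1, 0, 0  # single process: same defaults as the native example
    world_rank = _first_int(env, ("RANK", "SLURM_PROCID"))
    local_rank = _first_int(env, ("LOCAL_RANK", "SLURM_LOCALID"))
    if not 0 <= world_rank < world_size:
        raise ValueError(f"rank {world_rank} is outside a world of size {world_size}")
    return world_size, world_rank, local_rank


print(read_distributed_env({"WORLD_SIZE": "8", "RANK": "5", "LOCAL_RANK": "1"}))
```

Failing early with a clear error when a rank is missing or out of range is usually easier to debug than a hang during the first collective operation.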
TensorFlow expects the information about each worker in JSON format, provided via the TF_CONFIG environment variable. A Python script that converts the information provided by SLURM into this format could look like this (NOTE: This custom script uses the site-specific environment variable R_WLM_ABAQUSHOSTLIST to get a pre-formatted list of all allocated nodes. This probably has to be adjusted depending on the cluster used):
import ast
import json
import os
def get_job_node_list_slurm():
    # Site-specific: a list of entries whose first element is the host name.
    # ast.literal_eval only accepts literals, unlike eval(); or maybe parse SLURM_JOB_NODELIST
    host_list_val = ast.literal_eval(os.environ['R_WLM_ABAQUSHOSTLIST'])
    host_list = []
    for x in host_list_val:
        host_list.append(x[0])
    # keep each host only once
    host_list = list(set(host_list))
    return host_list
def build_tf_config():
    # general settings
    port_range_start = 23456
    tasks_per_node = int(os.environ['SLURM_NTASKS_PER_NODE'])
    # create worker list: one port per task on each host
    list_hosts = sorted(get_job_node_list_slurm())
    list_workers = []
    for host in list_hosts:
        for i in range(tasks_per_node):
            list_workers.append(f"{host}:{port_range_start+i}")
    # create config; the task index is the global SLURM rank
    tf_config = {
        'cluster': {
            'worker': list_workers
        },
        'task': {'type': 'worker', 'index': int(os.environ['SLURM_PROCID'])}
    }
    # print the JSON so the shell can capture it into TF_CONFIG
    str_dump = json.dumps(tf_config)
    print(str_dump)
if __name__ == '__main__':
    build_tf_config()
This can then be used to set the environment variable to the right value on each worker before calling the training process:
export TF_CONFIG=$(python -W ignore tensorflow_create_tfconfig.py)
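The site-specific R_WLM_ABAQUSHOSTLIST lookup in get_job_node_list_slurm() could instead be based on SLURM_JOB_NODELIST, which SLURM stores in a compressed form such as node[01-03,07]. The following is a simplified sketch of expanding such a list. The function names are hypothetical, and it only handles a single bracket group per host name. Where available, scontrol show hostnames "$SLURM_JOB_NODELIST" is the more robust way to do this:

```python
import re


def _split_top_level(nodelist):
    # Split on commas that are not inside [...] groups
    parts, current, depth = [], "", 0
    for char in nodelist:
        if char == "," and depth == 0:
            parts.append(current)
            current = ""
            continue
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
        current += char
    if current:
        parts.append(current)
    return parts


def expand_nodelist(nodelist):
    """Expand e.g. 'node[01-03,07],gpu5' into individual host names (sketch)."""
    hosts = []
    for part in _split_top_level(nodelist):
        match = re.fullmatch(r"([^\[]*)\[([^\]]+)\](.*)", part)
        if match is None:
            hosts.append(part)  # plain host name without a range
            continue
        prefix, ranges, suffix = match.groups()
        for item in ranges.split(","):
            start, _, end = item.partition("-")
            width = len(start)  # preserve zero padding such as '01'
            for number in range(int(start), int(end or start) + 1):
                hosts.append(f"{prefix}{number:0{width}d}{suffix}")
    return hosts


print(expand_nodelist("node[01-03,07],gpu5"))
```

get_job_node_list_slurm() could then simply return expand_nodelist(os.environ['SLURM_JOB_NODELIST']), since build_tf_config() already sorts the host list.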
Horovod
Using Horovod simplifies the setup, as no additional scripts are required to set up the environment. Adapted for Horovod, the native code translates to the following:
import tensorflow as tf
import horovod.tensorflow.keras as hvd  # Horovod bindings for tf.keras
# Horovod setup
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    tf.config.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(28, 28)),
    tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10)
])
# Horovod: adjust learning rate based on number of GPUs.
opt = tf.keras.optimizers.Adadelta(0.015 * hvd.size())
# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=opt,
    metrics=['accuracy'])
# Horovod: broadcast initial variables from rank 0 so all workers start identically
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
dataset = ...  # Load dataset
model.fit(dataset, callbacks=callbacks, epochs=30, steps_per_epoch=70)
The script can then be launched either with the horovodrun command or by starting the processes directly through srun, mpirun or similar.
Inference
Using a trained model for inference can be as straightforward as loading a pre-trained model and feeding it with data samples. Also take a look at the Triton Inference Server (https://developer.nvidia.com/triton-inference-server) for serving models in production environments, or improve inference throughput by pre-compiling a model with TensorRT.
Distributed inference is much simpler than distributed training, as there is no communication between the different instances. Inference can be scaled to an arbitrary number of GPUs by simply starting single-GPU inference processes on different parts of an inference dataset, though this requires a large enough number of samples to scale efficiently.
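Splitting the inference data between independent processes only requires each process to know its rank and the total number of processes. A minimal sketch (the helper shard_bounds is hypothetical) that assigns each rank a contiguous, nearly equal slice of the sample indices:

```python
def shard_bounds(num_samples, num_shards, rank):
    """Return the [start, stop) index range processed by `rank`."""
    base, remainder = divmod(num_samples, num_shards)
    # The first `remainder` ranks take one extra sample each
    start = rank * base + min(rank, remainder)
    stop = start + base + (1 if rank < remainder else 0)
    return start, stop


for rank in range(4):
    print(rank, shard_bounds(10, 4, rank))
```

Each process would then run single-GPU inference only on its slice. Alternatively, tf.data.Dataset.shard(num_shards, index) splits a dataset across processes, but it assigns every num_shards-th element to a process instead of contiguous blocks.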