Difference between revisions of "TensorFlow"

From HPC Wiki
Jump to navigation Jump to search
 
(One intermediate revision by the same user not shown)
Line 1: Line 1:
 
[[Category:HPC-Developer]] [[Category:HPC-User]]
 
[[Category:HPC-Developer]] [[Category:HPC-User]]
  
== Description ==
+
[https://www.tensorflow.org/ TensorFlow] is a framework for machine and deep
 +
learning. It provides official Python and C++ language bindings, but this
 +
article focuses on the Python API. It supports CPU and GPU execution for
 +
single-node and multi-node systems.
  
tbd.
+
== General ==
  
== Distributed training ==
+
To learn how to use TensorFlow take a look at the
 +
[https://www.tensorflow.org/guide official guide] and
 +
[https://www.tensorflow.org/tutorials tutorials].
  
tbd.
+
=== Setup ===
 +
 
 +
For general setup of the Python environment consult the instructions from
 +
[[Machine_and_Deep_Learning_Frameworks#General_setup_and_software_environment|Machine and Deep Learning Frameworks]].
 +
TensorFlow can then be installed via pip or conda. Please find more information [https://www.tensorflow.org/install here].
 +
 
 +
The native TensorFlow package only supports NVDIA GPUs, but AMD is providing a
 +
ROCM-enabled version to be used with AMD GPUs
 +
[https://rocm.docs.amd.com/projects/install-on-linux/en/latest/install/3rd-party/tensorflow-install.html].
 +
 
 +
'''NOTE:''' Often, HPC centers also provide optimized, pre-defined Apptainer or Docker containers that already contain all required packages. In that case, users do not need to create their own separate virtual environment but can conveniently use the container for execution.
 +
 
 +
== Building models ==
 +
 
 +
Building machine-learning models can be done from scratch or by using
 +
high-level APIs like [https://keras.io/ Keras] This offers many ways to
 +
implement a wide range of machine-learning algorithms and models. This article
 +
omits code examples of how to build models in TensorFlow to avoid duplication
 +
of already existing guides and recommends using the official tutorials.
 +
 
 +
== Model training ==
 +
 
 +
Training a model with TensorFlow includes multiple steps:
 +
* deciding on the type of training: single device or distributed; CPU or GPU
 +
* define or load the model to be trained
 +
* load and pre-process your dataset
 +
* if applicable, choose and setup the distribution strategy
 +
 
 +
These steps will be explained in detail in a following example based on the
 +
MNIST dataset of handwritten digits to train a neural network that performs
 +
image classification.
 +
This assumes familiarity with the concepts of machine learning and neural networks.
 +
 
 +
=== Defining a model ===
 +
 
 +
The following code defines a neural network using Keras.
 +
 
 +
<syntaxhighlight lang="python">
 +
import tensorflow as tf
 +
 
 +
model = tf.keras.Sequential([
 +
            tf.keras.layers.InputLayer(input_shape=(28, 28)),
 +
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
 +
            tf.keras.layers.Conv2D(32, 3, activation='relu'),
 +
            tf.keras.layers.Flatten(),
 +
            tf.keras.layers.Dense(128, activation='relu'),
 +
            tf.keras.layers.Dense(10)
 +
        ])
 +
 
 +
opt = tf.keras.optimizers.Adadelta(0.015 * hvd.size())
 +
model.compile(
 +
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
 +
    optimizer=opt,
 +
    metrics=['accuracy'])
 +
</syntaxhighlight>
 +
 
 +
=== Loading the dataset ===
 +
 
 +
Now a dataset needs to be loaded. In this case this includes defining
 +
transformations for the training data, loading the MNIST dataset, applying
 +
shuffling on the samples and packing them into batches of 64 samples.
 +
 
 +
<syntaxhighlight lang="python">
 +
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
 +
x_train = x_train / np.float32(255)
 +
y_train = y_train.astype(np.int64)
 +
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(64)
 +
</syntaxhighlight>
 +
 
 +
=== Training the model ===
 +
 
 +
The model is then trained for 10 epochs by calling a high-level Keras function
 +
on the model. This removes the need of writing a custom training loop for
 +
simpler models.
 +
 
 +
<syntaxhighlight lang="python">
 +
model.fit(train_dataset, epochs=10, steps_per_epoch=70)
 +
</syntaxhighlight>
 +
 
 +
=== Distributed training ===
 +
 
 +
Depending on the system and the available hardware distributed training may
 +
require more considerations than simple single device training.
 +
 
 +
Distributed training can be enabled by different modules and libraries:
 +
* <syntaxhighlight lang="bash" inline>tf.distribute.Strategy</syntaxhighlight>: TensorFlow's native distributed training API
 +
* '''[https://Horovod.ai/ Horovod]''': A distributed training library supported by various ML/DL frameworks
 +
 
 +
The previously mentioned libraries/modules enable the setup of the distributed
 +
environment and handle calls to the communication libraries. Additionally it is
 +
necessary to think about the required type of distributed parallelism.
 +
As mentioned in the [[Machine_and_Deep_Learning_Frameworks#Distributed_training/fine-tuning|overview article]] different parallelization strategies
 +
exist for different use-cases. <syntaxhighlight lang="bash" inline>tf.distribute.Strategy</syntaxhighlight> only supports data
 +
parallelism while '''Horovod''' supports data and basic model parallelism. For
 +
different strategies additional libraries might be required.
 +
Please find more information in TensorFlow [https://www.tensorflow.org/tutorials/distribute/keras distribute] article.
 +
 
 +
The following example makes use of data parallelism to speed up training:
 +
 
 +
==== Native distributed training ====
 +
 
 +
Using TensorFlow's native distributed training requires some additional work to
 +
be configured correctly. The code below works with NVIDIA GPUs on one or more
 +
nodes. In case of a single node with multiple GPUs, the <syntaxhighlight lang="python" inline>MirroredStrategy</syntaxhighlight>
 +
will be used for distributed training. In case of multiple nodes it will use
 +
<syntaxhighlight lang="python" inline>MultiWorkerMirroredStrategy</syntaxhighlight> instead.
 +
 
 +
<syntaxhighlight lang="python">
 +
world_size = 1
 +
if 'WORLD_SIZE' in os.environ:
 +
    world_size = int(os.environ['WORLD_SIZE'])
 +
 
 +
world_rank = 0
 +
if world_size > 1:
 +
    world_rank = int(os.environ['RANK'])
 +
    local_rank = int(os.environ['LOCAL_RANK'])
 +
local_batch_size = 64
 +
 
 +
l_gpu_devices = tf.config.list_physical_devices('GPU')
 +
 
 +
strategy = None
 +
if world_size == 1 and len(l_gpu_devices) > 0:
 +
    # for single host use multi GPU training via MirroredStrategy
 +
    strategy = tf.distribute.MirroredStrategy()
 +
elif world_size > 1:
 +
    if len(l_gpu_devices) > 0:
 +
        tf.config.set_visible_devices(l_gpu_devices[args.local_rank], 'GPU')
 +
        strategy = tf.distribute.MultiWorkerMirroredStrategy(communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL))
 +
    else:
 +
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
 +
 
 +
model = ... # Define model
 +
optimizer = ... # Set optimizer
 +
dataset = ... # Load dataset
 +
model.fit(
 +
    dataset,
 +
    batch_size=local_batch_size,
 +
    steps_per_epoch=70,
 +
    epochs=10
 +
)
 +
</syntaxhighlight>
 +
 
 +
===== Setting up the environment =====
 +
 
 +
Setting up the distributed environment requires a few extra steps. It has to be
 +
ensured that basic information like the world size and the global and local
 +
rank are correctly set on every worker. A script for a SLURM environment that
 +
is also compatible with apptainer could look like the following:
 +
 
 +
<syntaxhighlight lang="bash">
 +
#!/bin/bash
 +
 
 +
# for regular environment
 +
export RANK=${SLURM_PROCID}
 +
export LOCAL_RANK=${SLURM_LOCALID}
 +
export WORLD_SIZE=${SLURM_NTASKS}
 +
 
 +
# make variables also available inside singularity container
 +
export APPTAINERENV_RANK=${SLURM_PROCID}
 +
export APPTAINERENV_LOCAL_RANK=${SLURM_LOCALID}
 +
export APPTAINERENV_WORLD_SIZE=${SLURM_NTASKS}
 +
 
 +
# make additional SLURM variables available to container
 +
export APPTAINERENV_SLURM_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK}
 +
export APPTAINERENV_SLURM_NTASKS_PER_NODE=${SLURM_NTASKS_PER_NODE}
 +
export APPTAINERENV_SLURM_NNODES=${SLURM_NNODES}
 +
export APPTAINERENV_SLURM_JOB_NODELIST=${SLURM_JOB_NODELIST}
 +
export APPTAINERENV_R_WLM_ABAQUSHOSTLIST="${R_WLM_ABAQUSHOSTLIST}"
 +
</syntaxhighlight>
 +
 
 +
This has to be run on every worker before invoking the training script.
 +
 
 +
TensorFlow expects the information about each worker in the JSON format
 +
provided via the <syntaxhighlight lang="bash" inline>TF_CONFIG</syntaxhighlight> environment variable. A Python script to parse
 +
the information provided by SLURM into the right format could look like this
 +
(NOTE: This custom script uses a site-specific environment variable <syntaxhighlight lang="bash" inline>R_WLM_ABAQUSHOSTLIST</syntaxhighlight> to get a
 +
pre-formatted list of all allocated nodes. This probably has to be adjusted
 +
depending on the used cluster):
 +
 
 +
<syntaxhighlight lang="python">
 +
import os
 +
import json
 +
 
 +
def get_job_node_list_slurm():
 +
    host_list_val      = eval(os.environ['R_WLM_ABAQUSHOSTLIST']) # or maybe parse SLURM_JOB_NODELIST
 +
    host_list          = []
 +
    for x in host_list_val:
 +
        host_list.append(x[0])
 +
    host_list = list(set(host_list))
 +
    return host_list
 +
 
 +
def build_tf_config():
 +
    # general settings
 +
    port_range_start    = 23456
 +
    tasks_per_node      = int(os.environ['SLURM_NTASKS_PER_NODE'])
 +
 
 +
    # create worker list
 +
    list_hosts          = sorted(get_job_node_list_slurm())
 +
    list_workers        = []
 +
    for host in list_hosts:
 +
        for i in range(tasks_per_node):
 +
            list_workers.append(f"{host}:{port_range_start+i}")
 +
 
 +
    # create config and set environment variable
 +
    tf_config = {
 +
        'cluster': {
 +
            'worker': list_workers
 +
        },
 +
        'task': {'type': 'worker', 'index': int(os.environ['SLURM_PROCID'])}
 +
    }
 +
 
 +
    str_dump = json.dumps(tf_config)
 +
    print(str_dump)
 +
 
 +
if __name__ == '__main__':
 +
    build_tf_config()
 +
</syntaxhighlight>
 +
 
 +
This can then be used to set the environment variable to the right value on
 +
each worker before calling the training process:
 +
 
 +
<syntaxhighlight lang="bash">
 +
export TF_CONFIG=$(python -W ignore tensorflow_create_tfconfig.py)
 +
</syntaxhighlight>
 +
 
 +
==== Horovod ====
 +
 
 +
Using Horovod simplifies the setup by not requiring additional scripts to setup the environment.
 +
Adapted for Horovod the native code translates to the following:
 +
 
 +
<syntaxhighlight lang="python">
 +
import horovod.keras as hvd
 +
 
 +
# Horovod setup
 +
hvd.init()
 +
 
 +
# Pin GPU to be used to process local rank (one GPU per process)
 +
gpus = tf.config.experimental.list_physical_devices('GPU')
 +
if gpus:
 +
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
 +
 
 +
model = tf.keras.Sequential([
 +
    tf.keras.layers.InputLayer(input_shape=(28, 28)),
 +
    tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
 +
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
 +
    tf.keras.layers.Flatten(),
 +
    tf.keras.layers.Dense(128, activation='relu'),
 +
    tf.keras.layers.Dense(10)
 +
])
 +
# Horovod: adjust learning rate based on number of GPUs.
 +
opt = tf.keras.optimizers.Adadelta(0.015 * hvd.size())
 +
# Horovod: add Horovod Distributed Optimizer.
 +
opt = hvd.DistributedOptimizer(opt)
 +
model.compile(
 +
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
 +
    optimizer=opt,
 +
    metrics=['accuracy'])
 +
 
 +
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0),]
 +
 
 +
dataset = ... # Load dataset
 +
model.fit(dataset, callbacks=callbacks, epochs=30, steps_per_epoch=70)
 +
</syntaxhighlight>
 +
 
 +
The script can then be called either by using the <syntaxhighlight lang="bash" inline>horovodrun</syntaxhighlight> command or by
 +
directly launching the processes through calls to <syntaxhighlight lang="bash" inline>srun</syntaxhighlight>, <syntaxhighlight lang="bash" inline>mpirun</syntaxhighlight> or similar.
 +
 
 +
== Inference ==
 +
 
 +
Using a trained model for inference can be straightforward as loading a
 +
pre-trained model and feeding it with data samples.
 +
Also take a look at the [https://developer.nvidia.com/triton-inference-server Triton
 +
Inference Server] to use inference in production environments or improve
 +
inference throughput by pre-compiling a model through
 +
[https://developer.nvidia.com/tensorrt-getting-started TensorRT]
 +
 
 +
Distributed inference is much simpler than distributed training as there is no
 +
communication involved between the different instances. Inference can be scaled
 +
to an arbitrary amount of GPUs by simply starting single GPU inference
 +
processes on different parts of an inference dataset, but requires a large
 +
enough number of samples to scale efficiently.

Latest revision as of 15:14, 7 November 2024


TensorFlow is a framework for machine and deep learning. It provides official Python and C++ language bindings, but this article focuses on the Python API. It supports CPU and GPU execution for single-node and multi-node systems.

General

To learn how to use TensorFlow take a look at the official guide and tutorials.

Setup

For general setup of the Python environment consult the instructions from Machine and Deep Learning Frameworks. TensorFlow can then be installed via pip or conda. Please find more information here.

The native TensorFlow package only supports NVDIA GPUs, but AMD is providing a ROCM-enabled version to be used with AMD GPUs [1].

NOTE: Often, HPC centers also provide optimized, pre-defined Apptainer or Docker containers that already contain all required packages. In that case, users do not need to create their own separate virtual environment but can conveniently use the container for execution.

Building models

Building machine-learning models can be done from scratch or by using high-level APIs like Keras This offers many ways to implement a wide range of machine-learning algorithms and models. This article omits code examples of how to build models in TensorFlow to avoid duplication of already existing guides and recommends using the official tutorials.

Model training

Training a model with TensorFlow includes multiple steps:

  • deciding on the type of training: single device or distributed; CPU or GPU
  • define or load the model to be trained
  • load and pre-process your dataset
  • if applicable, choose and setup the distribution strategy

These steps will be explained in detail in a following example based on the MNIST dataset of handwritten digits to train a neural network that performs image classification. This assumes familiarity with the concepts of machine learning and neural networks.

Defining a model

The following code defines a neural network using Keras.

import tensorflow as tf

model = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10)
        ])

opt = tf.keras.optimizers.Adadelta(0.015 * hvd.size())
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=opt,
    metrics=['accuracy'])

Loading the dataset

Now a dataset needs to be loaded. In this case this includes defining transformations for the training data, loading the MNIST dataset, applying shuffling on the samples and packing them into batches of 64 samples.

(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(64)

Training the model

The model is then trained for 10 epochs by calling a high-level Keras function on the model. This removes the need of writing a custom training loop for simpler models.

model.fit(train_dataset, epochs=10, steps_per_epoch=70)

Distributed training

Depending on the system and the available hardware distributed training may require more considerations than simple single device training.

Distributed training can be enabled by different modules and libraries:

  • tf.distribute.Strategy: TensorFlow's native distributed training API
  • Horovod: A distributed training library supported by various ML/DL frameworks

The previously mentioned libraries/modules enable the setup of the distributed environment and handle calls to the communication libraries. Additionally it is necessary to think about the required type of distributed parallelism. As mentioned in the overview article different parallelization strategies exist for different use-cases. tf.distribute.Strategy only supports data parallelism while Horovod supports data and basic model parallelism. For different strategies additional libraries might be required. Please find more information in TensorFlow distribute article.

The following example makes use of data parallelism to speed up training:

Native distributed training

Using TensorFlow's native distributed training requires some additional work to be configured correctly. The code below works with NVIDIA GPUs on one or more nodes. In case of a single node with multiple GPUs, the MirroredStrategy will be used for distributed training. In case of multiple nodes it will use MultiWorkerMirroredStrategy instead.

world_size = 1
if 'WORLD_SIZE' in os.environ:
    world_size = int(os.environ['WORLD_SIZE'])

world_rank = 0
if world_size > 1:
    world_rank = int(os.environ['RANK'])
    local_rank = int(os.environ['LOCAL_RANK'])
local_batch_size = 64

l_gpu_devices = tf.config.list_physical_devices('GPU')

strategy = None
if world_size == 1 and len(l_gpu_devices) > 0:
    # for single host use multi GPU training via MirroredStrategy
    strategy = tf.distribute.MirroredStrategy()
elif world_size > 1:
    if len(l_gpu_devices) > 0:
        tf.config.set_visible_devices(l_gpu_devices[args.local_rank], 'GPU')
        strategy = tf.distribute.MultiWorkerMirroredStrategy(communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL))
    else:
        strategy = tf.distribute.MultiWorkerMirroredStrategy()

model = ... # Define model
optimizer = ... # Set optimizer
dataset = ... # Load dataset
model.fit(
    dataset,
    batch_size=local_batch_size,
    steps_per_epoch=70,
    epochs=10
)
Setting up the environment

Setting up the distributed environment requires a few extra steps. It has to be ensured that basic information like the world size and the global and local rank are correctly set on every worker. A script for a SLURM environment that is also compatible with apptainer could look like the following:

#!/bin/bash

# for regular environment
export RANK=${SLURM_PROCID}
export LOCAL_RANK=${SLURM_LOCALID}
export WORLD_SIZE=${SLURM_NTASKS}

# make variables also available inside singularity container
export APPTAINERENV_RANK=${SLURM_PROCID}
export APPTAINERENV_LOCAL_RANK=${SLURM_LOCALID}
export APPTAINERENV_WORLD_SIZE=${SLURM_NTASKS}

# make additional SLURM variables available to container
export APPTAINERENV_SLURM_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK}
export APPTAINERENV_SLURM_NTASKS_PER_NODE=${SLURM_NTASKS_PER_NODE}
export APPTAINERENV_SLURM_NNODES=${SLURM_NNODES}
export APPTAINERENV_SLURM_JOB_NODELIST=${SLURM_JOB_NODELIST}
export APPTAINERENV_R_WLM_ABAQUSHOSTLIST="${R_WLM_ABAQUSHOSTLIST}"

This has to be run on every worker before invoking the training script.

TensorFlow expects the information about each worker in the JSON format provided via the TF_CONFIG environment variable. A Python script to parse the information provided by SLURM into the right format could look like this (NOTE: This custom script uses a site-specific environment variable R_WLM_ABAQUSHOSTLIST to get a pre-formatted list of all allocated nodes. This probably has to be adjusted depending on the used cluster):

import os
import json

def get_job_node_list_slurm():
    host_list_val       = eval(os.environ['R_WLM_ABAQUSHOSTLIST']) # or maybe parse SLURM_JOB_NODELIST
    host_list           = []
    for x in host_list_val:
        host_list.append(x[0])
    host_list = list(set(host_list))
    return host_list

def build_tf_config():
    # general settings
    port_range_start    = 23456
    tasks_per_node      = int(os.environ['SLURM_NTASKS_PER_NODE'])

    # create worker list
    list_hosts          = sorted(get_job_node_list_slurm())
    list_workers        = []
    for host in list_hosts:
        for i in range(tasks_per_node):
            list_workers.append(f"{host}:{port_range_start+i}")

    # create config and set environment variable
    tf_config = {
        'cluster': {
            'worker': list_workers
        },
        'task': {'type': 'worker', 'index': int(os.environ['SLURM_PROCID'])}
    }

    str_dump = json.dumps(tf_config)
    print(str_dump)

if __name__ == '__main__':
    build_tf_config()

This can then be used to set the environment variable to the right value on each worker before calling the training process:

export TF_CONFIG=$(python -W ignore tensorflow_create_tfconfig.py)

Horovod

Using Horovod simplifies the setup by not requiring additional scripts to setup the environment. Adapted for Horovod the native code translates to the following:

import horovod.keras as hvd

# Horovod setup
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

 model = tf.keras.Sequential([
     tf.keras.layers.InputLayer(input_shape=(28, 28)),
     tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
     tf.keras.layers.Conv2D(32, 3, activation='relu'),
     tf.keras.layers.Flatten(),
     tf.keras.layers.Dense(128, activation='relu'),
     tf.keras.layers.Dense(10)
 ])
 # Horovod: adjust learning rate based on number of GPUs.
 opt = tf.keras.optimizers.Adadelta(0.015 * hvd.size())
 # Horovod: add Horovod Distributed Optimizer.
 opt = hvd.DistributedOptimizer(opt)
 model.compile(
     loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
     optimizer=opt,
     metrics=['accuracy'])

callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0),]

dataset = ... # Load dataset
model.fit(dataset, callbacks=callbacks, epochs=30, steps_per_epoch=70)

The script can then be called either by using the horovodrun command or by directly launching the processes through calls to srun, mpirun or similar.

Inference

Using a trained model for inference can be straightforward as loading a pre-trained model and feeding it with data samples. Also take a look at the [https://developer.nvidia.com/triton-inference-server Triton Inference Server] to use inference in production environments or improve inference throughput by pre-compiling a model through TensorRT

Distributed inference is much simpler than distributed training as there is no communication involved between the different instances. Inference can be scaled to an arbitrary amount of GPUs by simply starting single GPU inference processes on different parts of an inference dataset, but requires a large enough number of samples to scale efficiently.