Skip to main content

Real use case: Machine Learning

Welcome to the fifth part of our tutorial series. After getting comfortable with running basic HPC tasks on the DeepSquare platform, we're now ready to delve into more advanced features. In this tutorial, we'll be harnessing the full potential of our supercomputers through multi-node distributed training, powered by MPI and Horovod.

MPI and DeepSquare

DeepSquare leverages the power of supercomputers, primarily designed for robust high-performance computing tasks, to handle complex and data-intensive deep learning models. These supercomputers, different from standard cloud servers, utilize specialized hardware like GPUs or TPUs and parallel processing techniques for optimal performance.

A critical enabler of this high-performance parallel computing is the Message Passing Interface (MPI). This communication protocol, extensively used in supercomputers, facilitates data exchange and coordination between different processors and nodes during parallel computing tasks. The unique advantages of MPI include scalability, flexibility, low-latency, and high-bandwidth communication, making it an integral component of DeepSquare's ecosystem and many scientific and engineering applications.

Adapting the ML example for distributed training

Modifying the source code to allow distributed training

To distribute the training of our machine learning models across multiple nodes, we'll be leveraging Horovod, an open-source tool designed for distributed training of deep learning models.

Let's take our previous example from Part 3, the Deep Layer Aggregation model used on CIFAR 10.

  1. Initialize Horovod: Begin by initializing Horovod at the start of your script with:

    hvd.init()

    at the initialization of the script. hvd.init() will load the environment variables from MPI.

  2. Assign a GPU per process: Check if a GPU is available and assign one to each process using hvd.local_rank()

    if torch.cuda.is_available():
    torch.cuda.set_device(hvd.local_rank())
  3. Partition dataset among workers:

    We use PyTorch's DistributedSampler.

    trainset = torchvision.datasets.CIFAR10(train=True, ...)
    trainsampler = torch.utils.data.distributed.DistributedSampler(
    trainset, num_replicas=hvd.size(), rank=hvd.rank()
    )
    trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=128, sampler=trainsampler
    )

    testset = torchvision.datasets.CIFAR10(train=False, ...)
    testsampler = torch.utils.data.distributed.DistributedSampler(
    testset, num_replicas=hvd.size(), rank=hvd.rank()
    )
    testloader = torch.utils.data.DataLoader(
    testset, batch_size=100, sampler=testsampler
    )
  4. Wrap the optimizer: Use the hvd.DistributedOptimizer to wrap your optimizer.

    # The optimizer was the Stochastic Gradient Descent optimizer
    optimizer = optim.SGD(model.parameters())
    optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters()
    )
  5. Broadcast the initial variable states: Ensure all processes are on the same page by broadcasting the initial variable states from rank 0 to all other processes.

    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
  6. Save checkpoints on worker 0 only: Modify your code to save checkpoints only on worker 0 to prevent potential corruption from other workers.

    def test(epoch: int):
    global BEST_ACC

    # Use evaluation/test mode
    model.eval()

    # ...

    # For each batch
    for batch_idx, (inputs, targets) in enumerate(testloader):
    # ...

    outputs = model(inputs) # Test
    loss = criterion(outputs, targets) # Compute loss

    # ... # Count number of corrects

    acc = 100.0 * correct / len(testsampler) # Compute accuracy = 100 * correct / total
    if acc > BEST_ACC and hvd.rank() == 0: # Checkpoint only if the model is better and only on worker 0.
    print("Saving..")
    state = {
    "net": model.state_dict(),
    "acc": acc,
    "epoch": epoch,
    }
    torch.save(state, "path/to/checkpoint.pth")
    BEST_ACC = acc

    You would load the model like this:

    checkpoint = torch.load("path/to/checkpoint.pth")
    model.load_state_dict(checkpoint["net"])
    BEST_ACC = checkpoint["acc"]
    start_epoch = checkpoint["epoch"]

As we allocate a GPU per process, the resource allocation and usage will change accordingly.

Here's the completed code. The code includes options such as --no-cuda to disable GPU, or --horovod to enable Horovod. As a result, you can compare the code with or without Horovod, taking into account or not args.horovod, respectively.

To learn more about Horovod, please read the official documentation of Horovod.

Adapting the workflow to run on MPI

Now, let's look at how to incorporate these changes in different types of workflows:

Workflow with 4 tasks and 1 GPU per task
resources:
tasks: 4
gpus: 4
cpusPerTask: 8
memPerCpu: 2048

enableLogging: true

## Load environment variables
env:
## Configure point-to-point messaging layer to use UCX.
- key: "OMPI_MCA_pml"
value: "ucx"
## Configure the byte transfer layer. The "^" prefix means "to exclude". vader, tcp, openib and uct have been excluded.
## vader being the shared-memory transport. It is recommended to not use any of the BTL when using UCX.
## UCX already uses its own transports (see ucx_info -d).
- key: "OMPI_MCA_btl",
value: "^vader,tcp,openib,uct"

input:
http:
url: https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz

output:
s3:
region: at-vie-1
bucketUrl: s3://cifar10
path: "/"
accessKeyId: EXO***
secretAccessKey: "***"
endpointUrl: https://sos-at-vie-1.exo.io
continuousOutputSync: true

steps:
- name: train
run:
command: /.venv/bin/python3 main.py --horovod --checkpoint_out=$DEEPSQUARE_OUTPUT/ckpt.pth --dataset=$DEEPSQUARE_INPUT/
resources:
tasks: 4
gpusPerTask: 1
container:
image: deepsquare-io/cifar-10-example:latest
registry: ghcr.io
workDir: "/app"

Congrats! You've now learned how to supercharge your HPC workloads with MPI and Horovod on DeepSquare.

The performance of HPC clusters heavily depends on the effectiveness of the message passing interface (MPI) utilized to distribute tasks across the nodes. DeepSquare offers a decentralized network of HPC clusters, eliminating the need to maintain personal infrastructure, while providing an efficient MPI-based communication framework to enable seamless distributed computing.

The combination of DeepSquare's capacity and MPI creates a potent solution for scaling HPC workloads. This approach offers researchers and engineers a cost-effective solution for HPC, enabling them to focus on developing new technologies and unlocking new possibilities without the added burden of infrastructure maintenance.

What's next?

Although we've finished our getting started guide, there are still great features that you can use to speed up your research and development.

You can also read the guides to learn about the advanced features of DeepSquare.

Want to create your own application with DeepSquare? You might be interested in the DeepSquare SDK.

Don't forget! If you are lost when writing a workflow, you can use the Workflow API reference as your companion.