Federated Learning Made Easy with FEDn

Part 1: Developing your own FL project

Hello there! This blog post shows how easy it is to federate your machine learning model with FEDn. By the end of it, you will understand how to use FEDn and be able to federate your own project.

Federated learning is becoming increasingly popular, so I am assuming that you know what it is. If not, here are some great resources to get you on the right track.

FEDn is a popular framework for federated learning, developed by Scaleout. Learn more about it at this link.

The MNIST Dataset

Let’s assume that we have code available that defines and trains a machine learning model for handwritten digit classification on the MNIST dataset.

Example images from the MNIST dataset

Here’s the code that defines the model in PyTorch:

import torch


class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Fully connected layers: 784 input pixels -> 64 -> 32 -> 10 digit classes
        self.fc1 = torch.nn.Linear(784, 64)
        self.fc2 = torch.nn.Linear(64, 32)
        self.fc3 = torch.nn.Linear(32, 10)

    def forward(self, x):
        # Flatten each 28x28 image into a 784-dimensional vector
        x = torch.nn.functional.relu(self.fc1(x.reshape(x.size(0), 784)))
        x = torch.nn.functional.dropout(x, p=0.5, training=self.training)
        x = torch.nn.functional.relu(self.fc2(x))
        # Log-probabilities per class, to be paired with NLLLoss
        x = torch.nn.functional.log_softmax(self.fc3(x), dim=1)
        return x

And here’s how we usually train the model on the MNIST dataset:

import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Download and load MNIST dataset
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

model = Net()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = torch.nn.NLLLoss()

# Training loop
epochs = 10
for epoch in range(epochs):  
    for batch_idx, (data, target) in enumerate(train_loader):  
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

Now we want to transfer this example into a federated setting. For this, we will split the MNIST dataset over different clients and use federated learning to train the model we defined above.

Federated Learning with FEDn

To federate our training with FEDn, we need to create a so-called “compute package”. This is a folder containing all the code needed to train your model in a federated setting. It defines the model we want to use, how we train and evaluate it, and how the training data is read in. It is the heart of your FL project. After we have created the compute package and uploaded it to FEDn, FEDn distributes it to all clients. When we start the federated training, FEDn sends the initial model (the seed model) to the clients and asks them to update the model parameters by training on their local data. FEDn then receives the updated model parameters from the clients and aggregates them into a new global model. This process is repeated for a defined number of rounds.
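
The round structure described above can be sketched in a few lines of plain Python. This is a toy illustration, not FEDn's implementation: the names local_update, aggregate, and run_rounds are hypothetical, the "model" is a single number, and averaging the client updates weighted by their number of examples is an assumption about the aggregation rule (a common choice, often called federated averaging).

```python
from typing import List, Tuple

Params = List[float]


def local_update(params: Params, data: List[float], lr: float = 0.5) -> Tuple[Params, int]:
    """Toy client step: move each parameter toward the mean of the local data."""
    local_mean = sum(data) / len(data)
    new_params = [p - lr * (p - local_mean) for p in params]
    return new_params, len(data)


def aggregate(updates: List[Tuple[Params, int]]) -> Params:
    """Average client parameters, weighted by each client's number of examples."""
    total = sum(n for _, n in updates)
    n_params = len(updates[0][0])
    return [sum(p[i] * n for p, n in updates) / total for i in range(n_params)]


def run_rounds(seed: Params, clients: List[List[float]], rounds: int) -> Params:
    """Send the global model out, collect local updates, aggregate, repeat."""
    global_params = seed
    for _ in range(rounds):
        updates = [local_update(global_params, data) for data in clients]
        global_params = aggregate(updates)
    return global_params


clients = [[1.0, 1.0], [4.0]]  # two clients with different local data
print(run_rounds([0.0], clients, rounds=1))  # [1.0]
```

With more rounds, the global parameter in this toy setup approaches 2.0, the mean over all client data, even though no client ever shares its raw data.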

The Compute Package — The Heart of your FL project

What is the compute package and how can we create our own? The compute package is essentially a folder that contains all the necessary code that FEDn uses to run your FL project. This includes files that specify the machine learning model that should be used, how the local data is loaded to a client, and how the model is trained on the local data.

FEDn is framework-agnostic: even though we’re using Python in this blog post, you could use any programming language or machine learning framework in the compute package.

The figure above visualizes the files the compute package folder usually contains. FEDn calls the code we define in these files, for example to train or validate the model. To do so, FEDn uses so-called entry points, which are defined in the fedn.yaml file shown below. For training, for example, FEDn runs the code in the train.py file by executing the command “python train.py”.

python_env: python_env.yaml 

entry_points:
  build:
    command: python model.py
  startup:
    command: python data.py
  train:
    command: python train.py
  validate:
    command: python validate.py

Implementing the Compute Package

I will now walk you through each file in the compute package. The goal is to give you an understanding of the code, so you can implement your own compute package for your project.

data.py file: Splitting and loading the data

First, we download the MNIST dataset using the get_data function and save it in the data folder.

import os

import torchvision


def get_data(out_dir='data'):
    # Make dir if necessary
    if not os.path.exists(out_dir):
        os.mkdir(out_dir)

    # Only download if not already downloaded
    # (the transform must be an instance, ToTensor(), not the class itself)
    if not os.path.exists(f'{out_dir}/train'):
        torchvision.datasets.MNIST(
            root=f'{out_dir}/train', transform=torchvision.transforms.ToTensor(), train=True, download=True)
    if not os.path.exists(f'{out_dir}/test'):
        torchvision.datasets.MNIST(
            root=f'{out_dir}/test', transform=torchvision.transforms.ToTensor(), train=False, download=True)

Then, we randomly split the data across the clients. In this case, every client will have access to the same amount of data. For client 1, the data will be stored in the folder data/clients/1. This function, together with the get_data function from above, is called before we start with the training (more precisely, the two functions are called by FEDn through the entry point “startup”).

def split(out_dir='data'):
    n_splits = int(os.environ.get("FEDN_NUM_DATA_SPLITS", 2))
    # Make dir
    if not os.path.exists(f'{out_dir}/clients'):
        os.mkdir(f'{out_dir}/clients')

    # Load and convert to dict (the transform must be an instance, ToTensor())
    train_data = torchvision.datasets.MNIST(
        root=f'{out_dir}/train', transform=torchvision.transforms.ToTensor(), train=True)
    test_data = torchvision.datasets.MNIST(
        root=f'{out_dir}/test', transform=torchvision.transforms.ToTensor(), train=False)
    data = {
        'x_train': splitset(train_data.data, n_splits),
        'y_train': splitset(train_data.targets, n_splits),
        'x_test': splitset(test_data.data, n_splits),
        'y_test': splitset(test_data.targets, n_splits),
    }
    # Make splits
    for i in range(n_splits):
        subdir = f'{out_dir}/clients/{str(i+1)}'
        if not os.path.exists(subdir):
            os.mkdir(subdir)
        torch.save({
            'x_train': data['x_train'][i],
            'y_train': data['y_train'][i],
            'x_test': data['x_test'][i],
            'y_test': data['y_test'][i],
        },
            f'{subdir}/mnist.pt')
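
The split function relies on a splitset helper that is not shown in this post. Here is a minimal sketch of what such a helper could look like, assuming it cuts a sequence into equally sized contiguous parts and drops any remainder. A random split would additionally shuffle the samples first, using the same permutation for images and labels.

```python
import math


def splitset(dataset, parts):
    """Cut `dataset` into `parts` contiguous chunks of equal size.

    Works on anything that supports len() and slicing (lists, tensors).
    Samples left over after the last full chunk are dropped.
    """
    n = len(dataset)
    local_n = math.floor(n / parts)
    return [dataset[i * local_n:(i + 1) * local_n] for i in range(parts)]


chunks = splitset(list(range(7)), 3)
print(chunks)  # [[0, 1], [2, 3], [4, 5]]
```

Because images and labels are split by separate calls, a helper like this has to be deterministic, otherwise x_train and y_train would no longer line up.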

In the load_data function, we specify how FEDn should load the data for a client. This function will be called when a client starts model training.

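import os

import torch

# abs_path is assumed to be the directory containing this file;
# it is used to build the default data path below
abs_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))


def load_data(data_path, is_train=True):
    # Fall back to FEDN_DATA_PATH, then to the split of client 1
    if data_path is None:
        data_path = os.environ.get("FEDN_DATA_PATH", abs_path+'/data/clients/1/mnist.pt')
    data = torch.load(data_path)
    if is_train:
        X = data['x_train']
        y = data['y_train']
    else:
        X = data['x_test']
        y = data['y_test']
    # Normalize pixel values from [0, 255] to [0, 1]
    X = X / 255
    return X, y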
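
Because load_data falls back to the FEDN_DATA_PATH environment variable, each client can be pointed at its own split without code changes. The lookup order can be sketched as follows. Here resolve_data_path is a hypothetical helper that only mirrors the first lines of load_data, and the paths are placeholders.

```python
import os

DEFAULT_PATH = "data/clients/1/mnist.pt"  # placeholder default


def resolve_data_path(data_path=None):
    """An explicit argument wins, then FEDN_DATA_PATH, then the default."""
    if data_path is None:
        data_path = os.environ.get("FEDN_DATA_PATH", DEFAULT_PATH)
    return data_path


# Simulate a second client by setting the environment variable
os.environ["FEDN_DATA_PATH"] = "data/clients/2/mnist.pt"
print(resolve_data_path())                  # data/clients/2/mnist.pt
print(resolve_data_path("custom/path.pt"))  # custom/path.pt
```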

model.py file: Defining our model

Now it’s time to define the model we want to federate! We do this in the compile_model() function. We will use the same model as we defined above in the centralized setting. We can just paste in the code from above!

At the top of the file, we initialize the so-called numpyhelper. It converts the model parameters to numpy format so they can be transferred between the clients and FEDn. If you’re building your own project using frameworks such as PyTorch or TensorFlow, there is no need to change the helper. Learn more here.

# Import path as used in FEDn 0.9.x; it may differ in other versions
from fedn.utils.helpers.helpers import get_helper

HELPER_MODULE = 'numpyhelper'
helper = get_helper(HELPER_MODULE)

def compile_model():
    class Net(torch.nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.fc1 = torch.nn.Linear(784, 64)
            self.fc2 = torch.nn.Linear(64, 32)
            self.fc3 = torch.nn.Linear(32, 10)

        def forward(self, x):
            x = torch.nn.functional.relu(self.fc1(x.reshape(x.size(0), 784)))
            x = torch.nn.functional.dropout(x, p=0.5, training=self.training)
            x = torch.nn.functional.relu(self.fc2(x))
            x = torch.nn.functional.log_softmax(self.fc3(x), dim=1)
            return x

    return Net()

To send the current model parameters after a training round, and to read in the new global model parameters at the beginning of a training round, we define the save_parameters() and load_parameters() functions. save_parameters() uses the numpyhelper to save our updated PyTorch model parameters as a list of numpy arrays. At the beginning of a training round, load_parameters() loads the new global model from numpy format and converts it back to PyTorch format.

If you’re building your own project using frameworks such as PyTorch or TensorFlow, there’s no need to change the save_parameters or load_parameters function.

import collections

import torch


def save_parameters(model, out_path):
    # Convert every tensor in the state dict to a numpy array, keeping the key order
    parameters_np = [val.cpu().numpy() for _, val in model.state_dict().items()]
    helper.save(parameters_np, out_path)


def load_parameters(model_path):
    model = compile_model()
    parameters_np = helper.load(model_path)

    # Pair the arrays with the parameter names of a freshly compiled model
    params_dict = zip(model.state_dict().keys(), parameters_np)
    state_dict = collections.OrderedDict({key: torch.tensor(x) for key, x in params_dict})
    model.load_state_dict(state_dict, strict=True)
    return model
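
Note that save_parameters drops the parameter names and keeps only the values, so load_parameters has to rely on the key order of a freshly compiled model to rebuild the state dict. The pure-Python sketch below illustrates that round trip with plain lists standing in for tensors. The names to_list and from_list are hypothetical and not part of FEDn or PyTorch.

```python
import collections


def to_list(state_dict):
    """Keep only the values, in the order of the state dict keys."""
    return [value for _, value in state_dict.items()]


def from_list(reference_keys, values):
    """Re-attach names by pairing values with the keys of a reference model."""
    return collections.OrderedDict(zip(reference_keys, values))


state = collections.OrderedDict([("fc1.weight", [0.1, 0.2]), ("fc1.bias", [0.0])])
values = to_list(state)                      # names are gone at this point
restored = from_list(state.keys(), values)   # names come from the reference model
print(restored == state)  # True
```

This only works if both sides build the same architecture, which is why every client compiles the model with the same compile_model function.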

Finally, the init_seed function is used to create the first global model (seed model). We upload the seed model to FEDn, which sends it to the clients as an initial model in the first training round.

def init_seed(out_path='seed.npz'):
    # Init and save
    model = compile_model()
    save_parameters(model, out_path)

train.py: Model training

The train.py file contains all the code needed to train the model. The training loop follows the centralized setting, but notice that we only train for 1 epoch in each training round. After training is finished, the new model parameters are saved via save_parameters, which we already defined in the model.py file. After each training round, FEDn aggregates the updated model parameters of all clients into a new global model.

def train(in_model_path, out_model_path, data_path=None, batch_size=32, epochs=1, lr=0.01):
    # Load data
    x_train, y_train = load_data(data_path)

    # Load parameters and initialize model
    model = load_parameters(in_model_path)

    # Train
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    n_batches = int(math.ceil(len(x_train) / batch_size))
    criterion = torch.nn.NLLLoss()
    for e in range(epochs):  # epoch loop
        for b in range(n_batches):  # batch loop
            # Retrieve current batch
            batch_x = x_train[b * batch_size : (b + 1) * batch_size]
            batch_y = y_train[b * batch_size : (b + 1) * batch_size]
            # Train on batch
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            # Log
            if b % 100 == 0:
                print(f"Epoch {e}/{epochs-1} | Batch: {b}/{n_batches-1} | Loss: {loss.item()}")

    # Metadata needed for aggregation server side
    metadata = {
        # num_examples are mandatory
        "num_examples": len(x_train),
        "batch_size": batch_size,
        "epochs": epochs,
        "lr": lr,
    }

    # Save JSON metadata file (mandatory)
    save_metadata(metadata, out_model_path)

    # Save model update (mandatory)
    save_parameters(model, out_model_path)
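
The training loop batches the data by hand instead of using a DataLoader. The slicing arithmetic can be checked in isolation. Here batch_bounds is a hypothetical helper written only for this illustration.

```python
import math


def batch_bounds(n_samples, batch_size):
    """Return the (start, end) index pairs that the batch loop slices out."""
    n_batches = int(math.ceil(n_samples / batch_size))
    return [
        (b * batch_size, min((b + 1) * batch_size, n_samples))
        for b in range(n_batches)
    ]


print(batch_bounds(70, 32))  # [(0, 32), (32, 64), (64, 70)]
```

The last batch is simply shorter, because Python slicing stops at the end of the data. Unlike the centralized example, which uses shuffle=True, this loop visits the samples in the same order every epoch.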

validate.py: Evaluating the model

After the clients have produced their model updates, the new aggregated model is validated on the local client test data. The code for it is located in the validate.py file. The metrics, such as loss or accuracy, are saved and will later be displayed by FEDn. The metrics can also be downloaded after training.

def validate(in_model_path, out_json_path, data_path=None):
    # Load data
    x_train, y_train = load_data(data_path)
    x_test, y_test = load_data(data_path, is_train=False)

    # Load model
    model = load_parameters(in_model_path)
    model.eval()

    # Evaluate
    criterion = torch.nn.NLLLoss()
    with torch.no_grad():
        train_out = model(x_train)
        training_loss = criterion(train_out, y_train)
        training_accuracy = torch.sum(torch.argmax(train_out, dim=1) == y_train) / len(train_out)
        test_out = model(x_test)
        test_loss = criterion(test_out, y_test)
        test_accuracy = torch.sum(torch.argmax(test_out, dim=1) == y_test) / len(test_out)

    # JSON schema
    report = {
        "training_loss": training_loss.item(),
        "training_accuracy": training_accuracy.item(),
        "test_loss": test_loss.item(),
        "test_accuracy": test_accuracy.item(),
    }

    # Save JSON
    save_metrics(report, out_json_path)
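
To make the two metrics concrete, here is what the validation code computes, written out in plain Python on a tiny hand-made batch. The functions nll_loss and accuracy are illustrative re-implementations, assuming the default mean reduction of torch.nn.NLLLoss, and the numbers are made up.

```python
def nll_loss(log_probs, targets):
    """Mean negative log-probability assigned to the correct class."""
    return -sum(row[t] for row, t in zip(log_probs, targets)) / len(targets)


def accuracy(log_probs, targets):
    """Fraction of samples whose highest-scoring class equals the target."""
    predictions = [max(range(len(row)), key=row.__getitem__) for row in log_probs]
    return sum(p == t for p, t in zip(predictions, targets)) / len(targets)


# Three samples, two classes; each row holds log-probabilities per class
log_probs = [[-0.1, -2.3], [-1.6, -0.2], [-0.9, -0.5]]
targets = [0, 1, 0]

print(accuracy(log_probs, targets))  # two of the three predictions are correct
print(nll_loss(log_probs, targets))  # average of 0.1, 0.2 and 0.9
```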

python_env.yaml: Managing the Python libraries

The python_env.yaml file defines the dependencies required to run the code. In this case, we pin torch, torchvision, and FEDn; numpy is not listed explicitly and is expected to be installed as a dependency of these packages. From this file, FEDn builds a virtual environment named mnist-pytorch that contains the specified libraries.

name: mnist-pytorch
build_dependencies:
  - pip
  - setuptools
  - wheel==0.37.1
dependencies:
  - torch==2.2.1
  - torchvision==0.17.1
  - fedn==0.9.0

That’s it! We now have our compute package, with all the code necessary to start the actual federated training with FEDn.

All the code presented in this post is available here.

Here are some useful resources to help you understand the compute package and develop your own.

By Jonas Frankemölle, Machine Learning Engineer