Python/cnn.py

#!/usr/bin/env python3

import torch
import torch.nn as nn, torch.nn.functional as F
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt

import numpy as np
import sklearn.metrics as skl

DEBUG=True


def plotImages(train_images,train_labels,fn="test.png"):
    """
    Plot the first six images in the given dataset.
    """
    for i in range(6):
        plt.subplot(1, 6, i + 1)   # one row, six columns
        plt.imshow(train_images[i].reshape(28, 28), cmap=plt.get_cmap('gray'))
        plt.title(train_labels[i])
    if fn:
        plt.savefig(fn)

def getTensorDataSet(images,labels,debug=DEBUG):
    """
    Transform the dataset given as images and labels into the
    tensor format used by PyTorch.

    From Listing 6-3 (Ketkar 2021).
    """
    images_tensor = torch.tensor(images)/255.0
    images_tensor = images_tensor.view(-1,1,28,28)
    labels_tensor = torch.tensor(labels)
    if debug:
        print("Labels Shape:",labels_tensor.shape)
        print("Images Shape:",images_tensor.shape)
    return TensorDataset(images_tensor, labels_tensor)

# Listing 6-4. Defining the CNN and the Helper Functions

class ConvNet(nn.Module):
    """
    Simple convolutional network for demonstration.
    """
    def __init__(self, num_classes=10):
        "The constructor defines the elements of the network."
        super(ConvNet, self).__init__()

        self.conv_unit_1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))

        self.conv_unit_2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2))

        # Two 2x2 max-pool layers reduce the 28x28 input to 7x7, with 32
        # channels after the second convolution, hence the input size 7*7*32.
        self.fc1 = nn.Linear(7*7*32, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        "The forward method defines how data flows through the network."
        out = self.conv_unit_1(x)
        out = self.conv_unit_2(out)
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.fc2(out)
        # Note: nn.CrossEntropyLoss applies log_softmax internally, so this
        # extra log_softmax is redundant but harmless (log_softmax is idempotent).
        out = F.log_softmax(out, dim=1)
        return out
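
# A quick sanity check (not in the original listing): the two 2x2 max-pool
# layers reduce a 28x28 input to 7x7 with 32 channels, so the flattened
# feature vector has 7*7*32 = 1568 elements and the network maps a batch of
# images to one score per class, e.g.
#
#   >>> net = ConvNet(10)
#   >>> net(torch.zeros(4, 1, 28, 28)).shape
#   torch.Size([4, 10])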


class ML:
    "Class to set up a CNN system with all the necessary components."
    def __init__(self):
        "The constructor defines the machine learning system. "
        self.device = None # e.g. torch.device('cuda'); None means the default (CPU)
        self.model = ConvNet(10)
        if self.device: self.model = self.model.to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001) 
    def addData(self,train_tensor,val_tensor):
        """Load Train and Validation TensorDatasets into the data generator 
        for Training"""
        self.train_loader = DataLoader(train_tensor, batch_size=64,
                 num_workers=2, shuffle=True)
        self.val_loader = DataLoader(val_tensor, batch_size=64,
                 num_workers=2, shuffle=True)
    def trainModel(self,num_epochs=5):
        """Train the ML system.

        Return loss and accuracy for every epoch, both for the training
        and validation sets.
        """
        total_step = len(self.train_loader)
        tr = self.evaluate(self.train_loader)
        vr = self.evaluate(self.val_loader)
        tl = [tr]
        vl = [vr]
        print(f'Average Training Loss: {tr[0]:.4f} '
              f'Accuracy: {100.0*tr[1]:.3f}%')
        print(f'Average Validation Loss: {vr[0]:.4f} '
              f'Accuracy: {100.0*vr[1]:.3f}%')

        for epoch in range(num_epochs):
            # evaluate() leaves the model in eval mode, so switch back to
            # training mode (BatchNorm behaves differently during training).
            self.model.train()
            for i, (images, labels) in enumerate(self.train_loader):
                if self.device:
                    images = images.to(self.device)
                    labels = labels.to(self.device)

                outputs = self.model(images)
                loss = self.criterion(outputs, labels)

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
            tr = self.evaluate(self.train_loader)
            vr = self.evaluate(self.val_loader)
            tl.append(tr)
            vl.append(vr)
            print(f'Average Training Loss: {tr[0]:.4f} '
                  f'Accuracy: {100.0*tr[1]:.3f}%')
            print(f'Average Validation Loss: {vr[0]:.4f} '
                  f'Accuracy: {100.0*vr[1]:.3f}%')
        return ( vl, tl )

    def evaluate(self,data_loader):
        """
        Evaluate the model on the given data set.
        Return the average loss (according to the loss function) and
        the accuracy.
        """
        self.model.eval()
        loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in data_loader:
                if self.device:
                    data = data.to(self.device)
                    target = target.to(self.device)
                output = self.model(data)
                # Sum the per-sample losses; the average over the whole
                # dataset is taken below.
                loss += F.cross_entropy(output, target, reduction='sum').item()
                predicted = output.data.max(1, keepdim=True)[1]
                correct += (target.reshape(-1,1) == predicted.reshape(-1,1)).float().sum()
        loss /= len(data_loader.dataset)
        return ( loss, correct / len(data_loader.dataset) )
    
        # print(f'Average Loss: {loss:.4f}, Accuracy: {correct}/{len(data_loader.dataset)} ({100. * correct / len(data_loader.dataset):.3f}%)')
    def training_predictions(self):
        """
        Get predictions on the training set, returning a pair
        of actual classes and predicted classes.
        """
        return self.make_predictions(self.train_loader)
    def make_predictions(self,data_loader=None):
        """
        Get predictions on the validation set, returning a pair
        of actual classes and predicted classes.

        The `data_loader` argument can be given to make predictions on 
        a different dataset.
        """
        if data_loader is None: data_loader = self.val_loader
        self.model.eval()
        test_preds = torch.LongTensor()
        actual = torch.LongTensor()
    
        for data, target in data_loader:
            if self.device: data = data.to(self.device)
            output = self.model(data)
            # Take the index of the largest output as the predicted class
            preds = output.cpu().data.max(1, keepdim=True)[1]

            # Combine tensors from each batch
            test_preds = torch.cat((test_preds, preds), dim=0)
            actual = torch.cat((actual,target),dim=0)
        return actual,test_preds
    def getData( self, fn="kaggle/train.csv", testsize=0.2 ):
        """
        Load a CSV file and split it into training and validation sets.
    
        The CSV file contains the images in flat form, i.e. each 28*28 image
        is flattened into a row of 784 columns (one column per pixel value).
        For the CNN, we need to reshape this into the desired shape.
       
        Sample data can be found at 
        URL: https://www.kaggle.com/c/digit-recognizer/data
    
        From Listing 6-3 (Ketkar 2021).
        """
    
        train_df = pd.read_csv(fn)
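        # Expected column layout (as in the Kaggle digit-recognizer data):
        #   label, pixel0, pixel1, ..., pixel783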
        train_labels = train_df['label'].values
        train_images = (train_df.iloc[:,1:].values).astype('float32')
        self.train_images, self.val_images, \
                self.train_labels, self.val_labels = train_test_split(
                  train_images, train_labels, random_state=2020, test_size=testsize)
    
        # Reshape each flat row into [#images, #channels, #height, #width];
        # grayscale images have just one channel.
    
        self.train_images = self.train_images.reshape(
                self.train_images.shape[0],1,28, 28)
        self.val_images = self.val_images.reshape(
                self.val_images.shape[0],1,28, 28)
        self.addData( getTensorDataSet(self.train_images, self.train_labels),
                      getTensorDataSet(self.val_images,self.val_labels) )

    def plotImages(self):
        """Plot the first six images of the training set"""
        return plotImages(self.train_images,self.train_labels)

if __name__ == "__main__":

    ml = ML()
    print(ml.model)

# Normally, we would use a large training set; taking only 20% of
# the data for testing is a typical rule of thumb.  However, it
# is interesting to see how the network behaves with very small
# training sets.  Here we have taken 99.84% of the data for testing,
# leaving 67 images for training.  You should play with the numbers.
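# (The Kaggle digit-recognizer train.csv has 42,000 labelled images, so
#  test_size=0.9984 leaves roughly 42,000 * 0.0016 ≈ 67 of them for training.)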

    ml.getData( fn="kaggle/train.csv", testsize=0.9984 )
    ml.plotImages()

# For large training sets, 250 epochs is a lot, but with small training
# sets the results may be quite erratic, and we use many epochs to
# see whether they stabilise.

    (vl, tl) = ml.trainModel(num_epochs=250)

# The training method returns loss and accuracy per epoch, both
# for the training set (tl) and the validation set (vl).
# We plot the accuracies below.

    vl = [ x[1].item() for x in vl ]
    tl = [ x[1].item() for x in tl ]
    xrange = range(len(vl))

    plt.figure()
    plt.plot( xrange, vl, "+r", label="Validation" )
    plt.plot( xrange, tl, "xb", label="Training" )
    plt.legend()
    plt.xlabel("epoch")
    plt.ylabel("accuracy")
    plt.savefig( "accuracy.png" )

# The confusion matrix is a useful tool to assess the performance.
# This is calculated for us by the SciKit library.

    actual, predicted = ml.make_predictions()
    actual = np.array(actual).reshape(-1,1)
    predicted = np.array(predicted).reshape(-1,1)
    print("Validation Accuracy:", round(100 * skl.accuracy_score(actual, predicted), 2))
    print("\nConfusion Matrix\n",skl.confusion_matrix(actual,predicted))
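
# As a further, optional check (not part of the original listing; it assumes
# every digit class occurs in the validation set), the diagonal of the
# confusion matrix divided by the row sums gives the per-class recall.

    cm = skl.confusion_matrix(actual, predicted)
    print("\nPer-class recall:", np.round(cm.diagonal() / cm.sum(axis=1), 3))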