Anomaly Detection in TensorFlow and Keras Using the Autoencoder Method

All the tutorials about TensorFlow and neural networks I have shared until now have been about supervised learning. This one will be about the autoencoder, which is an unsupervised learning technique. Put simply, an autoencoder reduces noise in the data by compressing the input, encoding it, and then reconstructing it. That way, autoencoders can reduce the dimensionality or the noise of the data and focus on the real focal point of the input.

As you can see from this introduction, an autoencoder involves more than one process:

  1. First, a model that compresses the input data, which is the encoder model.
  2. Then, another model that reconstructs the compressed data so that it is as close as possible to the original input, which is the decoder model.

Together, these two models can remove noise, reduce dimensionality, and clean up the input data.
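The encode-compress-decode idea above can be sketched in a few lines. This toy example (an illustration only, not the model we build later) uses two random linear maps as stand-ins for the encoder and decoder:

```python
import numpy as np

# Toy sketch of encode -> compress -> decode: a linear "encoder" projects
# the input onto fewer dimensions, and a "decoder" maps it back.
rng = np.random.default_rng(0)
x = rng.normal(size=(8,))          # input vector
W_enc = rng.normal(size=(8, 3))    # encoder: 8 -> 3 (compressed code)
W_dec = rng.normal(size=(3, 8))    # decoder: 3 -> 8 (reconstruction)

code = x @ W_enc                   # compressed representation
recon = code @ W_dec               # reconstructed input
mse = np.mean((x - recon) ** 2)    # the reconstruction error an autoencoder minimizes
print(code.shape, recon.shape)     # (3,) (8,)
```

Training a real autoencoder amounts to learning the encoder and decoder weights so that this reconstruction error becomes small.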

In this tutorial, I will explain in detail how an autoencoder works with a working example.

For this example, I chose to use a public dataset (Apache License 2.0) named deep_weeds.

import tensorflow as tf
import tensorflow_datasets as tfds
ds = tfds.load('deep_weeds', split='train', shuffle_files=True)

Data Preparation

We need to prepare a dataset for this unsupervised anomaly detection example. Only one class will be taken as the valid class, and a few images from another class will be mixed in as anomalies. Then we will develop the model and see if it can find those few anomalous images.

I chose class 5 as the valid class and class 1 as the anomaly. In the code block below, I am taking all the data of classes 5 and 1 first and creating lists of the images and their corresponding labels.

import numpy as np

images_main = []
images_anomaly = []
labels_main = []
labels_anomaly = []
ds = ds.prefetch(tf.data.AUTOTUNE)
for example in ds:
    if np.array(example['label']) == 5:
        images_main.append(example["image"])
        labels_main.append(example["label"])
    if np.array(example['label']) == 1:
        images_anomaly.append(example["image"])
        labels_anomaly.append(example["label"])

Let’s see the shape of the main image (images of class 5) data here:

np.array(images_main).shape

Output:

(1009, 256, 256, 3)

The images have shape (256, 256, 3), and we have a total of 1009 images for class 5.

However, we do not need all the data from class 1, because class 1 is the anomaly class. So only 1% of the class 1 data will be taken for training.

parc = round(len(labels_anomaly) * 0.01)
images_anomaly = np.array(images_anomaly)[:parc]
# stacking the main images and anomaly images together
total_images = np.vstack([images_main, images_anomaly])
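One detail worth noting: the images loaded from deep_weeds are uint8 arrays in [0, 255], while the decoder we build later ends with a sigmoid, whose outputs lie in [0, 1]. Scaling the inputs to the same range is not part of the original pipeline, so treat it as an optional assumption, but it keeps the MSE loss on a comparable scale:

```python
import numpy as np

# Hypothetical example with dummy data: scaling uint8 images to [0, 1]
# so they match the range of a sigmoid-activated reconstruction.
images = np.random.randint(0, 256, size=(4, 256, 256, 3), dtype=np.uint8)
scaled = images.astype("float32") / 255.0
print(scaled.min() >= 0.0, scaled.max() <= 1.0)  # True True
```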

The shape of the total_images:

total_images.shape

Output:

(1020, 256, 256, 3)

We have a total of 1020 images for training. As we saw earlier, 1009 of them are class 5 images, so we took 1020 - 1009 = 11 class 1 images, which are our anomalies.

Let’s see if we can develop an autoencoder model in Keras and Tensorflow to detect these anomalies.

Model Development

This is the fun part! But first, we should do the necessary imports:

# import the necessary packages
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np

Some of the data should be kept separately for testing purposes. The train_test_split method from the sklearn library can be used for that. Remember, as this is an unsupervised learning method, the labels are not necessary. We will only split the images.

(train_x, test_x) = train_test_split(total_images, test_size=0.2, random_state=0)
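As a quick sanity check (using a small dummy array as a stand-in for total_images), an 80/20 split of 1020 images leaves 816 for training and 204 for testing:

```python
from sklearn.model_selection import train_test_split
import numpy as np

# Dummy stand-in for the (1020, 256, 256, 3) image array from the text.
dummy = np.zeros((1020, 4))
train_x, test_x = train_test_split(dummy, test_size=0.2, random_state=0)
print(len(train_x), len(test_x))  # 816 204
```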

Finally, the autoencoder model. We will build a Convolution_Autoencoder class which is a Convolutional Neural Network. The class has the build method where we will define the Autoencoder model.

The build method takes width, height, depth, filters, and latentDim as parameters. Here, width, height, and depth are the dimensions of the images, which is (256, 256, 3) for us, as we saw with total_images.shape above.

The filters parameter sets the number of filters for the convolution layers.

The latentDim parameter is the size of the compressed representation produced by the encoder.

In this build method, the first part is an encoder model which is a simple Convolutional Neural Network.

Once the encoder portion is done, a decoder model is developed using Conv2DTranspose layers to reconstruct the data again.

Then, we construct the autoencoder model which is actually a combination of both encoder and decoder models.

Finally, we return the encoder, decoder, and autoencoder models.

class Convolution_Autoencoder:
    @staticmethod
    def build(width, height, depth, filters=(16, 32, 64), latentDim=32):
        input_shape = (height, width, depth)
        chanDim = -1

        inputs = Input(shape=input_shape)
        x = inputs

        # encoder: stride-2 convolutions halve the spatial dimensions
        for f in filters:
            x = Conv2D(f, (3, 3), strides=2, padding="same")(x)
            x = LeakyReLU(alpha=0.3)(x)
            x = BatchNormalization(axis=chanDim)(x)

        volume = K.int_shape(x)
        x = Flatten()(x)
        latent = Dense(latentDim)(x)

        # encoder model
        encoder = Model(inputs, latent, name="encoder")

        # decoder input: the compressed representation
        latent_layer_input = Input(shape=(latentDim,))
        x = Dense(np.prod(volume[1:]))(latent_layer_input)
        x = Reshape((volume[1], volume[2], volume[3]))(x)

        # reconstructing the image with a decoder model
        for f in filters[::-1]:
            x = Conv2DTranspose(f, (3, 3), strides=2, padding="same")(x)
            x = LeakyReLU(alpha=0.3)(x)
            x = BatchNormalization(axis=chanDim)(x)

        x = Conv2DTranspose(depth, (3, 3), padding="same")(x)
        outputs = Activation("sigmoid")(x)

        decoder = Model(latent_layer_input, outputs, name="decoder")
        autoencoder = Model(inputs, decoder(encoder(inputs)), name="autoencoder")

        return (encoder, decoder, autoencoder)
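To see why the Reshape in the decoder works, trace how the spatial dimensions shrink through the encoder: each stride-2 convolution with padding="same" halves the height and width, so a (256, 256, 3) input reaches the latent Dense layer as a 32 x 32 x 64 volume:

```python
# Sketch: spatial dimensions through the encoder's three stride-2 convolutions.
h = w = 256
filters = (16, 32, 64)
for f in filters:
    h, w = h // 2, w // 2        # padding="same" with strides=2 halves each dim
print(h, w, filters[-1])         # 32 32 64  -> the volume before Flatten
print(h * w * filters[-1])       # 65536 units, compressed down to Dense(latentDim=32)
```

This is exactly the shape stored in volume by K.int_shape(x), which the decoder's Dense and Reshape layers use to undo the flattening.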

Model development is done. It’s time to run the model and see if it works. It should run like any other TensorFlow model.

Here we will compile the model first with the Adam optimizer. I also used a decaying learning rate and 'mse' as the loss.

epochs = 30
lr_start = 0.001
batchSize = 32

(encoder, decoder, autoencoder) = Convolution_Autoencoder.build(256, 256, 3)
opt = tf.keras.optimizers.legacy.Adam(learning_rate=lr_start, decay=lr_start / epochs)
autoencoder.compile(loss="mse", optimizer=opt)

Finally, running the model. Remember, this is an unsupervised learning method, so there won't be any labels in the model training. Instead, we pass the training features twice, which is just train_x as both input and target. If you look at the build method in the Convolution_Autoencoder class, the autoencoder is defined there as:

autoencoder = Model(inputs, decoder(encoder(inputs)), name="autoencoder")

In the Model above, train_x serves both as the inputs and as the target that decoder(encoder(inputs)) is compared against. The same goes for test_x.

Before you begin the model training, I should warn you that it is very slow with the default settings of Google Colab. You can make it much faster by running it on a GPU. Please change your Colab runtime settings before you run this.

history = autoencoder.fit(
    train_x, train_x,
    validation_data=(test_x, test_x),
    epochs=30,
    batch_size=batchSize)

Output:

Epoch 1/30
26/26 [==============================] - 15s 157ms/step - loss: 12963.2842 - val_loss: 13428.3906
Epoch 2/30
26/26 [==============================] - 2s 87ms/step - loss: 12924.1787 - val_loss: 13392.3418
Epoch 3/30
26/26 [==============================] - 2s 88ms/step - loss: 12911.4551 - val_loss: 13401.3350
Epoch 4/30
26/26 [==============================] - 2s 92ms/step - loss: 12905.8975 - val_loss: 13344.5596
...
...
Epoch 27/30
26/26 [==============================] - 2s 89ms/step - loss: 12890.9102 - val_loss: 13322.1299
Epoch 28/30
26/26 [==============================] - 2s 89ms/step - loss: 12890.8701 - val_loss: 13322.0820
Epoch 29/30
26/26 [==============================] - 2s 89ms/step - loss: 12890.8428 - val_loss: 13322.0488

As you can see, the losses do not change much, simply because we are not fitting labels; we pass the training images as both input and target. The loss comes from comparing the original images to the images reconstructed by the autoencoder.
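Plotting the History object makes the (small) downward trend easier to see. The sketch below uses a dummy dict built from the loss values in the log above; with the real model you would pass history.history from autoencoder.fit:

```python
import matplotlib
matplotlib.use("Agg")  # headless backend so this runs without a display
import matplotlib.pyplot as plt

# Dummy stand-in for history.history, using loss values from the log above.
history_dict = {
    "loss": [12963.3, 12924.2, 12911.5, 12905.9],
    "val_loss": [13428.4, 13392.3, 13401.3, 13344.6],
}
plt.plot(history_dict["loss"], label="train")
plt.plot(history_dict["val_loss"], label="validation")
plt.xlabel("epoch")
plt.ylabel("reconstruction MSE")
plt.legend()
plt.savefig("loss_curve.png")
```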

Model Evaluation

Model evaluation for an autoencoder is different than for a regular supervised learning model, as this is not a supervised method. Let's do it step by step.

First, we will do the prediction as usual, which will be the decoded images by the autoencoder model.

Then, we calculate the mean squared error between each original image and its reconstruction and save it to the errors list. Here is the code for that.

decoded = autoencoder.predict(test_x)
errors = []

# per-image reconstruction error between each test image and its reconstruction
for (image, recon) in zip(test_x, decoded):
    mse = np.mean((image - recon) ** 2)
    errors.append(mse)

Now that we have the mse for all the images in the test set, we choose a threshold. Here I am using the 95% quantile, via the np.quantile method, and getting the indices in errors where the mse is greater than the threshold. When an image's mse is greater than that threshold, we consider it an anomaly.

thresh = np.quantile(errors, 0.95)
idxs = np.where(np.array(errors) >= thresh)[0]
idxs

Output:

array([  9,  10,  35,  59,  84, 134, 146, 188, 200, 201, 202])
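The thresholding step can be sanity-checked on synthetic error scores (hypothetical numbers, not the real errors list): with 200 scores of which 10 are inflated, the 95% quantile flags exactly those 10:

```python
import numpy as np

# Synthetic reconstruction errors: 190 "normal" scores and 10 inflated ones.
rng = np.random.default_rng(0)
errors = rng.normal(1.0, 0.1, size=200)
errors[-10:] += 5.0                      # simulate high-error anomalies

thresh = np.quantile(errors, 0.95)       # same rule as in the text
idxs = np.where(errors >= thresh)[0]
print(len(idxs))  # 10
```

The 95% quantile lands between the largest "normal" score and the smallest inflated one, so only the inflated scores survive the comparison.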

Now, let's check whether the flagged images are actually anomalies. We need to verify that the test images at the indices above really come from the anomaly class:

for i in idxs:
    # compare the flagged test image against every known anomaly image
    if any(np.array_equal(test_x[i], a) for a in images_anomaly):
        print(True)

Output:

True
True
True
True
True
True
True
True
True
True
True

Yes!! They are all anomalies. If you count the number of True outputs above, we have 11. We can check how many anomalous images we originally had in images_anomaly:

len(images_anomaly)

Output:

11

So, we found all the anomalies using the autoencoder model.

Conclusion

I have another anomaly detection tutorial that uses probability to find anomalies; please check the 'More Reading' section below. Here we used TensorFlow and Keras, which are much more advanced tools for images and more complex data. As I mentioned in the introduction, autoencoders can be used for a variety of other tasks as well. I will be sharing more use cases for autoencoders in future posts, along with more cutting-edge techniques in TensorFlow and Keras.

Feel free to follow me on Twitter and like my Facebook page.

 

#DataScience #MachineLearning #ArtificialIntelligence #Tensorflow #Python #DataAnalytics
