玩命加载中...
# 利用Autoencoder进行无监督异常检测
Autoencoder算法是一种常见的基于神经网络的无监督学习降维方法([其他常见降维方法](http://sofasofa.io/forum_main_post.php?postid=1000442))。
本教程中,我们利用python keras实现Autoencoder,并在信用卡欺诈数据集上实践。
完整代码在第4节。
预计学习用时:30分钟。
本教程基于**Python 3.6**。
原创者:**SofaSofa TeamM** | 修改校对:SofaSofa TeamC |
----
### 1. Autoencoder简介
Autoencoder,中文称作自编码器,是一种无监督式学习模型。本质上它使用了一个神经网络来产生一个高维输入的低维表示。Autoencoder与主成分分析PCA类似,但是Autoencoder在使用非线性激活函数时克服了PCA线性的限制。
Autoencoder包含两个主要的部分,encoder(编码器)和 decoder(解码器)。Encoder的作用是用来发现给定数据的压缩表示,decoder是用来重建原始输入。在训练时,decoder 强迫 autoencoder 选择最有信息量的特征,最终保存在压缩表示中。最终压缩后的表示就在中间的coder层当中。
以下图为例,原始数据的维度是10,encoder和decoder分别有两层,中间的coder共有3个节点,也就是说原始数据被降到了只有3维。Decoder根据降维后的数据再重建原始数据,重新得到10维的输出。从Input到Ouptut的这个过程中,autoencoder实际上也起到了降噪的作用。
### 2. Autoencoder无监督异常检测
异常检测(anomaly detection)通常分为有监督和无监督两种情形。在无监督的情况下,我们没有异常样本用来学习,而算法的基本上假设是异常点服从不同的分布。根据正常数据训练出来的Autoencoder,能够将正常样本重建还原,但是却无法将异于正常分布的数据点较好地还原,导致还原误差较大。
如果样本的特征都是数值变量,我们可以用MSE或者MAE作为还原误差。例如上图,如果输入样本为
$$X=(X\_1,X\_2,\\cdots, X\_{10})$$
经过Autoencoder重建的结果为
$$X^R=(X^R\_1,X^R\_2,\cdots, X^R\_{10}).$$
还原误差MSE为
$$\frac{1}{10}\sum\_{i=1}^{10}(X\_i-X\_i^R)^2$$
还原误差MAE为
$$\frac{1}{10}\sum\_{i=1}^{10}\left|X\_i-X\_i^R\right|$$
**当还原误差大于某个阈值时,我们将其标记为异常值**。
### 3. 利用Antoencoder检测信用卡欺诈
下面我们利用Keras来构造autoencoder,并在信用卡数据上实践。完整代码在第4节。
首先,我们先读取数据([数据下载,提取码njxn](https://pan.baidu.com/s/1U6dJX31sGHTxkPfw-4Lovg))
```python
d = pd.read_csv('SofaSofa_Anomaly.csv')
```
我们看到样本中欺诈数(阳性样本)的比例是非常非常低的。
```python
num_nonfraud = np.sum(d['Class'] == 0)
num_fraud = np.sum(d['Class'] == 1)
plt.bar(['Fraud', 'non-fraud'], [num_fraud, num_nonfraud], color='dodgerblue')
plt.show()
```
这个数据集已经已经过了预处理,特征`V1`到`V2`是数值变量,我们只需要对`Time`和`Amount`进行处理。
```python
data = d.drop(['Time'], axis=1)
data['Amount'] = StandardScaler().fit_transform(data[['Amount']])
```
下面我们只用负样本做训练集,并设置如下参数来训练autoencoder。这个网络共有四层,激活函数分别是`tanh`,`relu`,`tanh`,`relu`。
```python
encoding_dim = 16
num_epoch = 50
batch_size = 32
encoding_dim = 16
num_epoch = 50
batch_size = 32
input_layer = Input(shape=(input_dim, ))
encoder = Dense(encoding_dim, activation="tanh",
activity_regularizer=regularizers.l1(10e-5))(input_layer)
encoder = Dense(int(encoding_dim / 2), activation="relu")(encoder)
decoder = Dense(int(encoding_dim / 2), activation='tanh')(encoder)
decoder = Dense(input_dim, activation='relu')(decoder)
autoencoder = Model(inputs=input_layer, outputs=decoder)
autoencoder.compile(optimizer='adam',
loss='mean_squared_error',
metrics=['mae'])
checkpointer = ModelCheckpoint(filepath="SofaSofa_model.h5",
verbose=0,
save_best_only=True)
history = autoencoder.fit(X_train, X_train,
epochs=num_epoch,
batch_size=batch_size,
shuffle=True,
validation_data=(X_test, X_test),
verbose=1).history
```
Autoencoder的训练和测试的MAE和MSE分别为
下面利用训练好的autoencoder,重建测试集
```python
pred_test = autoencoder.predict(X_test)
pred_fraud = autoencoder.predict(X_fraud)
```
下图分别是还原结果的MAE和MSE,其中**橘黄色的点是信用欺诈,也就是异常点;蓝色是正常点**。我们可以看出异常点的还原误差明显很高。
不管是用MAE还是MSE作为划分标准,模型的表现都算是很好的。**PR AUC分别是0.51和0.44,而ROC AUC都达到了0.95**。
更进一步,我们可以同时用MAE和MSE作为划分标准去优化PR AUC和ROC AUC,在这里就不再详细讨论了。
### 4. 完整代码
```python
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
plt.style.use('seaborn')
import tensorflow as tf
import seaborn as sns
from sklearn.model_selection import train_test_split
from keras.models import Model, load_model
from keras.layers import Input, Dense
from keras.callbacks import ModelCheckpoint
from keras import regularizers
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_curve, auc, precision_recall_curve
print("SofaSofa.io rocks!")
```
```python
# 读取数据
d = pd.read_csv('SofaSofa_Anomaly.csv')
# 查看样本比例
num_nonfraud = np.sum(d['Class'] == 0)
num_fraud = np.sum(d['Class'] == 1)
plt.bar(['Fraud', 'non-fraud'], [num_fraud, num_nonfraud], color='dodgerblue')
plt.show()
# 删除时间列,对Amount进行标准化
data = d.drop(['Time'], axis=1)
data['Amount'] = StandardScaler().fit_transform(data[['Amount']])
# 提取负样本,并且按照8:2切成训练集和测试集
mask = (data['Class'] == 0)
X_train, X_test = train_test_split(data[mask], test_size=0.2, random_state=920)
X_train = X_train.drop(['Class'], axis=1).values
X_test = X_test.drop(['Class'], axis=1).values
# 提取所有正样本,作为测试集的一部分
X_fraud = data[~mask].drop(['Class'], axis=1).values
# 设置Autoencoder的参数
# 隐藏层节点数分别为16,8,8,16
# epoch为50,batch size为32
input_dim = X_train.shape[1]
encoding_dim = 16
num_epoch = 50
batch_size = 32
input_layer = Input(shape=(input_dim, ))
encoder = Dense(encoding_dim, activation="tanh",
activity_regularizer=regularizers.l1(10e-5))(input_layer)
encoder = Dense(int(encoding_dim / 2), activation="relu")(encoder)
decoder = Dense(int(encoding_dim / 2), activation='tanh')(encoder)
decoder = Dense(input_dim, activation='relu')(decoder)
autoencoder = Model(inputs=input_layer, outputs=decoder)
autoencoder.compile(optimizer='adam',
loss='mean_squared_error',
metrics=['mae'])
# 模型保存为SofaSofa_model.h5,并开始训练模型
checkpointer = ModelCheckpoint(filepath="SofaSofa_model.h5",
verbose=0,
save_best_only=True)
history = autoencoder.fit(X_train, X_train,
epochs=num_epoch,
batch_size=batch_size,
shuffle=True,
validation_data=(X_test, X_test),
verbose=1,
callbacks=[checkpointer]).history
# 画出损失函数曲线
plt.figure(figsize=(14, 5))
plt.subplot(121)
plt.plot(history['loss'], c='dodgerblue', lw=3)
plt.plot(history['val_loss'], c='coral', lw=3)
plt.title('model loss')
plt.ylabel('mse'); plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper right')
plt.subplot(122)
plt.plot(history['mean_absolute_error'], c='dodgerblue', lw=3)
plt.plot(history['val_mean_absolute_error'], c='coral', lw=3)
plt.title('model mae')
plt.ylabel('mae'); plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper right');
```
```python
# 读取模型
autoencoder = load_model('SofaSofa_model.h5')
# 利用训练好的autoencoder重建测试集
pred_test = autoencoder.predict(X_test)
pred_fraud = autoencoder.predict(X_fraud)
# 计算还原误差MSE和MAE
mse_test = np.mean(np.power(X_test - pred_test, 2), axis=1)
mse_fraud = np.mean(np.power(X_fraud - pred_fraud, 2), axis=1)
mae_test = np.mean(np.abs(X_test - pred_test), axis=1)
mae_fraud = np.mean(np.abs(X_fraud - pred_fraud), axis=1)
mse_df = pd.DataFrame()
mse_df['Class'] = [0] * len(mse_test) + [1] * len(mse_fraud)
mse_df['MSE'] = np.hstack([mse_test, mse_fraud])
mse_df['MAE'] = np.hstack([mae_test, mae_fraud])
mse_df = mse_df.sample(frac=1).reset_index(drop=True)
```
```python
# 分别画出测试集中正样本和负样本的还原误差MAE和MSE
markers = ['o', '^']
markers = ['o', '^']
colors = ['dodgerblue', 'coral']
labels = ['Non-fraud', 'Fraud']
plt.figure(figsize=(14, 5))
plt.subplot(121)
for flag in [1, 0]:
temp = mse_df[mse_df['Class'] == flag]
plt.scatter(temp.index,
temp['MAE'],
alpha=0.7,
marker=markers[flag],
c=colors[flag],
label=labels[flag])
plt.title('Reconstruction MAE')
plt.ylabel('Reconstruction MAE'); plt.xlabel('Index')
plt.subplot(122)
for flag in [1, 0]:
temp = mse_df[mse_df['Class'] == flag]
plt.scatter(temp.index,
temp['MSE'],
alpha=0.7,
marker=markers[flag],
c=colors[flag],
label=labels[flag])
plt.legend(loc=[1, 0], fontsize=12); plt.title('Reconstruction MSE')
plt.ylabel('Reconstruction MSE'); plt.xlabel('Index')
plt.show()
```
```python
# 画出Precision-Recall曲线
plt.figure(figsize=(14, 6))
for i, metric in enumerate(['MAE', 'MSE']):
plt.subplot(1, 2, i+1)
precision, recall, _ = precision_recall_curve(mse_df['Class'], mse_df[metric])
pr_auc = auc(recall, precision)
plt.title('Precision-Recall curve based on %s\nAUC = %0.2f'%(metric, pr_auc))
plt.plot(recall[:-2], precision[:-2], c='coral', lw=4)
plt.xlabel('Recall'); plt.ylabel('Precision')
plt.show()
# 画出ROC曲线
plt.figure(figsize=(14, 6))
for i, metric in enumerate(['MAE', 'MSE']):
plt.subplot(1, 2, i+1)
fpr, tpr, _ = roc_curve(mse_df['Class'], mse_df[metric])
roc_auc = auc(fpr, tpr)
plt.title('Receiver Operating Characteristic based on %s\nAUC = %0.2f'%(metric, roc_auc))
plt.plot(fpr, tpr, c='coral', lw=4)
plt.plot([0,1],[0,1], c='dodgerblue', ls='--')
plt.ylabel('TPR'); plt.xlabel('FPR')
plt.show()
```
```python
# 画出MSE、MAE散点图
markers = ['o', '^']
colors = ['dodgerblue', 'coral']
labels = ['Non-fraud', 'Fraud']
plt.figure(figsize=(10, 5))
for flag in [1, 0]:
temp = mse_df[mse_df['Class'] == flag]
plt.scatter(temp['MAE'],
temp['MSE'],
alpha=0.7,
marker=markers[flag],
c=colors[flag],
label=labels[flag])
plt.legend(loc=[1, 0])
plt.ylabel('Reconstruction RMSE'); plt.xlabel('Reconstruction MAE')
plt.show()
```
参考文献:[medium-curiousily](https://medium.com/@curiousily/credit-card-fraud-detection-using-autoencoders-in-keras-tensorflow-for-hackers-part-vii-20e0c85301bd)