【深度学习】在PyTorch中使用 LSTM 自动编码器进行时间序列异常检测
寫在前面
環境準備
本次數據集的格式.arff,需要用到arff2pandas模塊讀取。
#?!nvidia-smi #?!pip?install?-qq?arff2pandas #?!pip?install?-q?-U?watermark另外本次運行環境可通過如下方法查看。
%reload_ext?watermark %watermark?-v?-p?numpy,pandas,torch,arff2pandasPython implementation: CPython Python version : 3.8.8 IPython version : 7.22.0numpy : 1.19.5 pandas : 1.2.4 torch : 1.9.1 arff2pandas: 1.0.1導入相關模塊
import?torchimport?copy import?numpy?as?np import?pandas?as?pd import?seaborn?as?sns from?pylab?import?rcParams import?matplotlib.pyplot?as?plt from?matplotlib?import?rc from?sklearn.model_selection?import?train_test_split from?torch?import?nn,?optim import?torch.nn.functional?as?F from?arff2pandas?import?a2p%matplotlib?inline %config?InlineBackend.figure_format='retina' sns.set(style='whitegrid',?palette='muted',?font_scale=1.2) HAPPY_COLORS_PALETTE?=?["#01BEFE",?"#FFDD00",?"#FF7D00",?"#FF006D",?"#ADFF02",?"#8F00FF"] sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE)) rcParams['figure.figsize']?=?15,?8 RANDOM_SEED?=?42 np.random.seed(RANDOM_SEED) torch.manual_seed(RANDOM_SEED)<torch._C.Generator at 0x7fa5fcf70c50>本文核心內容
本案例使用真實的心電圖 (ECG) 數據來檢測患者心跳的異常情況。我們將一起構建一個 LSTM 自動編碼器,使用來自單個心臟病患者的真實心電圖數據對其進行訓練,并將在新的樣本中,使用訓練好的模型對其進行預測分類為正常或異常來來檢測異常心跳。
本案例主要圍繞以下幾大核心展開。
從時間序列數據中準備用于異常檢測的數據集
使用 PyTorch 構建 LSTM 自動編碼器
訓練和評估模型
設定異常檢測的閾值
將新的樣本分類為正常或異常
數據集
該數據集包含 5,000 個通過 ECG 獲得的時間序列樣本,樣本一共具有 140 個時間步長。每個序列對應于一個患有充血性心力衰竭的患者的一次心跳。
心電圖(ECG 或 EKG)是一種通過測量心臟的電活動來檢查心臟功能的測試。每一次心跳,都會有一個電脈沖(或電波)穿過您的心臟。這種波會導致肌肉擠壓并從心臟泵出血液。?來源[1]
我們有 5 種類型的心跳類別,他們分別是:
正常 (N)
室性早搏 (R-on-T PVC)
室性早搏 (PVC)
室上性早搏或異位搏動(SP 或 EB)
未分類的搏動 (UB)。
假設心臟健康,典型心率為每分鐘 70 到 75 次,每個心動周期或心跳大約需要 0.8 秒才能完成該周期。頻率:每分鐘 60–100 次(人類)持續時間:0.6–1 秒(人類)?來源[2]
如果你的設備安裝有 GPU,這將是非常好的,因為他的運行速度更快,可以節約你寶貴的時間。
device?=?torch.device("cuda"?if?torch.cuda.is_available()?else?"cpu")如下圖所示,數據有多種格式,我們將加載.arff格式的文件到pandas數據幀中。
數據獲取方法:?在公眾號『機器學習研習院』中后臺消息框回復【heart】免費獲取!
with?open('./data/ECG5000/ECG5000_TRAIN.arff')?as?f:train?=?a2p.load(f) with?open('./data/ECG5000/ECG5000_TEST.arff')?as?f:test?=?a2p.load(f)把訓練和測試數據組合成一個單一的數據框。兩者的加成,將為我們提供更多數據來訓練我們的自動編碼器。
df?=?train.append(test) df?=?df.sample(frac=1.0) df.shape(5000, 141)看下數據集樣貌。
df.head()我們有5000個例子。每一行代表一個心跳記錄。我們重新命名所有的類。并將最后一列重命名為target,這樣在后面引用它將更為方便。
CLASS_NORMAL?=?1 class_names?=?['Normal','R?on?T','PVC','SP','UB']new_columns?=?list(df.columns) new_columns[-1]?=?'target' df.columns?=?new_columns探索性數據分析
通過函數value_counts()可以看看每個不同的心跳類分別有多少個樣本。
df.target.value_counts()1 2919 2 1767 4 194 3 96 5 24 Name: target, dtype: int64當然,為了更加直觀,我們通過可視化方法將心跳類別通過sns.countplot()清晰展示出。
ax?=?sns.countplot(x="target",?data=df,order?=?df['target'].value_counts().index) ax.set_xticklabels(class_names);通過統計分析,我們發現普通類的樣本最多。這個結果是非常理想的,也是意料之中的(異常檢測中的異常往往是最少的),又因為我們需要使用這些正常類的數據來訓練模型。
接下來,我們看一下每個類的平均時間序列(前面和后面做一個標準差平滑)。
首先定義一個輔助繪圖函數。
def?plot_time_series_class(data,?class_name,?ax,?n_steps=10):"""param?data:數據param?class_name:?不同心跳類名param?ax:畫布"""time_series_df?=?pd.DataFrame(data)#?平滑時間窗口smooth_path?=?time_series_df.rolling(n_steps).mean()#?路徑偏差path_deviation?=?2?*?time_series_df.rolling(n_steps).std()#?以正負偏差上下定義界限under_line?=?(smooth_path?-?path_deviation)[0]over_line?=?(smooth_path?+?path_deviation)[0]#?繪制平滑曲線ax.plot(smooth_path,?linewidth=2)ax.fill_between(path_deviation.index,under_line,over_line,alpha=.125)ax.set_title(class_name)根據上面的定義的輔助函數,循環繪制每個心跳類的平滑曲線。
#?獲取所有不同心跳類別 classes?=?df.target.unique() #?定義畫布 fig,?axs?=?plt.subplots(nrows=len(classes)?//?3?+?1,ncols=3,sharey=True,figsize=(14,?8)) #?循環繪制曲線 for?i,?cls?in?enumerate(classes):ax?=?axs.flat[i]data?=?df[df.target?==?cls]?\.drop(labels='target',?axis=1)?\.mean(axis=0)?\.to_numpy()plot_time_series_class(data,?class_names[i],?ax)fig.delaxes(axs.flat[-1]) fig.tight_layout();根據上面五種心跳類的可視化結果看出,正常類具有與所有其他類明顯不同的特征,這也許就是我們構建的模型能夠檢測出異常的關鍵所在。
LSTM 自動編碼器
自動編碼器是個啥
自編碼器模型架構圖解自動編碼器模型是一種神經網絡,旨在以無監督的方式學習恒等函數以重建原始輸入,同時在此過程中壓縮數據,從而發現更有效和壓縮的表示。
該網絡可以看作由兩部分組成:一個編碼器函數??和一個生成重構的解碼器?
編碼器網絡:將原始的高維輸入轉換為潛在的低維代碼。輸入尺寸大于輸出尺寸。
解碼器網絡:解碼器網絡從代碼中恢復數據,輸出層可能越來越大。
編碼器網絡本質上完成了降維,就像我們如何使用主成分分析(PCA)或矩陣分解(MF)一樣。此外,自動編碼器針對代碼中的數據重構進行了顯式優化。一個好的中間表示不僅可以捕獲潛在變量,而且有利于完整的解壓過程。
該模型包含由???參數化的編碼器函數??和由?θ?參數化的解碼器函數?。在瓶頸層為輸入x學習的低維代碼為??,重構輸入為?θ?。
參數 (θ,?) 一起學習以輸出與原始輸入相同的重構數據樣本,θ?,或者換句話說,學習恒等函數。有多種指標可以量化兩個向量之間的差異,例如激活函數為 sigmoid 時的交叉熵,或者像 MSE 損失一樣簡單:
θ?θ?
心電數據異常檢測
我們將使用正常的心跳作為模型的訓練數據,并記錄重構損失。但首先需要準備數據。
數據預處理
獲取所有正常的心跳并刪除目標類的列。
normal_df?=?df[df.target?==?str(CLASS_NORMAL)].drop(labels='target',?axis=1) normal_df.shape(2919, 140)合并所有其他類并將它們標記為異常。
anomaly_df?=?df[df.target?!=?str(CLASS_NORMAL)].drop(labels='target',?axis=1) anomaly_df.shape(2081, 140)將正常類樣本分為訓練集、驗證集和測試集。
train_df,?val_df?=?train_test_split(normal_df,test_size=0.15,random_state=RANDOM_SEED)val_df,?test_df?=?train_test_split(val_df,test_size=0.33,?random_state=RANDOM_SEED)需要將樣本轉換為張量,使用它們來訓練自動編碼器。為此編寫一個輔助函數來實現樣本數據類型的轉換,以便后續復用。
def?create_dataset(df):sequences?=?df.astype(np.float32).to_numpy().tolist()dataset?=?[torch.tensor(s).unsqueeze(1).float()?for?s?in?sequences]n_seq,?seq_len,?n_features?=?torch.stack(dataset).shapereturn?dataset,?seq_len,?n_features關于torch.unsqueeze()?和?torch.stack()?詳解可參見文末。
轉換示例:
每個時間序列將被轉換為形狀?序列長度?x *特征數量 *的二維張量 。在我們的例子中為140x1的二維張量。
接下來將所有需要用到的數據集進行如上轉換。
#?_?表示不需要該項 train_dataset,?seq_len,?n_features?=?create_dataset(train_df) val_dataset,?_,?_?=?create_dataset(val_df) test_normal_dataset,?_,?_?=?create_dataset(test_df) test_anomaly_dataset,?_,?_?=?create_dataset(anomaly_df)構建 LSTM 自動編碼器
自動編碼器的工作是獲取一些輸入數據,將其通過模型傳遞,并獲得輸入的重構,重構應該盡可能匹配輸入。
從某種意義上說,自動編碼器試圖只學習數據中最重要的特征,這里使用幾個 LSTM 層(即LSTM Autoencoder)來捕獲數據的時間依賴性。接下來我們一起看看如何將時間序列數據提供給自動編碼器。
為了將序列分類為正常或異常,需要設定一個閾值,并規定高于該閾值時,心跳是異常的。
重構損失
當訓練一個自動編碼器時,模型目標是盡可能地重構輸入。這里的目標是通過最小化損失函數來實現的(就像在監督學習中一樣)。這里所使用的損失函數被稱為重構損失。常用的重構損失是交叉熵損失和均方誤差。
接下來將以GitHub[3]中的 LSTM Autoencoder為基礎,并進行一些小調整。因為模型的工作是重建時間序列數據,因此該模型需要從編碼器開始定義。
class?Encoder(nn.Module):"""定義一個編碼器的子類,繼承父類?nn.Modul"""def?__init__(self,?seq_len,?n_features,?embedding_dim=64):super(Encoder,?self).__init__()self.seq_len,?self.n_features?=?seq_len,?n_featuresself.embedding_dim,?self.hidden_dim?=?embedding_dim,?2?*?embedding_dim#?使用雙層LSTMself.rnn1?=?nn.LSTM(input_size=n_features,hidden_size=self.hidden_dim,num_layers=1,batch_first=True)self.rnn2?=?nn.LSTM(input_size=self.hidden_dim,hidden_size=embedding_dim,num_layers=1,batch_first=True)def?forward(self,?x):x?=?x.reshape((1,?self.seq_len,?self.n_features))x,?(_,?_)?=?self.rnn1(x)x,?(hidden_n,?_)?=?self.rnn2(x)return?hidden_n.reshape((self.n_features,?self.embedding_dim))編碼器使用兩個LSTM層壓縮時間序列數據輸入。
接下來,我們將使用Decoder對壓縮表示進行解碼。
class?Decoder(nn.Module):"""定義一個解碼器的子類,繼承父類?nn.Modul"""def?__init__(self,?seq_len,?input_dim=64,?n_features=1):super(Decoder,?self).__init__()self.seq_len,?self.input_dim?=?seq_len,?input_dimself.hidden_dim,?self.n_features?=?2?*?input_dim,?n_featuresself.rnn1?=?nn.LSTM(input_size=input_dim,hidden_size=input_dim,num_layers=1,batch_first=True)self.rnn2?=?nn.LSTM(input_size=input_dim,hidden_size=self.hidden_dim,num_layers=1,batch_first=True)self.output_layer?=?nn.Linear(self.hidden_dim,?n_features)def?forward(self,?x):x?=?x.repeat(self.seq_len,?self.n_features)x?=?x.reshape((self.n_features,?self.seq_len,?self.input_dim))x,?(hidden_n,?cell_n)?=?self.rnn1(x)x,?(hidden_n,?cell_n)?=?self.rnn2(x)x?=?x.reshape((self.seq_len,?self.hidden_dim))return?self.output_layer(x)編碼器和解碼器均包含兩個 LSTM 層和一個提供最終重建的輸出層。
這里將所有內容包裝成一個易于使用的模塊了。
class?RecurrentAutoencoder(nn.Module):"""定義一個自動編碼器的子類,繼承父類?nn.Module并且自動編碼器通過編碼器和解碼器傳遞輸入"""def?__init__(self,?seq_len,?n_features,?embedding_dim=64):super(RecurrentAutoencoder,?self).__init__()self.encoder?=?Encoder(seq_len,?n_features,?embedding_dim).to(device)self.decoder?=?Decoder(seq_len,?embedding_dim,?n_features).to(device)def?forward(self,?x):x?=?self.encoder(x)x?=?self.decoder(x)return?x自動編碼器類已經定義好,接下來創建一個它的實例。
model?=?RecurrentAutoencoder(seq_len,?n_features,?128) model?=?model.to(device)訓練模型
自動編碼器模型已經定義好。接下來需要訓練模型。下面為訓練過程編寫一個輔助函數train_model。
def?train_model(model,?train_dataset,?val_dataset,?n_epochs):optimizer?=?torch.optim.Adam(model.parameters(),?lr=1e-3)criterion?=?nn.L1Loss(reduction='sum').to(device)history?=?dict(train=[],?val=[])best_model_wts?=?copy.deepcopy(model.state_dict())best_loss?=?10000.0for?epoch?in?range(1,?n_epochs?+?1):model?=?model.train()train_losses?=?[]for?seq_true?in?train_dataset:optimizer.zero_grad()seq_true?=?seq_true.to(device)seq_pred?=?model(seq_true)loss?=?criterion(seq_pred,?seq_true)loss.backward()optimizer.step()train_losses.append(loss.item())val_losses?=?[]model?=?model.eval()with?torch.no_grad():for?seq_true?in?val_dataset:seq_true?=?seq_true.to(device)seq_pred?=?model(seq_true)loss?=?criterion(seq_pred,?seq_true)val_losses.append(loss.item())train_loss?=?np.mean(train_losses)val_loss?=?np.mean(val_losses)history['train'].append(train_loss)history['val'].append(val_loss)if?val_loss?<?best_loss:best_loss?=?val_lossbest_model_wts?=?copy.deepcopy(model.state_dict())print(f'Epoch?{epoch}:?train?loss?{train_loss}?val?loss?{val_loss}')model.load_state_dict(best_model_wts)return?model.eval(),?history在每個epoch中,訓練過程為模型提供所有訓練樣本,并評估驗證集上的模型效果。注意,這里使用的批處理大小為1 ,即模型一次只能得到一個序列。另外還記錄了過程中的訓練和驗證集損失。
值得注意的是,重構時做的是最小化L1損失,它測量的是 MAE(平均絕對誤差),似乎比 MSE(均方誤差)更好。
最后,我們將獲得具有最小驗證誤差的模型,并使用該模型進行接下來的異常檢測預。現在開始做一些訓練。
#?這一步耗時較長 model,?history?=?train_model(model,?train_dataset,?val_dataset,?n_epochs=150 )繪制模型損失
繪制模型在訓練和測試數據集上面的損失曲線。
ax?=?plt.figure().gca() ax.plot(history['train']) ax.plot(history['val']) plt.ylabel('Loss') plt.xlabel('Epoch') plt.legend(['train',?'test']) plt.title('Loss?over?training?epochs') plt.show();從可視化結果看出,我們所訓練的模型收斂得很好。看起來我們可能需要一個更大的驗證集來優化模型,但本文就不做展開了,現在就這樣了。
保存模型
存儲模型以備后用。模型保存是必須要做的,他是保存和避免我們寶貴工作不被浪費的重要步驟。
MODEL_PATH?=?'model.pth' torch.save(model,?MODEL_PATH)如果要下載和加載預訓練模型,請取消注釋下一行。
#?model?=?torch.load('model.pth') #?model?=?model.to(device)設定閾值
有了訓練好了的模型,可以看看訓練集上的重構誤差。同樣編寫一個輔助函數來使用模型預測結果。
def?predict(model,?dataset):predictions,?losses?=?[],?[]criterion?=?nn.L1Loss(reduction='sum').to(device)with?torch.no_grad():model?=?model.eval()for?seq_true?in?dataset:seq_true?=?seq_true.to(device)seq_pred?=?model(seq_true)loss?=?criterion(seq_pred,?seq_true)predictions.append(seq_pred.cpu().numpy().flatten())losses.append(loss.item())return?predictions,?losses該預測函數遍歷數據集中的每個樣本并記錄預測結果和損失。
_,?losses?=?predict(model,?train_dataset) sns.distplot(losses,?bins=50,?kde=True);從圖結果看,該閾值設定為26較為合適。
THRESHOLD?=?26評估
利用上面設定的閾值,我們可以將問題轉化為一個簡單的二分類任務:
如果一個例子的重構損失低于閾值,我們將其歸類為"正常"心跳
或者,如果損失高于閾值,我們會將其歸類為**"異常"**
正常心跳
我們檢查一下模型在正常心跳上的表現如何。這里使用新的測試集中的正常心跳。
predictions,?pred_losses?=?predict(model,?test_normal_dataset) sns.distplot(pred_losses,?bins=50,?kde=True);計算下模型預測正確的樣本有多少。
correct?=?sum(l?<=?THRESHOLD?for?l?in?pred_losses) print(f'Correct?normal?predictions:?{correct}/{len(test_normal_dataset)}')Correct normal predictions: 142/145異常心跳
我們對異常樣本執行相同的操作,由于異常心跳和正常心跳的樣本數量不一致,因此需要獲得一個與正常心跳大小相同的子集,并對異常子集進行模型的預測。
anomaly_dataset?=?test_anomaly_dataset[:len(test_normal_dataset)] predictions,?pred_losses?=?predict(model,?anomaly_dataset) sns.distplot(pred_losses,?bins=50,?kde=True);最后計算高于閾值的樣本數量,而這些樣本將被視為異常心跳數據。
correct?=?sum(l?>?THRESHOLD?for?l?in?pred_losses) print(f'Correct?anomaly?predictions:?{correct}/{len(anomaly_dataset)}')Correct anomaly predictions: 142/145由此可見,我們得到了很好的結果。在現實項目中,可以根據要容忍的錯誤類型來調整閾值。在這種情況下,可能希望誤報(正常心跳被視為異常)多于漏報(異常被視為正常)。
樣本對比觀察
可以疊加真實的和重構的時間序列值,看看它們有多接近。得到相比的結果,可以針對一些正常和異常情況進行處理。
#?定義輔助函數 def?plot_prediction(data,?model,?title,?ax):predictions,?pred_losses?=?predict(model,?[data])ax.plot(data,?label='true')ax.plot(predictions[0],?label='reconstructed')ax.set_title(f'{title}?(loss:?{np.around(pred_losses[0],?2)})')ax.legend() #?繪圖 fig,?axs?=?plt.subplots(nrows=2,ncols=6,sharey=True,sharex=True,figsize=(22,?8))for?i,?data?in?enumerate(test_normal_dataset[:6]):plot_prediction(data,?model,?title='Normal',?ax=axs[0,?i])for?i,?data?in?enumerate(test_anomaly_dataset[:6]):plot_prediction(data,?model,?title='Anomaly',?ax=axs[1,?i])fig.tight_layout();到目前為止,該實戰案例已經告一段落了。在本案例中,我們一起學習了如何使用 PyTorch 創建 LSTM 自動編碼器并使用它來檢測 ECG 數據中的心跳異常。
附
torch.unsqueeze 詳解
torch.unsqueeze(input,?dim,?out=None)返回一個新的張量,對輸入的既定位置插入維度 1
作用:擴展維度
注意:?返回張量與輸入張量共享內存,所以改變其中一個的內容會改變另一個。
參數:
tensor?(Tensor) – 輸入張量
dim?(int) – 插入維度的索引,如果dim為負,則將會被轉化dim+input.dim()+1
out?(Tensor, optional) – 結果張量
例子:
x?=?torch.Tensor([1,?2,?3,?4]) torch.unsqueeze(x,?0)?? >>>?tensor([[1.,?2.,?3.,?4.]]) torch.unsqueeze(x,?1) >>>?tensor([[1.],[2.],[3.],?[4.]])torch.stack() 詳解
沿著一個新維度對輸入張量序列進行連接。序列中所有的張量都應該為相同形狀。
簡而言之:把多個二維的張量湊成一個三維的張量;多個三維的湊成一個四維的張量…以此類推,也就是在增加新的維度進行堆疊。
outputs?=?torch.stack(inputs,?dim=0)?→?Tensor參數:
inputs?(sequence of Tensors) - 待連接的張量序列。
注:python的序列數據只有list和tuple。函數中的輸入inputs只允許是序列;且序列內部的張量元素,必須shape相等。dim?(int) 新的維度, 必須在0到len(outputs)之間。注:len(outputs)是生成數據的維度大小,也就是outputs的維度值。dim是選擇生成的維度,必須滿足0<=dim<len(outputs);len(outputs)是輸出后的tensor的維度大小。
例子:
#?假設是時間步T1的輸出 T1?=?torch.tensor([[1,?2,?3],[4,?5,?6],[7,?8,?9]]) #?假設是時間步T2的輸出 T2?=?torch.tensor([[10,?20,?30],[40,?50,?60],[70,?80,?90]]) torch.stack((T1,T2),dim=1) >>>?tensor([[[?1,??2,??3], ...????????[10,?20,?30]], ... ...???????[[?4,??5,??6], ...????????[40,?50,?60]], ... ...???????[[?7,??8,??9], ...????????[70,?80,?90]]]) torch.stack((T1,T2),dim=0) >>>?tensor([[[?1,??2,??3], ...?????????[?4,??5,??6], ...?????????[?7,??8,??9]], ... ...????????[[10,?20,?30], ...?????????[40,?50,?60], ...?????????[70,?80,?90]]])參考資料
[1]?
來源:?https://www.heartandstroke.ca/heart/tests/electrocardiogram
[2]?來源:?https://en.wikipedia.org/wiki/Cardiac_cycle
[3]?GitHub:?https://github.com/shobrook/sequitur
[4]?參考原文:?https://curiousily.com/posts/time-series-anomaly-detection-using-lstm-autoencoder-with-pytorch-in-python/
[5]?Sequitur - Recurrent Autoencoder (RAE):?https://github.com/shobrook/sequitur
[6]?Towards Never-Ending Learning from Time Series Streams:?https://www.cs.ucr.edu/~eamonn/neverending.pdf
[7]?LSTM Autoencoder for Anomaly Detection:?https://towardsdatascience.com/lstm-autoencoder-for-anomaly-detection-e1f4f2ee7ccf
往期精彩回顧適合初學者入門人工智能的路線及資料下載中國大學慕課《機器學習》(黃海廣主講)機器學習及深度學習筆記等資料打印機器學習在線手冊深度學習筆記專輯《統計學習方法》的代碼復現專輯 AI基礎下載本站qq群955171419,加入微信群請掃碼:總結
以上是生活随笔為你收集整理的【深度学习】在PyTorch中使用 LSTM 自动编码器进行时间序列异常检测的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android如何回调编码后的音视频数据
- 下一篇: 别人家的孩子!高校博士实现Nature、