利用KD树进行异常检测
什么是KD樹
要說KD樹,我們得先說一下什么是KNN算法。
KNN是k-NearestNeighbor的簡稱,原理很簡單:當你有一堆已經標注好的數據時,你知道哪些是正類,哪些是負類。當新拿到一個沒有標注的數據時,你想知道它是哪一類的。只要找到它的鄰居(離它距離短)的點是什么類別的,所謂近朱者赤近墨者黑,KNN就是采用了類似的方法。
如上圖,當有新的點不知道是哪一類時,只要看看離它最近的幾個點是什么類別,我們就判斷它是什么類別。
舉個例子:我們將k取3(就是每次看看新來的數據點的三個住的最近的鄰居),那么我們將所有數據點和新來的數據點計算一次距離,然后排序,取前三個數據點,讓它們舉手表決。兩票及以上的類別我們就認為是新的數據點的類別。
很簡單也很好的想法,但是,我們要注意到當測試集數據比較大時,由于每次未標注的數據點都要和全部的已標注的數據點進行一次距離計算,然后排序??梢哉f時間開銷非常大。我們在此基礎上,想到了一種存儲點與點之間關系的算法來通過空間換時間。
有一篇博文寫KD樹還不錯
點擊此處查看
舉個例子:有一個二維的數據集: T={(2,3),(5,4),(9,6),(4,7),(8,1),(7,2)}
通過你已經學習的KD樹的算法,按照依次選擇維度,取各維中位數,是否得出和下面一樣的KD樹?
異常檢測
我們的數據來自于KDD Cup 1999 Data 點我下載數據
數據格式如下圖
數據的含義如下:
我們這次實驗針對正常和DDOS攻擊兩種情況進行檢測。
取特征范圍為(1,9)U(22,31)的特征中的數值型特征,最終得到16維的特征向量。將數據隨機化處理后按照7:3的比例分成訓練集和測試集。下面是我們組做好的訓練集和測試集
點我下載
處理完的訓練集和測試集如下圖:
下面是具體實現:
# coding=utf8 import math import time import matplotlib.pyplot as plt import numpy as npold_settings = np.seterr(all='ignore')#定義節點類型 class KD_node:def __init__(self, point=None, split=None, left=None, right=None):''':param point: 數據點的特征向量:param split: 切分的維度:param left: 左兒子:param right: 右兒子'''self.point = pointself.split = splitself.left = leftself.right = right計算方差,以利用方差大小進行判斷在哪一維進行切分
def computeVariance(arrayList):''':param arrayList: 所有數據某一維的向量:return: 返回'''for ele in arrayList:ele = float(ele)LEN = float(len(arrayList))array = np.array(arrayList)sum1 = float(array.sum())array2 = np.dot(array, array.T)sum2 = float(array2.sum())mean = sum1 / LENvariance = sum2 / LEN - mean ** 2return variance建樹
def createKDTree(root, data_list):''':param root: 輸入一個根節點,以此建樹:param data_list: 數據列表:return: 返回根節點'''LEN = len(data_list)if LEN == 0:return# 數據點的維度dimension = len(data_list[0]) - 1 #去掉了最后一維標簽維# 方差max_var = 0# 最后選擇的劃分域split = 0for i in range(dimension):ll = []for t in data_list:ll.append(t[i])var = computeVariance(ll) #計算出在這一維的方差大小if var > max_var:max_var = varsplit = i# 根據劃分域的數據對數據點進行排序data_list = list(data_list)data_list.sort(key=lambda x: x[split]) #按照在切分維度上的大小進行排序data_list = np.array(data_list)# 選擇下標為len / 2的點作為分割點point = data_list[LEN / 2]root = KD_node(point, split)root.left = createKDTree(root.left, data_list[0:(LEN / 2)])#遞歸的對切分到左兒子和右兒子的數據再建樹root.right = createKDTree(root.right, data_list[(LEN / 2 + 1):LEN])return root def computeDist(pt1, pt2):''':param pt1: 特征向量1:param pt2: 特征向量2:return: 兩個向量的歐氏距離'''sum_dis = 0.0for i in range(len(pt1)):sum_dis += (pt1[i] - pt2[i]) ** 2#實現的歐氏距離計算,效率很低的版本,可以改成numpy的向量運算return math.sqrt(sum_dis) def findNN(root, query):''':param root: 建立好的KD樹的樹根:param query: 查詢數據:return: 與這個數據最近的前三個節點'''# 初始化為root的節點NN = root.pointmin_dist = computeDist(query, NN)nodeList = []temp_root = rootdist_list = [temp_root.point, None, None] #用來存儲前三個節點##二分查找建立路徑while temp_root:nodeList.append(temp_root) #對向下走的路徑進行壓棧處理dd = computeDist(query, temp_root.point) #計算當前最近節點和查詢點的距離大小if min_dist > dd:NN = temp_root.pointmin_dist = dd# 當前節點的劃分域temp_split = temp_root.splitif query[temp_split] <= temp_root.point[temp_split]:temp_root = temp_root.leftelse:temp_root = temp_root.right##回溯查找while nodeList:back_point = nodeList.pop()back_split = back_point.splitif dist_list[1] is None:dist_list[2] = dist_list[1]dist_list[1] = back_point.pointelif dist_list[2] is None:dist_list[2] = back_point.pointif abs(query[back_split] - back_point.point[back_split]) < min_dist: #當查詢點和回溯點的距離小于當前最小距離時,另一個區域有希望存在更近的節點#如果大于這個距離,可以理解為假設在二維空間上,直角三角形的直角邊已經不滿足要求了,那么斜邊也一定不滿足要求if query[back_split] < back_point.point[back_split]:temp_root = back_point.rightelse:temp_root = back_point.leftif temp_root:nodeList.append(temp_root)curDist = computeDist(query, temp_root.point)if min_dist > curDist:min_dist = curDistdist_list[2] = dist_list[1]dist_list[1] = dist_list[0]dist_list[0] = temp_root.pointelif dist_list[1] is None or curDist < computeDist(dist_list[1], query):dist_list[2] = dist_list[1]dist_list[1] = temp_root.pointelif dist_list[2] is None or curDist < computeDist(dist_list[1], query):dist_list[2] = temp_root.pointreturn dist_list進行判斷
def judge_if_normal(dist_list):''':param dist_list: 利用findNN查找出的最近三個節點進行投票表決:return: '''normal_times = 0except_times = 0for i in dist_list:if abs(i[-1] - 0.0) < 1e-7: #浮點數的比較normal_times += 1else:except_times += 1if normal_times > except_times: #判斷是normalreturn Trueelse:return False數據預處理
def pre_data(path):f = open(path)lines = f.readlines()f.close()lstall = []for line in lines:lstn = []lst = line.split(",")u = 0y = 0for i in range(0, 9):if lst[i].isdigit():lstn.append(float(lst[i]))u += 1else:passfor j in range(21, 31):try:lstn.append(float(lst[j]))y += 1except:passif lst[len(lst) - 1] == "smurf.\n" or lst[len(lst) - 1] == "teardrop.\n":lstn.append(int("1"))else:lstn.append(int("0"))lstall.append(lstn)nplst = np.array(lstall, dtype=np.float16)return nplst下面就是個人的測試代碼了,大概運行了40分鐘才全跑完
def my_test(all_train_data, all_test_data, train_data_num):train_data = all_train_data[:train_data_num]train_time_start = time.time()root = KD_node()root = createKDTree(root, train_data)train_time_end = time.time()train_time = train_time_end - train_time_startright = 0error = 0test_time_start = time.time()for i in range(len(all_test_data)):if judge_if_normal(findNN(root, all_test_data[i])) is True and abs(all_test_data[i][-1] - 0.0) < 1e-7:right += 1elif judge_if_normal(findNN(root, all_test_data[i])) is False and abs(all_test_data[i][-1] - 1.0) < 1e-7:right += 1else:error += 1test_time_end = time.time()test_time = test_time_end - test_time_startright_ratio = float(right) / (right + error)return right_ratio, train_time, test_timedef draw(train_num_list=[10, 100, 1000, 10000], train_data=[], test_data=[]):train_time_list = []test_time_list = []right_ratio_list = []for i in train_num_list:print 'start run ' + i.__str__()temp = my_test(train_data, test_data, i)right_ratio_list.append(temp[0])train_time_list.append(temp[1])test_time_list.append(temp[2])plt.title('train data num from ' + train_num_list[0].__str__() + ' to ' + train_num_list[:-1].__str__())plt.subplot(311)plt.plot(train_num_list, right_ratio_list, c='b')plt.xlabel('train data num')plt.ylabel('right ratio')plt.grid(True)plt.subplot(312)plt.plot(train_num_list, train_time_list, c='r')plt.xlabel('train data num')plt.ylabel('time of train data (s)')plt.grid(True)plt.subplot(313)plt.plot(train_num_list, test_time_list, c='g')plt.xlabel('train data num')plt.ylabel('time of test data (s)')plt.grid(True)plt.show()data = pre_data('KDD-test\ddos+normal_70.txt') data2 = pre_data('KDD-test\ddos+normal_30.txt') ''' 建議開始將測試數據調小點,因為時間很長,下面這是全部訓練集和全部測試集,共花費了40分鐘才跑完。我是第六代i7 6700HQ+16G內存+1070+win10 ''' draw(train_num_list=[10, 100, 500, 1000, 2000, 3000, 5000, 10000, 15000, 20000, 50000, 100000, 265300],train_data=data[:], test_data=data2[:])跑完的效果如圖所示:
正確率最終達到95%以上。開始出現的波動我們懷疑是數據在開始沒有達到良好的隨機效果。
訓練時間與訓練數據量明顯成線性關系
測試時間確實和理論一致,是Nlog(M)的時間復雜度。應該說這種時間復雜度的降低是我們使用KD樹而不是原版的KNN最重要的地方。在原來的KNN算法下,假設訓練集大小為M,測試集大小為N,則查詢時間復雜度可以達到O(MN),但是我們降低到O(Nlog(M)),還是挺合算的。
本次實驗可以優化的地方很多,但是時間匆忙,沒有做更深的擴展。歡迎大家提出更多建議。
轉載于:https://www.cnblogs.com/chuxiuhong/p/5982580.html
總結
以上是生活随笔為你收集整理的利用KD树进行异常检测的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: codevs 1230 元素查找
- 下一篇: 优雅的使用 PhpStorm 来开发 L