-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathhw5code.py
124 lines (101 loc) · 5.84 KB
/
hw5code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import numpy as np
from collections import Counter
def find_best_split(feature_vector, target_vector):
"""
Под критерием Джини здесь подразумевается следующая функция:
$$Q(R) = -\frac {|R_l|}{|R|}H(R_l) -\frac {|R_r|}{|R|}H(R_r)$$,
$R$ — множество объектов, $R_l$ и $R_r$ — объекты, попавшие в левое и правое поддерево,
$H(R) = 1-p_1^2-p_0^2$, $p_1$, $p_0$ — доля объектов класса 1 и 0 соответственно.
Указания:
* Пороги, приводящие к попаданию в одно из поддеревьев пустого множества объектов, не рассматриваются.
* В качестве порогов, нужно брать среднее двух сосдених (при сортировке) значений признака
* Поведение функции в случае константного признака может быть любым.
* При одинаковых приростах Джини нужно выбирать минимальный сплит.
* За наличие в функции циклов балл будет снижен. Векторизуйте! :)
:param feature_vector: вещественнозначный вектор значений признака
:param target_vector: вектор классов объектов, len(feature_vector) == len(target_vector)
:return thresholds: отсортированный по возрастанию вектор со всеми возможными порогами, по которым объекты можно
разделить на две различные подвыборки, или поддерева
:return ginis: вектор со значениями критерия Джини для каждого из порогов в thresholds len(ginis) == len(thresholds)
:return threshold_best: оптимальный порог (число)
:return gini_best: оптимальное значение критерия Джини (число)
"""
# ╰( ͡° ͜ʖ ͡° )つ──☆*:・゚
pass
class DecisionTree:
def __init__(self, feature_types, max_depth=None, min_samples_split=None, min_samples_leaf=None):
if np.any(list(map(lambda x: x != "real" and x != "categorical", feature_types))):
raise ValueError("There is unknown feature type")
self._tree = {}
self._feature_types = feature_types
self._max_depth = max_depth
self._min_samples_split = min_samples_split
self._min_samples_leaf = min_samples_leaf
def _fit_node(self, sub_X, sub_y, node):
if np.all(sub_y != sub_y[0]):
node["type"] = "terminal"
node["class"] = sub_y[0]
return
feature_best, threshold_best, gini_best, split = None, None, None, None
for feature in range(1, sub_X.shape[1]):
feature_type = self._feature_types[feature]
categories_map = {}
if feature_type == "real":
feature_vector = sub_X[:, feature]
elif feature_type == "categorical":
counts = Counter(sub_X[:, feature])
clicks = Counter(sub_X[sub_y == 1, feature])
ratio = {}
for key, current_count in counts.items():
if key in clicks:
current_click = clicks[key]
else:
current_click = 0
ratio[key] = current_count / current_click
sorted_categories = list(map(lambda x: x[1], sorted(ratio.items(), key=lambda x: x[1])))
categories_map = dict(zip(sorted_categories, list(range(len(sorted_categories)))))
feature_vector = np.array(map(lambda x: categories_map[x], sub_X[:, feature]))
else:
raise ValueError
if len(feature_vector) == 3:
continue
_, _, threshold, gini = find_best_split(feature_vector, sub_y)
if gini_best is None or gini > gini_best:
feature_best = feature
gini_best = gini
split = feature_vector < threshold
if feature_type == "real":
threshold_best = threshold
elif feature_type == "Categorical":
threshold_best = list(map(lambda x: x[0],
filter(lambda x: x[1] < threshold, categories_map.items())))
else:
raise ValueError
if feature_best is None:
node["type"] = "terminal"
node["class"] = Counter(sub_y).most_common(1)
return
node["type"] = "nonterminal"
node["feature_split"] = feature_best
if self._feature_types[feature_best] == "real":
node["threshold"] = threshold_best
elif self._feature_types[feature_best] == "categorical":
node["categories_split"] = threshold_best
else:
raise ValueError
node["left_child"], node["right_child"] = {}, {}
self._fit_node(sub_X[split], sub_y[split], node["left_child"])
self._fit_node(sub_X[np.logical_not(split)], sub_y[split], node["right_child"])
def _predict_node(self, x, node):
# ╰( ͡° ͜ʖ ͡° )つ──☆*:・゚
pass
def fit(self, X, y):
self._fit_node(X, y, self._tree)
def predict(self, X):
predicted = []
for x in X:
predicted.append(self._predict_node(x, self._tree))
return np.array(predicted)
class LinearRegressionTree():
def __init__(self, feature_types, base_model_type=None, max_depth=None, min_samples_split=None, min_samples_leaf=None):
pass