"""
YOLO v1 implemented with TensorFlow.
"""

import numpy as np
import tensorflow as tf
import cv2


def leak_relu(x, alpha=0.1):
    """Leaky ReLU activation."""
    return tf.maximum(alpha * x, x)

class Yolo(object):
    def __init__(self, weights_file):
        self.verbose = True
        # detection params
        self.S = 7  # number of grid cells per side (SxS grid)
        self.B = 2  # boxes per cell
        self.classes = ["aeroplane", "bicycle", "bird", "boat", "bottle",
                        "bus", "car", "cat", "chair", "cow", "diningtable",
                        "dog", "horse", "motorbike", "person", "pottedplant",
                        "sheep", "sofa", "train", "tvmonitor"]
        self.C = len(self.classes)  # number of classes
        # offsets for the box centers (top-left corner of each cell)
        self.x_offset = np.transpose(np.reshape(np.array([np.arange(self.S)]*self.S*self.B),
                                                [self.B, self.S, self.S]), [1, 2, 0])
        self.y_offset = np.transpose(self.x_offset, [1, 0, 2])
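        # x_offset[i, j, b] == j (cell column) and y_offset[i, j, b] == i (cell row),
        # both with shape [S, S, B]. _interpret_predicts adds these to the per-cell
        # box centers and divides by S to get coordinates relative to the whole image.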

        self.threshold = 0.2  # confidence scores threshold
        self.iou_threshold = 0.5

        self.sess = tf.Session()
        self._build_net()
        self._load_weights(weights_file)

    def _build_net(self):
        """Build the network."""
        if self.verbose:
            print("Start to build the network ...")
        self.images = tf.placeholder(tf.float32, [None, 448, 448, 3])
        net = self._conv_layer(self.images, 1, 64, 7, 2)
        net = self._maxpool_layer(net, 1, 2, 2)
        net = self._conv_layer(net, 2, 192, 3, 1)
        net = self._maxpool_layer(net, 2, 2, 2)
        net = self._conv_layer(net, 3, 128, 1, 1)
        net = self._conv_layer(net, 4, 256, 3, 1)
        net = self._conv_layer(net, 5, 256, 1, 1)
        net = self._conv_layer(net, 6, 512, 3, 1)
        net = self._maxpool_layer(net, 6, 2, 2)
        net = self._conv_layer(net, 7, 256, 1, 1)
        net = self._conv_layer(net, 8, 512, 3, 1)
        net = self._conv_layer(net, 9, 256, 1, 1)
        net = self._conv_layer(net, 10, 512, 3, 1)
        net = self._conv_layer(net, 11, 256, 1, 1)
        net = self._conv_layer(net, 12, 512, 3, 1)
        net = self._conv_layer(net, 13, 256, 1, 1)
        net = self._conv_layer(net, 14, 512, 3, 1)
        net = self._conv_layer(net, 15, 512, 1, 1)
        net = self._conv_layer(net, 16, 1024, 3, 1)
        net = self._maxpool_layer(net, 16, 2, 2)
        net = self._conv_layer(net, 17, 512, 1, 1)
        net = self._conv_layer(net, 18, 1024, 3, 1)
        net = self._conv_layer(net, 19, 512, 1, 1)
        net = self._conv_layer(net, 20, 1024, 3, 1)
        net = self._conv_layer(net, 21, 1024, 3, 1)
        net = self._conv_layer(net, 22, 1024, 3, 2)
        net = self._conv_layer(net, 23, 1024, 3, 1)
        net = self._conv_layer(net, 24, 1024, 3, 1)
        net = self._flatten(net)
        net = self._fc_layer(net, 25, 512, activation=leak_relu)
        net = self._fc_layer(net, 26, 4096, activation=leak_relu)
        net = self._fc_layer(net, 27, self.S*self.S*(self.C+5*self.B))
        self.predicts = net
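        # Per image the output is a flat vector of S*S*(C + 5*B) = 7*7*30 = 1470 values:
        # first S*S*C class probabilities, then S*S*B box confidences, then S*S*B*4
        # box coordinates (x, y, sqrt(w), sqrt(h)); _interpret_predicts slices it in
        # exactly this order.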

    def _conv_layer(self, x, id, num_filters, filter_size, stride):
        """Convolutional layer."""
        in_channels = x.get_shape().as_list()[-1]
        weight = tf.Variable(tf.truncated_normal([filter_size, filter_size,
                                                  in_channels, num_filters], stddev=0.1))
        bias = tf.Variable(tf.zeros([num_filters,]))
        # explicit symmetric padding instead of padding="SAME"
        # ("SAME" pads asymmetrically when the stride is larger than 1)
        pad_size = filter_size // 2
        pad_mat = np.array([[0, 0], [pad_size, pad_size], [pad_size, pad_size], [0, 0]])
        x_pad = tf.pad(x, pad_mat)
        conv = tf.nn.conv2d(x_pad, weight, strides=[1, stride, stride, 1], padding="VALID")
        output = leak_relu(tf.nn.bias_add(conv, bias))
        if self.verbose:
            print(" Layer %d: type=Conv, num_filter=%d, filter_size=%d, stride=%d, output_shape=%s"
                  % (id, num_filters, filter_size, stride, str(output.get_shape())))
        return output

    def _fc_layer(self, x, id, num_out, activation=None):
        """Fully connected layer."""
        num_in = x.get_shape().as_list()[-1]
        weight = tf.Variable(tf.truncated_normal([num_in, num_out], stddev=0.1))
        bias = tf.Variable(tf.zeros([num_out,]))
        output = tf.nn.xw_plus_b(x, weight, bias)
        if activation:
            output = activation(output)
        if self.verbose:
            print(" Layer %d: type=Fc, num_out=%d, output_shape=%s"
                  % (id, num_out, str(output.get_shape())))
        return output

    def _maxpool_layer(self, x, id, pool_size, stride):
        """Max-pooling layer."""
        output = tf.nn.max_pool(x, [1, pool_size, pool_size, 1],
                                strides=[1, stride, stride, 1], padding="SAME")
        if self.verbose:
            print(" Layer %d: type=MaxPool, pool_size=%d, stride=%d, output_shape=%s"
                  % (id, pool_size, stride, str(output.get_shape())))
        return output

    def _flatten(self, x):
        """Flatten the tensor x."""
        tran_x = tf.transpose(x, [0, 3, 1, 2])  # channel-first order
        nums = np.prod(x.get_shape().as_list()[1:])
        return tf.reshape(tran_x, [-1, nums])
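        # For the 448x448 input, the map entering _flatten is [None, 7, 7, 1024]; it is
        # transposed to [None, 1024, 7, 7] and reshaped to [None, 50176], so the flat
        # vector is channel-major, presumably to match the layout expected by the
        # restored fully connected weights.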

    def _load_weights(self, weights_file):
        """Load weights from file."""
        if self.verbose:
            print("Start to load weights from file: %s" % (weights_file))
        saver = tf.train.Saver()
        saver.restore(self.sess, weights_file)

    def detect_from_file(self, image_file, imshow=True, detected_boxes_file="boxes.txt",
                         detected_image_file="detected_image.jpg"):
        """Do detection given an image file."""
        # read image
        image = cv2.imread(image_file)
        img_h, img_w, _ = image.shape
        predicts = self._detect_from_image(image)
        predict_boxes = self._interpret_predicts(predicts, img_h, img_w)
        self.show_results(image, predict_boxes, imshow, detected_boxes_file, detected_image_file)

    def _detect_from_image(self, image):
        """Do detection given an OpenCV (BGR) image."""
        img_resized = cv2.resize(image, (448, 448))
        img_RGB = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
        img_resized_np = np.asarray(img_RGB)
        _images = np.zeros((1, 448, 448, 3), dtype=np.float32)
        _images[0] = (img_resized_np / 255.0) * 2.0 - 1.0  # scale pixels to [-1, 1]
        predicts = self.sess.run(self.predicts, feed_dict={self.images: _images})[0]
        return predicts

    def _interpret_predicts(self, predicts, img_h, img_w):
        """Interpret the predictions and get the detection boxes."""
        idx1 = self.S*self.S*self.C
        idx2 = idx1 + self.S*self.S*self.B
        # class predictions
        class_probs = np.reshape(predicts[:idx1], [self.S, self.S, self.C])
        # confidences
        confs = np.reshape(predicts[idx1:idx2], [self.S, self.S, self.B])
        # boxes -> (x, y, w, h)
        boxes = np.reshape(predicts[idx2:], [self.S, self.S, self.B, 4])

        # convert x, y to coordinates relative to the top-left corner of the image
        boxes[:, :, :, 0] += self.x_offset
        boxes[:, :, :, 1] += self.y_offset
        boxes[:, :, :, :2] /= self.S

        # w and h are predicted as square roots (as in the YOLO v1 paper),
        # so squaring recovers the fractional width and height
        boxes[:, :, :, 2:] = np.square(boxes[:, :, :, 2:])

        # scale by the width and height of the image
        boxes[:, :, :, 0] *= img_w
        boxes[:, :, :, 1] *= img_h
        boxes[:, :, :, 2] *= img_w
        boxes[:, :, :, 3] *= img_h

        # class-specific confidence scores [S, S, B, C]
        scores = np.expand_dims(confs, -1) * np.expand_dims(class_probs, 2)
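        # confs [S, S, B] -> [S, S, B, 1] and class_probs [S, S, C] -> [S, S, 1, C]
        # broadcast together, giving one score per (cell, box, class) combination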

        scores = np.reshape(scores, [-1, self.C])  # [S*S*B, C]
        boxes = np.reshape(boxes, [-1, 4])         # [S*S*B, 4]

        # zero out scores below the threshold
        scores[scores < self.threshold] = 0.0

        # non-max suppression (modifies scores in place)
        self._non_max_suppression(scores, boxes)

        # report the boxes
        predict_boxes = []  # (class, x, y, w, h, score)
        max_idxs = np.argmax(scores, axis=1)
        for i in range(len(scores)):
            max_idx = max_idxs[i]
            if scores[i, max_idx] > 0.0:
                predict_boxes.append((self.classes[max_idx], boxes[i, 0], boxes[i, 1],
                                      boxes[i, 2], boxes[i, 3], scores[i, max_idx]))
        return predict_boxes

    def _non_max_suppression(self, scores, boxes):
        """Non-max suppression; suppressed scores are set to 0 in place."""
        # for each class
        for c in range(self.C):
            sorted_idxs = np.argsort(scores[:, c])
            last = len(sorted_idxs) - 1
            while last > 0:
                if scores[sorted_idxs[last], c] < 1e-6:
                    # already suppressed or below the threshold: skip this box,
                    # but keep scanning the remaining ones
                    last -= 1
                    continue
                for i in range(last):
                    if scores[sorted_idxs[i], c] < 1e-6:
                        continue
                    if self._iou(boxes[sorted_idxs[i]], boxes[sorted_idxs[last]]) > self.iou_threshold:
                        scores[sorted_idxs[i], c] = 0.0
                last -= 1

    def _iou(self, box1, box2):
        """Compute the IoU of two boxes given as (x_center, y_center, w, h)."""

        inter_w = np.minimum(box1[0]+0.5*box1[2], box2[0]+0.5*box2[2]) - \
                  np.maximum(box1[0]-0.5*box1[2], box2[0]-0.5*box2[2])
        inter_h = np.minimum(box1[1]+0.5*box1[3], box2[1]+0.5*box2[3]) - \
                  np.maximum(box1[1]-0.5*box1[3], box2[1]-0.5*box2[3])
        if inter_h < 0 or inter_w < 0:
            inter = 0
        else:
            inter = inter_w * inter_h
        union = box1[2]*box1[3] + box2[2]*box2[3] - inter
        return inter / union
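        # quick sanity check: box1 = (0, 0, 2, 2) and box2 = (1, 1, 2, 2) overlap on a
        # 1x1 square, so inter = 1.0, union = 4 + 4 - 1 = 7.0 and the IoU is 1/7 ~ 0.143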

    def show_results(self, image, results, imshow=True, detected_boxes_file=None,
                     detected_image_file=None):
        """Show the detection boxes."""
        img_cp = image.copy()
        if detected_boxes_file:
            f = open(detected_boxes_file, "w")
        # draw boxes
        for i in range(len(results)):
            x = int(results[i][1])
            y = int(results[i][2])
            w = int(results[i][3]) // 2  # half width, for drawing around the center
            h = int(results[i][4]) // 2  # half height
            if self.verbose:
                print(" class: %s, [x, y, w, h]=[%d, %d, %d, %d], confidence=%f"
                      % (results[i][0], x, y, w, h, results[i][-1]))

            cv2.rectangle(img_cp, (x - w, y - h), (x + w, y + h), (0, 255, 0), 2)
            cv2.rectangle(img_cp, (x - w, y - h - 20), (x + w, y - h), (125, 125, 125), -1)
            cv2.putText(img_cp, results[i][0] + ' : %.2f' % results[i][5], (x - w + 5, y - h - 7),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
            if detected_boxes_file:
                f.write(results[i][0] + ',' + str(x) + ',' + str(y) + ',' +
                        str(w) + ',' + str(h) + ',' + str(results[i][5]) + '\n')
        if imshow:
            cv2.imshow('YOLO_small detection', img_cp)
            cv2.waitKey(0)  # wait for a key press before closing the window
        if detected_image_file:
            cv2.imwrite(detected_image_file, img_cp)
        if detected_boxes_file:
            f.close()

if __name__ == "__main__":
    yolo_net = Yolo("./weights/YOLO_small.ckpt")
    yolo_net.detect_from_file("./test/car.jpg")
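    # A second, hypothetical call with explicit output paths (the image path below is
    # an assumption; any test image on disk works):
    # yolo_net.detect_from_file("./test/person.jpg", imshow=False,
    #                           detected_boxes_file="person_boxes.txt",
    #                           detected_image_file="person_detected.jpg")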