import argparse
import time
import numpy as np
import cv2 as cv


# ------------------------Service operations------------------------
def weight_path(model_path):
    """ Get path of weights based on path to IR

    Params:
    model_path: the string contains path to IR file

    Return:
    Path to weights file
    """
    assert model_path.endswith('.xml'), "Wrong topology path was provided"
    return model_path[:-3] + 'bin'


def build_argparser():
    """ Parse arguments from command line

    Return:
    Pack of arguments from command line
    """
    parser = argparse.ArgumentParser(description='This is an OpenCV-based version of the Gaze Estimation example')

    parser.add_argument('--input',
                        help='Path to the input video file')
    parser.add_argument('--out',
                        help='Path to the output video file')
    parser.add_argument('--facem',
                        default='face-detection-retail-0005.xml',
                        help='Path to OpenVINO face detection model (.xml)')
    parser.add_argument('--faced',
                        default='CPU',
                        help='Target device for the face detection inference ' +
                        '(e.g. CPU, GPU, VPU, ...)')
    parser.add_argument('--headm',
                        default='head-pose-estimation-adas-0001.xml',
                        help='Path to OpenVINO head pose estimation model (.xml)')
    parser.add_argument('--headd',
                        default='CPU',
                        help='Target device for the head pose estimation inference ' +
                        '(e.g. CPU, GPU, VPU, ...)')
    parser.add_argument('--landm',
                        default='facial-landmarks-35-adas-0002.xml',
                        help='Path to OpenVINO landmarks detector model (.xml)')
    parser.add_argument('--landd',
                        default='CPU',
                        help='Target device for the landmarks detector (e.g. CPU, GPU, VPU, ...)')
    parser.add_argument('--gazem',
                        default='gaze-estimation-adas-0002.xml',
                        help='Path to OpenVINO gaze vector estimation model (.xml)')
    parser.add_argument('--gazed',
                        default='CPU',
                        help='Target device for the gaze vector estimation inference ' +
                        '(e.g. CPU, GPU, VPU, ...)')
    parser.add_argument('--eyem',
                        default='open-closed-eye-0001.xml',
                        help='Path to OpenVINO open-closed-eye model (.xml)')
    parser.add_argument('--eyed',
                        default='CPU',
                        help='Target device for the eyes state inference (e.g. CPU, GPU, VPU, ...)')
    return parser
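
# Example invocation (the video path is illustrative; models default to the
# OpenVINO names above):
#   python gaze_estimation.py --input video.mp4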


# ------------------------Support functions for custom kernels------------------------
def intersection(surface, rect):
    """ Remove zone of out of bound from ROI

    Params:
    surface: image bounds is rect representation (top left coordinates and width and height)
    rect: region of interest is also has rect representation

    Return:
    Modified ROI with correct bounds
    """
    l_x = max(surface[0], rect[0])
    l_y = max(surface[1], rect[1])
    width = min(surface[0] + surface[2], rect[0] + rect[2]) - l_x
    height = min(surface[1] + surface[3], rect[1] + rect[3]) - l_y
    if width < 0 or height < 0:
        return (0, 0, 0, 0)
    return (l_x, l_y, width, height)


def process_landmarks(r_x, r_y, r_w, r_h, landmarks):
    """ Create points from result of inference of facial-landmarks network and size of input image

    Params:
    r_x: x coordinate of top left corner of input image
    r_y: y coordinate of top left corner of input image
    r_w: width of input image
    r_h: height of input image
    landmarks: result of inference of facial-landmarks network

    Return:
    Array of landmarks points for one face
    """
    lmrks = landmarks[0]
    raw_x = lmrks[::2] * r_w + r_x
    raw_y = lmrks[1::2] * r_h + r_y
    return np.array([[int(x), int(y)] for x, y in zip(raw_x, raw_y)])


def eye_box(p_1, p_2, scale=1.8):
    """ Get bounding box of eye

    Params:
    p_1: point of left edge of eye
    p_2: point of right edge of eye
    scale: change size of box with this value

    Return:
    Bounding box of eye and its midpoint
    """

    size = np.linalg.norm(p_1 - p_2)
    midpoint = (p_1 + p_2) / 2
    width = scale * size
    height = width
    p_x = midpoint[0] - (width / 2)
    p_y = midpoint[1] - (height / 2)
    return (int(p_x), int(p_y), int(width), int(height)), list(map(int, midpoint))


# ------------------------Custom graph operations------------------------
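# Each @cv.gapi.op class below declares an operation interface for the graph:
# its input/output G-API types and an outMeta() describing the output metadata.
# Empty array descriptors are returned because the number of detected faces,
# and hence the array sizes, is only known at run time.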
@cv.gapi.op('custom.GProcessPoses',
            in_types=[cv.GArray.GMat, cv.GArray.GMat, cv.GArray.GMat],
            out_types=[cv.GArray.GMat])
class GProcessPoses:
    @staticmethod
    def outMeta(arr_desc0, arr_desc1, arr_desc2):
        return cv.empty_array_desc()


@cv.gapi.op('custom.GParseEyes',
            in_types=[cv.GArray.GMat, cv.GArray.Rect, cv.GOpaque.Size],
            out_types=[cv.GArray.Rect, cv.GArray.Rect, cv.GArray.Point, cv.GArray.Point])
class GParseEyes:
    @staticmethod
    def outMeta(arr_desc0, arr_desc1, arr_desc2):
        return cv.empty_array_desc(), cv.empty_array_desc(), \
               cv.empty_array_desc(), cv.empty_array_desc()


@cv.gapi.op('custom.GGetStates',
            in_types=[cv.GArray.GMat, cv.GArray.GMat],
            out_types=[cv.GArray.Int, cv.GArray.Int])
class GGetStates:
    @staticmethod
    def outMeta(arr_desc0, arr_desc1):
        return cv.empty_array_desc(), cv.empty_array_desc()


# ------------------------Custom kernels------------------------
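# Each @cv.gapi.kernel class provides the CPU implementation (run()) for the
# corresponding operation declared above; G-API invokes it during execution.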
@cv.gapi.kernel(GProcessPoses)
class GProcessPosesImpl:
    """ Custom kernel. Processed poses of heads
    """
    @staticmethod
    def run(in_ys, in_ps, in_rs):
        """ Сustom kernel executable code

        Params:
        in_ys: yaw angle of head
        in_ps: pitch angle of head
        in_rs: roll angle of head

        Return:
        Arrays with heads poses
        """
        return [np.array([ys[0], ps[0], rs[0]]).T for ys, ps, rs in zip(in_ys, in_ps, in_rs)]


@cv.gapi.kernel(GParseEyes)
class GParseEyesImpl:
    """ Custom kernel. Get information about eyes
    """
    @staticmethod
    def run(in_landm_per_face, in_face_rcs, frame_size):
        """ Сustom kernel executable code

        Params:
        in_landm_per_face: landmarks from inference of facial-landmarks network for each face
        in_face_rcs: bounding boxes for each face
        frame_size: size of input image

        Return:
        Arrays of ROI for left and right eyes, array of midpoints and
        array of landmarks points
        """
        left_eyes = []
        right_eyes = []
        midpoints = []
        lmarks = []
        surface = (0, 0, *frame_size)
        for landm_face, rect in zip(in_landm_per_face, in_face_rcs):
            points = process_landmarks(*rect, landm_face)
            lmarks.extend(points)

            rect_l, midpoint_l = eye_box(points[0], points[1])
            left_eyes.append(intersection(surface, rect_l))

            rect_r, midpoint_r = eye_box(points[2], points[3])
            right_eyes.append(intersection(surface, rect_r))

            midpoints.append(midpoint_l)
            midpoints.append(midpoint_r)
        return left_eyes, right_eyes, midpoints, lmarks


@cv.gapi.kernel(GGetStates)
class GGetStatesImpl:
    """ Custom kernel. Get state of eye - open or closed
    """
    @staticmethod
    def run(eyesl, eyesr):
        """ Сustom kernel executable code

        Params:
        eyesl: result of inference of open-closed-eye network for left eye
        eyesr: result of inference of open-closed-eye network for right eye

        Return:
        States of left eyes and states of right eyes
        """
        out_l_st = [int(st) for eye_l in eyesl for st in (eye_l[:, 0] < eye_l[:, 1]).ravel()]
        out_r_st = [int(st) for eye_r in eyesr for st in (eye_r[:, 0] < eye_r[:, 1]).ravel()]
        return out_l_st, out_r_st


if __name__ == '__main__':
    ARGUMENTS = build_argparser().parse_args()

    # ------------------------Demo's graph------------------------
    g_in = cv.GMat()

    # Detect faces
    face_inputs = cv.GInferInputs()
    face_inputs.setInput('data', g_in)
    face_outputs = cv.gapi.infer('face-detection', face_inputs)
    faces = face_outputs.at('detection_out')

    # Parse faces
    sz = cv.gapi.streaming.size(g_in)
    faces_rc = cv.gapi.parseSSD(faces, sz, 0.5, False, False)
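    # parseSSD keeps detections above the 0.5 confidence threshold; the two
    # boolean flags control square alignment of the boxes and filtering of
    # out-of-frame detections (both disabled here).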

    # Detect poses
    head_inputs = cv.GInferInputs()
    head_inputs.setInput('data', g_in)
    head_outputs = cv.gapi.infer('head-pose', faces_rc, head_inputs)
    angles_y = head_outputs.at('angle_y_fc')
    angles_p = head_outputs.at('angle_p_fc')
    angles_r = head_outputs.at('angle_r_fc')

    # Parse poses
    heads_pos = GProcessPoses.on(angles_y, angles_p, angles_r)

    # Detect landmarks
    landmark_inputs = cv.GInferInputs()
    landmark_inputs.setInput('data', g_in)
    landmark_outputs = cv.gapi.infer('facial-landmarks', faces_rc,
                                     landmark_inputs)
    landmark = landmark_outputs.at('align_fc3')

    # Parse landmarks
    left_eyes, right_eyes, mids, lmarks = GParseEyes.on(landmark, faces_rc, sz)

    # Detect eyes
    eyes_inputs = cv.GInferInputs()
    eyes_inputs.setInput('input.1', g_in)
    eyesl_outputs = cv.gapi.infer('open-closed-eye', left_eyes, eyes_inputs)
    eyesr_outputs = cv.gapi.infer('open-closed-eye', right_eyes, eyes_inputs)
    eyesl = eyesl_outputs.at('19')
    eyesr = eyesr_outputs.at('19')

    # Process eyes states
    l_eye_st, r_eye_st = GGetStates.on(eyesl, eyesr)

    # Gaze estimation
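    # GInferListInputs with infer2 runs the gaze network once per face, pairing
    # the i-th left-eye ROI, right-eye ROI and head pose vector together.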
    gaze_inputs = cv.GInferListInputs()
    gaze_inputs.setInput('left_eye_image', left_eyes)
    gaze_inputs.setInput('right_eye_image', right_eyes)
    gaze_inputs.setInput('head_pose_angles', heads_pos)
    gaze_outputs = cv.gapi.infer2('gaze-estimation', g_in, gaze_inputs)
    gaze_vectors = gaze_outputs.at('gaze_vector')

    out = cv.gapi.copy(g_in)
    # ------------------------End of graph------------------------

    comp = cv.GComputation(cv.GIn(g_in), cv.GOut(out,
                                                 faces_rc,
                                                 left_eyes,
                                                 right_eyes,
                                                 gaze_vectors,
                                                 angles_y,
                                                 angles_p,
                                                 angles_r,
                                                 l_eye_st,
                                                 r_eye_st,
                                                 mids,
                                                 lmarks))

    # Networks
    face_net = cv.gapi.ie.params('face-detection', ARGUMENTS.facem,
                                 weight_path(ARGUMENTS.facem), ARGUMENTS.faced)
    head_pose_net = cv.gapi.ie.params('head-pose', ARGUMENTS.headm,
                                      weight_path(ARGUMENTS.headm), ARGUMENTS.headd)
    landmarks_net = cv.gapi.ie.params('facial-landmarks', ARGUMENTS.landm,
                                      weight_path(ARGUMENTS.landm), ARGUMENTS.landd)
    gaze_net = cv.gapi.ie.params('gaze-estimation', ARGUMENTS.gazem,
                                 weight_path(ARGUMENTS.gazem), ARGUMENTS.gazed)
    eye_net = cv.gapi.ie.params('open-closed-eye', ARGUMENTS.eyem,
                                weight_path(ARGUMENTS.eyem), ARGUMENTS.eyed)

    nets = cv.gapi.networks(face_net, head_pose_net, landmarks_net, gaze_net, eye_net)

    # Kernels pack
    kernels = cv.gapi.kernels(GParseEyesImpl, GProcessPosesImpl, GGetStatesImpl)

    # ------------------------Execution part------------------------
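    # compileStreaming builds a pipelined streaming executable; the custom
    # kernels and the network parameters are passed as compile arguments.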
    ccomp = comp.compileStreaming(args=cv.gapi.compile_args(kernels, nets))
    source = cv.gapi.wip.make_capture_src(ARGUMENTS.input)
    ccomp.setSource(cv.gin(source))
    ccomp.start()

    frames = 0
    fps = 0
    print('Processing')
    START_TIME = time.time()

    while True:
        start_time_cycle = time.time()
        has_frame, (oimg,
                    outr,
                    l_eyes,
                    r_eyes,
                    outg,
                    out_y,
                    out_p,
                    out_r,
                    out_st_l,
                    out_st_r,
                    out_mids,
                    outl) = ccomp.pull()

        if not has_frame:
            break

        # Draw
        GREEN = (0, 255, 0)
        RED = (0, 0, 255)
        WHITE = (255, 255, 255)
        BLUE = (255, 0, 0)
        PINK = (255, 0, 255)
        YELLOW = (0, 255, 255)

        M_PI_180 = np.pi / 180
        M_PI_2 = np.pi / 2
        M_PI = np.pi

        FACES_SIZE = len(outr)

        for i, out_rect in enumerate(outr):
            # Face box
            cv.rectangle(oimg, out_rect, WHITE, 1)
            rx, ry, rwidth, rheight = out_rect

            # Landmarks
            lm_radius = int(0.01 * rwidth + 1)
            lmsize = len(outl) // FACES_SIZE
            for j in range(lmsize):
                cv.circle(oimg, outl[j + i * lmsize], lm_radius, YELLOW, -1)

            # Headposes
            yaw = out_y[i]
            pitch = out_p[i]
            roll = out_r[i]
            sin_y = np.sin(yaw[:] * M_PI_180)
            sin_p = np.sin(pitch[:] * M_PI_180)
            sin_r = np.sin(roll[:] * M_PI_180)

            cos_y = np.cos(yaw[:] * M_PI_180)
            cos_p = np.cos(pitch[:] * M_PI_180)
            cos_r = np.cos(roll[:] * M_PI_180)

            axis_length = 0.4 * rwidth
            x_center = int(rx + rwidth / 2)
            y_center = int(ry + rheight / 2)
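
            # Project the head-frame axes onto the image using the yaw/pitch/roll
            # rotation: X axis drawn in red, Y in green, Z (forward) in pink.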

            # center to right
            cv.line(oimg, [x_center, y_center],
                    [int(x_center + axis_length * (cos_r * cos_y + sin_y * sin_p * sin_r)),
                     int(y_center + axis_length * cos_p * sin_r)],
                    RED, 2)

            # center to top
            cv.line(oimg, [x_center, y_center],
                    [int(x_center + axis_length * (cos_r * sin_y * sin_p + cos_y * sin_r)),
                     int(y_center - axis_length * cos_p * cos_r)],
                    GREEN, 2)

            # center to forward
            cv.line(oimg, [x_center, y_center],
                    [int(x_center + axis_length * sin_y * cos_p),
                     int(y_center + axis_length * sin_p)],
                    PINK, 2)

            scale_box = 0.002 * rwidth
            cv.putText(oimg, "head pose: (y=%0.0f, p=%0.0f, r=%0.0f)" %
                       (np.round(yaw), np.round(pitch), np.round(roll)),
                       [int(rx), int(ry + rheight + 5 * rwidth / 100)],
                       cv.FONT_HERSHEY_PLAIN, scale_box * 2, WHITE, 1)

            # Eyes boxes
            color_l = GREEN if out_st_l[i] else RED
            cv.rectangle(oimg, l_eyes[i], color_l, 1)
            color_r = GREEN if out_st_r[i] else RED
            cv.rectangle(oimg, r_eyes[i], color_r, 1)

            # Gaze vectors
            norm_gazes = np.linalg.norm(outg[i][0])
            gaze_vector = outg[i][0] / norm_gazes

            arrow_length = 0.4 * rwidth
            gaze_arrow = [arrow_length * gaze_vector[0], -arrow_length * gaze_vector[1]]
            left_arrow = [int(a+b) for a, b in zip(out_mids[0 + i * 2], gaze_arrow)]
            right_arrow = [int(a+b) for a, b in zip(out_mids[1 + i * 2], gaze_arrow)]
            if out_st_l[i]:
                cv.arrowedLine(oimg, out_mids[0 + i * 2], left_arrow, BLUE, 2)
            if out_st_r[i]:
                cv.arrowedLine(oimg, out_mids[1 + i * 2], right_arrow, BLUE, 2)

            v0, v1, v2 = outg[i][0]
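
            # Convert the gaze vector to horizontal and vertical angles in
            # degrees: the horizontal angle comes from atan2 in the XZ plane,
            # the vertical one from the elevation of the normalized Y component.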

            gaze_angles = [180 / M_PI * (M_PI_2 + np.arctan2(v2, v0)),
                           180 / M_PI * (M_PI_2 - np.arccos(v1 / norm_gazes))]
            cv.putText(oimg, "gaze angles: (h=%0.0f, v=%0.0f)" %
                       (np.round(gaze_angles[0]), np.round(gaze_angles[1])),
                       [int(rx), int(ry + rheight + 12 * rwidth / 100)],
                       cv.FONT_HERSHEY_PLAIN, scale_box * 2, WHITE, 1)

        # Add FPS value to frame
        cv.putText(oimg, "FPS: %0i" % (fps), [int(20), int(40)],
                   cv.FONT_HERSHEY_PLAIN, 2, RED, 2)

        # Show result
        cv.imshow('Gaze Estimation', oimg)
        cv.waitKey(1)

        fps = int(1. / (time.time() - start_time_cycle))
        frames += 1
    EXECUTION_TIME = time.time() - START_TIME
    print('Execution successful')
    print('Mean FPS is ', int(frames / EXECUTION_TIME))