From c4c52959d2073db4e580c3301cf85cd822a1883f Mon Sep 17 00:00:00 2001
From: Adrian Bulat
Date: Sun, 17 Sep 2017 20:35:51 +0100
Subject: [PATCH] Initial commit

---
 README.rst                            |   0
 examples/detect_landmarks_in_image.py |   1 +
 face_alignment/__init__.py            |   8 +
 face_alignment/api.py                 | 174 ++++++++++++++++++
 face_alignment/models.py              | 250 ++++++++++++++++++++++++++
 face_alignment/utils.py               | 209 +++++++++++++++++++++
 setup.cfg                             |  29 +++
 setup.py                              |  55 ++++++
 8 files changed, 726 insertions(+)
 create mode 100644 README.rst
 create mode 100644 examples/detect_landmarks_in_image.py
 create mode 100644 face_alignment/__init__.py
 create mode 100644 face_alignment/api.py
 create mode 100644 face_alignment/models.py
 create mode 100644 face_alignment/utils.py
 create mode 100644 setup.cfg
 create mode 100644 setup.py

diff --git a/README.rst b/README.rst
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/detect_landmarks_in_image.py b/examples/detect_landmarks_in_image.py
new file mode 100644
index 00000000..e071961f
--- /dev/null
+++ b/examples/detect_landmarks_in_image.py
@@ -0,0 +1 @@
+import face_alignment as fa
diff --git a/face_alignment/__init__.py b/face_alignment/__init__.py
new file mode 100644
index 00000000..40c6bf0d
--- /dev/null
+++ b/face_alignment/__init__.py
@@ -0,0 +1,8 @@
+# -*- coding: utf-8 -*-
+
+__author__ = """Adrian Bulat"""
+__email__ = 'adrian.bulat@nottingham.ac.uk'
+__version__ = '0.1.0'
+
+from .api import FaceAlignment, LandmarksType, NetworkSize
+
\ No newline at end of file
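
The example script above is still an empty stub. A minimal usage sketch of the API exported here (editor's addition, not part of the patch; the model weights are downloaded on first use, and 'path/to/image.jpg' is a placeholder):

import face_alignment
from skimage import io

# 2D landmarks on the GPU; flip_input=False skips the mirrored second pass
fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D,
                                  enable_cuda=True, flip_input=False)

image = io.imread('path/to/image.jpg')
preds = fa.get_landmarks(image)  # list of (68, 2) landmark arrays, or None
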
diff --git a/face_alignment/api.py b/face_alignment/api.py
new file mode 100644
index 00000000..e8206d6d
--- /dev/null
+++ b/face_alignment/api.py
@@ -0,0 +1,174 @@
+import os
+import glob
+import dlib
+import torch
+import torch.nn as nn
+from torch.autograd import Variable
+from enum import Enum
+from skimage import io
+import urllib.request
+
+from .models import FAN, ResNetDepth
+from .utils import *
+
+
+class LandmarksType(Enum):
+    _2D = 1
+    _2halfD = 2
+    _3D = 3
+
+
+class NetworkSize(Enum):
+    # TINY = 1
+    # SMALL = 2
+    # MEDIUM = 3
+    LARGE = 4
+
+    def __new__(cls, value):
+        member = object.__new__(cls)
+        member._value_ = value
+        return member
+
+    def __int__(self):
+        return self.value
+
+
+class FaceAlignment:
+    """Initialise the face alignment pipeline.
+
+    Args:
+        landmarks_type (``LandmarksType`` object): an enum defining the type of predicted points.
+        network_size (``NetworkSize`` object): an enum defining the size of the network (for the 2D and 2.5D points).
+        enable_cuda (bool, optional): if True, all computations will run on a CUDA-enabled GPU (recommended).
+        enable_cudnn (bool, optional): if True, the cudnn library will be used.
+        flip_input (bool, optional): increase the accuracy by doing a second forward pass on a flipped version of the image.
+
+    Example:
+        >>> FaceAlignment(LandmarksType._2D, flip_input=False)
+    """
+
+    def __init__(self, landmarks_type, network_size=NetworkSize.LARGE,
+                 enable_cuda=True, enable_cudnn=True, flip_input=True):
+        self.enable_cuda = enable_cuda
+        self.flip_input = flip_input
+        if enable_cudnn and enable_cuda:
+            torch.backends.cudnn.benchmark = True
+
+        base_path = os.path.join(appdata_dir('face_alignment'), "data")
+        if not os.path.exists(base_path):
+            os.makedirs(base_path)
+
+        # Initialise the face detector
+        if enable_cuda:
+            path_to_detector = os.path.join(base_path, "mmod_human_face_detector.dat")
+            if not os.path.isfile(path_to_detector):
+                print("Downloading the face detection CNN. Please wait...")
+
+                urllib.request.urlretrieve(
+                    "https://www.adrianbulat.com/downloads/dlib/mmod_human_face_detector.dat",
+                    path_to_detector, reporthook)
+
+            self.face_detector = dlib.cnn_face_detection_model_v1(path_to_detector)
+        else:
+            self.face_detector = dlib.get_frontal_face_detector()
+
+        # Initialise the face alignment networks
+        self.face_alignment_net = nn.DataParallel(FAN(int(network_size)))
+        if landmarks_type == LandmarksType._2D:
+            network_name = '2DFAN-' + str(int(network_size)) + '.pth.tar'
+        else:
+            network_name = '3DFAN-' + str(int(network_size)) + '.pth.tar'
+        fan_path = os.path.join(base_path, network_name)
+
+        if not os.path.isfile(fan_path):
+            print("Downloading the Face Alignment Network (FAN). Please wait...")
+
+            urllib.request.urlretrieve(
+                "https://www.adrianbulat.com/downloads/python-fan/" + network_name,
+                fan_path, reporthook)
+
+        fan_weights = torch.load(fan_path)
+        self.face_alignment_net.load_state_dict(fan_weights['state_dict'])
+
+        if self.enable_cuda:
+            self.face_alignment_net.cuda()
+        self.face_alignment_net.eval()
+
+        # Initialise the depth prediction network
+        if landmarks_type == LandmarksType._3D:
+            self.depth_prediction_net = nn.DataParallel(ResNetDepth())
+            depth_model_path = os.path.join(base_path, 'depth.pth.tar')
+            if not os.path.isfile(depth_model_path):
+                print("Downloading the Face Alignment depth Network (FAN-D). Please wait...")
+
+                urllib.request.urlretrieve(
+                    "https://www.adrianbulat.com/downloads/python-fan/depth.pth.tar",
+                    depth_model_path, reporthook)
+
+            depth_weights = torch.load(depth_model_path)
+            self.depth_prediction_net.load_state_dict(depth_weights['state_dict'])
+
+            if enable_cuda:
+                self.depth_prediction_net.cuda()
+            self.depth_prediction_net.eval()
+
+    def detect_faces(self, image):
+        """Run the dlib face detector over an image.
+
+        Args:
+            image (``ndarray`` object or string): either the path to the image or an image
+                previously opened on which face detection will be performed.
+
+        Returns:
+            A list of detected faces.
+        """
+        return self.face_detector(image, 1)
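
Note that the two dlib detectors return different types: cnn_face_detection_model_v1 yields mmod_rectangle objects that wrap the box in a .rect attribute, while get_frontal_face_detector returns plain dlib.rectangle objects, yet get_landmarks below always reads d.rect. A hypothetical helper (editor's sketch, not part of the patch) that would reconcile the two:

def _as_rect(detection):
    # mmod_rectangle wraps the box in .rect; dlib.rectangle is already the box
    return detection.rect if hasattr(detection, 'rect') else detection
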
+
+    def get_landmarks(self, input_image, all_faces=False):
+        if isinstance(input_image, str):
+            try:
+                image = io.imread(input_image)
+            except IOError:
+                print("error opening file :: ", input_image)
+                return None
+        else:
+            image = input_image
+
+        detected_faces = self.detect_faces(image)
+        if len(detected_faces) > 0:
+            landmarks = []
+            for i, d in enumerate(detected_faces):
+                if i > 0 and not all_faces:
+                    break
+
+                # Centre of the box, shifted up by 10% of its height; scale is
+                # relative to the 200 px reference box
+                center = torch.FloatTensor(
+                    [d.rect.right() - (d.rect.right() - d.rect.left()) / 2.0,
+                     d.rect.bottom() - (d.rect.bottom() - d.rect.top()) / 2.0])
+                center[1] = center[1] - (d.rect.bottom() - d.rect.top()) * 0.1
+                scale = (d.rect.right() - d.rect.left() + d.rect.bottom() - d.rect.top()) / 200.0
+
+                inp = crop(image, center, scale)
+                inp = torch.from_numpy(inp.transpose((2, 0, 1))).float().div(255.0).unsqueeze_(0)
+
+                if self.enable_cuda:
+                    inp = inp.cuda()
+
+                out = self.face_alignment_net(Variable(inp, volatile=True))[-1].data.cpu()
+                if self.flip_input:
+                    out += flip(self.face_alignment_net(Variable(flip(inp), volatile=True))[-1].data.cpu(),
+                                is_label=True)
+
+                pts, pts_img = get_preds_fromhm(out, center, scale)
+                landmarks.append(pts_img.numpy())
+        else:
+            print("Warning: No faces were detected.")
+            return None
+
+        return landmarks
+
+    def process_folder(self, path, all_faces=False):
+        types = ('*.jpg', '*.png')
+        images_list = []
+        for files in types:
+            images_list.extend(glob.glob(os.path.join(path, files)))
+
+        predictions = []
+        for image_name in images_list:
+            predictions.append((image_name, self.get_landmarks(image_name, all_faces)))
+
+        return predictions
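
The centre/scale convention in get_landmarks above follows the 200 px reference-box convention used by transform() and crop() in utils.py. A worked sketch with a hypothetical 200x200 detection box (editor's addition, not part of the patch):

left, top, right, bottom = 100, 100, 300, 300

center_x = right - (right - left) / 2.0         # 200.0
center_y = bottom - (bottom - top) / 2.0        # 200.0
center_y = center_y - (bottom - top) * 0.1      # 180.0: shifted up by 10% of the box height
scale = (right - left + bottom - top) / 200.0   # 2.0: the box spans twice the 200 px reference
print(center_x, center_y, scale)                # 200.0 180.0 2.0
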
diff --git a/face_alignment/models.py b/face_alignment/models.py
new file mode 100644
index 00000000..3b41afc5
--- /dev/null
+++ b/face_alignment/models.py
@@ -0,0 +1,250 @@
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import math
+
+
+def conv3x3(in_planes, out_planes, strd=1, padding=1, bias=False):
+    "3x3 convolution with padding"
+    return nn.Conv2d(in_planes, out_planes, kernel_size=3,
+                     stride=strd, padding=padding, bias=bias)
+
+
+class ConvBlock(nn.Module):
+    def __init__(self, in_planes, out_planes):
+        super(ConvBlock, self).__init__()
+        self.bn1 = nn.BatchNorm2d(in_planes)
+        self.conv1 = conv3x3(in_planes, int(out_planes / 2))
+        self.bn2 = nn.BatchNorm2d(int(out_planes / 2))
+        self.conv2 = conv3x3(int(out_planes / 2), int(out_planes / 4))
+        self.bn3 = nn.BatchNorm2d(int(out_planes / 4))
+        self.conv3 = conv3x3(int(out_planes / 4), int(out_planes / 4))
+
+        if in_planes != out_planes:
+            self.downsample = nn.Sequential(
+                nn.BatchNorm2d(in_planes),
+                nn.ReLU(True),
+                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=1, bias=False),
+            )
+        else:
+            self.downsample = None
+
+    def forward(self, x):
+        residual = x
+
+        out1 = self.bn1(x)
+        out1 = F.relu(out1, True)
+        out1 = self.conv1(out1)
+
+        out2 = self.bn2(out1)
+        out2 = F.relu(out2, True)
+        out2 = self.conv2(out2)
+
+        out3 = self.bn3(out2)
+        out3 = F.relu(out3, True)
+        out3 = self.conv3(out3)
+
+        # Concatenate the three branches: out/2 + out/4 + out/4 = out_planes channels
+        out3 = torch.cat((out1, out2, out3), 1)
+
+        if self.downsample is not None:
+            residual = self.downsample(residual)
+
+        out3 += residual
+
+        return out3
+
+
+class Bottleneck(nn.Module):
+
+    expansion = 4
+
+    def __init__(self, inplanes, planes, stride=1, downsample=None):
+        super(Bottleneck, self).__init__()
+        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
+        self.bn1 = nn.BatchNorm2d(planes)
+        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
+                               padding=1, bias=False)
+        self.bn2 = nn.BatchNorm2d(planes)
+        self.conv3 = nn.Conv2d(planes, planes * 4, kernel_size=1, bias=False)
+        self.bn3 = nn.BatchNorm2d(planes * 4)
+        self.relu = nn.ReLU(inplace=True)
+        self.downsample = downsample
+        self.stride = stride
+
+    def forward(self, x):
+        residual = x
+
+        out = self.conv1(x)
+        out = self.bn1(out)
+        out = self.relu(out)
+
+        out = self.conv2(out)
+        out = self.bn2(out)
+        out = self.relu(out)
+
+        out = self.conv3(out)
+        out = self.bn3(out)
+
+        if self.downsample is not None:
+            residual = self.downsample(x)
+
+        out += residual
+        out = self.relu(out)
+
+        return out
+
+
+class HourGlass(nn.Module):
+    def __init__(self, num_modules, depth, num_features):
+        super(HourGlass, self).__init__()
+        self.num_modules = num_modules
+        self.depth = depth
+        self.features = num_features
+
+        self._generate_network(self.depth)
+
+    def _generate_network(self, level):
+        self.add_module('b1_' + str(level), ConvBlock(256, 256))
+
+        self.add_module('b2_' + str(level), ConvBlock(256, 256))
+
+        if level > 1:
+            self._generate_network(level - 1)
+        else:
+            self.add_module('b2_plus_' + str(level), ConvBlock(256, 256))
+
+        self.add_module('b3_' + str(level), ConvBlock(256, 256))
+
+    def _forward(self, level, inp):
+        # Upper branch
+        up1 = inp
+        up1 = self._modules['b1_' + str(level)](up1)
+
+        # Lower branch
+        low1 = F.max_pool2d(inp, 2, stride=2)
+        low1 = self._modules['b2_' + str(level)](low1)
+
+        if level > 1:
+            low2 = self._forward(level - 1, low1)
+        else:
+            low2 = low1
+            low2 = self._modules['b2_plus_' + str(level)](low2)
+
+        low3 = low2
+        low3 = self._modules['b3_' + str(level)](low3)
+
+        up2 = F.upsample(low3, scale_factor=2, mode='nearest')
+
+        return up1 + up2
+
+    def forward(self, x):
+        return self._forward(self.depth, x)
+
+
+class FAN(nn.Module):
+
+    def __init__(self, num_modules=1):
+        super(FAN, self).__init__()
+        self.num_modules = num_modules
+
+        # Base part
+        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
+        self.bn1 = nn.BatchNorm2d(64)
+        self.conv2 = ConvBlock(64, 128)
+        self.conv3 = ConvBlock(128, 128)
+        self.conv4 = ConvBlock(128, 256)
+
+        # Stacking part
+        for hg_module in range(self.num_modules):
+            self.add_module('m' + str(hg_module), HourGlass(1, 4, 256))
+            self.add_module('top_m_' + str(hg_module), ConvBlock(256, 256))
+            self.add_module('conv_last' + str(hg_module),
+                            nn.Conv2d(256, 256, kernel_size=1, stride=1, padding=0))
+            self.add_module('l' + str(hg_module),
+                            nn.Conv2d(256, 68, kernel_size=1, stride=1, padding=0))
+            self.add_module('bn_end' + str(hg_module), nn.BatchNorm2d(256))
+
+            # NOTE: the original patch is truncated at this point; the rest of
+            # models.py is reconstructed from the modules registered above and
+            # from the standard stacked-hourglass/ResNet templates.
+            if hg_module < self.num_modules - 1:
+                self.add_module('bl' + str(hg_module),
+                                nn.Conv2d(256, 256, kernel_size=1, stride=1, padding=0))
+                self.add_module('al' + str(hg_module),
+                                nn.Conv2d(68, 256, kernel_size=1, stride=1, padding=0))
+
+    def forward(self, x):
+        x = F.relu(self.bn1(self.conv1(x)), True)
+        x = F.max_pool2d(self.conv2(x), 2)
+        x = self.conv3(x)
+        x = self.conv4(x)
+
+        previous = x
+        outputs = []
+        for i in range(self.num_modules):
+            hg = self._modules['m' + str(i)](previous)
+
+            ll = self._modules['top_m_' + str(i)](hg)
+            ll = F.relu(self._modules['bn_end' + str(i)](
+                self._modules['conv_last' + str(i)](ll)), True)
+
+            # Predict a set of heatmaps after every hourglass
+            tmp_out = self._modules['l' + str(i)](ll)
+            outputs.append(tmp_out)
+
+            # Intermediate supervision: feed features and heatmaps back in
+            if i < self.num_modules - 1:
+                ll = self._modules['bl' + str(i)](ll)
+                tmp_out_ = self._modules['al' + str(i)](tmp_out)
+                previous = previous + ll + tmp_out_
+
+        return outputs
+
+
+class ResNetDepth(nn.Module):
+
+    def __init__(self, block=Bottleneck, layers=[3, 8, 36, 3], num_classes=68):
+        self.inplanes = 64
+        super(ResNetDepth, self).__init__()
+        # The depth network takes the RGB crop stacked with the 68 landmark heatmaps
+        self.conv1 = nn.Conv2d(3 + 68, 64, kernel_size=7, stride=2, padding=3, bias=False)
+        self.bn1 = nn.BatchNorm2d(64)
+        self.relu = nn.ReLU(inplace=True)
+        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
+        self.layer1 = self._make_layer(block, 64, layers[0])
+        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
+        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
+        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
+        self.avgpool = nn.AvgPool2d(7)
+        self.fc = nn.Linear(512 * block.expansion, num_classes)
+
+        for m in self.modules():
+            if isinstance(m, nn.Conv2d):
+                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
+                m.weight.data.normal_(0, math.sqrt(2. / n))
+            elif isinstance(m, nn.BatchNorm2d):
+                m.weight.data.fill_(1)
+                m.bias.data.zero_()
+
+    def _make_layer(self, block, planes, blocks, stride=1):
+        downsample = None
+        if stride != 1 or self.inplanes != planes * block.expansion:
+            downsample = nn.Sequential(
+                nn.Conv2d(self.inplanes, planes * block.expansion,
+                          kernel_size=1, stride=stride, bias=False),
+                nn.BatchNorm2d(planes * block.expansion),
+            )
+
+        layers = [block(self.inplanes, planes, stride, downsample)]
+        self.inplanes = planes * block.expansion
+        for _ in range(1, blocks):
+            layers.append(block(self.inplanes, planes))
+
+        return nn.Sequential(*layers)
+
+    def forward(self, x):
+        x = self.conv1(x)
+        x = self.bn1(x)
+        x = self.relu(x)
+        x = self.maxpool(x)
+
+        x = self.layer1(x)
+        x = self.layer2(x)
+        x = self.layer3(x)
+        x = self.layer4(x)
+
+        x = self.avgpool(x)
+        x = x.view(x.size(0), -1)
+        x = self.fc(x)
+
+        return x
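
A quick shape check of the FAN above (editor's sketch with random weights; 2017-era PyTorch, hence Variable): a 1x3x256x256 crop yields one 68-channel 64x64 heatmap tensor per stacked module.

import torch
from torch.autograd import Variable
from face_alignment.models import FAN

net = FAN(num_modules=1)
# conv1 (stride 2) and the max-pool reduce 256 -> 128 -> 64; the hourglass keeps 64x64
out = net(Variable(torch.randn(1, 3, 256, 256), volatile=True))
print(len(out), out[-1].size())  # 1 torch.Size([1, 68, 64, 64])
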
diff --git a/face_alignment/utils.py b/face_alignment/utils.py
new file mode 100644
--- /dev/null
+++ b/face_alignment/utils.py
@@ -0,0 +1,209 @@
+# NOTE: the start of utils.py is truncated in the original patch; the imports,
+# _gaussian() and the head of draw_gaussian() below are reconstructed to match
+# the calls made by the surviving code.
+import os
+import sys
+import time
+import math
+
+import numpy as np
+import cv2
+import torch
+
+
+def _gaussian(size=3, sigma=0.25, amplitude=1, normalize=False, width=None,
+              height=None, sigma_horz=None, sigma_vert=None, mean_horz=0.5,
+              mean_vert=0.5):
+    # Handle the defaults
+    if width is None:
+        width = size
+    if height is None:
+        height = size
+    if sigma_horz is None:
+        sigma_horz = sigma
+    if sigma_vert is None:
+        sigma_vert = sigma
+    center_x = mean_horz * width + 0.5
+    center_y = mean_vert * height + 0.5
+    gauss = np.empty((height, width), dtype=np.float32)
+    # Generate the kernel
+    for i in range(height):
+        for j in range(width):
+            gauss[i][j] = amplitude * math.exp(-(
+                math.pow((j + 1 - center_x) / (sigma_horz * width), 2) / 2.0 +
+                math.pow((i + 1 - center_y) / (sigma_vert * height), 2) / 2.0))
+    if normalize:
+        gauss = gauss / np.sum(gauss)
+    return gauss
+
+
+def draw_gaussian(image, point, sigma):
+    # Check whether the gaussian is inside the image
+    ul = [math.floor(point[0] - 3 * sigma), math.floor(point[1] - 3 * sigma)]
+    br = [math.floor(point[0] + 3 * sigma), math.floor(point[1] + 3 * sigma)]
+    if (ul[0] > image.shape[1] or ul[1] > image.shape[0] or br[0] < 1 or br[1] < 1):
+        return image
+    size = 6 * sigma + 1
+    g = _gaussian(size)
+    g_x = [max(1, -ul[0]), min(br[0], image.shape[1]) - max(1, ul[0]) + max(1, -ul[0])]
+    g_y = [max(1, -ul[1]), min(br[1], image.shape[0]) - max(1, ul[1]) + max(1, -ul[1])]
+    img_x = [max(1, ul[0]), min(br[0], image.shape[1])]
+    img_y = [max(1, ul[1]), min(br[1], image.shape[0])]
+    assert (g_x[0] > 0 and g_y[1] > 0)
+    image[img_y[0] - 1:img_y[1], img_x[0] - 1:img_x[1]] = \
+        image[img_y[0] - 1:img_y[1], img_x[0] - 1:img_x[1]] + g[g_y[0] - 1:g_y[1], g_x[0] - 1:g_x[1]]
+    image[image > 1] = 1
+    return image
+
+
+def transform(point, center, scale, resolution, invert=False):
+    _pt = torch.ones(3)
+    _pt[0] = point[0]
+    _pt[1] = point[1]
+
+    h = 200.0 * scale
+    t = torch.eye(3)
+    t[0, 0] = resolution / h
+    t[1, 1] = resolution / h
+    t[0, 2] = resolution * (-center[0] / h + 0.5)
+    t[1, 2] = resolution * (-center[1] / h + 0.5)
+
+    if invert:
+        t = torch.inverse(t)
+
+    new_point = (t @ _pt)[0:2]
+
+    return new_point.int()
+
+
+def crop(image, center, scale, resolution=256):
+    """Crop the image around the centre point. Input is expected to be an np.ndarray."""
+    ul = transform([1, 1], center, scale, resolution, True)
+    br = transform([resolution, resolution], center, scale, resolution, True)
+    pad = math.ceil(torch.norm((ul - br).float()) / 2 - (br[0] - ul[0]) / 2)
+    if image.ndim > 2:
+        newDim = np.array([br[1] - ul[1], br[0] - ul[0], image.shape[2]], dtype=np.int32)
+    else:
+        newDim = np.array([br[1] - ul[1], br[0] - ul[0]], dtype=np.int32)
+    newImg = np.zeros(newDim, dtype=np.uint8)
+    ht = image.shape[0]
+    wd = image.shape[1]
+    newX = np.array([max(1, -ul[0] + 1), min(br[0], wd) - ul[0]], dtype=np.int32)
+    newY = np.array([max(1, -ul[1] + 1), min(br[1], ht) - ul[1]], dtype=np.int32)
+    oldX = np.array([max(1, ul[0] + 1), min(br[0], wd)], dtype=np.int32)
+    oldY = np.array([max(1, ul[1] + 1), min(br[1], ht)], dtype=np.int32)
+    newImg[newY[0] - 1:newY[1], newX[0] - 1:newX[1]] = image[oldY[0] - 1:oldY[1], oldX[0] - 1:oldX[1]]
+    newImg = cv2.resize(newImg, dsize=(resolution, resolution),
+                        interpolation=cv2.INTER_LINEAR)
+    return newImg
+
+
+def get_preds_fromhm(hm, center, scale):
+    maxval, idx = torch.max(hm.view(hm.size(0), hm.size(1), hm.size(2) * hm.size(3)), 2)
+    preds = idx.view(idx.size(0), idx.size(1), 1).repeat(1, 1, 2).float()
+    # Convert the flat argmax index into 1-based (x, y) heatmap coordinates
+    preds[..., 0].apply_(lambda x: (x - 1) % hm.size(3) + 1)
+    preds[..., 1].add_(-1).div_(hm.size(2)).floor_().add_(1)
+
+    # Quarter-pixel refinement: step towards the higher neighbouring activation
+    for i in range(preds.size(0)):
+        for j in range(preds.size(1)):
+            hm_ = hm[i, j, :]
+            pX, pY = int(preds[i, j, 0]), int(preds[i, j, 1])
+            if pX > 1 and pX < 64 and pY > 1 and pY < 64:
+                diff = torch.FloatTensor(
+                    [hm_[pY, pX + 1] - hm_[pY, pX - 1],
+                     hm_[pY + 1, pX] - hm_[pY - 1, pX]])
+                preds[i, j].add_(diff.sign_().mul_(.25))
+
+    preds.add_(-.5)
+
+    # Map the heatmap coordinates back into the original image
+    preds_orig = torch.zeros(preds.size())
+    for i in range(hm.size(0)):
+        for j in range(hm.size(1)):
+            preds_orig[i, j] = transform(preds[i, j], center, scale, hm.size(2), True)
+
+    return preds, preds_orig
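
A small sketch of the transform() defined above (editor's addition, reusing the box from the earlier example): with center=(200, 180) and scale=2.0 the reference box is 200*scale = 400 px on a side, so the shifted box centre lands near the middle of a 256x256 crop, and invert=True maps crop/heatmap coordinates back to the image.

import torch
from face_alignment.utils import transform

center = torch.FloatTensor([200, 180])
scale = 2.0

print(transform([200, 180], center, scale, 256))               # ~ [128, 128]
print(transform([128, 128], center, scale, 256, invert=True))  # ~ [200, 180] (up to int truncation)
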
+ """ + + # Define default user directory + userDir = os.getenv('FACEALIGNMENT_USERDIR', None) + if userDir is None: + userDir = os.path.expanduser('~') + if not os.path.isdir(userDir): # pragma: no cover + userDir = '/var/tmp' # issue #54 + + # Get system app data dir + path = None + if sys.platform.startswith('win'): + path1, path2 = os.getenv('LOCALAPPDATA'), os.getenv('APPDATA') + path = (path2 or path1) if roaming else (path1 or path2) + elif sys.platform.startswith('darwin'): + path = os.path.join(userDir, 'Library', 'Application Support') + # On Linux and as fallback + if not (path and os.path.isdir(path)): + path = userDir + + # Maybe we should store things local to the executable (in case of a + # portable distro or a frozen application that wants to be portable) + prefix = sys.prefix + if getattr(sys, 'frozen', None): + prefix = os.path.abspath(os.path.dirname(sys.executable)) + for reldir in ('settings', '../settings'): + localpath = os.path.abspath(os.path.join(prefix, reldir)) + if os.path.isdir(localpath): # pragma: no cover + try: + open(os.path.join(localpath, 'test.write'), 'wb').close() + os.remove(os.path.join(localpath, 'test.write')) + except IOError: + pass # We cannot write in this directory + else: + path = localpath + break + + # Get path specific for this app + if appname: + if path == userDir: + appname = '.' + appname.lstrip('.') # Make it a hidden directory + path = os.path.join(path, appname) + if not os.path.isdir(path): # pragma: no cover + os.mkdir(path) + + # Done + return path + +def reporthook(count, block_size, total_size): + global start_time + if count == 0: + start_time = time.time() + return + duration = time.time() - start_time + progress_size = int(count * block_size) + speed = int(progress_size / (1024 * duration)) + percent = min(int(count*blockSize*100/totalSize),100) + sys.stdout.write("\r...%d%%, %d MB, %d KB/s, %d seconds passed" % + (percent, progress_size / (1024 * 1024), speed, duration)) + sys.stdout.flush() + + +def shuffle_lr(parts, pairs=None): + if pairs is None: + pairs = [[0,16], [1,15], [2,14], [3,13], [4,12], [5,11], [6,10], [7,9], [17,26], [18,25], [19,24], [20,23], [21,22], [36,45], [37,44], [38,43], [39,42], [41,46], [40,47], [31,35], [32,34], [50,52], [49,53], [48,54], [61,63], [60,64], [67,65], [59,55], [58,56]] + for matched_p in pairs: + idx1, idx2 = matched_p[0], matched_p[1] + tmp = np.copy(parts[..., idx1]) + np.copyto(parts[..., idx1], parts[..., idx2]) + np.copyto(parts[..., idx2], tmp) + return parts + +def flip(tensor, is_label=False): + if isinstance(tensor, torch.Tensor): + tensor = tensor.numpy() + was_squeezed = False + if tensor.ndim == 4: + tensor = np.squeeze(tensor) + was_squeezed = True + if is_label: + tensor = tensor.swapaxes(0,1).swapaxes(1,2) + tensor = cv2.flip(shuffle_lr(tensor), 1).reshape(tensor.shape) + tensor = tensor.swapaxes(0,1).swapaxes(1,2) + print(tensor.shape) + else: + tensor = cv2.flip(tensor, 1).reshape(tensor.shape) + if was_squeezed: + tensor = np.expand_dims(tensor, axis=0) + return torch.from_numpy(tensor) + + diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..17b7a11f --- /dev/null +++ b/setup.cfg @@ -0,0 +1,29 @@ +[bumpversion] +current_version = 0.1.0 +commit = True +tag = True + +[bumpversion:file:setup.py] +search = version='{current_version}' +replace = version='{new_version}' + +[bumpversion:file:face_alignment/__init__.py] +search = __version__ = '{current_version}' +replace = __version__ = '{new_version}' + +[bdist_wheel] +universal = 1 + +[flake8] 
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 00000000..17b7a11f
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,29 @@
+[bumpversion]
+current_version = 0.1.0
+commit = True
+tag = True
+
+[bumpversion:file:setup.py]
+search = version='{current_version}'
+replace = version='{new_version}'
+
+[bumpversion:file:face_alignment/__init__.py]
+search = __version__ = '{current_version}'
+replace = __version__ = '{new_version}'
+
+[bdist_wheel]
+universal = 1
+
+[flake8]
+exclude =
+    .github,
+    examples,
+    docs,
+    .tox,
+    bin,
+    dist,
+    tools,
+    *.egg-info,
+    __init__.py,
+    *.yml
+max-line-length = 160
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 00000000..156425fc
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,55 @@
+from setuptools import setup, find_packages
+# To use consistent encodings
+from codecs import open
+from os import path
+
+here = path.abspath(path.dirname(__file__))
+
+# Get the long description from the README file
+with open(path.join(here, 'README.rst'), encoding='utf-8') as readme_file:
+    long_description = readme_file.read()
+
+requirements = [
+    'torch',
+    'dlib>=19.5',
+    'numpy',
+    'scipy>=0.17.0',
+    'opencv-python',
+    'scikit-image'
+]
+
+setup(
+    name='face_alignment',
+    version='0.1.0',
+
+    description="Detect 2D or 3D face landmarks from Python",
+    long_description=long_description,
+
+    # Author details
+    author="Adrian Bulat",
+    author_email="adrian.bulat@nottingham.ac.uk",
+    url="https://github.com/1adrianb/face-alignment",
+
+    # Package info
+    packages=find_packages(exclude=('test',)),
+
+    install_requires=requirements,
+    license='BSD',
+    zip_safe=True,
+
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: BSD License',
+        'Natural Language :: English',
+
+        # Supported Python versions; urllib.request and the @ operator
+        # make the code Python 3 only
+        'Programming Language :: Python :: 3',
+        'Programming Language :: Python :: 3.5',
+        'Programming Language :: Python :: 3.6',
+    ],
+)