diff --git a/gen_cfeatures.py b/gen_cfeatures.py
index 743491d..461ad67 100644
--- a/gen_cfeatures.py
+++ b/gen_cfeatures.py
@@ -1,24 +1,15 @@
 import datetime
 import os, time
-import pandas as pd
 import argparse
 import traceback, sys
 import re
 
-from pathlib import Path
-from typing import List, Tuple, Dict, Any, Optional, Callable, Protocol
-
-import numpy as np
-from numpy import signedinteger
-from PIL import Image
-from huggingface_hub import hf_hub_download
-from huggingface_hub.utils import HfHubHTTPError
 import concurrent.futures
 import json
 import os.path
 from functools import lru_cache
 
-from typing import Union, List
+from typing import Union, List, Optional
 
 import numpy as np
 from PIL import Image
@@ -26,6 +17,7 @@
 
 from imgutils.data import MultiImagesTyping, load_images, ImageTyping
 from imgutils.utils import open_onnx_model
+from onnxruntime import InferenceSession
 
 try:
     from typing import Literal
@@ -34,7 +26,6 @@
 
 hf_fs = HfFileSystem()
 
-
 _VALID_MODEL_NAMES = [
     os.path.basename(os.path.dirname(file)) for file in
     hf_fs.glob('deepghs/ccip_onnx/*/model.ckpt')
@@ -44,8 +35,8 @@
 
 EXTENSIONS: List[str] = ['.png', '.jpg', '.jpeg', ".PNG", ".JPG", ".JPEG"]
 
-BATCH_SIZE: int = 10  # max size for M1 MBA GPU
-PROGRESS_INTERVAL: int = 1000
+BATCH_SIZE: int = 20
+PROGRESS_INTERVAL: int = 100
 
 WORKER_NUM: int = 8
@@ -64,14 +55,10 @@ def print_traceback() -> None:
 
 
 class Predictor:
     def __init__(self) -> None:
-        pass
-        # self.last_loaded_repo: Optional[str] = None
+        self.embed_model: Optional[InferenceSession] = None
+        self.threshold: float = -1.0
         # self.tagger_model: Optional[nn.Module] = None
-        # self.tag_names: Optional[List[str]] = None
-        # self.rating_index: Optional[List[int]] = None
-        # self.general_index: Optional[List[int]] = None
-        # self.character_index: Optional[List[int]] = None
-        # self.transform: Optional[Callable] = None
+
     def list_files_recursive(self, dir_path: str) -> List[str]:
         file_list: List[str] = []
@@ -104,42 +91,8 @@ def list_files_recursive(self, dir_path: str) -> List[str]:
     #     # return padded_image
 
-    # def load_model(self) -> None:
-    #     if self.tagger_model is not None:
-    #         return
-    #
-    #     self.tagger_model = timm.create_model("hf-hub:" + TAGGER_VIT_MODEL_REPO).eval()
-    #     state_dict = timm.models.load_state_dict_from_hf(TAGGER_VIT_MODEL_REPO)
-    #     self.tagger_model.load_state_dict(state_dict)
-    #
-    #     print("Loading tag list...")
-    #     self.load_labels_hf(repo_id=TAGGER_VIT_MODEL_REPO)
-    #
-    #     print("Creating data transform...")
-    #     self.transform = create_transform(**resolve_data_config(self.tagger_model.pretrained_cfg, model=self.tagger_model))
-
-    def write_to_file(self, csv_line: str) -> None:
-        self.f.write(csv_line + '\n')
-
-    # def gen_image_tensor(self, file_path: str) -> Tensor | None:
-    #     img: Image.Image = None
-    #     try:
-    #         img = Image.open(file_path)
-    #         img.load()
-    #         img_tmp = self.prepare_image(img)
-    #         # run the model's input transform to convert to tensor and rescale
-    #         input: Tensor = self.transform(img_tmp)
-    #         # NCHW image RGB to BGR
-    #         input = input[[2, 1, 0]]
-    #         return input
-    #     except Exception as e:
-    #         if img is not None:
-    #             img.close()
-    #         error_class: type = type(e)
-    #         error_description: str = str(e)
-    #         err_msg: str = '%s: %s' % (error_class, error_description)
-    #         print(err_msg)
-    #         return None
+    # def write_to_file(self, csv_line: str) -> None:
+    #     self.f.write(csv_line + '\n')
 
     def filter_files_by_date(self, file_list: List[str], added_date: datetime.date) -> List[str]:
         filtered_list: List[str] = []
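The hunks above trim the imports to what the script still uses, retune the batching constants, and reduce Predictor.__init__ to the two fields that matter now: the ONNX feature session and the match threshold, both filled in later by process_directory() (see the hunk at old line 267). A minimal sketch of the intended lifecycle, assuming a hypothetical image directory:

    # sketch only; '/path/to/images' is a placeholder
    p = Predictor()
    p.process_directory('/path/to/images')  # loads the model, then embeds files in BATCH_SIZE batches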
@@ -163,31 +116,35 @@ def _preprocess_image(self, image: Image.Image, size: int = 384):
         return data
 
-    @lru_cache()
-    def _open_feat_model(self, model):
+    # @lru_cache()
+    def _open_feat_model(self, model) -> InferenceSession:
         return open_onnx_model(hf_hub_download(
-            f'deepghs/ccip_onnx',
-            f'{model}/model_feat.onnx',
-        ))
-
-    @lru_cache()
-    def _open_metric_model(self, model):
-        return open_onnx_model(hf_hub_download(
-            f'deepghs/ccip_onnx',
-            f'{model}/model_metrics.onnx',
-        ))
-
-    @lru_cache()
+                f'deepghs/ccip_onnx',
+                f'{model}/model_feat.onnx',
+            ),
+            mode='CUDAExecutionProvider',
+        )
+
+    # @lru_cache()
+    # def _open_metric_model(self, model):
+    #     return open_onnx_model(hf_hub_download(
+    #         f'deepghs/ccip_onnx',
+    #         f'{model}/model_metrics.onnx',
+    #     ))
+    #
+    # @lru_cache()
     def _open_metrics(self, model):
         with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/metrics.json'), 'r') as f:
             return json.load(f)
-
-    @lru_cache()
-    def _open_cluster_metrics(self, model):
-        with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/cluster.json'), 'r') as f:
-            return json.load(f)
-
-    def ccip_batch_extract_features(self, images: MultiImagesTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
+    #
+    # @lru_cache()
+    # def _open_cluster_metrics(self, model):
+    #     with open(hf_hub_download(f'deepghs/ccip_onnx', f'{model}/cluster.json'), 'r') as f:
+    #         return json.load(f)
+
+    # def ccip_batch_extract_features(self, images: MultiImagesTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
+    def ccip_batch_extract_features(self, images: List[np.ndarray], size: int = 384,
+                                    model: str = _DEFAULT_MODEL_NAMES) -> np.ndarray:
         """
         Extracts the feature vectors of multiple images using the specified model.
 
         :param images: The input images from which to extract the feature vectors.
@@ -207,9 +164,11 @@ def ccip_batch_extract_features(self, images: MultiImagesTyping, size: int = 384
         >>> feat.shape, feat.dtype
         ((3, 768), dtype('float32'))
         """
-        images = load_images(images, mode='RGB')
-        data = np.stack([self._preprocess_image(item, size=size) for item in images]).astype(np.float32)
-        output, = self._open_feat_model(model).run(['output'], {'input': data})
+        # images = load_images(images, mode='RGB')
+        # data = np.stack([self._preprocess_image(item, size=size) for item in images]).astype(np.float32)
+        data = np.stack(images).astype(np.float32)
+        # output, = self._open_feat_model(model).run(['output'], {'input': data})
+        output, = self.embed_model.run(['output'], {'input': data})
        return output
 
     def ccip_extract_feature(self, image: ImageTyping, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
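After this change, ccip_batch_extract_features no longer accepts raw images: each element of `images` must already be the preprocessed float array produced by _preprocess_image, and the method simply stacks the batch and runs it through the cached self.embed_model session. A small sketch of the new contract (the directory path is hypothetical; gen_image_ndarray is added further down in this diff):

    p = Predictor()
    p.embed_model = p._open_feat_model(_DEFAULT_MODEL_NAMES)
    arrs = [a for a in (p.gen_image_ndarray(f) for f in p.list_files_recursive('/some/dir')) if a is not None]
    feats = p.ccip_batch_extract_features(arrs)  # -> np.ndarray of shape (len(arrs), 768), float32

One point worth verifying against the installed imgutils version: open_onnx_model's `mode` argument historically takes a short alias such as 'gpu' rather than the full provider name 'CUDAExecutionProvider'.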
@@ -234,19 +193,43 @@ def ccip_extract_feature(self, image: ImageTyping, size: int = 384, model: str =
         """
         return self.ccip_batch_extract_features([image], size, model)[0]
 
-    _FeatureOrImage = Union[ImageTyping, np.ndarray]
+    def ccip_default_threshold(self, model: str = _DEFAULT_MODEL_NAMES) -> float:
+        """
+        Retrieves the default threshold value obtained from model metrics in the Hugging Face model repository.
+
+        :param model: The name of the model to use for feature extraction. (default: ``ccip-caformer-24-randaug-pruned``)
+            The available model names are: ``ccip-caformer-24-randaug-pruned``,
+            ``ccip-caformer-6-randaug-pruned_fp32``, ``ccip-caformer-5_fp32``.
+        :type model: str
+        :return: The default threshold value obtained from model metrics.
+        :rtype: float
+
+        Examples::
+            >>> from imgutils.metrics import ccip_default_threshold
+            >>>
+            >>> ccip_default_threshold()
+            0.17847511429108218
+            >>> ccip_default_threshold('ccip-caformer-6-randaug-pruned_fp32')
+            0.1951224011983088
+            >>> ccip_default_threshold('ccip-caformer-5_fp32')
+            0.18397327797685215
+        """
+        return self._open_metrics(model)['threshold']
+
+    # _FeatureOrImage = Union[ImageTyping, np.ndarray]
 
-    def _p_feature(self, x: _FeatureOrImage, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
-        if isinstance(x, np.ndarray):  # if feature
-            return x
-        else:  # is image or path
-            return self.ccip_extract_feature(x, size, model)
+    # def _p_feature(self, x: _FeatureOrImage, size: int = 384, model: str = _DEFAULT_MODEL_NAMES):
+    #     if isinstance(x, np.ndarray):  # if feature
+    #         return x
+    #     else:  # is image or path
+    #         return self.ccip_extract_feature(x, size, model)
 
     def predict(
             self,
-            tensors: List[np.ndarray],
-    ) -> List[str]:
-        pass
+            images: List[np.ndarray],
+    ) -> np.ndarray:
+        print("Running inference...")
+        ret = self.ccip_batch_extract_features(images)
+        print("Processing results...")
+        return ret
 
     # batched_tensor = torch.stack(tensors, dim=0)
     #
     # print("Running inference...")
     #
@@ -267,28 +250,27 @@ def predict(
     # print("Processing results...")
     # preds = outputs.numpy()
 
+    def gen_image_ndarray(self, file_path: str) -> np.ndarray | None:
+        try:
+            img: Image.Image = load_images([file_path], mode='RGB')[0]
+            ret_arr: np.ndarray = self._preprocess_image(img, size=384)
+            return ret_arr
+        except Exception as e:
+            error_class: type = type(e)
+            error_description: str = str(e)
+            err_msg: str = '%s: %s' % (error_class, error_description)
+            print(err_msg)
+            return None
+
     def process_directory(self, dir_path: str, added_date: datetime.date | None = None) -> None:
         file_list: List[str] = self.list_files_recursive(dir_path)
         print(f'{len(file_list)} files found')
 
-        # tag new images after specified date
-        if added_date is not None:
-            file_list = self.filter_files_by_date(file_list, added_date)
-            print(f'{len(file_list)} files found after {added_date}')
-
-        # backup tags-wd-tagger.txt with copying to tags-wd-tagger.txt.bak
-        if os.path.exists('tags-wd-tagger.txt'):
-            with open('tags-wd-tagger.txt', 'r', encoding='utf-8') as f:
-                with open('tags-wd-tagger.txt.bak', 'w', encoding='utf-8') as f_bak:
-                    f_bak.write(f.read())
-        else:
-            print('tags-wd-tagger.txt not found')
-            exit(1)
-
-        self.f = open('tags-wd-tagger.txt', 'a', encoding='utf-8')
-
-        self.load_model()
+        # self.load_model()
+        self.embed_model = self._open_feat_model(_DEFAULT_MODEL_NAMES)
+        self.threshold = self.ccip_default_threshold(_DEFAULT_MODEL_NAMES)
 
-        tensors: List[Tensor] = []
+        ndarrs: List[np.ndarray] = []
         fpathes: List[str] = []
         start: float = time.perf_counter()
         last_cnt: int = 0
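Note that self.threshold is cached here but never consumed anywhere in this diff; presumably it is intended for a later pairwise comparison pass. A sketch of that follow-up step using the upstream metric helper (rather than the commented-out _open_metric_model):

    from imgutils.metrics import ccip_difference
    # feat_a, feat_b: 768-d vectors returned by ccip_batch_extract_features
    same_character: bool = ccip_difference(feat_a, feat_b) <= p.threshold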
@@ -297,37 +279,40 @@ def process_directory(self, dir_path: str, added_date: datetime.date | None = No
         passed_idx: int = 0
         with concurrent.futures.ThreadPoolExecutor(max_workers=WORKER_NUM) as executor:
             # dispatch get Tensor task to processes
-            future_to_path = {executor.submit(self.gen_image_tensor, file_path): file_path for file_path in
+            future_to_path = {executor.submit(self.gen_image_ndarray, file_path): file_path for file_path in
                               file_list[0: BATCH_SIZE]}
             passed_idx += BATCH_SIZE
             while passed_idx < len(file_list):
                 for future in concurrent.futures.as_completed(future_to_path):
                     path = future_to_path[future]
                     try:
-                        tensor = future.result()
-                        if tensor is None:
+                        ndarr = future.result()
+                        if ndarr is None:
                             failed_cnt += 1
                             cnt -= 1
                             # continue
 
-                        if tensor is not None:
-                            tensors.append(tensor)
+                        if ndarr is not None:
+                            ndarrs.append(ndarr)
                             fpathes.append(path)
 
-                        if len(tensors) >= BATCH_SIZE - failed_cnt:
+                        if len(ndarrs) >= BATCH_SIZE - failed_cnt:
                             # submit load Tensor tasks for next batch
                             end_idx = passed_idx + BATCH_SIZE
                             if end_idx > len(file_list):
                                 end_idx = len(file_list)
-                            future_to_path = {executor.submit(self.gen_image_tensor, file_path): file_path for file_path
+                            future_to_path = {executor.submit(self.gen_image_ndarray, file_path): file_path for file_path
                                               in file_list[passed_idx: end_idx]}
                             passed_idx = end_idx
 
                             # run inference
-                            results_in_csv_format: List[str] = self.predict(tensors)
-                            for idx, line in enumerate(results_in_csv_format):
-                                self.write_to_file(fpathes[idx] + ',' + line)
-                            tensors = []
+                            # dimension of results: (batch_size, 768)
+                            results: np.ndarray = self.predict(ndarrs)
+                            # for idx, line in enumerate(results_in_csv_format):
+                            #     self.write_to_file(fpathes[idx] + ',' + line)
+                            # for arr in results:
+                            #     print(arr.astype(float))
+                            ndarrs = []
                             fpathes = []
                             failed_cnt = 0
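As the hunk above leaves both output paths commented out, `results` is currently computed and then discarded. The script's entry point is untouched by this diff; assuming the existing argparse wiring (not shown here), an end-to-end run would look roughly like:

    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('dir', type=str, help='directory to scan for images')
        args = parser.parse_args()
        Predictor().process_directory(args.dir)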