Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/85-serve #95

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions airflow/dags/_recbole_inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import math
from typing import List

import torch

from recbole.quick_start import load_data_and_model
from recbole.data.interaction import Interaction

class Inference:
    """Serve top-k recommendations from a saved RecBole sequential model.

    Loads the checkpoint once at construction time and exposes batched
    inference over user feedback histories.
    """

    def __init__(self, modelpath):
        """Load saved RecBole artifacts (config, model, dataset) from *modelpath*.

        The train/valid/test splits returned by ``load_data_and_model`` are
        not needed for serving and are discarded.
        """
        self.modelpath = modelpath

        # Load saved artifacts
        self.config, self.model, self.dataset, train_data, valid_data, test_data = load_data_and_model(self.modelpath)

    def prep_inference_data(self, user_id_and_feedbacks) -> Interaction:
        """Build a RecBole ``Interaction`` from users' feedback histories.

        Each element of *user_id_and_feedbacks* is expected to carry a
        ``'feedbacks'`` list of item tokens (recipe identifiers as stored in
        the dataset vocabulary).
        """
        item_id_list, item_length = [], []
        for user_id_and_feedback in user_id_and_feedbacks:

            # Map recipe tokens -> internal item ids
            recipe_ids = [self.dataset.token2id(self.dataset.iid_field, item_token)
                          for item_token in user_id_and_feedback['feedbacks']]

            item_id_list.append(recipe_ids)
            item_length.append(len(recipe_ids))

        # BUG FIX: histories are ragged, and torch.tensor() rejects
        # non-rectangular nested lists. Right-pad with 0 (RecBole's padding
        # id) to the longest history in the batch; item_length keeps the true
        # (unpadded) length, which sequential models use to locate each
        # user's last real item.
        max_len = max(item_length, default=0)
        padded_item_id_list = [ids + [0] * (max_len - len(ids)) for ids in item_id_list]

        item_dict = {
            'item_id_list': torch.tensor(padded_item_id_list, dtype=torch.int64).to(self.config['device']),
            'item_length': torch.tensor(item_length, dtype=torch.int64).to(self.config['device'])
        }

        # Wrap in an Interaction object, the input type RecBole models expect
        return Interaction(item_dict)

    def sasrec_inference(self, user_id_and_feedbacks: list, k: int = 20, batch_size: int = 4096):
        """Return the top-*k* recommended item tokens for every user.

        Users are processed in chunks of *batch_size*; the result is a list
        of ``{'_id': ..., 'recommended_items': [...]}`` dicts in input order.
        """
        num_users = len(user_id_and_feedbacks)
        recommended_result = []

        for i in range(math.ceil(num_users / batch_size)):

            # prep data for this batch
            batch_data = user_id_and_feedbacks[i * batch_size: (i + 1) * batch_size]
            user_ids = [data['_id'] for data in batch_data]
            inference_data = self.prep_inference_data(batch_data)

            # prediction — no_grad: inference only, avoids building autograd graph.
            # BUG FIX: reshape by the *batch* size, not the total user count;
            # view(num_users, -1) fails on every batch except a single full one.
            with torch.no_grad():
                scores = self.model.full_sort_predict(inference_data).view(len(batch_data), -1)
            probas = torch.sigmoid(scores)

            # take the k highest-probability items per user
            topk_proba, topk_item = torch.topk(probas, k, dim=1)

            # internal item ids back to recipe tokens
            recipe_tokens = [
                self.dataset.id2token(self.dataset.iid_field, item_token).tolist()
                for item_token in topk_item.detach().cpu().numpy()]

            for user, recommended_items in zip(user_ids, recipe_tokens):
                recommended_result.append({
                    '_id': user,
                    'recommended_items': recommended_items
                })

        return recommended_result

if __name__ == '__main__':

    # Path to the saved sequential-model checkpoint.
    modelpath = '/home/judy/level2-3-recsys-finalproject-recsys-01/ml/Sequential/saved/BERT4Rec-Mar-24-2024_00-51-09.pth'

    from db_operations import fetch_user_history

    # Fetch user feedback histories from the DB.
    # NOTE(review): elsewhere fetch_user_history takes a user_id argument;
    # the no-arg call here may have meant fetch_user_histories — confirm.
    user_id_and_feedbacks = fetch_user_history()

    # BUG FIX: sasrec_inference is a method of Inference in this module, not
    # a free function — the original free-function call raised NameError.
    recommended_items = Inference(modelpath).sasrec_inference(user_id_and_feedbacks)

    from bson import ObjectId
    from pymongo import MongoClient
    from db_config import db_host, db_port

    # Inspect the recommendations against the recipes collection.
    client = MongoClient(host=db_host, port=db_port)
    db = client.dev

    for recommended_item in recommended_items:
        print(recommended_item['_id'])
        for recipe in recommended_item['recommended_items']:
            for r in db['recipes'].find({'recipe_sno': recipe}):
                print(r['food_name'], end=' | ')
            print()
2 changes: 1 addition & 1 deletion airflow/dags/batch_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

def fetch_and_push_user_histories(result_type:str=None, **context):
if result_type:
user_id_and_feedbacks = fetch_user_histores(result_type)
user_id_and_feedbacks = fetch_user_histories(result_type)
context["ti"].xcom_push(key='user_id_and_feedbacks_cb', value=user_id_and_feedbacks)
else:
user_id_and_feedbacks = fetch_user_histories()
Expand Down
9 changes: 5 additions & 4 deletions airflow/dags/db_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ def fetch_user_history(user_id, result_type='recipe_sno'):
# 추가 피드백 있는 경우
if 'feedback_history' in u:
feedbacks.append(u['feedback_history'])

# 피드백 _id를 recipe_sno 로 변경
recipe_snos = []
for recipe in feedbacks:
for r in db['recipes'].find({'_id': recipe}):
for r in db['recipes'].find({'_id': ObjectId(recipe)}):
recipe_snos.append(r['recipe_sno'])

if result_type == 'recipe_sno':
Expand Down Expand Up @@ -52,11 +53,11 @@ def fetch_user_histories(result_type='recipe_sno'):
feedbacks = u['initial_feedback_history']
# 추가 피드백 있는 경우
if 'feedback_history' in u:
feedbacks.append(u['feedback_history'])
feedbacks.extend(u['feedback_history'])
# 피드백 _id를 recipe_sno 로 변경
recipe_snos = []
for recipe in feedbacks:
for r in db['recipes'].find({'_id': recipe}):
for r in db['recipes'].find({'_id': ObjectId(recipe)}):
recipe_snos.append(r['recipe_sno'])

if result_type == 'recipe_sno':
Expand Down Expand Up @@ -104,7 +105,7 @@ def update_model_recommendations(recommended_results, collection_name, meta={},

db[collection_name].insert_many(data)

print('push data into db')
# print('push data into db')

def cb_inference(user_id_and_feedbacks: list, k: int=20, batch_size: int=4096):

Expand Down
18 changes: 16 additions & 2 deletions airflow/dags/recbole_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,20 @@ def prep_inference_data(user_id_and_feedbacks, dataset, config) -> Interaction:
item_id_list.append(recipe_ids)
item_length.append(len(recipe_ids))

# 모든 리스트를 동일한 길이로 패딩하기 위해 최대 길이 찾기
max_length = 100 #max(len(item) for item in item_id_list)

# 모든 리스트를 최대 길이에 맞게 패딩
padded_item_id_list = [[0] * (max_length - len(item)) + item for item in item_id_list]
target_list = [item[-100:] for item in padded_item_id_list]
# item_length = [ max_length for item in item_id_list]


print("prep inference data", len(target_list), len(item_length))
print("prep inference data", target_list, item_length)

item_dict = {
'item_id_list': torch.tensor(item_id_list, dtype=torch.int64).to(config['device']),
'item_id_list': torch.tensor(target_list, dtype=torch.int64).to(config['device']),
'item_length': torch.tensor(item_length, dtype=torch.int64).to(config['device'])
}

Expand All @@ -43,7 +55,9 @@ def sasrec_inference(modelpath: str, user_id_and_feedbacks: list, k: int=20, bat
inference_data = prep_inference_data(batch_data, dataset, config)

# prediction
scores = model.full_sort_predict(inference_data).view(num_users, -1)
scores = model.full_sort_predict(inference_data)#.view(num_users, -1)
print(scores.size())
print(num_users)
probas = torch.sigmoid(scores)

# 확률이 높은 아이템 20개 추출
Expand Down
51 changes: 51 additions & 0 deletions airflow/dags/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from __future__ import annotations
import bentoml
import threading # threading 라이브러리 추가

from db_operations import fetch_user_history, update_model_recommendations, cb_inference, blending_results
from _recbole_inference import Inference

@bentoml.service(
    resources={"cpu": "6"},
    traffic={"timeout": 20},
)
class OnlineServing:
    """BentoML service blending sequential (SASRec) and content-based recommendations."""

    def __init__(self) -> None:

        # Path to the saved SASRec checkpoint loaded once per service worker.
        modelpath = '/home/judy/level2-3-recsys-finalproject-recsys-01/ml/Sequential/saved/SASRec-Mar-29-2024_19-28-51.pth'
        self.inferencer = Inference(modelpath)

    def save_data_async(self, data, collection_name, input_type='recipe_sno', meta=None):
        """Persist recommendation results; intended to run on a worker thread.

        BUG FIX: the original default ``meta={}`` is a mutable default
        argument shared across every call — any mutation inside
        ``update_model_recommendations`` would leak between requests. Use a
        ``None`` sentinel and build a fresh dict per call instead.
        """
        update_model_recommendations(
            data,
            collection_name=collection_name,
            input_type=input_type,
            meta={} if meta is None else meta,
        )

    @bentoml.api
    def recommend(self, user_id: str):
        """Compute, persist, and acknowledge recommendations for *user_id*."""

        # Sequential-model recommendations from the user's history.
        hybrid_data = fetch_user_history(user_id)
        hybrid_data = self.inferencer.sasrec_inference(hybrid_data)

        # Content-based recommendations (keyed by recipe_id).
        cb_data = fetch_user_history(user_id, result_type='recipe_id')
        cb_data = cb_inference(cb_data)

        # Blend both result sets into the final list.
        data = blending_results(hybrid_data, cb_data)

        # Blended results are written synchronously before responding.
        update_model_recommendations(
            data,
            collection_name='model_recommendation_history_total',
            input_type='recipe_id',
            meta={'model_version': '0.0.1'},
        )

        # Record per-model results asynchronously so the response isn't blocked on DB writes.
        threading.Thread(target=self.save_data_async, args=(hybrid_data, 'model_recommendation_history_hybrid')).start()
        threading.Thread(target=self.save_data_async, args=(cb_data, 'model_recommendation_history_cb', 'recipe_id')).start()

        return { "message": "Prediction successful" }