#GEOMETRIC #REC_SYSTEM
추천시스템에 Geometric 을 활용해보자.
USER : 1000
ITEM : 5000
으로 각각 세팅하고 Dummy 데이터를 만들고,
import torch
import numpy as np
from torch_geometric.data import Data
from collections import defaultdict
# 사용자와 상품의 개수
num_users = 1000
num_items = 5000
사용자의 메타 데이터
성별, 나이, 가입일
를 생성해보고,
# 사용자 메타 데이터
user_gender = torch.randint(0, 2, (num_users, 1), dtype=torch.float) # 0 or 1
user_age = torch.randint(18, 70, (num_users, 1), dtype=torch.float)
user_signup_days = torch.randint(0, 365*5, (num_users, 1), dtype=torch.float)
상품의 메타 데이터도 만들어보는데
상품 출시일, 분기별 판매량, 성별 선호도
를 생성한다.
# 아이템 메타데이터 (출시 날짜, 분기별 판매량, 남자/여자 선호도 비율)
item_release_days = torch.randint(0, 365*10, (num_items, 1), dtype=torch.float) # days since release
item_quarter_sales = torch.randint(0, 1000, (num_items, 4), dtype=torch.float) # sales in 4 quarters
item_gender_preference = torch.rand(num_items, 2)
위에 만든 데이터는 cat 으로 통합해줘서 학습에 사용해야 하는데
edge 까지 한번에 생성한다.
# 사용자와 아이템의 특징을 별도로 저장
user_features = torch.cat([user_gender, user_age, user_signup_days], dim=1)
item_features = torch.cat([item_release_days, item_quarter_sales, item_gender_preference], dim=1)
# 패딩을 통해 특징의 차원을 동일하게 만듭니다.
user_features_padded = torch.cat([user_features, torch.zeros((num_users, item_features.size(1) - user_features.size(1)))], dim=1)
item_features_padded = item_features
# 노드 특징 텐서 생성 (사용자와 아이템의 특징을 이어붙이지 않음)
x_user = user_features_padded
x_item = item_features_padded
# 사용자-아이템 상호작용을 임의로 생성 (각 사용자마다 5개의 랜덤 아이템을 선호한다고 가정)
edges = []
for user in range(num_users):
items = np.random.choice(num_items, np.random.choice(range(10, 100), size=1)[0], replace=False)
for item in items:
edges.append([user, num_users + item]) # 사용자와 아이템 노드로 연결
# 엣지 인덱스를 생성
edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()
추가로 Negative Sampling 을 통해 추가 데이터를 확보한다.
새로옵게 생성한 데이터는 edge 에도 추가시켜준다.
# 네거티브 샘플 추가
def negative_sampling(edge_index, num_nodes, num_neg_samples):
neg_edge_index = []
x = defaultdict(int)
a, b = np.unique(np.array(edges)[:, 1], return_counts=True)
# 리팩토링 필요 --> 너무 더티함
for i in range(num_users, num_users + num_items):
x[i] = 0
for i, v in zip(a, b):
x[i] = v
p = list(x.values()) / sum(list(x.values()))
for u in range(num_users):
tmp = np.random.choice(range(num_users, num_users + num_items), size=num_neg_samples, replace=False, p=p)
for i in tmp:
neg_edge_index.append([u, i])
return torch.tensor(neg_edge_index, dtype=torch.long).t().contiguous()
# 500 개 NEGATIVE SAMPLE 생성 시도 --> 알아서 양 조절 필요
neg_edge_index = negative_sampling(edge_index, num_users + num_items, 500)
edge_index = torch.cat([edge_index, neg_edge_index], dim=1)
최종적으로 x, y 데이터는 만드는데
모든걸 통합해서 geometric 에서 사용할 Data 객체도 생성한다.
# 사용자와 아이템의 노드 특징을 결합하여 하나의 텐서로 만듭니다.
x = torch.cat([x_user, x_item], dim=0)
# 라벨 생성 (양성 샘플은 1, 음성 샘플은 0)
y = torch.cat([torch.ones(len(edges)), torch.zeros(len(neg_edge_index[0]))])
# 데이터 객체 생성
data = Data(x=x, edge_index=edge_index, y=y)
Pytorch 를 사용하면 DataLoader 를 만들어야지.
from torch_geometric.loader import DataLoader
# DataLoader 생성
batch_size = 32
loader = DataLoader([data], batch_size=batch_size, shuffle=True)
모델은 정말 대충(?) 만들어두고, 어차피 나중에 열심히 수정해서 최적화 해야하니까.
import torch.nn.functional as F
from torch_geometric.nn import SAGEConv
class GCN(torch.nn.Module):
def __init__(self, input_dim, output_dim):
super(GCN, self).__init__()
self.conv1 = SAGEConv(input_dim, 32)
self.conv2 = SAGEConv(32, 64)
self.conv3 = SAGEConv(64, 64)
self.fc1 = torch.nn.Linear(64, 32)
self.fc2 = torch.nn.Linear(32, output_dim)
def forward(self, data):
x, edge_index = data.x, data.edge_index
x = F.dropout1d(self.conv1(x, edge_index))
x = F.relu(x)
x = F.dropout1d(self.conv2(x, edge_index))
x = F.relu(x)
x = F.dropout1d(self.conv3(x, edge_index))
x = F.relu(x)
x = self.fc1(x)
x = F.relu(x)
x = self.fc2(x)
return x
여기까지는 쉽다.
깜빡하고 SEED 고정을 안했다? 가장 상단에서 해주면 된다.
학습도 대충(!) 해보자.
import torch.optim as optim
data.train_mask = torch.tensor([True] * len(data.y), dtype=torch.bool)
optimizer = optim.Adam(model.parameters(), lr=0.001) #, weight_decay=5e-4)
def train():
model.train()
loss_mean = []
for data in loader:
optimizer.zero_grad()
out = model(data)
# 엣지 특징 추출
edge_pred = out[data.edge_index[0]] + out[data.edge_index[1]]
loss = F.cross_entropy(edge_pred, data.y.long())
loss_mean.append(loss.item())
loss.backward()
optimizer.step()
return np.mean(loss_mean)
epochs = 300
for epoch in range(epochs):
loss = train()
if epoch % 20 == 0:
print(f'Epoch {epoch}, Loss: {loss:.4f}')
## 최종 LOSS : 0.33
loss 만 트랙킹해도 대략 감이 온다.
학습이 끝나면 정확도 한 번 확인해보고,
model.eval()
with torch.no_grad():
for batch_data in loader:
out = model(batch_data)
edge_pred = out[batch_data.edge_index[0]] + out[batch_data.edge_index[1]]
_, pred = edge_pred.max(dim=1)
correct = pred.eq(batch_data.y.long()).sum().item()
accuracy = correct / batch_data.y.size(0)
print(f'Accuracy: {accuracy:.4f}')
## Accuracy: 0.9022
이제 사용자 단위의 추천 결과를 확인해보면,
# 모델 평가 및 모든 상품에 대한 선호 확률 계산
model.eval()
with torch.no_grad():
out = model(data)
user_id = 0
# 사용자의 노드 출력 벡터
user_vector = out[user_id]
# 모든 아이템에 대한 선호 확률 계산
item_vectors = out[num_users:num_users + num_items]
scores = torch.matmul(item_vectors, user_vector)
# 상위 10개의 아이템 선택
top_10_items = torch.topk(scores, 10).indices
print(f'Top 10 recommended items for user {user_id}:')
for idx in top_10_items:
print(f'Item {idx.item()} with score {scores[idx].item():.4f}')
이러면 결과가
혹시 몰라서 사용자 100번 추천 결과도 한 번 보면(user_id 만 0에서 100으로 바꾸면 된다.)
Dummy 데이터로 구현한 것 치고는 모양새는 좋다.
이상한 점수에 동일한 상품 번호가 추천되는 경우도 있다.
이럴때 데이터 전처리 과정부터 모델의 구성 및 형태, 배치 사이즈, 아웃라이어, 등등...
정말 많은 부분을 다시 처음부터 스텝별로 들여다 보아야 정상적인 추천 결과를 만들 수 있다.
마지막으로
Data 객체에서 user, item 노드들의 관계를 graphx 를 통해서 시각화 할 수 있다는데
뭔가 도움이 될 것 같았는데 아니다.