Recommendation System
Uploading the files to the catalog
EDA
Points to consider for a recommendation system
- Understand the preferences of the user receiving recommendations (collaborative filtering)
- Understand the characteristics of the content being recommended (content-based filtering)
- Cold Start Problem
└ A representative mitigation: MAB (Multi-Armed Bandit); a minimal sketch follows this list
└ Long-tail phenomenon (items that are popular with only a small number of users)
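To make the cold-start idea concrete, here is a minimal epsilon-greedy Multi-Armed Bandit sketch in plain Python. It is illustrative only: the candidate items, click probabilities, and function name are hypothetical, and production systems usually use more refined bandit variants (UCB, Thompson sampling).

import random

def epsilon_greedy_recommend(counts, rewards, epsilon=0.1):
    # Explore a random item with probability epsilon, otherwise exploit the best-so-far item.
    if random.random() < epsilon:
        return random.randrange(len(counts))
    averages = [r / c if c > 0 else 0.0 for r, c in zip(rewards, counts)]
    return max(range(len(averages)), key=lambda i: averages[i])

# Toy simulation: 3 candidate items with hidden click-through rates (illustrative numbers).
true_ctr = [0.05, 0.15, 0.10]
counts, rewards = [0, 0, 0], [0.0, 0.0, 0.0]
for _ in range(1000):
    arm = epsilon_greedy_recommend(counts, rewards)
    clicked = 1.0 if random.random() < true_ctr[arm] else 0.0  # observed feedback
    counts[arm] += 1
    rewards[arm] += clicked
print("Impressions per item:", counts)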
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("movie_analysis_uc")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
Setting up the Spark session
# Define the base path for your data in Unity Catalog Volumes
uc_volume_path = "/Volumes/databricks_1dt008/default/movie"
# Load data from Unity Catalog Volume
try:
    movies = spark.read.load(f"{uc_volume_path}/movies.csv", format='csv', header=True, inferSchema=True)
    ratings = spark.read.load(f"{uc_volume_path}/ratings.csv", format='csv', header=True, inferSchema=True)
    links = spark.read.load(f"{uc_volume_path}/links.csv", format='csv', header=True, inferSchema=True)
    tags = spark.read.load(f"{uc_volume_path}/tags.csv", format='csv', header=True, inferSchema=True)
    print("Data loaded successfully from Unity Catalog Volume.")
except Exception as e:
    print(f"Error loading data from Unity Catalog Volume: {e}")
    print(f"Please ensure CSV files (movies.csv, ratings.csv, links.csv, tags.csv) exist in {uc_volume_path}")
    # dbutils.fs.ls(uc_volume_path)  # Uncomment to list files in the volume for debugging
Loading the files
# Ensure ratings DataFrame is loaded before proceeding
if 'ratings' in locals():
    tmp1 = ratings.groupBy("userID").count().toPandas()['count'].min()
    tmp2 = ratings.groupBy("movieId").count().toPandas()['count'].min()
    print('For the users that rated movies and the movies that were rated:')
    print('Minimum number of ratings per user is {}'.format(tmp1))
    print('Minimum number of ratings per movie is {}'.format(tmp2))
else:
    print("Ratings DataFrame not loaded. Please check the data loading step.")
# >>> For the users that rated movies and the movies that were rated:
# >>> Minimum number of ratings per user is 20
# >>> Minimum number of ratings per movie is 1
Checking the data per user and per movie
if 'ratings' in locals():
    tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
    tmp2 = ratings.select('movieId').distinct().count()
    print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))
else:
    print("Ratings DataFrame not loaded.")
# >>> 3446 out of 9724 movies are rated by only one user
Checking movies with only one rating
if 'ratings' in locals():
    tmp_q1 = ratings.select('userid').distinct().count()
    print('There totally have {} users'.format(tmp_q1))
else:
    print("Ratings DataFrame not loaded.")
# >>> There totally have 610 users
Checking the total number of users
if 'movies' in locals():
    tmp_q2 = movies.select('movieid').distinct().count()
    print('There totally have {} movies'.format(tmp_q2))
else:
    print("Movies DataFrame not loaded.")
# >>> There totally have 9742 movies
Checking the total number of movies
from pyspark.sql.functions import col, isnan # isnan might not be needed if rating is null
if 'movies' in locals() and 'ratings' in locals():
    tmp_q3_rated_count = ratings.select('movieid').distinct().count()
    total_movies_count = movies.select('movieid').distinct().count()  # Re-calculate or use tmp_q2
    print('{} movies have not been rated'.format(total_movies_count - tmp_q3_rated_count))

    # Join movies with ratings to find movies with no ratings
    # Ensure movieId columns are of the same type if join issues occur. inferSchema should help.
    movies_with_ratings = movies.join(ratings, movies.movieId == ratings.movieId, "left_outer")

    # Select movies where the rating (from the ratings table) is null
    unrated_movies = (
        movies_with_ratings
        .where(ratings.rating.isNull())
        .select(
            movies.movieId,
            movies.title
        )
        .distinct()  # Added distinct
    )
    print("\nList of movies not rated before:")
    unrated_movies.show()
else:
    print("Movies or Ratings DataFrame not loaded.")
Checking which movies have never been rated
import pyspark.sql.functions as f
if 'movies' in locals():
    # Split the genres string into an array, then explode the array into multiple rows
    all_genres_df = (
        movies
        .withColumn("genre_array", f.split(col("genres"), r"\|"))
        .select(
            f.explode(col("genre_array")).alias("genre")
        )
        .distinct()
    )
    distinct_genres_list = [row.genre for row in all_genres_df.collect()]
    hashset = set(distinct_genres_list)
    print("Distinct genres found:")
    print(hashset)
    print("Total number of distinct genres: {}".format(len(hashset)))
else:
    print("Movies DataFrame not loaded.")
Checking the distinct movie genres
from pyspark.sql.functions import expr, when
if 'movies' in locals() and 'hashset' in locals() and len(hashset) > 0:
    q5_base = movies.select("movieid", "title", "genres")

    # Create a column for each genre, with 1 if the movie has that genre, 0 otherwise
    # This approach is more scalable and idiomatic in Spark than manual list iteration for DataFrame construction
    genre_expressions = [
        when(col("genres").rlike(genre.replace("(", "\\(").replace(")", "\\)")), 1).otherwise(0).alias(genre)
        for genre in hashset if genre != '(no genres listed)'  # Handle special characters if any in genre names for alias
    ]

    # Add (no genres listed) separately if it exists, ensuring a valid alias
    if '(no genres listed)' in hashset:
        genre_expressions.append(
            when(col("genres") == '(no genres listed)', 1).otherwise(0).alias("no_genres_listed")
        )

    if genre_expressions:  # Check if there are any genres to process
        tmp_q5 = q5_base.select(col("movieid"), col("title"), *genre_expressions)
        print("\nMovies with one-hot encoded genres:")
        tmp_q5.show()

        # Example: List "Drama" movies
        # Adjust the alias if 'Drama' was changed (e.g., due to special characters)
        drama_alias = "Drama"  # Assuming 'Drama' is a valid alias
        if drama_alias in tmp_q5.columns:
            tmp_drama = tmp_q5.filter(col(drama_alias) == 1).select("movieid", "title")
            print("\n{} movies are Drama, they are:".format(tmp_drama.count()))
            tmp_drama.show()
        else:
            print(f"\nColumn '{drama_alias}' not found. Available columns: {tmp_q5.columns}")
    else:
        print("\nNo genres found to process for one-hot encoding.")
else:
    print("Movies DataFrame or genre set not available for one-hot encoding.")
One-hot encoding of movie genres (multi-label encoding)
# Define the base path for your data in Unity Catalog Volumes
uc_volume_path = "/Volumes/databricks_1dt008/default/movie"
# Load ratings data using Spark DataFrame API from Unity Catalog Volume
# This approach is compatible with Databricks Serverless compute.
ratings_file_path_uc = f"{uc_volume_path}/ratings.csv"
from pyspark.sql.functions import col
try:
    # 1. Read CSV into a DataFrame
    ratings_df_initial = (
        spark
        .read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(ratings_file_path_uc)
    )

    # 2. Select necessary columns and explicitly cast to desired types.
    #    The original RDD code effectively took (userId, movieId, rating).
    #    We drop rows if any of these key columns are null after casting.
    #    This DataFrame will be directly used for splitting.
    ratings_for_split_df = ratings_df_initial.select(
        col("userId").cast("integer"),
        col("movieId").cast("integer"),
        col("rating").cast("float")
    ).na.drop()  # Drop rows where userId, movieId, or rating is null

    print("Ratings data loaded into DataFrame for splitting:")
    ratings_for_split_df.show(5)
    ratings_for_split_df.printSchema()
except Exception as e:
    print(f"Error loading ratings.csv into DataFrame from {ratings_file_path_uc}: {e}")
    ratings_for_split_df = None  # Ensure variable is defined for checks below
Type casting the ratings.csv columns
if ratings_for_split_df:
    # Split the DataFrame directly
    (train_df, validation_df, test_df) = ratings_for_split_df.randomSplit([0.6, 0.2, 0.2], seed=7856)

    # Cache the DataFrames for performance
    train_df.cache()
    validation_df.cache()
    test_df.cache()

    print("Data split into Training, Validation, and Test DataFrames.")
    print(f"Training DataFrame count: {train_df.count()}")
    print(f"Validation DataFrame count: {validation_df.count()}")
    print(f"Test DataFrame count: {test_df.count()}")
    print("\nSchema of training DataFrame:")
    train_df.printSchema()
    train_df.show(3)
else:
    print("ratings_for_split_df DataFrame not available for splitting.")
    # Define empty DataFrames or handle error appropriately if needed downstream
    train_df, validation_df, test_df = [
        spark.createDataFrame(
            [],
            ratings_for_split_df.schema
            if ratings_for_split_df
            else spark.read.format("csv").load(ratings_file_path_uc).schema
        )
        for _ in range(3)
    ]
Splitting into training, validation, and test sets
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
def train_ALS_df(train_data, validation_data, num_iters, reg_params, ranks_list):  # Renamed ranks to ranks_list to avoid conflict
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank_val in ranks_list:  # Use rank_val
        for reg in reg_params:
            als = ALS(
                rank=rank_val, maxIter=num_iters, regParam=reg,
                userCol="userId", itemCol="movieId", ratingCol="rating",
                coldStartStrategy="drop",  # Important for handling new users/items in validation/test
                seed=42  # Added seed for reproducibility
            )
            try:
                model = als.fit(train_data)
                predictions = model.transform(validation_data)

                # Remove NaNs from predictions if any, as the evaluator cannot handle them
                predictions_cleaned = predictions.filter(predictions.prediction.isNotNull())
                evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
                if predictions_cleaned.count() == 0:
                    rmse_error = float('inf')  # Or handle as a failed case
                    print(f"Warning: No valid predictions for rank={rank_val}, reg={reg}. All predictions were NaN or validation set was empty after coldStartStrategy.")
                else:
                    rmse_error = evaluator.evaluate(predictions_cleaned)

                print('Rank = {}, Regularization = {}: Validation RMSE = {}'.format(rank_val, reg, rmse_error))
                if rmse_error < min_error:
                    min_error = rmse_error
                    best_rank = rank_val
                    best_regularization = reg
                    best_model = model
            except Exception as e:
                print(f"Error training ALS with rank={rank_val}, reg={reg}: {e}")
                continue  # Skip to the next hyperparameter combination

    if best_model:
        print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    else:
        print('\nCould not find a best model. All training attempts failed or produced no valid predictions.')
    return best_model
ALS training code
if 'train_df' in locals() and 'validation_df' in locals():
    num_iterations = 10
    ranks_param = [6, 8, 10, 12]  # Renamed from 'ranks'
    reg_params_param = [0.05, 0.1, 0.2, 0.4]  # Renamed from 'reg_params'; regularization to prevent overfitting

    import time
    start_time = time.time()
    final_model = train_ALS_df(train_df, validation_df, num_iterations, reg_params_param, ranks_param)
    print('Total Runtime for Hyperparameter Tuning: {:.2f} seconds'.format(time.time() - start_time))
else:
    print("Training and validation DataFrames (train_df, validation_df) are not available.")
    final_model = None
Assigning the best model after training
if final_model and 'test_df' in locals():
    predictions_test = final_model.transform(test_df)
    predictions_test_cleaned = predictions_test.filter(predictions_test.prediction.isNotNull())
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    if predictions_test_cleaned.count() == 0:
        rmse_test = float('inf')
        print("Warning: No valid predictions on the test set.")
    else:
        rmse_test = evaluator.evaluate(predictions_test_cleaned)

    print("Test Set Root-mean-square error = " + str(rmse_test))

    # Generate top 10 movie recommendations for each user
    try:
        userRecs = final_model.recommendForAllUsers(10)
        print("\nTop 10 movie recommendations for each user (sample):")
        # userRecs.show(5, truncate=False)
        display(userRecs)
    except Exception as e:
        print(f"Error generating user recommendations: {e}")

    # Generate top 10 user recommendations for each movie
    try:
        movieRecs = final_model.recommendForAllItems(10)
        print("\nTop 10 user recommendations for each movie (sample):")
        # movieRecs.show(5, truncate=False)
        display(movieRecs)
    except Exception as e:
        print(f"Error generating movie recommendations: {e}")
else:
    print("Final model or test DataFrame (test_df) is not available for testing.")
Producing the recommendation results
| Parameter | Description | Typical settings |
|---|---|---|
| rank | Number of latent factors | 10 ~ 100 (tune per domain) |
| maxIter | Maximum number of iterations (epochs) | 10 ~ 20 (increase if not converged) |
| regParam | Regularization parameter (λ) | 0.01 ~ 0.1 (prevents overfitting) |
| alpha | Used only in the implicit-feedback model | 1.0 ~ 40.0 (tune for implicit data) |
| implicitPrefs | Whether to use implicit feedback (True/False) | False for explicit data, True for implicit |
| nonnegative | Whether to constrain the factor matrices to non-negative values | True/False (experiment based on recommendation accuracy) |
| coldStartStrategy | How NaN predictions are handled (drop, nan) | drop recommended (removes NaN) |
| seed | Random seed (for reproducibility) | Use a fixed value |

ALS parameters, descriptions, and recommended values / tuning direction
A typical tuning order is as follows (a configuration sketch follows the list):
1. Decide implicitPrefs (is the data explicit or implicit feedback?)
2. Tune rank (start low and increase)
3. Tune regParam (to prevent overfitting)
4. Adjust maxIter (increase if the model has not converged)
5. Tune alpha (only for the implicit-feedback model)
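As a reference for the table and tuning order above, the sketch below shows how those parameters map onto the ALS constructor, configured here for implicit feedback. The column names follow the MovieLens schema used earlier; the numeric values are only plausible starting points for tuning, not values validated in this post.

from pyspark.ml.recommendation import ALS

als_implicit = ALS(
    rank=10,                   # number of latent factors
    maxIter=15,                # increase if the model has not converged
    regParam=0.1,              # regularization to curb overfitting
    implicitPrefs=True,        # treat ratingCol as implicit feedback strength (clicks, views, ...)
    alpha=20.0,                # confidence scaling, only used when implicitPrefs=True
    nonnegative=True,          # constrain the factor matrices to non-negative values
    coldStartStrategy="drop",  # drop NaN predictions for unseen users/items
    seed=42,                   # fixed seed for reproducibility
    userCol="userId", itemCol="movieId", ratingCol="rating",
)
# model = als_implicit.fit(train_df)  # fit exactly as in the explicit-feedback code above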
Recommendation model differences: real-time recommendation
1. Reflects user feedback immediately
2. Requires more complex infrastructure
3. High computational cost
Recommendation model differences: batch recommendation (a minimal sketch follows this list)
1. Periodic model updates
2. Efficient use of resources
3. Somewhat less up to date
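A minimal sketch of the batch approach, assuming the final_model trained above: recommendations are precomputed on a schedule, written to a table, and then served from storage instead of being scored per request. The target table name is hypothetical.

if final_model is not None:
    batch_user_recs = final_model.recommendForAllUsers(10)  # precompute top-10 per user
    (
        batch_user_recs
        .write
        .mode("overwrite")
        .saveAsTable("databricks_1dt008.default.user_movie_recommendations")  # hypothetical table name
    )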
Other recommendation considerations (a rough diversity check is sketched after this list)
Diversity: recommend content spanning a variety of genres and characteristics
Serendipity: provide unexpected but interesting discoveries
Relevance and balance: find the sweet spot between accuracy and exploration
User satisfaction: improve long-term engagement and loyalty
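One rough way to quantify diversity, assuming the userRecs and movies DataFrames from earlier: measure catalog coverage and how many distinct genres appear across the recommendation lists. This metric is illustrative and not part of the original analysis.

from pyspark.sql.functions import explode, col, split

# Flatten the per-user recommendation lists into one column of recommended movieIds
recs_exploded = (
    userRecs
    .select(explode("recommendations").alias("rec"))
    .select(col("rec.movieId").alias("movieId"))
)
recommended_movies = recs_exploded.distinct()

# Catalog coverage: share of all movies that appear in at least one recommendation list
coverage = recommended_movies.count() / movies.select("movieId").distinct().count()

# Genre spread: how many distinct genres the recommended movies cover
recommended_genres = (
    recommended_movies
    .join(movies, "movieId")
    .select(explode(split(col("genres"), r"\|")).alias("genre"))
    .distinct()
)
print(f"Catalog coverage: {coverage:.2%}, distinct genres recommended: {recommended_genres.count()}")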
Building a youth employment prediction model and proposing regional youth policies
Reference material: 2024 SDC (Statistical Data Center) grand-prize winner, report category
Minimum Viable Product
Prototype?
ECommerce Customer Analysis