Recommender Systems

Upload the files to the catalog

 

EDA

 

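What to consider in a recommender system

- Understanding the preferences of the users being recommended to (collaborative filtering)

- Understanding the characteristics of the content being recommended (content-based filtering)

- Cold Start Problem

    └ A representative approach: MAB (Multi-Armed Bandit)

    └ Long-tail phenomenon (a large share of items are each popular with only a small number of users)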
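
To make the multi-armed bandit idea concrete, here is a minimal epsilon-greedy sketch in plain Python. It is only one of several bandit strategies, and everything in it is an assumption for illustration: the class name `EpsilonGreedyBandit`, the three item names, and the click-through rates in `true_ctr` are hypothetical and not taken from the MovieLens data used below.

```python
import random


class EpsilonGreedyBandit:
    """Epsilon-greedy bandit over a fixed set of items (arms)."""

    def __init__(self, items, epsilon=0.1, seed=0):
        self.items = list(items)
        self.epsilon = epsilon
        self.counts = {item: 0 for item in self.items}
        self.values = {item: 0.0 for item in self.items}  # running mean reward
        self.rng = random.Random(seed)

    def select(self):
        # Explore a random item with probability epsilon,
        # otherwise exploit the item with the best estimate so far.
        if self.rng.random() < self.epsilon:
            return self.rng.choice(self.items)
        return max(self.items, key=lambda item: self.values[item])

    def update(self, item, reward):
        # Incremental update of the mean reward for the shown item.
        self.counts[item] += 1
        self.values[item] += (reward - self.values[item]) / self.counts[item]


# Hypothetical click-through rates, unknown to the bandit.
true_ctr = {"movie_a": 0.05, "movie_b": 0.20, "movie_c": 0.10}

bandit = EpsilonGreedyBandit(true_ctr, epsilon=0.1, seed=42)
env = random.Random(7)  # simulates user feedback
for _ in range(5000):
    item = bandit.select()
    reward = 1.0 if env.random() < true_ctr[item] else 0.0
    bandit.update(item, reward)

best_item = max(bandit.values, key=bandit.values.get)
print(best_item, bandit.counts)
```

The exploration step is what helps with cold start: a new item with no history still gets shown occasionally, so it can accumulate feedback instead of being ignored forever.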

 

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("movie_analysis_uc")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

Setting up the Spark session

 

# Define the base path for your data in Unity Catalog Volumes
uc_volume_path = "/Volumes/databricks_1dt008/default/movie"

# Load data from Unity Catalog Volume
try:
    movies = spark.read.load(f"{uc_volume_path}/movies.csv", format='csv', header=True, inferSchema=True)
    ratings = spark.read.load(f"{uc_volume_path}/ratings.csv", format='csv', header=True, inferSchema=True)
    links = spark.read.load(f"{uc_volume_path}/links.csv", format='csv', header=True, inferSchema=True)
    tags = spark.read.load(f"{uc_volume_path}/tags.csv", format='csv', header=True, inferSchema=True)
    print("Data loaded successfully from Unity Catalog Volume.")
except Exception as e:
    print(f"Error loading data from Unity Catalog Volume: {e}")
    print(f"Please ensure CSV files (movies.csv, ratings.csv, links.csv, tags.csv) exist in {uc_volume_path}")
    # dbutils.fs.ls(uc_volume_path) # Uncomment to list files in the volume for debugging

Loading the files

 

# Ensure ratings DataFrame is loaded before proceeding
if 'ratings' in locals():
    # Aggregate inside Spark instead of collecting every group to pandas
    tmp1 = ratings.groupBy("userId").count().agg({"count": "min"}).first()[0]
    tmp2 = ratings.groupBy("movieId").count().agg({"count": "min"}).first()[0]
    print('For the users that rated movies and the movies that were rated:')
    print('Minimum number of ratings per user is {}'.format(tmp1))
    print('Minimum number of ratings per movie is {}'.format(tmp2))
else:
    print("Ratings DataFrame not loaded. Please check the data loading step.")

# >>> For the users that rated movies and the movies that were rated:
# >>> Minimum number of ratings per user is 20
# >>> Minimum number of ratings per movie is 1

Checking rating counts per user and per movie

 

if 'ratings' in locals():
    tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
    tmp2 = ratings.select('movieId').distinct().count()
    print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))
else:
    print("Ratings DataFrame not loaded.")

# >>> 3446 out of 9724 movies are rated by only one user

Checking movies that have only a single rating
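
The figure above is a direct view of the long tail: a large share of the catalog has almost no interaction data. As a rough sketch, the same share can be computed from any list of rated movie IDs. The helper name `single_rating_share` and the toy list are hypothetical; only the 3446 and 9724 counts come from the output above.

```python
from collections import Counter


def single_rating_share(movie_ids):
    """Fraction of distinct movies that appear exactly once in the ratings."""
    counts = Counter(movie_ids)
    singles = sum(1 for c in counts.values() if c == 1)
    return singles / len(counts)


# Toy data (hypothetical): m2 and m4 are rated once, m1 and m3 more often.
toy_ratings = ["m1", "m1", "m1", "m2", "m3", "m3", "m4"]
toy_share = single_rating_share(toy_ratings)

# Share implied by the counts reported above.
reported_share = 3446 / 9724
print(f"toy: {toy_share:.1%}, reported: {reported_share:.1%}")
```

Roughly a third of the rated movies have a single rating, which is worth keeping in mind when reading the ALS results later: factors for those movies are estimated from one data point.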

 

if 'ratings' in locals():
    tmp_q1 = ratings.select('userId').distinct().count()
    print('There are {} users in total'.format(tmp_q1))
else:
    print("Ratings DataFrame not loaded.")

# >>> There are 610 users in total

Checking the total number of users

 

if 'movies' in locals():
    tmp_q2 = movies.select('movieId').distinct().count()
    print('There are {} movies in total'.format(tmp_q2))
else:
    print("Movies DataFrame not loaded.")

# >>> There are 9742 movies in total

Checking the total number of movies

 

from pyspark.sql.functions import col

if 'movies' in locals() and 'ratings' in locals():
    tmp_q3_rated_count = ratings.select('movieid').distinct().count()
    total_movies_count = movies.select('movieid').distinct().count() # Re-calculate or use tmp_q2
    print('{} movies have not been rated'.format(total_movies_count - tmp_q3_rated_count))

    # Join movies with ratings to find movies with no ratings
    # Ensure movieId columns are of the same type if join issues occur. inferSchema should help.
    movies_with_ratings = movies.join(ratings, movies.movieId == ratings.movieId, "left_outer")

    # Select movies where the rating (from the ratings table) is null
    unrated_movies = (
        movies_with_ratings
        .where(ratings.rating.isNull())
        .select(
            movies.movieId,
            movies.title
        )
        .distinct() # Added distinct
    )                   
    print("\nList of movies not rated before:")
    unrated_movies.show()
else:
    print("Movies or Ratings DataFrame not loaded.")

Checking for movies that no user has rated

 

import pyspark.sql.functions as f

if 'movies' in locals():
    # Explode the genres string into an array, then explode the array into multiple rows
    all_genres_df = (
        movies
        .withColumn("genre_array", f.split(col("genres"), r"\|"))  # raw string: "\|" is an invalid Python escape
        .select(
            f.explode(col("genre_array")).alias("genre")
        )
        .distinct()
    )

    distinct_genres_list = [row.genre for row in all_genres_df.collect()]
    hashset = set(distinct_genres_list) 

    print("Distinct genres found:")
    print(hashset)
    print("Total number of distinct genres: {}".format(len(hashset)))
else:
    print("Movies DataFrame not loaded.")

Checking the distinct movie genres

 

from pyspark.sql.functions import col, when

if 'movies' in locals() and 'hashset' in locals() and len(hashset) > 0:
    q5_base = movies.select("movieid", "title", "genres")

    # Create a column for each genre, with 1 if the movie has that genre, 0 otherwise
    # This approach is more scalable and idiomatic in Spark than manual list iteration for DataFrame construction
    genre_expressions = [
        when(col("genres").rlike(genre.replace("(", "\\(").replace(")", "\\)")), 1).otherwise(0).alias(genre)
        for genre in hashset if genre != '(no genres listed)' # Handle special characters if any in genre names for alias
    ]
    # Add (no genres listed) separately if it exists, ensuring valid alias
    if '(no genres listed)' in hashset:
        genre_expressions.append(
            when(col("genres") == '(no genres listed)', 1).otherwise(0).alias("no_genres_listed")
        )

    if genre_expressions: # Check if there are any genres to process
        tmp_q5 = q5_base.select(col("movieid"), col("title"), *genre_expressions)
        print("\nMovies with one-hot encoded genres:")
        tmp_q5.show()

        # Example: List "Drama" movies
        # Adjust the alias if 'Drama' was changed (e.g., due to special characters)
        drama_alias = "Drama" # Assuming 'Drama' is a valid alias
        if drama_alias in tmp_q5.columns:
            tmp_drama = tmp_q5.filter(col(drama_alias) == 1).select("movieid", "title")
            print("\n{} movies are Drama, they are:".format(tmp_drama.count()))
            tmp_drama.show()
        else:
            print(f"\nColumn '{drama_alias}' not found. Available columns: {tmp_q5.columns}")
    else:
        print("\nNo genres found to process for one-hot encoding.")

else:
    print("Movies DataFrame or genre set not available for one-hot encoding.")

One-hot encoding movies by genre (multi-label encoding)
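
One-hot genre columns like these are the kind of item feature that content-based filtering works with. The sketch below shows the idea in plain Python using cosine similarity between genre vectors. The movie names, the four-genre ordering, and the helper names are hypothetical; in practice the vectors would come from the encoded DataFrame.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length numeric vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # a movie with no genres is similar to nothing
    return dot / (norm_a * norm_b)


# Hypothetical genre order: [Action, Comedy, Drama, Romance]
movie_vectors = {
    "movie_1": [1, 0, 0, 0],
    "movie_2": [1, 0, 1, 0],
    "movie_3": [0, 1, 0, 1],
}


def most_similar(target, vectors, top_n=2):
    """Rank the other movies by genre similarity to the target movie."""
    scores = [
        (other, cosine_similarity(vectors[target], vec))
        for other, vec in vectors.items()
        if other != target
    ]
    return sorted(scores, key=lambda pair: pair[1], reverse=True)[:top_n]


ranked = most_similar("movie_1", movie_vectors)
print(ranked)
```

Because this needs no rating history for the candidate movie, it can still say something about movies in the long tail, where collaborative filtering has little to work with.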

 

# Define the base path for your data in Unity Catalog Volumes
uc_volume_path = "/Volumes/databricks_1dt008/default/movie"

# Load ratings data using Spark DataFrame API from Unity Catalog Volume
# This approach is compatible with Databricks Serverless compute.
ratings_file_path_uc = f"{uc_volume_path}/ratings.csv"
from pyspark.sql.functions import col

try:
    # 1. Read CSV into a DataFrame
    ratings_df_initial = (
        spark
        .read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(ratings_file_path_uc)
    )

    # 2. Select necessary columns and explicitly cast to desired types.
    #    The original RDD code effectively took (userId, movieId, rating).
    #    We drop rows if any of these key columns are null after casting.
    #    This DataFrame will be directly used for splitting.
    ratings_for_split_df = ratings_df_initial.select(
        col("userId").cast("integer"),
        col("movieId").cast("integer"),
        col("rating").cast("float")
    ).na.drop() # Drop rows where userId, movieId, or rating is null

    print("Ratings data loaded into DataFrame for splitting:")
    ratings_for_split_df.show(5)
    ratings_for_split_df.printSchema()

except Exception as e:
    print(f"Error loading ratings.csv into DataFrame from {ratings_file_path_uc}: {e}")
    ratings_for_split_df = None # Ensure variable is defined for checks below

Casting the column types of ratings.csv

 

if ratings_for_split_df is not None:
    # Split the DataFrame directly
    (train_df, validation_df, test_df) = ratings_for_split_df.randomSplit([0.6, 0.2, 0.2], seed=7856)

    # Cache the DataFrames for performance
    train_df.cache()
    validation_df.cache()
    test_df.cache()

    print("Data split into Training, Validation, and Test DataFrames.")
    print(f"Training DataFrame count: {train_df.count()}")
    print(f"Validation DataFrame count: {validation_df.count()}")
    print(f"Test DataFrame count: {test_df.count()}")
    print("\nSchema of training DataFrame:")
    train_df.printSchema()
    train_df.show(3)
else:
    print("ratings_for_split_df DataFrame not available for splitting.")
    # Fall back to empty DataFrames with the expected schema.
    # The schema is spelled out here because re-reading the CSV would fail again
    # if the load above failed.
    empty_schema = "userId INT, movieId INT, rating FLOAT"
    train_df, validation_df, test_df = [
        spark.createDataFrame([], schema=empty_schema) for _ in range(3)
    ]

Splitting into training, validation, and test sets

 

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

def train_ALS_df(train_data, validation_data, num_iters, reg_params, ranks_list): # Renamed ranks to ranks_list to avoid conflict
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank_val in ranks_list: # Use rank_val
        for reg in reg_params:
            als = ALS(
                rank=rank_val, maxIter=num_iters, regParam=reg,
                userCol="userId", itemCol="movieId", ratingCol="rating",
                coldStartStrategy="drop", # Important for handling new users/items in validation/test
                seed=42 # Added seed for reproducibility
            )
            try:
                model = als.fit(train_data)
                predictions = model.transform(validation_data)
                
                # coldStartStrategy="drop" already removes rows with NaN predictions;
                # this filter only guards against null predictions
                predictions_cleaned = predictions.filter(predictions.prediction.isNotNull())
                
                evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
                
                if predictions_cleaned.count() == 0:
                    rmse_error = float('inf') # Or handle as a failed case
                    print(f"Warning: No valid predictions for rank={rank_val}, reg={reg}. All predictions were NaN or validation set was empty after coldStartStrategy.")
                else:
                    rmse_error = evaluator.evaluate(predictions_cleaned)

                print('Rank = {}, Regularization = {}: Validation RMSE = {}'.format(rank_val, reg, rmse_error))
                
                if rmse_error < min_error:
                    min_error = rmse_error
                    best_rank = rank_val
                    best_regularization = reg
                    best_model = model
            except Exception as e:
                print(f"Error training ALS with rank={rank_val}, reg={reg}: {e}")
                continue # Skip to next hyperparameter combination

    if best_model:
        print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    else:
        print('\nCould not find a best model. All training attempts failed or produced no valid predictions.')
    return best_model

ALS training code
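
The training loop above picks the model with the lowest validation RMSE. For reference, this is what that metric computes, written as a small plain-Python sketch. The function name `rmse` and the four sample ratings are hypothetical and only illustrate the arithmetic.

```python
import math


def rmse(actual, predicted):
    """Root-mean-square error between two equal-length lists of ratings."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical ratings and predictions.
actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.5, 3.0, 4.0, 3.0]

# Squared errors: 0.25, 0.0, 1.0, 1.0 -> mean 0.5625 -> square root 0.75
error = rmse(actual, predicted)
print(error)
```

Because errors are squared before averaging, a few large misses raise RMSE more than many small ones, so it is on the same scale as the ratings but sensitive to outliers.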

 

if 'train_df' in locals() and 'validation_df' in locals():
    num_iterations = 10
    ranks_param = [6, 8, 10, 12] # Renamed from 'ranks'
    reg_params_param = [0.05, 0.1, 0.2, 0.4] # Renamed from 'reg_params'; guards against overfitting
    import time

    start_time = time.time()
    final_model = train_ALS_df(train_df, validation_df, num_iterations, reg_params_param, ranks_param)
    print('Total Runtime for Hyperparameter Tuning: {:.2f} seconds'.format(time.time() - start_time))
else:
    print("Training and validation DataFrames (train_df, validation_df) are not available.")
    final_model = None

Training and keeping the best model

 

if final_model and 'test_df' in locals():
    predictions_test = final_model.transform(test_df)
    predictions_test_cleaned = predictions_test.filter(predictions_test.prediction.isNotNull())

    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    
    if predictions_test_cleaned.count() == 0:
        rmse_test = float('inf')
        print("Warning: No valid predictions on the test set.")
    else:
        rmse_test = evaluator.evaluate(predictions_test_cleaned)
    
    print("Test Set Root-mean-square error = " + str(rmse_test))

    # Generate top 10 movie recommendations for each user
    try:
        userRecs = final_model.recommendForAllUsers(10)
        print("\nTop 10 movie recommendations for each user (sample):")
        #userRecs.show(5, truncate=False)
        display(userRecs)
    except Exception as e:
        print(f"Error generating user recommendations: {e}")


    # Generate top 10 user recommendations for each movie
    try:
        movieRecs = final_model.recommendForAllItems(10)
        print("\nTop 10 user recommendations for each movie (sample):")
        #movieRecs.show(5, truncate=False)
        display(movieRecs)
    except Exception as e:
        print(f"Error generating movie recommendations: {e}")

else:
    print("Final model or test DataFrame (test_df) is not available for testing.")

Generating the recommendations

 

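| Parameter | Description | Typical values / tuning direction |
|---|---|---|
| rank | Number of latent factors | 10 to 100 (tune per domain) |
| maxIter | Maximum number of iterations | 10 to 20 (increase if the model has not converged) |
| regParam | Regularization parameter (λ) | 0.01 to 0.1 (guards against overfitting) |
| alpha | Used only in the implicit-feedback model | 1.0 to 40.0 (tune when the data is implicit) |
| implicitPrefs | Whether to use implicit feedback (True/False) | False for explicit data, True for implicit data |
| nonnegative | Whether to constrain the factor matrices to be nonnegative | True/False (experiment based on recommendation accuracy) |
| coldStartStrategy | How NaN predictions are handled (drop, nan) | drop recommended (removes NaN rows) |
| seed | Random seed (for reproducibility) | Use a fixed value |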

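A typical tuning order is as follows.

1. Decide implicitPrefs (is the data explicit or implicit?)

2. Tune rank (start low and increase)

3. Tune regParam (to guard against overfitting)

4. Adjust maxIter (increase if the model does not converge)

5. (For implicit models) tune alpha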
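
To see where these parameters enter, it helps to write down the textbook matrix-factorization objectives. This is a sketch of the standard formulation, not a statement of Spark's exact implementation, which differs in details such as how the regularization term is scaled. Here $x_u$ and $y_i$ are the user and item factor vectors (their length is rank), $\lambda$ is regParam, $\alpha$ is alpha, and $\mathcal{K}$ is the set of observed (user, item) pairs.

```latex
% Explicit feedback: fit the observed ratings r_{ui}
\min_{X, Y} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)

% Implicit feedback: fit a binary preference p_{ui}, weighted by a confidence c_{ui}
p_{ui} = \begin{cases} 1 & r_{ui} > 0 \\ 0 & \text{otherwise} \end{cases},
\qquad c_{ui} = 1 + \alpha \, r_{ui}

\min_{X, Y} \sum_{u, i} c_{ui} \left( p_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
```

This is why alpha only matters when implicitPrefs is True: it appears only in the confidence weight of the implicit objective, while rank and regParam appear in both.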

 

Recommendation model differences: real-time recommendation

1. Reflects user reactions immediately

2. Requires complex infrastructure

3. High computational cost

 

Recommendation model differences: batch recommendation

1. The model is updated periodically

2. Uses resources efficiently

3. Results are somewhat less fresh
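
The batch pattern can be sketched as two steps: a periodic job that precomputes a recommendation list per user, and a cheap lookup at serving time. In the sketch below a simple popularity ranking stands in for the trained model's output, which is an assumption made to keep the example self-contained. The function names and the toy ratings are hypothetical.

```python
from collections import Counter


def build_batch_recommendations(ratings, top_n=2):
    """Batch step: precompute per-user lists plus a popularity fallback.

    `ratings` is a list of (user, item, rating) tuples. Popularity is used
    here as a stand-in for model scores.
    """
    popularity = Counter(item for _, item, _ in ratings)
    seen = {}
    for user, item, _ in ratings:
        seen.setdefault(user, set()).add(item)
    popular_items = [item for item, _ in popularity.most_common()]
    recs = {
        user: [item for item in popular_items if item not in items][:top_n]
        for user, items in seen.items()
    }
    return recs, popular_items[:top_n]


def serve(user, recs, fallback):
    """Serving step: a dictionary lookup; unknown users get the fallback list."""
    return recs.get(user, fallback)


# Toy ratings (hypothetical).
ratings = [
    ("u1", "m1", 5.0), ("u1", "m2", 3.0),
    ("u2", "m1", 4.0), ("u2", "m3", 4.5),
    ("u3", "m3", 2.0), ("u3", "m1", 3.5),
]
recs, fallback = build_batch_recommendations(ratings)
known_user = serve("u1", recs, fallback)
new_user = serve("new_user", recs, fallback)
print(known_user, new_user)
```

The trade-off listed above shows up directly: serving is cheap, but anything a user does after the batch job ran is invisible until the next run, and brand-new users only get the generic fallback.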

 

 

 

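Other considerations for recommendations

Diversity: recommend content across a range of genres and characteristics

Serendipity: offer unexpected but interesting discoveries

Relevance and balance: find the sweet spot between accuracy and exploration

User satisfaction: improve long-term engagement and loyalty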
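
Diversity can be measured as well as described. One common choice is intra-list diversity: the average pairwise distance between the items in a recommendation list. The sketch below uses Jaccard distance over genre sets, which is an assumption; other distance measures work too. The movie IDs and genre sets are hypothetical.

```python
from itertools import combinations


def jaccard_distance(a, b):
    """1 minus the Jaccard similarity of two sets."""
    union = a | b
    if not union:
        return 0.0
    return 1.0 - len(a & b) / len(union)


def intra_list_diversity(rec_list, item_genres):
    """Average pairwise genre distance within one recommendation list."""
    pairs = list(combinations(rec_list, 2))
    if not pairs:
        return 0.0
    total = sum(jaccard_distance(item_genres[a], item_genres[b]) for a, b in pairs)
    return total / len(pairs)


# Hypothetical genre sets.
item_genres = {
    "m1": {"Action", "Adventure"},
    "m2": {"Action", "Thriller"},
    "m3": {"Comedy", "Romance"},
}

narrow = intra_list_diversity(["m1", "m2"], item_genres)  # shares "Action"
broad = intra_list_diversity(["m1", "m3"], item_genres)   # no shared genre
print(narrow, broad)
```

A score like this can be tracked next to RMSE, so that tuning for accuracy does not silently collapse every list into one genre.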

 
