Databircks

ㅇㅇ

 

ㅇㅇ

 

ㅇㅇ

 

ㅇㅇ

 

ephemeral(휘발성)

 

용어 정리

DDD(Domain Driven Design): 개발자와 비즈니스가 같은 언어로 소통하며 시스템을 설계할 수 있게 도와주는 방법론

Bounded Context: 어떤 특정한 의미, 모델, 용어들이 명확하게 통하는 경계 지점을 정의한 것

 

ㅇㅇ

 

NYC Taxi Code

# 파일 메타데이터를 데이터프레임으로 불러오기
df = spark.read.format("binaryFile").load("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/")

# 임시 뷰로 등록
df.createOrReplaceTempView("file_metadata")

데이터 로드 이후 SQL에서 접근하기 위한 임시 뷰 등록

 

from pyspark.sql.functions import col, lit, expr, when
from pyspark.sql.types import *
from datetime import datetime
import time

# Define schema
nyc_schema = StructType([
  StructField('Vendor', StringType(), True),
  StructField('Pickup_DateTime', TimestampType(), True),
  StructField('Dropoff_DateTime', TimestampType(), True),
  StructField('Passenger_Count', IntegerType(), True),
  StructField('Trip_Distance', DoubleType(), True),
  StructField('Pickup_Longitude', DoubleType(), True),
  StructField('Pickup_Latitude', DoubleType(), True),
  StructField('Rate_Code', StringType(), True),
  StructField('Store_And_Forward', StringType(), True),
  StructField('Dropoff_Longitude', DoubleType(), True),
  StructField('Dropoff_Latitude', DoubleType(), True),
  StructField('Payment_Type', StringType(), True),
  StructField('Fare_Amount', DoubleType(), True),
  StructField('Surcharge', DoubleType(), True),
  StructField('MTA_Tax', DoubleType(), True),
  StructField('Tip_Amount', DoubleType(), True),
  StructField('Tolls_Amount', DoubleType(), True),
  StructField('Total_Amount', DoubleType(), True)
])

rawDF = (
  spark
  .read
  .format('csv')
  .options(header=True)
  .schema(nyc_schema)
  .load("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-12.csv.gz")
)

데이터 프레임으로 가져오기

 

%sql
-- 테이블 만들기
CREATE DATABASE IF NOT EXISTS taxidata;

-- rawDF 데이터프레임의 데이터를 taxidata 데이터베이스 내의 taxi_2019_12 테이블에 저장
rawDF.write.mode("overwrite").saveAsTable("taxidata.taxi_2019_12")

-- 경로 확인하기
DESCRIBE EXTENDED taxidata.taxi_2019_12;

SQL 테이블 만들고 불러오기

abfs(Azure Blob File System)

 

processedDF = (
  rawDF
  .withColumn('Year', expr('cast(year(Pickup_DateTime) as int)'))
  .withColumn('Month', expr('cast(month(Pickup_DateTime) as int)'))
)
processedDF.write.format('delta').mode('append').partitionBy('Year','Month').save("/delta/taxi")

데이터프레임 컬럼 변경

 

%fs
ls dbfs:/delta/taxi

파일 시스템 폴더 경로 확인

 

위의 결과를 보면, dbfs:/delta/taxi 경로에 연도별 파티션 폴더들이 생성되어 있습니다.

Year=2008, Year=2009 등 예상치 못한 연도 폴더들이 보이는 이유는 다음과 같은 원인이 있을 수 있습니다

(1) 잘못된 데이터 입력

Pickup_DateTime 컬럼에 잘못된 연도가 포함된 데이터가 있을 수 있습니다.

예를 들어, 데이터가 잘못 입력되어 2008년이나 2090년 같은 값이 들어갔을 수 있습니다.

(2) 데이터 전처리 과정의 오류

데이터 생성 또는 로딩 중에 Pickup_DateTime 값이 잘못 파싱되었을 수 있습니다.

이로 인해 실제 존재하지 않는 미래 연도나 과거 연도로 분류되었을 가능성이 있습니다.

(3) 테스트 데이터 포함

데이터에 테스트 또는 샘플 데이터가 포함되어 있을 수 있으며, 이 데이터들이 실제 연도 범위를 벗어나는 값을 가질 수 있습니다.

데이터에서 예상하지 못한 연도 값을 가진 레코드를 필터링하거나 Pickup_DateTime 컬럼의 값이 적절한지 확인하는 추가적인 검증 작업을 수행해야 합니다.

 

%python
(
  processedDF
  .filter("Year = 2019 and Month = 12")
  .write
  .format('delta')
  .mode('overwrite')
  .partitionBy('Year','Month')
  .save("/delta/taxiclean")
)

(
    processedDF
    .filter("Year <> 2019 and Month <> 12")
    .write.mode('overwrite')
    .saveAsTable("taxidata.taxi_excluding_dec_2019")
)

데이터 클리닝 이후 저장

 

# Read Delta Table : Delta 테이블을 DataFrame으로 로드
taxi_clean_df = spark.read.format("delta").load("/delta/taxiclean")

# 데이터 샘플 확인
taxi_clean_df.show(5)

# or in the SQL way, you can read delta table
spark.sql("SELECT * FROM delta.`/delta/taxiclean`")
display(spark.sql("SELECT * FROM delta.`/delta/taxiclean`"))

# 통계 정보 확인
taxi_clean_df.select("Fare_Amount", "Tip_Amount", "Total_Amount").describe().show()

데이터 확인하기

 

# 특정 조건으로 필터링
long_trips_df = taxi_clean_df.filter(taxi_clean_df.Trip_Distance > 10)
long_trips_df.show(5)

조건 필터링 후 확인하기

 

이제 NYC 택시 데이터의 2019년 11월 데이터를 읽어와 rawDF2라는 데이터프레임에 로드합니다. spark.read.format('csv'): 데이터를 CSV 형식으로 읽도록 지정합니다. options(header=True): 첫 번째 행을 헤더로 인식합니다. schema(nyc_schema): 이전에 정의한 nyc_schema 스키마를 사용하여 각 컬럼의 타입을 미리 지정합니다. load("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-11.csv.gz"): 지정된 경로에서 압축된 CSV 파일을 불러옵니다. Lazy Execution (지연 실행): Spark에서는 데이터프레임에 대한 변환(transformation) 작업이 즉시 실행되지 않습니다. 데이터는 필요할 때(예: show(), count() 등 액션을 호출할 때)까지 로드되지 않으므로 메모리와 처리 속도를 효율적으로 사용할 수 있습니다. 여기서 rawDF2는 지연 실행 방식으로 정의되며, 실제 데이터 로드는 액션을 호출할 때 발생합니다.

 

MLflow

구성 요소

tracking에서 실행 환경까지 전부 기록 가능

projects에서 위의 환경을 모두 재현 가능

registry에서 버전 관리 가능

 

MLflow run() 없이 import만 했는데 자동으로 언제나 MLflow의 Experiment에 기록

default가 auto_logging 상태

 

%sh
rm -r /dbfs/mlflow_lab -f
mkdir /dbfs/mlflow_lab
wget -O /dbfs/mlflow_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv

MLflow 확인용 파일 다운로드

 

from pyspark.sql.types import *
from pyspark.sql.functions import *
   
# CSV 파일을 DataFrame으로 읽어옵니다. header 옵션은 첫 번째 줄을 컬럼 이름으로 사용하도록 합니다.
data = spark.read.format("csv").option("header", "true").load("/mlflow_lab/penguins.csv")

# 결측치가 있는 행을 제거하고, 각 컬럼을 적절한 데이터 타입으로 변환합니다.
data = data.dropna().select(
    col("Island").astype("string"),
    col("CulmenLength").astype("float"), # 부리 길이
    col("CulmenDepth").astype("float"),  # 부리 두께
    col("FlipperLength").astype("float"),# 지느러미 길이
    col("BodyMass").astype("float"),   # 몸무게
    col("Species").astype("int")  # 펭귄 종 (목표 변수)
)

# 데이터의 20%를 무작위로 샘플링하여 표시합니다.
display(data.sample(0.2))
   
# 데이터를 학습 데이터(70%)와 테스트 데이터(30%)로 무작위 분할합니다.
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

가져와서 데이터 쌓기

 

# MLflow 실행(run) 시작
with mlflow.start_run():
	""" something_code """

mlflow 실행하기

 

catFeature = "Island" # 범주형 특성 이름
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"] # 수치형 특성 이름 리스트

# 하이퍼파라미터 설정
maxIterations = 5  # 로지스틱 회귀의 최대 반복 횟수
regularization = 0.5 # 로지스틱 회귀의 정규화 파라미터

something code 부분 첫 번째

 

# 특성 공학 및 모델 단계 정의
# 1. StringIndexer: 범주형 특성 "Island"를 숫자 인덱스로 변환 ("IslandIdx" 컬럼 생성)
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
# 2. VectorAssembler: 수치형 특성들을 단일 벡터 "numericFeatures"로 결합
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
# 3. MinMaxScaler: "numericFeatures" 벡터를 정규화하여 "normalizedFeatures" 컬럼 생성
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
# 4. VectorAssembler: 인덱싱된 범주형 특성과 정규화된 수치형 특성을 최종 "Features" 벡터로 결합
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
# 5. LogisticRegression: 로지스틱 회귀 모델 정의
algo = LogisticRegression(
    labelCol="Species", featuresCol="Features", maxIter=maxIterations, regParam=regularization
)

# 단계들을 파이프라인(Pipeline)의 스테이지(stages)로 연결
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

something code 부분 두 번째

 

# 학습 파라미터 값 로깅
print ("Training Logistic Regression model...")
mlflow.log_param('maxIter', algo.getMaxIter()) # 최대 반복 횟수 로깅
mlflow.log_param('regParam', algo.getRegParam()) # 정규화 파라미터 로깅

# 파이프라인을 학습 데이터(train)에 적합(fit)시켜 모델 생성
model = pipeline.fit(train)

# 모델 평가 및 메트릭 로깅
prediction = model.transform(test) # 테스트 데이터로 예측 수행
metrics = ["accuracy", "weightedRecall", "weightedPrecision"] # 평가할 메트릭 리스트
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(
    	labelCol="Species", predictionCol="prediction", metricName=metric
    )
    metricValue = evaluator.evaluate(prediction) # 예측 결과 평가
    print("%s: %s" % (metric, metricValue))
    mlflow.log_metric(metric, metricValue) # 평가 메트릭 로깅

something code 부분 세 번째

 

# 모델 자체를 로깅
unique_model_name = "classifier-" + str(time.time()) # 고유한 모델 이름 생성
# MLflow에 Spark 모델을 로깅합니다. 모델, 아티팩트 경로 내 이름, conda 환경을 지정합니다.
mlflow.spark.log_model(model, unique_model_name, mlflow.spark.get_default_conda_env())
# DBFS 경로에 모델 저장 (선택 사항, log_model로도 충분히 MLflow에 기록됨)
modelpath = "/model/%s" % (unique_model_name) 
mlflow.spark.save_model(model, modelpath)

print("Experiment run complete.")

최종적으로 모델 자체를 mlflow에 로드

 

train_penguin_model(train, test, 10, 0.2)

원한다면 위의 동작을 함수로 빼서 따로 돌릴 수 있다.

 

dd

 

dd

 

dd

 

dd

 

dd

 

dd

 

dd

 

Endpoint

사용자가 호출하는 주소

웹서비스를 하는 것

특정 경로로 내가 원하는 정보를 보내는 것

외부에서 접근할 수 있는 접점(GPT)

 

토큰 생성

 

Optuna

from pyspark.sql.types import *
from pyspark.sql.functions import *
   
# CSV 파일을 Spark DataFrame으로 읽어옵니다. header 옵션은 CSV 파일의 첫 줄을 컬럼 이름으로 사용하도록 합니다.
data = spark.read.format("csv").option("header", "true").load("/hyperparam_tune_lab/penguins.csv")

# 결측값이 있는 행을 제거하고(dropna), 각 컬럼을 적절한 데이터 타입으로 변환합니다.
data = data.dropna().select(
    col("Island").astype("string"),
    col("CulmenLength").astype("float"), # 부리 길이
    col("CulmenDepth").astype("float"),  # 부리 깊이
    col("FlipperLength").astype("float"),# 지느러미 길이
    col("BodyMass").astype("float"),   # 몸무게
    col("Species").astype("int")       # 펭귄 종 (레이블)
)

# 데이터의 20%를 무작위로 샘플링하여 Databricks notebook에 시각적으로 표시합니다.
display(data.sample(0.2))
   
# 데이터를 training set (70%)과 test set (30%)으로 무작위로 분할합니다.
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

데이터 로드

 

import optuna
import mlflow # 실험을 로깅하고 싶다면 사용합니다.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
   
   
def objective(trial):
    # Hyperparameter 값 제안 (maxDepth 및 maxBins):
    # "MaxDepth"라는 이름의 hyperparameter에 대해 0에서 9 사이의 정수 값을 제안합니다.
    max_depth = trial.suggest_int("MaxDepth", 0, 9) 
    # "MaxBins"라는 이름의 hyperparameter에 대해 [10, 20, 30] 리스트 중 하나의 범주형 값을 제안합니다.
    max_bins = trial.suggest_categorical("MaxBins", [10, 20, 30])

    # Pipeline 구성 요소 정의
    cat_feature = "Island" # 범주형 feature 컬럼 이름
    num_features = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"] # 수치형 feature 컬럼 이름 리스트
    
    # StringIndexer: 범주형 문자열 컬럼("Island")을 숫자 인덱스 컬럼("IslandIdx")으로 변환합니다.
    catIndexer = StringIndexer(inputCol=cat_feature, outputCol=cat_feature + "Idx")
    # VectorAssembler: 여러 수치형 feature 컬럼들을 단일 벡터 컬럼("numericFeatures")으로 결합합니다.
    numVector = VectorAssembler(inputCols=num_features, outputCol="numericFeatures")
    # MinMaxScaler: "numericFeatures" 벡터의 값들을 0과 1 사이로 정규화하여 "normalizedFeatures" 컬럼을 만듭니다.
    numScaler = MinMaxScaler(inputCol=numVector.getOutputCol(), outputCol="normalizedFeatures")
    # VectorAssembler: 인덱싱된 범주형 feature와 정규화된 수치형 feature들을 최종 "Features" 벡터 컬럼으로 결합합니다.
    featureVector = VectorAssembler(inputCols=[cat_feature + "Idx", "normalizedFeatures"], outputCol="Features")

    # DecisionTreeClassifier: 의사결정 트리 분류기를 설정합니다.
    # labelCol: 예측 대상 컬럼 (펭귄 종)
    # featuresCol: 학습에 사용될 feature 컬럼
    # maxDepth, maxBins: Optuna가 제안한 hyperparameter 값 사용
    dt = DecisionTreeClassifier(
        labelCol="Species",
        featuresCol="Features",
        maxDepth=max_depth,
        maxBins=max_bins
    )

    # Pipeline: 정의된 모든 변환 단계(Indexer, Assembler, Scaler)와 분류기(dt)를 순서대로 실행하는 파이프라인을 만듭니다.
    pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, dt])
    # 모델 학습: training data(train)를 사용하여 파이프라인을 학습시킵니다.
    model = pipeline.fit(train)

    # 정확도를 사용하여 모델 평가
    # 학습된 모델을 사용하여 test data에 대한 예측을 생성합니다.
    predictions = model.transform(test)
    # MulticlassClassificationEvaluator: 다중 클래스 분류 모델의 성능을 평가합니다.
    # metricName="accuracy": 평가 지표로 정확도를 사용합니다.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="Species",
        predictionCol="prediction", # 모델이 생성한 예측값 컬럼
        metricName="accuracy"
    )
    # 예측 결과에 대한 정확도를 계산합니다.
    accuracy = evaluator.evaluate(predictions)

    # Optuna는 목적 함수를 최소화하므로, 정확도의 음수 값을 반환합니다.
    # 이렇게 하면 Optuna가 정확도를 최대화하는 방향으로 최적화를 수행하게 됩니다.
    return -accuracy

모델 학습 및 파이프라인 함수화

 

# 5번의 trial로 최적화 실행:
# Optuna study 객체를 생성합니다. study는 최적화 과정을 관리합니다.
study = optuna.create_study() 

# objective 함수를 사용하여 최적화를 시작합니다. n_trials=5는 5번의 다른 hyperparameter 조합을 시도하라는 의미입니다.
study.optimize(objective, n_trials=5)

print("최적화 실행에서 찾은 최상의 매개변수 값:")
# study.best_params는 최적화 과정에서 가장 좋은 성능(가장 낮은 손실 값, 즉 가장 높은 정확도)을 보인 hyperparameter 조합을 반환합니다.
print(study.best_params)

optuna 최적화

 

optuna 대략의 결과

 

AutoML

이거는 왜 실행하는 거지??

 

AutoML 실습을 위해 올릴 데이터 메뉴

 

데이터 올리기 전 확인하기

 

AutoML 실행

 

AutoML에 사용할 데이터 로드

 

상세 설정

 

timeout 시간만큼 기다리면 결과가 나온다

하단에 진행한 모델 결과들이 쌓인다

best model을 찾을 수도 있고, EDA를 볼 수도 있다.

 

best model

 

내리면 볼 수 있다

 

EDA

 

내리면 엄청 다양하게 볼 수 있다.

 

자동으로 이런 것들을 보여준다.

+ Recent posts