공부/Microsoft Data School 1기

AzureDatabricks를 활용한 데이터 병렬 처리 4

_빌런 2025. 5. 19. 09:00

Data Intelligence Platform on Azure

End-to-End Solution으로서 Azure Databricks의 포지션을 설명하는 구조

데이터 소스는 크게 3가지로 나눌 수 있다.

Structured(RDB 데이터), Semi-Structured(log, json), Un-Structured(text, video, audio, img)

 

Batch Data: 한 번에 하나의 단위씩 실행하는 기능, 처리하는 데이터

Stream Data: 실시간으로 실행하는 기능, 발생하는 데이터

 

Inteligence: 기계에 있어서 지능이란 무엇인가? 정의하기 나름

Data Intelligence: AI처럼 데이터 측면에서 데이터로 표현할 수 있는 지능(지적인 능력을 발휘함에 도움이 되는 무언가들)

Data Warehousing: 3가지의 데이터를 정리하는 데이터를 저장하는 거대한 공간

Data Mart: 많이 쓰는 데이터(조직의 사업부와 관련된 정보를 포함하는 데이터)를 저장하는 공간

 

Mosaic AI: 전문 AI 서비스를 모아둔 시스템, 어떤 모델을 통해서든 실제 서비스에 적용 가능한 에이전트 시스템을 구축

Vector Search: 텍스트, 이미지 등의 캡처 데이터를 벡터로 표현한 후 유사한 항목이나 데이터 포인트를 찾는 검색 기술

Unity Catalog: 데이터를 카탈로그화하여(메타데이터를 사용하여) 조직의 데이터 자산을 관리하는 플랫폼

 

Azure Databricks Machine Learning

Machine Learning

컴퓨터가 명시적인 프로그래밍 없이 데이터로부터 패턴을 학습하여 예측하거나 의사결정을 수행하는 기술

 

  Scikit Learn Spark MLlib
처리 방식 단일 노드 분산 처리
데이터 크기 소규모 데이터 대용량 데이터
사용 편의성 높음 중간
알고리즘 다양성 매우 높음 중간
확장성 제한적 높음

ㅇㅇ

 

구분 설명 모델
보팅(Voting) 서로 다른 알고리즘을 각 분류기에 투표하여 최종 결과를 예측한다.
하드 보팅과 소프트 보팅의 두 가지로 분류한다.
Hard Voting: 다수결로 최종 예측 결정
Soft Voting: 각 모델의 예측 확률을 평균으로 최종 예측 결정
VotingClassifier
배깅(Bagging) 동일한 알고리즘을 사용하되, 데이터 샘플을 달리 하여
여러 모델을 학습시켜 Voting을 수행한다.
여러 샘플에서 독립적으로 모델을 학습하고 결과를 결합한다.
특징:  분산 감소, 과적합 감소, 병렬 학습 가능
RandomForest
부스팅(Boosting) 약한 학습기부터 순차적으로 학습시키고,
이전 단계에서 오분류된 데이터에 높은 가중치를 주어
이후 학습기에서 추가적으로 학습한다.
잘못된 예측에 집중하여 성능을 끌어올리는 방법이다.
특징: 편향 감소, 순차적 학습, 높은 성능
AdaBoost
Gradient Boosting
XGBoost
LightGBM
CatBoost
스태킹(Stacking) 서로 다른 모델들을 학습하여, 각 모델의 예측 결과를
다시 학습 데이터로 만들어 Meta Model로 재학습한다.
다양한 모델의 장점을 결합하여 성능을 향상하는 방법이다.
-

앙상블 학습 방법 유형

Spark ML 파이프라인
데이터 로드 및 변환

Spark DataFrame을 사용하여
데이터를 로드하고 전처리한다.
DataFrame 변환 API와 SQL을
활용하여 데이터를 준비한다.
파이프라인 구성 요소 정의

Transformer, Estiimator, Pipeline
등의 객체들을 정의한다.
VectorAssembler로 특성 벡터를
생성하고 모델 알고리즘을 선택한다.
파이프라인 실행 및 평가

파이프라인을 훈련 데이터에
적용하고 모델을 생성한다.
검증 데이터로 모델을 평가하고
성능 지표를 계산한다.

 

모델 선택 시 우선순위 판단 기준 및 실무적 적용 전략

평가 지표에 대한 선정 기준

ㆍ 단순 지표가 아닌 과정과 결과를 모두 고려하여 판단한다.

목표 설정을 가능한 한 구체적으로 하는 게 좋아보인다.

 

머신 러닝이나 AI 선택 시 균형있게 고려해야 할 종합적 요소

ㆍ 성능 지표: 정확도, F1 스코어 등 기본 평가 지표

비용 요소: 학습, 추론에 필요한 인프라 및 운영 비용

속도 고려: 예측 응답 시간, 학습 시간

해석 가능성: 모델 결과에 대한 설명 가능 여부

유지보수 용이성: 운영 중 재학습, 배포 난이도, 버전 관리 등

 

위의 사진과 같은 의사결정 프레임워크를 사용하여 점수를 매기고 보고한다.

이에 상부에서 정성과 정량 중 중점을 둘 것을 의논한 뒤 최종 결정한다.

 

가치 측정 부분에서 한계 효용(marginal utilitu) 법칙도 존재한다.

혹은 의료, 금융처럼 생명과 직결한 분야는 비용 증가를 정당화하는 경우도 있다.

 

실무 판단 가이드

ㆍ 성능 향상 실제 의미: 1 ~ 2% 정확도 차이가 실제 업무에 차이를 만드는가

ㆍ 비용과 속도 민감성: 배포 및 운영비 부담을 감당할 수 있는가

ㆍ 사용자와 조직 선호도: 해석 가능한 모델을 더 수용하지 않는가

ㆍ 향후 운영 및 유지보수: MLOps 체계에 부담은 없는가

 

실무 적용 전략 요약

1. 정확도 차이가 3% 미만

    └ 속도, 비용, 해석 가능성 등 2순위 요소를 적극 고려

2. 업무상 민감한 결과 도출

    └ 의료, 금융 등 도메인은 가능한 한 높은 성능을 우선

3. 정확도 차이가 미미하고 비용차이가 큰 경우

    └ 경량화 모델 또는 단순 모델 채택이 유리

4. 운영, 협업이 중요한 조직 환경

    └ 해석 가능성, 유지보수성 높은 모델 우선 고려

 

결론

균형 잡힌 판단: 단일 성능 지표에만 의존하지 않는 총합 접근

비즈니스 맥락: 실제 업무 환경과 요구사항 고려

협업 설득 도구: 객관적 판단 근거 제공으로 팀 의사결정 지원

사용자 요구: 최종 사용자의 필요와 선호도 반영

 

Penguin

%sh
rm -r /dbfs/ml_lab
mkdir /dbfs/ml_lab
wget -O /dbfs/ml_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv

df = spark.read.format("csv").option("header","true").load("/ml_lab/penguins.csv")
# display(df)

학습용 펭귄 데이터 로드

 

from pyspark.sql.types import *
from pyspark.sql.functions import *

data = df.dropna().select(
    col("Island").astype("string"),
    col("CulmenLength").astype("float"),
    col("CulmenDepth").astype("float"),
    col("FlipperLength").astype("float"),
    col("BodyMass").astype("float"),
    col("Species").astype("int")
)

# display(data)

스키마 정의

 

train, test = data.randomSplit([0.7, 0.3])
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

데이터 테스트 셋 분리

 

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Island", outputCol="IslandIdx")
indexedData = indexer.fit(train).transform(train).drop("Island")

# display(indexedData)

StringIndexer

 

from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Create a vector column containing all numeric features
numericFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
numericColVector = VectorAssembler(inputCols=numericFeatures, outputCol="numericFeatures")
vectorizedData = numericColVector.transform(indexedData)

# Use a MinMax scaler to normalize the numeric values in the vector
minMax = MinMaxScaler(inputCol = numericColVector.getOutputCol(), outputCol="normalizedFeatures")
scaledData = minMax.fit(vectorizedData).transform(vectorizedData)

# Display the data with numeric feature vectors (before and after scaling)
compareNumerics = scaledData.select("numericFeatures", "normalizedFeatures")
# display(compareNumerics)

VectorAssembler

MinMaxScaler

 

featVect = VectorAssembler(
    inputCols=["IslandIdx", "normalizedFeatures"],
    outputCol="featuresVector"
)

preppedData = featVect.transform(scaledData)[
    col("featuresVector").alias("features"),
    col("Species").alias("label")
]

# display(preppedData)

타켓 컬럼(Island)을 StringIndexer로 바꾼 IslandIdx 컬럼과 합쳐서 VectorAssembler 사용

  • 최종 피처 벡터는 범주형 인덱스 "IslandIdx"와 정규화된 "normalizedFeatures"를 합쳐 "featuresVector" 컬럼으로 생성.
  • 학습에 사용할 features와 label(타겟 변수, Species)을 최종적으로 준비.

 

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(preppedData)
print ("Model trained!")

모델 선정 후 학습

 

# Prepare the test data
indexedTestData = indexer.fit(test).transform(test).drop("Island") 
vectorizedTestData = numericColVector.transform(indexedTestData)
scaledTestData = minMax.fit(vectorizedTestData).transform(vectorizedTestData)
preppedTestData = featVect.transform(scaledTestData)[
    col("featuresVector").alias("features"),
    col("Species").alias("label")
]

# Get predictions
prediction = model.transform(preppedTestData)
predicted = prediction.select(
    "features",
    "probability",
    col("prediction").astype("Int"),
    col("label").alias("trueLabel")
)

# Display
# display(predicted)

테스트

 

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)

# Individual class metrics
labels = [0,1,2] # Adelie, Gentoo, Chinstrap
print("\nIndividual class metrics:")

for label in sorted(labels):
    print ("Class %s" % (label))

    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
    evaluator.metricName:"precisionByLabel"})
    print("\tPrecision:", precision)

    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
    evaluator.metricName:"recallByLabel"})
    print("\tRecall:", recall)

    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
    evaluator.metricName:"fMeasureByLabel"})
    print("\tF1 Score:", f1)

# Weighted (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("\nOverall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

점수 확인

 

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression

catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]

# Define the feature engineering and model training algorithm steps
catIndexer = StringIndexer(
    inputCol=catFeature,
    outputCol=catFeature + "Idx",
    handleInvalid="keep"            # handleInvalid 추가 권장
)

numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(
    inputCols=["IslandIdx", "normalizedFeatures"],
    outputCol="Features"
)
algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=10, regParam=0.3)

# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)

print ("Model trained!")

파이프라인화하여서 학습

 

prediction = model.transform(test)
predicted = prediction.select(
    "Features",
    "probability",
    col("prediction").astype("Int"), 
    col("Species").alias("trueLabel")
)

display(predicted)

테스트

 

from py4j.protocol import Py4JJavaError

try:
    model.save("/models/penguin.model")
except Py4JJavaError:
    model.write().overwrite().save("/models/penguin.model")

모델 저장

 

from pyspark.ml.pipeline import PipelineModel

# Model load
persistedModel = PipelineModel.load("/models/penguin.model")

# Make temp test data
newData = spark.createDataFrame ([{
    "Island": "Biscoe",
    "CulmenLength": 47.6,
    "CulmenDepth": 14.5,
    "FlipperLength": 215.0, # 실수형으로 일치
    "BodyMass": 5400.0, # 실수형으로 일치
    "Species": 0 # 예측용이므로 이 열은 실제로는 불필요, 모델은 Species 를 예측
 }])

predictions = persistedModel.transform(newData)

display(predictions.select(
    "Island",
    "CulmenDepth",
    "CulmenLength",
    "FlipperLength",
    "BodyMass", 
    col("prediction").alias("PredictedSpecies")
))

모델 로드 후 재사용