AzureDatabricks를 활용한 데이터 병렬 처리 4
Data Intelligence Platform on Azure
End-to-End Solution으로서 Azure Databricks의 포지션을 설명하는 구조
데이터 소스는 크게 3가지로 나눌 수 있다.
Structured(RDB 데이터), Semi-Structured(log, json), Un-Structured(text, video, audio, img)
Batch Data: 한 번에 하나의 단위씩 실행하는 기능, 처리하는 데이터
Stream Data: 실시간으로 실행하는 기능, 발생하는 데이터
Inteligence: 기계에 있어서 지능이란 무엇인가? 정의하기 나름
Data Intelligence: AI처럼 데이터 측면에서 데이터로 표현할 수 있는 지능(지적인 능력을 발휘함에 도움이 되는 무언가들)
Data Warehousing: 3가지의 데이터를 정리하는 데이터를 저장하는 거대한 공간
Data Mart: 많이 쓰는 데이터(조직의 사업부와 관련된 정보를 포함하는 데이터)를 저장하는 공간
Mosaic AI: 전문 AI 서비스를 모아둔 시스템, 어떤 모델을 통해서든 실제 서비스에 적용 가능한 에이전트 시스템을 구축
Vector Search: 텍스트, 이미지 등의 캡처 데이터를 벡터로 표현한 후 유사한 항목이나 데이터 포인트를 찾는 검색 기술
Unity Catalog: 데이터를 카탈로그화하여(메타데이터를 사용하여) 조직의 데이터 자산을 관리하는 플랫폼
Azure Databricks Machine Learning
Machine Learning
컴퓨터가 명시적인 프로그래밍 없이 데이터로부터 패턴을 학습하여 예측하거나 의사결정을 수행하는 기술
Scikit Learn | Spark MLlib | |
처리 방식 | 단일 노드 | 분산 처리 |
데이터 크기 | 소규모 데이터 | 대용량 데이터 |
사용 편의성 | 높음 | 중간 |
알고리즘 다양성 | 매우 높음 | 중간 |
확장성 | 제한적 | 높음 |
ㅇㅇ
구분 | 설명 | 모델 | |
보팅(Voting) | 서로 다른 알고리즘을 각 분류기에 투표하여 최종 결과를 예측한다. 하드 보팅과 소프트 보팅의 두 가지로 분류한다. Hard Voting: 다수결로 최종 예측 결정 Soft Voting: 각 모델의 예측 확률을 평균으로 최종 예측 결정 |
VotingClassifier | |
배깅(Bagging) | 동일한 알고리즘을 사용하되, 데이터 샘플을 달리 하여 여러 모델을 학습시켜 Voting을 수행한다. 여러 샘플에서 독립적으로 모델을 학습하고 결과를 결합한다. 특징: 분산 감소, 과적합 감소, 병렬 학습 가능 |
RandomForest | |
부스팅(Boosting) | 약한 학습기부터 순차적으로 학습시키고, 이전 단계에서 오분류된 데이터에 높은 가중치를 주어 이후 학습기에서 추가적으로 학습한다. 잘못된 예측에 집중하여 성능을 끌어올리는 방법이다. 특징: 편향 감소, 순차적 학습, 높은 성능 |
AdaBoost Gradient Boosting XGBoost LightGBM CatBoost |
|
스태킹(Stacking) | 서로 다른 모델들을 학습하여, 각 모델의 예측 결과를 다시 학습 데이터로 만들어 Meta Model로 재학습한다. 다양한 모델의 장점을 결합하여 성능을 향상하는 방법이다. |
- |
앙상블 학습 방법 유형
Spark ML 파이프라인 | ||||
데이터 로드 및 변환 Spark DataFrame을 사용하여 데이터를 로드하고 전처리한다. DataFrame 변환 API와 SQL을 활용하여 데이터를 준비한다. |
→ | 파이프라인 구성 요소 정의 Transformer, Estiimator, Pipeline 등의 객체들을 정의한다. VectorAssembler로 특성 벡터를 생성하고 모델 알고리즘을 선택한다. |
→ | 파이프라인 실행 및 평가 파이프라인을 훈련 데이터에 적용하고 모델을 생성한다. 검증 데이터로 모델을 평가하고 성능 지표를 계산한다. |
모델 선택 시 우선순위 판단 기준 및 실무적 적용 전략
평가 지표에 대한 선정 기준
ㆍ 단순 지표가 아닌 과정과 결과를 모두 고려하여 판단한다.
ㆍ 목표 설정을 가능한 한 구체적으로 하는 게 좋아보인다.
머신 러닝이나 AI 선택 시 균형있게 고려해야 할 종합적 요소
ㆍ 성능 지표: 정확도, F1 스코어 등 기본 평가 지표
ㆍ 비용 요소: 학습, 추론에 필요한 인프라 및 운영 비용
ㆍ 속도 고려: 예측 응답 시간, 학습 시간
ㆍ 해석 가능성: 모델 결과에 대한 설명 가능 여부
ㆍ 유지보수 용이성: 운영 중 재학습, 배포 난이도, 버전 관리 등
위의 사진과 같은 의사결정 프레임워크를 사용하여 점수를 매기고 보고한다.
이에 상부에서 정성과 정량 중 중점을 둘 것을 의논한 뒤 최종 결정한다.
가치 측정 부분에서 한계 효용(marginal utilitu) 법칙도 존재한다.
혹은 의료, 금융처럼 생명과 직결한 분야는 비용 증가를 정당화하는 경우도 있다.
실무 판단 가이드
ㆍ 성능 향상 실제 의미: 1 ~ 2% 정확도 차이가 실제 업무에 차이를 만드는가
ㆍ 비용과 속도 민감성: 배포 및 운영비 부담을 감당할 수 있는가
ㆍ 사용자와 조직 선호도: 해석 가능한 모델을 더 수용하지 않는가
ㆍ 향후 운영 및 유지보수: MLOps 체계에 부담은 없는가
실무 적용 전략 요약
1. 정확도 차이가 3% 미만
└ 속도, 비용, 해석 가능성 등 2순위 요소를 적극 고려
2. 업무상 민감한 결과 도출
└ 의료, 금융 등 도메인은 가능한 한 높은 성능을 우선
3. 정확도 차이가 미미하고 비용차이가 큰 경우
└ 경량화 모델 또는 단순 모델 채택이 유리
4. 운영, 협업이 중요한 조직 환경
└ 해석 가능성, 유지보수성 높은 모델 우선 고려
결론
균형 잡힌 판단: 단일 성능 지표에만 의존하지 않는 총합 접근
비즈니스 맥락: 실제 업무 환경과 요구사항 고려
협업 설득 도구: 객관적 판단 근거 제공으로 팀 의사결정 지원
사용자 요구: 최종 사용자의 필요와 선호도 반영
Penguin
%sh
rm -r /dbfs/ml_lab
mkdir /dbfs/ml_lab
wget -O /dbfs/ml_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv
df = spark.read.format("csv").option("header","true").load("/ml_lab/penguins.csv")
# display(df)
학습용 펭귄 데이터 로드
from pyspark.sql.types import *
from pyspark.sql.functions import *
data = df.dropna().select(
col("Island").astype("string"),
col("CulmenLength").astype("float"),
col("CulmenDepth").astype("float"),
col("FlipperLength").astype("float"),
col("BodyMass").astype("float"),
col("Species").astype("int")
)
# display(data)
스키마 정의
train, test = data.randomSplit([0.7, 0.3])
print ("Training Rows:", train.count(), " Testing Rows:", test.count())
데이터 테스트 셋 분리
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Island", outputCol="IslandIdx")
indexedData = indexer.fit(train).transform(train).drop("Island")
# display(indexedData)
StringIndexer
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
# Create a vector column containing all numeric features
numericFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
numericColVector = VectorAssembler(inputCols=numericFeatures, outputCol="numericFeatures")
vectorizedData = numericColVector.transform(indexedData)
# Use a MinMax scaler to normalize the numeric values in the vector
minMax = MinMaxScaler(inputCol = numericColVector.getOutputCol(), outputCol="normalizedFeatures")
scaledData = minMax.fit(vectorizedData).transform(vectorizedData)
# Display the data with numeric feature vectors (before and after scaling)
compareNumerics = scaledData.select("numericFeatures", "normalizedFeatures")
# display(compareNumerics)
VectorAssembler
MinMaxScaler
featVect = VectorAssembler(
inputCols=["IslandIdx", "normalizedFeatures"],
outputCol="featuresVector"
)
preppedData = featVect.transform(scaledData)[
col("featuresVector").alias("features"),
col("Species").alias("label")
]
# display(preppedData)
타켓 컬럼(Island)을 StringIndexer로 바꾼 IslandIdx 컬럼과 합쳐서 VectorAssembler 사용
- 최종 피처 벡터는 범주형 인덱스 "IslandIdx"와 정규화된 "normalizedFeatures"를 합쳐 "featuresVector" 컬럼으로 생성.
- 학습에 사용할 features와 label(타겟 변수, Species)을 최종적으로 준비.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(preppedData)
print ("Model trained!")
모델 선정 후 학습
# Prepare the test data
indexedTestData = indexer.fit(test).transform(test).drop("Island")
vectorizedTestData = numericColVector.transform(indexedTestData)
scaledTestData = minMax.fit(vectorizedTestData).transform(vectorizedTestData)
preppedTestData = featVect.transform(scaledTestData)[
col("featuresVector").alias("features"),
col("Species").alias("label")
]
# Get predictions
prediction = model.transform(preppedTestData)
predicted = prediction.select(
"features",
"probability",
col("prediction").astype("Int"),
col("label").alias("trueLabel")
)
# Display
# display(predicted)
테스트
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)
# Individual class metrics
labels = [0,1,2] # Adelie, Gentoo, Chinstrap
print("\nIndividual class metrics:")
for label in sorted(labels):
print ("Class %s" % (label))
# Precision
precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
evaluator.metricName:"precisionByLabel"})
print("\tPrecision:", precision)
# Recall
recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
evaluator.metricName:"recallByLabel"})
print("\tRecall:", recall)
# F1 score
f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
evaluator.metricName:"fMeasureByLabel"})
print("\tF1 Score:", f1)
# Weighted (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("\nOverall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)
점수 확인
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
# Define the feature engineering and model training algorithm steps
catIndexer = StringIndexer(
inputCol=catFeature,
outputCol=catFeature + "Idx",
handleInvalid="keep" # handleInvalid 추가 권장
)
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(
inputCols=["IslandIdx", "normalizedFeatures"],
outputCol="Features"
)
algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=10, regParam=0.3)
# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])
# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")
파이프라인화하여서 학습
prediction = model.transform(test)
predicted = prediction.select(
"Features",
"probability",
col("prediction").astype("Int"),
col("Species").alias("trueLabel")
)
display(predicted)
테스트
from py4j.protocol import Py4JJavaError
try:
model.save("/models/penguin.model")
except Py4JJavaError:
model.write().overwrite().save("/models/penguin.model")
모델 저장
from pyspark.ml.pipeline import PipelineModel
# Model load
persistedModel = PipelineModel.load("/models/penguin.model")
# Make temp test data
newData = spark.createDataFrame ([{
"Island": "Biscoe",
"CulmenLength": 47.6,
"CulmenDepth": 14.5,
"FlipperLength": 215.0, # 실수형으로 일치
"BodyMass": 5400.0, # 실수형으로 일치
"Species": 0 # 예측용이므로 이 열은 실제로는 불필요, 모델은 Species 를 예측
}])
predictions = persistedModel.transform(newData)
display(predictions.select(
"Island",
"CulmenDepth",
"CulmenLength",
"FlipperLength",
"BodyMass",
col("prediction").alias("PredictedSpecies")
))
모델 로드 후 재사용