AzureDatabricks를 활용한 데이터 병렬 처리 2
Complex Transformations
메소드 | 설명 |
Mainpulate Complex Types | |
: | |
. | |
schema_of_json() | |
from_json() | |
* | |
Array Data | |
explode() | 배열의 요소를 여러 행으로 분리, 각 요소에 대해 새로운 행을 생성 |
size() | 각 행에 대해 배열의 요소 개수를 센 뒤 반환 |
collect_set() | 배열 내의 필드를 포함하여 필드에 대한 고윳값을 수집 |
flatten() | 여러 배열을 하나의 배열로 결합 |
array_distinct() | 배열에서 중복 요소를 제거 |
Data Combine and Restruct | |
JOIN | inner, outer, left, right, anti, cross, semi |
PIVOT |
Manipulate Complex Types
Spark SQL에는 JSON 문자열 또는 구조체 타입으로 저장된 중첩 데이터와 직접 상호 작용하는 기본 제공 기능이 있습니다.
- JSON 문자열의 하위 필드에 액세스하려면 쿼리에서 : 구문을 사용하세요.
- 구조체 타입의 하위 필드에 액세스하려면 쿼리에서 . 구문을 사용하세요.=
- **schema_of_json()**은 예제 JSON 문자열에서 도출된 스키마를 반환합니다.
- **from_json()**은 지정된 스키마를 사용하여 JSON 문자열이 포함된 열을 구조체 유형으로 구문 분석합니다.
- * 압축 해제는 구조체를 평면화하는 데 사용할 수 있습니다. **col_name.***은 **col_name**의 하위 필드를 자체 열로 추출합니다.
UDF
사용자 정의 함수(UDF)
사용자 정의 열 변환 함수
- Catalyst Optimizer로 최적화할 수 없음
- 함수가 직렬화되어 실행기로 전송됨
- 행 데이터는 Spark의 네이티브 바이너리 형식에서 역직렬화되어 UDF로 전달되고, 결과는 Spark의 네이티브 형식으로 다시 직렬화됨
- Python UDF의 경우, 실행기와 각 워커 노드에서 실행되는 Python 인터프리터 간에 추가적인 프로세스 간 통신 오버헤드가 발생함
# Our input/output is a string
@udf("string")
def first_letter_udf(email: str) -> str:
return email[0]
데코레이터랑
ㅇㅇ
dd
Spark
고차 함수 | 설명 |
FILTER() | 주어진 람다 함수를 사용하여 배열을 필터링 |
EXISTS() | 배열의 하나 이상의 요소에 대해 특정 명령문이 참인지 여부를 테스트 |
TRANSFORM() | 주어진 람다 함수를 사용하여 배열의 모든 요소를 변환 |
REDUCE() | 2개의 람다 함수로 배열의 요소를 버퍼에 병합, 단일 값으로 축소 후 최종 버퍼에 마무리 함수를 적용 |
Spark SQL 고차 함수
메소드 | 설명 |
sql() | 주어진 쿼리의 결과를 나타내는 DataFrame을 반환 |
table() | 지정된 테이블을 DataFrame으로 반환 |
read() | DataFrame으로 데이터를 읽는데 사용하는 DataFrameReader를 반환 |
range() | 시작부터 끝까지(제외) 범위에 있는 요소를 포함하는 열과 단계 값과 파티션 개수를 가진 DataFrame을 생성 |
createDataFrame() | 주로 테스트용으로 사용하는 튜플 목록에서 DataFrame을 생성 |
udf.register() | 사용자 정의 함수(UDF) 등록 |
Spark Session 메소드
Action Method | 설명 |
schema | DataFrame의 스키마를 엑세스 |
printSchema() | DataFrame의 스키마를 보기 좋은 형태로 출력 |
show() | 화면에 데이터를 출력 |
describe() summary() |
작업 메소드로 숫자 및 문자열 column에 대한 기본 통계를 계산하여 반환 |
first() head() |
작업 메소드로 DataFrame의 첫 번째 행을 반환 |
collect() | 모든 행을 포함하는 배열을 반환 |
take() | 처음 N개 행을 포함하는 배열을 반환 |
count() | 전체 행 개수를 반환 |
createOrReplace TempView() |
DataFrame을 기반으로 임시 뷰를 생성 해당 뷰의 수명은 DataFrame을 생성하는데 사용한 SparkSession에 연결 |
write() | 객체를 저장하는 메소드 |
DataFrame 작업 메소드 eager evaluation 동작
Transform Method | 설명 |
select() | 특정 컬럼만 선택 |
drop() | 열을 삭제한 뒤 새로운 DataFrame을 반환 |
withColumn() | 새로운 컬럼 추가 혹은 기존 컬럼을 변경하여 새로운 DataFrame 반환 |
withColumnRenamed() | 컬럼 이름을 변경한 새로운 DataFrame을 반환 |
filter() where() |
주어진 조건을 사용하여 행을 필터링 |
sort(), orderBy() | 주어진 표현식으로 정렬한 새로운 DataFrame을 반환 |
groupBy() | 지정한 열을 사용하여 DataFrame을 그룹핑하고 해당 열에 대한 집계를 실행 |
dropDuplicates() distinct() |
중복 행 제거 |
agg() | 집계 함수 사용 |
join() | 다양한 조인 사용 |
limit() | 반환할 행 개수 제한 |
alias() | 컬럼이나 테이블에 별칭 부여 |
DataFrame 변환 메소드 lazy evaluation 동작
열 연산자 메소드 | 설명 |
*, + , <, >= | 수학 및 비교 연산자 |
==, != | 같음 및 같지 않음 테스트 |
alias | 열에 별칭을 지정 |
cast, astype | 열을 다른 데이터 유형으로 변환 |
isNull, isNotNull, isNan | null인지, null이 아닌지, NaN인지 |
asc, desc | 열의 오름차순/내림차순 정렬 표현식을 반환 |
메서드설명
실습
CREATE TABLE IF NOT EXISTS events_jsons
(key BINARY, offset BIGINT, partition INT, timestamp BIGINT, topic STRING, value BINARY)
USING JSON
LOCATION '${DA.paths.kafka_events}'
타입 캐스팅
-- TODO
CREATE OR REPLACE TEMP VIEW events_pivot AS
SELECT user_id AS user,
cart, pillows, login, main, careers, guest, faq, down, warranty, finalize,
register, shipping_info, checkout, mattresses, add_item, press, email_coupon,
cc_info, foam, reviews, original, delivery, premium
FROM (
SELECT user_id, event_name
FROM events
) base_events
PIVOT (
COUNT(*) FOR event_name IN (
'cart', 'pillows', 'login', 'main', 'careers', 'guest', 'faq', 'down',
'warranty', 'finalize', 'register', 'shipping_info', 'checkout',
'mattresses', 'add_item', 'press', 'email_coupon', 'cc_info',
'foam', 'reviews', 'original', 'delivery', 'premium'
)
);
SQL 버전 PIVOT 실습 코드
%python
# TODO
(
spark.read.table("events")
.groupby("user_id")
.pivot("event_name")
.count()
.withColumnRenamed("user_id", "user")
.createOrReplaceTempView("events_pivot")
)
# display(spark.table("events_pivot"))
Spark 버전 PIVOT 실습 코드
-- TODO
CREATE OR REPLACE TEMP VIEW clickpaths AS
SELECT *
FROM events_pivot ep
JOIN transactions t
ON ep.user = t.user_id
SQL 버전 JOIN 실습 코드
%python
# TODO 강사
from pyspark.sql.functions import col
(
spark.read.table("events_pivot").join(
spark.table("transactions"),
col("events_pivot.user") == col("transactions.user_id"),
"inner"
).createOrReplaceTempView("clickpaths")
)
# display(spark.table("clickpaths"))
Spark 버전 JOIN 실습 강사 코드
# TODO 실습
events_pivot_df = spark.table("events_pivot")
transactions_df = spark.table("transactions")
(
events_pivot_df.join(
transactions_df,
on=events_pivot_df.user == transactions_df.user_id
).createOrReplaceTempView("clickpaths")
)
# display(spark.table("clickpaths"))
Spark 버전 JOIN 실습 본인 코드
SELECT * FROM (
SELECT
order_id,
FILTER (items, i -> i.item_id LIKE "%K") AS king_items
FROM sales)
WHERE size(king_items) > 0
고차 함수 중 FILTER
SELECT *,
TRANSFORM (
items, i -> CAST(i.item_revenue_in_usd * 100 AS INT)
) AS item_revenues
FROM sales
고차 함수 중 TRANSFORM
-- TODO
CREATE OR REPLACE TABLE sales_product_flags AS
SELECT
items,
EXISTS(items, i -> i.item_name LIKE "%Mattress") AS mattress,
EXISTS(items, i -> i.item_name LIKE "%Pillow") AS pillow
FROM sales
고차 함수 중 EXSITS
Quiz
1. Azure Databricks를 사용하는 첫 번째 단계는 무엇인가?
1. Spark 클러스터 생성
2. Azure Databricks 작업 영역 프로비저닝
3. Azure Blob Storage에 데이터 업로드
2. 데이터 처리를 담당하는 Azure Databricks 구성 요소는 무엇인가?
1. 작업 영역
2. 노트북
3. 클러스터
3. Azure Databricks에서 데이터 분석 작업을 완료한 후 비용을 최소화하려면 어떻게 해야 하는가?
1. Notebook 삭제
2. 클러스터에서 Notebook 분리
3. 클러스터 종료
4. Azure Databricks에서 DataFrame API를 사용하는 주요 목적은 무엇인가?
1. 시각화를 만들기 위해서
2. 구조화된 데이터를 탐색, 조작, 분석하기 위해서
3. 사용자 권한을 관리하기 위해서
5. Databricks에서 어떤 라이브러리를 주로 기계 학습에 사용하는가?
1. Spark SQL
2. Mllib
3. GraphX
6. Databricks에서 DataFrame으로 변환하지 않고도 SQL 쿼리를 실행하고 결과를 직접 시각화하기 위한 방법은 무엇인가?
1. Notebook
2. Databricks SQL
3. Databricks 대시보드
7. Databricks의 Cluster 모드 중 자동 종료(Auto Termiantion) 기능은 언제 유용한가?
1. 클러스터가 너무 커졌을 때
2. 사용자가 너무 많을 때
3. 클러스터를 일정 시간 사용하지 않을 때
4. 노트북이 너무 많을 때
8. Azure Databricks에서 데이터를 저장하고 관리하기 위한 주요 저장소는 무엇인가?
1. SQL Server
2. ADLS(Azure Data Lake Storage)
3. MongoDB
4. Excel
9. Databricks에서 .display(df) 함수는 어떤 동작을 하는가?
1. DataFrame을 CSV로 저장
2. DataFrame을 HTML로 변환
3. DataFrame을 표 형태로 보여줌
4. DataFrame을 삭제
10. Azure Databricks에서 테이블을 만드는 방법은 무엇인가?
1. CRAETE TABLE SQL 명령
2. Azure CLI
3. powershell
4. cmd
11. Databricks에서 %sql, %python, %scala는 무엇을 위한 건가?
1. 시각화 옵션
2. 언어 매직 커맨드
3. 클러스터 제어 코드
4. 보안 설정
12. Databricks에서 Notebook의 주요 기능이 아닌 것은?
1. SQL 쿼리 실행
2. 시각화
3. 머신러닝 모델 배포
4. 하드웨어 구매