brunch

You can make anything
by writing

C.S.Lewis

by 유윤식 Sep 28. 2020

PySaprk:DataFrame(2)

#Spark #PySpark #ArrayType #Explode

DataFrame을 공부하고 있지.

저번 PySpark:DataFrame(1) 에 이어서 같은 데이터를 사용해보자.


여기서!

하나의 컬럼을 추가해서 오늘 배워볼 explode 함수를 써볼거야.


Code 01.


exam_data = [

    ('A0001', 1, ['A', 'B', 'C']), 

    ('A0002', 2, ['A', 'C', 'R']), 

    ('A0003', 3, ['R', 'V', 'S']), 

    ('A0011', 11, ['R', 'S', 'P']), 

    ('A0021', 21, ['B', 'C', 'F']), 

    ('A0101', 101, ['A', 'D', 'E']), 

    ('A0201', 201, ['E', 'F', 'G']), 

    ('A1001', 1001, ['E', 'K', 'S']), 

    ('A9001', 9001, ['K', 'E', 'S']),

]


exam_data_schema = StructType([

        StructField('CODE', StringType(), True),

        StructField('NUMBER', IntegerType(), True),

        StructField('SIG', ArrayType(StringType()), True)

    ])    

exam_dataframe = spark.createDataFrame(exam_data, exam_data_schema)


SIG 컬럼이 추가됨.


대충 위의 코드(Code 01)를 보면 알 수 있겠지만,

Array 내부에 String 데이터가 포함된 형태를 가짐.


자! 이제 새롭게 추가된 데이터를 가지고 여러가지 Python API 함수를 사용해보자.


먼저,

CODE 컬럼이 '1' 로 끝나면서 SIG 내에 C를 포함하지 않는 데이터를 찾아볼꺼야.


Code 02.


exam_dataframe.filter(col('CODE').like('%1') & ~array_contains(col('SIG'), 'C')).show()


나타난 결과를 보면 SIG 컬럼에 'C'가 없지?


두 조건을 하나의 filter 조건에 나타내기 위해서 & 조건을 걸었어.


이걸 보는사람이라면 스스로

'데이터를 어떻게 바꿔볼까?' 생각해보면서 

변형 할 데이터를 만들어 낼 수 있는 최적의 API 함수를 찾고

공부해보야함.


이번엔 반대로 explode() 를 먼저 살펴보자.


간단히 설명하면 SIG 컬럼의 list를 하나의 각 Row로 만들어 주는 함수임.


Code 03.


exam_dataframe.withColumn('SIG_EXPLODE', explode(col('SIG'))).show()


갑자기 데이터가 많이 늘어난 것 같지?

즉, 위의 데이터가 총 9개 이므로,

9 * 3 = 27, 총 27개의 Row가 만들어짐.


그럼,

여기서 SIG_EXPLODE 컬럼값이 'C' 인 Row 를 제거시키는 코드를 본자.


Code 04.


exam_dataframe.withColumn('SIG_EXPLODE', explode(col('SIG'))).filter(col('SIG_EXPLODE') != 'C').show()


SIG_EXPLODE 컬럼에 'C'가 사라졌지?


이쯤되면 filter, withColumn 함수는 익숙해질거야.

여기서,

진짜 중요한 의문을 가질 수 있어.


만약 'C' 값을 제거했다고 가정하고, 새로운 Array 데이터를 만들려고 한다면?


드디어!

groupBy 를 배워야 할 때가 되었어. 덤으로 collect_list 함수까지 확인해보자.


Code 05.


exam_dataframe.withColumn('SIG_EXPLODE', explode(col('SIG'))). \

filter(col('SIG_EXPLODE') != 'C'). \

groupBy(col('CODE')). \

agg(collect_list(col('SIG_EXPLODE')).alias('SIG_RE_COMPILE')).show()


데이터 전체는 9개로 동일한데,

기존 SIG 컬럼의 값에서 'C'를 제외한 값으로 변경되었음.


groupBy는 현재 데이터 내에서 기준이 되는 컬럼값으로,

그룹을 지어 데이터를 구분하는 문법임.


collect_list 는 말그대로 이해하면 됨.


이제,

위에서 도출한 결과에 빠진 컬럼의 값을 추가하자.


NUMBER 컬럼이 빠져있음.


Code 06.


exam_dataframe.withColumn('SIG_EXPLODE', explode(col('SIG'))). \

filter(col('SIG_EXPLODE') != 'C'). \

groupBy(col('CODE'), col('NUMBER')). \

agg(collect_list(col('SIG_EXPLODE')).alias('SIG_RE_COMPILE')).show()


간단하게 추가했음.


위에 활용한 함수에 대해서는 API DOCS 를 꼭! 확인해야함.

또한 다양한 사용법을 직접 실습해보야함.


다음에는,

두 개의 테이블을 만들어 JOIN 관련 함수를 공부해보자.

브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari