brunch

You can make anything
by writing

C.S.Lewis

by 유윤식 Sep 28. 2020

PySaprk:DataFrame(1)

#튜터리얼 #찾다가없어서 #내가쓴다

수평적인 문화를 중시하므로,

반말을 사용함.


DaetaFrame 은

관계형 테이블(SQL 같은)과 같다고 보면됨.


장점은 다양한 함수가 있음.


좋은 튜터리얼이 있으면 소개해 주시면 감사하겠음.


우선,

SQL에 너무나 익숙하고 누구보다 SQL 하나는 잘 짤 수 있다.

이런 사람은 그냥 SQL을 주로 사용하시고.


Python API를 통해서 잔망잔망(?)한 데이터 전/후처리 로직을 만들어야 겠다는 사람은

꼭 DataFrame 기본 사용법을 알아야함.


Scala, Java, R 등의 언어를 사용해서 Spark를 활용할 수 있음.

여기에서는 Python을 사용함.


첫번째니까 전체적으로 어떻게 Python을 Spark답게 사용할 수 있는지.

그리고 간단하게 데이터를 읽어서 DataFrame으로 조작하고 활용 할 수 있는지.

알아보자.


어떻게 시작해야할까?

데이터를 좀 보고 시작해야지.


참고로 환경은 Linux 에서 진행함.


데이터도 저작권이 있어. 그래서 그냥 노가다로 내가 만들꺼야.


Code 01.


from pyspark.sql import SparkSession, Row, Window

from pyspark.sql.functions import col, udf, size, explode, array, coalesce, \

    collect_list, collect_set, struct, concat, concat_ws, desc, exp, lit, lower, \

    max, min, rand, row_number, dense_rank, struct, sum, count, avg, array_contains, \

    unix_timestamp, from_unixtime, to_json

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType, DoubleType, ArrayType, TimestampType


spark = SparkSession.builder \

    .master("local") \

    .appName("exam_ch01) \

    .getOrCreate()


exam_data = [('A0001', 1), ('A0002', 2), ('A0003', 3), ('A0011', 11), ('A0021', 21), ('A0101', 101), ('A0201', 201), ('A1001', 1001), ('A9001', 9001)]

exam_dataframe = spark.createDataFrame(exam_data)


exam_dataframe.show()


이러면 결과가...

내가 원하던 SQL 쿼리 결과와 좀 달라.

컬럼명이 이상하거든.

Scala를 먼저 해본 사람은 _1, _2 라는 컬럼명에 익숙할꺼야.

실제로 저런 필드명을 사용하니까.

Python을 사용하는 사람은 어색해.


컬럼명은?

데이터의 스키마의 일종이지.

스키마란?

데이터에 대한 메타정보(?) 라고 해두자.


Code 02.


exam_data_schema = StructType([

        StructField('CODE', StringType(), True),

        StructField('NUMBER', IntegerType(), True),

    ])    

exam_dataframe = spark.createDataFrame(exam_data, exam_data_schema)


exam_dataframe.show()


이제 눈에 거슬리지 않은 테이블 쿼리 결과가 보일거야.


데이터는 만들었으니까.

DataFrame으로 SQL을 대신할 수 있는 다양한 함수를 맛보기로 알아보자.


SQL 에서 합계, 위도우, 조인 등등 다양한 기능을 사용해서 데이터를 조합, 분해 할 수 있지.


여기서도 똑!같!아!


먼저, 가장 간단한게 데이터의 양을 살펴보자.


Code 03.


exam_dataframe.count()

--> 9


9개가 맞는지 하나씩 세어봐. 아마도 맞을거야.


다음으로 집계를 한 번 내어보자.

저기 NUMBER 컬럼에 있는 모든 숫자를 다 더하면 얼마일까?


Code 04.


exam_dataframe.agg(sum(col('NUMBER')).alias('sum_of_number')).show()


+-------------+ |sum_of_number| +-------------+ |        10342| +-------------+


이것도 눈으로 데충 훑었을 때 비슷한게. 맞을거야.


갑자기,

CODE 컬럼의 값이 1로 끝나는 녀석의 NUMBER 컬럼 값들의 평균이 궁굼해.

그냥 궁굼해졌어.


Code 05.


exam_dataframe.filter(col('CODE').like('%1')).agg(avg(col('NUMBER')).alias('avg_of_number')).show()


+------------------+ |     avg_of_number| +------------------+ |1476.7142857142858| +------------------+



우선 집계를 내는 부분은 좀 간단해 보이지.


앞으로 조인, 그룹, 윈도우 등의 함수를 활용하면 좀 더 복잡하고 대규모의 데이터 뭉치들을 이쁘게 처리 할 수 있지.


위의 것들을 SQL로 바꿔서 실제 값이 같은 결과를 내는지 확인해보면,


Code 06.


exam_dataframe.createTempView('exam_df')


spark.sql('''

    select count(*) from exam_df

''').show()

--> 9


spark.sql('''

    select sum(NUMBER) from exam_df

''').show()

--> 10342


spark.sql('''

    select AVG(NUMBER) from exam_df where CODE like "%1"

''').show()

--> 1476.7142857142858



마지막으로,

NUMBER 컬럼에 있는 모든 값에 +100 을 해주는데,

CODE 컬럼이 '1'로 끝나지 않는 경우에만 위 작업을 수행하는 거야.


Code 07.


# withColumn 함수를 활용

exam_dataframe.filter(~col('CODE').like('%1')).withColumn('NUMBER_PLUS_100', col('NUMBER')+100).show()



그냥 모든 데이터가 전부 보이게끔 할 수 없을까?


Code 08.


# withColumn, when 함수 활용

exam_dataframe.withColumn('NUMBER_PLUS_100', when(~col('CODE').like('%1'), col('NUMBER')+100).otherwise(col('NUMBER'))).show()



'A0002', 'A0003' 데이터에 대해서만 100 이 더해진 걸 확인했어.


이런식이야.

이렇게 작은 데이터를 가지고,

스스로 다양한 데이터 조작 시나리오를 생각하면,

정말 많은 공부를 할 수 있지.


다음엔 좀 더 큰 데이터 뭉치들을 통해서,

더 많은 데이터 조작을 해보자.

작가의 이전글 Elasticsearch with MySQL
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari