#튜터리얼 #찾다가없어서 #내가쓴다
수평적인 문화를 중시하므로,
반말을 사용함.
DaetaFrame 은
관계형 테이블(SQL 같은)과 같다고 보면됨.
장점은 다양한 함수가 있음.
좋은 튜터리얼이 있으면 소개해 주시면 감사하겠음.
우선,
SQL에 너무나 익숙하고 누구보다 SQL 하나는 잘 짤 수 있다.
이런 사람은 그냥 SQL을 주로 사용하시고.
Python API를 통해서 잔망잔망(?)한 데이터 전/후처리 로직을 만들어야 겠다는 사람은
꼭 DataFrame 기본 사용법을 알아야함.
Scala, Java, R 등의 언어를 사용해서 Spark를 활용할 수 있음.
여기에서는 Python을 사용함.
첫번째니까 전체적으로 어떻게 Python을 Spark답게 사용할 수 있는지.
그리고 간단하게 데이터를 읽어서 DataFrame으로 조작하고 활용 할 수 있는지.
알아보자.
어떻게 시작해야할까?
데이터를 좀 보고 시작해야지.
참고로 환경은 Linux 에서 진행함.
데이터도 저작권이 있어. 그래서 그냥 노가다로 내가 만들꺼야.
Code 01.
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql.functions import col, udf, size, explode, array, coalesce, \
collect_list, collect_set, struct, concat, concat_ws, desc, exp, lit, lower, \
max, min, rand, row_number, dense_rank, struct, sum, count, avg, array_contains, \
unix_timestamp, from_unixtime, to_json
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType, DoubleType, ArrayType, TimestampType
spark = SparkSession.builder \
.master("local") \
.appName("exam_ch01) \
.getOrCreate()
exam_data = [('A0001', 1), ('A0002', 2), ('A0003', 3), ('A0011', 11), ('A0021', 21), ('A0101', 101), ('A0201', 201), ('A1001', 1001), ('A9001', 9001)]
exam_dataframe = spark.createDataFrame(exam_data)
exam_dataframe.show()
이러면 결과가...
내가 원하던 SQL 쿼리 결과와 좀 달라.
컬럼명이 이상하거든.
Scala를 먼저 해본 사람은 _1, _2 라는 컬럼명에 익숙할꺼야.
실제로 저런 필드명을 사용하니까.
Python을 사용하는 사람은 어색해.
컬럼명은?
데이터의 스키마의 일종이지.
스키마란?
데이터에 대한 메타정보(?) 라고 해두자.
Code 02.
exam_data_schema = StructType([
StructField('CODE', StringType(), True),
StructField('NUMBER', IntegerType(), True),
])
exam_dataframe = spark.createDataFrame(exam_data, exam_data_schema)
exam_dataframe.show()
이제 눈에 거슬리지 않은 테이블 쿼리 결과가 보일거야.
데이터는 만들었으니까.
DataFrame으로 SQL을 대신할 수 있는 다양한 함수를 맛보기로 알아보자.
SQL 에서 합계, 위도우, 조인 등등 다양한 기능을 사용해서 데이터를 조합, 분해 할 수 있지.
여기서도 똑!같!아!
먼저, 가장 간단한게 데이터의 양을 살펴보자.
Code 03.
exam_dataframe.count()
--> 9
9개가 맞는지 하나씩 세어봐. 아마도 맞을거야.
다음으로 집계를 한 번 내어보자.
저기 NUMBER 컬럼에 있는 모든 숫자를 다 더하면 얼마일까?
Code 04.
exam_dataframe.agg(sum(col('NUMBER')).alias('sum_of_number')).show()
+-------------+ |sum_of_number| +-------------+ | 10342| +-------------+
이것도 눈으로 데충 훑었을 때 비슷한게. 맞을거야.
갑자기,
CODE 컬럼의 값이 1로 끝나는 녀석의 NUMBER 컬럼 값들의 평균이 궁굼해.
그냥 궁굼해졌어.
Code 05.
exam_dataframe.filter(col('CODE').like('%1')).agg(avg(col('NUMBER')).alias('avg_of_number')).show()
+------------------+ | avg_of_number| +------------------+ |1476.7142857142858| +------------------+
우선 집계를 내는 부분은 좀 간단해 보이지.
앞으로 조인, 그룹, 윈도우 등의 함수를 활용하면 좀 더 복잡하고 대규모의 데이터 뭉치들을 이쁘게 처리 할 수 있지.
위의 것들을 SQL로 바꿔서 실제 값이 같은 결과를 내는지 확인해보면,
Code 06.
exam_dataframe.createTempView('exam_df')
spark.sql('''
select count(*) from exam_df
''').show()
--> 9
spark.sql('''
select sum(NUMBER) from exam_df
''').show()
--> 10342
spark.sql('''
select AVG(NUMBER) from exam_df where CODE like "%1"
''').show()
--> 1476.7142857142858
마지막으로,
NUMBER 컬럼에 있는 모든 값에 +100 을 해주는데,
CODE 컬럼이 '1'로 끝나지 않는 경우에만 위 작업을 수행하는 거야.
Code 07.
# withColumn 함수를 활용
exam_dataframe.filter(~col('CODE').like('%1')).withColumn('NUMBER_PLUS_100', col('NUMBER')+100).show()
그냥 모든 데이터가 전부 보이게끔 할 수 없을까?
Code 08.
# withColumn, when 함수 활용
exam_dataframe.withColumn('NUMBER_PLUS_100', when(~col('CODE').like('%1'), col('NUMBER')+100).otherwise(col('NUMBER'))).show()
'A0002', 'A0003' 데이터에 대해서만 100 이 더해진 걸 확인했어.
이런식이야.
이렇게 작은 데이터를 가지고,
스스로 다양한 데이터 조작 시나리오를 생각하면,
정말 많은 공부를 할 수 있지.
다음엔 좀 더 큰 데이터 뭉치들을 통해서,
더 많은 데이터 조작을 해보자.