brunch

You can make anything
by writing

C.S.Lewis

by 유윤식 Sep 10. 2019

PySpark: SQL(with @UDF)

# 유연하게 SQL 작성 가능

PySpark를 사용하면

U.D.F를 사용해야 할 때가 있다.


snappy 무손실 압축 => parquet, orc 형태를 사용하는 데이터는 걱정이 없다.

마치 DataSet을 사용하는 것처럼 schema 구조가 그대로 살아있다.


문제는 csv, tsv 같은 파일에서 Spark로 데이터를 로드 할 때 발생한다.


예를 들어,

분명히 배열의 형태인데 문자로 인식되는 경우!

[1, 2, 3, 4] 를 원하지만 실제로는 " [1, 2, 3, 4] " 로 인식된다.


이때 udf를 활용해서 데이터를 array<string> 형식으로 바꿔 줄 수 있다.


보통 데이터를 Spark로 가져올 때


df = spark.read.format('com.databricks.spark.csv',)\

                        .options(header=True, inferschema=True, delimiter='\t')\

                        .load('./dataset.tsv')



또는,


df = spark.read.format('com.databricks.spark.csv',)\

                        .options(header=True, inferschema=True, delimiter=',')\

                        .load('./dataset.csv')



간단하게 가져온 데이터프레임의 스키마를 살펴보면,

c3 컬럼에 주목하자.


[VOCAL] 이 보인다. 분명히 배열의 모습인데 지정된 스키마는 스트링이다.


이러면 무슨 문제가 발생 하느냐!?


PySpark 함수를 통한 데이터 클링징이 어렵다.

예를 들어,


df.\

select(col('id').alias("c1"),

       col('score').alias("c2"),

       col('job').alias("c3")).\

where(size(col("c3")) > 0).\

show()


바로 오류 메시지가 보이는데,

Py4JJavaError: An error occurred while calling o5176.filter. : org.apache.spark.sql.AnalysisException: cannot resolve 'size(`c3`)' due to data type mismatch: argument 1 requires (array or map) type, however, '`c3`' is of string type.


이런 상황에서 udf를 적절히 사용해 주면 손쉽게 해결 할 수 있다.


먼저 udf를 선언하는데, (참고로 선언하는 방법은 여러, 다양, 가지각색이다.)


from pyspark.sql.functions import *

from pyspark.sql.types import *


str2arr = udf(lambda x: x.replace("[", "").replace("]", "").split(",") \

                        if x is not None else [], ArrayType(StringType()))


문자열을 배열 형태로 변환한다는 목적으로 만든 udf 함수이다.

udf 키워드를 통해 함수를 생성하면 자동으로 Spark에 등록되어 사용 할 수 있다.


위에서 선언한 udf를 통해서 'c3' 컬럼의 데이터를 변환한다.


df = df.\

select(col('id').alias("c1"),

       col('score').alias("c2"),

       str2arr(col('job')).alias("c3")).\

where(size(col("c3")) > 0)



변환된 스키마를 먼저 확인하면,

>> df.printSchema()

'c3'의 스키마가 문자열에서 배열로 바뀌었다.


실제로 데이터를 확인하면 이전에 보여진 오류가 아닌 실제 데이터를 볼 수 있다.

>> df.show()

이런 수고(?)를 안하는 방법은 처음부터 데이터를 DataSet으로 잘 만들어서

정의된 스키마를 사용하는 것이다.


끝.

작가의 이전글 PySpark: SQL(with @Partition)
작품 선택
키워드 선택 0 / 3 0
댓글여부
afliean
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari