Data, English, Travel

[TIL] pyspark(2) - 파이스파크 셸 사용해보기 본문

Data/Spark

[TIL] pyspark(2) - 파이스파크 셸 사용해보기

J._.haza 2023. 3. 17. 23:33

오늘도 조금씩 도전해보는 spark

오늘은 저번 설치에 이어 러닝스파크 책 p.23~ 34 내용을 실습해보았다.

 

gitbash에서 pyspark를 사용할 경우 (이유는 모르겠지만) 에러가 발생한다. 따라서 윈도우의 경우 cmd에서 `pyspark` 를 실행하여 파이스파이 셸을 사용해보았다.

 

# 첫 대화형 쉘 사용!
pyspark
spark.version  # version checking

 

나는 한 번 배워보는 입장인지라 일단 로컬 모드에서 구동 및 실습을 진행할 예정이다. 오늘은 대화형 쉘을 활용한 실습을 진행한다.

스파크의 연산은 작업으로 표현되고, 이 작업은 태스크라고 불리는 저수준의 RDD바이트 코드로 변환된다고 한다. 또, 실행은 스파크 이그제큐터에 의해 분산된다고 한다.

 

먼저 데이터 프레임을 사용한 간단한 예제는 아래와 같다.

# text 파일 읽어보기 (README.md)
# 해당 파일이 실행된 경로는 spark-3.3.2-bin-hadoop3/.

strings = spark.read.text('./README.md')  # 경로 내 md 파일을 strings에 저장한다
strings.show(10, truncate=False)  # 데이터 프레임의 show(10, false) 연산은 문자열을 자르지 않고 첫 번째 열 줄만 보여줌

여기서 truncate 인자는 출력줄이 너무 길면 자를지를 결정하는 인자.

 

쉘을 빠져나가기 위해서는 Ctrl-D를 누르면 된다.

 

스파크 애플리케이션의 이해?

책에서는 스파크 애플리케이션의 이해라고 SparkSession, job, stage, task 등등 다양한 개념이 소개되는데 우선 나는 실습 위주로 진행할 것이므로 가볍게(...) 읽고 넘어갔다. 대략 드라이버가 스파크 잡을 생성하고, 그 잡들에는 최소 실행단위인 스테이지가 있고 그 스테이지에는 연합으로 실행되는 테스크들이 있어 병렬처리가 된다고 한다.

 

트랜스포메이션, 액션, 지연 평가

스파크는 특이하게 어떤 연산을 시행했을 때 바로 적용되는 것이 아닌, 원본 데이터를 새로운 데이터 프레임으로 변형하여 연산결과를 되돌려주는 트랜스포메이션의 작업을 거친다. 원본데이터가 불변성의 특징을 가지기 때문이라고 한다.

 

스파크는 또 지연연산이라는 (나같은) 뒤늦게 액션이 실행되는 시점이나 실제 데이터에 접근할 시점까지 실제 실행을 미루는 작업이 이루어진다. 이것은 지연 평가를 통해 스파크가 사용자의 연계된 트랜스포메이션을 살펴보아 쿼리 최적화를 하도록 만들고 장애에 대한 데이터 내구성을 제공하는 기능을 한다. 따라서 스파크 연산에서 트랜스포메이션과 액션을 구분하는 것은 중요해보인다.

 

아래는 pyspark 예제이다.

# 아까 저장한 md 파일에서 filter() 트랜스포메이션과 count() 액션을 적용해본다.
# count()가 입력되기 전까지 아무것도 쉘에서 실제로 실행되지 않는다.

filtered = strings.filter(strings.value.contain("Spark"))  # Transformation
filtered.count()  # Action

위 명령어의 실행결과는 아래와 같이 나타난다.

`20`

 

좁은/넓은 트랜스포메이션(의존성)

filter()와 contains()는 하나의 파티션을 처리하여 데이터 교환 없이 결과 파티션을 생성해 내므로 (하나의 파티션을 내놓으므로) 좁은 트랜스포메이션이라고 한다.

그러나 groupBy()나 orderBy()는 스파크가 넓은 트랜스포메이션을 수행하게 하는데, 다른 파티션으로부터 데이터를 읽어들여 합치고 디스크에 쓰는 일을 하기 때문이라고 한다.

 

스파크 UI

pyspark 대화형 쉘을 실행하면 localhost:4040에서 실행되는 Spark UI

일단 이것은 스파크 내부 작업에 대한 디버깅 툴로서 생각하고 넘어간다. DAG 시각화도 확인할 수 있고, Job, Stage에서의 각각의 연산을 확인할 수 있다고 한다.(병렬처리가 어떻게 되는지 등) 일단 넘어간다.

 

 

 

spark 예제 java word count에서 계속 에러가 나므로 오늘은 여기까지...