기록의 정석

개발/트러블슈팅

[pyspark] TypeError: Column is not iterable 에러 트러블 슈팅

sakjung 2023. 5. 12. 00:22

 

에러 발생 정보

  • Zeppelin 에서 pyspark 개발 중, TypeError: Column is not iterable 에러 발생
  • 에러가 발생한 코드:
# ... 생략 ...
.withColumn(
    "representative_artists",
    when(size(col("representative_artists")) == 0, lit(""))
    .when(
        size(col("representative_artists")) == 1,
        col("representative_artists").getItem(0),
    )
    .otherwise(
        concat_ws(
            " & ",
            array_join(
            	# 에러 발생 지점
                slice(col("representative_artists"), 1, size(col("representative_artists")) - 1),
                ", "
            ),
            col("representative_artists").getItem(
                size(col("representative_artists")) - 1
            ),
        )
    ),
)
# ... 생략 ...
  • StackTrace:
Fail to execute line 37:                         col("representative_artists"), 1, size(col("representative_artists")) - 1),
Traceback (most recent call last):
  File "/tmp/1683173873462-0/zeppelin_python.py", line 158, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 37, in <module>
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 2036, in slice
    return Column(sc._jvm.functions.slice(_to_java_column(x), start, length))
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1296, in __call__
    args_command, temp_args = self._build_args(*args)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1260, in _build_args
    (new_args, temp_args) = self._get_args(args)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1247, in _get_args
    temp_arg = converter.convert(arg, self.gateway_client)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py", line 510, in convert
    for element in object:
  File "/usr/lib/spark/python/pyspark/sql/column.py", line 353, in __iter__
    raise TypeError("Column is not iterable")
TypeError: Column is not iterable

트러블슈팅

stacktrace 를 보면 SparkContext의 JVM slice function을 실행하는 과정에서 에러가 발생했다. slice 메서드의 Column 타입 파라미터에 대해서 iteration 하면서 터진 것으로 보인다.

 

그렇다면 slice 메서드의 파라미터에 Column 타입으로 들어가면 안되는 것이 있다는 것 같은데...

 

범인은 첫 번째 인자 (col("representative_artists")) 아니면 세 번째 인자 (size(col("representative_artists")) - 1) 둘 중 하나다.

 

한 번 까보자.

 

 

IDE 상에서 slice 메서드 구현체를 타고 들어가 살펴보아도 딱히 문제되는 부분이 보이지 않는다. 떡하니 타입 힌트와 주석으로 친절히 모든 파라미터에 Column 타입이 가능하다고 명시해놓았다.

 

뭔가 이상함을 느끼고 있던 그 때, StackTrace의 line number 가 눈에 들어왔다. slice 메서드가 터진 라인이 2036번째 라인인데... 왜 내가 보는 라인 (사진 속의 라인)은 3584번째 라인이지? 심지어 return 라인의 코드도 다르다!

 

이건 버젼 문제다.

 

곧바로 Zeppelin이 띄워져있는 Persistent EMR cluster의 master node 에 접속해서 spark 버젼을 확인했다.

 

 

3.0.1 버젼이다. 해당 버젼의 pyspark 소스를 살펴보면 실마리가 풀릴 것이다.

 

 

소스 코드를 까보니 3.0.1 버젼에서 slice 메서드는 파라미터에 Column 이나 str 타입이 허용되지 않는듯 하다. 다만 x 파라미터는 _to_java_column 메서드를 통해 Java Column으로 변환되기 때문에 이슈가 없다 (이미 JavaObject로 변환된 상태이므로).

 

반면, start와 length 파라미터는 그대로 bypass 된다. 이렇게 되면 py4j 라이브러리 상에서 JavaObject 가 아닌 arg에 대해서 JavaObject 로의 변환을 시도한다 (위의 에러 StackTrace 참고 / 관련 py4j 코드). 이 때, PySpark Column 타입이 Java List 로 간주 된다. 왜냐하면 PySpark Column 이 ListConverter의 can_convert 메서드 조건을 만족하기 때문 (관련 py4j 코드). ListConverter가 convert를 수행할 때 내부적으로 convert 하는 대상을 iteration 한다. 하지만 Column 타입은 iteration이 안되기 때문에 이 부분에서 에러가 발생한다. 그러므로 start나 length 파라미터 중 Column 타입이 있다면 Column is not iterable 에러가 발생하는 것이다.

 

그러므로 기존 코드에서 length 파라미터 (size(col("representative_artists")) - 1)가 에러를 발생시키는 지점이었던 것이다.

 

내 로컬 (IDE)의 pyspark 버젼도 한 번 살펴보자.

 

 

3.3.2 버젼이다.

 

이 버젼의 slice 메서드 소스를 더 살펴보면 x 파라미터 뿐만 아니라 start, length 파라미터도 _to_java_column 메서드를 통해 Java Column 으로 변환해주는 것을 확인할 수 있다. 그러므로 이 버젼에서는 Column이나 str 타입으로 파라미터를 받아도 Column is not iterable 에러가 나지 않는 것이다.

 

해결

가능한 해결 방안:

  • EMR Cluster의 Spark version upgrade
  • SQL 사용
  • expr function 사용

 

첫 번째 방법은, 고려해봄직은 하지만 굳이? 다. 좀 과하다는 생각이 들었다 (오버 엔지니어링). 지금과 같은 문제가 많이 생기거나 버젼으로 인한 더 크리티컬한 문제가 생기면 그 때 업그레이드 하는게 맞다는 생각이 든다.

 

두 번째 방법은, SQL 사용이다. 즉, pyspark.sql.functions의 메서드들을 사용하지 않고 순수 SQL로 쿼리를 짜는 것. 하지만 코드 스니펫에서 보이는 것 외에도 쿼리가 난해해질 수 있는 요소들 (e.g. 윈도잉)이 꽤 있어서 배제했다. 그리고 이것 때문에 기존 코드를 SQL로 모두 마이그레이션하는게… 그만한 공수를 들일 가치가 있을까 라는 생각도 들었다.

 

최종적으로 expr 을 사용해서 해결했다. 결국엔 length 파라미터로 들어가는 size(col("representative_artists")) - 1 이 int 타입으로 들어가게하면 해결되는 부분이다. 다음과 같이 expr을 사용하면 된다.

 

# ... 생략 ...
.withColumn(
    "representative_artists",
    when(size(col("representative_artists")) == 0, lit(""))
    .when(
        size(col("representative_artists")) == 1,
        col("representative_artists").getItem(0),
    )
    .otherwise(
        concat_ws(
            " & ",
            array_join(
                expr("""slice(representative_artists, 1, size(representative_artists) - 1)"""),
                ", "
            ),
            col("representative_artists").getItem(
                size(col("representative_artists")) - 1
            ),
        )
    ),
)
# ... 생략 ...

Reference

 

 

'개발 > 트러블슈팅' 카테고리의 다른 글

Airflow WorkerLostError 딥 다이브  (1) 2024.01.06