데이터 엔지니어링/Spark

Spark에서 Auto Increment Key 생성하는 방법

openkmj 2025. 4. 19. 08:09

Spark에서 데이터를 다룰 때는 보통 소스 데이터로부터 고유한 ID를 제공받는 경우가 많습니다. 

하지만 모든 데이터가 그렇게 친절하진 않은데요, 제공된 ID가 없거나 explode 함수 등을 통해 새로운 행을 생성하는 경우처럼 각 행에 대해 직접 고유한 ID를 생성해야 하는 상황이 종종 있습니다.

 

관계형 데이터베이스에 익숙하다면 아마 Auto Increment Key가 먼저 떠오를텐데요, 분산 환경에서는 이야기가 조금 다릅니다.

순차적으로 증가하는 값을 생성하기 위해서는 각 노드들이 공유하는 전역 상태가 필요한데, 이를 유지하기 위한 이나 동기화 같은 메커니즘이 성능 저하병목을 발생시킵니다.

 

그래서 대부분의 분산 시스템에서는 이를 지원하지 않고 분산 환경에 적합한 다른 방식의 키를 사용합니다. 

(Reference) Python uuid 구현으로 알아보는 UUID

 

Python uuid 구현으로 알아보는 UUID

Unique Key를 생성할 때 흔히 관계형 데이터베이스의 Auto increment 칼럼을 사용하는 방식을 떠올리곤 합니다.이 방식은 단일 서버 환경에서는 간단하고 효율적이지만 분산 환경에서는 충돌이나 마스

openkmj.tistory.com

 

 

하지만 단순히 고유한 값으로서의 ID가 아니라, row에 임의의 순서를 부여해야 하는 경우와 같이 요구사항에 따라 Auto Increment Key가 필요한 상황도 존재합니다.

 

이번 글에서는 Spark에서 순차적인 ID를 생성하는 방법에 대해 정리해보겠습니다.

 

 

1. monotonically_increasing_id()

df = spark.range(10).selectExpr("uuid() as uuid").repartition(3)
df = df.withColumn("tmp_id", monotonically_increasing_id())

df.show()

 

monotonically_increasing_id는 각 row에 대해 단조적으로 증가하는 고유한 64비트 정수를 생성하는 함수입니다.

sequentially가 아니라 monotonically 라는 점을 주목할 필요가 있습니다.

전역에서 증가하는 값을 생성하긴 하지만 우리가 일반적으로 기대하는 1, 2, 3, 4, ... 처럼 연속적으로 증가하는 값을 생성하진 않습니다.

 

코드를 통해 Spark 내부 동작을 자세히 알아보겠습니다.

// 파티션 초기화 함수
override protected def initializeInternal(partitionIndex: Int): Unit = {
  count = 0L
  partitionMask = partitionIndex.toLong << 33
}

// 파티션 내에서 Row 단위로 실행
override protected def evalInternal(input: InternalRow): Long = {
  val currentCount = count
  count += 1
  partitionMask + currentCount
}

 

ID는 상위 31비트에 파티션 ID를, 하위 33비트에는 파티션 내에서의 순번(count)을 넣는 방식으로 구성됩니다.

파티션 개수와 파티션 내 row 수가 각각 1B, 8B 이하여야 한다는 전제조건이 붙긴 하지만, 대부분의 상황에서 안전한 범위입니다.

(64비트 정수 80억개 -> 64GB. 파티션 크기가 최소 64GB 이상이어야 ...)

 

노드 간의 통신이 불필요하기 때문에 성능 상의 큰 이점이 있습니다.

각 row에 임시적으로 고유한 값을 붙이는 용도로 간편하고 빠르게 사용할 수 있는 좋은 선택지입니다.

참 똑똑한 아이디어네요.

 

다만 실행 환경파티션 상태에 따라 결과가 달라지는 non-deterministic 함수라는 점과 생성된 값이 연속적이지 않고 중간에 큰 간격이 있다는 점을 유의해야 합니다.

기존 데이터의 마지막 ID를 offset으로 활용해서 데이터를 합치거나 증분 적재하는 방식에는 사용할 수가 없겠네요.

 

(Reference)

 

2. row_number()

df = spark.range(10).selectExpr("uuid() as uuid").repartition(3)
df = df.withColumn("tmp_id", monotonically_increasing_id())

window = Window.orderBy("tmp_id")
df = df.withColumn("id", row_number().over(window))

df.show()

 

row_number는 일반적으로 SQL에서 사용되는 윈도우 함수와 동일합니다.

특정 칼럼으로 정렬해서 각 row에 순차적인 번호를 부여하는 방식으로 우리가 흔히 생각하는 1부터 연속적으로 증가하는 ID를 얻을 수 있습니다.

 

하지만 이 방식은 심각한 성능 이슈가 있습니다.

정렬을 수행한다는 점만으로도 비용이 큰데, 더 큰 문제는 row_number가 전체 데이터를 하나의 파티션으로 처리한다는 점입니다.

Physical Plan. "Exchange SinglePartition"

위와 같이 partitionBy를 지정하지 않고 윈도우 함수를 사용하는 경우, Spark는 전체 데이터를 하나의 파티션으로 처리하게 되며 이런 경고 메세지를 출력합니다.

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

 

데이터가 작은 경우가 아니면 OutOfMemory가 발생하겠네요.

그럴거면 차라리 monotonically_increasing_id를 사용하는 게 간단합니다.

df.coalesce(1).withColumn("id", monotonically_increasing_id())

 

SQL이 익숙하다면 row_number를 직관적으로 사용할 수 있겠지만, 성능 상의 문제로 되도록 사용하지 않는 것이 좋겠습니다.

 

 

3. zipWithIndex()

앞서 monotonically_increasing_id를 살펴볼 때, 각 파티션의 row 개수를 알 수 있다면 연속적인 값을 만들 수 있지 않을까 하는 생각이 들었습니다.

상위 비트에 넣었던 파티션 ID 대신, 각 파티션의 시작 offset을 더하면 중간중간 간격 없이 연속된 값을 만들 수 있을 것 같습니다.

실제로 이런 아이디어로 동작하는 방식이 바로 zipWithIndex 입니다.

df = spark.range(10).selectExpr("uuid() as uuid").repartition(3)

rdd = df.rdd.zipWithIndex().map(lambda x: (x[1],) + tuple(x[0]))
df = rdd.toDF(["id"] + df.columns)

df.show()

 

zipWithIndex는 모든 row에 대해 0부터 시작하는 연속적이고 순차적인 ID를 부여합니다.

각 파티션 내의 row 개수를 먼저 센 뒤, 각 파티션에 부여할 offset을 미리 정하고 그 offset부터 순번을 올려가는 방식입니다.

 

zipWithIndex의 구현을 보겠습니다.

  def getIteratorSize(iterator: Iterator[_]): Long = {
    if (iterator.knownSize >= 0) iterator.knownSize.toLong
    else {
      var count = 0L
      while (iterator.hasNext) {
        count += 1L
        iterator.next()
      }
      count
    }
  }

  def getIteratorZipWithIndex[T](iter: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
    new Iterator[(T, Long)] {
      require(startIndex >= 0, "startIndex should be >= 0.")
      var index: Long = startIndex - 1L
      def hasNext: Boolean = iter.hasNext
      def next(): (T, Long) = {
        index += 1L
        (iter.next(), index)
      }
    }
  }

class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

  /** The start index of each partition. */
  @transient private val startIndices: Array[Long] = {
    val n = prev.partitions.length
    if (n == 0) {
      Array.empty
    } else if (n == 1) {
      Array(0L)
    } else {
      prev.context.runJob(
        prev,
        Utils.getIteratorSize _,
        0 until n - 1 // do not need to count the last partition
      ).scanLeft(0L)(_ + _)
    }
  }

  override def getPartitions: Array[Partition] = {
    firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
  }

  override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
    val parentIter = firstParent[T].iterator(split.prev, context)
    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
  }
}

 

zipWithIndex는 두단계로 동작합니다.

 

먼저 zipWithIndex가 트리거되면 각 파티션의 크기를 계산합니다(Utils.getIteratorSize).

각 파티션의 크기를 구했다면 이를 바탕으로 누적합을 계산하여 startIndices에 저장합니다.

각 파티션의 row 수가 [3, 5, 2]라면 [0, 3, 8]이 저장되는 것이죠.

 

그 이후, 각 파티션은 자신에게 할당된 startIndex로부터 1씩 증가시키면서 row에 인덱스를 붙입니다(Utils.getIteratorZipWithIndex).

 

이 과정에서 셔플링이 발생하지 않고 driver와 executor간에 파티션 크기 정보만 오가기 때문에 성능 부하가 크지 않습니다.

 

RDD API라 데이터프레임으로 오가는 약간의 오버헤드는 있지만 연속된 ID를 생성하는 방식 중엔 가장 괜찮은 선택이라 할 수 있습니다.

 

(Reference)

 


 

지금까지 Spark에서 Auto Increment Key를 생성하는 3가지 방법을 살펴봤습니다.

 

단순히 고유한 ID가 필요하거나 연속적일 필요가 없는 경우 monotonically_increasing_id를,

연속된 ID가 꼭 필요한 경우에는 zipWithIndex를 사용하는 것을 추천합니다.

 

다만 그 전에, 정말로 Auto Increment Key가 필요한지 먼저 고민해볼 필요가 있겠습니다.

분산 환경에서는 대부분의 경우 Uniqueness만 확보해도 충분하고 타임스탬프나 복합 키로 대체할 수 있는 경우가 많기 때문입니다.

 

필요성과 상황을 잘 따져보고 가장 적합한 방식을 선택하도록 합시다.