티스토리 뷰

Spark의 Murmur3 hash 구현이 일반적인 Murmur3 구현과 약간 다르다는 점을 알고 계신가요?

이번 글에서는 Spark에서 hash() 함수를 사용하며 겪었던 Murmur3 해시의 호환 문제에 대해 다뤄보겠습니다.

 

Spark에서의 Hash 함수에 대해 다룬 이전 게시글을 읽고 오시면 이해에 도움이 됩니다!

 

Spark 속 해시 이야기 - Murmur3와 xxHash

Spark에서는 데이터를 분산하고 처리 속도를 높이기 위해 내부적으로 해시를 폭넓게 활용합니다.해시는 데이터를 균등하게 분산시키고 복잡한 값을 빠르게 비교할 수 있다는 장점이 있습니다.

openkmj.tistory.com

 

 

문제 상황

Spark를 통해 데이터를 집계한 후, 그 결과를 바탕으로 key-value lookup 테이블을 생성하여 이를 웹 클라이언트에서 활용하고자 합니다.

lookup 성능과 파일 크기를 고려하여 key 문자열을 그대로 쓰는 대신 해시함수를 이용해 (hashedKey, value) 형태의 테이블을 만들기로 했습니다.

 

하지만 실제로 Javascript에서 동일 시드, 동일 문자열에 대해 해시값을 계산해보니, Spark에서 생성한 값과 다른 결과가 나오는 문제를 발견할 수 있었습니다.

 

Spark / JS Hash 결과

 

Spark의 hash() 함수는 Murmur3 알고리즘을 사용하기 때문에, Javascript에서도 동일한 Murmur 해시 구현 라이브러리인 murmurhash-js를 사용했습니다.

 

 

murmurhash-js 로직 분석

처음에는 단순히 Javascript와 Java 간의 타입 차이나 인코딩 방식의 차이를 예상했습니다.

Java에서는 해시 연산에 사용되는 데이터 타입이 4바이트 int인 반면, Javascript에서는 모든 숫자가 number(8바이트 float)로 처리됩니다.

해싱에서는 비트 시프트, XOR 등 비트연산이 반복적으로 사용되고 곱셈을 통해 의도적으로 오버플로우를 유도하기 때문에 이게 문제가 되지 않을까 생각했습니다.

 

하지만 Javascript에서도 내부적으로 비트연산자(<<, ^, &, >>> 등)을 사용할 때 내부적으로 4바이트 정수(ToInt32)로 강제 변환 후 연산하기 때문에 비트연산에서의 큰 문제가 없습니다.

 

또한 murmurhash-js 를 보면 비트회전은 아래와 같이 구현하고 있습니다.

// rotateLeft(k1, 15)
k1 = (k1 << 15) | (k1 >>> 17);

33번째, 34번째 비트를 1번째, 0번째 비트로 이동시키기 위해 k1 >>> 17과 OR 연산을 추가했습니다.

 

마찬가지로 곱셈의 경우에도 오버플로우를 흉내내는 로직이 존재합니다.

// int h1 = h1 * C1
h1 = (((h1 & 0xffff) * C1) + ((((h1 >>> 16) * C1) & 0xffff) << 16)) & 0xffffffff;

 

이렇게 murmurhash-js의 해시 구현의 문제는 아닌 것 같습니다.

 

(Reference)

 

일부 데이터에서는 정상 동작?

재미있는 점은, 일부 데이터에서는 정상적으로 양쪽 다 같은 값이 나오는 경우가 있다는 것입니다.

몇번의 테스트 후에 문자열의 길이가 4의 배수인 경우에는 정상적으로 동작한다는 점을 발견할 수 있었습니다.

 

Murmur3는 내부적으로 입력 문자열을 4바이트 단위로 나눠서 처리하고 남은 1~3바이트에 대해서는 따로 처리하는데, 아마 그 단계에서 문제가 생긴 것 같습니다.

 

  // Spark hash
  public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
	// ...
    int k1 = 0;
    for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
    }
    // ...
  }

 

    // murmurhash-js
    k1 = 0;
	
	switch (remainder) {
		case 3: k1 ^= (key.charCodeAt(i + 2) & 0xff) << 16;
		case 2: k1 ^= (key.charCodeAt(i + 1) & 0xff) << 8;
		case 1: k1 ^= (key.charCodeAt(i) & 0xff);
		
		k1 = (((k1 & 0xffff) * c1) + ((((k1 >>> 16) * c1) & 0xffff) << 16)) & 0xffffffff;
		k1 = (k1 << 15) | (k1 >>> 17);
		k1 = (((k1 & 0xffff) * c2) + ((((k1 >>> 16) * c2) & 0xffff) << 16)) & 0xffffffff;
		h1 ^= k1;
	}

 

하지만 line단위로 살펴봤음에도 양쪽 다 남은 바이트를 mixK1을 적용한 후 h1과 XOR하는 동일한 구현을 따르고 있었습니다.

 

(Reference)

 

Murmur3 hash generates a different value from other implementations

Python mmh3 라이브러리를 통해 Spark에서 잘못된 해시값을 생성하고 있다는 것을 확인한 뒤, Spark 코드를 좀 더 살펴보기 시작했습니다.

 

Spark의 murmur3 구현 중 문자열(바이트 단위)를 처리하는 메서드를 보면 두가지가 있습니다.

 

hashUnsafeBytes와 hashUnsafeBytes2

 

기존 메서드가 imcompatible, 새로운 메서드가 compatible한 구현이라는 주석이 있어서 당연히 새로운 메서드를 사용하겠거니 싶었지만 찾아보니 실제로 사용되는 메서드는 기존 메서드인 hashUnsafeBytes였습니다.

  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
	//
    for (int i = lengthAligned; i < lengthInBytes; i++) {
      int halfWord = Platform.getByte(base, offset + i);
      int k1 = mixK1(halfWord);
      h1 = mixH1(h1, k1);
    }
	//
  }

 

실제로 나머지 바이트를 처리하는 로직이 다른 것을 알 수 있습니다.

(각각의 바이트를 따로 mixK1, mixH1 처리)

 

 

깃 히스토리를 보니 관련된 IssuePR을 확인할 수 있었습니다.

 

실제로 생성된 해시값이 다른 플랫폼과 달라 호환되는 메서드(hashUnsafeBytes2)를 추가했고, 이후 mllib의 FeatureHasher 에서는 해당 메서드를 사용하도록 했습니다.

 

 

(Reference)

 

왜 기존 메서드를 고치지 않았을까?

왜 잘못 구현된 메서드를 수정하지 않고 새로운 메서드를 추가하는 방향으로 갔을까 의문이 듭니다.

Guava 코드를 기반으로 작성했다고 해서 Guava 레포를 확인해봤지만 문제가 된 코드는 찾아볼 수 없었습니다.

표준을 따르지 않는 버그성 코드..로 볼 수 있는데 왜 수정하지 않았을까요?

 

PR 커멘트를 보면 그 이유를 알 수 있는데 Backward Compatibility 때문입니다.

However I am very hesitant to merge this because this will probably break bucketing (it uses murmur3 to create the buckets); for example a bucketed table written by Spark 2.2 cannot be safely read by Spark after this change.

 

사용자 수준의 코드뿐만아니라 Spark 내부적으로도 기존 구현에 의존하고 있기 때문에, 기존 사용자 및 데이터와의 호환성을 위해 의도적으로 수정하지 않은 것이었습니다.

 

 

Solution

아쉽게도 Spark SQL에서 Compatible한 murmur3 구현을 따로 제공하지 않습니다.

Spark에서 생성한 해시값과 다른 곳에서 생성한 해시값을 비교하는 방식이 일반적인 use case가 아니라는 뜻일수도 있겠네요.

 

그래서 저는 Spark와 동일한 로직의 해시함수를 Javascript로 구현하였습니다.

const C1 = 0xcc9e2d51;
const C2 = 0x1b873593;
const SEED = 42;

function mixK1(k1) {
  k1 = Math.imul(k1, C1);
  k1 = (k1 << 15) | (k1 >>> 17);
  k1 = Math.imul(k1, C2);
  return k1 >>> 0;
}

function mixH1(h1, k1) {
  h1 ^= k1;
  h1 = (h1 << 13) | (h1 >>> 19);
  h1 = Math.imul(h1, 5) + 0xe6546b64;
  return h1 >>> 0;
}

function fmix(h1, length) {
  h1 ^= length;
  h1 ^= h1 >>> 16;
  h1 = Math.imul(h1, 0x85ebca6b);
  h1 ^= h1 >>> 13;
  h1 = Math.imul(h1, 0xc2b2ae35);
  h1 ^= h1 >>> 16;
  return h1;
}

export function murmur3(key) {
  let h1 = SEED;
  const len = key.length;
  const remainder = len % 4;
  const bytes = len - remainder;

  let i = 0;
  while (i < bytes) {
    let k1 =
      (key.charCodeAt(i) & 0xff) |
      ((key.charCodeAt(i + 1) & 0xff) << 8) |
      ((key.charCodeAt(i + 2) & 0xff) << 16) |
      ((key.charCodeAt(i + 3) & 0xff) << 24);
    i += 4;

    k1 = mixK1(k1);
    h1 = mixH1(h1, k1);
  }

  for (let j = 0; j < remainder; j++) {
    const byte = key.charCodeAt(i + j) & 0xff;
    const k1 = mixK1(byte);
    h1 = mixH1(h1, k1);
  }

  return fmix(h1, len);
}

 

 


 

이번 글에서는 Spark의 hash()가 표준 Murmur3 구현과 호환되지 않는 이유에 대해 살펴봤습니다.

 

단순히 구현이 달라서 발생하는 문제라기보단, 이미 운영중인 시스템과의 호환성을 유지해야하는 현실적인 제약에서 비롯된 문제였습니다.

 

더 나은 구현이 있더라도 혹은 약간 잘못된 구현이었더라도, 기존 시스템과 강하게 결합되어 있으면 쉽게 바꿀 수가 없는 상황이 생기기도 합니다.

 

이상적인 방향과 현실적인 제약. 그 사이에서 실제 시스템이 어떤 식으로 타협하고 설계해왔는지 엿볼 수 있는 좋은 사례였습니다.

 

 

그래도, 가능하다면 처음부터 표준을 잘 따르도록 하는게 좋겠네요.

최근에 올라온 글
Total
Today
Yesterday