TFDV skew/drift comparator metrics 알아보기 - L-Infinity Distance & Jensen-Shannon Divergence

5 minute read

Production ML 관점에서 data drift를 계산하는 파이프라인 요소로 TFDV를 사용하다보니 뒷단의 세부 로직의 구현 방법이 궁금해졌다. 직접 같은 결과 값을 내는 코드를 구현해보며 많은 시행착오를 겪었었는데 구현하며 정리된 내용을 공유해보려 한다. TFDV Github에 같은 고민을 하고 있는 이슈가 있어 답변한 내용을 블로그로 옮겨 작성했다.

🔗 Skew/Drift Calculation is unclear. #207

개요

TFDV는 data drift 판단을 위해 skew_comparator와 drift_comparator를 이용한다. 이 때 categorical feature의 경우 L-infiniy Distance, numeric feature의 경우 Jensen-Shannon Divergenc를 사용한다. 구현에 있어 어려운 파트는 두 개의 비교하려는 stat histogram을 aligning 시키는 부분이다. 실제 Metrics 계산 로직은 c++로 되어 있다. 본 포스트에서는 이를 바탕으로 이해를 위해 파이썬으로 구현한 코드를 예시로 계산 로직을 설명하도록 하겠다.

전체 파이썬 구현 코드는 아래 깃허브 링크에서 확인할 수 있다.

https://github.com/jinwoo1990/mlops-with-tensorflow/blob/main/tfdv/tfdv_skew_metrics.ipynb

계산 로직과 별개로 skew/drift comparator를 설정해 TFDV로 skew/drift를 계산하는 코드는 아래와 같다.

...
# TFDV Stat 생성
train_stats = tfdv.generate_statistics_from_dataframe(train_df)
eval_stats = tfdv.generate_statistics_from_dataframe(eval_df)

# TFDV Schema 생성
schema = tfdv.infer_schema(train_stats)

# Skew/Drift 설정 
# skew와 drift의 다른 점은 과거와 비교해서 skew가 발생하냐 (drift)와 현재 데이터와 목표 데이터 간에 차이가 있냐 (skew) 같이 비교 대상의 차이임
# Categorical features를 위한 skew/drift threshold 설정 (L-infinity Distance 사용)
for item in ['City', 'Country']:
    tfdv.get_feature(schema, item).skew_comparator.infinity_norm.threshold = 0.00000000000001  # anomaly 판단을 위해 원하는 threshold 값
# Numeric features를 위한 위한 skew/drift threshold 설정 (Jensen-Shannon Divergence 사용)
for item in feat_dict['Income', 'Age']:
    tfdv.get_feature(schema, item).skew_comparator.jensen_shannon_divergence.threshold = 0.00000000000001  # anomaly 판단을 위해 원하는 threshold 값

# skew_comparator는 input statistics와 serving_statistics를 비교 
skew_anomalies = tfdv.validate_statistics(train_stats, schema,
                                          serving_statistics=eval_stats)
tfdv.display_anomalies(skew_anomalies)
# drift_comparator를 위에서 사용했다면 아래와 같이 코드 작성 (previous_statistics 필요)
# drift_anomalies = tfdv.validate_statistics(train_stats, schema_1, 
#                                            previous_statistics=eval_stats)
# tfdv.display_anomalies(drift_anomalies)

L-Infinity Distance for categorical feature

L-Infinity Distance 계산에 사용되는 input과 logic은 아래와 같다.

Input

  • features[feature_index].string_stats.rank_histogram
    • Collections of (label, sample_count) buckets
    • e.g. buckets { label: “Private”, sample_count: 18117.0 } { … }
  • features[feature_idx].string_stats.common_stats.num_non_missing
    • Value to perform normalization

Logic

  • Normalize rank_histogram with num_non_missing value of each feature
    • sample_count / num_non_missing
def get_normalized_label_list_counts(datasets, feature_idx):
    label_len = len(datasets.features[feature_idx].string_stats.rank_histogram.buckets)
    feature_num_non_missing = datasets.features[feature_idx].string_stats.common_stats.num_non_missing
    label_list = []
    normalized_count_dict = {}
    # num_non_missing 으로 normalize
    for i in range(label_len):
        label = datasets.features[feature_idx].string_stats.rank_histogram.buckets[i].label
        count = datasets.features[feature_idx].string_stats.rank_histogram.buckets[i].sample_count
        label_list.append(label)
        normalized_count_dict[label] = count / feature_num_non_missing
    
    return label_list, normalized_count_dict
  • Union two different rank_histograms
    • To align length of histogram_1 and histogram_2
  • Calculate normalized difference between histogram_1 and histogram_2
    • The normalized value of label that was not included in original histogram is calculated as 0
  • Find max values among normalized difference between two histograms (equal to L-Infinity Distance)
def calculate_l_infinity_dist(current_stats, target_stats, dataset_idx, feature_idx):
    current_datasets = current_stats.datasets[dataset_idx]
    target_datasets = target_stats.datasets[dataset_idx]
    
    current_label_list, current_norm_count_dict = get_normalized_label_list_counts(current_datasets, feature_idx)
    target_label_list, target_norm_count_dict = get_normalized_label_list_counts(target_datasets, feature_idx)

    # dataset마다 포함한 item이 다를 수 있어 합집합
    union_label_list = set(current_label_list) | set(target_label_list)

    # 차이 계산
    # 다만, 합집합으로 계산하고 없는 값은 일단 0으로 넣어서 차이 계산
    res_dict = {}
    for label in union_label_list:
        res_dict[label] = abs(current_norm_count_dict.get(label, 0) - target_norm_count_dict.get(label, 0))
    
    return max(res_dict.values())

Jensen-Shannon Divergence for numeric feature

Jensen-Shannon Divergence 계산에 사용되는 input과 logic은 아래와 같다.

Input

  • features[feature_index].num_stats.histogram
    • Collections of (low_value, high_value, sample_count) buckets.
    • e.g. buckets { low_value: 17.0, high_value: 24.3, sample_count: 4435.9744 } { … }
    • Origianl length is 10.
    • As two different histograms are likely to have different low_value and high_value, thus a modification of histogram through union is often necessary.
  • features[feature_index].num_stats.common_stats.num_missing
    • To check wheter to add nan value bucket

Logic

  • Get union boundaries of two histograms using low_value and high_value.
    • Each low_value and high_value is treated as boundary.
    • If two histogram have different boundaries, these boundaries needed to be unioned in order to compare each bucket.
    • e.g. [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] union [15, 25, 35, 45, 55, 65, 75, 85, 95, 105] –> [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 105]
def get_union_boundaries(current_histogram, target_histogram):
    boundaries = set()
    for bucket in current_histogram['buckets']:
        boundaries.add(bucket['low_value'])
        boundaries.add(bucket['high_value'])
        
    for bucket in target_histogram['buckets']:
        boundaries.add(bucket['low_value'])
        boundaries.add(bucket['high_value'])
    
    boundaries = sorted(list(boundaries))
    
    return boundaries
  • Add values to newly created bucket. (Rebucketing)
    • Fill in empty buckets up to the first bucket in the existing histogram.
    • If existing bucket needed to be divided because of new boundary, distribute values based on uniform distribution.
      • {low_value: 10, high_value: 20, sample_count: 100} -> {low_value: 10, high_value: 15, sample_count:50}, {low_value: 15, high_value: 20, sample_count: 50}
    • Fill in empty buckets after the last bucket in the existing histogram.
def add_buckets_to_histogram(bucket_boundaries, 
                            total_sample_count,
                            total_range_covered,
                            histogram):
    """
    e.g.
    # bucket 원래 값
    low_value: 19214.0, high_value: 165763.1, sample_count: 4158.425273604061
    # 비교 histogram 참조하면서 추가된 boundaries (중간에 156600.0 boundary가 추가되면서 히스토그램을 두 개로 쪼개야 되는 상황 생김)
    [19214.0, 156600.0, 165763.1]
    # uniform distribution을 가정해 (low_value - high_value) / cover_range 비율씩 sample_count를 분배
    # cover_range: 165763.1 - 19214.0
    {'low_value': 19214.0, 'high_value': 156600.0, 'sample_count': 3898.4163985951977}
    {'low_value': 156600.0, 'high_value': 165763.1, 'sample_count': 260.0088750088632}

    """
    
    num_new_buckets = len(bucket_boundaries) - 1  # boundaries는 buckets보다 1개 값이 더 있음
    for i in range(num_new_buckets):
        new_bucket = {}
        new_bucket['low_value'] = bucket_boundaries[i]
        new_bucket['high_value'] = bucket_boundaries[i+1]
        # uniform distribution으로 쪼개는 코드
        new_bucket['sample_count'] = ((bucket_boundaries[i+1] - bucket_boundaries[i]) / total_range_covered) * total_sample_count
        
        histogram['buckets'].append(new_bucket)

def rebucket_histogram(boundaries, histogram):
    rebuck_hist = {}
    rebuck_hist['buckets'] = []
    rebuck_hist['num_nan'] = histogram['num_nan']
    
    index = 0
    max_index = len(boundaries) - 1

    for bucket in histogram['buckets']:
        low_value = bucket['low_value']
        high_value = bucket['high_value']
        sample_count = bucket['sample_count']
        
        # 원래 자신이 가지고 있던 값보다 작은 buket들 값 설정 (0으로 설정)
        while low_value > boundaries[index]:
            new_bucket = {}
            new_bucket['low_value'] = boundaries[index]
            index += 1
            new_bucket['high_value'] = boundaries[index]
            new_bucket['sample_count'] = 0.0

            rebuck_hist['buckets'].append(new_bucket)

        # 추가 예외 처리 부분인데 일단 비활성화
        # if low_value == high_value and low_value == boundaries[index]:
        #     new_bucket = {}
        #     new_bucket['low_value'] = boundaries[index]
        #     index += 1
        #     new_bucket['high_value'] = boundaries[index]
        #     new_bucket['sample_count'] = sample_count

        #     rebuck_hist.append(new_bucket)
        #     continue

        # 쪼개야 되는 범위 산출
        covered_boundaries = []
        while high_value > boundaries[index]:
            covered_boundaries.append(boundaries[index])
            index += 1
        covered_boundaries.append(boundaries[index])  # 같은 값일 때 자기 포함

        if len(covered_boundaries) > 0:
            add_buckets_to_histogram(covered_boundaries, sample_count, high_value - low_value, rebuck_hist)
    
    # 원래 자신의 범위 값 넘어선 범위들 bucket에 대한 값 설정 (0으로 설정)
    for i in range(index, max_index):
        new_bucket = {}
        new_bucket['low_value'] = boundaries[index]
        new_bucket['high_value'] = boundaries[index + 1]
        new_bucket['sample_count'] = 0.0

        rebuck_hist['buckets'].append(new_bucket)
    return rebuck_hist

def align_histogram(boundaries, current_histogram, target_histogram):
    boundaries = get_union_boundaries(current_histogram, target_histogram)
    current_histogram_rebucketted = rebucket_histogram(boundaries, current_histogram)
    target_histogram_rebucketted = rebucket_histogram(boundaries, target_histogram)
    return current_histogram_rebucketted, target_histogram_rebucketted
  • Normalize rebucketted histogram sample_count. (Make it to 0.0 ~ 1.0)
  • Calculate through Jensen-Shannon Divergence formula.
    • Kullback-Leibler Divergence: Dkl(p||q) = E[log(pi)-log(qi)] = sum(pi*log(pi/qi))
    • Jensen-Shannon Divergence: JSD(p||q) = 1/2*Dkl(p||m) + 1/2*Dkl(q||m) where m=(p+q)/2
    • Summation of JSD values
# Calculate the kl divergence
def kl_divergence(p, q):
    res = 0
    for i in range(len(p)):
        if p[i] > 0 and q[i] > 0:
            res += p[i] * log2(p[i]/q[i])
    
    return res

# Calculate the js divergence
def js_divergence(p, q):
    m = 0.5 * (p + q)
    return 0.5 * kl_divergence(p, m) + 0.5 * kl_divergence(q, m)

마치며

TFDV 등 TFX component에 대한 세부 내용들은 한글 자료나 영어 자료 모두 정확한 내용들을 가르쳐주는 자료를 찾기 어려웠다. 그래서 직접 소스코드를 보며 구현해보았는데 데이터 케이스에 따라 수치가 조금씩 안 맞는 부분을 맞춰 구현하느라 시간이 오래 걸렸다. 구현 후에 missing 데이터에 따라 수치가 조금 안 맞는 부분을 발견했는데 시간이 가능하다면 수정할 예정이다. missing 데이터가 없을 때는 모두 tfdv와 같은 값을 산출하도록 구현되었고 skew/drift 계산에 대한 큰 틀에 이해를 위해서 참조하기 바란다.

References

Leave a comment