IT이야기

Spark를 사용하여 중앙값과 분위수를 찾는 방법

cyworld 2021. 3. 21. 09:04
반응형

Spark를 사용하여 중앙값과 분위수를 찾는 방법


RDD분산 방법, IPython 및 Spark를 사용하여 정수의 중앙값을 어떻게 찾을 수 있습니까? RDD요소는 약 700,000 개이므로 너무 커서 중앙값을 수집하고 찾을 수 없습니다.

이 질문은이 질문과 유사합니다. 그러나 질문에 대한 대답은 내가 모르는 스칼라를 사용하는 것입니다.

Apache Spark로 정확한 중앙값을 계산하려면 어떻게해야합니까?

Scala 답변에 대한 생각을 사용하여 Python으로 비슷한 답변을 작성하려고합니다.

먼저 RDD. 나는 방법을 모른다. I는 볼 sortBy(주어진하여 소트 RDD keyfunc) 및 sortByKey(이 정렬 RDD(키, 값) 쌍으로 구성되는 것으로 가정들을.) 방법. 둘 다 키 값을 사용하고 내 RDD유일한 정수 요소가 있다고 생각합니다 .

  1. 첫째, 나는 할 생각을했다 myrdd.sortBy(lambda x: x)?
  2. 다음으로 rdd ( rdd.count()) 의 길이를 찾습니다 .
  3. 마지막으로 rdd의 중심에있는 요소 또는 두 요소를 찾고 싶습니다. 이 방법에도 도움이 필요합니다.

편집하다:

나는 아이디어가 있었다. 아마도 나는 내 RDD색인을 생성하고 키 = 색인 및 값 = 요소를 사용할 수 있습니다. 그러면 값별로 정렬 할 수 있습니까? sortByKey방법 이 있기 때문에 이것이 가능한지 모르겠습니다 .


Spark 2.0 이상 :

Greenwald-Khanna 알고리즘approxQuantile 을 구현 하는 방법을 사용할 수 있습니다 .

파이썬 :

df.approxQuantile("x", [0.5], 0.25)

스칼라 :

df.stat.approxQuantile("x", Array(0.5), 0.25)

마지막 매개 변수는 상대 오류입니다. 숫자가 낮을수록 더 정확한 결과와 더 많은 계산 비용이 듭니다.

Spark 2.2 ( SPARK-14352 ) 부터 여러 열에 대한 추정을 지원합니다.

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)

Spark <2.0

파이썬

댓글에서 언급했듯이 모든 소란의 가치가 없을 가능성이 큽니다. 귀하의 경우와 같이 데이터가 상대적으로 작다면 단순히 중앙값을 수집하고 로컬에서 계산하십시오.

import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes

몇 년 된 컴퓨터에서는 약 0.01 초가 걸리고 메모리는 약 5.5MB입니다.

데이터가 훨씬 더 큰 경우 정렬이 제한 요소가되므로 정확한 값을 얻는 대신 로컬에서 샘플링, 수집 및 계산하는 것이 좋습니다. 그러나 정말로 Spark를 사용하려면 다음과 같이 트릭을 수행해야합니다 (아무것도 엉망으로 만들지 않은 경우).

from numpy import floor
import time

def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1]
    :rdd a numeric rdd
    :p quantile(between 0 and 1)
    :sample fraction of and rdd to use. If not provided we use a whole dataset
    :seed random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    rddSortedWithIndex = (rdd.
        sortBy(lambda x: x).
        zipWithIndex().
        map(lambda (x, i): (i, x)).
        cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(floor(h)) + np.array([0L, 1L]))

    return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

그리고 몇 가지 테스트 :

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
(750069.25, 750069.25)

마지막으로 중앙값을 정의합니다.

from functools import partial
median = partial(quantile, p=0.5)

지금까지는 좋지만 네트워크 통신이없는 로컬 모드에서 4.66 초가 걸립니다. 이를 개선 할 수있는 방법이있을 수 있지만 왜 귀찮게합니까?

언어 독립적 ( Hive UDAF ) :

당신이 사용하는 경우 HiveContext당신은 또한 하이브 UDAFs를 사용할 수 있습니다. 정수 값 사용 :

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

연속 값 :

sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

에서 percentile_approx당신이 사용하는 레코드 수를 결정하는 추가 인수를 전달할 수 있습니다.


RDD 방법 만 원하고 DF로 이동하지 않으려는 경우 솔루션 추가. 이 스 니펫은 RDD가 double 인 백분위 수를 얻을 수 있습니다.

백분위 수를 50으로 입력하면 필요한 중앙값을 얻어야합니다. 설명되지 않은 코너 케이스가 있으면 알려주세요.

/**
  * Gets the nth percentile entry for an RDD of doubles
  *
  * @param inputScore : Input scores consisting of a RDD of doubles
  * @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
  *                     It prefers the higher value when the desired quantile lies between two data points
  * @return : The number best representing the percentile in the Rdd of double
  */    
  def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
    val numEntries = inputScore.count().toDouble
    val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt


    inputScore
      .sortBy { case (score) => score }
      .zipWithIndex()
      .filter { case (score, index) => index == retrievedEntry }
      .map { case (score, index) => score }
      .collect()(0)
  }

다음은 창 함수를 사용하여 사용한 방법입니다 (pyspark 2.2.0 사용).

from pyspark.sql import DataFrame

class median():
    """ Create median class with over method to pass partition """
    def __init__(self, df, col, name):
        assert col
        self.column=col
        self.df = df
        self.name = name

    def over(self, window):
        from pyspark.sql.functions import percent_rank, pow, first

        first_window = window.orderBy(self.column)                                  # first, order by column we want to compute the median for
        df = self.df.withColumn("percent_rank", percent_rank().over(first_window))  # add percent_rank column, percent_rank = 0.5 coressponds to median
        second_window = window.orderBy(pow(df.percent_rank-0.5, 2))                 # order by (percent_rank - 0.5)^2 ascending
        return df.withColumn(self.name, first(self.column).over(second_window))     # the first row of the window corresponds to median

def addMedian(self, col, median_name):
    """ Method to be added to spark native DataFrame class """
    return median(self, col, median_name)

# Add method to DataFrame class
DataFrame.addMedian = addMedian

그런 다음 addMedian 메서드를 호출하여 col2의 중앙값을 계산합니다.

from pyspark.sql import Window

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)

마지막으로 필요한 경우 그룹화 할 수 있습니다.

df.groupby("col1", "median")

데이터 프레임을 입력으로 사용하고 파티션에 대한 출력으로 중앙값을 갖는 데이터 프레임을 반환하는 함수를 작성했으며 order_col은 part_col에 대한 중앙값을 계산하려는 열입니다. :

from pyspark.sql import Window
import pyspark.sql.functions as F

def calculate_median(dataframe, part_col, order_col):
    win = Window.partitionBy(*part_col).orderBy(order_col)
#     count_row = dataframe.groupby(*part_col).distinct().count()
    dataframe.persist()
    dataframe.count()
    temp = dataframe.withColumn("rank", F.row_number().over(win))
    temp = temp.withColumn(
        "count_row_part",
        F.count(order_col).over(Window.partitionBy(part_col))
    )
    temp = temp.withColumn(
        "even_flag",
        F.when(
            F.col("count_row_part") %2 == 0,
            F.lit(1)
        ).otherwise(
            F.lit(0)
        )
    ).withColumn(
        "mid_value",
        F.floor(F.col("count_row_part")/2)
    )

    temp = temp.withColumn(
        "avg_flag",
        F.when(
            (F.col("even_flag")==1) &
            (F.col("rank") == F.col("mid_value"))|
            ((F.col("rank")-1) == F.col("mid_value")),
            F.lit(1)
        ).otherwise(
        F.when(
            F.col("rank") == F.col("mid_value")+1,
            F.lit(1)
            )
        )
    )
    temp.show(10)
    return temp.filter(
        F.col("avg_flag") == 1
    ).groupby(
        part_col + ["avg_flag"]
    ).agg(
        F.avg(F.col(order_col)).alias("median")
    ).drop("avg_flag")

참조 URL : https://stackoverflow.com/questions/31432843/how-to-find-median-and-quantiles-using-spark

반응형