Spark를 사용하여 중앙값과 분위수를 찾는 방법
RDD
분산 방법, IPython 및 Spark를 사용하여 정수의 중앙값을 어떻게 찾을 수 있습니까? 이 RDD
요소는 약 700,000 개이므로 너무 커서 중앙값을 수집하고 찾을 수 없습니다.
이 질문은이 질문과 유사합니다. 그러나 질문에 대한 대답은 내가 모르는 스칼라를 사용하는 것입니다.
Apache Spark로 정확한 중앙값을 계산하려면 어떻게해야합니까?
Scala 답변에 대한 생각을 사용하여 Python으로 비슷한 답변을 작성하려고합니다.
먼저 RDD
. 나는 방법을 모른다. I는 볼 sortBy
(주어진하여 소트 RDD keyfunc
) 및 sortByKey
(이 정렬 RDD
(키, 값) 쌍으로 구성되는 것으로 가정들을.) 방법. 둘 다 키 값을 사용하고 내 RDD
유일한 정수 요소가 있다고 생각합니다 .
- 첫째, 나는 할 생각을했다
myrdd.sortBy(lambda x: x)
? - 다음으로 rdd (
rdd.count()
) 의 길이를 찾습니다 . - 마지막으로 rdd의 중심에있는 요소 또는 두 요소를 찾고 싶습니다. 이 방법에도 도움이 필요합니다.
편집하다:
나는 아이디어가 있었다. 아마도 나는 내 RDD
색인을 생성하고 키 = 색인 및 값 = 요소를 사용할 수 있습니다. 그러면 값별로 정렬 할 수 있습니까? sortByKey
방법 이 있기 때문에 이것이 가능한지 모르겠습니다 .
Spark 2.0 이상 :
Greenwald-Khanna 알고리즘approxQuantile
을 구현 하는 방법을 사용할 수 있습니다 .
파이썬 :
df.approxQuantile("x", [0.5], 0.25)
스칼라 :
df.stat.approxQuantile("x", Array(0.5), 0.25)
마지막 매개 변수는 상대 오류입니다. 숫자가 낮을수록 더 정확한 결과와 더 많은 계산 비용이 듭니다.
Spark 2.2 ( SPARK-14352 ) 부터 여러 열에 대한 추정을 지원합니다.
df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
과
df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
Spark <2.0
파이썬
댓글에서 언급했듯이 모든 소란의 가치가 없을 가능성이 큽니다. 귀하의 경우와 같이 데이터가 상대적으로 작다면 단순히 중앙값을 수집하고 로컬에서 계산하십시오.
import numpy as np
np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))
%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes
몇 년 된 컴퓨터에서는 약 0.01 초가 걸리고 메모리는 약 5.5MB입니다.
데이터가 훨씬 더 큰 경우 정렬이 제한 요소가되므로 정확한 값을 얻는 대신 로컬에서 샘플링, 수집 및 계산하는 것이 좋습니다. 그러나 정말로 Spark를 사용하려면 다음과 같이 트릭을 수행해야합니다 (아무것도 엉망으로 만들지 않은 경우).
from numpy import floor
import time
def quantile(rdd, p, sample=None, seed=None):
"""Compute a quantile of order p ∈ [0, 1]
:rdd a numeric rdd
:p quantile(between 0 and 1)
:sample fraction of and rdd to use. If not provided we use a whole dataset
:seed random number generator seed to be used with sample
"""
assert 0 <= p <= 1
assert sample is None or 0 < sample <= 1
seed = seed if seed is not None else time.time()
rdd = rdd if sample is None else rdd.sample(False, sample, seed)
rddSortedWithIndex = (rdd.
sortBy(lambda x: x).
zipWithIndex().
map(lambda (x, i): (i, x)).
cache())
n = rddSortedWithIndex.count()
h = (n - 1) * p
rddX, rddXPlusOne = (
rddSortedWithIndex.lookup(x)[0]
for x in int(floor(h)) + np.array([0L, 1L]))
return rddX + (h - floor(h)) * (rddXPlusOne - rddX)
그리고 몇 가지 테스트 :
np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
(750069.25, 750069.25)
마지막으로 중앙값을 정의합니다.
from functools import partial
median = partial(quantile, p=0.5)
지금까지는 좋지만 네트워크 통신이없는 로컬 모드에서 4.66 초가 걸립니다. 이를 개선 할 수있는 방법이있을 수 있지만 왜 귀찮게합니까?
언어 독립적 ( Hive UDAF ) :
당신이 사용하는 경우 HiveContext
당신은 또한 하이브 UDAFs를 사용할 수 있습니다. 정수 값 사용 :
rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")
연속 값 :
sqlContext.sql("SELECT percentile(x, 0.5) FROM df")
에서 percentile_approx
당신이 사용하는 레코드 수를 결정하는 추가 인수를 전달할 수 있습니다.
RDD 방법 만 원하고 DF로 이동하지 않으려는 경우 솔루션 추가. 이 스 니펫은 RDD가 double 인 백분위 수를 얻을 수 있습니다.
백분위 수를 50으로 입력하면 필요한 중앙값을 얻어야합니다. 설명되지 않은 코너 케이스가 있으면 알려주세요.
/**
* Gets the nth percentile entry for an RDD of doubles
*
* @param inputScore : Input scores consisting of a RDD of doubles
* @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
* It prefers the higher value when the desired quantile lies between two data points
* @return : The number best representing the percentile in the Rdd of double
*/
def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
val numEntries = inputScore.count().toDouble
val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt
inputScore
.sortBy { case (score) => score }
.zipWithIndex()
.filter { case (score, index) => index == retrievedEntry }
.map { case (score, index) => score }
.collect()(0)
}
다음은 창 함수를 사용하여 사용한 방법입니다 (pyspark 2.2.0 사용).
from pyspark.sql import DataFrame
class median():
""" Create median class with over method to pass partition """
def __init__(self, df, col, name):
assert col
self.column=col
self.df = df
self.name = name
def over(self, window):
from pyspark.sql.functions import percent_rank, pow, first
first_window = window.orderBy(self.column) # first, order by column we want to compute the median for
df = self.df.withColumn("percent_rank", percent_rank().over(first_window)) # add percent_rank column, percent_rank = 0.5 coressponds to median
second_window = window.orderBy(pow(df.percent_rank-0.5, 2)) # order by (percent_rank - 0.5)^2 ascending
return df.withColumn(self.name, first(self.column).over(second_window)) # the first row of the window corresponds to median
def addMedian(self, col, median_name):
""" Method to be added to spark native DataFrame class """
return median(self, col, median_name)
# Add method to DataFrame class
DataFrame.addMedian = addMedian
그런 다음 addMedian 메서드를 호출하여 col2의 중앙값을 계산합니다.
from pyspark.sql import Window
median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)
마지막으로 필요한 경우 그룹화 할 수 있습니다.
df.groupby("col1", "median")
데이터 프레임을 입력으로 사용하고 파티션에 대한 출력으로 중앙값을 갖는 데이터 프레임을 반환하는 함수를 작성했으며 order_col은 part_col에 대한 중앙값을 계산하려는 열입니다. :
from pyspark.sql import Window
import pyspark.sql.functions as F
def calculate_median(dataframe, part_col, order_col):
win = Window.partitionBy(*part_col).orderBy(order_col)
# count_row = dataframe.groupby(*part_col).distinct().count()
dataframe.persist()
dataframe.count()
temp = dataframe.withColumn("rank", F.row_number().over(win))
temp = temp.withColumn(
"count_row_part",
F.count(order_col).over(Window.partitionBy(part_col))
)
temp = temp.withColumn(
"even_flag",
F.when(
F.col("count_row_part") %2 == 0,
F.lit(1)
).otherwise(
F.lit(0)
)
).withColumn(
"mid_value",
F.floor(F.col("count_row_part")/2)
)
temp = temp.withColumn(
"avg_flag",
F.when(
(F.col("even_flag")==1) &
(F.col("rank") == F.col("mid_value"))|
((F.col("rank")-1) == F.col("mid_value")),
F.lit(1)
).otherwise(
F.when(
F.col("rank") == F.col("mid_value")+1,
F.lit(1)
)
)
)
temp.show(10)
return temp.filter(
F.col("avg_flag") == 1
).groupby(
part_col + ["avg_flag"]
).agg(
F.avg(F.col(order_col)).alias("median")
).drop("avg_flag")
참조 URL : https://stackoverflow.com/questions/31432843/how-to-find-median-and-quantiles-using-spark
'IT이야기' 카테고리의 다른 글
파이썬 키 누름 감지 방법 (0) | 2021.03.21 |
---|---|
commonjs / amd 모듈을 가져 오기위한 새로운 es6 구문, 즉 import foo = require ( 'foo') (0) | 2021.03.21 |
Chrome에 이미 '$'가 정의되어 있는 것일까 (0) | 2021.03.21 |
ElasticSearch를 Mysql과 통합하는 방법 (0) | 2021.03.21 |
Jupyter 노트북에서 셀 출력을 지우는 바로 가기 키 (0) | 2021.03.21 |