I have 1000s of sensors. I need to partition the data (i.e. per sensor per day), then submit each list of data points to an R algorithm. Using Spark, a simplified sample looks like:
// Spark
val rddData = List(
  ("1:3", List(1,1,456,1,1,2,480,0,1,3,425,0)),
  ("1:4", List(1,4,437,1,1,5,490,0)),
  ("1:6", List(1,6,500,0,1,7,515,1,1,8,517,0,1,9,522,0,1,10,525,0)),
  ("1:11", List(1,11,610,1))
)
case class DataPoint(key: String, value: List[Int]) // 4-value pattern: sensorID, seq#, value, state

I convert to a parquet file and save it. Loading the parquet in SparkR is no problem; the schema says:
# SparkR
df <- read.df(sqlContext, filespec, "parquet")
schema(df)
StructType
|-name = "key", type = "StringType", nullable = TRUE
|-name = "value", type = "ArrayType(IntegerType,true)", nullable = TRUE

So in SparkR, I have a dataframe where each record has all of the data I want (df$value). I want to extract that array into something R can consume, then mutate my original dataframe (df) with a new column holding the resultant array. Logically, something like results = function(df$value). Then I need to get the results (for all rows) back into a SparkR dataframe for output.
How do I extract an array from the SparkR dataframe and then mutate it with the results?
Best answer
Let the Spark data frame be df and the R data frame be df_r. To convert the SparkR df to an R df, use:
# collect() pulls the distributed SparkR DataFrame into a local R data.frame
df_r <- collect(df)
With the R data frame df_r, you can do all the computation you want in R. Say you have the result in column df_r$result.
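For example, here is a minimal sketch of that local step. scoreSensor is a hypothetical stand-in for your own algorithm; after collect(), df_r$value should come back as a list column, so each element is one row's integer vector:

# hypothetical per-row algorithm -- replace with your real R code
scoreSensor <- function(v) {
  m <- matrix(v, ncol = 4, byrow = TRUE)  # unpack the 4-value pattern: sensorID, seq#, value, state
  mean(m[, 3])                            # e.g., the mean reading for this partition
}
# apply it to every row's array and keep the answers in a new column
df_r$result <- sapply(df_r$value, scoreSensor)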
Then, to convert back to a SparkR data frame, use:

# this is a new SparkR data frame, df_1
df_1 <- createDataFrame(sqlContext, df_r)

To add the result back to the SparkR data frame df, use:

# this adds df_1$result as a new column df$result
# note that the number of rows should be the same in df and df_1; if not, use a join operation
df$result <- df_1$result

Hope this solves your problem.
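If the row counts don't line up, or you don't want to rely on row order, here is a sketch of the join route mentioned in the note above, joining on the key column from the question's schema (df_res and df_out are just illustrative names; SparkR 1.x API assumed):

# keep only the key and the computed result from the new data frame
df_res <- select(df_1, df_1$key, df_1$result)
# join back onto the original by key instead of by position
df_out <- join(df, df_res, df$key == df_res$key, "inner")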