架设ftp服务器-消灭星星电脑版
![operationtimedout](/uploads/image/0075.jpg)
2023年4月3日发(作者:dell驱动官网)
Spark-ES:SparkDataFrame读写ElasticSearch
摘要:
Spark
,
ElasticSearch
依赖准备
注意Scala的版本(2.11),es的版本(6.7,2)和Maven仓库中的jar⼀致
me读es⽣成SparekDataFrame
scala>valdf=("")
.options(
Map(""->"192.168.61.240",
""->"8200")
).load("ent_label")
只需要指定es的ip和端⼝号即可,这个读进来是全表的所有字段。
另⼀种使⽤结合
DSL
查询语句实现条件过滤查询,例如过滤条件为isList等于Y
scala>._
scala>valoptions=Map(""->"192.168.61.240",""->"8200")
scala>valdf=("ent_label/_doc","?q=isList:Y",options)
使⽤请求体
scala>valdf=("ent_label/_doc","""{"query":{"match":{"isList":"Y"}}}""",Map(""->"192.168.61.240",""->"8200"))
SparkDataFrame写⼊es
scala>valdf=Seq((1,"d"),(2,"c")).toDF("id","a")
scala>("")
.options(Map(""->"192.168.61.240",
""->"8200",
""->"id",
""->"true"))
.mode("append")
.save("my_label/_doc")
写⼊的时候除了ip和port之外,需要指定es主键字段
,如果不设置_id字段会⾃动随机⽣成,mode存在
overwrite
和
append
两种模
型,overwrite为完全覆盖,即先删除某个索引所有数据再插⼊,append为只对DataFrame的id在索引中的,删除索引的⽂档数据,重新插⼊。
最后指定save到哪个es索引,
my_label/_doc
为索引名和type名,新版本es的index中只有⼀个默认的type即
_doc
。
append只是⽂档级别的追加,不是字段级别的,如果要实现字段级别的有则更新⽆则插⼊,需要在options中指定参
数"ion"->"upsert",或者"ion"->"update",这个和ES本⾝的update,upsert操作效果是⼀致的
例如使⽤upsert,当id不在时更新,id存在是修改(append),修改的⽅式是字段不存在就插⼊,存在就修改(upsert)
("").options(
Map(""->"192.168.61.240",
""->"8200",
""->"id",
""->"true",
"ion"->"upsert")).mode("append").save("my_label/_doc")
如果指定->update,则DataFrame中的id列不能出现es索引中不存在的,否则直接报错
注意:如果Spark的DataFrame中列的值为None(null),但是es中现有字段有值,则采⽤upsert+append模式之后
不会更新,还是采⽤es的现有值
还要⼀种写法使⽤
saveToEs
api,这种貌似只能append插⼊
scala>._
scala>Es("my_label/_doc",Map(""->"192.168.61.240",""->"8200",""->"id",""->"true"))
最后看⼀下es中插⼊数据
GET/my_label/_doc/_search
{
"query":{
"match_all":{}
}
}
{
"took":0,
"timed_out":false,
"_shards":{
"total":5,
"successful":5,
"skipped":0,
"failed":0
},
"hits":{
"total":2,
"max_score":1.0,
"hits":[
{
"_index":"my_label",
"_type":"_doc",
"_id":"2",
"_score":1.0,
"_source":{
"id":2,
"a":"c"
}
},
{
"_index":"my_label",
"_type":"_doc",
"_id":"1",
"_score":1.0,
"_source":{
"id":1,
"a":"d"
}
}
]
}
}
更多推荐
operationtimedout
发布评论