架设ftp服务器-消灭星星电脑版

operationtimedout
2023年4月3日发(作者:dell驱动官网)

Spark-ES:SparkDataFrame读写ElasticSearch

摘要:

Spark

ElasticSearch

依赖准备

注意Scala的版本(2.11),es的版本(6.7,2)和Maven仓库中的jar⼀致

csearch

elasticsearch-spark-20_2.11

6.7.2

me读es⽣成SparekDataFrame

scala>valdf=("")

.options(

Map(""->"192.168.61.240",

""->"8200")

).load("ent_label")

只需要指定es的ip和端⼝号即可,这个读进来是全表的所有字段。

另⼀种使⽤结合

DSL

查询语句实现条件过滤查询,例如过滤条件为isList等于Y

scala>._

scala>valoptions=Map(""->"192.168.61.240",""->"8200")

scala>valdf=("ent_label/_doc","?q=isList:Y",options)

使⽤请求体

scala>valdf=("ent_label/_doc","""{"query":{"match":{"isList":"Y"}}}""",Map(""->"192.168.61.240",""->"8200"))

SparkDataFrame写⼊es

scala>valdf=Seq((1,"d"),(2,"c")).toDF("id","a")

scala>("")

.options(Map(""->"192.168.61.240",

""->"8200",

""->"id",

""->"true"))

.mode("append")

.save("my_label/_doc")

写⼊的时候除了ip和port之外,需要指定es主键字段

,如果不设置_id字段会⾃动随机⽣成,mode存在

overwrite

append

两种模

型,overwrite为完全覆盖,即先删除某个索引所有数据再插⼊,append为只对DataFrame的id在索引中的,删除索引的⽂档数据,重新插⼊。

最后指定save到哪个es索引,

my_label/_doc

为索引名和type名,新版本es的index中只有⼀个默认的type即

_doc

append只是⽂档级别的追加,不是字段级别的,如果要实现字段级别的有则更新⽆则插⼊,需要在options中指定参

数"ion"->"upsert",或者"ion"->"update",这个和ES本⾝的update,upsert操作效果是⼀致的

例如使⽤upsert,当id不在时更新,id存在是修改(append),修改的⽅式是字段不存在就插⼊,存在就修改(upsert)

("").options(

Map(""->"192.168.61.240",

""->"8200",

""->"id",

""->"true",

"ion"->"upsert")).mode("append").save("my_label/_doc")

如果指定->update,则DataFrame中的id列不能出现es索引中不存在的,否则直接报错

注意:如果Spark的DataFrame中列的值为None(null),但是es中现有字段有值,则采⽤upsert+append模式之后

不会更新,还是采⽤es的现有值

还要⼀种写法使⽤

saveToEs

api,这种貌似只能append插⼊

scala>._

scala>Es("my_label/_doc",Map(""->"192.168.61.240",""->"8200",""->"id",""->"true"))

最后看⼀下es中插⼊数据

GET/my_label/_doc/_search

{

"query":{

"match_all":{}

}

}

{

"took":0,

"timed_out":false,

"_shards":{

"total":5,

"successful":5,

"skipped":0,

"failed":0

},

"hits":{

"total":2,

"max_score":1.0,

"hits":[

{

"_index":"my_label",

"_type":"_doc",

"_id":"2",

"_score":1.0,

"_source":{

"id":2,

"a":"c"

}

},

{

"_index":"my_label",

"_type":"_doc",

"_id":"1",

"_score":1.0,

"_source":{

"id":1,

"a":"d"

}

}

]

}

}

更多推荐

operationtimedout