Agile Data Science 2.0书中代码.

先记录下来,工作中有可能会用到, pyspark+elasticsearch,github地址:https://github.com/rjurney/Agile_Data_Code_2

要把数据从PySpark 写入Elasticsearch 中(或者从Elasticsearch 读取数据到PySpark 中),我们要使用Elasticsearch for Hadoop(https://www.elastic.co/products/hadoop)。在我准备好的映像中,我们已经为本项目配置好了PySpark,因此你不用再做什么就可以加载这个库了。如果你是手动安装的,我们也可以通过安装脚本获得类似的配置。

让PySpark 数据可以被搜索。我们用ch02/pyspark_elasticsearch.py(https://github.com/rjurney/Agile_Data_Code_2/blob/master/ch02/pyspark_elasticsearch.py)把数据从PySpark 保存到Elasticsearch 中:

程序代码 程序代码

csv_lines = sc.textFile("data/example.csv")  
data = csv_lines.map(lambda line: line.split(","))  
schema_data = data.map(  
 lambda x: ('ignored_key', {'name': x[0], 'company': x[1], 'title': x[2]})  
)  
schema_data.saveAsNewAPIHadoopFile(  
 path='-',  
 outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",  
 keyClass="org.apache.hadoop.io.NullWritable",  
 valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",  
 conf={ "es.resource": "agile_data_science/executives"})


搜索数据
程序代码 程序代码

curl 'localhost:9200/agile_data_science/executives/_search?q=name:Russell*&pretty'


搜索结果
程序代码 程序代码

{  
 "took": 19,  
 "timed_out": false,  
 "_shards": {  
"total": 1,  
"successful": 1,  
"failed": 0  
 },  
 "hits": {  
"total": 2,  
"max_score": 1.0,  
"hits": [  
 {  
"_index": "agile_data_science",  
"_type": "executives",  
"_id": "AVrfrAbdfdS5Z0IiIt78",  
"_score": 1.0,  
"_source": {  
 "company": "Relato",  
 "name": "Russell Jurney",  
 "title": "CEO"  
}  
 },  
 {  
"_index": "agile_data_science",  
"_type": "executives",  
"_id": "AVrfrAbdfdS5Z0IiIt79",  
"_score": 1.0,  
"_source": {  
 "company": "Data Syndrome",  
 "name": "Russell Jurney",  
 "title": "Principal Consultant"  
}  
 }  
]  
 }  
}


除非申明,文章均为一号门原创,转载请注明本文地址,谢谢!
[本日志由 yihaomen 于 2019-08-30 09:27 AM 编辑]
文章来自: 本站原创
引用通告: 查看所有引用 | 我要引用此文章
Tags:
相关日志:
评论: 0 | 引用: 0 | 查看次数: -
发表评论
昵 称:
密 码: 游客发言不需要密码.
内 容:
验证码: 验证码
选 项:
虽然发表评论不用注册,但是为了保护您的发言权,建议您注册帐号.