Code examples from the book Agile Data Science 2.0.
By: Roy.Liu | Last updated: 2019-08-27
Noting this down here, as it may be useful at work: PySpark + Elasticsearch. GitHub repository: https://github.com/rjurney/Agile_Data_Code_2
To write data from PySpark into Elasticsearch (or to read data from Elasticsearch into PySpark), we use Elasticsearch for Hadoop (https://www.elastic.co/products/hadoop). In the machine image I have prepared, PySpark is already configured for this project, so you can load the library without any extra setup. If you installed manually, the install script produces a similar configuration.
Making PySpark data searchable. We use ch02/pyspark_elasticsearch.py (https://github.com/rjurney/Agile_Data_Code_2/blob/master/ch02/pyspark_elasticsearch.py) to save data from PySpark into Elasticsearch:
csv_lines = sc.textFile("data/example.csv")
data = csv_lines.map(lambda line: line.split(","))
schema_data = data.map(
    lambda x: ('ignored_key', {'name': x[0], 'company': x[1], 'title': x[2]})
)
schema_data.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "agile_data_science/executives"}
)
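The heart of the job is the per-record mapping: each CSV line becomes a `(key, document)` pair, where the key is a placeholder (EsOutputFormat discards it, since `keyClass` is NullWritable) and the document is a dict that becomes one Elasticsearch document. A minimal sketch of that transformation in plain Python, with no Spark required; the field order `name,company,title` and the helper name `to_es_record` are assumptions for illustration:

```python
def to_es_record(line):
    """Split one CSV line and pair the resulting document with a
    throwaway key, mirroring the lambda passed to data.map() above.

    Assumes the CSV columns are name,company,title in that order.
    """
    x = line.split(",")
    return ('ignored_key', {'name': x[0], 'company': x[1], 'title': x[2]})

# Applied to one line of data/example.csv:
key, doc = to_es_record("Russell Jurney,Relato,CEO")
# key == 'ignored_key'
# doc == {'name': 'Russell Jurney', 'company': 'Relato', 'title': 'CEO'}
```

In the Spark job this function runs once per line via `data.map()`, and the resulting pairs are what `saveAsNewAPIHadoopFile` hands to EsOutputFormat.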
Search the data:
curl 'localhost:9200/agile_data_science/executives/_search?q=name:Russell*&pretty'
Search results:
{
  "took": 19,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "agile_data_science",
        "_type": "executives",
        "_id": "AVrfrAbdfdS5Z0IiIt78",
        "_score": 1.0,
        "_source": {
          "company": "Relato",
          "name": "Russell Jurney",
          "title": "CEO"
        }
      },
      {
        "_index": "agile_data_science",
        "_type": "executives",
        "_id": "AVrfrAbdfdS5Z0IiIt79",
        "_score": 1.0,
        "_source": {
          "company": "Data Syndrome",
          "name": "Russell Jurney",
          "title": "Principal Consultant"
        }
      }
    ]
  }
}
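Beyond eyeballing it in curl, a response like this is easy to consume programmatically: the documents live under `hits.hits[*]._source`. A small sketch using only the standard-library `json` module; the response body below is abbreviated to the fields actually used:

```python
import json

# Abbreviated copy of the search response shown above.
response_body = '''
{"took": 19, "timed_out": false,
 "hits": {"total": 2, "max_score": 1.0, "hits": [
   {"_id": "AVrfrAbdfdS5Z0IiIt78",
    "_source": {"company": "Relato", "name": "Russell Jurney", "title": "CEO"}},
   {"_id": "AVrfrAbdfdS5Z0IiIt79",
    "_source": {"company": "Data Syndrome", "name": "Russell Jurney",
                "title": "Principal Consultant"}}]}}
'''

result = json.loads(response_body)

# Each element of hits.hits wraps one matched document in _source.
documents = [hit["_source"] for hit in result["hits"]["hits"]]
titles = [doc["title"] for doc in documents]
# titles == ['CEO', 'Principal Consultant']
```

The same extraction works on the full response; the extra fields (`_shards`, `_index`, `_score`, and so on) are simply ignored by the list comprehension.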
From: 一号门