我试图用流无论从MongoDB中到Elasticsearch数据pymongo和Python客户端elasticsearch。
我已经设置了映射,我在这里报告与感兴趣的领域有关的代码段:
“ updated_at”:{“ type”:“ date”,“ format”:“ dateOptionalTime”}
我的脚本使用pymongo从MongoDB中获取了每个文档,并尝试将其索引到Elasticsearch中,
from elasticsearch import Elasticsearch
from pymongo import MongoClient
mongo_client = MongoClient('localhost', 27017)
es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])
db = mongo_client['my_db']
collection = db['my_collection']
for doc in collection.find():
es_client.index(
index='index_name',
doc_type='my_type',
id=str(doc['_id']),
body=json.dumps(doc, default=json_util.default)
)
我在运行它时遇到的问题是:
elasticsearch.exceptions.RequestError:TransportError(400,u'MapperParsingException [无法解析[updated_at]];嵌套:ElasticsearchIllegalArgumentException [未知属性[$ date]];')
我相信问题的根源在于pymongo将字段updated_at序列化为datetime.datetime对象,正如我所看到的是否在for循环中打印文档:
u'updated_at':datetime.datetime(2014,8,31,17,18,13,17000)
这与Elasticsearch寻找映射中指定的类型为date的对象发生冲突。
任何想法如何解决这个问题?
我想您的问题是您正在使用
body=json.dumps(doc, default=json_util.default)
但是你应该使用
body=doc
这样做对我来说是可行的,因为似乎Elasticsearch正在将字典别名化为JSON文档(当然,假设doc是字典,我猜是这样)。
至少在我使用的(2.x)版本的Elasticsearch中,datetime.datetime是正确的别名,不需要映射。例如,这对我有用:
doc = {"updated_on": datetime.now(timezone.utc)}
res = es.index(index=es_index, doc_type='my_type',
id=1, body=doc)
并被Kibana确认为日期。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句