当我使用JDBC连接来馈电时,即使我对数据帧使用了过滤;当我检查oracle数据源上的查询日志时,我看到执行火花:
SELECT [column_names] FROM MY_TABLE
参考https://stackoverflow.com/a/40870714/1941560,
我期待着火花懒惰地计划查询和执行;
SELECT [column_names] FROM MY_TABLE WHERE [filter_predicate]
但是Spark并没有这样做。之后将获取所有数据并进行过滤。我需要这种行为,因为我不想每x分钟检索一次所有表,而只更改了行(通过递增filterin UPDATE_DATE
)。
有没有办法做到这一点?
这是我的python代码:
df = ...
lookup_seconds = 5 * 60;
now = datetime.datetime.now(pytz.timezone("some timezone"))
max_lookup_datetime = now - datetime.timedelta(seconds=lookup_seconds)
df.where(df.UPDATE_DATE > max_lookup_datetime).explain()
说明结果:
Physical Plan == *Filter (isnotnull(UPDATE_DATE#21) && (UPDATE_DATE#21 > 1516283483208806)) +- Scan ExistingRDD[NO#19,AMOUNT#20,UPDATE_DATE#21,CODE#22,AMOUNT_OLD#23]
编辑:完整的答案在这里
从官方文档1开始:
dbtable应该读取的JDBC表。请注意,可以使用在SQL查询的FROM子句中有效的任何东西。例如,除了完整表之外,您还可以在括号中使用子查询。
您可以将JDBC选项dbtable设置为子查询SQL。例如:
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "(select * from tbl where UPDATE_DATE > max_lookup_datetime) t") \
.option("user", "username") \
.option("password", "password") \
.load()
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句