在PySpark中使用HiveContext测试时如何防止内存泄漏

马特·胡佛

我使用pyspark进行一些数据处理,并将HiveContext用于窗口函数。

为了测试代码,我使用TestHiveContext,基本上是从pyspark源代码复制实现:

https://spark.apache.org/docs/preview/api/python/_modules/pyspark/sql/context.html

@classmethod
def _createForTesting(cls, sparkContext):
    """(Internal use only) Create a new HiveContext for testing.

    All test code that touches HiveContext *must* go through this method. Otherwise,
    you may end up launching multiple derby instances and encounter with incredibly
    confusing error messages.
    """
    jsc = sparkContext._jsc.sc()
    jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
    return cls(sparkContext, jtestHive)

然后,我的测试将继承可以访问上下文的基类。

这工作了一段时间。但是,当我添加更多测试时,我开始注意到一些间歇性进程耗尽了内存问题。现在,我无法成功运行测试套件。

"java.lang.OutOfMemoryError: Java heap space"

在每次测试运行后,我都明确停止了spark上下文,但这似乎并没有杀死HiveContext。因此,我相信每次运行新测试时,它都会继续创建新的HiveContext,并且不会删除旧的HiveContext,这会导致内存泄漏。

关于如何拆除基类以使其杀死HiveContext的任何建议?

日拔

如果您很乐意在所有测试中使用单例来保存Spark / Hive上下文,则可以执行以下操作。

test_contexts.py:

_test_spark = None
_test_hive = None

def get_test_spark():
    if _test_spark is None:
        # Create spark context for tests.
        # Not really sure what's involved here for Python.
        _test_spark = ...
    return _test_spark

def get_test_hive():
    if _test_hive is None:
        sc = get_test_spark()
        jsc = test_spark._jsc.sc()
        _test_hive = sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
    return _test_hive

然后,您只需在测试中导入这些功能即可。

my_test.py:

from test_contexts import get_test_spark, get_test_hive

def test_some_spark_thing():
    sc = get_test_spark()
    sqlContext = get_test_hive()
    # etc

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章