How do I add up all the values in a list in PySpark?

Wanderer

I am running the following PySpark transformation in a Jupyter notebook. My requirement is to add up all the values in the elements, e.g. 469 + 84451 + 903 ..., and it should return only the total.

Here is the transformation and action:

In [46]: newdispokey1.collect()

[(u'Hello', 469),
 (u'is', 84451),
 (u'the', 903),
 (u'an', 21208),
 (u'and', 19903),
 (u'route', 185),
 (u'bag', 1894),
 (u'metal', 315),
 (u'bus', 620194),
 (u'cloud', 1036)]
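
The total I am after is a single number, 469 + 84451 + 903 + ... (which comes to 750558 for the data above). A plain-Python sanity check of what I mean, just for illustration (collecting everything back to the driver is exactly what I want to avoid), would be:

    sum(v for _, v in newdispokey1.collect())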

The expected result is that single total of all the values. I am trying the following transformation and action to get it:

   In [46]: newdispokey1.reduce( lambda x,y: x[1]+y[1] ).collect()

and I am getting the following error:

    ---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-46-9557bb70b499> in <module>()
----> 1 newdispokey1.reduce( lambda x,y: x[1]+y[1] ).collect()

/home/newuser/spark/python/pyspark/rdd.pyc in reduce(self, f)
    797             yield reduce(f, iterator, initial)
    798 
--> 799         vals = self.mapPartitions(func).collect()
    800         if vals:
    801             return reduce(f, vals)

/home/newuser/spark/python/pyspark/rdd.pyc in collect(self)
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775 

/home/newuser/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/newuser/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
     38             s = e.java_exception.toString()

/home/newuser/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 167, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/newuser/spark/python/pyspark/rdd.py", line 797, in func
    yield reduce(f, iterator, initial)
  File "<ipython-input-46-9557bb70b499>", line 1, in <lambda>
TypeError: 'int' object has no attribute '__getitem__'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/newuser/spark/python/pyspark/rdd.py", line 797, in func
    yield reduce(f, iterator, initial)
  File "<ipython-input-46-9557bb70b499>", line 1, in <lambda>
TypeError: 'int' object has no attribute '__getitem__'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

I am new to Apache Spark. How can I fix this?

Elazar

The simplest solution is:

>>> newdispokey1.values().sum()
750558
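
Here values() drops the keys and keeps only the counts, and sum() is an action that adds them up. A minimal sketch of the same thing on a toy RDD, assuming a live SparkContext sc as in the pyspark shell:

>>> sc.parallelize([(u'Hello', 469), (u'is', 84451), (u'the', 903)]).values().sum()
85823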

The problem is with the type of the arguments your reducer gets, i.e. the function you pass to reduce. It receives two elements: the first is either the result of the previous step or the first element, and the second is a new element. So whatever you return has to look like any other element; after the first step your lambda returns a plain int, and indexing that int with x[1] is what raises the TypeError: 'int' object has no attribute '__getitem__' above. (Also note that reduce is an action that already returns a plain Python value to the driver, so there is nothing to call .collect() on afterwards.) To make it work, you have to return a pair:

>>> newdispokey1.reduce(lambda x, y: ('', x[1] + y[1]))
('', 750558)

Or, spelled out more explicitly, in case you want to change this later:

>>> newdispokey1.map(lambda x: x[1]).reduce(lambda x, y: x + y)
750558

which is equivalent to the first solution (but less readable).
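
If it helps to see why the original call blew up, the same TypeError can be reproduced with plain Python's reduce on a local list (just a sketch outside Spark; PySpark's reduce applies your function with the same contract inside each partition):

>>> from functools import reduce   # reduce is a builtin in Python 2; the import is needed on Python 3
>>> reduce(lambda x, y: x[1] + y[1], [(u'Hello', 469), (u'is', 84451), (u'the', 903)])
TypeError: 'int' object has no attribute '__getitem__'

After the first step x is already 469 + 84451 = 84920, a plain int, so x[1] fails on the next element.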
