我有一个格式(时间戳、价格、金额)的价格更新表。时间戳是日期时间、价格分类和金额 float64。时间戳列设置为索引。我的目标是在每个时间点获得每个价格水平的可用数量。首先,我使用枢轴将价格分散到列中,然后向前填充。
pivot = price_table.pivot_table(index = 'timestamp',
columns = 'price', values = 'amount')
pivot_ffill = pivot.fillna(method = 'ffill')
我可以compute
或适用head
于pivot_ffill
和它工作正常。显然,在表的开头仍然有 NA 还没有更新。当我申请
pivot_nullfill = pivot_ffill.fillna(0)
pivot_nullfill.head()
我确实收到错误消息The columns in the computed data do not match the columns in the provided metadata
。我尝试用0.0
或替换零float(0)
,但无济于事。由于前面的步骤起作用,我强烈怀疑它与fillna
,但由于延迟计算不一定是真的。
有人知道这是什么原因吗?谢谢!
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-180-f8ab344c7939> in <module>
----> 1 pivot_ffill.fillna(0).head()
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
896
897 if compute:
--> 898 result = result.compute()
899 return result
900
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
74 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
75 cache=cache, get_id=_thread_get_id,
---> 76 pack_exception=pack_exception, **kwargs)
77
78 # Cleanup pools associated to dead threads
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
460 _execute_task(task, data) # Re-execute locally
461 else:
--> 462 raise_exception(exc, tb)
463 res, worker_id = loads(res_info)
464 state['cache'][key] = res
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
110 if exc.__traceback__ is not tb:
111 raise exc.with_traceback(tb)
--> 112 raise exc
113
114 import pickle as cPickle
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
228 try:
229 task, data = loads(task_info)
--> 230 result = _execute_task(task, data)
231 id = get_id()
232 result = dumps((result, id))
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\optimization.py in __call__(self, *args)
940 % (len(self.inkeys), len(args)))
941 return core.get(self.dsk, self.outkey,
--> 942 dict(zip(self.inkeys, args)))
943
944 def __reduce__(self):
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\compatibility.py in apply(func, args, kwargs)
91 def apply(func, args, kwargs=None):
92 if kwargs:
---> 93 return func(*args, **kwargs)
94 else:
95 return func(*args)
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\dataframe\core.py in apply_and_enforce(*args, **kwargs)
3800 if not np.array_equal(np.nan_to_num(meta.columns),
3801 np.nan_to_num(df.columns)):
-> 3802 raise ValueError("The columns in the computed data do not match"
3803 " the columns in the provided metadata")
3804 else:
ValueError: The columns in the computed data do not match the columns in the provided metadata
错误消息应该为您提供了如何解决这种情况的建议。我们假设您是从 CSV 加载的(问题没有说),所以您可能会得到如下一行
df = dd.read_csv(..., dtype={...})
它指示 pandas 阅读器您要强制执行的 dtypes,因为您知道的信息比 Pandas 多。这确保所有分区的所有列都具有相同的类型 - 请参阅文档的注释部分。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句