TypeError:当我使用asyncio loop时无法腌制协程对象。run_in_executor()

王韦德

我指的是此回购协议,以将mmaction2 grad-cam演示从短视频离线推理转换为长视频在线推理。该脚本如下所示:

注意:为了使此脚本易于复制,我注释掉了一些需要很多依赖的代码。

import cv2
import numpy as np
import torchvision.transforms as transforms
import sys
from PIL import Image
#from mmaction.apis import init_recognizer
#from utils.gradcam_utils import GradCAM
import torch
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
# sys.path.append('./utils')


async def preprocess_img(arr):
    image = Image.fromarray(np.uint8(arr))
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    transform = transforms.Compose([
        transforms.Resize((model_input_height, model_input_width)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std, inplace=False),
    ])
    normalized_img = transform(image)
    img_np = normalized_img.numpy()
    return img_np


async def inference(frame_buffer):
    print("starting inference")
    # inputs = {}
    # input_tensor = torch.from_numpy(frame_buffer).type(torch.FloatTensor)
    # input_cuda_tensor = input_tensor.cuda()
    # inputs['imgs'] = input_cuda_tensor
    # results = gradcam(inputs)
    # display_buffer = np.squeeze(results[0].cpu().detach().numpy(), axis=0)
    # return display_buffer


async def run_blocking_func(loop_, queue_, frame_buffer):
    with ProcessPoolExecutor() as pool:
        blocking_func = partial(inference, frame_buffer)
        frame = await loop_.run_in_executor(pool, blocking_func)
        print(frame)
        await queue_.put(frame)
        await asyncio.sleep(0.01)

async def get_frames(capture):
    capture.grab()
    ret, frame = capture.retrieve()
    if not ret:
        print("empty frame")
        return
    for i in range(32):
        img = await preprocess_img(frame)
        expandimg = np.expand_dims(img, axis=(0, 1, 3))
        print(f'expandimg.shape{expandimg.shape}')
        frame_buffer[:, :, :, i, :, :] = expandimg[:, :, :, 0, :, :]
    return frame_buffer


async def show_frame(queue_: asyncio.LifoQueue):
    display_buffer = await queue_.get()
    for i in range(32):
        blended_image = display_buffer[i, :, :, :]
        cv2.imshow('Grad-CAM VIS', blended_image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break


async def produce(loop_, queue_, cap):
    while True:
        frame_buffer = await asyncio.create_task(get_frames(cap))
        # Apply Grad-CAM
        display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_,frame_buffer))
        await queue_.put(display_buffer)


async def consume(queue_):
    while True:
        if queue_.qsize():
            task1 = asyncio.create_task(show_frame(queue_))
            await asyncio.wait(task1)
            if cv2.waitKey(1) == 27:
                break
        else:
            await asyncio.sleep(0.01)


async def run(loop_, queue_, cap_):
    producer_task = asyncio.create_task(produce(loop_, queue_, cap_))
    consumer_task = asyncio.create_task(consume(queue_))
    await asyncio.gather(producer_task, consumer_task)


if __name__ == '__main__':

    # config = '/home/weidawang/Repo/mmaction2/configs/recognition/i3d/i3d_r50_video_inference_32x2x1_100e_kinetics400_rgb.py'
    # checkpoint = '/home/weidawang/Repo/mmaction2/checkpoints/i3d_r50_video_32x2x1_100e_kinetics400_rgb_20200826-e31c6f52.pth'
    # device = torch.device('cuda:0')
    # model = init_recognizer(config, checkpoint, device=device, use_frames=False)
    video_path = 'replace_with_your_video.mp4'
    model_input_height = 256
    model_input_width = 340
    # target_layer_name = 'backbone/layer4/1/relu'
    # gradcam = GradCAM(model, target_layer_name)

    cap = cv2.VideoCapture(video_path)
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)  # float
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)  # float

    frame_buffer = np.zeros((1, 1, 3, 32, model_input_height, model_input_width))
    display_buffer = np.zeros((32, model_input_height, model_input_width, 3))  # (32, 256, 340, 3)

    loop = asyncio.get_event_loop()
    queue = asyncio.LifoQueue(maxsize=2)

    try:
        loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
    finally:
        print("shutdown service")
        loop.close()

但是当我运行它时,它报告以下错误:

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/weidawang/miniconda3/lib/python3.7/concurrent/futures/process.py", line 205, in _sendback_result
    exception=exception))
  File "/home/weidawang/miniconda3/lib/python3.7/multiprocessing/queues.py", line 358, in put
    obj = _ForkingPickler.dumps(obj)
  File "/home/weidawang/miniconda3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 120, in <module>
    loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
  File "/home/weidawang/miniconda3/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 94, in run
    await asyncio.gather(producer_task, consumer_task)
  File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 76, in produce
    display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_,frame_buffer))
  File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 42, in run_blocking_func
    frame = await loop_.run_in_executor(pool, blocking_func)
TypeError: can't pickle coroutine objects
Task was destroyed but it is pending!
task: <Task pending coro=<consume() running at /home/weidawang/Repo/Python-AI-Action-Utils/temp2.py:88> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7cf1418cd0>()]> cb=[gather.<locals>._done_callback() at /home/weidawang/miniconda3/lib/python3.7/asyncio/tasks.py:691]>

Process finished with exit code 1
alex_noname

如果使用run_in_executor,则不应使用目标函数async您需要先删除async关键字def inference()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

在类中使用ProcessPoolExecutor时无法腌制协程对象

使用run_in_executor和asyncio时的超时处理

在没有 asyncio.get_event_loop() 的 run_in_executor 的异步方法中使用线程池

当我使用“ useCallback”时,出现TypeError错误

当我在VueJS中使用Typescript时出现TypeError

asyncio.await失败,出现TypeError:无法解包不可迭代的协程对象

为什么发出HTTP请求时asyncio的run_in_executor很少提供并行化?

TypeError:当我使用.join时,“类型”对象不是可迭代的python

Asyncio:当我们需要使用异步方法时,如何在__del__方法中清理类的实例

当我使用fastapi和pydantic构建POST API时,出现TypeError:类型为JSON的对象无法序列化

当我使用HttpResponseRedirect时,出现TypeError:quote_from_bytes()Django中的预期字节

当我使用颜色图创建节点着色的网络时,由于 float() 参数导致的 TypeError

当我尝试使用filter_by()时,烧瓶中返回“ TypeError:filter_by()”错误

TypeError:“ NoneType”对象不可调用:当我尝试使用openpyxl在Python中使用Excel文件时显示此错误

TypeError:无法腌制PyCapsule对象

当使用日志配置对对象进行深度复制时,出现“ TypeError:无法使线程锁定对象腌制”

从主线程取消run_in_executor协程不起作用

当我使用线路分析器执行性能统计信息时,发生“ TypeError:'module'对象不可调用”

JavaScript -TypeError 当我在命名对象数组和 get 和 set 方法中使用下划线时

TypeError:在熊猫DataFrame上使用dask时无法腌制_thread._local对象

如何使用 GDB 调试 asyncio 协程?

TypeError:当我们使用(old ='XYZ',new ='ABC')时,replace()不接受关键字参数

当我尝试访问嵌套的哈希数组时收到 (TypeError)

当我使用 vuetify 时,我无法显示图像

Asyncio任务与协程

TypeError:需要asyncio.Future,协程或可等待

当我使用.bashrc时颜色消失

当我使用setSupportActionBar()时,活动增加

Asyncio python:如何在协程中调用对象方法?