Python 多进程(multiprocessing)For 循环

海军

我的脚本不起作用。所有 10 个进程都只取第一个列表项,然后就停止了,输出是 10 次第一个列表项。如何解决此问题?错误一定出在循环中,还是我需要为此使用队列(queue)?

import finanzen_fundamentals.stocks as ff
import mysql.connector
import pandas as pd
import multiprocessing
import time

# Module-level accumulator; collect_results() extends it with worker output.
results = []

def get_list():
    """Fetch every row of the ``url_name`` table from the ``stockdata`` database.

    Returns:
        The list of row tuples produced by ``fetchall()``.  If connecting or
        querying fails, the error message is returned as a ``str`` instead
        (kept for backward compatibility), so callers should check the type
        before iterating over the result.
    """
    try:
        mydb = mysql.connector.connect(host="localhost", user="changed", password="changed", database="stockdata")
        try:
            mycursor = mydb.cursor()
            try:
                mycursor.execute("select * from url_name")
                return mycursor.fetchall()
            finally:
                # Release the cursor whether or not the query succeeded.
                mycursor.close()
        finally:
            # Always hand the connection back; it was previously left open.
            mydb.close()
    except Exception as e:
        return str(e)

def create_json(record):
    """Fetch the current FSE quote for the rows of *record*, returning the first hit.

    NOTE: the function returns as soon as one row has been fetched
    successfully, so later rows are never reached.  Rows that raise are
    reported with ``print`` and skipped.

    Returns:
        ``[[name, wkn, price, currency, time]]`` for the first row that
        succeeds, or ``None`` when no row does.
    """
    for entry in record:
        try:
            quote = ff.get_current_value_lxml(str(entry[2])[:-1], exchange="FSE")
            name = entry[0]
            wkn = quote['wkn'].values[0]
            price = quote['price'].values[0]
            currency = quote['currency'].values[0]
            quote_time = quote['time'].values[0]
            print('Name:' + name + ' WKN:' + wkn + ' Preis:' + str(price) + ' Currency:' + currency + ' Zeit:' + quote_time)
        except Exception as err:
            print(str(err))
            continue
        # First successful row ends the loop.
        return [[name, wkn, price, currency, quote_time]]
    return None

def collect_results(result):
    """Pool callback: add the rows returned by one worker to ``results``.

    Args:
        result: The list of rows returned by ``create_json``, or ``None``
            when the worker could not fetch anything.

    ``list.extend(None)`` raises ``TypeError``; a ``None`` result is
    therefore ignored instead of being passed on.
    """
    if result is None:
        return
    results.extend(result)

if __name__ == '__main__':
    # Load the rows to process and note when the run started.
    record = get_list()
    start_time = time.time()

    # One worker process per CPU core.
    worker_count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=worker_count)

    # NOTE: each of the 10 tasks is handed the complete record list.
    for _ in range(10):
        pool.apply_async(create_json, (record,), callback=collect_results)

    # No more tasks will be submitted; wait for the workers to finish.
    pool.close()
    pool.join()

    column_names = ['Name', 'WKN', 'Preis', 'Currency', 'Zeit']
    df_out = pd.DataFrame(results, columns=column_names)
    print(df_out)

输出:

                      Name     WKN  Preis Currency        Zeit
0  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
1  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
2  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
3  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
4  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
5  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
6  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
7  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
8  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
9  21VIANET GRP ADR A/6 O.  A1H9DT   20.0      EUR  23.10.2020
富美男

您弄错了循环结构。在 create_json 内部,您遍历 record 的每一行(row),但总是在第一次迭代时就 return;而且每次都用同一个原始 record 列表来调用它。因此,所有工作进程始终只会处理第一行。您需要修改工作函数,使其只对单个 row 进行操作:

def create_json(row):
    """Fetch the current FSE quote for a single *row*.

    Returns:
        ``[[name, wkn, price, currency, time]]`` (a one-element 2-D list,
        suitable for ``list.extend``) on success, or ``None`` after
        printing the error when anything fails.
    """
    try:
        quote = ff.get_current_value_lxml(str(row[2])[:-1], exchange="FSE")
        name, wkn, price = row[0], quote['wkn'].values[0], quote['price'].values[0]
        currency, quote_time = quote['currency'].values[0], quote['time'].values[0]
        print('Name:' + name + ' WKN:' + wkn + ' Preis:' + str(price) + ' Currency:' + currency + ' Zeit:' + quote_time)
    except Exception as err:
        print(str(err))
        return None
    return [[name, wkn, price, currency, quote_time]]

然后在主代码中对每一行调用它:

if __name__ == '__main__':
    ...
    # Submit one task per row, so every call to create_json gets a different row.
    for row in record:
        pool.apply_async(create_json, args=(row, ), callback=collect_results)
    ...

请注意,在这种情况下,您可以使用 map 来代替循环和 apply_async 调用。它会直接返回结果列表,因此您甚至不需要 callback,例如:

def create_json(row):
    """Fetch the current FSE quote for a single *row*.

    Returns:
        A flat list ``[name, wkn, price, currency, time]`` on success, or
        ``None`` after printing the error when anything fails.
    """
    try:
        quote = ff.get_current_value_lxml(str(row[2])[:-1], exchange="FSE")
        wkn, price, currency, quote_time = (
            quote[column].values[0] for column in ('wkn', 'price', 'currency', 'time')
        )
        print('Name:' + row[0] + ' WKN:' + wkn + ' Preis:' + str(price) + ' Currency:' + currency + ' Zeit:' + quote_time)
        # Note: this is a 1-D list, one row per call.
        return [row[0], wkn, price, currency, quote_time]
    except Exception as exc:
        print(str(exc))

if __name__ == '__main__':
    record = get_list()
    start_time = time.time()
    # map() blocks until every row has been processed; leaving the ``with``
    # block then shuts the pool down.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        fetched = pool.map(create_json, record)

    # create_json() returns None for a row it could not fetch. Drop those
    # entries so every remaining item has the five expected fields.
    results = [item for item in fetched if item is not None]

    df_out = pd.DataFrame(results, columns=['Name', 'WKN', 'Preis', 'Currency', 'Zeit'])
    print(df_out)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章