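My script doesn't work. All 10 processes take the first list item and then stop, so the output is the same list item 10 times. How can I fix this? The error must be in the loop, or do I need a queue for this?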
import finanzen_fundamentals.stocks as ff
import mysql.connector
import pandas as pd
import multiprocessing
import time

results = []

def get_list():
    try:
        mydb = mysql.connector.connect(host="localhost", user="changed", password="changed", database="stockdata")
        mycursor = mydb.cursor()
        mycursor.execute("select * from url_name")
        record = mycursor.fetchall()
        return record
    except Exception as e:
        return str(e)

def create_json(record):
    for row in record:
        try:
            df = ff.get_current_value_lxml(str(row[2])[:-1], exchange = "FSE")
            print('Name:' + row[0] + ' WKN:' + df['wkn'].values[0] + ' Preis:' + str(df['price'].values[0]) + ' Currency:' + df['currency'].values[0] + ' Zeit:' + df['time'].values[0])
            result = [[row[0], df['wkn'].values[0], df['price'].values[0], df['currency'].values[0], df['time'].values[0]]]
            return result
        except Exception as e:
            print(str(e))

def collect_results(result):
    results.extend(result)

if __name__ == '__main__':
    record = get_list()
    start_time = time.time()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    for i in range(10):
        pool.apply_async(create_json, args=(record, ), callback=collect_results)
    pool.close()
    pool.join()
    df_out = pd.DataFrame(results, columns=['Name', 'WKN', 'Preis', 'Currency', 'Zeit'])
    print(df_out)
Output:
Name WKN Preis Currency Zeit
0 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
1 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
2 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
3 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
4 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
5 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
6 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
7 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
8 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
9 21VIANET GRP ADR A/6 O. A1H9DT 20.0 EUR 23.10.2020
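You got the loop structure wrong. Inside create_json you loop over the rows of record, but you always call it with the same original record list, and it hits return on the first iteration. So every worker only ever processes the first row. You need to change the worker function so that it operates on a single row: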
def create_json(row):
    try:
        df = ff.get_current_value_lxml(str(row[2])[:-1], exchange = "FSE")
        print('Name:' + row[0] + ' WKN:' + df['wkn'].values[0] + ' Preis:' + str(df['price'].values[0]) + ' Currency:' + df['currency'].values[0] + ' Zeit:' + df['time'].values[0])
        result = [[row[0], df['wkn'].values[0], df['price'].values[0], df['currency'].values[0], df['time'].values[0]]]
        return result
    except Exception as e:
        print(str(e))
        # Return an empty list so that results.extend() in the callback
        # still works when a lookup fails (it would raise on None).
        return []
Then call it once per row in the main code:
if __name__ == '__main__':
    ...
    for row in record:
        pool.apply_async(create_json, args=(row, ), callback=collect_results)
    ...
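The difference between the two worker shapes can be reproduced without the database or the stock lookup. The following is a minimal sketch under stated assumptions: `records`, `buggy_worker`, `fixed_worker`, and `run` are hypothetical names with toy data, and `multiprocessing.pool.ThreadPool` is swapped in for `multiprocessing.Pool` only because it exposes the same `apply_async`/`close`/`join` methods and lets the snippet run without a `__main__` guard.

```python
from multiprocessing.pool import ThreadPool  # stand-in with the same API as multiprocessing.Pool

# Hypothetical stand-in for the rows returned by get_list()
records = [("A", 1), ("B", 2), ("C", 3)]

def buggy_worker(record):
    # Receives the whole list and returns during the first iteration,
    # so only record[0] is ever processed.
    for row in record:
        return [list(row)]

def fixed_worker(row):
    # Receives exactly one row and processes it.
    return [list(row)]

def run(worker, tasks):
    results = []
    pool = ThreadPool(2)
    for task in tasks:
        pool.apply_async(worker, args=(task,), callback=results.extend)
    pool.close()
    pool.join()
    # Callbacks fire in completion order, so sort for a stable view.
    return sorted(results)

buggy = run(buggy_worker, [records] * 3)  # same list submitted three times
fixed = run(fixed_worker, records)        # one task per row

print(buggy)
print(fixed)
```

The buggy variant yields the first row once per submitted task, which matches the repeated line in the question's output; the fixed variant yields each row once.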
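Note that in this case, instead of looping and calling apply_async, you can use map. It already returns the list of results, so you don't even need the callback anymore, for example: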
def create_json(row):
    try:
        df = ff.get_current_value_lxml(str(row[2])[:-1], exchange = "FSE")
        print('Name:' + row[0] + ' WKN:' + df['wkn'].values[0] + ' Preis:' + str(df['price'].values[0]) + ' Currency:' + df['currency'].values[0] + ' Zeit:' + df['time'].values[0])
        result = [row[0], df['wkn'].values[0], df['price'].values[0], df['currency'].values[0], df['time'].values[0]]
        # NOTE THAT NOW IT'S A 1-D LIST!
        return result
    except Exception as e:
        print(str(e))

if __name__ == '__main__':
    record = get_list()
    start_time = time.time()
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        # create_json returns None when a lookup fails, so drop those entries
        results = [r for r in pool.map(create_json, record) if r is not None]
    df_out = pd.DataFrame(results, columns=['Name', 'WKN', 'Preis', 'Currency', 'Zeit'])
    print(df_out)
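One practical difference between the two approaches is ordering: apply_async callbacks run as tasks finish, while map returns results in the order of the input. The sketch below illustrates this under stated assumptions: `slow_echo` and `items` are hypothetical, the sleep delays only simulate lookups of different duration, and `ThreadPool` again stands in for `multiprocessing.Pool` (same method names) so the snippet runs without a `__main__` guard.

```python
import time
from multiprocessing.pool import ThreadPool  # stand-in with the same API as multiprocessing.Pool

def slow_echo(item):
    # Hypothetical worker: waits for the given delay, then returns the name.
    name, delay = item
    time.sleep(delay)
    return name

# The first item is the slowest, the last one the fastest.
items = [("first", 0.2), ("second", 0.1), ("third", 0.0)]

# apply_async: the callback appends names as each task completes.
completion_order = []
pool = ThreadPool(3)
for item in items:
    pool.apply_async(slow_echo, args=(item,), callback=completion_order.append)
pool.close()
pool.join()

# map: the result list follows the order of `items`.
with ThreadPool(3) as pool:
    mapped = pool.map(slow_echo, items)

print(completion_order)  # order depends on which task finished first
print(mapped)
```

With map, the rows of the resulting DataFrame therefore line up with the rows of record, whereas the callback version typically needs a sort or a key column if the order matters.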