使用 Python 腳本的 ADF 管道中的 Azure 函數

天使

我正在嘗試在管道中的 Azure 數據工廠中運行我的以下腳本。我的 Python 代碼從 Blob 存儲中檢索 2 個 CSV 文件,並根據密鑰將它們合併為一個文件,然後將其上傳到數據湖存儲。我嘗試過使用功能應用程序塊,它給了我 InternalServerError,我還嘗試了運行沒有錯誤的 Web 活動。問題是當我運行管道時沒有創建文件,即使管道運行成功(使用 Web 塊)。當我調用 main 函數並在數據湖存儲中創建文件時,該函數也會在本地運行。我在 VS Code 中也嘗試過 http 觸發器和持久函數,但沒有一個在 Azure 中創建了“merged.csv”文件。

我的 Python 腳本(init .py):

import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
    STORAGEACCOUNTKEY= '****'
    LOCALFILENAME= ['file1.csv', 'file2.csv']
    CONTAINERNAME= 'inputblob'

    file1 = pd.DataFrame()
    file2 = pd.DataFrame()
    #download from blob

    blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)

    for i in LOCALFILENAME:
        with open(i, "wb") as my_blobs:
            blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
            blob_data = blob_client_instance.download_blob()
            blob_data.readinto(my_blobs)
            if i == 'file1.csv':
                file1 = pd.read_csv(i)
            if i == 'file2.csv':
                file2 = pd.read_csv(i)
    
    # load

  
    summary = pd.merge(left=file1, right=file2, on='key', how='inner')
        
    summary.to_csv()

    global service_client
            
    service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential='****')
        
    file_system_client = service_client.get_file_system_client(file_system="outputdatalake")

    directory_client = file_system_client.get_directory_client("functionapp") 

    file_client = directory_client.create_file("merged.csv") 

    file_contents = summary.to_csv()

    file_client.upload_data(file_contents, overwrite=True) 

    return("This HTTP triggered function executed successfully.")

我的 JSON 文件(function.json):

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}
阿努帕姆·錢德

我能想到的原因有 2 個,這可能是導致您出現問題的原因。

A - 檢查您的 requirements.txt。你所有的 python 庫都應該在那裡。它應該是這樣的。

azure-functions
pandas==1.3.4
azure-storage-blob==12.9.0
azure-storage-file-datalake==12.5.0

B - 接下來,您似乎正在將文件寫入 Functions 工作內存中。這是不允許的,完全沒有必要的。這將解釋為什麼它可以在您的本地計算機中運行,而不能在 Azure 中運行。你可以在不這樣做的情況下實現你想要的。請參閱下面的代碼部分,它應該符合您的目的。我們將 csv 從 blob 加載到數據幀的方式略有變化。

import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func
from io import StringIO

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
    STORAGEACCOUNTKEY= 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    LOCALFILENAME= ['file1.csv', 'file2.csv']
    CONTAINERNAME= 'inputblob'

    file1 = pd.DataFrame()
    file2 = pd.DataFrame()
    #download from blob

    blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
    for i in LOCALFILENAME:
            blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
            blob_data = blob_client_instance.download_blob()
            if i == 'file1.csv':
                file1 = pd.read_csv(StringIO(blob_data.content_as_text()))
            if i == 'file2.csv':
                file2 = pd.read_csv(StringIO(blob_data.content_as_text()))

    
    # load
    summary = pd.merge(left=file1, right=file2, on='key', how='inner')
    summary.to_csv()

    service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential=STORAGEACCOUNTKEY)
    file_system_client = service_client.get_file_system_client(file_system="outputdatalake")
    directory_client = file_system_client.get_directory_client("my-directory") 
    file_client = directory_client.create_file("merged.csv") 
    file_contents = summary.to_csv()
    file_client.upload_data(file_contents, overwrite=True) 

    return("This HTTP triggered function executed successfully.")

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

使用Azure EventHub调用ADF管道

使用 azure devops 部署 ADF 是否会影响环境中的现有管道/作业

使用參數在批處理文件中運行腳本

使用 Pandas Resample 函數的多個腳本的 OHLC

對xml輸出python腳本中的單詞進行排序和計數

如何使用 Google Apps 腳本重新格式化二維數組中的日期?

有沒有辦法在 Google Apps 腳本中每隔幾秒調用一次函數?

將 Apps 腳本函數應用於 Google 表格中的其他行

如何使用腳本在python中安裝特定版本的模塊/庫

腳本無法與 Qlik Sense 中的 STARTS WITH 函數一起使用

當用戶按下回車鍵時,python 腳本中的“輸入”函數在終端中顯示回車符 (^M)

如何在谷歌應用程序腳本中使用另一個函數停止一個函數?

AHK:在窗口/文件資源管理器中選擇後,如何將選定的文件存儲在變量中以使用 FileSelectFile 函數執行腳本?

使用 ES6 模塊從命令行(Node JS)運行腳本中的函數

在數據工廠中使用 Azure Function APP 運行 Python 腳本

在 Python 中:如何在不同的文件函數中使用函數局部變量

為什麼JS在babel腳本中找不到這個函數?

如何“重置”在使用 Flask 應用程序數據庫的 Python 腳本中運行的 db.session?

您可以在 Azure 管道 YAML 文件中的內聯 powershell 腳本中使用函數嗎?

什麼是腳本中的 $pdf 以及如何使用 page_text 函數在頁面 dompdf 內添加文本?

如何使用參數在我的腳本中調用 void?

僅當函數與 bash 腳本中的指定 URL 匹配時,如何運行該函數?

在 Python 腳本中獲取命令行參數的最佳方法

bash 腳本中函數參數周圍的文字雙引號

如何使用bash腳本搜索數組中是否存在類似變量的值?

在使用特殊變量 $1, $# 的 bash 腳本中創建一個 bash 腳本

使用 pine 腳本中的函數循環數組推送

使用管道的多處理腳本

松腳本 hline 函數