我正在嘗試運行以下流程:
但它似乎加載了空數據。這是代碼:
func (c *Client) Do(ctx context.Context) error {
bqClient, err := bigquerypkg.NewBigQueryUtil(ctx, "projectID", "datasetID")
if err != nil {
return err
}
data, err := c.GetSomeData(ctx)
if err != nil {
return err
}
file, err := os.Create("example.csv")
if err != nil {
return err
}
defer file.Close()
// also file need to be delete
writer := csv.NewWriter(file)
defer writer.Flush()
timestamp := time.Now().UTC().Format("2006-01-02 03:04:05.000000000")
for _, d := range data {
csvRow := []string{
d.ID,
d.Name,
timestamp,
}
err = writer.Write(csvRow)
if err != nil {
log.Printf("error writing data to CSV: %v\n", err)
}
}
source := bigquery.NewReaderSource(file)
source.Schema = bigquery.Schema{
{Name: "id", Type: bigquery.StringFieldType},
{Name: "name", Type: bigquery.StringFieldType},
{Name: "createdAt", Type: bigquery.TimestampFieldType},
}
if _, err = bqClient.LoadCsv(ctx, "tableID", source); err != nil {
return err
}
return nil
}
LoadCSV()
看起來像這樣:
func (c *Client) LoadCsv(ctx context.Context, tableID string, src bigquery.LoadSource) (string, error) {
loader := c.bigQueryClient.Dataset(c.datasetID).Table(tableID).LoaderFrom(src)
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
return "", err
}
status, err := job.Wait(ctx)
if err != nil {
return job.ID(), err
}
if status.Err() != nil {
return job.ID(), fmt.Errorf("job completed with error: %v", status.Err())
}
return job.ID(), nil
}
運行此操作後,bigquery 確實創建了架構但沒有數據。如果我要更改os.Create()
為os.Open()
並且文件已經存在,則一切正常。就像加載CSV時文件數據尚未寫入(?)是什麼原因?
我在這裡看到的問題是您沒有將文件句柄的光標倒回到文件的開頭。因此,下一個讀將在端的文件,並且將一個0字節讀取。這就解釋了為什麼文件中似乎沒有內容。
https://pkg.go.dev/os#File.Seek可以為您處理。
實際上,Flush
不相關,因為您使用相同的文件句柄來讀取文件而不是寫入文件,因此即使沒有刷新,您也會看到自己寫入的字節。如果文件由不同的進程打開或重新打開,則不會出現這種情況。
示範:
package main
import (
"fmt"
"io"
"os"
)
func main() {
f, err := os.CreateTemp("", "data.csv")
if err != nil {
panic(err)
} else {
defer f.Close()
defer os.Remove(f.Name())
}
fmt.Fprintf(f, "hello, world")
fmt.Fprintln(os.Stderr, "Before rewind: ")
if _, err := io.Copy(os.Stderr, f); err != nil {
panic(err)
}
f.Seek(0, io.SeekStart)
fmt.Fprintln(os.Stderr, "\nAfter rewind: ")
if _, err := io.Copy(os.Stderr, f); err != nil {
panic(err)
}
fmt.Fprintln(os.Stderr, "\n")
}
% go run t.go
Before rewind:
After rewind:
hello, world
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句