Dapper批量插入返回序列号

杰夫·G

我试图在Npgsql上使用Dapper执行大容量插入,该插入返回新插入的行的ID。在我的两个示例中都使用了以下插入语句:

var query = "INSERT INTO \"MyTable\" (\"Value\") VALUES (@Value) RETURNING \"ID\"";

首先,我尝试添加具有“值”属性的对象数组:

var values = new[] {
    new { Value = 0.0 },
    new { Value = 0.5 }
};
var ids = connection.Query<int>(query, values);

但是,该操作失败,并显示NpgsqlException:“错误:42703:列“值”不存在”。读完这个问题之后,我想也许我必须传递一个DataTable对象而不是一个对象数组:

var dataTable = new DataTable();
dataTable.Columns.Add("Value", typeof(double));
dataTable.Rows.Add(0.0);
dataTable.Rows.Add(0.5);
var ids = connection.Query<int>(query, dataTable);

但是,这完全相同,但失败。如何执行批量插入并通过Npgsql从Dapper中获得结果序列号?

我确实注意到异常的大小写与列名不匹配,但是我确定表名和列名周围都有引号,因此我不确定为什么它在表中用“ value”而不是“ Value”例外。只是以为我会提到它,以防它以某种方式与错误相关,因为很容易忽略大小写。

-编辑-

澄清一下,这是创建表的SQL

CREATE TABLE "MyTable" (
    "ID" SERIAL PRIMARY KEY,
    "Value" DOUBLE PRECISION NOT NULL
);

使用上面定义的变量“ query”和“ values”,这是按行工作的代码:

var ids = new List<int>();
foreach (var valueObj in values) {
    var queryParams = new DynamicParamaters();
    queryParams.Add("Value", valueObj.Value);
    ids.AddRange(connection.Query<int>(query, queryParams));
}

问题是我需要能够每秒将数百行(可能在不久的将来插入数千行)到“ MyTable”中,因此等待此循环将每个值迭代地发送到数据库很麻烦,并且(我认为,但是尚未进行基准测试)。此外,我需要对“ MyTable”条目使用外键引用的值进行额外的计算,这可能会或可能不会导致额外的插入。

由于这些问题,我正在寻找一种替代方法,可将所有值在单个语句中发送到数据库,以减少网络流量和处理延迟。再说一次,我还没有对迭代方法进行基准测试……我正在寻找的是做大量插入操作的替代方法,因此我可以将两种方法相互对照。

杰夫·G

最终,我想出了四种解决此问题的方法。我生成了500个随机值以插入MyTable中,并对四种方法(包括启动和回滚运行它的事务)中的每种方法进行计时。在我的测试中,数据库位于localhost上。但是,具有最佳性能的解决方案也只需要与数据库服务器进行一次往返,因此,当我发现最佳解决方案部署到与数据库不同的服务器上时,仍然应该能胜过其他选择。

请注意,变量connectiontransaction在以下代码中使用,并且被假定为有效的Npgsql数据对象。还要注意,符号Nx更慢表示操作花费的时间等于最优解乘以N的时间

方法1(1,494ms =慢18.7x):将数组展开为各个参数

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES ");

    // Create the dictionary used to store the query parameters
    var queryParams = new DynamicParameters();

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add a unique parameter for each id
    var paramIdx = 0;
    foreach (var entry in result)
    {
        var paramName = string.Format("value{1:D6}", paramIdx);
        if (0 < paramIdx++) query.Append(',');
        query.AppendFormat("(:{0})", paramName);
        queryParams.Add(paramName, entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, queryParams, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

我真的不确定为什么这是最慢的,因为它只需要单次往返数据库,但是确实如此。

方法2(267毫秒=慢3.3倍):标准循环迭代

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        entry.ID = connection.Query<int>(
            query, queryParams, transaction);
    }

    // Return the result
    return result;
}

我感到震惊的是,它仅比最佳解决方案慢3.3倍,但是我希望在实际环境中情况会变得更糟,因为此解决方案需要向服务器串行发送500条消息。但是,这也是最简单的解决方案。

方法3(223ms =慢2.8倍):异步循环迭代

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database asynchronously
    var taskList = new List<Task<IEnumerable<int>>>();
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        taskList.Add(connection.QueryAsync<int>(
            query, queryParams, transaction));
    }

    // Now that all queries have been sent, start reading the results
    for (var i = 0; i < result.Count; ++i)
    {
        result[i].ID = taskList[i].Result.First();
    }

    // Return the result
    return result;
}

这会变得更好,但仍不是最佳选择,因为我们只能将与线程池中可用线程一样多的插入排队。但是,这几乎与非线程方法一样简单,因此是速度和可读性之间的良好折衷。

方法4(134毫秒=慢1.7倍):批量插入

这种方法要求在运行下面的代码段之前,定义以下Postgres SQL:

CREATE TYPE "MyTableType" AS (
    "Value" DOUBLE PRECISION
);

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        insertCmd TEXT := 'INSERT INTO "MyTable" ("Value") '
            'VALUES ($1) RETURNING "ID"';
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY EXECUTE insertCmd USING entry."Value";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

和相关代码:

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "SELECT * FROM \"InsertIntoMyTable\"(:entries::\"MyTableType\")";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

这种方法有两个问题。首先是我必须对MyTableType成员的顺序进行硬编码。如果顺序更改,则必须修改此代码以使其匹配。第二个是在将所有输入值发送到postgres之前,我必须将所有输入值转换为字符串(在实际代码中,我有不止一列,因此我不能只是将数据库函数的签名更改为两倍) precision [],除非我传入N个数组,其中N是MyTableType上的字段数)。

尽管存在这些缺陷,但这种方法已接近理想状态,只需要往返数据库一次即可。

-开始编辑-

从最初的帖子开始,我想出了四种其他方法,这些方法都比上面列出的方法快。我修改了Nx较慢的数字,以反映以下新的最快方法。

方法5(105毫秒=慢1.3倍):与方法4相同,没有动态查询

此方法与方法4之间的唯一区别是对“ InsertIntoMyTable”函数的以下更改:

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY INSERT INTO "MyTable" ("Value")
                VALUES (entry."Value") RETURNING "ID";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

除了方法4的问题外,它的缺点还在于,在生产环境中,“ MyTable”已分区。使用这种方法,每个目标分区都需要一种方法。

方法#6(89ms = 1.1x慢):使用数组参数插入语句

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") SELECT a.* FROM " +
            "UNNEST(:entries::\"MyTableType\") a RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

唯一的缺点与方法4的第一个问题相同即,它将实现与的顺序耦合在一起"MyTableType"不过,我发现这是我第二喜欢的方法,因为它非常快,并且不需要任何数据库功能即可正常工作。

方法7(80毫秒=稍慢):与方法1相同,但没有参数

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES");

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each row directly into the insert statement
    for (var i = 0; i < result.Count; ++i)
    {
        entry = result[i];
        query.Append(i == 0 ? ' ' : ',');
        query.AppendFormat("({0:E16})", entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, null, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

这是我最喜欢的方法。它仅比最快的速度慢一点(即使有4000条记录,它仍在1秒内运行),但是不需要特殊的数据库功能或类型。我唯一不喜欢的是我必须对双精度值进行字符串化,然后再由Postgres进行解析。最好以二进制形式发送值,以便它们占用8个字节,而不是我为它们分配的大量20字节左右。

方法#8(80ms):与#5相同,但使用纯SQL

此方法与方法#5之间的唯一区别是对“ InsertIntoMyTable”函数的以下更改:

CREATE FUNCTION "InsertIntoMyTable"(
    entries "MyTableType"[]) RETURNS SETOF INT AS $$

    INSERT INTO "MyTable" ("Value")
        SELECT a.* FROM UNNEST(entries) a RETURNING "ID";
$$ LANGUAGE SQL;

像#5一样,这种方法每个“ MyTable”分区都需要一个函数这是最快的,因为查询计划可以为每个功能生成一次,然后重新使用。在其他方法中,必须先解析查询,然后计划查询,然后执行查询。尽管这是最快的方法,但由于方法7对数据库方面的附加要求,因此我没有选择它,但是速度收益很少。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章