我试图在Npgsql上使用Dapper执行大容量插入,该插入返回新插入的行的ID。在我的两个示例中都使用了以下插入语句:
var query = "INSERT INTO \"MyTable\" (\"Value\") VALUES (@Value) RETURNING \"ID\"";
首先,我尝试添加具有“值”属性的对象数组:
var values = new[] {
new { Value = 0.0 },
new { Value = 0.5 }
};
var ids = connection.Query<int>(query, values);
但是,该操作失败,并显示NpgsqlException:“错误:42703:列“值”不存在”。读完这个问题之后,我想也许我必须传递一个DataTable对象而不是一个对象数组:
var dataTable = new DataTable();
dataTable.Columns.Add("Value", typeof(double));
dataTable.Rows.Add(0.0);
dataTable.Rows.Add(0.5);
var ids = connection.Query<int>(query, dataTable);
但是,这完全相同,但失败。如何执行批量插入并通过Npgsql从Dapper中获得结果序列号?
我确实注意到异常的大小写与列名不匹配,但是我确定表名和列名周围都有引号,因此我不确定为什么它在表中用“ value”而不是“ Value”例外。只是以为我会提到它,以防它以某种方式与错误相关,因为很容易忽略大小写。
-编辑-
澄清一下,这是创建表的SQL
CREATE TABLE "MyTable" (
"ID" SERIAL PRIMARY KEY,
"Value" DOUBLE PRECISION NOT NULL
);
使用上面定义的变量“ query”和“ values”,这是按行工作的代码:
var ids = new List<int>();
foreach (var valueObj in values) {
var queryParams = new DynamicParamaters();
queryParams.Add("Value", valueObj.Value);
ids.AddRange(connection.Query<int>(query, queryParams));
}
问题是我需要能够每秒将数百行(可能在不久的将来插入数千行)到“ MyTable”中,因此等待此循环将每个值迭代地发送到数据库很麻烦,并且(我认为,但是尚未进行基准测试)。此外,我需要对“ MyTable”条目使用外键引用的值进行额外的计算,这可能会或可能不会导致额外的插入。
由于这些问题,我正在寻找一种替代方法,可将所有值在单个语句中发送到数据库,以减少网络流量和处理延迟。再说一次,我还没有对迭代方法进行基准测试……我正在寻找的是做大量插入操作的替代方法,因此我可以将两种方法相互对照。
最终,我想出了四种解决此问题的方法。我生成了500个随机值以插入MyTable中,并对四种方法(包括启动和回滚运行它的事务)中的每种方法进行计时。在我的测试中,数据库位于localhost上。但是,具有最佳性能的解决方案也只需要与数据库服务器进行一次往返,因此,当我发现最佳解决方案部署到与数据库不同的服务器上时,仍然应该能胜过其他选择。
请注意,变量connection
和transaction
在以下代码中使用,并且被假定为有效的Npgsql数据对象。还要注意,符号Nx更慢表示操作花费的时间等于最优解乘以N的时间。
方法1(1,494ms =慢18.7x):将数组展开为各个参数
public List<MyTable> InsertEntries(double[] entries)
{
// Create a variable used to dynamically build the query
var query = new StringBuilder(
"INSERT INTO \"MyTable\" (\"Value\") VALUES ");
// Create the dictionary used to store the query parameters
var queryParams = new DynamicParameters();
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add a unique parameter for each id
var paramIdx = 0;
foreach (var entry in result)
{
var paramName = string.Format("value{1:D6}", paramIdx);
if (0 < paramIdx++) query.Append(',');
query.AppendFormat("(:{0})", paramName);
queryParams.Add(paramName, entry.Value);
}
query.Append(" RETURNING \"ID\"");
// Execute the query, and store the ids
var ids = connection.Query<int>(query, queryParams, transaction);
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
我真的不确定为什么这是最慢的,因为它只需要单次往返数据库,但是确实如此。
方法2(267毫秒=慢3.3倍):标准循环迭代
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add each entry to the database
foreach (var entry in result)
{
var queryParams = new DynamicParameters();
queryParams.Add("val", entry.Value);
entry.ID = connection.Query<int>(
query, queryParams, transaction);
}
// Return the result
return result;
}
我感到震惊的是,它仅比最佳解决方案慢3.3倍,但是我希望在实际环境中情况会变得更糟,因为此解决方案需要向服务器串行发送500条消息。但是,这也是最简单的解决方案。
方法3(223ms =慢2.8倍):异步循环迭代
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add each entry to the database asynchronously
var taskList = new List<Task<IEnumerable<int>>>();
foreach (var entry in result)
{
var queryParams = new DynamicParameters();
queryParams.Add("val", entry.Value);
taskList.Add(connection.QueryAsync<int>(
query, queryParams, transaction));
}
// Now that all queries have been sent, start reading the results
for (var i = 0; i < result.Count; ++i)
{
result[i].ID = taskList[i].Result.First();
}
// Return the result
return result;
}
这会变得更好,但仍不是最佳选择,因为我们只能将与线程池中可用线程一样多的插入排队。但是,这几乎与非线程方法一样简单,因此是速度和可读性之间的良好折衷。
方法4(134毫秒=慢1.7倍):批量插入
这种方法要求在运行下面的代码段之前,定义以下Postgres SQL:
CREATE TYPE "MyTableType" AS (
"Value" DOUBLE PRECISION
);
CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
RETURNS SETOF INT AS $$
DECLARE
insertCmd TEXT := 'INSERT INTO "MyTable" ("Value") '
'VALUES ($1) RETURNING "ID"';
entry "MyTableType";
BEGIN
FOREACH entry IN ARRAY entries LOOP
RETURN QUERY EXECUTE insertCmd USING entry."Value";
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
和相关代码:
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"SELECT * FROM \"InsertIntoMyTable\"(:entries::\"MyTableType\")";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Convert each entry into a Postgres string
var entryStrings = result.Select(
e => string.Format("({0:E16})", e.Value).ToArray();
// Create a parameter for the array of MyTable entries
var queryParam = new {entries = entryStrings};
// Perform the insert
var ids = connection.Query<int>(query, queryParam, transaction);
// Assign each id to the result
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
这种方法有两个问题。首先是我必须对MyTableType成员的顺序进行硬编码。如果顺序更改,则必须修改此代码以使其匹配。第二个是在将所有输入值发送到postgres之前,我必须将所有输入值转换为字符串(在实际代码中,我有不止一列,因此我不能只是将数据库函数的签名更改为两倍) precision [],除非我传入N个数组,其中N是MyTableType上的字段数)。
尽管存在这些缺陷,但这种方法已接近理想状态,只需要往返数据库一次即可。
-开始编辑-
从最初的帖子开始,我想出了四种其他方法,这些方法都比上面列出的方法快。我修改了Nx较慢的数字,以反映以下新的最快方法。
方法5(105毫秒=慢1.3倍):与方法4相同,没有动态查询
此方法与方法4之间的唯一区别是对“ InsertIntoMyTable”函数的以下更改:
CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
RETURNS SETOF INT AS $$
DECLARE
entry "MyTableType";
BEGIN
FOREACH entry IN ARRAY entries LOOP
RETURN QUERY INSERT INTO "MyTable" ("Value")
VALUES (entry."Value") RETURNING "ID";
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
除了方法4的问题外,它的缺点还在于,在生产环境中,“ MyTable”已分区。使用这种方法,每个目标分区都需要一种方法。
方法#6(89ms = 1.1x慢):使用数组参数插入语句
public List<MyTable> InsertEntries(double[] entries)
{
const string query =
"INSERT INTO \"MyTable\" (\"Value\") SELECT a.* FROM " +
"UNNEST(:entries::\"MyTableType\") a RETURNING \"ID\"";
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Convert each entry into a Postgres string
var entryStrings = result.Select(
e => string.Format("({0:E16})", e.Value).ToArray();
// Create a parameter for the array of MyTable entries
var queryParam = new {entries = entryStrings};
// Perform the insert
var ids = connection.Query<int>(query, queryParam, transaction);
// Assign each id to the result
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
唯一的缺点与方法4的第一个问题相同。即,它将实现与的顺序耦合在一起"MyTableType"
。不过,我发现这是我第二喜欢的方法,因为它非常快,并且不需要任何数据库功能即可正常工作。
方法7(80毫秒=稍慢):与方法1相同,但没有参数
public List<MyTable> InsertEntries(double[] entries)
{
// Create a variable used to dynamically build the query
var query = new StringBuilder(
"INSERT INTO \"MyTable\" (\"Value\") VALUES");
// Get the result set without auto-assigned ids
var result = entries.Select(e => new MyTable { Value = e }).ToList();
// Add each row directly into the insert statement
for (var i = 0; i < result.Count; ++i)
{
entry = result[i];
query.Append(i == 0 ? ' ' : ',');
query.AppendFormat("({0:E16})", entry.Value);
}
query.Append(" RETURNING \"ID\"");
// Execute the query, and store the ids
var ids = connection.Query<int>(query, null, transaction);
ids.ForEach((id, i) => result[i].ID = id);
// Return the result
return result;
}
这是我最喜欢的方法。它仅比最快的速度慢一点(即使有4000条记录,它仍在1秒内运行),但是不需要特殊的数据库功能或类型。我唯一不喜欢的是我必须对双精度值进行字符串化,然后再由Postgres进行解析。最好以二进制形式发送值,以便它们占用8个字节,而不是我为它们分配的大量20字节左右。
方法#8(80ms):与#5相同,但使用纯SQL
此方法与方法#5之间的唯一区别是对“ InsertIntoMyTable”函数的以下更改:
CREATE FUNCTION "InsertIntoMyTable"(
entries "MyTableType"[]) RETURNS SETOF INT AS $$
INSERT INTO "MyTable" ("Value")
SELECT a.* FROM UNNEST(entries) a RETURNING "ID";
$$ LANGUAGE SQL;
像#5一样,这种方法每个“ MyTable”分区都需要一个函数。这是最快的,因为查询计划可以为每个功能生成一次,然后重新使用。在其他方法中,必须先解析查询,然后计划查询,然后执行查询。尽管这是最快的方法,但由于方法7对数据库方面的附加要求,因此我没有选择它,但是速度收益很少。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句