通过在slick 3中批量插入每五秒钟插入数千条记录时,我得到
org.postgresql.util.PSQLException: FATAL: sorry, too many clients already
我的数据访问层如下所示:
val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url, user=user, password=password, driver= jdbcDriver)
override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = {
val res = db.run(insertBatchQuery(rowList)).map(_.head.toLong).recover{ case ex:Throwable=> RelationalRepositoryUtility.handleBatchOperationErrors(ex)}
//db.close()
res
}
override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int], NoStream, Write] = {
query ++= (rowList)
}
在插入批处理中关闭连接无效...它仍然会给出相同的错误。
我正在从我的代码中像这样调用插入批处理:
val temp1 = list1.flatMap { li =>
Future.sequence(li.map { trip =>
val data = for {
tripData <- TripDataRepository.insertQuery( trip.tripData)
subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
} yield ((tripData, subTripData))
val res=db.run(data.transactionally)
res
//db.close()
})
}
如果我在这里工作后关闭连接,如注释代码所示,我会收到错误消息:
java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
像这样调用没有Future.sequence的方法后:
val temp1 =list.map { trip =>
val data = for {
tripData <- TripDataRepository.insertQuery( trip.tripData)
subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
} yield ((tripData, subTripData))
val res=db.run(data.transactionally)
res
}
我仍然有太多客户错误...
此问题的根源在于,您正在同时旋转一个无限制的列表Future
,每个列表都连接到数据库-中的每个条目一个list
。
这可以通过以串行方式运行插入来解决,强制每个插入批处理依赖于先前的插入:
// Empty Future for the results. Replace Unit with the correct type - whatever
// "res" is below.
val emptyFuture = Future.successful(Seq.empty[Unit])
// This will only insert one at a time. You could use list.sliding to batch the
// inserts if that was important.
val temp1 = list.foldLeft(emptyFuture) { (previousFuture, trip) =>
previousFuture flatMap { previous =>
// Inner code copied from your example.
val data = for {
tripData <- TripDataRepository.insertQuery(trip.tripData)
subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
} yield ((tripData, subTripData))
val res = db.run(data.transactionally)
previous :+ res
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句