并行运行while(resultset.next())

在Java 7中。

我有一个查询。

String sql = "select path, job_id from job_table where status = 'NEW' and rownum <= 5 order by created_date"; 


 ResultSet rows = null; 
 try 
 {  
      conn = getConnection();
      preparedStatement = conn.preparedStatement(sql);
     rows  =preparedStatement.executeQuery();

      while(rows.next())
     {  

  // do stuff or call respective methods depending on the values received from resultset.

//假设行的值为1或2或3,则

 if(rows.contains(1))
    doAnalysis1(param1);

 if(rows.contains(2))
    doAnalysis2(param1);


 if(rows.contains(3))
    doAnalysis3(param1);

   }
 }

现在我要做并行处理。以前,当我实际运行我的应用程序时,它会完成每一行的其他功能,然后处理下一行。但是,如果我得到3到4行,那么它应该一次全部处理。如何实现?任何例子都会对我有帮助。

丹·阿姆斯特朗

针对Java 7编辑:

private static final int NUM_THREADS = 8; // Can get from runtime to scale to number of cores
private static final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

String sql = "select path, job_id from job_table where status = 'NEW' and rownum <= 5 order by created_date"; 

ResultSet rows = null; 
try 
{  
    conn = getConnection();
    preparedStatement = conn.preparedStatement(sql);
    rows  =preparedStatement.executeQuery();

    List<Future<?>> futures = new ArrayList<>();
    while(rows.next())
    {
        // Getting values from ResultSet here, to not assume thread-safety of ResultSet
        final String path = rows.getString("path");
        final int job_id = rows.getInt("job_id");
        futures.add(executor.submit(new Runnable() {
            @Override
            public void run() {
                // do stuff or call respective methods depending on the values received from resultset.
            }
        });
    }
    // Wait until all finished
    for(Future<?> future : futures) future.get();
}

一些注意事项:

  • 您可能需要终止执行程序服务才能关闭
  • 如果您需要从“做事”中获取结果,则可以使用Callable接口。
  • 您将在try / catch中遇到一些例外情况。
  • 您的情况下NUM_THREADS个可能是5个?
  • run()中的所有已检查异常都可以包装在RuntimeException中,然后重新抛出。使用Callable不需要此。

编辑-这是做事的另一种顺序,您的数据库连接较早发布:

private static final int NUM_THREADS = 8; // Can get from runtime to scale to number of cores
private static final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

String sql = "select path, job_id from job_table where status = 'NEW' and rownum <= 5 order by created_date"; 

try {
    List<Future<?>> futures = new ArrayList<>();
    try (
        Connection conn = getConnection();
        PreparedStatement preparedStatement = conn.preparedStatement(sql);
        ResultSet rows = preparedStatement.executeQuery()
    ) {
        while(rows.next())
        {
            // Getting values from ResultSet here, to not assume thread-safety of ResultSet
            final String path = rows.getString("path");
            final int job_id = rows.getInt("job_id");
            futures.add(executor.submit(new Runnable() {
                @Override
                public void run() {
                    // do stuff or call respective methods depending on the values received from resultset.
                }
            });
        }
    }
    // Wait until all finished, database is already release while background processing is still ongoing
    for(Future<?> future : futures) future.get();
} catch(...) {
    ...
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章