我试图为Java并发问题找到一个不太笨拙的解决方案。
问题的要点是,当仍然有工作线程处于活动状态时,我需要一个shutdown调用来阻塞,但关键的方面是工作任务都是异步生成和完成的,因此保留和释放必须由不同的线程完成。他们的工作完成后,我需要他们以某种方式向关闭线程发送信号。只是为了使事情变得更有趣,工作线程不能互相阻塞,因此我不确定在此特定实例中信号灯的应用。
我有一个可以安全地完成工作的解决方案,但是我对Java并发实用程序不熟悉,这使我认为可能存在更简单或更优雅的模式。在这方面的任何帮助将不胜感激。
到目前为止,这里是我所拥有的,除了评论之外,还相当稀疏:
final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
volatile private int activeWorkerThreads;
private boolean isShutdown;
private void workerTask()
{
try
{
// Point A: Worker tasks mustn't block each other.
shutdownLock.readLock().lock();
// Point B: I only want worker tasks to continue if the shutdown signal
// hasn't already been received.
if (isShutdown)
return;
activeWorkerThreads ++;
// Point C: This async method call returns immediately, soon after which
// we release our lock. The shutdown thread may then acquire the write lock
// but we want it to continue blocking until all of the asynchronous tasks
// have completed.
executeAsynchronously(new Runnable()
{
@Override
final public void run()
{
try
{
// Do stuff.
}
finally
{
// Point D: Release of shutdown thread loop, if there are no other
// active worker tasks.
activeWorkerThreads --;
}
}
});
}
finally
{
shutdownLock.readLock().unlock();
}
}
final public void shutdown()
{
try
{
// Point E: Shutdown thread must block while any worker threads
// have breached Point A.
shutdownLock.writeLock().lock();
isShutdown = true;
// Point F: Is there a better way to wait for this signal?
while (activeWorkerThreads > 0)
;
// Do shutdown operation.
}
finally
{
shutdownLock.writeLock().unlock();
}
}
在此先感谢您的帮助!
拉斯
您可以在这种情况下使用信号量,而不需要忙于等待shutdown()调用。想到它的方法是将一组票发给工作人员以表明他们在飞行中。如果shutdown()方法可以获取所有票证,那么它将知道它已耗尽所有工作人员并且没有任何活动。因为#acquire()是阻塞调用,所以shutdown()不会旋转。我已经将这种方法用于分布式的master-worker库,并且可以轻松地将其扩展为处理超时和重试。
Executor executor = // ...
final int permits = // ...
final Semaphore semaphore = new Semaphore(permits);
void schedule(final Runnable task) {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override public run() {
try {
task.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
void shutDown() {
semaphore.acquireUninterruptibly(permits);
// do stuff
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句