我正在使用从读取消息Kafka
并将其推入的服务Cassandra
。
我使用相同的线程体系结构。
有人说,k threads
从卡夫卡消费话题。这些写入队列,声明为:
public static BlockingQueue<>
现在有许多线程(例如n
)写入Cassandra。这是执行此操作的代码:
public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
try {
JSONObject msg = content.remove();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (Exception e) {
}
}
}
}
content
是用于读写操作的BlockingQueue。
我正在扩展Thread
线程实现的类,并且有固定数量的线程可以继续执行,除非被中断。
问题是,这占用了过多的CPU。这是top
命令的第一行:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
46232 vishran+ 20 0 3010804 188052 14280 S 137.8 3.3 5663:24 java
这是strace
此进程的线程上的输出:
strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
....and so on
我为什么使用Thread.yield()
,是因为这个
如果您需要其他任何调试信息,请告知我。
现在的问题是,如何才能使CPU利用率最小化?
BlockingQueue的整个目的是当它为空时将其阻塞。因此,使用者线程(填充在Cassandra中的那些线程)不必手动检查它们是否为空。您可以只调用take(),如果队列为空,则该调用将阻塞,除非它被中断或有可用的元素。
当线程被阻塞时,调度程序可以在其位置调度其他线程,从而使您不必调用yield()等。请记住,只有当优先级大于或等于正在产生的线程的线程可以运行时,yield()才会让给另一个线程。
public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
try {
JSONObject msg = content.take();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句