我需要每天处理大量用户列表,以便根据某些情况向他们发送电子邮件和SMS通知。我为此使用Java EE批处理模型。我的Job xml如下:
<step id="sendNotification">
<chunk item-count="10" retry-limit="3">
<reader ref="myItemReader"></reader>
<processor ref="myItemProcessor"></processor>
<writer ref="myItemWriter"></writer>
<retryable-exception-classes>
<include class="java.lang.IllegalArgumentException"/>
</retryable-exception-classes>
</chunk>
</step>
MyItemReader的onOpen方法从数据库中读取所有用户,而readItem()使用列表迭代器一次读取一个用户。在myItemProcessor中,实际的电子邮件通知发送给用户,然后将用户保留在该块的myItemWriter类中的数据库中。
@Named
public class MyItemReader extends AbstractItemReader {
private Iterator<User> iterator = null;
private User lastUser;
@Inject
private MyService service;
@Override
public void open(Serializable checkpoint) throws Exception {
super.open(checkpoint);
List<User> users = service.getUsers();
iterator = users.iterator();
if(checkpoint != null) {
User checkpointUser = (User) checkpoint;
System.out.println("Checkpoint Found: " + checkpointUser.getUserId());
while(iterator.hasNext() && !iterator.next().getUserId().equals(checkpointUser.getUserId())) {
System.out.println("skipping already read users ... ");
}
}
}
@Override
public Object readItem() throws Exception {
User user=null;
if(iterator.hasNext()) {
user = iterator.next();
lastUser = user;
}
return user;
}
@Override
public Serializable checkpointInfo() throws Exception {
return lastUser;
}
}
我的问题是检查点存储了在上一个块中执行的最后一条记录。如果我的下一个10个用户中有一个大块,并且第5个用户的myItemProcessor中引发了异常,则在重试时将执行整个块,并再次处理所有10个用户。我不希望将通知再次发送给已处理的用户。
有办法解决吗?应该如何有效地做到这一点?
任何帮助将不胜感激。谢谢。
我将以@cheng的评论为基础。我在此感谢他,希望我的回答能为组织和有效介绍这些选项提供额外的价值。
正如@cheng指出的那样,失败意味着整个事务都会回滚,并且检查点不会前进。
那么,如何处理您的数据块已向某些用户而非全部用户发送电子邮件的事实呢?(您可能会说它回滚了,但有“副作用”。)
因此,我们可以将您的问题重述为:如何从批处理步骤发送电子邮件?
好吧,假设您有一种方法可以通过事务性API发送电子邮件(实现XAResource等),则可以使用该API。
假设您不这样做,我将对JMS队列进行事务性写入,然后使用单独的MDB发送电子邮件(如@cheng在他的评论中建议的那样)。
使用这种方法,您仍然可以通过批量处理和更新数据库来提高效率(无论如何您一次只发送一封电子邮件),并且可以从简单的检查点和重新启动中受益,而不必编写复杂的错误处理程序。
这也有可能作为批处理作业甚至批处理之外的模式重用。
为了讨论起见,列出了一些我认为不太好的其他想法:
您可以使用ItemProcessListener方法构建自己的成功/失败电子邮件列表,包括afterProcess和onProcessError。
重新启动后,即使您已经发送了一些电子邮件,您仍可以知道在当前块中向哪些用户发送了电子邮件,由于整个块都已回滚,因此我们将其重新定位到该用户。
这肯定会使批处理逻辑复杂化,并且还必须以某种方式保留此成功或失败列表。另外,这种方法可能非常针对此工作(与排队等待MDB进行处理相反)。
但这很简单,因为您只需一个批处理作业,而无需消息传递提供程序和单独的应用程序组件。
如果走这条路线,则可能要同时使用可跳过和“无回滚”可重试异常。
如果使用item-count =“ 1”定义块,则可以避免复杂的检查点和错误处理代码。但是,您牺牲了效率,因此只有在批处理的其他方面非常引人注目时,这才有意义:例如,通过通用接口进行作业的调度和管理,在作业失败的步骤重新启动的能力
如果要走这条路线,您可能需要考虑将套接字和超时异常定义为“无回滚”异常(使用 ),因为回滚没有任何好处,您可能想重试网络超时问题。
既然您特别提到了效率,我想这对您来说不合适。
这也许可以工作,但是批处理API并没有使它变得特别容易,并且您仍然可能遇到块已完成但一个或多个电子邮件发送失败的情况。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句