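This is really a design question, and I am not sure that reading and writing files is the ideal solution here. Nevertheless, I will outline what I am trying to do below. I have the following static method: once `reqStreamingData` is called on the client object, it starts retrieving data from the client server continuously, one record every 150 milliseconds.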
public static void streamingDataOperations(ClientSocket cs) throws InterruptedException, IOException {
    // Call: constantly retrieve streaming data from the client server
    // and write one line to the CSV file every 150 milliseconds
    // using BufferedWriter and PrintWriter (print method).
    // Note that the BufferedWriter's flush method is never called, so
    // I assume the data is sitting in the in-memory buffer,
    // not in the actual file.
    cs.reqStreamingData(output_file); // <- this method comes from the client's API.

    // I would like to start another thread (the data processing thread) that repeats every 15 minutes.
    // I am aware I can do that by creating a class that extends TimerTask and scheduling it.
    // When this thread runs, there are things I want it to do:
    // 1. flush the last 15 minutes of data to output_file (note that no synchronized method or statement is used here, hence no object is being locked)
    // 2. process the data in R
    // 3. wait for the output from R to come back
    // 4. clear the file contents, so that the file always holds only data from the last 15 minutes
}
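I am not very familiar with multithreading, which is my main concern here.
I know this is a long question. Please let me know if you need more information.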
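You can write the lines (CSV or any other text) to a temporary file. When the data is ready for processing, the only point that needs synchronization is the moment the temporary file is replaced by a new one. This guarantees that the producer never writes to a file that the consumer is processing.
Once the swap is done, the producer keeps appending lines to the newer file. The consumer flushes and closes the old file, then moves it to the location where the R application expects it.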
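The move step described above could be sketched as follows. This is only a minimal illustration, not part of the original answer: the class name `SafeMove` and the helper `moveReplacing` are made up for this example. It assumes you want an atomic rename when the file system supports it and are willing to fall back to a plain replacing move otherwise (for instance when the temporary directory and the target directory are on different file stores).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper class, named only for this sketch.
public class SafeMove {

    /**
     * Tries an atomic move first; if the file system cannot perform it,
     * falls back to a regular move that replaces an existing target.
     */
    static void moveReplacing(Path source, Path target) throws IOException {
        try {
            Files.move(source, target,
                    StandardCopyOption.ATOMIC_MOVE,
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (AtomicMoveNotSupportedException e) {
            // Not atomic: a reader could observe a partially copied file.
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        // Use a throwaway directory so the demo does not touch real data.
        Path dir = Files.createTempDirectory("batch-demo");
        Path source = Files.createTempFile(dir, "csv", ".tmp");
        Path target = dir.resolve("r.out");

        Files.write(source, "a,b,c\n".getBytes(StandardCharsets.UTF_8));
        moveReplacing(source, target);

        System.out.println("source still exists: " + Files.exists(source));
        System.out.println("target exists: " + Files.exists(target));
    }
}
```

Whether the non-atomic fallback is acceptable depends on whether the R side can ever read the target while it is being written; if it can, keeping both directories on the same file system is the safer choice.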
To clarify the approach further, here is a sample implementation:
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// The enclosing class name is arbitrary; it only makes the sample compilable.
public class FileBatchDemo {

    public static void main(String[] args) throws IOException {
        // in this sample these dirs are supposed to exist
        final String workingDirectory = "./data/tmp";
        final String outputDirectory = "./data/csv";
        final String outputFilename = "r.out";
        final int addIntervalSeconds = 1;
        final int drainIntervalSeconds = 5;

        final FileBasedTextBatch batch = new FileBasedTextBatch(Paths.get(workingDirectory));
        // two threads, so that the producer and the consumer can really run concurrently
        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        final ScheduledFuture<?> producer = executor.scheduleAtFixedRate(
            () -> batch.add(
                // adding formatted date/time to imitate another CSV line
                LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)
            ),
            0, addIntervalSeconds, TimeUnit.SECONDS);

        final ScheduledFuture<?> consumer = executor.scheduleAtFixedRate(
            () -> batch.drainTo(Paths.get(outputDirectory, outputFilename)),
            0, drainIntervalSeconds, TimeUnit.SECONDS);

        try {
            // awaiting some limited time for demonstration
            producer.get(30, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            System.err.println("Producer failed: " + e);
        }
        catch (TimeoutException e) {
            System.out.println("Finishing producer/consumer...");
            producer.cancel(true);
            consumer.cancel(true);
        }
        executor.shutdown();
    }

    static class FileBasedTextBatch {
        private final Object lock = new Object();
        private final Path workingDir;
        private Output output;

        public FileBasedTextBatch(Path workingDir) throws IOException {
            this.workingDir = workingDir;
            output = new Output(this.workingDir);
        }

        /**
         * Adds another line of text to the batch.
         */
        public void add(String textLine) {
            synchronized (lock) {
                output.writer.println(textLine);
            }
        }

        /**
         * Moves currently collected batch to the file at the specified path.
         * The file will be overwritten if exists.
         */
        public void drainTo(Path targetPath) {
            try {
                final long startNanos = System.nanoTime();
                final Output output = getAndSwapOutput();
                final long elapsedMillis =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                System.out.printf("Replaced the output in %d millis%n", elapsedMillis);
                output.close();
                // ATOMIC_MOVE requires both directories to be on the same file store
                Files.move(
                    output.file,
                    targetPath,
                    StandardCopyOption.ATOMIC_MOVE,
                    StandardCopyOption.REPLACE_EXISTING
                );
            }
            catch (IOException e) {
                System.err.println("Failed to drain: " + e);
                // note: an exception thrown from a scheduled task cancels its further runs
                throw new IllegalStateException(e);
            }
        }

        /**
         * Replaces the current output with the new one, returning the old one.
         * The method is supposed to execute very quickly to avoid delaying the producer thread.
         */
        private Output getAndSwapOutput() throws IOException {
            synchronized (lock) {
                final Output prev = this.output;
                this.output = new Output(this.workingDir);
                return prev;
            }
        }
    }

    static class Output {
        final Path file;
        final PrintWriter writer;

        Output(Path workingDir) throws IOException {
            // performs very well on local filesystems when working directory is empty;
            // if too slow, may be replaced with UUID based name generation
            this.file = Files.createTempFile(workingDir, "csv", ".tmp");
            this.writer = new PrintWriter(Files.newBufferedWriter(this.file));
        }

        void close() {
            // writer is final and assigned in the constructor, so no null check is needed
            this.writer.flush();
            this.writer.close();
        }
    }
}
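The sample stops once the batch has been moved into place. Steps 2 and 3 from the question (process the data in R and wait for the result) could follow each `drainTo` call. The sketch below is one possible way to do that, not something the original answer prescribes. It assumes R is invoked through the `Rscript` command-line front end and that it is on the `PATH`. The script name `analyze.R`, the class `RBatchRunner`, and both helper methods are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, named only for this sketch.
public class RBatchRunner {

    /** Builds the command line: Rscript <script> <csvFile>. */
    static List<String> rscriptCommand(Path script, Path csvFile) {
        return Arrays.asList("Rscript", script.toString(), csvFile.toString());
    }

    /**
     * Starts the command and blocks until it exits or the timeout elapses.
     * Returns the exit code, or -1 if the process had to be killed.
     */
    static int runAndWait(List<String> command, long timeout, TimeUnit unit)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO() // forward the child's stdout/stderr to this JVM's console
                .start();
        if (!process.waitFor(timeout, unit)) {
            process.destroyForcibly();
            return -1;
        }
        return process.exitValue();
    }

    public static void main(String[] args) {
        // "analyze.R" is an assumed script name; the CSV path matches the sample above.
        List<String> command =
                rscriptCommand(Paths.get("analyze.R"), Paths.get("./data/csv/r.out"));
        System.out.println("Would run: " + String.join(" ", command));
        // To actually run it (requires R to be installed):
        // int exitCode = runAndWait(command, 5, TimeUnit.MINUTES);
    }
}
```

Because the consumer task waits for R to finish before it returns, the timeout should stay well below the drain interval; otherwise `scheduleAtFixedRate` will delay the next drain until the current one completes.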