Java:同时写入和读取文件

我的名字叫杰夫

这实际上是一个设计问题。而且我不确定在这里读写文件是否是理想的解决方案。尽管如此,我将在下面概述我要执行的操作:我有以下静态方法,一旦调用reqStreamingData方法obj,它将开始以150毫秒的速率不断地从客户端服务器检索数据。

    public static void streamingDataOperations(ClientSocket cs) throws InterruptedException, IOException{
        // call - retrieve streaming data constantly from client server, 
        // and write a line in the csv file at a rate of 150 milliseconds
        // using bufferedWriter and printWriter (print method).
        // Note that the flush method of bufferedWriter is never called,
        // I would assume the data is in fact being written in buffered memory
        // not the actual file. 
       cs.reqStreamingData(output_file); // <- this method comes from client's API.

       // I would like to another thread (aka data processing thread) which repeats itself every 15 minutes.
       // I am aware I can do that by creating a class that extends TimeTask and fix a schedule
       // Now when this thread runs, there are things I want to do. 
       // 1. flush last 15 minutes of data to the output_file (Note no synchronized statement method or statements are used here, hence no object is being locked.)
       // 2. process the data in R
       // 3. wait for the output in R to come back 
       // 4. clear file contents, so that it always store data that only occurs in the last 15 minutes
    }

现在,我不太熟悉多线程。我担心的是

  1. 请求数据线程和数据处理线程正在同时读写文件,但是速率不同,我不确定数据处理线程是否会大大延迟请求数据线程,因为数据处理的计算量更大比请求数据线程要执行的繁重任务。但是,鉴于它们是2个独立的线程,在这里会发生任何错误或异常吗?
  2. 我不太支持同时写入和读取同一文件的想法,但是因为我必须使用R来实时处理和存储R的数据帧中的数据,所以我真的想不出其他方法来实现此目的。有更好的选择吗?
  3. 有没有更好的设计来解决这个问题?

我知道这是一个漫长的问题。如果您需要更多信息,请告诉我。

Yegodm

可以将这些行(CSV或任何其他文本)写入一个临时文件。当准备好处理时,仅在临时文件被新文件替换时,才需要进行同步。这样可以确保生产者永远不会同时写入消费者正在处理的文件。

完成后,生产者将继续向较新的文件中添加行。使用者将刷新并关闭旧文件,然后按照R应用程序的预期将其移动到文件中。

为了进一步阐明该方法,下面是一个示例实现:

public static void main(String[] args) throws IOException {
    // in this sample these dirs are supposed to exist
    final String workingDirectory = "./data/tmp";
    final String outputDirectory = "./data/csv";

    final String outputFilename = "r.out";
    final int addIntervalSeconds = 1;
    final int drainIntervalSeconds = 5;

    final FileBasedTextBatch batch = new FileBasedTextBatch(Paths.get(workingDirectory));
    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    final ScheduledFuture<?> producer = executor.scheduleAtFixedRate(
        () -> batch.add(
            // adding formatted date/time to imitate another CSV line
            LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)
        ),
        0, addIntervalSeconds, TimeUnit.SECONDS);

    final ScheduledFuture<?> consumer = executor.scheduleAtFixedRate(
        () -> batch.drainTo(Paths.get(outputDirectory, outputFilename)),
        0, drainIntervalSeconds, TimeUnit.SECONDS);

    try {
        // awaiting some limited time for demonstration 
        producer.get(30, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    catch (ExecutionException e) {
        System.err.println("Producer failed: " + e);
    }
    catch (TimeoutException e) {
        System.out.println("Finishing producer/consumer...");
        producer.cancel(true);
        consumer.cancel(true);
    }
    executor.shutdown();
}

static class FileBasedTextBatch {
    private final Object lock = new Object();
    private final Path workingDir;
    private Output output;

    public FileBasedTextBatch(Path workingDir) throws IOException {
        this.workingDir = workingDir;
        output = new Output(this.workingDir);
    }

    /**
     * Adds another line of text to the batch.
     */
    public void add(String textLine) {
        synchronized (lock) {
            output.writer.println(textLine);
        }
    }

    /**
     * Moves currently collected batch to the file at the specified path.
     * The file will be overwritten if exists.
     */
    public void drainTo(Path targetPath) {
        try {
            final long startNanos = System.nanoTime();
            final Output output = getAndSwapOutput();
            final long elapsedMillis =
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            System.out.printf("Replaced the output in %d millis%n", elapsedMillis);
            output.close();
            Files.move(
                output.file,
                targetPath,
                StandardCopyOption.ATOMIC_MOVE,
                StandardCopyOption.REPLACE_EXISTING
            );
        }
        catch (IOException e) {
            System.err.println("Failed to drain: " + e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Replaces the current output with the new one, returning the old one.
     * The method is supposed to execute very quickly to avoid delaying the producer thread.
     */
    private Output getAndSwapOutput() throws IOException {
        synchronized (lock) {
            final Output prev = this.output;
            this.output = new Output(this.workingDir);
            return prev;
        }
    }
}

static class Output {
    final Path file;
    final PrintWriter writer;

    Output(Path workingDir) throws IOException {
        // performs very well on local filesystems when working directory is empty;
        // if too slow, maybe replaced with UUID based name generation
        this.file = Files.createTempFile(workingDir, "csv", ".tmp");
        this.writer = new PrintWriter(Files.newBufferedWriter(this.file));
    }

    void close() {
        if (this.writer != null)
            this.writer.flush();
            this.writer.close();
    }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章