Upload a file to a primary location and simultaneously read and write the same file to multiple secondary locations

Harisingh Rajput

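I need to accomplish the following tasks:

1) Upload the file to the primary location:

I want to read from a local file and write it to the primary location (a remote file server).

2) Upload the file to multiple secondary locations:

While the write to the primary location is still running, I want to read chunks of bytes from the primary-location file and write them to multiple secondary locations at the same time.

I tried the following program for this approach: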

BufferedInputStream bin = null;
        ReadableByteChannel channel = null;
        int bufferSize = 1048576;
        int readBufferSize = 1024*4;
        java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(readBufferSize);
        InputStream is = new FileInputStream(new File("D:\\Harisingh\\300MB.txt"));

        bin = new BufferedInputStream(is,bufferSize);
        channel = Channels.newChannel(bin);
        int retryCnt = 0;
        ByteArrayOutputStream baOS = new ByteArrayOutputStream(bufferSize);
        int totalBytes=0;
        int itrCount=0;
        int maxIterateCnt = 1;
        int len;
        //primary location writing
        SmbFile smbFile = new SmbFile("smb://user:[email protected]/data/Harisingh/collab_4_1_4/primary.txt");
        BufferedOutputStream bFout = new BufferedOutputStream(new SmbFileOutputStream(smbFile));

        SmbFileInputStream fis = new SmbFileInputStream("smb://user:[email protected]/data/Harisingh/collab_4_1_4/primary.txt");
        BufferedInputStream binPrimary = new BufferedInputStream(fis);

        SmbFileOutputStream secLocation1= new SmbFileOutputStream(new SmbFile("smb://user:[email protected]/data/Harisingh/collab_4_1_4/Secondary1.txt"));
        SmbFileOutputStream secLocation2 = new SmbFileOutputStream(new SmbFile("smb://user:[email protected]/data/Harisingh/collab_4_1_4/Secondary2.txt"));
        SmbFileOutputStream secLocation3 = new SmbFileOutputStream(new SmbFile("smb://user:[email protected]/data/Harisingh/Secondary/Secondary3.txt"));
        try {
            if(bufferSize > readBufferSize){
                maxIterateCnt = bufferSize/readBufferSize;
            }
            while((len=channel.read(byteBuffer))>=0) 
            {
                itrCount++;
                totalBytes+=len;
                baOS.write(byteBuffer.array(),0,len);
                if(itrCount>=maxIterateCnt)
                {
                    //primary location writing
                    try{
                        bFout.write(baOS.toByteArray(),0,totalBytes);
                    }catch(Exception se)
                    {
                    }

                    // secondary location writing
                    new Thread(){
                           public void run(){
                              System.out.println("Thread Running");
                              try {
                                int count;
                                byte[] readByteArray = new byte[1024*4];
                                while ((count = binPrimary.read(readByteArray)) != -1)
                                    {
                                        secLocation1.write(readByteArray, 0, count);
                                        secLocation2.write(readByteArray, 0, count);
                                        secLocation3.write(readByteArray, 0, count);
                                        readByteArray = new byte[1024*4];
                                        count= 0;
                                    }
                            } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                          }
                      }.start();
                    totalBytes=0;
                    baOS.reset();
                    itrCount=0;
                }
                byteBuffer.clear();
            }

            //primary location writing
            try{
                bFout.write(baOS.toByteArray(),0,totalBytes);
            }catch(Exception se)
            {
            }
            bFout.flush();
            bFout.close();
            int count;
            // secondary location writing
            new Thread(){
                public void run(){
                  System.out.println("Thread Running");
                  try {
                    int count;
                    byte[] readByteArray = new byte[1024*4];
                    while ((count = binPrimary.read(readByteArray)) != -1)
                    {
                            secLocation1.write(readByteArray, 0, count);
                            secLocation2.write(readByteArray, 0, count);
                            secLocation3.write(readByteArray, 0, count);
                            readByteArray = new byte[1024*4];
                            count= 0;
                    }
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                }
              }.start();

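With the program above, the main thread writes the file to the primary location and the secondary locations are written in separate threads. Because of the multithreading, however, some bytes are missing from the files written to the secondary locations.

FYI

This question is only about I/O streams. It is not specific to JCIFS, so you can use the same program with plain I/O streams; the SMB streams are not required. Can you help me solve this?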
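Reading the program above, a likely cause of the missing bytes is that a new thread is started every time the buffer fills, and all of those threads share the same `binPrimary` and `secLocation` streams without any synchronization. A reader thread can also see the current end of the primary file, get `-1`, and stop while the main thread is still writing. As a minimal sketch of one way to avoid the problem with plain `java.io` streams, each chunk can be read once and written to every destination in the same loop, so nothing is re-read from a file that is still growing. The class name `TeeCopy` and the method `copyToAll` are hypothetical names chosen for this illustration, and this changes the original design (the secondaries no longer read from the primary).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of the original program.
class TeeCopy {

    // Reads the source once and writes every chunk to all targets, in order.
    // Returns the number of bytes read from the source.
    static long copyToAll(InputStream in, List<? extends OutputStream> targets)
            throws IOException {
        byte[] buffer = new byte[4096];
        long total = 0;
        int len;
        while ((len = in.read(buffer)) != -1) {
            for (OutputStream target : targets) {
                // Write only the bytes that were actually read in this pass.
                target.write(buffer, 0, len);
            }
            total += len;
        }
        for (OutputStream target : targets) {
            target.flush();
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000];
        Arrays.fill(data, (byte) 'x');

        // In-memory streams stand in for the primary and secondary files.
        ByteArrayOutputStream primary = new ByteArrayOutputStream();
        ByteArrayOutputStream secondary1 = new ByteArrayOutputStream();
        ByteArrayOutputStream secondary2 = new ByteArrayOutputStream();

        long copied = copyToAll(new ByteArrayInputStream(data),
                Arrays.asList(primary, secondary1, secondary2));

        System.out.println("read " + copied + " bytes; primary=" + primary.size()
                + ", secondary1=" + secondary1.size()
                + ", secondary2=" + secondary2.size());
    }
}
```

The trade-off is that the slowest destination sets the pace for all of them. If the secondaries must not slow down the primary write, a two-phase design is the simpler alternative.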

Jokkeri

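Here is an example that I don't encourage you to use "as is"; it is meant as a proof of concept. In the example, the primary write is completed first to get the best performance for that phase. Then each secondary copy is written in parallel, each in its own Thread.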

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.UnknownHostException;

import jcifs.smb.NtlmPasswordAuthentication;
import jcifs.smb.SmbException;
import jcifs.smb.SmbFile;
import jcifs.smb.SmbFileInputStream;
import jcifs.smb.SmbFileOutputStream;

public class testSmb {

    static boolean append = true;
    static int threadCount = 0;

    static int bufferSize = 2048;

    static NtlmPasswordAuthentication auth;

    static File localFile;

    static SmbFile primarySmbFile;
    static BufferedInputStream input;
    static SmbFileOutputStream output;

    static SmbFile secondary1SmbFile;
    static BufferedInputStream sec1Input;
    static SmbFileOutputStream sec1Output;

    static SmbFile secondary2SmbFile;
    static BufferedInputStream sec2Input;
    static SmbFileOutputStream sec2Output;

    static SmbFile secondary3SmbFile;
    static BufferedInputStream sec3Input;
    static SmbFileOutputStream sec3Output;

    public static Object lock = new Object();

    public static void main(String... args) throws IOException {
        System.out.println("Main thread Started");
        init();
        write(input, output);
        writeInThread(sec1Input, sec1Output);
        writeInThread(sec2Input, sec2Output);
        writeInThread(sec3Input, sec3Output);

        System.out.println("Main thread Finished");
    }

    public static void init() throws MalformedURLException,
            FileNotFoundException, SmbException, UnknownHostException {

        localFile = new File("c:\\temp\\myFile.txt");
        // Use a larger buffer for files over 20 MiB.
        if (localFile.length() > 20971520L) {
            bufferSize = 131072;
        }

        String server = "myServer";
        String username = "myUser";
        String password = "myPass";
        String path = "myPath";
        auth = new NtlmPasswordAuthentication(server, username, password);

        input = new BufferedInputStream(new FileInputStream(localFile));
        primarySmbFile = new SmbFile("smb://" + server + "/" + path
                + "/primary.txt", auth, SmbFile.FILE_SHARE_READ
                | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
        output = new SmbFileOutputStream(primarySmbFile, append);
        if (!primarySmbFile.exists()) {
            primarySmbFile.createNewFile();
        }

        sec1Input = new BufferedInputStream(new SmbFileInputStream(new SmbFile(
                primarySmbFile, primarySmbFile.getName())));
        secondary1SmbFile = new SmbFile("smb://" + server + "/" + path
                + "/secondary1.txt", auth, SmbFile.FILE_SHARE_READ
                | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
        sec1Output = new SmbFileOutputStream(secondary1SmbFile, append);
        if (!secondary1SmbFile.exists()) {
            secondary1SmbFile.createNewFile();
        }

        sec2Input = new BufferedInputStream(new SmbFileInputStream(new SmbFile(
                primarySmbFile, primarySmbFile.getName())));
        secondary2SmbFile = new SmbFile("smb://" + server + "/" + path
                + "/secondary2.txt", auth, SmbFile.FILE_SHARE_READ
                | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
        sec2Output = new SmbFileOutputStream(secondary2SmbFile, append);
        if (!secondary2SmbFile.exists()) {
            secondary2SmbFile.createNewFile();
        }

        sec3Input = new BufferedInputStream(new SmbFileInputStream(new SmbFile(
                primarySmbFile, primarySmbFile.getName())));
        secondary3SmbFile = new SmbFile("smb://" + server + "/" + path
                + "/secondary3.txt", auth, SmbFile.FILE_SHARE_READ
                | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
        sec3Output = new SmbFileOutputStream(secondary3SmbFile, append);
        if (!secondary3SmbFile.exists()) {
            secondary3SmbFile.createNewFile();
        }

    }

    public static void write(BufferedInputStream bufferedInputStream,
            SmbFileOutputStream smbFileOutputStream) throws IOException {

        byte[] buffer = new byte[bufferSize];
        int len = 0;

        try {

            while ((len = bufferedInputStream.read(buffer)) > 0) {
                synchronized (lock) {
                    // Log the number of bytes actually read, not the buffer capacity.
                    System.out.println("'" + Thread.currentThread().getName()
                            + "' writing " + len + " bytes");
                    smbFileOutputStream.write(buffer, 0, len);
                    smbFileOutputStream.flush();
                }
            }

        } catch (IOException e) {
            throw e;
        } finally {
            try {
                bufferedInputStream.close();
            } catch (Exception e) {
            }

            try {
                smbFileOutputStream.flush();
                smbFileOutputStream.close();
            } catch (Exception e) {
            }
        }

    }

    public static void writeInThread(
            final BufferedInputStream bufferedInputStream,
            final SmbFileOutputStream smbFileOutputStream) {
        threadCount++;

        new Thread("Secondary thread " + threadCount) {
            public void run() {
                System.out.println(Thread.currentThread().getName()
                        + ": started");
                try {
                    write(bufferedInputStream, smbFileOutputStream);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()
                        + ": finished");
            }
        }.start();

    }
}
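
Since the question notes that the problem is not specific to JCIFS, the same two-phase idea can be sketched with plain `java.nio.file` streams. This is an illustration under stated assumptions, not a drop-in replacement: the class name `PrimaryThenSecondaries`, the methods `copy` and `replicate`, and the temporary file names are all hypothetical. Each secondary task opens its own input stream on the finished primary file, so no stream is shared between threads, and the caller waits on every `Future` so that failures are not lost.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class for illustration; local files stand in for SMB shares.
class PrimaryThenSecondaries {

    // Copies everything from in to out using a buffer private to this call.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int len;
        while ((len = in.read(buffer)) != -1) {
            out.write(buffer, 0, len);
            total += len;
        }
        out.flush();
        return total;
    }

    static void replicate(Path source, Path primary, List<Path> secondaries)
            throws IOException, InterruptedException, ExecutionException {
        // Phase 1: write the primary copy completely and close it.
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = Files.newOutputStream(primary)) {
            copy(in, out);
        }

        // Phase 2: one task per secondary, each with its own streams.
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, secondaries.size()));
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (Path secondary : secondaries) {
                results.add(pool.submit(() -> {
                    try (InputStream in = Files.newInputStream(primary);
                         OutputStream out = Files.newOutputStream(secondary)) {
                        return copy(in, out);
                    }
                }));
            }
            for (Future<Long> result : results) {
                result.get(); // blocks until done; rethrows a copy failure
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("replicate-demo");
        Path source = Files.write(dir.resolve("source.txt"),
                "example payload".getBytes(StandardCharsets.UTF_8));
        Path primary = dir.resolve("primary.txt");
        List<Path> secondaries = Arrays.asList(
                dir.resolve("secondary1.txt"),
                dir.resolve("secondary2.txt"),
                dir.resolve("secondary3.txt"));

        replicate(source, primary, secondaries);

        for (Path p : secondaries) {
            System.out.println(p.getFileName() + ": " + Files.size(p) + " bytes");
        }
    }
}
```

Unlike the example above, this sketch uses no shared lock: because the tasks do not share streams or buffers, the secondary copies can proceed truly in parallel.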
