Multiple Threads in Java for ConcurrentHashMap

Daniel Lee

I made two threads: one gets data and the other saves data. My problem is that the data read by Thread 1 is not handled correctly while it is being stored.

I want to extract 1,000,000 elements and write them to a file. Because the total size is so big, I split the elements into chunks of 100,000, so the loop runs 10 times. One thread reads 100,000 entries at a time from another server. The other thread takes the data from the first thread and writes it to a file.

My original scenario is below:

The first thread reads the total key/value count, which will be somewhere between 100,000 and 1,000,000; assume it is 1,000,000, so count is set to 1,000,000. The first thread divides this by 100,000 and reads the data from the server 100,000 entries at a time. It then calls setData(key/value map). This loops 10 times.

The second thread loops 10 times. First, it gets the data by calling getMap(). Then it calls writeSeq(hashmap), which writes the data to a writer stream (it is not flushed yet). The problem is here: getMap() returns the right data size, but writeSeq cannot process all of the values. When I get a chunk of 100,000, it processes a seemingly random number of them: 100, 1500, 0, 8203, ...

The first thread is below:

public void run() {
        getValueCount(); // initialize the total value count

        while (this.jobFlag) {
            getSortedMap(this.count); // count starts at the total number of elements
                                      // (e.g. 1,000,000) and is decreased by 100,000
                                      // on each pass. setMap() is also called in this method.
            if (!jobFlag) // when all processing is done, jobFlag is set to false
                break;
        }

        resetValue();
    }

The second thread is below:

public void run() {
        setWriter(); // create the writer stream

        int count = 10; // the number of loop iterations

        ConcurrentHashMap<String, String> hash;

        for (int i = 0; i < count; i++) {
            hash = share.getMap();
            writeSeq(hash);
        }

        closeWriter(); // close the writer stream
    }

This is the shared class:

import java.util.concurrent.ConcurrentHashMap;

public class ShareData {

    ConcurrentHashMap<String, String> map;

    public synchronized ConcurrentHashMap<String, String> getMap(){
        if (this.map == null) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        ConcurrentHashMap<String, String> hashmap = map;

        this.map = null;

        return hashmap;
    }

    public synchronized void setMap(ConcurrentHashMap<String, String> KV) {
        if (this.map != null) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        this.map = KV;
        notify();
    }

}

After that, the second thread, which saves the data, is started. The size of KV is fine, but not all of the values are processed when the forEach runs. Also, each time I create a file, its size is different. Is it a problem with synchronized?

public synchronized void writeSeq(ConcurrentHashMap<String, String> KV) {

        AtomicInteger a = new AtomicInteger(0);
        System.out.println(KV.size()); // e.g. 65300
        KV.entrySet().parallelStream().forEach(
                entry -> {
                    try {
                        a.incrementAndGet();
                        writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
        System.out.println(a.get()); // e.g. 1300
        i = 0;
        notify();

    }
Stephen C

The size of KV is fine, but not all of the values are processed when the forEach runs. Also, each time I create a file, its size is different. Is it a problem with synchronized?

Unclear. I can see a small issue but it is not likely to cause the problem you describe.

  • The if (map == null) wait(); code should be a while loop.

  • The if (map != null) wait(); code should be a while loop.

The issue is that if one thread gets a spurious notify, it may proceed with map in the wrong state. You need to retry the test. (If you read the javadoc for Object, you will see an example that correctly implements a condition variable.)
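For illustration, here is a sketch of ShareData with those fixes applied. Note that it also adds a notifyAll() in getMap(), which the original code lacks entirely: without it, a producer waiting in setMap() is never woken once the consumer empties the slot.

import java.util.concurrent.ConcurrentHashMap;

public class ShareData {

    private ConcurrentHashMap<String, String> map;

    public synchronized ConcurrentHashMap<String, String> getMap() {
        while (this.map == null) { // re-test the condition after every wakeup
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        ConcurrentHashMap<String, String> hashmap = this.map;
        this.map = null;
        notifyAll(); // wake a producer that may be blocked in setMap()
        return hashmap;
    }

    public synchronized void setMap(ConcurrentHashMap<String, String> kv) {
        while (this.map != null) { // re-test the condition after every wakeup
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        this.map = kv;
        notifyAll(); // wake a consumer that may be blocked in getMap()
    }
}

Incidentally, a hand-rolled single-slot handoff like this is essentially what java.util.concurrent.SynchronousQueue (or an ArrayBlockingQueue with capacity 1) already provides, so either of those would be a simpler and less error-prone choice.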

Apart from that, the root cause of your problem does not appear to be in the code that you have shown us.


However, if I was to take a guess, my guess would be that one thread is adding or removing entries in the ConcurrentHashMap while the second thread is processing it [1]. The getMap / setMap methods you have shown us have to be used appropriately (i.e. called at appropriate points with appropriate arguments) to avoid the two threads interfering with each other. You haven't shown us that code.
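To make that guess concrete: a usage pattern that avoids such interference has the producer build a brand-new map for each chunk and never touch it again after handing it off. Below is a minimal sketch under that assumption; the loop bodies are stand-ins for your actual server-read and file-write code, and it uses the corrected ShareData above.

import java.util.concurrent.ConcurrentHashMap;

public class HandoffUsage {

    // Producer: build a fresh map per chunk, hand it off, never touch it again.
    static void produce(ShareData share) {
        for (int chunk = 0; chunk < 10; chunk++) {
            ConcurrentHashMap<String, String> batch = new ConcurrentHashMap<>();
            for (int i = 0; i < 100_000; i++) { // stand-in for the server read
                batch.put("key-" + chunk + "-" + i, "value-" + i);
            }
            share.setMap(batch); // after this call, 'batch' belongs to the consumer
        }
    }

    // Consumer: each getMap() returns a map the producer no longer mutates,
    // so iterating it sees every entry exactly once.
    static void consume(ShareData share) {
        for (int chunk = 0; chunk < 10; chunk++) {
            ConcurrentHashMap<String, String> batch = share.getMap();
            System.out.println("chunk size = " + batch.size()); // always 100,000
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ShareData share = new ShareData();
        Thread producer = new Thread(() -> produce(share));
        Thread consumer = new Thread(() -> consume(share));
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}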

So, if my guess is correct, your problem is a logic error rather than a low level synchronization problem. But if you need a better answer you will need to write and post a proper MCVE.


[1] - A ConcurrentHashMap's iterators are weakly consistent. This means that if you update the map while iterating, you may miss entries in the iteration, or possibly see them more than once.
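Here is a toy demonstration of weakly consistent iteration (not the poster's code): one thread mutates the map while the main thread iterates it, and the number of entries the iteration visits typically matches neither the size before nor the size after.

import java.util.concurrent.ConcurrentHashMap;

public class WeaklyConsistentDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 100_000; i++) {
            map.put(i, i);
        }

        // Mutate the map while the main thread iterates over it.
        Thread mutator = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                map.remove(i);
                map.put(100_000 + i, i);
            }
        });
        mutator.start();

        int seen = 0;
        for (Integer ignored : map.keySet()) { // weakly consistent iteration
            seen++;
        }
        mutator.join();

        // 'seen' is usually not 100,000: entries added or removed during the
        // iteration may be missed or observed, at the iterator's discretion.
        System.out.println("final size = " + map.size() + ", iterated = " + seen);
    }
}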
