How to use ListState for BroadcastProcessFunction in Flink

M_Gh

We have a non-keyed data stream that contains transactions and a broadcast stream that contains rules. We want to process each transaction based on the last seen rule: if the last seen rule is daily, we add the current transaction to dailyTrnsList, and once dailyTrnsList grows past a threshold we clear the list and write its transactions to the database. We do the same thing when the last seen rule is temp.

The code is as follows:

public class TransactionProcess extends BroadcastProcessFunction<String, String, String> {

    private List<String> dailyTrnsList = new ArrayList<>();
    private List<String> tempTrnsList = new ArrayList<>();

    private static final int threshold = 100;

    private final MapStateDescriptor<String, String> ruleStateDesc =
            new MapStateDescriptor<>(
                    "ControlMapState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processElement(String s,
                               ReadOnlyContext readOnlyContext,
                               Collector<String> collector) throws Exception {
        String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");

        // "literal".equals(ruleName) avoids a NullPointerException
        // if no rule has been broadcast yet.
        if ("daily".equals(ruleName)) {
            dailyTrnsList.add(s);
            if (dailyTrnsList.size() >= threshold) {
                // Swap in a fresh list, then flush the full batch.
                List<String> buffer = dailyTrnsList;
                dailyTrnsList = new ArrayList<>();
                insert_to_db(buffer, "daily");
            }
        } else if ("temp".equals(ruleName)) {
            tempTrnsList.add(s);
            if (tempTrnsList.size() >= threshold) {
                List<String> buffer = tempTrnsList;
                tempTrnsList = new ArrayList<>();
                insert_to_db(buffer, "temp");
            }
        }

        collector.collect(s);
    }

    @Override
    public void processBroadcastElement(String s,
                                        Context context,
                                        Collector<String> collector) throws Exception {
        if (s.equals("temp")) {
            context.getBroadcastState(ruleStateDesc).put("rule", "temp");
            // The rule changed, so flush the batch buffered under the old rule.
            List<String> buffer = dailyTrnsList;
            dailyTrnsList = new ArrayList<>();
            insert_to_db(buffer, "daily");
        } else if (s.equals("daily")) {
            context.getBroadcastState(ruleStateDesc).put("rule", "daily");
            List<String> buffer = tempTrnsList;
            tempTrnsList = new ArrayList<>();
            insert_to_db(buffer, "temp");
        }
    }
}

Our problem is making this fault tolerant. We do not know how to use ListState for our case. The only solution we have found so far is to implement the CheckpointedFunction interface, which is described under the Working with State section of the Flink documentation.

private ListState<String> dailyTrns;
private ListState<String> tempTrns;

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    // Replace the checkpointed state with the current in-memory buffers.
    dailyTrns.clear();
    tempTrns.clear();
    for (String element : dailyTrnsList)
        dailyTrns.add(element);
    for (String element : tempTrnsList)
        tempTrns.add(element);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
    tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
    if (context.isRestored()) {
        // Rebuild the in-memory buffers from the restored state.
        for (String element : dailyTrns.get())
            dailyTrnsList.add(element);
        for (String element : tempTrns.get())
            tempTrnsList.add(element);
    }
}

Would you please guide us: if this approach is not the correct solution, what else can we do? And if the solution is correct, what happens to elements that have not yet been transferred from dailyTrnsList and tempTrnsList to dailyTrns and tempTrns?

Any help would be appreciated.

Thank you in advance.

David Anderson

You could simplify your implementation so that you don't have to worry about this at all. You could do the following:

(1) Simplify the BroadcastProcessFunction so that all it does is to split the incoming stream into two streams: a stream of daily transactions and a stream of temporary transactions. It does this by choosing one of two side outputs based on the latest rule.

(2) Follow the BroadcastProcessFunction with count windows that create batches and write them to the database.

Or instead of using side outputs, the BroadcastProcessFunction could write out tuples of (rule, transaction), and then you could key the stream by the rule. Either way, the idea would be to let the window API take care of managing fault tolerant lists for you.
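The side-output variant of (1) might look like the sketch below. This is only an illustration of the suggested refactoring, not code from the answer: the class name RuleRouter, the OutputTag names, and the assumption that the rule stream carries the bare strings "daily" and "temp" are all hypothetical. It requires the flink-streaming-java dependency on the classpath.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RuleRouter extends BroadcastProcessFunction<String, String, String> {

    // Side outputs for the two transaction categories (names are illustrative).
    public static final OutputTag<String> DAILY = new OutputTag<String>("daily-trns") {};
    public static final OutputTag<String> TEMP = new OutputTag<String>("temp-trns") {};

    private final MapStateDescriptor<String, String> ruleStateDesc =
            new MapStateDescriptor<>(
                    "ControlMapState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processElement(String s,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        // Route each transaction to the side output for the last seen rule;
        // no buffering happens in this function.
        String rule = ctx.getBroadcastState(ruleStateDesc).get("rule");
        if ("daily".equals(rule)) {
            ctx.output(DAILY, s);
        } else if ("temp".equals(rule)) {
            ctx.output(TEMP, s);
        }
        out.collect(s);
    }

    @Override
    public void processBroadcastElement(String s,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        // Just record the latest rule.
        ctx.getBroadcastState(ruleStateDesc).put("rule", s);
    }
}
```

Downstream, each side output could then be batched with a count window, e.g. `result.getSideOutput(RuleRouter.DAILY).countWindowAll(100)` followed by a window function that writes the batch to the database. The window's internal list state is checkpointed by Flink, which is what removes the need for the hand-rolled CheckpointedFunction.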

