How to use MULTISET in Flink?

volity

I use the COLLECT() function in Flink SQL, and its return type is MULTISET. However, common functions such as CARDINALITY, CHAR_LENGTH, and multiset[item] cannot be applied to a multiset. How can I get the elements of a multiset?

I saw that the MULTISET type bridges to the JVM type java.util.Map<T, java.lang.Integer>, so I tried to write a UDF that converts a multiset to a string, like this:

public String eval(Map<String, Integer> map){
    StringBuffer result = new StringBuffer();
    for(String key: map.keySet()){
        result.append(key);
        result.append(";");
    }
    if(result.length() > 0){
        result.deleteCharAt(result.length()-1);
    }
    return result.toString();
}

However, I got an error saying that Map<String, Integer> cannot be used as a MULTISET.

How can I modify this UDF?

twalthr

Support for multisets is limited at the moment. However, you can define your own user-defined functions to deal with them. Wherever a Map<X, Integer> can be used in a function signature, you can annotate it with @DataTypeHint("MULTISET<X>") so that Flink treats it as a multiset instead of a map.
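Applied to the UDF from the question, a minimal sketch could look like the following. It assumes Flink's Table API (for example `flink-table-api-java`, which provides `ScalarFunction` and `@DataTypeHint`) is on the classpath; the class name `MultisetToString` is hypothetical. As in the question, only the distinct elements (the map keys) are joined; the values are the occurrence counts. Sorting the keys is an added choice so the output does not depend on map iteration order.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF name; joins the distinct elements of a MULTISET<STRING> with ';'.
public class MultisetToString extends ScalarFunction {

    // The hint tells Flink's type inference that this Map parameter is a
    // MULTISET<STRING> (element -> number of occurrences), not a MAP<STRING, INT>.
    public String eval(@DataTypeHint("MULTISET<STRING>") Map<String, Integer> multiset) {
        if (multiset == null) {
            return null;
        }
        // Keys are the distinct elements; values (multiplicities) are ignored here.
        return multiset.keySet().stream()
                // Sort for deterministic output; a null element (if any) sorts first.
                .sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
                .collect(Collectors.joining(";"));
    }
}
```

To call it from SQL, register it first, e.g. `tEnv.createTemporarySystemFunction("MULTISET_TO_STRING", MultisetToString.class)` on a `TableEnvironment`, and then use `MULTISET_TO_STRING(COLLECT(item))` in a query with a `GROUP BY` (the function, table, and column names are placeholders). If you need each element repeated according to its count, iterate over `entrySet()` instead and append each key `getValue()` times.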
