How to support multiple KeyBy in Flink

Abhijit Pathak

In the code sample below, I am trying to read a stream of employee records {Country, Employer, Name, Salary, Age} and output the highest-paid employee in every country, for each employer. Unfortunately, chaining multiple keyBy calls doesn't work.

Only keyBy(Employer) takes effect, so I don't get the correct result. What am I missing?

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Employee> streamEmployee = env.addSource(
            new FlinkKafkaConsumer010<ObjectNode>("flink-demo", new JSONDeserializationSchema(), properties))
            .map(new MapFunction<ObjectNode, Employee>() {

                private static final long serialVersionUID = 6111226274068863916L;

                @Override
                public Employee map(ObjectNode value) throws Exception {
                    final Gson gson = new GsonBuilder().create();
                    Employee uMsg = gson.fromJson(value.toString(), Employee.class);
                    return uMsg;
                }
            });

    KeyedStream<Employee, String> employeesKeyedByCountryndEmployer = streamEmployee
            .keyBy(new KeySelector<Employee, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                @Override
                public String getKey(Employee value) throws Exception {
                    // Key by country
                    return value.getCountry();
                }
            }).keyBy(new KeySelector<Employee, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                @Override
                public String getKey(Employee value) throws Exception {
                    // Key by employer
                    return value.getEmployer();
                }
            });
    // This should emit the highest-paid employee per country and employer
    DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer.timeWindow(Time.seconds(5))
            .maxBy("salary");

    // Assume toString() is overridden, so print() output is readable.
    uHighlyPaidEmployee.print();

    env.execute("Employee-employer log processor");

Fabian Hueske

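Calling keyBy() on a KeyedStream does not add a second key to the first one; it repartitions the stream by the new key, which replaces the previous one. That is why only the employer key takes effect. Instead, define a single KeySelector that returns a composite key: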

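    // Tuple2 is org.apache.flink.api.java.tuple.Tuple2
    KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer =
        streamEmployee.keyBy(
            new KeySelector<Employee, Tuple2<String, String>>() {

                @Override
                public Tuple2<String, String> getKey(Employee value) throws Exception {
                    // Composite key: (country, employer)
                    return Tuple2.of(value.getCountry(), value.getEmployer());
                }
            }
        );
    // timeWindow(Time.seconds(5)).maxBy("salary") can then be applied to this stream as before.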
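To see the difference between the chained version and the composite key, here is a plain-Java analogy with no Flink dependency. The `Employee` and `CountryEmployer` records, the sample data and the `highestPaidBy` helper are hypothetical stand-ins, and `Collectors.toMap` with a merge function only approximates what `maxBy("salary")` yields per key within a single window. It is a sketch of the grouping semantics, not Flink's API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy of Flink keying semantics; no Flink classes are used.
class CompositeKeyDemo {

    // Hypothetical stand-in for the question's Employee type.
    record Employee(String country, String employer, String name, int salary, int age) {}

    // Hypothetical composite key, playing the role of Tuple2.of(country, employer).
    record CountryEmployer(String country, String employer) {}

    // Keeps the highest-paid employee per key, roughly what
    // keyBy(key).timeWindow(...).maxBy("salary") yields for one window.
    static <K> Map<K, Employee> highestPaidBy(List<Employee> employees,
                                              Function<Employee, K> key) {
        return employees.stream().collect(Collectors.toMap(
                key,
                Function.identity(),
                (Employee a, Employee b) -> a.salary() >= b.salary() ? a : b));
    }

    public static void main(String[] args) {
        List<Employee> employees = List.of(
                new Employee("US", "Acme", "Alice", 120, 30),
                new Employee("DE", "Acme", "Bob", 150, 40),
                new Employee("US", "Acme", "Carol", 100, 35),
                new Employee("US", "Initech", "Dave", 90, 28));

        // Like keyBy(country).keyBy(employer): only the last key counts,
        // so the US and DE employees of Acme compete in the same group.
        Map<String, Employee> byEmployer = highestPaidBy(employees, Employee::employer);

        // Like the composite key: one group per (country, employer) pair.
        Map<CountryEmployer, Employee> byBoth =
                highestPaidBy(employees, e -> new CountryEmployer(e.country(), e.employer()));

        System.out.println(byEmployer.get("Acme").name());                        // Bob
        System.out.println(byBoth.get(new CountryEmployer("US", "Acme")).name()); // Alice
        System.out.println(byBoth.size());                                        // 3
    }
}
```

A record works as a key here for the same reason Flink's Tuple2 does: it has value-based equals() and hashCode(), so two employees with the same country and employer land in the same group.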
