How to use multiple counters in Flink

Rocel

(Somewhat related to "How to create dynamic metric in Flink".)

I have a stream of events (someid: String, name: String) and, for monitoring purposes, I need a counter per event ID. In all the Flink documentation and examples I have seen, the counter is initialised with a fixed name, for instance in the open() method of a map function.

But in my case I cannot initialise the counter up front, as I need one per event ID and do not know the IDs in advance. Also, I understand how expensive it would be to create a new counter every time an event passes through the map() method of the MapFunction. Finally, I cannot keep a "cache" of counters, as it would be too big.

Ideally, I would like something like this:

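import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Case class so that event.id is accessible as a field.
case class Event(id: String, name: String)

class ExampleMapFunction extends RichMapFunction[Event, Event] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Wished-for API: a single counter that accepts a dimension (not real Flink).
    counter = new Counter()
  }

  override def map(event: Event): Event = {
    // Wished-for API: Flink's Counter.inc() does not take a key.
    counter.inc(event.id)
    event
  }
}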

Or, basically, could I implement my own counter that allows me to pass a dimension? If so, how?

Any advice or best practices for this kind of use case?

David Anderson

If keeping a cache of the counters would be too big, then I don't think using metrics is going to scale in a way that will satisfy your requirements.

A few alternatives:

  • Use side outputs to collect meaningful events in some external, queryable/visualizable data store -- e.g., InfluxDB.

  • Hold the info in keyed state, and use broadcast messages to trigger output of relevant portions of it as desired (again using side outputs).

  • Hold the info in keyed state, and take periodic savepoints, which you then analyze via queries using the State Processor API.
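
As a rough illustration of how keyed state and side outputs could be combined, here is a minimal sketch. It assumes Flink 1.x with the Scala DataStream API; the names CountPerId, CountingJob, and the "event-counts" tag are hypothetical, and print() stands in for a real sink to an external store. It keeps one running count per event ID in keyed state and emits (id, count) pairs to a side output, leaving the main stream unchanged.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Event(id: String, name: String)

// Keeps a running count per key (event ID) in keyed state and
// forwards every event unchanged on the main output.
class CountPerId extends KeyedProcessFunction[String, Event, Event] {
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    // State is null the first time a given key is seen.
    val updated = Option(count.value()).map(_.longValue).getOrElse(0L) + 1L
    count.update(updated)
    // Side output carries (eventId, runningCount) for monitoring.
    ctx.output(CountingJob.countsTag, (event.id, updated))
    out.collect(event)
  }
}

object CountingJob {
  // Hypothetical tag name for the monitoring side output.
  val countsTag: OutputTag[(String, Long)] = OutputTag[(String, Long)]("event-counts")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.fromElements(
      Event("a", "login"), Event("b", "click"), Event("a", "logout"))

    val passthrough = events.keyBy(_.id).process(new CountPerId)

    // Stand-in for a sink that writes to an external, queryable store.
    passthrough.getSideOutput(countsTag).print()

    env.execute("count-per-id")
  }
}
```

Emitting one side-output record per event is the simplest variant; in practice you would probably emit less often (for example from a timer, or only when a broadcast message asks for it) to keep the volume manageable.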
