我有一个包含以下数据的文件:
1231212 name1 name2
1431344名称1名称3
2342343 name3 name4
2344255 name2 name1
并且我希望我的Java程序在Spark中执行一些操作,以便输出如下:
[(name1、3),(name2、2),(name3、2),(name4、1)]
但是我仍然不确定如何使用平面和减少操作。我刚刚开始学习Spark。
我现在所拥有的是:
List<Tuple2<String,Long>> result1 =
accessLogs.map(log -> new Tuple2<Long, String>(log.getTimestamp(), log.getHostname1()))
.filter(tuple -> tuple._1() > init_time - 5)
.filter(tuple -> tuple._1() < fin_time + 5)
.map(e -> (new Tuple2<String, Long>(e._2, 1L)))
.take(100);
List<Tuple2<String, Long>> result2 =
accessLogs.map(log -> new Tuple2<Long, String>(log.getTimestamp(), log.getHostname2()))
.filter(tuple -> tuple._1() > init_time - 5)
.filter(tuple -> tuple._1() < fin_time + 5)
.map(e -> (new Tuple2<String, Long>(e._2, 1L)))
.take(100);
所以我的结果是两个不同的列表,其中包含以下数据:
[(name1,1),(name1,1),(name3,1),(name2,1)]
[(name2,1),(name3,1),(name4,1),(name1,1)]
仅使用一个列表,我可以用来获得所需的结果吗?
我的想法是这样开始的:
List<String> finalResult =
accessLogs.map(log -> new Tuple3<Long, String, String>(log.getTimestamp(), log.getHostname1(), log.getHostname2()))
.filter(tuple -> tuple._1() > init_time - 5)
.filter(tuple -> tuple._1() < fin_time + 5)...
然后继续执行操作。
编辑:
现在,我有以下代码:
JavaPairRDD<String, Integer> pairs1 = accessLogs.mapToPair(new PairFunction<LogObject, String, Integer>() {
public Tuple2<String, Integer> call(LogObject s) { return new Tuple2<String, Integer>(s.getHostname1(), 1); }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
哪个返回
[(name1,2),(name3,1),(name2,1)]
但我仍然缺少有关如何执行此操作的部分(建议的答案)
.flatMap {case(_,key1,key2)=>列表((key1,1),(key2,1))}
Java中,以便我可以从第二列和第三列中检索数据。
有很多方法可以做到这一点,但是一种方法是使用,flatMap
然后使用reduceByKey
。我的Java技能有点生锈,所以我将其列出在Scala中-希望您能理解这一点,然后自己将其转换为Java :)
val result = accessLogs.flatMap{case(_, key1, key2) => List((key1, 1), (key2, 1))}.reduceByKey(_+_)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句