Spark - Finding overlapping values, or a variation of finding mutual friends
I have a problem I'm trying to solve using Spark. I'm new to Spark and not sure of the best way to design it.
Input:

group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5
I want to find the mutual group count between each pair of users. For the above input, the output I'm expecting is:
Output:

1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1       user2       2                             7
user1       user3       1                             6
user1       user4       1                             9
user2       user4       3                             8
I think there are several ways to solve this problem; one solution could be:
- Create a key,value pair where the key is the user and the value is the group
- Group by key, so we have a list of the groups each user belongs to
- Then find the intersection/union between the groups of each pair of users
Example:

(1st stage): map

group1=user1,user2 ==>
user1, group1
user2, group1

group2=user1,user2,user3 ==>
user1, group2
user2, group2
user3, group2
....

(2nd stage): reduce by key

user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9
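A minimal sketch of those first two stages with the RDD API could look like this (the helper name invertLine and the use of groupByKey are my illustration, not taken from the code further below):

def invertLine(line: String): Array[(String, String)] = {
  val Array(group, members) = line.split("=")     // e.g. "group1=user1,user2"
  members.split(",").map(user => (user, group))   // (user1,group1), (user2,group1)
}

// 1st stage: map each line to (user, group) pairs
// 2nd stage: group by key to get user -> all groups it belongs to
val userToGroups = sc.textFile("input/test.log")
  .flatMap(invertLine)
  .groupByKey()   // e.g. user1 -> [group1, group2, group4, group8]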
But my question is: what is the best way to represent the counts in the form I want after reducing them by key?
Is there a better way to handle this problem? The maximum number of users is constant and won't go beyond 5000, so that is the maximum number of keys that will be created. The input, however, may contain close to 1B rows. I don't think that's a problem, but please correct me if I'm wrong.
Update:

This is the piece of code I came up with to solve the problem, with my little knowledge of Spark (I just started learning Spark last month):

import scala.collection.mutable.WrappedArray

// split "group1=user1,user2" into (user1, group1), (user2, group1)
def createPair(line: String): Array[(String, String)] = {
  val splits = line.split("=")
  val kuid = splits(0)
  splits(1).split(",").map { segment => (segment, kuid) }
}

val input = sc.textFile("input/test.log")
val pair = input.flatMap { line => createPair(line) }

// collect, per user, the list of groups it belongs to
val pairListDF = pair
  .aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
    (kuidList, kuid) => { kuidList += kuid; kuidList },
    (kuidList1, kuidList2) => { kuidList1.appendAll(kuidList2); kuidList1 })
  .mapValues(_.toList)
  .toDF()
  .select($"_1".alias("user"), $"_2".alias("groups"))

pairListDF.registerTempTable("table")

sqlContext.udf.register("intersectCount",
  (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount",
  (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)

// self-join on user < user so each unordered pair appears once
val populationDF = sqlContext.sql("select t1.user user_first," +
  " t2.user user_second," +
  " intersectCount(t1.groups, t2.groups) intersect_count," +
  " unionCount(t1.groups, t2.groups) union_count" +
  " from table t1 inner join table t2" +
  " on t1.user < t2.user" +
  " order by user_first, user_second")
Output:

+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
|     user1|      user2|              2|          7|
|     user1|      user3|              1|          6|
|     user1|      user4|              1|          9|
|     user1|      user5|              1|          8|
|     user2|      user3|              1|          7|
|     user2|      user4|              3|          8|
|     user2|      user5|              1|          9|
|     user3|      user4|              1|          8|
|     user3|      user5|              2|          6|
|     user4|      user5|              3|          8|
+----------+-----------+---------------+-----------+
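As an aside (my addition, not part of the original post): on Spark 2.4 or later, the two UDFs could likely be replaced by the built-in array_intersect and array_union functions. A rough sketch:

import org.apache.spark.sql.functions.{array_intersect, array_union, col, size}

val t1 = pairListDF.as("t1")
val t2 = pairListDF.as("t2")
val populationDF24 = t1.join(t2, col("t1.user") < col("t2.user"))
  .select(
    col("t1.user").alias("user_first"),
    col("t2.user").alias("user_second"),
    size(array_intersect(col("t1.groups"), col("t2.groups"))).alias("intersect_count"),
    size(array_union(col("t1.groups"), col("t2.groups"))).alias("union_count"))
  .orderBy("user_first", "user_second")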
I would love feedback on the code and anything I'm missing; please feel free to criticize it, since I just started learning Spark. Thanks again to @axiom for the answer, a smaller and nicer solution than I expected.
Summary:

Obtain the pair counts, and use the fact that
union(a, b) = count(a) + count(b) - intersection(a, b)
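For example, user1 belongs to 4 groups (group1, group2, group4, group8) and user2 to 5 (group1, group2, group3, group7, group9); they share 2 groups, so union(user1, user2) = 4 + 5 - 2 = 7, matching the expected output above.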
val data = sc.textFile("test")
// optionally data.cache(), depending on the size of the data
val pairCounts = data.flatMap(pairs).reduceByKey(_ + _)
val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
val result = pairCounts.map { case ((user1, user2), intersectionCount) =>
  (user1, user2, intersectionCount,
   singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)
}
Details:

There are 5000 users in total, so at most about 12.5 million keys (one per unordered pair), which shouldn't be much. We can use reduceByKey to calculate the intersection counts, and the individual counts can be broadcast in a map. Then, as is well known:

union(user1, user2) = count(user1) + count(user2) - intersection(user1, user2)

The first two counts are read from the broadcast map, while we map over the RDD of pair counts.
Code:
// generate ((user1, user2), 1) pair counts
def pairs(str: String) = {
  val users = str.split("=")(1).split(",")
  val n = users.length
  for (i <- 0 until n; j <- i + 1 until n) yield {
    val pair = if (users(i) < users(j)) {
      (users(i), users(j))
    } else {
      (users(j), users(i))
    } // order of the users in the list shouldn't matter
    (pair, 1)
  }
}

// generate (user, 1) to obtain single counts
def singles(str: String) = {
  for (user <- str.split("=")(1).split(",")) yield (user, 1)
}

// read the RDD
scala> val data = sc.textFile("test")
scala> data.collect.map(println)
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

// get pair counts
scala> val pairCounts = data.flatMap(pairs).reduceByKey(_ + _)
pairCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[16] at reduceByKey at <console>:25

// just checking
scala> pairCounts.collect.map(println)
((user2,user3),1)
((user1,user3),1)
((user3,user4),1)
((user2,user5),1)
((user1,user5),1)
((user2,user4),3)
((user4,user5),3)
((user1,user4),1)
((user3,user5),2)
((user1,user2),2)

// single counts
scala> val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
singleCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25

scala> singleCounts.collect.map(println)
(user5,5)
(user3,3)
(user1,4)
(user2,5)
(user4,6)

// broadcast single counts
scala> val singleCountMap = sc.broadcast(singleCounts.collectAsMap())

// calculate the results:
And finally:
scala> val res = pairCounts.map { case ((user1, user2), intersectionCount) =>
  (user1, user2, intersectionCount,
   singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)
}
res: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[23] at map at <console>:33

scala> res.collect.map(println)
(user2,user3,1,7)
(user1,user3,1,6)
(user3,user4,1,8)
(user2,user5,1,9)
(user1,user5,1,8)
(user2,user4,3,8)
(user4,user5,3,8)
(user1,user4,1,9)
(user3,user5,2,6)
(user1,user2,2,7)
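If you want the same tabular output as in the question, the result RDD can also be converted to a DataFrame; a sketch, assuming the SQL implicits are in scope (as they are by default in the shell):

scala> val resDF = res.toDF("user_first", "user_second", "intersect_count", "union_count")
scala> resDF.orderBy("user_first", "user_second").show()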
Note:

While generating the pairs, I sort the tuple because we don't want the order of the users in the list to matter.
Do encode the username strings as integers; it may give you a significant performance boost.
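One possible way to do that encoding (my sketch, not part of the original answer): build a user -> id map once, broadcast it, and emit the integer ids instead of strings inside pairs and singles:

// assign each username a dense integer id; with at most 5000 users the map is tiny
val userIdMap = sc.broadcast(
  data.flatMap(_.split("=")(1).split(","))
      .distinct()
      .zipWithIndex()
      .mapValues(_.toInt)
      .collectAsMap())

// then, e.g., emit (userIdMap.value(users(i)), userIdMap.value(users(j))) in pairs()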