hadoop - Spark - Finding overlapping values, or a variation of finding mutual friends


I have a problem I'm trying to solve using Spark. I'm new to Spark and I'm not sure of the best way to design it.

Input:

group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

I want to find the mutual group count between each pair of users. For the above input, the output I'm expecting is:

Output:

1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1        user2           2                       7
user1        user3           1                       6
user1        user4           1                       9
user2        user4           3                       8

I think there are several ways to solve this problem, and one of the solutions would be:

  • create a (key, value) pair for each membership, where the key is the user and the value is the group
  • group by key, so that we have the list of groups each user belongs to
  • then find the intersection/union between the group lists of each pair of users

Example:

(1st stage): map
group1=user1,user2 ==>
          user1, group1
          user2, group1
group2=user1,user2,user3 ==>
          user1, group2
          user2, group2
          user3, group2
....

(2nd stage): reduce by key
user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9
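As a rough sketch (just to illustrate the idea; the code I actually ended up with is in the update below), the two stages could look like this in Spark, assuming sc is the SparkContext and the input file has the format shown above:

//1st stage: explode each "groupX=userA,userB,..." line into (user, group) pairs
val userToGroup = sc.textFile("input/test.log").flatMap { line =>
  val parts = line.split("=")
  parts(1).split(",").map(user => (user, parts(0)))
}

//2nd stage: group by user to get the list of groups each user belongs to
val userGroups = userToGroup.groupByKey()  //RDD[(String, Iterable[String])]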

But my question is: what is the best way to produce the counts in the format I want after reducing them by key?

Is there a better way to handle this problem? The maximum number of users is constant and will not go above 5000, so that's the maximum number of keys I would create. The input may contain many lines, close to 1B rows. I don't think that's a problem, but please correct me if I'm wrong.

Update:

This is the piece of code I came up with to solve the problem, with my limited knowledge of Spark (I only started learning Spark last month):

import scala.collection.mutable.WrappedArray
//note: .toDF() and $ also need import sqlContext.implicits._ outside of spark-shell

//split "groupX=userA,userB,..." into (user, group) pairs
def createPair(line: String): Array[(String, String)] = {
  val splits = line.split("=")
  val kuid = splits(0)
  splits(1).split(",").map { segment => (segment, kuid) }
}

val input = sc.textFile("input/test.log")
val pair = input.flatMap { line => createPair(line) }

//collect, per user, the list of groups that user belongs to
val pairListDF = pair
  .aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
    (kuidList, kuid) => { kuidList += kuid; kuidList },
    (kuidList1, kuidList2) => { kuidList1.appendAll(kuidList2); kuidList1 })
  .mapValues(_.toList).toDF().select($"_1".alias("user"), $"_2".alias("groups"))

pairListDF.registerTempTable("table")

//UDFs for the intersection/union sizes of two group lists
sqlContext.udf.register("intersectCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)

//self-join on users; t1.user < t2.user keeps each pair only once
val populationDF = sqlContext.sql("select t1.user user_first,"
  + "t2.user user_second,"
  + "intersectCount(t1.groups, t2.groups) intersect_count,"
  + "unionCount(t1.groups, t2.groups) union_count"
  + " from table t1 inner join table t2"
  + " on t1.user < t2.user"
  + " order by user_first,user_second")

Output:

+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
|     user1|      user2|              2|          7|
|     user1|      user3|              1|          6|
|     user1|      user4|              1|          9|
|     user1|      user5|              1|          8|
|     user2|      user3|              1|          7|
|     user2|      user4|              3|          8|
|     user2|      user5|              1|          9|
|     user3|      user4|              1|          8|
|     user3|      user5|              2|          6|
|     user4|      user5|              3|          8|
+----------+-----------+---------------+-----------+

I would love feedback on the code and on the things I'm missing. Please feel free to criticize the code, as I've only just started learning Spark. Thanks again to @axiom for the answer, a smaller and nicer solution than I expected.

Summary:

Obtain the pair counts, and use the fact that

union(a, b) = count(a) + count(b) - intersection(a, b)
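For example, on the sample input user1 belongs to 4 groups and user2 to 5, and they share 2 groups, so union(user1, user2) = 4 + 5 - 2 = 7, matching the expected output.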

val data = sc.textFile("test")
//optionally data.cache(), depending on the size of the data
val pairCounts   = data.flatMap(pairs).reduceByKey(_ + _)
val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
val result = pairCounts.map { case ((user1, user2), intersectionCount) =>
  (user1, user2, intersectionCount,
    singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)
}


Details:

  1. There are 5000 users in total, so 25 million keys (one per pair, 5000 × 5000 as an upper bound) shouldn't be too much. We can use reduceByKey to calculate the intersection counts.

  2. The individual counts can be broadcast as a map.

  3. And we know that:

    union(user1, user2) = count(user1) + count(user2) - intersection(user1, user2).

The first two counts are read from the broadcast map, while we map over the RDD of pair counts.

Code:

//generate ((user1, user2), 1) pair counts
def pairs(str: String) = {
  val users = str.split("=")(1).split(",")
  val n = users.length
  for (i <- 0 until n; j <- i + 1 until n) yield {
    val pair = if (users(i) < users(j)) {
      (users(i), users(j))
    } else {
      (users(j), users(i))
    } //order of users in the list shouldn't matter
    (pair, 1)
  }
}

//generate (user, 1) to obtain single counts
def singles(str: String) = {
  for (user <- str.split("=")(1).split(",")) yield (user, 1)
}

//read the input into an RDD
scala> val data = sc.textFile("test")
scala> data.collect.map(println)
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

//get pair counts
scala> val pairCounts = data.flatMap(pairs).reduceByKey(_ + _)
pairCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[16] at reduceByKey at <console>:25

//just checking
scala> pairCounts.collect.map(println)
((user2,user3),1)
((user1,user3),1)
((user3,user4),1)
((user2,user5),1)
((user1,user5),1)
((user2,user4),3)
((user4,user5),3)
((user1,user4),1)
((user3,user5),2)
((user1,user2),2)

//single counts
scala> val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
singleCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25

scala> singleCounts.collect.map(println)
(user5,5)
(user3,3)
(user1,4)
(user2,5)
(user4,6)

//broadcast the single counts
scala> val singleCountMap = sc.broadcast(singleCounts.collectAsMap())

//calculate the results:

And finally:

scala> val res = pairCounts.map { case ((user1, user2), intersectionCount) =>
  (user1, user2, intersectionCount,
    singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)
}
res: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[23] at map at <console>:33

scala> res.collect.map(println)
(user2,user3,1,7)
(user1,user3,1,6)
(user3,user4,1,8)
(user2,user5,1,9)
(user1,user5,1,8)
(user2,user4,3,8)
(user4,user5,3,8)
(user1,user4,1,9)
(user3,user5,2,6)
(user1,user2,2,7)
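If you want the result in the same tabular form as in the question, one option (assuming a sqlContext with its implicits imported, as in your update; the column names below are just a suggestion) is:

res.toDF("user_first", "user_second", "intersect_count", "union_count")
  .orderBy("user_first", "user_second")
  .show()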

Note:

  1. While generating the pairs, I sort the tuple because I don't want the order of the users in the list to matter.

  2. Do encode the usernames as integers instead of strings; it may give a significant performance boost (see the sketch below).
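A minimal sketch of point 2, using a broadcast user-to-Int dictionary (just an illustration; userIds and userIdMap are names I made up):

//build a user -> Int dictionary once and broadcast it
val userIds = data.flatMap(singles).keys.distinct()
  .zipWithIndex().mapValues(_.toInt).collectAsMap()
val userIdMap = sc.broadcast(userIds)

//then emit (userIdMap.value(users(i)), userIdMap.value(users(j))) inside pairs(),
//so the shuffled keys are Ints instead of Strings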

