Spark - Finding overlapping values, or a variation of finding mutual friends


I have a problem I'm trying to solve using Spark. I'm new to Spark and not sure of the best way to design it.

Input:

group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

I want to find the mutual group count between each pair of users. For the above input, the output I'm expecting is:

Output:

1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1        user2           2                       7
user1        user3           1                       6
user1        user4           1                       9
user2        user4           3                       8

I think there are several ways to solve this problem; one solution would be:

  • create a (key, value) pair where the key is the user and the value is the group
  • group by key, so we have the list of groups each user belongs to
  • then find the intersection/union between the group lists of each pair of users

Example:

(1st stage): map
group1=user1,user2 ==>
          user1, group1
          user2, group1
group2=user1,user2,user3 ==>
          user1, group2
          user2, group2
          user3, group2
....
....
....

(2nd stage): reduce by key
user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9
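A minimal sketch of those two stages in Spark might look like the following (the createPairs helper name and the plain groupByKey are only for illustration; my actual attempt is in the update further down):

// Stage 1: turn each "groupX=userA,userB,..." line into (user, group) pairs
def createPairs(line: String): Array[(String, String)] = {
  val splits = line.split("=")
  val group = splits(0)
  splits(1).split(",").map(user => (user, group))
}

val input = sc.textFile("input/test.log")

// Stage 2: group by user, so each user maps to the set of groups it belongs to,
// e.g. (user1, Set(group1, group2, group4, group8))
val userGroups = input.flatMap(createPairs).groupByKey().mapValues(_.toSet)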

But my question is: what is the best way to produce the counts in the format I want after reducing them by key?

Is there a better way to handle this problem? The maximum number of users is constant and won't go above 5000, so that's the maximum number of keys I would create. The input may contain close to 1B rows. I don't think that's a problem, but please correct me if I'm wrong.

Update:

This is a piece of code I came up with to solve the problem with my little knowledge of Spark (I just started learning Spark last month):

def createPair(line: String): Array[(String, String)] = {
    val splits = line.split("=")
    val kuid = splits(0)
    splits(1).split(",").map { segment => (segment, kuid) }
}

val input = sc.textFile("input/test.log")
val pair = input.flatMap { line => createPair(line) }

val pairListDF = pair
  .aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
    (kuidList, kuid) => { kuidList += kuid; kuidList },
    (kuidList1, kuidList2) => { kuidList1.appendAll(kuidList2); kuidList1 })
  .mapValues(_.toList).toDF().select($"_1".alias("user"), $"_2".alias("groups"))

pairListDF.registerTempTable("table")

sqlContext.udf.register("intersectCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)

val populationDF = sqlContext.sql("select t1.user user_first,"
  + "t2.user user_second,"
  + "intersectCount(t1.groups, t2.groups) intersect_count,"
  + "unionCount(t1.groups, t2.groups) union_count"
  + " from table t1 inner join table t2"
  + " on t1.user < t2.user"
  + " order by user_first,user_second")

Output:

+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
|     user1|      user2|              2|          7|
|     user1|      user3|              1|          6|
|     user1|      user4|              1|          9|
|     user1|      user5|              1|          8|
|     user2|      user3|              1|          7|
|     user2|      user4|              3|          8|
|     user2|      user5|              1|          9|
|     user3|      user4|              1|          8|
|     user3|      user5|              2|          6|
|     user4|      user5|              3|          8|
+----------+-----------+---------------+-----------+

Would love feedback on the code and on things I'm missing. Please feel free to criticize the code, since I just started learning Spark. Thanks again to @axiom for the answer; it's a smaller and nicer solution than I expected.

Summary:

Obtain the pair counts, and use the fact that

union(a, b) = count(a) + count(b) - intersection(a, b)

val data = sc.textFile("test") //optionally data.cache(), depending on the size of the data
val pairCounts   = data.flatMap(pairs).reduceByKey(_ + _)
val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
val result = pairCounts.map{case ((user1, user2), intersectionCount) => (user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)}


Details:

  1. There are 5000 users in total, so 25 million keys (one per pair) shouldn't be too much. We can use reduceByKey to calculate the intersection counts.

  2. The individual counts can be broadcast as a map.

  3. And it is known that:

    union(user1, user2) = count(user1) + count(user2) - intersection(user1, user2).

The first two counts are read from the broadcast map while mapping over the RDD of pair counts. For example, for user1 and user2: count(user1) = 4, count(user2) = 5, and their intersection count is 2, so the union count is 4 + 5 - 2 = 7, matching the expected output above.

Code:

//generate ((user1, user2), 1) pair counts
def pairs(str: String) = {
  val users = str.split("=")(1).split(",")
  val n = users.length
  for (i <- 0 until n; j <- i + 1 until n) yield {
    val pair = if (users(i) < users(j)) {
      (users(i), users(j))
    } else {
      (users(j), users(i))
    } //order of the users in the list shouldn't matter
    (pair, 1)
  }
}

//generate (user, 1), to obtain single counts
def singles(str: String) = {
  for (user <- str.split("=")(1).split(",")) yield (user, 1)
}

//read the RDD
scala> val data = sc.textFile("test")
scala> data.collect.map(println)
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

//get pair counts
scala> val pairCounts  = data.flatMap(pairs).reduceByKey(_ + _)
pairCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[16] at reduceByKey at <console>:25

//just checking
scala> pairCounts.collect.map(println)
((user2,user3),1)
((user1,user3),1)
((user3,user4),1)
((user2,user5),1)
((user1,user5),1)
((user2,user4),3)
((user4,user5),3)
((user1,user4),1)
((user3,user5),2)
((user1,user2),2)

//single counts
scala> val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
singleCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25

scala> singleCounts.collect.map(println)
(user5,5)
(user3,3)
(user1,4)
(user2,5)
(user4,6)

//broadcast single counts
scala> val singleCountMap = sc.broadcast(singleCounts.collectAsMap())

//calculate the results:

And finally:

scala> val res = pairCounts.map{case ((user1, user2), intersectionCount) => (user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)}
res: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[23] at map at <console>:33

scala> res.collect.map(println)
(user2,user3,1,7)
(user1,user3,1,6)
(user3,user4,1,8)
(user2,user5,1,9)
(user1,user5,1,8)
(user2,user4,3,8)
(user4,user5,3,8)
(user1,user4,1,9)
(user3,user5,2,6)
(user1,user2,2,7)

Note:

  1. While generating the pairs, I sort the tuple because I don't want the order of the users within a pair to matter.

  2. Do encode the username strings as integers; it may give a significant performance boost (see the sketch below).
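A minimal sketch of that encoding, purely as an illustration (the userIds dictionary and the pairsAsInts helper are hypothetical names; it assumes the same input format and counting logic as above):

// Build a small user -> Int dictionary (at most ~5000 entries) and broadcast it,
// so each line can be rewritten with integer IDs before the pair/single counting.
val userIds = sc.broadcast(
  data.flatMap(_.split("=")(1).split(","))
      .distinct()
      .collect()
      .zipWithIndex
      .toMap)                       // e.g. Map("user1" -> 0, "user2" -> 1, ...)

// Same pair generation as before, but shuffling Ints instead of Strings.
def pairsAsInts(str: String) = {
  val users = str.split("=")(1).split(",").map(userIds.value)
  val n = users.length
  for (i <- 0 until n; j <- i + 1 until n) yield {
    val pair = if (users(i) < users(j)) (users(i), users(j)) else (users(j), users(i))
    (pair, 1)
  }
}

val pairCountsInt = data.flatMap(pairsAsInts).reduceByKey(_ + _)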

