python - Filter in Apache Spark is not working as expected
I'm executing the code below:
    ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
    orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
    ordersParsedRDD = ordersRDD.filter(lambda rec: rec.split(",")[3] in "CANCELED").map(lambda rec: (int(rec.split(",")[0]), rec))
    orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))
    orderItemsAgg = orderItemsParsedRDD.reduceByKey(lambda acc, value: (acc + value))
    ordersJoinOrderItems = orderItemsAgg.join(ordersParsedRDD)
    for i in ordersJoinOrderItems.filter(lambda rec: rec[1][0] >= 1000).take(5):
        print(i)
The final results are not getting displayed for me; the output stops at the filter condition.
Before the join, the command is able to display records; after the join, the data is not getting displayed.
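Roughly, the check I mean is a take(5) on the RDDs before and after the join, something like this sketch (the exact commands may differ slightly):

    # Before the join: sampling the aggregated order items displays records.
    for rec in orderItemsAgg.take(5):
        print(rec)

    # After the join: sampling here is where nothing seems to come back.
    for rec in ordersJoinOrderItems.take(5):
        print(rec)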
The error is shown below:
    16/06/01 17:26:05 INFO storage.ShuffleBlockFetcherIterator: Getting 12 non-empty blocks out of 12 blocks
    16/06/01 17:26:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
    16/06/01 17:26:05 INFO python.PythonRunner: Times: total = 67, boot = -57, init = 61, finish = 63
    16/06/01 17:26:05 INFO python.PythonRunner: Times: total = 73, boot = -68, init = 75, finish = 66
    16/06/01 17:26:05 INFO executor.Executor: Finished task 11.0 in stage 289.0 (TID 2689). 1461 bytes result sent to driver
    16/06/01 17:26:05 INFO scheduler.TaskSetManager: Finished task 11.0 in stage 289.0 (TID 2689) in 153 ms on localhost (12/24)
I would start by printing the count on ordersJoinOrderItems. If it is bigger than 0, that suggests the filter is the culprit.
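A minimal sketch of that debugging step, assuming the same RDD names as in the question:

    # Count the joined records before applying the >= 1000 filter.
    print(ordersJoinOrderItems.count())

    # If the count is greater than 0, inspect a few joined records to confirm
    # that rec[1][0] really is the aggregated order total (a float).
    for rec in ordersJoinOrderItems.take(5):
        print(rec)

    # Then count how many records survive the filter itself.
    print(ordersJoinOrderItems.filter(lambda rec: rec[1][0] >= 1000).count())

If the join count is greater than 0 but the filtered count is 0, the >= 1000 condition (or the value it is applied to) is what is dropping everything.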