scala - Stop processing large text files in Apache Spark after a certain number of errors


I'm new to Spark. I'm working with 1.6.1. Let's imagine I have a large file that I'm reading into an RDD[String] through textFile. I then want to validate each line in a function. Because the file is huge, I want to stop processing once I have reached a certain number of errors, let's say 1000 bad lines.

val rdd = sparkContext.textFile(fileName)
rdd.map(line => MyValidator.validate(line))

Here is the validate function:

def validate(line: String): (String, String) = {
  // 1st element of the tuple is the resulting line, 2nd is, say, the validation error.
}

How do I count the errors inside 'validate'? It is executed in parallel on multiple nodes. Should I use broadcasts? Accumulators?

You can achieve this behavior using Spark's laziness, by "splitting" the result of the parsing into successes and failures, calling take(n) on the failures, and only using the success data if there were fewer than n failures.

To do this more conveniently, I'd suggest changing the signature of validate to a return type that can distinguish success from failure, e.g. scala.util.Try:

def validate(line: String): Try[String] = {
  // returns Success[String] on success,
  // Failure (with details in the exception object) otherwise
}
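For illustration, here is a minimal sketch of what such a validator might look like, assuming a hypothetical rule that each line must contain exactly three comma-separated fields (the rule, and the MyValidator object itself, are placeholders, not part of the original question):

import scala.util.{Try, Success, Failure}

object MyValidator {
  // Hypothetical rule: a line is valid if it has exactly three comma-separated fields.
  def validate(line: String): Try[String] = {
    val fields = line.split(",", -1)
    if (fields.length == 3) Success(line)
    else Failure(new IllegalArgumentException(
      s"Expected 3 fields but found ${fields.length}: $line"))
  }
}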

And then, something like:

import org.apache.spark.rdd.RDD
import scala.util.{Try, Success, Failure}

val maxFailures = 1000
val rdd = sparkContext.textFile(fileName)
val parsed: RDD[Try[String]] = rdd.map(line => MyValidator.validate(line)).cache()

val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures)

if (failures.size == maxFailures) {
  // report failures...
} else {
  val success: RDD[String] = parsed.collect { case Success(s) => s }
  // continue here...
}

Why does this work?

  • If there are fewer than 1000 failures, the entire dataset will be parsed when take(maxFailures) is called, and the successful data will be cached and ready to use
  • If there are 1000 failures or more, the parsing will stop there, as the take operation won't require any more reads (see the comparison sketch below)
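As an aside on the accumulators mentioned in the question: an accumulator can count failures across executors, but its value is only readable on the driver after an action has finished, so it cannot cut the job short the way take(maxFailures) does. A rough sketch of that alternative for comparison (names are illustrative, Spark 1.6 accumulator API):

import scala.util.{Success, Failure}

// Counts every failure, but only after a full pass over the data.
val errorCount = sparkContext.accumulator(0L, "validation errors")

val successes = rdd.flatMap { line =>
  MyValidator.validate(line) match {
    case Success(s) => Seq(s)
    case Failure(_) =>
      errorCount += 1L
      Seq.empty[String]
  }
}

successes.count()           // an action must run before the value is visible
println(errorCount.value)   // total failures, known only after the full scan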
