Stop processing large text files in Apache Spark after a certain number of errors
I'm new to Spark; I'm working with version 1.6.1. Let's imagine I have a large file that I'm reading into an RDD[String] through textFile. I then want to validate each line in a function. Because the file is huge, I want to stop processing when I've reached a certain number of errors, let's say 1000 invalid lines.
val rdd = sparkContext.textFile(fileName)
rdd.map(line => MyValidator.validate(line))
Here is the validate function:
def validate(line: String): (String, String) = {
  // the 1st element of the tuple is the resulting line,
  // the 2nd is, say, the validation error
}
How do I count the errors inside validate, given that it is executed in parallel on multiple nodes? Broadcast variables? Accumulators?
You can achieve this behavior using Spark's laziness, by "splitting" the result of the parsing into successes and failures, calling take(n) on the failures, and using the success data only if there were fewer than n failures.
To do this more conveniently, I'd suggest changing the signature of validate so that its return type can distinguish success from failure, e.g. scala.util.Try:
def validate(line: String): Try[String] = {
  // returns Success[String] on success,
  // Failure (with the details in the exception object) otherwise
}
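For example, a minimal sketch of such a validator could look like this; the actual validation rule (non-empty, tab-separated input) is just an assumption for illustration:

import scala.util.Try

object MyValidator {
  // Hypothetical rule for illustration: a line is valid if it is non-empty
  // and contains a tab separator; otherwise require(...) throws and
  // Try captures the exception as a Failure.
  def validate(line: String): Try[String] = Try {
    require(line.nonEmpty, "empty line")
    require(line.contains("\t"), s"missing tab separator: $line")
    line
  }
}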
And then, something like this:
import scala.util.{Try, Success, Failure}
import org.apache.spark.rdd.RDD

val maxFailures = 1000
val rdd = sparkContext.textFile(fileName)
val parsed: RDD[Try[String]] = rdd.map(line => MyValidator.validate(line)).cache()

val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures)

if (failures.size == maxFailures) {
  // report failures...
} else {
  val success: RDD[String] = parsed.collect { case Success(s) => s }
  // continue here...
}
Why does this work?

- If there are fewer than 1000 failures, the entire dataset will be parsed when take(maxFailures) is called, the successful data will be cached, and it's ready to use.
- If there are 1000 failures or more, the parsing will stop there, as the take operation won't require any more reads.
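To sanity-check the pattern locally, here is a self-contained sketch with a toy dataset and a hypothetical inline rule (lines shorter than 5 characters are treated as invalid); the application name, master, threshold, and data are all assumptions:

import scala.util.{Try, Success, Failure}
import org.apache.spark.{SparkConf, SparkContext}

val sparkContext = new SparkContext(new SparkConf().setAppName("validate-demo").setMaster("local[*]"))

// Toy data plus a hypothetical rule: lines shorter than 5 characters are invalid.
val sample = sparkContext.parallelize(Seq("valid line", "another ok line", "bad", "x"))
val maxFailures = 2

val parsed = sample.map(l => Try { require(l.length >= 5, s"line too short: $l"); l }).cache()
val failures = parsed.collect { case Failure(e) => e }.take(maxFailures)

if (failures.size == maxFailures) {
  // two invalid lines in the toy data, so this branch is taken
  println("reached the failure threshold, aborting")
} else {
  val valid = parsed.collect { case Success(s) => s }
  println(s"valid lines: ${valid.count()}")
}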