Skip to content

Commit 1a0de87

Browse files
committed
Improve exception handling of stream readers using resource management via 'using'.
1 parent 91d7e45 commit 1a0de87

File tree

4 files changed

+73
-76
lines changed

4 files changed

+73
-76
lines changed

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import za.co.absa.cobrix.cobol.processor.impl.{CobolProcessorInPlace, CobolProce
2121
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
2222
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
2323
import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}
24+
import za.co.absa.cobrix.cobol.utils.UsingUtils
2425

25-
import java.io.{BufferedInputStream, BufferedOutputStream, FileOutputStream, OutputStream}
26+
import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}
2627
import scala.collection.mutable
2728

2829

@@ -159,43 +160,12 @@ object CobolProcessor {
159160
case CobolProcessingStrategy.ToVariableLength => new CobolProcessorToRdw(readerParameters, copybook, copybookContents, options)
160161
}
161162

162-
val ifs = new FSStream(fileToProcess)
163-
val ofs = new BufferedOutputStream(new FileOutputStream(outputFile))
164-
165-
var originalException: Throwable = null
166-
167-
val recordCount = try {
168-
processor.process(ifs, ofs)(rawRecordProcessor)
169-
} catch {
170-
case ex: Throwable =>
171-
originalException = ex
172-
0L
173-
} finally {
174-
try {
175-
ifs.close()
176-
} catch {
177-
case e: Throwable =>
178-
if (originalException != null) {
179-
originalException.addSuppressed(e)
180-
} else {
181-
originalException = e
182-
}
183-
}
184-
185-
try {
186-
ofs.close()
187-
} catch {
188-
case e: Throwable =>
189-
if (originalException != null) {
190-
originalException.addSuppressed(e)
191-
} else {
192-
originalException = e
193-
}
163+
val recordCount = UsingUtils.using(new FSStream(fileToProcess)) { ifs =>
164+
UsingUtils.using(new BufferedOutputStream(new FileOutputStream(outputFile))) { ofs =>
165+
processor.process(ifs, ofs)(rawRecordProcessor)
194166
}
195167
}
196168

197-
if (originalException != null) throw originalException
198-
199169
recordCount
200170
}
201171
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package za.co.absa.cobrix.cobol.reader.stream
1818

1919

2020
/** This trait defines a simple abstraction for processing variable length record data. */
21-
trait SimpleStream {
21+
trait SimpleStream extends AutoCloseable {
2222
def size: Long
2323

2424
def totalSize: Long
@@ -35,6 +35,4 @@ trait SimpleStream {
3535
def copyStream(): SimpleStream
3636

3737
@throws(classOf[Exception]) def next(numberOfBytes: Int): Array[Byte]
38-
39-
@throws(classOf[Exception]) def close(): Unit
4038
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.utils
18+
19+
import scala.util.control.NonFatal
20+
21+
object UsingUtils {
  /**
    * Executes the given action with a resource that implements the [[AutoCloseable]] interface,
    * guaranteeing the resource is closed afterwards. Mirrors the semantics of Java's
    * try-with-resources and of `scala.util.Using.resource` (unavailable on Scala 2.11/2.12).
    *
    * Exception semantics:
    *  - If the action throws, the resource is still closed; a non-fatal exception thrown by
    *    `close()` is added as suppressed to the action's exception, which is then rethrown.
    *    This holds for fatal action errors too, so a close-time failure is never silently lost.
    *  - If only `close()` throws (the action succeeded), the close exception is propagated.
    *  - A `null` result from the action is returned as-is; resource management does not
    *    constrain the action's return value.
    *
    * @param resource a lazily evaluated resource that implements AutoCloseable; must not be null
    * @param action   a function to be executed using the provided resource
    * @tparam T the type of the resource, which must extend AutoCloseable
    * @tparam U the result type of the action
    * @return the value produced by the action
    * @throws Throwable if either the action or the resource closure fails. If both fail, the
    *                   action's exception is thrown with the closure's exception added as suppressed
    */
  def using[T <: AutoCloseable, U](resource: => T)(action: T => U): U = {
    val openedResource = resource
    // Remembers the action's exception so a close-time failure can be attached as suppressed.
    var primaryException: Throwable = null
    try {
      action(openedResource)
    } catch {
      // Caught only to record it for suppression bookkeeping; rethrown immediately,
      // so fatal errors still propagate unchanged.
      case ex: Throwable =>
        primaryException = ex
        throw ex
    } finally {
      if (openedResource != null) {
        try {
          openedResource.close()
        } catch {
          case NonFatal(closeEx) =>
            if (primaryException != null) {
              primaryException.addSuppressed(closeEx)
            } else {
              // close() failed on the success path: there is nothing to suppress into.
              throw closeEx
            }
        }
      }
    }
  }
}

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
2828
import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_GENERATE_RECORD_ID
2929
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
3030
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
31+
import za.co.absa.cobrix.cobol.utils.UsingUtils
3132
import za.co.absa.cobrix.spark.cobol.reader.VarLenReader
3233
import za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder
3334
import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
@@ -223,7 +224,6 @@ object SparkCobolProcessor {
223224
indexes.flatMap(indexEntry => {
224225
val filePathName = filesMap(indexEntry.fileId)
225226
val path = new Path(filePathName)
226-
val fileSystem = path.getFileSystem(sconf.value)
227227
val fileName = path.getName
228228
val numOfBytes = if (indexEntry.offsetTo > 0L) indexEntry.offsetTo - indexEntry.offsetFrom else 0L
229229
val numOfBytesMsg = if (numOfBytes > 0) s"${numOfBytes / Constants.megabyte} MB" else "until the end"
@@ -237,9 +237,7 @@ object SparkCobolProcessor {
237237

238238
case _ =>
239239
spark.sparkContext.parallelize(listOfFiles).flatMap { inputFile =>
240-
val hadoopConfig = sconf.value
241240
log.info(s"Going to process data from $inputFile")
242-
val inputFs = new Path(inputFile).getFileSystem(hadoopConfig)
243241
val ifs = new FileStreamer(inputFile, sconf.value)
244242

245243
CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, ifs, None)
@@ -265,47 +263,16 @@ object SparkCobolProcessor {
265263

266264
Future {
267265
val hadoopConfig = sconf.value
268-
val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
269-
val ifs = new FileStreamer(inputFIle, sconf.value)
266+
270267
val outputFile = new Path(outputPath, fileName)
271268
val outputFs = outputFile.getFileSystem(hadoopConfig)
272-
val ofs = new BufferedOutputStream(outputFs.create(outputFile, true))
273-
274-
var originalException: Throwable = null
275-
276-
val recordCount = try {
277-
cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
278-
} catch {
279-
case ex: Throwable =>
280-
originalException = ex
281-
0L
282-
} finally {
283-
// Ugly code to ensure no exceptions escape unnoticed.
284-
try {
285-
ifs.close()
286-
} catch {
287-
case e: Throwable =>
288-
if (originalException != null) {
289-
originalException.addSuppressed(e)
290-
} else {
291-
originalException = e
292-
}
293-
}
294269

295-
try {
296-
ofs.close()
297-
} catch {
298-
case e: Throwable =>
299-
if (originalException != null) {
300-
originalException.addSuppressed(e)
301-
} else {
302-
originalException = e
303-
}
270+
val recordCount = UsingUtils.using(new FileStreamer(inputFIle, sconf.value)) { ifs =>
271+
UsingUtils.using(new BufferedOutputStream(outputFs.create(outputFile, true))) { ofs =>
272+
cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
304273
}
305274
}
306275

307-
if (originalException != null) throw originalException
308-
309276
log.info(s"Writing to $outputFile succeeded!")
310277
recordCount
311278
}

0 commit comments

Comments (0)