How do I handle errors in mapped functions in AWS Glue?

Christopher Armstrong

I'm using the map method of DynamicFrame (or, equivalently, the Map.apply method). I've noticed that any errors in the function I pass to these methods are silently ignored and cause the returned DynamicFrame to be empty.

Say I have a job script like this:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *

glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database="radixdemo", table_name="census_csv")

def my_mapper(rec):
    import logging
    logging.error("[RADIX] An error-log from in the mapper!")
    print "[RADIX] from in the mapper!"
    raise Exception("[RADIX] A bug!")
dyF = dyF.map(my_mapper, 'my_mapper')

print "Count:  ", dyF.count()
dyF.printSchema()
dyF.toDF().show()

If I run this script in my Glue Dev Endpoint with gluepython, I get output like this:

[glue@ip-172-31-83-196 ~]$ gluepython gluejob.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/05/23 20:56:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Count:   0
root

++
||
++
++

Notes about this output:

  • I don't see the result of the print statement or the logging.error statement.
  • There's no indication that my_mapper raised an exception.
  • The printSchema call shows that there is no schema metadata on the produced DynamicFrame.
  • The show method also isn't producing any output, indicating that all the rows are gone.

Likewise, when I save this script as a job in the AWS Glue console and run it, the job doesn't indicate that any error occurred: the job status is "Succeeded". Notably, the output of the print statements and logging.error calls does appear in the job logs, but only in the regular "Logs", not in the "Error Logs".

What I want is to be able to indicate that my job has failed, and to be able to easily find these error logs. Most important is to just indicate that it has failed.

Is there a way to log an error within a mapped function in such a way that Glue will pick it up as an "Error Log" (and put it in that separate AWS CloudWatch Logs path)? If this happens, will it automatically mark the entire Job as Failing? Or is there some other way to explicitly fail the job from within a mapped function?

(If there is a way to log errors and/or mark the job as failed, my plan is to create a decorator or other utility function that automatically catches exceptions in my mapped functions and ensures that they are logged and marked as a failure.)

Christopher Armstrong

The only way I have found to make a Glue job show up as "Failed" is to raise an exception from the main script, not inside a mapper or filter function, because those are executed remotely on the workers (the job's Data Processing Units) rather than in the driver script.

Fortunately, there is a way to detect whether an exception occurred inside a map or filter function: the DynamicFrame.stageErrorsCount() method. It returns the number of errors (such as exceptions raised by your function) that occurred in the transformation that produced that DynamicFrame.

So, to solve these problems:

  • Make sure your map or filter function explicitly logs any exceptions that occur inside it. This is best done with a decorator or some other reusable mechanism, instead of putting try/except statements in every single function you write.
  • After every transformation that you want to catch errors in, call stageErrorsCount() on the resulting DynamicFrame and check whether it's greater than 0. If you want to abort the job, raise an exception.

For example:

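import functools
import logging

def log_errors(inner):
    @functools.wraps(inner)  # keep the mapper's real name for logs
    def wrapper(*args, **kwargs):
        try:
            # Return the mapper's result; without `return`, every record becomes None
            return inner(*args, **kwargs)
        except Exception:
            # Log the full traceback, then re-raise so stageErrorsCount() still sees the error
            logging.exception('Error in function: {}'.format(inner.__name__))
            raise
    return wrapper

@log_errors
def foo(record):
    1 / 0  # deliberately fails, to demonstrate the error handling
    return record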

Then, inside your job, you'd do something like:

dyF = dyF.map(foo, "foo")
# stageErrorsCount() reports the errors from the transformation that produced dyF
if dyF.stageErrorsCount() > 0:
    raise Exception("Error in job! See the log!")
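When a job chains several mapped functions, the same check has to be repeated after each one, since stageErrorsCount() only covers the transformation that produced the frame it is called on. Below is a minimal sketch of a reusable loop for that. The names `map_steps_checked` and `TransformFailed` are hypothetical; the only Glue APIs it assumes are `DynamicFrame.map(f, transformation_ctx)` and `stageErrorsCount()`, as used above, so it can also be exercised with a stub object outside Glue.

```python
import logging

logger = logging.getLogger(__name__)


class TransformFailed(Exception):
    """Hypothetical exception raised from the driver so the Glue job run fails."""


def map_steps_checked(frame, steps):
    """Apply each (name, fn) pair in `steps` with frame.map, failing fast on errors.

    `frame` is assumed to be an awsglue DynamicFrame; only its
    map(f, transformation_ctx) and stageErrorsCount() methods are used.
    """
    for name, fn in steps:
        # Use the step name as the transformation context.
        frame = frame.map(fn, name)
        # stageErrorsCount() only reflects the step that produced `frame`,
        # so it is checked after every map call, not once at the end.
        errors = frame.stageErrorsCount()
        if errors > 0:
            logger.error("Step %s raised %d error(s)", name, errors)
            # Raising here, in the driver script, is what marks the job as failed.
            raise TransformFailed(
                "{} error(s) in step '{}'; see the job logs".format(errors, name)
            )
    return frame
```

In a job this might be called as `dyF = map_steps_checked(dyF, [("clean", clean_record), ("enrich", enrich_record)])`, where `clean_record` and `enrich_record` stand for your own (hypothetical) mappers, ideally wrapped with a logging decorator like `log_errors`.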

Note that even calling logging.exception from inside the mapper function doesn't write to the error log in AWS CloudWatch Logs, for some reason; the message goes to the regular (non-error) logs instead. However, with this technique you will at least see that the job failed and be able to find the details in the logs. Another caveat: Dev Endpoints don't seem to show ANY logs from mapper or filter functions.
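Because Dev Endpoints don't appear to surface mapper logs, it can help to run a mapper over a few plain Python dicts before submitting the job. The helper below is a hypothetical, Glue-free sketch: it applies the function to each sample record and collects any exceptions with their tracebacks instead of dropping them. It assumes the mapper only uses dict-style access on the record; `run_mapper_locally` and `parse_age` are illustrative names, not Glue APIs.

```python
import traceback


def run_mapper_locally(fn, records):
    """Apply `fn` to each sample record, collecting failures instead of hiding them.

    Returns (results, failures), where failures is a list of
    (record, formatted_traceback) pairs.
    """
    results, failures = [], []
    for record in records:
        try:
            # Pass a copy so the mapper cannot mutate the sample data.
            results.append(fn(dict(record)))
        except Exception:
            failures.append((record, traceback.format_exc()))
    return results, failures


def parse_age(rec):
    # Hypothetical mapper: convert a string "age" field to an int.
    rec["age"] = int(rec["age"])
    return rec


if __name__ == "__main__":
    ok, bad = run_mapper_locally(parse_age, [{"age": "42"}, {"age": "n/a"}])
    print(ok)  # [{'age': 42}]
    for record, tb in bad:
        print("Failed on {}".format(record))
        print(tb)
```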

