Automating Athena Queries with Python




Introduction

Over the last few weeks I’ve been using Amazon Athena quite heavily. For those of you who haven’t encountered it, Athena basically lets you query data stored in various formats on S3 using SQL (under the hood it’s a managed Presto/Hive cluster). Pricing for Athena is pretty nice as well: you pay only for the amount of data your queries scan. At $5 per TB that’s relatively cheap when you consider the effort it takes to set up EMR clusters for one-time or very infrequent queries and transformations.

In this post I’m going to share some code I’m using to automate queries in Athena.

Using Athena

Interactively

If you use Athena interactively, it is very simple - you have your schemas and tables on the left, your editor on the right and a big beautiful Run query button. Once you run your query, you get a pretty loading animation while you wait and afterwards your data, which you can then download as CSV.

Athena Gui

Inside of your code

Using Athena inside of your code is a little more annoying, at least when you’re using Lambda and/or trying to keep things serverless. Starting a query from the SDK is the straightforward part.

If you were to do it using boto3 it would look something like this:

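import boto3

# The Athena client uses the region and credentials of your environment
client = boto3.client('athena')

query = "Your query"
database = "database_name"
athena_result_bucket = "s3://my-bucket/"

# Starts the query asynchronously and returns right away
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': database
    },
    ResultConfiguration={
        'OutputLocation': athena_result_bucket,
    }
)

# You need this id later to check the status and fetch the result
query_execution_id = response["QueryExecutionId"]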

Running queries is all fine and dandy, but you usually care about the result of a query as well, or would at least like to know whether it succeeded.
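Checking on a query comes down to a single SDK call. Here is a minimal sketch, assuming a boto3 Athena client is passed in; the function name get_query_state is made up for illustration and is not part of any library:

```python
def get_query_state(client, query_execution_id):
    """Return (state, reason) for an Athena query execution.

    state is one of QUEUED, RUNNING, SUCCEEDED, FAILED or CANCELLED.
    reason is the StateChangeReason if Athena provides one, else None.
    """
    response = client.get_query_execution(QueryExecutionId=query_execution_id)
    status = response["QueryExecution"]["Status"]
    return status["State"], status.get("StateChangeReason")


# Usage (needs boto3 and AWS credentials):
#   client = boto3.client("athena")
#   state, reason = get_query_state(client, query_execution_id)
```

This only tells you where the query stands right now, so you have to call it repeatedly until the state is final.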

Then you run into the problem that query runtimes in Athena are not measured in milliseconds but in seconds and minutes, up to a limit of 30 minutes. That’s a problem because the Lambda execution limit is currently 15 minutes, and long running Lambdas aren’t cool anyway.

(If you only have short running queries, let’s say up to 5 minutes, and you know that beforehand, you can skip ahead to the section on short running queries.)

Enter: Step Functions. Yes I know, having to use yet another service isn’t ideal, but there are two limitations with Athena:

  1. There is no Lambda trigger for when a query terminates
  2. There is no other integration, like SNS or SQS, for queries that finish

You could summarize it as: Athena lacks integration for the result of queries (if I have overlooked something, please let me know!).

If you haven’t yet encountered Step Functions: the service helps you automate workflows that include several AWS services. You define your workflow as a state machine and AWS takes care of orchestrating your resources in the order and with the constraints you specified.

I usually use this pattern in my step functions:

Step Function Workflow

A Lambda function starts the long running Athena query, then we enter a kind of loop. First, a Wait step pauses the execution, then another Lambda function fetches the state of the query execution. A Choice step checks whether the query has succeeded; if yes, we continue. If it’s still running, we move back to the Wait step (adding error handling is trivial here).
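To make the loop more tangible, here is a rough sketch of such a state machine, written as a Python dictionary in the Amazon States Language. Treat it as an illustration under assumptions, not as the definition from the sample project: the ARNs are placeholders, most state names are made up, the wait time of 10 seconds is arbitrary, and it assumes the status Lambda writes the query state to $.WaitTask.QueryState.

```python
import json

# Placeholder ARNs: replace them with the ARNs of your own Lambda functions.
START_QUERY_ARN = "arn:aws:lambda:REGION:ACCOUNT_ID:function:start_query"
GET_STATUS_ARN = "arn:aws:lambda:REGION:ACCOUNT_ID:function:get_query_status"
GET_RESULT_ARN = "arn:aws:lambda:REGION:ACCOUNT_ID:function:get_result"

STATE_PATH = "$.WaitTask.QueryState"


def build_state_machine(wait_seconds=10):
    """Build a state machine definition for the start/wait/check loop."""
    return {
        "StartAt": "start_query",
        "States": {
            "start_query": {
                "Type": "Task", "Resource": START_QUERY_ARN, "Next": "wait_for_query",
            },
            "wait_for_query": {
                "Type": "Wait", "Seconds": wait_seconds, "Next": "get_query_status",
            },
            "get_query_status": {
                "Type": "Task", "Resource": GET_STATUS_ARN, "Next": "check_query_status",
            },
            "check_query_status": {
                "Type": "Choice",
                "Choices": [
                    {"Variable": STATE_PATH, "StringEquals": "SUCCEEDED", "Next": "get_result"},
                    {"Variable": STATE_PATH, "StringEquals": "FAILED", "Next": "query_failed"},
                    # Athena reports cancelled queries as CANCELLED (double L)
                    {"Variable": STATE_PATH, "StringEquals": "CANCELLED", "Next": "query_failed"},
                ],
                # QUEUED or RUNNING: go back to waiting
                "Default": "wait_for_query",
            },
            "get_result": {"Type": "Task", "Resource": GET_RESULT_ARN, "End": True},
            "query_failed": {
                "Type": "Fail",
                "Error": "QueryFailed",
                "Cause": "The Athena query failed or was cancelled",
            },
        },
    }


print(json.dumps(build_state_machine(), indent=2))
```

The Default branch is what closes the loop: any state that is neither a success nor a failure sends the execution back to the Wait step.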

You can find a sample project with the code for all of the functions on GitHub. To make life easier for myself I wrote the athena_helper.py mini-library, which wraps some of the annoying parts of the API.

Now I’m going to show you the code for long running queries first and afterwards a simplified version for short queries.

Automating Athena

Long running queries

As mentioned above, there are 3 Lambda functions involved in this. We’re going to start with the function that executes the query:

import boto3
from athena_helper import AthenaQuery

BOTO_SESSION = boto3.Session()

def start_long_running_query(event, context):

    # This is the default table
    query = "select * from elb_logs limit 1"
    database_name = "sampledb"

    # Build the name of the default Athena bucket
    account_id = BOTO_SESSION.client('sts').get_caller_identity().get('Account')
    region = BOTO_SESSION.region_name
    result_bucket = "s3://aws-athena-query-results-{}-{}/".format(account_id, region)

    my_query = AthenaQuery(query, database_name, result_bucket)

    query_execution_id = my_query.execute()

    # This will be processed by our waiting-step
    event = {
        "MyQueryExecutionId": query_execution_id,
        "WaitTask": {
            "QueryExecutionId": query_execution_id,
            "ResultPrefix": "Sample"
        }
    }

    return event

This function sets up the relevant parameters for the query:

  • query - the query itself
  • database_name - the name of the schema the query is executed in
  • result_bucket - this builds the name of the result bucket that gets created by default

The actual code for executing the query is just two lines: we build the AthenaQuery object, call execute() on it and receive the execution id of the query:

my_query = AthenaQuery(query, database_name, result_bucket)
query_execution_id = my_query.execute()

Afterwards we build the object that gets passed to the next step. Passing down the Query Execution Id would be sufficient, but I like stats.

The next Lambda function is considerably simpler: it takes the QueryExecutionId out of the input event, builds an AthenaQuery object from it and retrieves the current status of the query.

def get_long_running_query_status(event, context):

    query_execution_id = event["WaitTask"]["QueryExecutionId"]
    aq = AthenaQuery.from_execution_id(query_execution_id)

    status_information = aq.get_status_information()
    event["WaitTask"]["QueryState"] = status_information["QueryState"]

    status_key = "{}StatusInformation".format(event["WaitTask"]["ResultPrefix"])
    event[status_key] = status_information

    return event

A choice-step in the step function processes this - you can find the full definition in the serverless.yml of the project, but here is an excerpt of it:

check_query_status:
    Type: Choice
    Choices:
    - Or:
        - Variable: "$.WaitTask.QueryState"
          StringEquals: FAILED
        - Variable: "$.WaitTask.QueryState"
          StringEquals: CANCELLED
      Next: query_failed

This basically tells the state machine to go to the error state query_failed when the query is in the state FAILED or CANCELLED (Athena spells that state with a double L).

We only get to the next step if the query has succeeded. This Lambda again builds the AthenaQuery object from the QueryExecutionId and retrieves the result:

def get_long_running_result(event, context):
    query_execution_id = event["MyQueryExecutionId"]

    # Build the query object from the execution id
    aq = AthenaQuery.from_execution_id(query_execution_id)

    # Fetch the result
    result_data = aq.get_result()

    # Do whatever you want with the result

    event["GotResult"] = True
    return event

Inside of this function you can process the results the way you want. This is how you can deal with long running Athena queries in Lambda.
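What the processing looks like depends on the shape of result_data. As a hedged sketch, assuming it has the same structure as the raw response of boto3’s get_query_results (if the helper returns something else, adjust accordingly), the rows can be turned into dictionaries like this. rows_to_dicts is a hypothetical helper and the sample values are made up:

```python
def rows_to_dicts(result_data):
    """Convert a get_query_results-style response into a list of dicts.

    For SELECT queries the first row holds the column names.
    NULL cells come back without a "VarCharValue" key and are mapped
    to None here. All other values are strings.
    """
    rows = result_data["ResultSet"]["Rows"]
    if not rows:
        return []

    def cells(row):
        return [cell.get("VarCharValue") for cell in row["Data"]]

    header = cells(rows[0])
    return [dict(zip(header, cells(row))) for row in rows[1:]]


# Made-up sample in the shape of a get_query_results response
sample = {
    "ResultSet": {
        "Rows": [
            {"Data": [{"VarCharValue": "elb_name"}, {"VarCharValue": "backend_ip"}]},
            {"Data": [{"VarCharValue": "elb_demo_005"}, {}]},
        ]
    }
}

print(rows_to_dicts(sample))
```

Keep in mind that get_query_results is paginated, so this sketch only covers a single page of the response.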

Let’s have a look at the much simpler case now: short running queries.

Short running queries

I’d recommend this for queries that run for up to 5 minutes - otherwise it’s probably worth setting up the state machine as described above.

The code for this one relies on the athena_helper.py as well:

import boto3
from athena_helper import AthenaQuery

BOTO_SESSION = boto3.Session()

def short_running_query(event, context):

    # Build the name of the default Athena bucket
    account_id = BOTO_SESSION.client('sts').get_caller_identity().get('Account')
    region = BOTO_SESSION.region_name
    result_bucket = "s3://aws-athena-query-results-{}-{}/".format(account_id, region)

    my_query = AthenaQuery(
        "select elb_name from elb_logs limit 1",
        "sampledb",
        result_bucket
    )

    my_query.execute()
    result_data = my_query.get_result()

    # Process the result

    return result_data

This uses the same functions that have been described above, only without the waiting step in between - the get_result() function will actually wait for the query to finish, up to a timeout that is set to 60 seconds by default.
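If you’re curious how a blocking wait with a timeout can be built on top of plain boto3, here is a minimal sketch. It is not the code from athena_helper.py; the function name wait_for_query and its parameters are assumptions for illustration.

```python
import time


def wait_for_query(client, query_execution_id, timeout_seconds=60, poll_seconds=1.0):
    """Poll Athena until the query reaches a final state or the timeout expires.

    Returns the final state (SUCCEEDED, FAILED or CANCELLED).
    Raises TimeoutError if the query is still queued or running at the deadline.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Query {} is still {} after {} seconds".format(
                    query_execution_id, state, timeout_seconds
                )
            )
        time.sleep(poll_seconds)


# Usage (needs boto3 and AWS credentials):
#   client = boto3.client("athena")
#   final_state = wait_for_query(client, query_execution_id)
```

Whatever wait you use, the timeout of the Lambda function itself has to be longer than the time you’re willing to wait for the query, and the Lambda default of 3 seconds is far too short for that.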

Conclusion

In this post I’ve shown you how to use the athena_helper mini-library to work with long-running and short-running Athena queries in Python.

If you have any questions, feedback or suggestions, feel free to reach out to me on Twitter (@Maurice_Brg)

Photo by Hitesh Choudhary on Unsplash
