
S3 Custom Lifecycles: An AWS Glue solution to incorporate read operations into S3 Lifecycles

Introduction

S3 Lifecycles provide a fantastic way to manage the lifetimes of S3 objects. If you work in AWS, particularly in larger organizations, you will inevitably have objects lingering (often in Standard storage), costing you money. Lifecycle rules offer an easy way to trim some fat, which can both save you money and keep your S3 environment tidier.

S3 Lifecycles accomplish these goals via storage transitions and expirations, providing two separate avenues for cost reduction depending on your needs. An example of a storage transition is moving an object from S3 Standard to Glacier, or to any other storage class. An example of an expiration is deleting all non-current versions of an object and keeping only the latest version; this assumes your bucket has versioning enabled, which might not make sense in all systems.

Lifecycles can be applied to an entire bucket or to a specific prefix within a bucket. Additionally, you can use object tagging to determine which rules to apply to objects. Finally, you can specify a minimum or maximum object size to further enhance the flexibility of the rule.
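
To make this concrete, here is a minimal sketch of applying such a rule with boto3; the bucket name, prefix, and rule ID are illustrative, not taken from our setup:

import boto3

s3 = boto3.client("s3")

# Illustrative rule: for everything under "intermediate/", transition objects to
# Glacier after 30 days and expire (delete) them after 365 days.
s3.put_bucket_lifecycle_configuration(
    Bucket="my-example-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tidy-intermediate-data",
                "Status": "Enabled",
                "Filter": {"Prefix": "intermediate/"},
                "Transitions": [{"Days": 30, "StorageClass": "GLACIER"}],
                "Expiration": {"Days": 365},
            }
        ]
    },
)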

They are a pivotal part of the venerable S3 service, and if you have been working in AWS for a decent amount of time, you have likely taken advantage of this feature. Even if you have used the lifecycle policy feature before, it is worth periodically revisiting your configuration to make sure you are using it in the most efficient and cost-effective ways for your system.

The Problem

While I am a fan of S3 Lifecycles and think they’re a must in any good system design, they have one glaring weakness: they only use the LastModifiedTime of an object or version to determine its age.

Why is this important?

It is important because reads of an object do not update this field. So, if you have a lifecycle rule in a non-versioned bucket that expires objects older than a week, and that week has passed, the object is going to be deleted even if it has been read repeatedly in the interim.

Maybe Lifecycles in their current form are enough for your needs. In some of my recent machine learning engineering work, however, we determined that they were not.

Our Scenario

To set the stage: we have a few buckets we use for intermediate data science development. Teams create datasets and then use them for a variety of models and development. Model development and tuning can take a while and datasets tend to evolve as the model crystallizes. As a result, this bucket can become very messy and costly. We want to constantly take out the trash, but we don’t want to delete things that are actually useful. This was not possible using S3 Lifecycles.

It was not only our data science buckets that got bloated; plenty of other buckets suffered the same problem, and we realized we could create a general pattern and apply it to whatever buckets or bucket prefixes needed it.

We weighed the idea of using a Lambda, Glue, or AWS Batch, and depending on your use case, any of them would be viable. Ultimately, we decided to implement a custom S3 lifecycle using a scheduled Glue job.
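
For reference, scheduling a Glue job can be done with a scheduled trigger; a minimal sketch with boto3 follows, where the trigger name, job name, and cron expression are placeholders rather than our actual configuration:

import boto3

glue = boto3.client("glue")

# Placeholder names and schedule: run the custom-lifecycle job daily at 06:00 UTC.
glue.create_trigger(
    Name="custom-lifecycle-daily-trigger",
    Type="SCHEDULED",
    Schedule="cron(0 6 * * ? *)",
    Actions=[{"JobName": "custom-s3-lifecycle-job"}],
    StartOnCreation=True,
)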

How We Implemented a Custom Glue Lifecycle

This section walks through how we created a custom lifecycle that takes into account when an object was last accessed. To do this, we used a Glue job and CloudTrail data events.

Creating the Initial Inventory

First, we need to capture the initial state of a given bucket. While S3 Inventories exist, we decided to create our own inventory using a simple Python script built around the boto3 list_objects_v2() API call.

This function gets us a list of all the latest versions of objects in a bucket. The basic script can be run from a Glue job, a Lambda, or a local machine with appropriately configured access.

From here we had a couple of options: call head_object() on each object to get its LastModifiedTime, or assign every object some chosen datetime and, in a sense, “start fresh” with the state of our bucket, as if all our objects were created today (or on whatever date we choose).

We took our list of objects and created our initial bucket inventory CSV file with a name like modelling_bucket_inventory_2025-02-01T14:19:15.366645+00:00.csv

The schema of this inventory was as follows:

bucket_name: str
object_key: str
last_accessed_date: ISO 8601 str
status: str
deletion_date: ISO 8601 str | None
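
For illustration, a single inventory row (with made-up values) looks like this:

bucket_name,object_key,last_accessed_date,status,deletion_date
my-bucket-dev,datasets/training/features.parquet,2025-02-01T14:19:15+00:00,active,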

This is our initial state, and after this we never need to perform this inventory creation again because our Glue job outputs a newer inventory after every run.

Below is an example of the script we used to create our initial inventories. It uses head_object(), and since that API call cannot be batched, it is slower; as a reminder, the other option is to simply use a default value for last_accessed_date.

"""
Script that generates an initial inventory for the sagemaker modelling bucket

We first crawl the bucket to get all objects, then we call head_object on each object to get the last modified date

We construct a pandas dataframe with the following columns:

- bucket_name
- object_key
- last_accessed_date
- status
- deletion_date

where last_accessed_date is the last modified time of the s3 object

We output this as a csv and upload it to s3 to be used as the initial state of our inventories
"""

import argparse
from datetime import datetime, timezone

import boto3
import pandas as pd


def crawl_s3_bucket(s3_client, bucket_name, prefix=""):
    """
    Crawls an S3 bucket and lists all objects.

    Args:
        s3_client: A boto3 S3 client used to make the API calls.
        bucket_name (str): The name of the S3 bucket to crawl.
        prefix (str): (Optional) A prefix to filter objects (e.g., "folder1/").

    Returns:
        List of all object keys in the bucket.
    """
    all_objects = []
    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        response_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

        for page in response_iterator:
            if "Contents" in page:
                for obj in page["Contents"]:
                    all_objects.append(obj["Key"])

    except Exception as e:
        print(f"Error crawling bucket {bucket_name}: {e}")

    return all_objects


def get_last_access_dates(s3_client, bucket, objects):
    """
    For a list of objects in a bucket, we call head_object on each of them and grab the LastModified
    field from the response

    We use this to create our initial inventory, where last_accessed_date is originally the last modified date

    The modellingBucketLifecycle glue job will eventually update this using s3 data access event logs
    """
    last_accessed_date = []

    for obj in objects:
        last_modified = s3_client.head_object(Bucket=bucket, Key=obj)["LastModified"]
        last_modified_iso = last_modified.isoformat()
        last_accessed_date.append(last_modified_iso)

    return last_accessed_date


def main(env: str):
    AWS_ENV = env
    BUCKET_NAME = f"my-bucket-{AWS_ENV}"

    session = boto3.Session(profile_name=f"data-engineer-{AWS_ENV}", region_name="us-east-1")
    s3_client = session.client("s3")

    objects = crawl_s3_bucket(s3_client, bucket_name=BUCKET_NAME)

    df = pd.DataFrame(
        {
            "bucket_name": BUCKET_NAME,
            "object_key": objects,
            "last_accessed_date": get_last_access_dates(s3_client, BUCKET_NAME, objects),
            "status": "active",
            "deletion_date": pd.NA,
        },
        index=None,
    )
    output_file = f"my_bucket_inventory_{datetime.now(tz=timezone.utc).isoformat()}.csv"
    df.to_csv(output_file, index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", type=str, default="dev", help="The environment to run the script in.")
    args, _ = parser.parse_known_args()
    main(args.env)

CloudTrail Data Events

The next key piece of this involves CloudTrail Data Events.

By default, CloudTrail does not log anything related to data events, and using this feature does cost extra; however, if you are dealing with sensitive data you will likely want a log of operations done on your data.
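
If you need to turn data events on for a specific bucket on an existing trail, one option is CloudTrail’s put_event_selectors API; a minimal sketch, with placeholder trail and bucket names:

import boto3

cloudtrail = boto3.client("cloudtrail")

# Placeholder names: log object-level (data) events for a single bucket on an
# existing trail, for both read and write operations.
cloudtrail.put_event_selectors(
    TrailName="my-existing-trail",
    EventSelectors=[
        {
            "ReadWriteType": "All",
            "IncludeManagementEvents": True,
            "DataResources": [
                {"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::my-bucket-dev/"]}
            ],
        }
    ],
)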

We already had trails set up for our buckets of interest, delivering logs to another S3 bucket. In that bucket, the logs are stored in year/month/day partitions as batches of json.gz files.

These logs contain the history of all events that have occurred in a bucket, and we can use this to update the state of our inventory.
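
Each json.gz file wraps its events in a top-level Records array. A trimmed, illustrative record (real records carry many more fields) and the pieces we care about look roughly like this:

# Trimmed, illustrative S3 data event record; field names follow the CloudTrail schema.
record = {
    "eventSource": "s3.amazonaws.com",
    "eventName": "GetObject",
    "eventTime": "2025-02-03T09:12:44Z",
    "requestParameters": {
        "bucketName": "my-bucket-dev",
        "key": "datasets/training/features.parquet",
    },
}

# The two pieces needed to update the inventory: which object was touched, and when.
object_key = record["requestParameters"]["key"]
accessed_at = record["eventTime"]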

Glue Job

Our Glue job first ingests the most recent inventory. From it, we take the most recent last_accessed_date and then pull in all the data event logs between then and now. This is a simple way to ensure we always have the full picture of what has transpired in the interim.

"""
Grab the most recent inventory, find the latest access date, and use that
to calculate how many days to "look back" in our CloudTrail data event logs.

Then, using this, generate the S3 partitions we need to read and use Glue
DynamicFrames to read in all json.gz files for all relevant prefixes
"""

sorted_inventories = get_sorted_inventory_keys(MY_BUCKET, INVENTORY_PREFIX)
latest_inventory_key = sorted_inventories[0]
logger.info(f"Latest inventory key: {latest_inventory_key}")

# Read in our latest inventory
inventory_df = read_bucket_inventory_csv(spark, MY_BUCKET, latest_inventory_key)
logger.info(f"Number of rows in inventory: {inventory_df.count()}")

# Grab the maximum date from the inventory to determine the lookback days for the s3 access logs
latest_inventory_access_date = get_latest_inventory_access_date(inventory_df)
lookback_days = calculate_lookback_days(latest_inventory_access_date)
logger.info(f"lookback_days: {lookback_days}")

# Generating the prefixes we need to read
access_log_paths = generate_s3_access_log_paths(S3_DATA_ACCESS_LOGS_BUCKET, ACCESS_LOGS_PREFIX, lookback_days)
    logger.info(f"access_log_paths: {access_log_paths}")

# Use Glue Dynamic frame to grab all of the relevant logs
dynamic_frame = glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": access_log_paths,
            "recurse": True,
        },
        format="json",
        format_options={"compression": "gzip"},
    )

As you can see, the create_dynamic_frame method on the Glue context handles reading in multiple daily partitions of json.gz files really well.

Once we have all the access logs, we can perform some basic filtering to produce a second dataframe containing the most recent access date for each object. Together with our existing inventory dataframe, this gives us what we need to create an up-to-date inventory of the current bucket.
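
A minimal sketch of that filtering step, assuming the DynamicFrame has been flattened so that each row is one CloudTrail record (the event names to keep, the MONITORED_BUCKET variable, and the exact flattening are assumptions for illustration):

from pyspark.sql import functions as F

# Convert to a Spark DataFrame; depending on how the json.gz files were read,
# you may first need to explode the top-level "Records" array.
events_df = dynamic_frame.toDF()

access_df = (
    events_df
    .filter(F.col("eventSource") == "s3.amazonaws.com")
    .filter(F.col("eventName").isin("GetObject", "HeadObject", "PutObject"))
    .filter(F.col("requestParameters.bucketName") == MONITORED_BUCKET)
    .select(
        F.col("requestParameters.key").alias("object_key"),
        F.col("eventTime"),
    )
    .groupBy("object_key")
    # ISO 8601 timestamps in the same format sort correctly as strings.
    .agg(F.max("eventTime").alias("latest_access_date"))
)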

The final part of this process involves outer joining our two dataframes and performing some data transformations.

In an attempt to limit the complexity of this article I won’t go into it here, but if you are interested in how we do this part, feel free to leave a comment and I would be happy to share the code for that as well.

Armed with our updated inventory, we can now look at which objects haven’t been accessed in the last X days and delete those objects. This process ensures that an object that is still being read, even if it has not been modified, will not be deleted.
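
A minimal sketch of this deletion step, assuming updated_inventory_df is the joined dataframe described above, MONITORED_BUCKET is the bucket being managed, and s3_client is a boto3 S3 client (all three names are ours for illustration):

from datetime import datetime, timedelta, timezone

from pyspark.sql import functions as F

# Keep anything accessed within the retention window; everything else is stale.
RETENTION_DAYS = 30
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=RETENTION_DAYS)

stale_rows = (
    updated_inventory_df
    .filter(F.to_timestamp("last_accessed_date") < F.lit(cutoff))
    .select("object_key")
    .collect()
)

# delete_objects accepts at most 1000 keys per request, so delete in batches.
stale_keys = [{"Key": row["object_key"]} for row in stale_rows]
for i in range(0, len(stale_keys), 1000):
    s3_client.delete_objects(
        Bucket=MONITORED_BUCKET,
        Delete={"Objects": stale_keys[i : i + 1000]},
    )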

Finally, we output the new inventory to its proper location in S3, where it will be picked up on the next run.
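
Under the same assumptions, writing the new inventory back might look roughly like this (MY_BUCKET and INVENTORY_PREFIX come from the job configuration shown earlier; updated_inventory_df is our illustrative name for the joined dataframe):

from datetime import datetime, timezone

# Write a single timestamped CSV so the next run can pick up the latest inventory.
new_key = (
    f"{INVENTORY_PREFIX}/my_bucket_inventory_"
    f"{datetime.now(tz=timezone.utc).isoformat()}.csv"
)
csv_body = updated_inventory_df.toPandas().to_csv(index=False)
s3_client.put_object(Bucket=MY_BUCKET, Key=new_key, Body=csv_body.encode("utf-8"))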

This process allowed us to make sure we were not too quick to delete files our data scientists still need. We hope you found this article useful, and we encourage you to reach out if you have any questions or comments!