How we solved our problem with audit, Postgres and Elasticsearch

November 13, 2018

The problem

Every database change we make is audited, which is a smart idea for the work that we do. Very quickly, though, the amount of audit data becomes enormous: it cripples backups, uses a boat-load of disk space and makes for some cry-worthy storage costs. Not to mention the anxiety caused by jumbo backup processes, freezing servers and early-morning monitoring alerts.

Chart showing our increasing AWS costs.

Hmmmm, what to do?

So, I brew another coffee that's blacker than night in the Antarctic and wonder: how do I turn this horrible, unusable data into something useful? I know a little about Elasticsearch and how useful it can be for storing and analyzing large amounts of data.

OK I think I've got an idea!

Right, it looks like we have a combination of technologies that will solve this problem. Now I just need to figure out how.


Several hours (days) later... postgres-to-elasticsearch was born.

The tool supports a one-way push from PostgreSQL to Elasticsearch with real-time data streaming. The way it works is, in essence, very simple.
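Under the hood that simplicity comes from PostgreSQL's LISTEN/NOTIFY mechanism (the trigger that does the notifying is shown at the end of this post). As a minimal sketch, with a made-up payload:

-- The tool keeps a connection open that has subscribed to the audit channel:
LISTEN audit;

-- The audit trigger publishes each audit row as JSON on that channel,
-- which is roughly equivalent to running:
SELECT pg_notify('audit', '{"event_id": 1, "action": "I"}');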

What does it do?

It performs one-way data pushing from PostgreSQL to Elasticsearch and can optionally remove data from PostgreSQL once it's safely stored in Elasticsearch.

A nice bonus is that it will automatically look for the Hstore data type and parse it into JSON, so that you can query the actual row data in Elasticsearch.
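The conversion happens inside the tool, but for illustration the equivalent transformation in PostgreSQL itself looks something like this (hstore_to_json() ships with the hstore extension):

-- Illustration only: turning the hstore audit columns into JSON with SQL.
SELECT event_id,
       hstore_to_json(row_data)       AS row_data,
       hstore_to_json(changed_fields) AS changed_fields
FROM audit.logged_actions
LIMIT 1;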

Screenshots: the tool when it starts, and while it's running.

How this solves the problem

An option built into postgres-to-elasticsearch is to delete data from the table as it pushes it to Elasticsearch. This means that while it's running, the database table is almost always empty, or contains only a handful of rows that are yet to be pushed.
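For illustration, with PG_DELETE_ON_INDEX enabled the cleanup amounts to something like the following; the exact statement the tool issues may differ, and the event_ids here are made up:

-- Remove a batch of audit rows that has just been flushed to Elasticsearch.
DELETE FROM audit.logged_actions
WHERE event_id = ANY(ARRAY[101, 102, 103]);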

As a result, our database backups aren't crippled by huge amounts of write-only data, and when we actually need to review the audit data we aren't plagued by huge query times, thanks to the speed of Elasticsearch.

Lovely data

Observe your lovely real-time data in Kibana


What does the output actually look like?

Behind the scenes

On first run

tom@sweet:~/code/postgres-to-elasticsearch/src$ INFO=1 PG_SCHEMA=audit PG_TABLE=logged_actions PG_DELETE_ON_INDEX=1 ES_INDEX=audit ES_TYPE=record INDEX_QUEUE_TIMEOUT=60 PG_USERNAME=tom PG_PASSWORD=tom PG_DATABASE=my_database ES_PROTO=http STATUS_UPDATE_INTERVAL=2 INDEX_QUEUE_LIMIT=20 node index.js
INFO Starting
INFO Index audit does not exist, creating.
INFO Index audit created successfully
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO No historic audit found, cannot get last processed event_id
INFO Loading all available audit data for backlog processing
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 56 rows
INFO Flushed 20 records
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 8 records
INFO Attempted to delete 8 rows, actually deleted 8 rows
STATUS UPDATE: Created a total of 68 indexes with 12 queued at 1541617222
INFO Flushed 12 records
INFO Attempted to delete 12 rows, actually deleted 12 rows

So what's happened here is: the audit index didn't exist in Elasticsearch, so it was created; no previously indexed data was found, so the whole backlog of rows in audit.logged_actions was loaded and processed; those rows were flushed to Elasticsearch in batches of up to 20 (the INDEX_QUEUE_LIMIT) and, because PG_DELETE_ON_INDEX is set, each flushed batch was then deleted from PostgreSQL; and any new rows arriving on the LISTEN channel while this was going on were queued and flushed in exactly the same way.

On second run

INFO Starting
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO Found last processed event_id: 55748
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 5 rows
INFO Deleted a total of 0 rows
INFO Flushed 5 records
INFO Attempted to delete 5 rows, actually deleted 5 rows
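The "Found last processed event_id" line is what keeps the backlog bounded across restarts. Conceptually the backlog query amounts to something like this sketch (not necessarily the tool's exact SQL):

-- Pick up any audit rows written while the tool wasn't listening.
SELECT *
FROM audit.logged_actions
WHERE event_id > 55748  -- the last event_id already present in Elasticsearch
ORDER BY event_id;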

The PostgreSQL trigger

If you're considering implementing this yourself then writing your own trigger is a good idea; this is what ours looks like:

CREATE OR REPLACE FUNCTION audit.if_modified_func()
  RETURNS trigger AS
$BODY$
DECLARE
    audit_row audit.logged_actions;
    include_values BOOLEAN;
    log_diffs BOOLEAN;
    h_old hstore;
    h_new hstore;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'audit.if_modified_func() may only run as an AFTER trigger';
    END IF;

    audit_row = ROW(
        NEXTVAL('audit.logged_actions_event_id_seq'), -- event_id
        TG_TABLE_SCHEMA::text,                        -- schema_name
        TG_TABLE_NAME::text,                          -- table_name
        NULL,                                         -- table_id
        TG_RELID,                                     -- relation OID for much quicker searches
        session_user::text,                           -- session_user_name
        statement_timestamp(),                        -- action_tstamp_stm
        inet_client_addr(),                           -- client_addr
        current_query(),                              -- top-level query or queries (if multistatement) from client
        SUBSTRING(TG_OP,1,1),                         -- action
        NULL, NULL,                                   -- row_data, changed_fields
        'f'                                           -- statement_only
        );

    IF NOT TG_ARGV[0]::BOOLEAN IS DISTINCT FROM 'f'::BOOLEAN THEN
        audit_row.client_query = NULL;
    END IF;

    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;

    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*);
        audit_row.changed_fields =  (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = NEW.id;
        END IF;
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        audit_row.statement_only = 't';
    ELSE
        RAISE EXCEPTION '[audit.if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL;
        RETURN NULL;
    END IF;
    INSERT INTO audit.logged_actions VALUES (audit_row.*);

    IF (length(CAST(row_to_json(audit_row) AS text)) > 8000) THEN
        PERFORM pg_notify(CAST('audit_id' AS text), audit_row.event_id::text);
    ELSE
        PERFORM pg_notify(CAST('audit' AS text), CAST(row_to_json(audit_row) AS text));
    END IF;

    RETURN NULL;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE SECURITY DEFINER
  COST 100;
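For completeness, here's roughly how the function gets attached to a table and how a consumer subscribes to the two notification channels the trigger uses. The table name and the trigger argument below are examples of mine, not something the tool prescribes:

-- Attach the audit function to a table (the first argument controls whether
-- the client query text is logged).
CREATE TRIGGER audit_trigger_row
AFTER INSERT OR UPDATE OR DELETE ON public.my_table
FOR EACH ROW EXECUTE PROCEDURE audit.if_modified_func('true');

-- Subscribe to both channels: 'audit' carries the full row as JSON, while
-- 'audit_id' carries only the event_id when the row is too big for a NOTIFY
-- payload (which must stay under 8000 bytes), so the full row is fetched by id.
LISTEN audit;
LISTEN audit_id;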

I hope this article helps someone. Thanks for reading.


About the author

Tom is a Co-founder and CTO of OnSecurity. He has over a decade of web development experience and many years in IT Security. Feel free to connect with Tom on LinkedIn.
