November 13, 2018
Every change to our database is audited; given the work that we do, it's a smart idea. Very quickly, though, the amount of audit data becomes enormous: it cripples backups, uses a boat-load of disk space and makes for some cry-worthy storage costs. Not to mention the anxiety caused by jumbo backup processes, freezing servers and early-morning monitoring alerts.
Chart showing increasing AWS costs.
Hmmmm, what to do?
So, I brew another coffee that's blacker than night in the Antarctic and wonder how I can turn this horrible, unusable data into something useful. I know a little about Elasticsearch and how useful it can be for storing and analyzing large amounts of data.
OK, I think I've got an idea!
- The audit is powered by database triggers in PostgreSQL.
- Postgres can also send asynchronous notifications from a trigger to listening clients using NOTIFY and LISTEN.
- Elasticsearch is good at analyzing data.
Right, it looks like we have a combination of technologies that will solve this problem. Now I just need to figure out how.
Several hours (days) later... postgres-to-elasticsearch was born.
The tool supports one-way pushing from PostgreSQL to Elasticsearch, including real-time data streaming. In essence, the way it works is very simple.
What does it do?
It pushes data one way, from PostgreSQL to Elasticsearch, and can optionally remove the data from PostgreSQL once it's safely stored in Elasticsearch.
A nice bonus is that it automatically looks for the hstore data type and parses hstore values into JSON, so you can query the actual row data in Elasticsearch.
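To give a feel for the hstore-to-JSON step, here is a minimal Python sketch (not the tool's actual code, which runs on Node.js) that parses PostgreSQL's hstore text output into a dictionary ready for JSON encoding. It assumes the canonical output format, where every key and value is double-quoted, embedded quotes and backslashes are backslash-escaped, and a missing value is the bare word NULL. The names parse_hstore and hstore_to_json are hypothetical.

```python
import json
from typing import Dict, Optional, Tuple


def parse_hstore(text: str) -> Dict[str, Optional[str]]:
    """Parse hstore text output such as '"a"=>"1", "b"=>NULL' into a dict."""
    result: Dict[str, Optional[str]] = {}
    n = len(text)

    def skip_spaces(pos: int) -> int:
        while pos < n and text[pos].isspace():
            pos += 1
        return pos

    def read_quoted(pos: int) -> Tuple[str, int]:
        # Read a double-quoted string starting at pos; a backslash escapes
        # the next character. Returns the string and the index after it.
        if pos >= n or text[pos] != '"':
            raise ValueError(f"expected a quoted string at position {pos}")
        pos += 1
        chars = []
        while pos < n and text[pos] != '"':
            if text[pos] == "\\":
                pos += 1  # drop the backslash, keep the escaped character
                if pos >= n:
                    break
            chars.append(text[pos])
            pos += 1
        if pos >= n:
            raise ValueError("unterminated quoted string")
        return "".join(chars), pos + 1

    i = skip_spaces(0)
    while i < n:
        key, i = read_quoted(i)
        i = skip_spaces(i)
        if text[i:i + 2] != "=>":
            raise ValueError(f"expected '=>' at position {i}")
        i = skip_spaces(i + 2)
        if text.startswith("NULL", i):
            value, i = None, i + 4  # unquoted NULL means SQL NULL
        else:
            value, i = read_quoted(i)
        result[key] = value
        i = skip_spaces(i)
        if i < n:
            if text[i] != ",":
                raise ValueError(f"expected ',' at position {i}")
            i = skip_spaces(i + 1)
    return result


def hstore_to_json(text: str) -> str:
    """Turn an hstore value into a JSON object string for indexing."""
    return json.dumps(parse_hstore(text))
```

SQL NULLs come through as JSON null, so a deleted or empty column is still distinguishable from an empty string once it's in Elasticsearch.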
When it starts
- It checks Elasticsearch for the last UID (the event_id) that it archived.
- If it finds one, it loads all of the data added to Postgres since the service last ran and pushes it to Elasticsearch. If it can't find a last UID, it pushes the whole lot.
- It runs a LISTEN statement on Postgres and waits for data to be pushed from the database.
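The first startup step, looking up the last archived UID, can be sketched as an Elasticsearch search that sorts by ID descending and returns a single hit. This is a hypothetical Python sketch rather than the tool's code: it assumes the ID lives in an event_id field (the name used in the log output further down) mapped as a number, and that the body is sent to the index's _search endpoint (for example POST /audit/_search). An empty hit list means nothing has been archived yet, so everything gets pushed.

```python
from typing import Any, Dict, Optional


def last_event_id_query(id_field: str = "event_id") -> Dict[str, Any]:
    """Search body: the single document with the highest ID, returning only that field."""
    return {
        "size": 1,
        "sort": [{id_field: {"order": "desc"}}],
        "_source": [id_field],
    }


def extract_last_event_id(response: Dict[str, Any],
                          id_field: str = "event_id") -> Optional[int]:
    """Read the ID from a search response; None means the index is empty."""
    hits = response.get("hits", {}).get("hits", [])
    if not hits:
        return None
    return int(hits[0]["_source"][id_field])
```

Sorting with size 1 keeps the response tiny no matter how much audit has piled up in the index.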
When it's running
- It waits patiently for data to be sent to it via the Postgres NOTIFY statement. Note that there are two NOTIFY channels it can listen on (which can be defined by PG_LISTEN_TO_ID). This is because of the hard 8000-byte limit on NOTIFY payloads: rows too large to send in full are announced by their ID alone.
- When it receives data, it caches it locally until either a defined period has passed or the cache reaches a defined size limit.
- It then pushes the cached data to Elasticsearch.
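The cache-then-push behaviour can be sketched as a small queue that flushes when it reaches a row limit or when a timeout has elapsed since the first queued row. This Python version is an illustration under assumptions, not the tool's implementation: IndexQueue, tick() and the injectable clock are hypothetical, with limit and timeout standing in for INDEX_QUEUE_LIMIT and INDEX_QUEUE_TIMEOUT.

```python
import time
from typing import Any, Callable, List, Optional


class IndexQueue:
    """Buffer rows and hand them to `flush` in batches.

    A batch is flushed as soon as it holds `limit` rows, or when `timeout`
    seconds have passed since its first row was added.
    """

    def __init__(self, flush: Callable[[List[Any]], None], limit: int,
                 timeout: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush
        self._limit = limit
        self._timeout = timeout
        self._clock = clock
        self._rows: List[Any] = []
        self._first_added: Optional[float] = None

    def add(self, row: Any) -> None:
        if not self._rows:
            self._first_added = self._clock()  # the timeout starts here
        self._rows.append(row)
        if len(self._rows) >= self._limit:
            self.flush()

    def tick(self) -> None:
        """Call periodically to enforce the timeout on a partial batch."""
        if self._rows and self._clock() - self._first_added >= self._timeout:
            self.flush()

    def flush(self) -> None:
        if not self._rows:
            return
        batch, self._rows, self._first_added = self._rows, [], None
        self._flush(batch)
```

Injecting the clock keeps the sketch easy to test; in a long-running service you would call tick() from a timer (in Node.js, setInterval would play that role).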
How this solves the problem
postgres-to-elasticsearch has a built-in option to delete data from the table as it pushes it to Elasticsearch. This means that, while it's running, the database table is almost always empty or holds only the few items that are yet to be pushed.
As a result, our database backups aren't crippled by huge amounts of write-only data, and when we actually need to review the audit data we aren't plagued by long query times, thanks to the speed of Elasticsearch.
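The push-then-delete step might look something like this: build an Elasticsearch _bulk request body (newline-delimited JSON, one action line followed by one document line) and then delete exactly the rows that were sent. It's a hypothetical Python sketch: the function names are made up, the %s placeholder assumes a psycopg2-style driver that adapts a Python list to a PostgreSQL array, and the optional _type metadata only matters on older Elasticsearch releases (mapping types were deprecated in 7 and removed in 8), matching the ES_TYPE setting used in the run below.

```python
import json
from typing import Any, Dict, List, Optional, Tuple


def bulk_body(records: List[Dict[str, Any]], index: str,
              doc_type: Optional[str] = None, id_field: str = "event_id") -> str:
    """Build a _bulk request body: an action line, then the document, per record."""
    if not records:
        return ""
    lines = []
    for record in records:
        meta: Dict[str, Any] = {"_index": index, "_id": str(record[id_field])}
        if doc_type is not None:
            meta["_type"] = doc_type  # mapping types: deprecated in ES 7, removed in ES 8
        lines.append(json.dumps({"index": meta}))
        lines.append(json.dumps(record))
    return "\n".join(lines) + "\n"  # the bulk API requires a trailing newline


def delete_statement(schema: str, table: str, id_column: str,
                     ids: List[int]) -> Tuple[str, tuple]:
    """DELETE exactly the rows that were just indexed, by primary key."""
    def quote_ident(name: str) -> str:
        return '"' + name.replace('"', '""') + '"'

    sql = (f"DELETE FROM {quote_ident(schema)}.{quote_ident(table)} "
           f"WHERE {quote_ident(id_column)} = ANY(%s)")
    return sql, (list(ids),)
```

Using event_id as the document _id makes a re-push idempotent: if the process dies after indexing but before deleting, the next run overwrites the same documents instead of duplicating them. A careful implementation would also check the errors flag in the bulk response before deleting anything.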
Observe your lovely real-time data in Kibana
What does the output actually look like?
On first run
```
tom@sweet:~/code/postgres-to-elasticsearch/src$ INFO=1 PG_SCHEMA=audit PG_TABLE=logged_actions PG_DELETE_ON_INDEX=1 INFO=1 ES_INDEX=audit ES_TYPE=record INDEX_QUEUE_TIMEOUT=60 PG_USERNAME=tom PG_PASSWORD=tom PG_DATABASE=my_database ES_PROTO=http STATUS_UPDATE_INTERVAL=2 INDEX_QUEUE_LIMIT=20 node index.js
INFO Starting
INFO Index audit does not exist, creating.
INFO Index audit created successfully
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO No historic audit found, cannot get last processed event_id
INFO Loading all available audit data for backlog processing
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 56 rows
INFO Flushed 20 records
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 8 records
INFO Attempted to delete 8 rows, actually deleted 8 rows
STATUS UPDATE: Created a total of 68 indexes with 12 queued at 1541617222
INFO Flushed 12 records
INFO Attempted to delete 12 rows, actually deleted 12 rows
```
So what's happened here is:
- It connects to Elasticsearch using the
- The Elasticsearch index doesn't exist, so it creates it.
- It tries to find the last processed primary key of the audit.logged_actions table (as defined by
- It can't find one, because we just created the index and it's empty.
- It fetches all of the data from the audit.logged_actions table and processes it in chunks of 20 (defined by
- It deletes the rows from the database as it processes them.
- A status update is shown after 2 minutes (
- It continues to listen for the NOTIFY statements run by the trigger function in Postgres, flushing the cache whenever it hits INDEX_QUEUE_LIMIT rows or INDEX_QUEUE_TIMEOUT seconds have passed since the first item in the cache was added.
On second run
```
INFO Starting
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO Found last processed event_id: 55748
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 5 rows
INFO Deleted a total of 0 rows
INFO Flushed 5 records
INFO Attempted to delete 5 rows, actually deleted 5 rows
```
- It finds the last processed ID (the last record inserted into Elasticsearch before it was stopped).
- It deletes any audit rows with an ID lower than that last processed ID (to remove any audit that was archived but not deleted last time).
- It finds 5 new rows since it last ran and adds them to the cache.
- It continues as before.
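The second-run sequence boils down to two SQL statements once the last archived ID is known. This hypothetical Python helper just builds them as (sql, params) pairs; it assumes a psycopg2-style %s placeholder and quotes the schema, table and column names as PostgreSQL identifiers. Tying the cleanup DELETE to a delete_on_index flag is an assumption modelled on the PG_DELETE_ON_INDEX setting from the first run.

```python
from typing import List, Optional, Tuple

Statement = Tuple[str, tuple]


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def startup_statements(schema: str, table: str, id_column: str,
                       last_id: Optional[int],
                       delete_on_index: bool = True) -> List[Statement]:
    """Build the (sql, params) pairs to run once the last archived ID is known."""
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    col = quote_ident(id_column)
    if last_id is None:
        # Nothing archived yet: load the whole table as the backlog.
        return [(f"SELECT * FROM {target} ORDER BY {col}", ())]
    statements: List[Statement] = []
    if delete_on_index:
        # Rows below the last archived ID are already in Elasticsearch.
        statements.append((f"DELETE FROM {target} WHERE {col} < %s", (last_id,)))
    # Rows added while the service was stopped form the backlog.
    statements.append(
        (f"SELECT * FROM {target} WHERE {col} > %s ORDER BY {col}", (last_id,)))
    return statements
```

Ordering the backlog by ID means that if the service is interrupted part-way through, the highest ID in Elasticsearch is still a safe point to resume from.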
The PostgreSQL trigger
If you're considering implementing this, writing your own trigger is a good idea. This is what ours looks like:
```sql
CREATE OR REPLACE FUNCTION audit.if_modified_func() RETURNS trigger AS $BODY$
DECLARE
    audit_row audit.logged_actions;
    include_values BOOLEAN;
    log_diffs BOOLEAN;
    h_old hstore;
    h_new hstore;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'audit.if_modified_func() may only run as an AFTER trigger';
    END IF;

    audit_row = ROW(
        NEXTVAL('audit.logged_actions_event_id_seq'), -- event_id
        TG_TABLE_SCHEMA::text,                        -- schema_name
        TG_TABLE_NAME::text,                          -- table_name
        NULL,                                         -- table_id
        TG_RELID,                                     -- relation OID for much quicker searches
        session_user::text,                           -- session_user_name
        statement_timestamp(),                        -- action_tstamp_stm
        inet_client_addr(),                           -- client_addr
        current_query(),                              -- top-level query or queries (if multistatement) from client
        SUBSTRING(TG_OP,1,1),                         -- action
        NULL, NULL,                                   -- row_data, changed_fields
        'f'                                           -- statement_only
    );

    -- First trigger argument: 'f' means don't record the client query text
    IF NOT TG_ARGV[0]::BOOLEAN IS DISTINCT FROM 'f'::BOOLEAN THEN
        audit_row.client_query = NULL;
    END IF;

    -- Second trigger argument: columns to leave out of the audit
    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;

    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*);
        audit_row.changed_fields = (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = NEW.id;
        END IF;
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        audit_row.statement_only = 't';
    ELSE
        RAISE EXCEPTION '[audit.if_modified_func] - Trigger func added as trigger for unhandled case: %, %', TG_OP, TG_LEVEL;
        RETURN NULL;
    END IF;

    INSERT INTO audit.logged_actions VALUES (audit_row.*);

    -- NOTIFY payloads must be shorter than 8000 bytes (default configuration),
    -- so measure bytes, not characters; large rows are announced by event_id only.
    IF (octet_length(CAST(row_to_json(audit_row) AS text)) >= 8000) THEN
        PERFORM pg_notify(CAST('audit_id' AS text), audit_row.event_id::text);
    ELSE
        PERFORM pg_notify(CAST('audit' AS text), CAST(row_to_json(audit_row) AS text));
    END IF;

    RETURN NULL;
END;
$BODY$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER COST 100;
```
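On the listening side, the trigger's two channels mean each notification is either a full JSON row (audit) or just an event_id (audit_id) whose row has to be read back from audit.logged_actions. Here is a hypothetical Python sketch of that dispatch, along with a size check. PostgreSQL's NOTIFY limit is measured in bytes (the payload must be shorter than 8000 bytes in the default configuration), so the check counts encoded bytes rather than characters; it assumes a UTF-8 database encoding.

```python
import json
from typing import Any, Dict, Optional, Tuple

# PostgreSQL's default NOTIFY payload limit: it must be shorter than 8000 bytes.
NOTIFY_LIMIT_BYTES = 8000


def fits_in_notify(payload: str, encoding: str = "utf-8") -> bool:
    """True if the payload can be sent in full (length counted in bytes)."""
    return len(payload.encode(encoding)) < NOTIFY_LIMIT_BYTES


def handle_notification(channel: str, payload: str
                        ) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """Return (row, None) for a full row, or (None, event_id) for a row
    that was too large to send and must be read back from the table."""
    if channel == "audit":
        return json.loads(payload), None
    if channel == "audit_id":
        return None, int(payload)
    raise ValueError(f"unexpected channel: {channel!r}")
```

For a (None, event_id) result, the listener would run something like SELECT * FROM audit.logged_actions WHERE event_id = %s and queue the fetched row just like one that arrived in full.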
I hope this article helps someone. Thanks for reading.
About the author
Tom is a Co-founder and CTO of OnSecurity. He has over a decade of web development experience and many years in IT Security. Feel free to connect with Tom on LinkedIn.