November 13, 2018
We audit every database change, which is a smart idea for the work that we do. Very quickly, though, the amount of audit data becomes enormous: it cripples backups, uses a boat-load of disk space and makes for some cry-worthy storage costs. Not to mention the anxiety caused by jumbo backup processes, freezing servers and early-morning monitoring alerts.
Chart showing increasing AWS costs.
So, I brew another coffee that's blacker than night in the Antarctic and wonder: how do I turn this horrible, unusable data into something useful? I know a little about Elasticsearch and how useful it can be for storing and analyzing large amounts of data.
Right, it looks like we have a combination of technologies that will solve this problem. Now I just need to figure out how.
The tool, postgres-to-elasticsearch, supports real-time data streaming, and the way it works is in essence very simple: it pushes data one way, from PostgreSQL to Elasticsearch, and can optionally remove data from PostgreSQL once it's safely stored within Elasticsearch.
A nice bonus is that it automatically detects the Hstore data type and parses it into JSON, so you can query the actual row data in Elasticsearch.
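Hstore columns come out of PostgreSQL as text such as "name"=>"Tom", "deleted_at"=>NULL. To illustrate the Hstore-to-JSON step, here is a minimal Python sketch of that conversion. It is not the tool's own code (postgres-to-elasticsearch runs on Node.js); it assumes PostgreSQL's standard hstore output format, where every key and value is double-quoted with backslash escapes and SQL NULLs appear as a bare NULL. The names parse_hstore and hstore_to_json are hypothetical.

```python
from __future__ import annotations

import json


def parse_hstore(text: str) -> dict:
    """Parse hstore text output such as '"a"=>"1", "b"=>NULL' into a dict.

    Assumes PostgreSQL's output format: every key and value is double-quoted,
    with backslash escapes, and SQL NULL values are written as a bare NULL.
    """
    result: dict = {}
    n = len(text)

    def skip_spaces(pos: int) -> int:
        while pos < n and text[pos].isspace():
            pos += 1
        return pos

    def read_quoted(pos: int) -> tuple[str, int]:
        # pos must point at an opening double quote.
        if pos >= n or text[pos] != '"':
            raise ValueError(f"expected a double quote at position {pos}")
        pos += 1
        chars = []
        while pos < n:
            ch = text[pos]
            if ch == "\\" and pos + 1 < n:
                chars.append(text[pos + 1])  # a backslash escapes the next character
                pos += 2
            elif ch == '"':
                return "".join(chars), pos + 1  # closing quote
            else:
                chars.append(ch)
                pos += 1
        raise ValueError("unterminated quoted string")

    pos = skip_spaces(0)
    while pos < n:
        key, pos = read_quoted(pos)
        pos = skip_spaces(pos)
        if text[pos:pos + 2] != "=>":
            raise ValueError(f"expected '=>' at position {pos}")
        pos = skip_spaces(pos + 2)
        if text[pos:pos + 4] == "NULL":
            value, pos = None, pos + 4  # unquoted NULL means SQL NULL
        else:
            value, pos = read_quoted(pos)
        result[key] = value
        pos = skip_spaces(pos)
        if pos < n:
            if text[pos] != ",":
                raise ValueError(f"expected ',' at position {pos}")
            pos = skip_spaces(pos + 1)
    return result


def hstore_to_json(text: str) -> str:
    """Convert hstore text to a JSON object string."""
    return json.dumps(parse_hstore(text))
```

Note that hstore stores every value as text, so numbers come out as JSON strings ("7" rather than 7); any type conversion would have to happen elsewhere, for example in an Elasticsearch mapping.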
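Rows too large to fit in a notification are sent by event_id only, on a separate channel (PG_LISTEN_TO_ID), and the full row is then looked up in the table. This is due to the hard limit on messages sent through NOTIFY: in PostgreSQL's default configuration the payload must be shorter than 8000 bytes.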
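The trigger function later in this post chooses a channel based on the size of the row. Here is a minimal Python sketch of both halves of that arrangement. It assumes PostgreSQL's default payload limit and the 'audit' and 'audit_id' channel names used by that trigger, and fetch_row is a hypothetical callback standing in for a lookup by event_id. It illustrates the idea rather than reproducing the tool's implementation.

```python
from __future__ import annotations

import json

# PostgreSQL's default limit: a NOTIFY payload must be shorter than 8000 bytes.
NOTIFY_PAYLOAD_LIMIT = 8000

# Channel names used by the trigger function in this post.
FULL_ROW_CHANNEL = "audit"
ID_ONLY_CHANNEL = "audit_id"


def build_notification(audit_row: dict) -> tuple[str, str]:
    """Return (channel, payload) for one audit row, mirroring the trigger's choice."""
    # ensure_ascii=False keeps non-ASCII text as-is rather than \u-escaping it.
    payload = json.dumps(audit_row, ensure_ascii=False)
    # Compare bytes, not characters: multi-byte UTF-8 text takes up more room.
    if len(payload.encode("utf-8")) >= NOTIFY_PAYLOAD_LIMIT:
        return ID_ONLY_CHANNEL, str(audit_row["event_id"])
    return FULL_ROW_CHANNEL, payload


def resolve_notification(channel: str, payload: str, fetch_row) -> dict:
    """Listener side: turn a notification back into a full audit row.

    fetch_row is a hypothetical callback that loads a row by event_id.
    """
    if channel == ID_ONLY_CHANNEL:
        return fetch_row(int(payload))
    return json.loads(payload)
```

Measuring the encoded length in bytes matters: a row full of multi-byte characters can be well under 8000 characters and still exceed the limit.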
An option built into postgres-to-elasticsearch (PG_DELETE_ON_INDEX) is to delete data from the table as it pushes it to Elasticsearch. This means that while it's running, the database table is almost always empty, holding at most the few rows that are yet to be pushed.
As a result, our database backups aren't crippled by huge amounts of write-only data, and when we do need to review the audit data, the speed of Elasticsearch means we aren't plagued by long query times.
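The important detail with deleting on index is ordering: a row should only be removed from PostgreSQL after Elasticsearch has accepted it. A minimal Python sketch of that step follows. It uses an in-memory SQLite table as a stand-in for audit.logged_actions and a hypothetical index_batch callable in place of the real Elasticsearch bulk request; it is an assumption about how such a step could look, not the tool's code.

```python
import sqlite3


def flush_and_delete(conn, batch, index_batch):
    """Index a batch of audit rows, then delete only the rows that were indexed.

    conn is a DB-API connection (SQLite here, as a stand-in for PostgreSQL) and
    index_batch is a hypothetical callable standing in for the Elasticsearch
    bulk request: it returns the event_ids it stored, or raises on failure.
    """
    # Index first: if this raises, nothing below runs and no rows are deleted.
    indexed_ids = list(index_batch(batch))
    if not indexed_ids:
        return 0, 0
    placeholders = ", ".join("?" for _ in indexed_ids)  # SQLite-style parameters
    cur = conn.execute(
        f"DELETE FROM logged_actions WHERE event_id IN ({placeholders})",
        indexed_ids,
    )
    conn.commit()
    # (attempted, actually deleted), in the spirit of the tool's delete log lines.
    return len(indexed_ids), cur.rowcount
```

If index_batch raises, nothing is deleted and the rows stay in the table to be picked up on the next run. Returning both counts mirrors the "Attempted to delete N rows, actually deleted M rows" lines in the log output.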
Observe your lovely real-time data in Kibana
tom@sweet:~/code/postgres-to-elasticsearch/src$ INFO=1 PG_SCHEMA=audit PG_TABLE=logged_actions PG_DELETE_ON_INDEX=1 INFO=1 ES_INDEX=audit ES_TYPE=record INDEX_QUEUE_TIMEOUT=60 PG_USERNAME=tom PG_PASSWORD=tom PG_DATABASE=my_database ES_PROTO=http STATUS_UPDATE_INTERVAL=2 INDEX_QUEUE_LIMIT=20 node index.js
INFO Starting
INFO Index audit does not exist, creating.
INFO Index audit created successfully
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO No historic audit found, cannot get last processed event_id
INFO Loading all available audit data for backlog processing
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 56 rows
INFO Flushed 20 records
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 8 records
INFO Attempted to delete 8 rows, actually deleted 8 rows
STATUS UPDATE: Created a total of 68 indexes with 12 queued at 1541617222
INFO Flushed 12 records
INFO Attempted to delete 12 rows, actually deleted 12 rows
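So what's happened here is:
1. It found no previously processed event_id, so it loaded all of the audit data from the audit.logged_actions table (as defined by PG_SCHEMA and PG_TABLE).
2. It read that backlog from the audit.logged_actions table and processed it in chunks of 20 (defined by INDEX_QUEUE_LIMIT), deleting each chunk from PostgreSQL once it had been flushed to Elasticsearch.
3. It then listened for the NOTIFY statements run by the trigger function in Postgres, caching the rows until the number of rows in the cache hit INDEX_QUEUE_LIMIT or it had been INDEX_QUEUE_TIMEOUT seconds since the first item in the cache was added, and flushing them to Elasticsearch.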
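The flush rule in the last step (flush when the cache reaches INDEX_QUEUE_LIMIT rows, or INDEX_QUEUE_TIMEOUT seconds after the first row arrived) can be sketched in Python like this. The class name, the injectable clock and the periodic tick() call are assumptions made for illustration, and flush is a hypothetical stand-in for the bulk request to Elasticsearch.

```python
import time


class IndexQueue:
    """Buffer rows and flush when the queue is full or its oldest row is too old.

    limit and timeout play the roles of INDEX_QUEUE_LIMIT and INDEX_QUEUE_TIMEOUT;
    flush is a hypothetical callback standing in for the Elasticsearch bulk request.
    """

    def __init__(self, limit, timeout, flush, clock=time.monotonic):
        self.limit = limit
        self.timeout = timeout
        self.flush = flush
        self.clock = clock  # injectable so the timing can be tested
        self.items = []
        self.first_added_at = None

    def add(self, row):
        if not self.items:
            self.first_added_at = self.clock()  # start the timeout on the first row
        self.items.append(row)
        if len(self.items) >= self.limit:
            self._flush()

    def tick(self):
        """Call periodically so a quiet queue still flushes after the timeout."""
        if self.items and self.clock() - self.first_added_at >= self.timeout:
            self._flush()

    def _flush(self):
        batch, self.items, self.first_added_at = self.items, [], None
        self.flush(batch)
```

Measuring the timeout from the first queued row, rather than the most recent one, bounds how stale a row can get: on a quiet database, a partly filled queue still reaches Elasticsearch roughly INDEX_QUEUE_TIMEOUT seconds after its first row arrived, as long as tick() is called regularly.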
Run it again later and it finds the last processed event_id and picks up from there:
INFO Starting
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO Found last processed event_id: 55748
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 5 rows
INFO Deleted a total of 0 rows
INFO Flushed 5 records
INFO Attempted to delete 5 rows, actually deleted 5 rows
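In this second run it resumed from event_id 55748 rather than starting over. One way to read a backlog like this in order, in fixed-size chunks, is keyset pagination on event_id, sketched below in Python against an in-memory SQLite stand-in. Where last_event_id comes from (for example, the highest event_id already in the Elasticsearch index) is an assumption here, as is the query itself; neither is taken from the tool.

```python
import sqlite3


def load_backlog(conn, last_event_id, batch_size):
    """Yield not-yet-indexed audit rows in event_id order, batch_size at a time.

    last_event_id is assumed to be the highest event_id already stored in
    Elasticsearch, or None on a first run with an empty index.
    """
    # Assumes event_id comes from a sequence starting at 1, so 0 means "from the start".
    after = 0 if last_event_id is None else last_event_id
    while True:
        rows = conn.execute(
            "SELECT event_id, action FROM logged_actions"
            " WHERE event_id > ? ORDER BY event_id LIMIT ?",
            (after, batch_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        after = rows[-1][0]  # keyset pagination: continue after the last id seen
```

Filtering on event_id > last, rather than using OFFSET, keeps each chunk query cheap on the event_id index and is not thrown off by rows being deleted from the table between chunks.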
If you're considering implementing this, writing your own trigger is a good idea. This is what ours looks like:
CREATE OR REPLACE FUNCTION audit.if_modified_func() RETURNS trigger AS $BODY$
DECLARE
    audit_row audit.logged_actions;
    include_values BOOLEAN;
    log_diffs BOOLEAN;
    h_old hstore;
    h_new hstore;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'audit.if_modified_func() may only run as an AFTER trigger';
    END IF;

    audit_row = ROW(
        NEXTVAL('audit.logged_actions_event_id_seq'), -- event_id
        TG_TABLE_SCHEMA::text,                        -- schema_name
        TG_TABLE_NAME::text,                          -- table_name
        NULL,                                         -- table_id
        TG_RELID,                                     -- relation OID for much quicker searches
        session_user::text,                           -- session_user_name
        statement_timestamp(),                        -- action_tstamp_stm
        inet_client_addr(),                           -- client_addr
        current_query(),                              -- top-level query or queries (if multistatement) from client
        SUBSTRING(TG_OP,1,1),                         -- action
        NULL, NULL,                                   -- row_data, changed_fields
        'f'                                           -- statement_only
    );

    -- First trigger argument: pass false to stop logging the client query
    IF NOT TG_ARGV[0]::BOOLEAN IS DISTINCT FROM 'f'::BOOLEAN THEN
        audit_row.client_query = NULL;
    END IF;

    -- Second trigger argument: array of column names to leave out of the audit data
    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;

    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*);
        audit_row.changed_fields = (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = NEW.id;
        END IF;
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        audit_row.statement_only = 't';
    ELSE
        RAISE EXCEPTION '[audit.if_modified_func] - Trigger func added as trigger for unhandled case: %, %', TG_OP, TG_LEVEL;
        RETURN NULL;
    END IF;

    INSERT INTO audit.logged_actions VALUES (audit_row.*);

    -- NOTIFY payloads must be shorter than 8000 bytes by default, so rows whose
    -- JSON is too large are announced by event_id only, on the audit_id channel.
    -- octet_length counts bytes, which matters for multi-byte characters.
    IF octet_length(CAST(row_to_json(audit_row) AS text)) >= 8000 THEN
        PERFORM pg_notify(CAST('audit_id' AS text), audit_row.event_id::text);
    ELSE
        PERFORM pg_notify(CAST('audit' AS text), CAST(row_to_json(audit_row) AS text));
    END IF;

    RETURN NULL;
END;
$BODY$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER COST 100;
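The UPDATE branch is the subtle part of the trigger. hstore(NEW.*) - hstore(OLD.*) keeps only the key/value pairs of the new row that differ from the old one, and subtracting excluded_cols then drops the listed column names; if nothing is left, the update is not logged at all. Here is a small Python analogue of that logic, useful for reasoning about which updates will be audited. Rows are modelled as dicts of strings because hstore stores every value as text, and the function names are hypothetical.

```python
def changed_fields(old_row: dict, new_row: dict, excluded_cols=()) -> dict:
    """Python analogue of (hstore(NEW.*) - hstore(OLD.*)) - excluded_cols."""
    excluded = set(excluded_cols)
    return {
        key: value
        for key, value in new_row.items()
        # hstore - hstore drops pairs that match exactly (same key, same value)
        if (key not in old_row or old_row[key] != value)
        # hstore - text[] drops the listed keys
        and key not in excluded
    }


def audit_update(old_row: dict, new_row: dict, excluded_cols=()):
    """Return the audit fields for an UPDATE, or None when it would be skipped."""
    diff = changed_fields(old_row, new_row, excluded_cols)
    if not diff:
        return None  # like the trigger's RETURN NULL when changed_fields is empty
    return {"row_data": old_row, "changed_fields": diff}
```

For example, excluding an updated_at column means an update that only touches updated_at produces no audit row, while an update that also changes name records just the name change.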
I hope this article helps someone. Thanks for reading.
Tom is a Co-founder and CTO of OnSecurity. He has over a decade of web development experience and many years in IT Security. Feel free to connect with Tom on LinkedIn.