I have a function that is a query, trying to determine locks on a redshift database and send an OpsGenie alert if there are any:
def check_redshift_for_long_running_transactions():
# import psycopg2
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
print("Connecting to DB")
rs_hook = RedshiftSQLHook(redshift_conn_id=config['connections']['redshift']['conn_id'])
cur = rs_hook.get_cursor()
print("Running query")
cur.execute("""
select b.block_txn_owner, b.block_txn_db, b.block_pid, b.block_txn_start, b.block_txn_duration,
a.txn_owner, a.txn_db, a.pid, a.txn_start, a.lock_mode, tablename, a.granted, txn_duration
from
(select a.txn_owner as block_txn_owner, a.txn_db as block_txn_db, a.pid as block_pid, a.txn_start as block_txn_start,
datediff(s,a.txn_start,getdate()) as block_txn_duration
from svv_transactions a
left join (select * from stv_tbl_perm where slice=0) c
on a.relation=c.id
left join pg_class d on a.relation=d.oid
group by txn_owner, a.txn_db, a.pid, a.txn_start,
datediff(s,a.txn_start,getdate())
) b
inner join
(
select a.txn_owner, a.txn_db, a.pid, b.pid as blocking_pid, a.txn_start, a.lock_mode,
nvl(trim(c."name"),d.relname) as tablename,
a.granted,
datediff(s,a.txn_start,getdate()) as txn_duration
from svv_transactions a
left join (select pid,relation,granted from pg_locks group by 1,2,3) b
on a.relation=b.relation and a.granted='f' and b.granted='t'
left join (select * from stv_tbl_perm where slice=0) c
on a.relation=c.id
left join pg_class d on a.relation=d.oid
where a.relation is not null
and b.pid is not null) a
on a.blocking_pid = b.block_pid
where txn_duration > 900
""")
response = cur.fetchall()
print(f'Found {len(response)} long-running queries.')
if len(response) != 0:
raise ValueError(f"Error: Found a long running transaction.\nQuery output:\n{response}")
cur.close()
I have my dag specified at the bottom with
with DAG(dag_id=dag_id, **config['airflow'],
on_failure_callback = opsgenie_alert.create_alert) as dag:
check_redshift_for_long_running_transactions = PythonOperator(
task_id='Check_Redshift_for_long_running_transactions',
python_callable=check_redshift_for_long_running_transactions
)
check_redshift_for_long_running_transactions
Current process: Sends OpsGenie alert when there is data returned in the query, there is no other information other than the alert going off.
Desired Result: Query detects locks and is able to send the response of that query to OpsGenie alerts so when the alert is sent out, we can see the query information.
Ive tried create alert methods to pass a payload but cannot pickup the response from the query.