35 lines
1.1 KiB
Python
35 lines
1.1 KiB
Python
|
from psycopg2.extras import RealDictCursor
|
||
|
import psycopg2, os
|
||
|
|
||
|
class PostgresHandler:
|
||
|
def __init__(self):
|
||
|
self.connection = self.connect_to_pg()
|
||
|
self.cur = self.connection.cursor(cursor_factory=RealDictCursor)
|
||
|
|
||
|
def connect_to_pg(self):
|
||
|
try:
|
||
|
connection = psycopg2.connect(
|
||
|
dbname=os.getenv('PGDATABASE'),
|
||
|
host=os.getenv('PGHOST'),
|
||
|
user=os.getenv('PGUSER'),
|
||
|
password=os.getenv('PGPASSWORD'),
|
||
|
port=os.getenv('PGPORT'),
|
||
|
)
|
||
|
except Exception as e:
|
||
|
message=f"Connection to postgres database failed: {e}"
|
||
|
raise Exception(message)
|
||
|
print(f"Successfully connected to DB")
|
||
|
return connection
|
||
|
|
||
|
def execute_query(self, query):
|
||
|
try:
|
||
|
self.cur.execute(query)
|
||
|
results = self.cur.fetchall()
|
||
|
self.connection.commit()
|
||
|
self.connection.close()
|
||
|
return results
|
||
|
except Exception:
|
||
|
print("Error executing query")
|
||
|
raise
|
||
|
|