diff --git a/backend/api/pipelines.py b/backend/api/pipelines.py new file mode 100644 index 0000000..683e1ae --- /dev/null +++ b/backend/api/pipelines.py @@ -0,0 +1,215 @@ +def parse_int(args): + try: + return int( args ) + except ValueError: + print(f"{args} cannot be cast to int") + raise + +def mangrove_by_country_latest(): + return """ + select * from models_final.final__protected_mangroves_summary_stats_by_country_agg + where year = '2020' + order by cumulative_pixels_diff desc + """ + +def bitcoin_business_growth_timeseries(args): + days_ago = parse_int(args["days_ago"]) + country_name = args["country_name"] + return f""" + select * from models_final.final__bitcoin_business_growth_by_country + where days_ago <= {days_ago} and country_name = '{country_name}' + order by date + """ + +def bitcoin_business_growth_percent_diff_days_ago(args): + days_ago = parse_int(args["days_ago"]) + return f""" + with + filtered_data as ( + select country_name, date, days_ago, cumulative_value + from models_final.final__bitcoin_business_growth_by_country + where days_ago <= {days_ago} + order by country_name, days_ago desc + ), + first_and_last_values as ( + select + country_name, + date, + days_ago, + cumulative_value, + first_value(cumulative_value) over ( + partition by country_name order by days_ago desc + ) as first_value, + first_value(date) over ( + partition by country_name order by days_ago desc + ) as first_date, + first_value(cumulative_value) over ( + partition by country_name order by days_ago + ) as last_value, + first_value(date) over ( + partition by country_name order by days_ago + ) as last_date + from filtered_data + ), + diff as ( + select + country_name, + date, + first_date, + last_date, + days_ago, + cumulative_value, + first_value, + last_value, + last_value - first_value as difference, + round( + 100 * safe_divide((last_value - first_value), first_value), 2 + ) as percent_difference + from first_and_last_values + ) + select * + from diff + where days_ago = 1 + order by difference desc + """ +# def bitcoin_business_growth_timeseries(query): +# pipeline = [ +# { +# "$match": { +# "days_ago": {"$lte": int(query["days_ago"])}, +# "country_name": query["country_name"], +# } +# }, +# { +# "$project": { +# "country_name": "$country_name", +# "date": "$date", +# "cumulative_value": "$cumulative_value", +# } +# }, +# {"$sort": {"country_name": 1, "days_ago": 1}}, +# ] +# return pipeline +# def mangrove_by_country_latest(): +# pipeline = [ +# { +# "$match": {"year": "2020"}, +# }, +# ] +# return pipeline +# +# +# def mangrove_by_country_agg(query): +# pipeline = [ +# {"$match": {"country_with_parent": query["country_with_parent"]}}, +# { +# "$group": { +# "_id": {"country_with_parent": "$country_with_parent", "year": "$year"}, +# "total_pixels": {"$sum": "$total_n_pixels"}, +# } +# }, +# { +# "$project": { +# "_id": 0, +# "country_with_parent": "$_id.country_with_parent", +# "year": "$_id.year", +# "total_pixels": 1, +# } +# }, +# {"$sort": {"year": 1}}, +# ] +# return pipeline +# +# +# def bitcoin_business_growth_timeseries(query): +# pipeline = [ +# { +# "$match": { +# "days_ago": {"$lte": int(query["days_ago"])}, +# "country_name": query["country_name"], +# } +# }, +# { +# "$project": { +# "country_name": "$country_name", +# "date": "$date", +# "cumulative_value": "$cumulative_value", +# } +# }, +# {"$sort": {"country_name": 1, "days_ago": 1}}, +# ] +# return pipeline +# +# +# def bitcoin_business_growth_percent_diff_days_ago(query): + pipeline = [ + {"$match": {"days_ago": {"$lte": int(query["days_ago"])}}}, + {"$sort": {"country_name": 1, "days_ago": 1}}, + { + "$group": { + "_id": "$country_name", + "firstvalue": {"$first": "$cumulative_value"}, + "lastvalue": {"$last": "$cumulative_value"}, + "firstdate": {"$min": "$date"}, + "lastdate": {"$max": "$date"}, + } + }, + { + "$project": { + "country_name": "$_id", + "first_value": "$firstvalue", + "last_value": "$lastvalue", + "difference": { + "$subtract": [ + {"$todouble": "$firstvalue"}, + {"$todouble": "$lastvalue"}, + ] + }, + "first_date": "$firstdate", + "last_date": "$lastdate", + "percent_difference": { + "$cond": { + "if": {"$eq": [{"$todouble": "$lastvalue"}, 0]}, + "then": { + "$cond": { + "if": {"$gt": [{"$todouble": "$firstvalue"}, 0]}, + "then": "new", + "else": "none", + } + }, + "else": { + "$round": [ + { + "$multiply": [ + { + "$divide": [ + { + "$subtract": [ + {"$todouble": "$firstvalue"}, + {"$todouble": "$lastvalue"}, + ] + }, + {"$todouble": "$lastvalue"}, + ] + }, + 100, + ] + } + ] + }, + } + }, + } + }, + ] + return pipeline +# +# +# def bitcoin_business_growth_latest(query): +# pipeline = [ +# { +# "$match": query["filter"], +# }, +# {"$sort": {"date": 1}}, +# ] +# return pipeline diff --git a/backend/api/postgres_handler.py b/backend/api/postgres_handler.py new file mode 100644 index 0000000..64f0677 --- /dev/null +++ b/backend/api/postgres_handler.py @@ -0,0 +1,34 @@ +from psycopg2.extras import RealDictCursor +import psycopg2, os + +class PostgresHandler: + def __init__(self): + self.connection = self.connect_to_pg() + self.cur = self.connection.cursor(cursor_factory=RealDictCursor) + + def connect_to_pg(self): + try: + connection = psycopg2.connect( + dbname=os.getenv('PGDATABASE'), + host=os.getenv('PGHOST'), + user=os.getenv('PGUSER'), + password=os.getenv('PGPASSWORD'), + port=os.getenv('PGPORT'), + ) + except Exception as e: + message=f"Connection to postgres database failed: {e}" + raise Exception(message) + print(f"Successfully connected to DB") + return connection + + def execute_query(self, query): + try: + self.cur.execute(query) + results = self.cur.fetchall() + self.connection.commit() + self.connection.close() + return results + except Exception: + print("Error executing query") + raise + diff --git a/backend/api/route.py b/backend/api/route.py new file mode 100644 index 0000000..22c8455 --- /dev/null +++ b/backend/api/route.py @@ -0,0 +1,96 @@ +from fastapi import APIRouter +from api.postgres_handler import PostgresHandler +import api.pipelines as pipelines +import api.schemas as schemas +from api.schemas import DataSerializer +import json + +router = APIRouter() + +def parse_args_to_dict(query): + try: + return json.loads(query) + except json.JSONDecodeError as e: + return {"error": f"Invalid JSON: {e}"} + +@router.get("/mangrove_by_country_latest") +async def mangrove_by_country_latest(): + pipeline = pipelines.mangrove_by_country_latest() + handler = PostgresHandler() + + schema = schemas.mangrove_by_country_latest_schema + serializer = DataSerializer(schema) + rawData = handler.execute_query(pipeline) + serializedData = serializer.serialize_many(rawData) + return serializedData + +@router.get("/bitcoin_business_growth_timeseries") +async def bitcoin_business_growth_timeseries(query: str): + args = parse_args_to_dict(query) + + pipeline = pipelines.bitcoin_business_growth_timeseries(args) + handler = PostgresHandler() + + schema = schemas.bitcoin_business_growth_timeseries_schema + serializer = DataSerializer(schema) + + rawData = handler.execute_query(pipeline) + serializedData = serializer.serialize_many(rawData) + + return serializedData + +@router.get("/bitcoin_business_growth_percent_diff") +async def bitcoin_business_growth_percent_diff(query: str): + args = parse_args_to_dict(query) + + pipeline = pipelines.bitcoin_business_growth_percent_diff_days_ago(args) + handler = PostgresHandler() + + schema = schemas.bitcoin_business_growth_percent_diff_schema + serializer = DataSerializer(schema) + + rawData = handler.execute_query(pipeline) + serializedData = serializer.serialize_many(rawData) + return serializedData + +# @router.get("/bitcoin_business_growth_percent_diff") +# async def bitcoin_business_growth_percent_diff(query: str): +# query = ast.literal_eval(query) +# +# query = queries.bitcoin_business_growth_percent_diff_days_ago(query) +# handler = PostgresHandler(connection) +# +# schema = schemas.bitcoin_business_growth_percent_diff_schema +# pipeline = pipelines.bitcoin_business_growth_percent_diff_days_ago(query) +# serializer = DataSerializer(schema) +# handler = MongoDBHandler(collection_name) +# rawData = handler.aggregate(pipeline) +# serializedData = serializer.serialize_many(rawData) +# return serializedData +# @router.get("/mangrove_by_country_agg") +# async def mangrove_by_country_agg(query: str): +# query = ast.literal_eval(query) +# db = client.baseddata +# collection_name = db["final__protected_mangroves_summary_stats_by_country_agg"] +# schema = schemas.mangrove_by_country_agg_schema +# pipeline = pipelines.mangrove_by_country_agg(query) +# serializer = DataSerializer(schema) +# handler = MongoDBHandler(collection_name) +# rawData = handler.aggregate(pipeline) +# serializedData = serializer.serialize_many(rawData) +# return serializedData +# + +# @router.get("/bitcoin_business_growth_timeseries") +# async def bitcoin_business_growth_timeseries(query: str): +# query = ast.literal_eval(query) +# db = client.baseddata +# collection_name = db["final__bitcoin_business_growth_by_country"] +# schema = schemas.bitcoin_business_growth_timeseries_schema +# pipeline = pipelines.bitcoin_business_growth_timeseries(query) +# serializer = DataSerializer(schema) +# handler = MongoDBHandler(collection_name) +# rawData = handler.aggregate(pipeline) +# serializedData = serializer.serialize_many(rawData) +# return serializedData + diff --git a/backend/api/schemas.py b/backend/api/schemas.py new file mode 100644 index 0000000..f822d3a --- /dev/null +++ b/backend/api/schemas.py @@ -0,0 +1,42 @@ +def mangrove_by_country_latest_schema(data): + return { + "country_with_parent": str(data["country_with_parent"]), + "original_pixels": int(data["original_pixels"]), + "total_n_pixels": int(data["total_n_pixels"]), + "cumulative_pixels_diff": int(data["cumulative_pixels_diff"]), + "cumulative_pct_diff": float(data["cumulative_pct_diff"]), + } + +def mangrove_by_country_agg_schema(data): + return { + "country_with_parent": str(data["country_with_parent"]), + "year": int(data["year"]), + "total_pixels": int(data["total_pixels"]) + } + +def bitcoin_business_growth_percent_diff_schema(data): + return { + "country_name": str(data["country_name"]), + "date_range": str(f'{data["first_date"]} to {data["last_date"]}'), + "first_value": int(data["first_value"]), + "last_value": int(data["last_value"]), + "difference": int(data["difference"]), + "percent_difference": str(data["percent_difference"]) + } + +def bitcoin_business_growth_timeseries_schema(data): + return { + "country_name": str(data["country_name"]), + "date": data["date"], + "cumulative_value": int(data["cumulative_value"]) + } + +class DataSerializer: + def __init__(self, schema_func): + self.schema_func = schema_func + + def serialize_one(self, data) -> dict: + return self.schema_func(dict( data )) + + def serialize_many(self, data_list) -> list: + return [self.serialize_one(data) for data in data_list]