from flask import Flask, g, jsonify, request, json, Response, send_from_directory, abort from flask_cors import CORS import orjson, os import pandas as pd import datetime import time app = Flask(__name__) CORS(app) FILES_DIRECTORY = "../data/" @app.before_request def start_timer(): g.start = time.time() @app.after_request def log(response): now = time.time() duration = round(now - g.start, 4) dt = datetime.datetime.fromtimestamp(now).strftime("%Y-%m-%d %H:%M:%S") log_entry = { "timestamp": dt, "duration": duration, "method": request.method, "url": request.url, "status": response.status_code, "remote_addr": request.access_route[-1], "user_agent": request.user_agent.string, } log_line = ",".join(f"{key}={value}" for key, value in log_entry.items()) with open("api_logs.txt", "a") as f: f.write(log_line + "\n") return response @app.route("/bitcoin_business_growth_by_country", methods=["GET"]) def business_growth(): today = datetime.datetime.today() # Parse args from request latest_date = request.args.get("latest_date") country_names = request.args.get("countries") cumulative_period_type = request.args.get("cumulative_period_type") # Open json locally with open("../data/final__bitcoin_business_growth_by_country.json", "rb") as f: data = orjson.loads(f.read()) # Filter based on args if latest_date: latest_date_bool = latest_date == "true" filtered_data = [ item for item in data if item["latest_date"] == latest_date_bool ] else: filtered_data = data if country_names: countries = [name.strip() for name in country_names.split(",")] filtered_data = [ item for item in filtered_data if item["country_name"] in countries ] if cumulative_period_type == "1 day": delta = today - datetime.timedelta(days=2) filtered_data = [ item for item in filtered_data if item["cumulative_period_type"] == cumulative_period_type and delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") ] elif cumulative_period_type == "7 day": delta = today - datetime.timedelta(days=8) filtered_data = [ item for item in filtered_data if item["cumulative_period_type"] == cumulative_period_type and delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") ] elif cumulative_period_type == "28 day": delta = today - datetime.timedelta(days=29) filtered_data = [ item for item in filtered_data if item["cumulative_period_type"] == cumulative_period_type and delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") ] elif cumulative_period_type == "365 day": delta = today - datetime.timedelta(days=366) filtered_data = [ item for item in filtered_data if item["cumulative_period_type"] == cumulative_period_type and delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") ] # Sort by date sorted_data = sorted(filtered_data, key=lambda x: x["date"], reverse=False) # Return json return Response(json.dumps(sorted_data), mimetype="application/json") @app.route("/get_json/", methods=["GET"]) def get_json(filename): period = request.args.get("period") today = datetime.datetime.today() file_path = os.path.join(FILES_DIRECTORY, filename) if not os.path.isfile(file_path): abort(404) with open(file_path, "r") as f: data = orjson.loads(f.read()) if period == "last 7 days": delta = today - datetime.timedelta(days=7) filtered_data = [ item for item in data if delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") <= today ] sorted_data = sorted(filtered_data, key=lambda x: x["date"]) elif period == "last 28 days": delta = today - datetime.timedelta(days=28) filtered_data = [ item for item in data if delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") <= today ] sorted_data = sorted(filtered_data, key=lambda x: x["date"]) elif period == "last 365 days": delta = today - datetime.timedelta(days=365) filtered_data = [ item for item in data if delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") <= today ] sorted_data = sorted(filtered_data, key=lambda x: x["date"]) elif period == "last 2 years": delta = today - datetime.timedelta(days=730) filtered_data = [ item for item in data if delta <= datetime.datetime.strptime(item["date"], "%Y-%m-%d") <= today ] sorted_data = sorted(filtered_data, key=lambda x: x["date"]) else: sorted_data = sorted(data, key=lambda x: x["date"]) return jsonify(sorted_data) @app.route("/mangrove_data/", methods=["GET"]) def mangrove_data(method): with open("../data/dev/final__wdpa_pid_mangrove_diff_stats.json", "rb") as f: data = orjson.loads(f.read()) if method == "countries": df = pd.read_json(json.dumps(data)) countries = df[["year", "country", "n_pixels", "diff", "cumulative_diff"]] countriesAgg = countries.groupby(["year", "country"]).agg( {"n_pixels": "sum", "diff": "sum", "cumulative_diff": "sum"} ) countriesAgg["year0_pixels"] = ( countriesAgg["n_pixels"] - countriesAgg["cumulative_diff"] ) countriesAgg["pct_diff"] = ( 100 * (countriesAgg["n_pixels"] - countriesAgg["year0_pixels"]) / countriesAgg["year0_pixels"] ).round(2) countriesLatest = countriesAgg.loc[[2020]].reset_index().set_index("country") return Response( countriesLatest.to_json(orient="index"), mimetype="application/json" ) @app.route("/download/", methods=["GET"]) def download_file(filename): try: return send_from_directory(FILES_DIRECTORY, filename, as_attachment=True) except FileNotFoundError: abort(404) @app.route("/cog", methods=["GET"]) def serve_cog(): year = request.args.get("year") pid = request.args.get("pid") # change this line dir = f"{FILES_DIRECTORY}/cog/{year}/" try: return send_from_directory(dir, f"{pid}.tif", as_attachment=True) except FileNotFoundError: abort(404) if __name__ == "__main__": app.run()