baseddata.io/backend/app.py

211 lines
6.6 KiB
Python

from flask import Flask, g, jsonify, request, json, Response, send_from_directory, abort
from flask_cors import CORS
import orjson, os
import pandas as pd
import datetime
import time
# Flask application object; every hook and route in this module is registered on it.
app = Flask(__name__)
# Apply flask_cors to the whole app. No options are passed, so the extension's
# defaults govern which origins/routes are allowed.
CORS(app)
# Base directory of the data files served by the file-oriented routes in this module.
# NOTE(review): this is a relative path — presumably the server is started from
# the backend/ directory; confirm against the deployment setup.
FILES_DIRECTORY = "../data/"
@app.before_request
def start_timer():
    """Record when handling of the current request began.

    The wall-clock timestamp is stored on ``g.start`` so that the
    ``after_request`` logging hook can compute the request duration.
    """
    request_started_at = time.time()
    g.start = request_started_at
@app.after_request
def log(response):
    """Append a one-line summary of the finished request to ``api_logs.txt``.

    The line is a comma-separated list of ``key=value`` pairs: timestamp,
    duration (seconds, rounded to 4 places), method, url, status,
    remote_addr and user_agent.

    Args:
        response: The response object Flask is about to send.

    Returns:
        The same ``response`` object, unmodified.
    """
    now = time.time()
    # Fall back to ``now`` (duration 0.0) if the timer hook did not run for
    # this request, instead of failing the response with an AttributeError.
    started = g.get("start", now)
    duration = round(now - started, 4)
    dt = datetime.datetime.fromtimestamp(now).strftime("%Y-%m-%d %H:%M:%S")

    # ``access_route`` may be an empty list when no remote address is known;
    # indexing it unconditionally would raise IndexError.
    route = request.access_route
    remote_addr = route[-1] if route else request.remote_addr

    log_entry = {
        "timestamp": dt,
        "duration": duration,
        "method": request.method,
        "url": request.url,
        "status": response.status_code,
        "remote_addr": remote_addr,
        "user_agent": request.user_agent.string,
    }
    log_line = ",".join(f"{key}={value}" for key, value in log_entry.items())

    # Explicit UTF-8: URLs and user-agent strings can contain non-ASCII
    # characters, which the platform default encoding may be unable to write.
    with open("api_logs.txt", "a", encoding="utf-8") as f:
        f.write(log_line + "\n")
    return response
@app.route("/bitcoin_business_growth_by_country", methods=["GET"])
def business_growth():
    """Return bitcoin business growth records, optionally filtered.

    Query parameters (all optional):
        latest_date: When present and non-empty, keep only items whose
            ``latest_date`` field equals ``True`` (value ``"true"``) or
            ``False`` (any other value).
        countries: Comma-separated country names; keep only items whose
            ``country_name`` is one of them (surrounding spaces ignored).
        cumulative_period_type: One of ``"1 day"``, ``"7 day"``,
            ``"28 day"`` or ``"365 day"``. Keeps only items of that period
            type whose ``date`` falls inside the matching look-back window.
            Any other value applies no period filter.

    Returns:
        A JSON array of the remaining items, sorted ascending by ``date``.
    """
    # Look-back window, in days, for each supported period type. Each window
    # is one day longer than the period's name.
    # NOTE(review): presumably the extra day tolerates the newest day not
    # being published yet — confirm with the data pipeline.
    lookback_days = {"1 day": 2, "7 day": 8, "28 day": 29, "365 day": 366}

    today = datetime.datetime.today()

    latest_date = request.args.get("latest_date")
    country_names = request.args.get("countries")
    cumulative_period_type = request.args.get("cumulative_period_type")

    data_path = os.path.join(
        FILES_DIRECTORY, "final__bitcoin_business_growth_by_country.json"
    )
    with open(data_path, "rb") as f:
        filtered_data = orjson.loads(f.read())

    if latest_date:
        latest_date_bool = latest_date == "true"
        filtered_data = [
            item for item in filtered_data if item["latest_date"] == latest_date_bool
        ]

    if country_names:
        # A set gives O(1) membership tests per item.
        countries = {name.strip() for name in country_names.split(",")}
        filtered_data = [
            item for item in filtered_data if item["country_name"] in countries
        ]

    if cumulative_period_type in lookback_days:
        cutoff = today - datetime.timedelta(
            days=lookback_days[cumulative_period_type]
        )
        filtered_data = [
            item
            for item in filtered_data
            if item["cumulative_period_type"] == cumulative_period_type
            and cutoff <= datetime.datetime.strptime(item["date"], "%Y-%m-%d")
        ]

    sorted_data = sorted(filtered_data, key=lambda x: x["date"])
    return Response(json.dumps(sorted_data), mimetype="application/json")
@app.route("/get_json/<filename>", methods=["GET"])
def get_json(filename):
    """Serve a JSON data file as a date-sorted array, optionally windowed.

    Args:
        filename: Name of a file directly inside ``FILES_DIRECTORY``.

    Query parameters:
        period: Optional. One of ``"last 7 days"``, ``"last 28 days"``,
            ``"last 365 days"`` or ``"last 2 years"``; keeps only items whose
            ``date`` lies between that many days ago and now. Any other
            value (or none) returns every item.

    Returns:
        A JSON response containing the items sorted ascending by ``date``.

    Raises:
        404 (via ``abort``) when ``filename`` is not an existing regular file
        or contains a path separator.
    """
    # Number of days each supported ``period`` value looks back.
    period_days = {
        "last 7 days": 7,
        "last 28 days": 28,
        "last 365 days": 365,
        "last 2 years": 730,
    }

    period = request.args.get("period")
    today = datetime.datetime.today()

    # ``filename`` is untrusted: refuse anything that is not a bare file name
    # so it cannot point outside FILES_DIRECTORY (e.g. via backslashes on
    # Windows, which the URL converter does not filter).
    if filename != os.path.basename(filename):
        abort(404)

    file_path = os.path.join(FILES_DIRECTORY, filename)
    if not os.path.isfile(file_path):
        abort(404)

    # Read bytes, like the other loaders in this module: orjson decodes UTF-8
    # itself, so the result no longer depends on the locale's text encoding.
    with open(file_path, "rb") as f:
        data = orjson.loads(f.read())

    if period in period_days:
        earliest = today - datetime.timedelta(days=period_days[period])
        data = [
            item
            for item in data
            if earliest
            <= datetime.datetime.strptime(item["date"], "%Y-%m-%d")
            <= today
        ]

    sorted_data = sorted(data, key=lambda x: x["date"])
    return jsonify(sorted_data)
@app.route("/mangrove_data/<method>", methods=["GET"])
def mangrove_data(method):
    """Return per-country mangrove statistics for the year 2020.

    Args:
        method: Aggregation to serve. Only ``"countries"`` is implemented.

    Returns:
        A JSON object keyed by country. Each value holds ``year``, the summed
        ``n_pixels``, ``diff`` and ``cumulative_diff``, the derived
        ``year0_pixels`` (``n_pixels - cumulative_diff``) and ``pct_diff``
        (percentage change relative to ``year0_pixels``, 2 decimals).

    Raises:
        404 (via ``abort``) for any ``method`` other than ``"countries"``.
    """
    from io import StringIO

    # Previously an unknown method fell through and returned None, which
    # Flask reports as a 500. Reject it explicitly, before touching the file.
    if method != "countries":
        abort(404)

    data_path = os.path.join(
        FILES_DIRECTORY, "dev", "final__wdpa_pid_mangrove_diff_stats.json"
    )
    with open(data_path, "rb") as f:
        data = orjson.loads(f.read())

    # Wrap the JSON text in a file-like object: passing a literal JSON string
    # to ``read_json`` is deprecated in recent pandas releases.
    df = pd.read_json(StringIO(json.dumps(data)))

    countries = df[["year", "country", "n_pixels", "diff", "cumulative_diff"]]
    countries_agg = countries.groupby(["year", "country"]).agg(
        {"n_pixels": "sum", "diff": "sum", "cumulative_diff": "sum"}
    )
    # Pixel count at the start of the series: current count minus total change.
    countries_agg["year0_pixels"] = (
        countries_agg["n_pixels"] - countries_agg["cumulative_diff"]
    )
    countries_agg["pct_diff"] = (
        100
        * (countries_agg["n_pixels"] - countries_agg["year0_pixels"])
        / countries_agg["year0_pixels"]
    ).round(2)

    # Select the 2020 rows from the (year, country) index, then key by country.
    countries_latest = (
        countries_agg.loc[[2020]].reset_index().set_index("country")
    )
    return Response(
        countries_latest.to_json(orient="index"), mimetype="application/json"
    )
@app.route("/download/<filename>", methods=["GET"])
def download_file(filename):
    """Send a file from ``FILES_DIRECTORY`` as a download attachment.

    Args:
        filename: Name of the file to send, taken from the URL path.

    Returns:
        The attachment response produced by ``send_from_directory``.

    Raises:
        404 (via ``abort``) if ``FileNotFoundError`` is raised while sending.
    """
    try:
        attachment = send_from_directory(
            FILES_DIRECTORY, filename, as_attachment=True
        )
    except FileNotFoundError:
        abort(404)
    else:
        return attachment
@app.route("/cog", methods=["GET"])
def serve_cog():
    """Send the ``.tif`` file for a given year and pid as an attachment.

    Query parameters:
        year: Sub-directory of ``<FILES_DIRECTORY>/cog`` to look in.
        pid: File name without the ``.tif`` extension.

    Returns:
        The attachment response for ``cog/<year>/<pid>.tif``.

    Raises:
        404 (via ``abort``) when a parameter is missing, the file does not
        exist, or the requested path would escape the ``cog`` directory.
    """
    year = request.args.get("year")
    # TODO: the original carried a "change this line" note here; intent not recorded.
    pid = request.args.get("pid")

    # Without this check a missing parameter was formatted as the literal
    # text "None" and looked up on disk. Status stays 404, as before.
    if not year or not pid:
        abort(404)

    # Both values are untrusted. Pass them as the *path* argument, relative to
    # a fixed root, so send_from_directory's safe-join check covers ``year``
    # too; interpolating ``year`` into the directory argument bypassed it.
    cog_root = os.path.join(FILES_DIRECTORY, "cog")
    try:
        return send_from_directory(
            cog_root, f"{year}/{pid}.tif", as_attachment=True
        )
    except FileNotFoundError:
        abort(404)
if __name__ == "__main__":
    # Executed as a script: start the server through the app's own run() method.
    app.run()