baseddata.io/backend/app.py

124 lines
4.8 KiB
Python

from flask import Flask, g, jsonify, request, json, Response, send_from_directory, abort
from flask_cors import CORS
import orjson, os
import datetime
import time
# The Flask application object; the route and request-hook decorators in this
# module register against it.
app = Flask(__name__)
# Apply flask_cors to the whole app. No options are passed, so the
# extension's defaults are in effect.
CORS(app)
# Directory the data files are served from. This is a relative path —
# NOTE(review): what it resolves against depends on how each consumer
# interprets it; confirm the expected working directory at deploy time.
FILES_DIRECTORY = '../data/'
# NOTE(review): this is evaluated once, when the module is imported, so the
# value does not advance while the process keeps running. Any code that reads
# it sees the process start time, not the current time.
today = datetime.datetime.today()
@app.before_request
def start_timer():
    """Record the wall-clock time at which handling of this request began.

    The value is stored on ``g.start`` as seconds since the epoch
    (``time.time()``).
    """
    g.start = time.time()
@app.after_request
def log(response):
    """Append one access-log line for the finished request to ``api_logs.txt``.

    The line is a comma-joined list of ``key=value`` pairs, in this order:
    ``timestamp`` (local time, ``YYYY-MM-DD HH:MM:SS``), ``duration``
    (seconds, rounded to 4 decimal places), ``method``, ``url``, ``status``,
    ``remote_addr`` and ``user_agent``.

    Values are written without escaping, so a value that itself contains a
    comma (User-Agent strings commonly do) makes the line ambiguous to split
    on commas alone.

    Args:
        response: The response object about to be sent to the client.

    Returns:
        The same ``response`` object, unmodified.
    """
    now = time.time()
    # Fall back to ``now`` (a duration of 0.0) if no start time was recorded
    # for this request, rather than raising AttributeError while responding.
    started = getattr(g, 'start', now)
    log_entry = {
        'timestamp': datetime.datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S'),
        'duration': round(now - started, 4),
        'method': request.method,
        'url': request.url,
        'status': response.status_code,
        'remote_addr': request.remote_addr,
        'user_agent': request.user_agent.string,
    }
    log_line = ','.join(f'{key}={value}' for key, value in log_entry.items())
    try:
        # Explicit UTF-8 so a non-ASCII URL or User-Agent cannot raise
        # UnicodeEncodeError under a non-UTF-8 locale encoding.
        with open('api_logs.txt', 'a', encoding='utf-8') as f:
            f.write(log_line + '\n')
    except OSError:
        # Access logging is auxiliary: a full disk or a permission problem
        # must not turn an otherwise successful request into an error.
        app.logger.exception('Could not write to api_logs.txt')
    return response
@app.route('/bitcoin_business_growth_by_country', methods=['GET'])
def business_growth():
    """Return bitcoin business growth records, filtered by query parameters.

    Query parameters (all optional):
        latest_date: When ``'true'``, keep records whose ``latest_date``
            field equals ``True``; any other non-empty value keeps records
            where it equals ``False``.
        countries: Comma-separated country names; keep records whose
            ``country_name`` is one of them (surrounding spaces ignored).
        cumulative_period_type: One of ``'1 day'``, ``'7 day'``, ``'28 day'``
            or ``'365 day'``. Keeps records with that
            ``cumulative_period_type`` whose ``date`` falls inside the
            matching look-back window. Any other value applies no period
            filter.

    Returns:
        A JSON response containing the matching records sorted by ``date``,
        newest first.
    """
    # Look-back window, in days, for each supported period type.
    # NOTE(review): '1 day' maps to 2 days, as in the original code —
    # presumably to tolerate a lag in the data; confirm it is intentional.
    lookback_days_by_period = {
        '1 day': 2,
        '7 day': 7,
        '28 day': 28,
        '365 day': 365,
    }

    latest_date = request.args.get('latest_date')
    country_names = request.args.get('countries')
    cumulative_period_type = request.args.get('cumulative_period_type')

    data_path = os.path.join(
        FILES_DIRECTORY, 'final__bitcoin_business_growth_by_country.json'
    )
    with open(data_path, 'rb') as f:
        records = orjson.loads(f.read())

    if latest_date:
        want_latest = latest_date == 'true'
        records = [item for item in records if item['latest_date'] == want_latest]

    if country_names:
        countries = {name.strip() for name in country_names.split(',')}
        records = [item for item in records if item['country_name'] in countries]

    lookback_days = lookback_days_by_period.get(cumulative_period_type)
    if lookback_days is not None:
        # Take the current time per request; a timestamp captured at import
        # would make the window drift the longer the server stays up.
        cutoff = datetime.datetime.today() - datetime.timedelta(days=lookback_days)
        records = [
            item
            for item in records
            if item['cumulative_period_type'] == cumulative_period_type
            and cutoff <= datetime.datetime.strptime(item['date'], '%Y-%m-%d')
        ]

    # Dates are 'YYYY-MM-DD' strings (see the strptime format above), so
    # sorting the strings sorts chronologically.
    sorted_data = sorted(records, key=lambda item: item['date'], reverse=True)
    return Response(json.dumps(sorted_data), mimetype='application/json')
@app.route('/get_json/<filename>', methods=['GET'])
def get_json(filename):
    """Return the records of a JSON data file, optionally limited to a period.

    Args:
        filename: Name of a file inside ``FILES_DIRECTORY``.

    Query parameters:
        period: Optional. One of ``'last 7 days'``, ``'last 28 days'``,
            ``'last 365 days'`` or ``'last 2 years'`` (730 days). Keeps only
            records whose ``date`` lies between the start of that window and
            the current time. Any other value returns every record.

    Returns:
        A JSON response with the records sorted by ``date``, oldest first.
        Responds with 404 if ``filename`` is not a plain file name or the
        file does not exist.
    """
    # Look-back window, in days, for each supported ``period`` value.
    lookback_days_by_period = {
        'last 7 days': 7,
        'last 28 days': 28,
        'last 365 days': 365,
        'last 2 years': 730,
    }

    # Accept bare file names only. This also rejects platform-specific
    # separators (e.g. backslashes on Windows) that would otherwise let the
    # joined path leave the data directory.
    if os.path.basename(filename) != filename:
        abort(404)
    file_path = os.path.join(FILES_DIRECTORY, filename)
    if not os.path.isfile(file_path):
        abort(404)

    # Read bytes and let orjson decode them, so the result does not depend on
    # the locale's default text encoding.
    with open(file_path, 'rb') as f:
        records = orjson.loads(f.read())

    lookback_days = lookback_days_by_period.get(request.args.get('period'))
    if lookback_days is not None:
        # Take the current time per request; a timestamp captured at import
        # would make the window drift the longer the server stays up.
        now = datetime.datetime.today()
        cutoff = now - datetime.timedelta(days=lookback_days)
        records = [
            item
            for item in records
            if cutoff <= datetime.datetime.strptime(item['date'], '%Y-%m-%d') <= now
        ]

    return jsonify(sorted(records, key=lambda item: item['date']))
@app.route('/download/<filename>', methods=['GET'])
def download_file(filename):
    """Send a file from ``FILES_DIRECTORY`` to the client as an attachment.

    Args:
        filename: Name of the requested file, taken from the URL.

    Returns:
        The response produced by ``send_from_directory`` with
        ``as_attachment=True``. Responds with 404 if that call raises
        ``FileNotFoundError``.
    """
    try:
        attachment = send_from_directory(FILES_DIRECTORY, filename, as_attachment=True)
    except FileNotFoundError:
        abort(404)
    else:
        return attachment
# Start the app with Flask's own ``run()`` only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    app.run()