API - refacto weather utils

This commit is contained in:
Sam
2022-11-17 00:13:43 +01:00
parent 64d770a016
commit 940f0a8416
10 changed files with 581 additions and 141 deletions

View File

@ -4,7 +4,9 @@ from typing import Any, Dict, List, Optional, Tuple, Union
import gpxpy.gpx
from ..exceptions import WorkoutGPXException
from .weather import get_weather
from .weather import WeatherService
weather_service = WeatherService()
def open_gpx_file(gpx_file: str) -> Optional[gpxpy.gpx.GPX]:
@ -99,7 +101,7 @@ def get_gpx_info(
if start is None:
start = point.time
if point.time and update_weather_data:
weather_data.append(get_weather(point))
weather_data.append(weather_service.get_weather(point))
# if a previous segment exists, calculate stopped time between
# the two segments
@ -114,7 +116,7 @@ def get_gpx_info(
# last gpx point => get weather
if segment_idx == (segments_nb - 1) and update_weather_data:
weather_data.append(get_weather(point))
weather_data.append(weather_service.get_weather(point))
if update_map_data:
map_data.append([point.longitude, point.latitude])

View File

@ -1,129 +0,0 @@
import os
from datetime import datetime, timedelta
from typing import Dict, Optional, Union
import forecastio
import pytz
import requests
from gpxpy.gpx import GPXTrackPoint
from fittrackee import appLog
API_KEY = os.getenv('WEATHER_API_KEY')
VC_API_KEY = os.getenv('VC_WEATHER_API_KEY')
def get_weather(point: GPXTrackPoint) -> Optional[Dict]:
try:
if not point.time:
# if there's no time associated with the point;
# we cannot get weather
return None
else:
point_time = (
pytz.utc.localize(point.time)
if point.time.tzinfo is None
else point.time.astimezone(pytz.utc)
)
if API_KEY:
# if darksky api key is present, use that
forecast = forecastio.load_forecast(
API_KEY,
point.latitude,
point.longitude,
time=point_time,
units='si',
)
weather = forecast.currently()
return {
'summary': weather.summary,
'icon': weather.icon,
'temperature': weather.temperature,
'humidity': weather.humidity,
'wind': weather.windSpeed,
'windBearing': weather.windBearing,
}
elif VC_API_KEY:
# if visualcrossing API key is present, use that
return get_visual_crossing_weather(
VC_API_KEY, point.latitude, point.longitude, point.time
)
else:
return None
except Exception as e:
appLog.error(e)
return None
def get_visual_crossing_weather(
api_key: str, latitude: float, longitude: float, time: datetime
) -> Dict[str, Union[str, float]]:
# All requests to the Timeline Weather API use the following the form:
# https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services
# /timeline/[location]/[date1]/[date2]?key=YOUR_API_KEY
# location (required) is the address, partial address or
# latitude,longitude location for
# which to retrieve weather data. You can also use US ZIP Codes.
# date1 (optional) is the start date for which to retrieve weather data.
# You can also request the information for a specific time for a single
# date by including time into the date1 field using the format
# yyyy-MM-ddTHH:mm:ss. For example 2020-10-19T13:00:00.
# The results are returned in the currentConditions field and are
# truncated to the hour requested (i.e. 2020-10-19T13:59:00 will return
# data at 2020-10-19T13:00:00).
# first, round datetime to nearest hour by truncating, and then adding an
# hour if the "real" time's number of minutes is 30 or more (we do this
# since the API only truncates)
trunc_time = time.replace(
second=0, microsecond=0, minute=0, hour=time.hour
) + timedelta(hours=time.minute // 30)
appLog.debug(
f'VC_weather: truncated time {time} ({time.timestamp()}) to '
f'{trunc_time} ({trunc_time.timestamp()})'
)
base_url = (
'https://weather.visualcrossing.com/'
+ 'VisualCrossingWebServices/rest/services'
)
url = (
f"{base_url}/timeline/{latitude},{longitude}"
f"/{int(trunc_time.timestamp())}?key={api_key}"
)
params = {
"unitGroup": "metric",
"contentType": "json",
"elements": (
"datetime,datetimeEpoch,temp,humidity,windspeed,"
"winddir,conditions,description,icon"
),
"include": "current",
}
appLog.debug(
f'VC_weather: getting weather from {url}'.replace(api_key, '*****')
)
r = requests.get(url, params=params)
r.raise_for_status()
res = r.json()
weather = res['currentConditions']
# FitTrackee expects the following units:
# temp: Celsius,
# humidity: in fraction (rather than percent)
# windSpeed: m/s
# windBearing: direction wind is from in degrees (0 is north)
# VC provides humidity in percent, wind in km/h
data = {
'summary': weather['conditions'],
'icon': f"vc-{weather['icon']}",
'temperature': weather['temp'],
'humidity': weather['humidity'] / 100,
'wind': weather['windspeed'] * 1000 / (60 * 60), # km/h to m/s
'windBearing': weather['winddir'],
}
return data

View File

@ -0,0 +1,5 @@
from .weather_service import WeatherService
__all__ = [
'WeatherService',
]

View File

@ -0,0 +1,50 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Optional
from gpxpy.gpx import GPXTrackPoint
class BaseWeather(ABC):
def __init__(self, api_key: str) -> None:
self.api_key: str = api_key
@abstractmethod
def _get_data(
self, latitude: float, longitude: float, time: datetime
) -> Optional[Dict]:
# Expected dict:
# {
# "humidity": 0.69,
# "icon": "partly-cloudy-day",
# "temperature": 12.26,
# "wind": 3.49,
# "windBearing": 315
# }
#
# FitTrackee expects the following units:
# temperature: Celsius,
# humidity: in fraction (rather than percent)
# windSpeed: m/s
# windBearing: direction wind is from in degrees (0 is north)
#
# Expected icon values (for UI):
# - "clear-day",
# - "clear-night",
# - "cloudy",
# - "fog",
# - "partly-cloudy-day",
# - "partly-cloudy-night",
# - "rain",
# - "sleet",
# - "snow",
# - "wind"
pass
def get_weather(self, point: GPXTrackPoint) -> Optional[Dict]:
if not point.time:
# if there's no time associated with the point,
# we cannot get weather
return None
return self._get_data(point.latitude, point.longitude, point.time)

View File

@ -0,0 +1,37 @@
from datetime import datetime
from typing import Dict, Optional
import forecastio
import pytz
from .base_weather import BaseWeather
class DarkSky(BaseWeather):
# Deprecated (API will end on March 31st, 2023)
def _get_data(
self, latitude: float, longitude: float, time: datetime
) -> Optional[Dict]:
# get point time in UTC
point_time = (
pytz.utc.localize(time)
if time.tzinfo is None # naive datetime
else time
)
forecast = forecastio.load_forecast(
self.api_key,
latitude,
longitude,
time=point_time,
units='si',
)
weather = forecast.currently()
return {
'humidity': weather.humidity,
'icon': weather.icon,
'temperature': weather.temperature,
'wind': weather.windSpeed,
'windBearing': weather.windBearing,
}

View File

@ -0,0 +1,84 @@
from datetime import datetime, timedelta
from typing import Dict, Optional
import requests
from fittrackee import appLog
from .base_weather import BaseWeather
class VisualCrossing(BaseWeather):
def __init__(self, api_key: str):
super().__init__(api_key)
self.base_url = (
'https://weather.visualcrossing.com/'
'VisualCrossingWebServices/rest/services'
)
self.params = {
"key": self.api_key,
"iconSet": "icons1", # default value, same as Darksky
"unitGroup": "metric",
"contentType": "json",
"elements": (
"datetime,datetimeEpoch,temp,humidity,windspeed,"
"winddir,conditions,description,icon"
),
"include": "current", # to get only specific time data
}
@staticmethod
def _get_timestamp(time: datetime) -> int:
# The results are returned in the currentConditions field and are
# truncated to the hour requested (i.e. 2020-10-19T13:59:00 will return
# data at 2020-10-19T13:00:00).
# first, round datetime to nearest hour by truncating, and then adding
# an hour if the "real" time's number of minutes is 30 or more (we do
# this since the API only truncates)
trunc_time = time.replace(
second=0, microsecond=0, minute=0, hour=time.hour
) + timedelta(hours=time.minute // 30)
appLog.debug(
f'VC_weather: truncated time {time} ({time.timestamp()})'
f' to {trunc_time} ({trunc_time.timestamp()})'
)
return int(trunc_time.timestamp())
def _get_data(
self, latitude: float, longitude: float, time: datetime
) -> Optional[Dict]:
# All requests to the Timeline Weather API use the following the form:
# https://weather.visualcrossing.com/VisualCrossingWebServices/rest
# /services/timeline/[location]/[date1]/[date2]?key=YOUR_API_KEY
# location (required) is the address, partial address or
# latitude,longitude location for
# which to retrieve weather data. You can also use US ZIP Codes.
# date1 (optional) is the start date for which to retrieve weather
# data. All dates and times are in local time of the **location**
# specified.
url = (
f"{self.base_url}/timeline/{latitude},{longitude}"
f"/{self._get_timestamp(time)}"
)
appLog.debug(
f'VC_weather: getting weather from {url}'.replace(
self.api_key, '*****'
)
)
r = requests.get(url, params=self.params)
r.raise_for_status()
res = r.json()
weather = res['currentConditions']
data = {
'icon': weather['icon'],
'temperature': weather['temp'],
'humidity': weather['humidity'] / 100,
'wind': weather['windspeed'] * 1000 / (60 * 60), # km/h to m/s
'windBearing': weather['winddir'],
}
return data

View File

@ -0,0 +1,44 @@
import os
from typing import Dict, Optional, Union
from gpxpy.gpx import GPXTrackPoint
from fittrackee import appLog
from .dark_sky import DarkSky
from .visual_crossing import VisualCrossing
class WeatherService:
"""
Available API:
- DarkSky (deprecated, will end on March 31st, 2023)
- VisualCrossing
"""
def __init__(self) -> None:
self.weather_api = self._get_weather_api()
@staticmethod
def _get_weather_api() -> Union[DarkSky, VisualCrossing, None]:
weather_api_key: str = os.getenv('WEATHER_API_KEY', '')
weather_api_provider: str = os.getenv(
'WEATHER_API_PROVIDER', ''
).lower()
if not weather_api_key:
return None
if weather_api_provider == 'darksky': # deprecated
return DarkSky(weather_api_key)
if weather_api_provider == 'visualcrossing':
return VisualCrossing(weather_api_key)
return None
def get_weather(self, point: GPXTrackPoint) -> Optional[Dict]:
if not self.weather_api:
return None
try:
return self.weather_api.get_weather(point)
except Exception as e:
appLog.error(e)
return None