#!/usr/bin/env python import json import os import overpy import numpy as np import pandas as pd import scipy from pathlib import Path # brandname : overpass query filters BRANDS: dict[str, str] = { "greggs": "[\"brand:wikidata\"=\"Q3403981\"]", "tesco": "[\"brand:wikidata\"~\"^(Q487494|Q98456772|Q25172225|Q65954217)$\"]", # Includes Tesco Express, Tesco Extra, and One Stop } CACHE_FOLDER = Path(".cache") LOCS_COUNT = 4 DISTS_COUNT = 10 FORMAT_FACTOR = 1e6 # μm FIRST_SEP = ':' OTHER_SEP = ',' LOC_SEP = ';' EncodedLocation = list[tuple[float, list[float]]] def fetch_data(brand: str, cache: bool = True) -> list[tuple[float, float]]: """Fetch a list of locations from OSM.""" cache_loc = (CACHE_FOLDER / f"{brand}.json") # Try load from cache if cache and cache_loc.exists(): with open(cache_loc, "r") as f: data = json.load(f) return data api = overpy.Overpass() filters = BRANDS[brand] query = api.query(f"nwr{filters}; out center;") result = [] for way in query.ways: result.append((float(way.center_lat), float(way.center_lon))) for node in query.nodes: result.append((float(node.lat), float(node.lon))) for (lat, lon) in result: if (lat is None) or (lon is None): raise ValueError("Item missing coords!") # Save to cache if cache: if not CACHE_FOLDER.exists(): os.makedirs(CACHE_FOLDER) with open(cache_loc, "w") as f: json.dump(result, f) print(f"Got {len(result)} {brand}s") return result def spherical_dist(pos1, pos2, r=6378137): """Calculate sperical distances between two arrays of coordinates. Return value is the same unit as `r`. `r` defaults to the radius of the earth, in meters. """ pos1 = pos1 * np.pi / 180 pos2 = pos2 * np.pi / 180 cos_lat1 = np.cos(pos1[..., 0]) cos_lat2 = np.cos(pos2[..., 0]) cos_lat_d = np.cos(pos1[..., 0] - pos2[..., 0]) cos_lon_d = np.cos(pos1[..., 1] - pos2[..., 1]) return r * np.arccos(cos_lat_d - cos_lat1 * cos_lat2 * (1 - cos_lon_d)) # (lat, lon), dist StationT = tuple[tuple[float, float], float] def trilat_error(stations: list[StationT], position: tuple[float, float]) -> float: """Calculate the error in a trilaterated position.""" sq_errors = [(sdist - spherical_dist(np.array(position), np.array(spos))) ** 2 for spos, sdist in stations] return sum(sq_errors) / len(sq_errors) def trilaterate(stations: list[StationT]) -> tuple[float, float]: """Trilaterate a position, given a list of stations. Each station is of the format ((lat, lon), distance). """ return scipy.optimize.fmin(lambda pos: trilat_error(stations, pos), (0., 0.)) def encode(location: tuple[float, float]) -> EncodedLocation: """Encode a location.""" greggs = np.array(fetch_data("greggs")) repeat_rows = np.tile(greggs, (len(greggs), 1, 1)) repeat_cols = np.transpose(repeat_rows, (1, 0, 2)) dist_matrix = spherical_dist(repeat_rows, repeat_cols) repeated = np.tile(location, (len(greggs), 1)) distances = spherical_dist(repeated, greggs) distances = pd.Series(distances) distances = distances.sort_values() closest = distances.head(LOCS_COUNT) result: EncodedLocation = [] for v, i in zip(closest.values, closest.index): greggs_distances = np.sort(dist_matrix[i])[1:DISTS_COUNT+1] result.append((v, list(map(float, greggs_distances)))) # Stub return result def decode(location: EncodedLocation) -> tuple[float, float]: """Decode into a location.""" # form the distances matrix greggs_raw = fetch_data("greggs") greggs = np.array(greggs_raw) repeat_rows = np.tile(greggs, (len(greggs), 1, 1)) repeat_cols = np.transpose(repeat_rows, (1, 0, 2)) dist_matrix = spherical_dist(repeat_rows, repeat_cols) # split the distances matrix into a list of series, which allows us to sort each row dist_series_list = [] for i in dist_matrix: dist_series_list.append(pd.Series(i).sort_values().head(len(location[0][1])+1)[1:]) # part 1: find the ID of each gregg's closest_greggs = [] for loc in location: dists = loc[1] errors = [sum((j - dists) ** 2) for j in dist_series_list] minerr = min(errors) if minerr > 1: print(f"warning: high error value of {minerr}") closest_greggs.append(errors.index(min(errors))) # part 2: trilaterate stations: list[StationT] = [(greggs_raw[g], location[i][0]) for i, g in enumerate(closest_greggs)] return trilaterate(stations) def format_dist(dist: float) -> str: return f"{int(round(dist * FORMAT_FACTOR))}" def parse_dist(dist: str) -> float: return float(dist) / FORMAT_FACTOR def format_location(location: EncodedLocation) -> str: """Format an encoded location as a string.""" return LOC_SEP.join([f"{format_dist(a)}{FIRST_SEP}{OTHER_SEP.join(map(format_dist, b))}" for (a, b) in location]) def parse_location(location: str) -> EncodedLocation: """Parse a location string into an EncodedLocation.""" raw_locations = location.strip().split(LOC_SEP) first_split: list[tuple[str, str]] = [tuple(l.strip().split(FIRST_SEP)) for l in raw_locations] second_split: list[tuple[str, list[str]]] = [(d, l.strip().split(OTHER_SEP)) for d, l in first_split] return [(parse_dist(d1), [parse_dist(d3) for d3 in d2]) for d1, d2 in second_split] def main(): """Testing.""" coords = (52.210796, 0.091659) print("Original:", coords, end="\n\n") encoded = encode(coords) print("Encoded:", encoded, end="\n\n") formatted = format_location(encoded) print("Formatted:", formatted, end="\n\n") parsed = parse_location(formatted) print("Parsed:", parsed, end="\n\n") decoded = decode(encoded) print("Decoded:", decoded, end="\n\n") error = spherical_dist(np.array(coords), np.array(decoded)) print(f"Error: {error:.3f}m") if __name__ == "__main__": main()