#!/usr/bin/env python import json import os import overpy import numpy as np import pandas as pd import scipy import random from pathlib import Path # brandname : overpass query filters BRANDS: dict[str, str] = { "greggs": "[\"brand:wikidata\"=\"Q3403981\"]", "tesco": "[\"brand:wikidata\"~\"^(Q487494|Q98456772|Q25172225|Q65954217)$\"]", # Includes Tesco Express, Tesco Extra, and One Stop } CACHE_FOLDER = Path(".cache") LOCS_COUNT = 5 DISTS_COUNT = 2 FORMAT_FACTOR = 1e6 # μm FIRST_SEP = ':' OTHER_SEP = ',' LOC_SEP = '; ' cached_dists = {} cached_series = {} EncodedLocation = list[tuple[float, list[float]]] def fetch_data(brand: str, cache: bool = True) -> list[tuple[float, float]]: """Fetch a list of locations from OSM.""" cache_loc = (CACHE_FOLDER / f"{brand}.json") # Try load from cache if cache and cache_loc.exists(): with open(cache_loc, "r") as f: data = json.load(f) return data api = overpy.Overpass() filters = BRANDS[brand] query = api.query(f"area[\"name\"=\"Great Britain\"]->.searchArea;nwr{filters}(area.searchArea); out center;") result = [] for way in query.ways: result.append((float(way.center_lat), float(way.center_lon))) for node in query.nodes: result.append((float(node.lat), float(node.lon))) for (lat, lon) in result: if (lat is None) or (lon is None): raise ValueError("Item missing coords!") # Save to cache if cache: if not CACHE_FOLDER.exists(): os.makedirs(CACHE_FOLDER) with open(cache_loc, "w") as f: json.dump(result, f) print(f"Got {len(result)} {brand}s") return result def spherical_dist(pos1, pos2, r=6378137): """Calculate sperical distances between two arrays of coordinates. Return value is the same unit as `r`. `r` defaults to the radius of the earth, in meters. """ pos1 = pos1 * np.pi / 180 pos2 = pos2 * np.pi / 180 cos_lat1 = np.cos(pos1[..., 0]) cos_lat2 = np.cos(pos2[..., 0]) cos_lat_d = np.cos(pos1[..., 0] - pos2[..., 0]) cos_lon_d = np.cos(pos1[..., 1] - pos2[..., 1]) return r * np.arccos(cos_lat_d - cos_lat1 * cos_lat2 * (1 - cos_lon_d)) def get_dist_matrix(brand: str): try: return cached_dists[brand] except KeyError: greggs = np.array(fetch_data(brand)) repeat_rows = np.tile(greggs, (len(greggs), 1, 1)) repeat_cols = np.transpose(repeat_rows, (1, 0, 2)) dist_matrix = spherical_dist(repeat_rows, repeat_cols) cached_dists[brand] = dist_matrix return dist_matrix def get_dist_series_list(brand: str, n_locs: int): if brand in cached_series and cached_series[brand][0] == n_locs: return cached_series[brand][1] dist_matrix = get_dist_matrix("greggs") # split the distances matrix into a list of series, which allows us to sort each row dist_series_list = [] for i in dist_matrix: dist_series_list.append(pd.Series(i).sort_values().head(n_locs+1)[1:]) #ignore the 0th element (dist to itself) cached_series[brand] = (n_locs, dist_series_list) return dist_series_list # (lat, lon), dist StationT = tuple[tuple[float, float], float] def trilat_error(stations: list[StationT], position: tuple[float, float]) -> float: """Calculate the error in a trilaterated position.""" sq_errors = [(sdist - spherical_dist(np.array(position), np.array(spos))) ** 2 for spos, sdist in stations] return sum(sq_errors) / len(sq_errors) def trilaterate(stations: list[StationT], disp: bool = False) -> tuple[float, float]: """Trilaterate a position, given a list of stations. Each station is of the format ((lat, lon), distance). """ best_err = None best = (0., 0.) for pos, _ in stations: res = scipy.optimize.minimize( lambda pos: trilat_error(stations, pos), # stations[0][0], pos, method='Nelder-Mead', ) if not res.success: continue if best_err is None or res.fun < best_err: best = res.x best_err = res.fun return best def encode_greggs(loc: int) -> list[float]: """Given the id of a greggs, encode as a list of distances.""" dist_matrix = get_dist_matrix("greggs") greggs_distances = np.sort(dist_matrix[loc])[1:DISTS_COUNT+1] return list(map(float, greggs_distances)) def decode_greggs(distances: list[float]) -> int: """Get the id of a greggs given a list of distances.""" dist_series_list = get_dist_series_list("greggs", len(distances)) errors = [sum((pd.Series(j) - distances) ** 2) for j in dist_series_list] minerr = min(errors) if minerr > 1: print(f"warning: high error value of {minerr}") return errors.index(min(errors)) def encode(location: tuple[float, float]) -> EncodedLocation: """Encode a location.""" greggs = np.array(fetch_data("greggs")) repeated = np.tile(location, (len(greggs), 1)) distances = pd.Series(spherical_dist(repeated, greggs)).sort_values() closest = distances.head(LOCS_COUNT) return [(v, encode_greggs(i)) for v, i in zip(closest.values, closest.index)] # distances = spherical_dist(repeated, greggs) # distances_i = [(i, dist) for i, dist in enumerate(distances)] # random.shuffle(distances_i) # closest = distances_i[:LOCS_COUNT] # return [(v, encode_greggs(i)) for i, v in closest] def decode(location: EncodedLocation, disp: bool = False) -> tuple[float, float]: """Decode into a location.""" # part 1: find the ID of each gregg's closest_greggs = [decode_greggs(dists) for _, dists in location] # part 2: trilaterate greggs = fetch_data("greggs") stations: list[StationT] = [(greggs[g], location[i][0]) for i, g in enumerate(closest_greggs)] return trilaterate(stations, disp=disp) def format_dist(dist: float) -> str: return f"{int(round(dist * FORMAT_FACTOR))}" def parse_dist(dist: str) -> float: return float(dist) / FORMAT_FACTOR def format_location(location: EncodedLocation) -> str: """Format an encoded location as a string.""" return LOC_SEP.join([f"{format_dist(a)}{FIRST_SEP}{OTHER_SEP.join(map(format_dist, b))}" for (a, b) in location]) def parse_location(location: str) -> EncodedLocation: """Parse a location string into an EncodedLocation.""" raw_locations = location.strip().split(LOC_SEP) first_split: list[tuple[str, str]] = [tuple(l.strip().split(FIRST_SEP)) for l in raw_locations] second_split: list[tuple[str, list[str]]] = [(d, l.strip().split(OTHER_SEP)) for d, l in first_split] return [(parse_dist(d1), [parse_dist(d3) for d3 in d2]) for d1, d2 in second_split] def main(): """Testing.""" coords = (52.210796, 0.091659) print("Original:", coords, end="\n\n") encoded = encode(coords) print("Encoded:", encoded, end="\n\n") formatted = format_location(encoded) print("Formatted:", formatted, end="\n\n") parsed = parse_location(formatted) print("Parsed:", parsed, end="\n\n") decoded = decode(encoded, disp=True) print("Decoded:", decoded, end="\n\n") error = spherical_dist(np.array(coords), np.array(decoded)) print(f"Error: {error:.3f}m") if __name__ == "__main__": main()