257 lines
7.2 KiB
Python
Executable file
257 lines
7.2 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
import json
|
|
import os
|
|
import overpy
|
|
import numpy as np
|
|
import pandas as pd
|
|
import scipy
|
|
import random
|
|
|
|
from pathlib import Path
|
|
|
|
# brandname : overpass query filters
|
|
BRANDS: dict[str, str] = {
|
|
"greggs": "[\"brand:wikidata\"=\"Q3403981\"]",
|
|
"tesco": "[\"brand:wikidata\"~\"^(Q487494|Q98456772|Q25172225|Q65954217)$\"]", # Includes Tesco Express, Tesco Extra, and One Stop
|
|
}
|
|
|
|
CACHE_FOLDER = Path(".cache")
|
|
|
|
LOCS_COUNT = 5
|
|
DISTS_COUNT = 2
|
|
|
|
FORMAT_FACTOR = 1e6 # μm
|
|
FIRST_SEP = ':'
|
|
OTHER_SEP = ','
|
|
LOC_SEP = ';'
|
|
|
|
cached_dists = {}
|
|
cached_series = {}
|
|
|
|
EncodedLocation = list[tuple[float, list[float]]]
|
|
|
|
|
|
def fetch_data(brand: str, cache: bool = True) -> list[tuple[float, float]]:
|
|
"""Fetch a list of locations from OSM."""
|
|
cache_loc = (CACHE_FOLDER / f"{brand}.json")
|
|
|
|
# Try load from cache
|
|
if cache and cache_loc.exists():
|
|
with open(cache_loc, "r") as f:
|
|
data = json.load(f)
|
|
|
|
return data
|
|
|
|
api = overpy.Overpass()
|
|
|
|
filters = BRANDS[brand]
|
|
query = api.query(f"area[\"name\"=\"Great Britain\"]->.searchArea;nwr{filters}(area.searchArea); out center;")
|
|
|
|
result = []
|
|
|
|
for way in query.ways:
|
|
result.append((float(way.center_lat), float(way.center_lon)))
|
|
|
|
for node in query.nodes:
|
|
result.append((float(node.lat), float(node.lon)))
|
|
|
|
for (lat, lon) in result:
|
|
if (lat is None) or (lon is None):
|
|
raise ValueError("Item missing coords!")
|
|
|
|
# Save to cache
|
|
if cache:
|
|
if not CACHE_FOLDER.exists():
|
|
os.makedirs(CACHE_FOLDER)
|
|
|
|
with open(cache_loc, "w") as f:
|
|
json.dump(result, f)
|
|
|
|
print(f"Got {len(result)} {brand}s")
|
|
|
|
return result
|
|
|
|
|
|
def spherical_dist(pos1, pos2, r=6378137):
|
|
"""Calculate sperical distances between two arrays of coordinates.
|
|
|
|
Return value is the same unit as `r`.
|
|
`r` defaults to the radius of the earth, in meters.
|
|
"""
|
|
pos1 = pos1 * np.pi / 180
|
|
pos2 = pos2 * np.pi / 180
|
|
|
|
cos_lat1 = np.cos(pos1[..., 0])
|
|
cos_lat2 = np.cos(pos2[..., 0])
|
|
cos_lat_d = np.cos(pos1[..., 0] - pos2[..., 0])
|
|
cos_lon_d = np.cos(pos1[..., 1] - pos2[..., 1])
|
|
return r * np.arccos(cos_lat_d - cos_lat1 * cos_lat2 * (1 - cos_lon_d))
|
|
|
|
|
|
def get_dist_matrix(brand: str):
|
|
try:
|
|
return cached_dists[brand]
|
|
except KeyError:
|
|
greggs = np.array(fetch_data(brand))
|
|
|
|
repeat_rows = np.tile(greggs, (len(greggs), 1, 1))
|
|
repeat_cols = np.transpose(repeat_rows, (1, 0, 2))
|
|
dist_matrix = spherical_dist(repeat_rows, repeat_cols)
|
|
|
|
cached_dists[brand] = dist_matrix
|
|
|
|
return dist_matrix
|
|
|
|
|
|
def get_dist_series_list(brand: str, n_locs: int):
|
|
if brand in cached_series and cached_series[brand][0] == n_locs:
|
|
return cached_series[brand][1]
|
|
|
|
dist_matrix = get_dist_matrix("greggs")
|
|
|
|
# split the distances matrix into a list of series, which allows us to sort each row
|
|
dist_series_list = []
|
|
for i in dist_matrix:
|
|
dist_series_list.append(pd.Series(i).sort_values().head(n_locs+1)[1:]) #ignore the 0th element (dist to itself)
|
|
|
|
cached_series[brand] = (n_locs, dist_series_list)
|
|
|
|
return dist_series_list
|
|
|
|
|
|
# (lat, lon), dist
|
|
StationT = tuple[tuple[float, float], float]
|
|
|
|
|
|
def trilat_error(stations: list[StationT], position: tuple[float, float]) -> float:
|
|
"""Calculate the error in a trilaterated position."""
|
|
sq_errors = [(sdist - spherical_dist(np.array(position), np.array(spos))) ** 2 for spos, sdist in stations]
|
|
|
|
return sum(sq_errors) / len(sq_errors)
|
|
|
|
|
|
def trilaterate(stations: list[StationT], disp: bool = False) -> tuple[float, float]:
|
|
"""Trilaterate a position, given a list of stations.
|
|
|
|
Each station is of the format ((lat, lon), distance).
|
|
"""
|
|
|
|
best_err = None
|
|
best = (0., 0.)
|
|
|
|
for pos, _ in stations:
|
|
res = scipy.optimize.minimize(
|
|
lambda pos: trilat_error(stations, pos),
|
|
# stations[0][0],
|
|
pos,
|
|
method='Nelder-Mead',
|
|
)
|
|
|
|
if not res.success:
|
|
continue
|
|
|
|
if best_err is None or res.fun < best_err:
|
|
best = res.x
|
|
best_err = res.fun
|
|
|
|
return best
|
|
|
|
|
|
def encode_greggs(loc: int) -> list[float]:
|
|
"""Given the id of a greggs, encode as a list of distances."""
|
|
|
|
dist_matrix = get_dist_matrix("greggs")
|
|
greggs_distances = np.sort(dist_matrix[loc])[1:DISTS_COUNT+1]
|
|
|
|
return list(map(float, greggs_distances))
|
|
|
|
|
|
def decode_greggs(distances: list[float]) -> int:
|
|
"""Get the id of a greggs given a list of distances."""
|
|
|
|
dist_series_list = get_dist_series_list("greggs", len(distances))
|
|
|
|
errors = [sum((pd.Series(j) - distances) ** 2) for j in dist_series_list]
|
|
|
|
minerr = min(errors)
|
|
if minerr > 1:
|
|
print(f"warning: high error value of {minerr}")
|
|
|
|
return errors.index(min(errors))
|
|
|
|
|
|
def encode(location: tuple[float, float]) -> EncodedLocation:
|
|
"""Encode a location."""
|
|
|
|
greggs = np.array(fetch_data("greggs"))
|
|
|
|
repeated = np.tile(location, (len(greggs), 1))
|
|
distances = pd.Series(spherical_dist(repeated, greggs)).sort_values()
|
|
closest = distances.head(LOCS_COUNT)
|
|
|
|
return [(v, encode_greggs(i)) for v, i in zip(closest.values, closest.index)]
|
|
|
|
# distances = spherical_dist(repeated, greggs)
|
|
# distances_i = [(i, dist) for i, dist in enumerate(distances)]
|
|
# random.shuffle(distances_i)
|
|
# closest = distances_i[:LOCS_COUNT]
|
|
|
|
# return [(v, encode_greggs(i)) for i, v in closest]
|
|
|
|
|
|
def decode(location: EncodedLocation, disp: bool = False) -> tuple[float, float]:
|
|
"""Decode into a location."""
|
|
# part 1: find the ID of each gregg's
|
|
closest_greggs = [decode_greggs(dists) for _, dists in location]
|
|
|
|
# part 2: trilaterate
|
|
greggs = fetch_data("greggs")
|
|
stations: list[StationT] = [(greggs[g], location[i][0]) for i, g in enumerate(closest_greggs)]
|
|
return trilaterate(stations, disp=disp)
|
|
|
|
|
|
def format_dist(dist: float) -> str:
|
|
return f"{int(round(dist * FORMAT_FACTOR))}"
|
|
|
|
|
|
def parse_dist(dist: str) -> float:
|
|
return float(dist) / FORMAT_FACTOR
|
|
|
|
|
|
def format_location(location: EncodedLocation) -> str:
|
|
"""Format an encoded location as a string."""
|
|
return LOC_SEP.join([f"{format_dist(a)}{FIRST_SEP}{OTHER_SEP.join(map(format_dist, b))}" for (a, b) in location])
|
|
|
|
|
|
def parse_location(location: str) -> EncodedLocation:
|
|
"""Parse a location string into an EncodedLocation."""
|
|
raw_locations = location.strip().split(LOC_SEP)
|
|
first_split: list[tuple[str, str]] = [tuple(l.strip().split(FIRST_SEP)) for l in raw_locations]
|
|
second_split: list[tuple[str, list[str]]] = [(d, l.strip().split(OTHER_SEP)) for d, l in first_split]
|
|
return [(parse_dist(d1), [parse_dist(d3) for d3 in d2]) for d1, d2 in second_split]
|
|
|
|
|
|
def main():
|
|
"""Testing."""
|
|
coords = (52.210796, 0.091659)
|
|
print("Original:", coords, end="\n\n")
|
|
|
|
encoded = encode(coords)
|
|
print("Encoded:", encoded, end="\n\n")
|
|
|
|
formatted = format_location(encoded)
|
|
print("Formatted:", formatted, end="\n\n")
|
|
|
|
parsed = parse_location(formatted)
|
|
print("Parsed:", parsed, end="\n\n")
|
|
|
|
decoded = decode(encoded, disp=True)
|
|
print("Decoded:", decoded, end="\n\n")
|
|
|
|
error = spherical_dist(np.array(coords), np.array(decoded))
|
|
print(f"Error: {error:.3f}m")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|