CamHack25/engine.py
2025-11-02 10:45:01 +00:00

210 lines
5.9 KiB
Python
Executable file

#!/usr/bin/env python
import json
import os
import overpy
import numpy as np
import pandas as pd
import scipy
from pathlib import Path
# brandname : overpass query filters
BRANDS: dict[str, str] = {
"greggs": "[\"brand:wikidata\"=\"Q3403981\"]",
"tesco": "[\"brand:wikidata\"~\"^(Q487494|Q98456772|Q25172225|Q65954217)$\"]", # Includes Tesco Express, Tesco Extra, and One Stop
}
CACHE_FOLDER = Path(".cache")
LOCS_COUNT = 3
DISTS_COUNT = 10
FORMAT_FACTOR = 1e6 # μm
FIRST_SEP = ':'
OTHER_SEP = ','
LOC_SEP = ';'
EncodedLocation = list[tuple[float, list[float]]]
def fetch_data(brand: str, cache: bool = True) -> list[tuple[float, float]]:
"""Fetch a list of locations from OSM."""
cache_loc = (CACHE_FOLDER / f"{brand}.json")
# Try load from cache
if cache and cache_loc.exists():
with open(cache_loc, "r") as f:
data = json.load(f)
return data
api = overpy.Overpass()
filters = BRANDS[brand]
query = api.query(f"nwr{filters}; out center;")
result = []
for way in query.ways:
result.append((float(way.center_lat), float(way.center_lon)))
for node in query.nodes:
result.append((float(node.lat), float(node.lon)))
for (lat, lon) in result:
if (lat is None) or (lon is None):
raise ValueError("Item missing coords!")
# Save to cache
if cache:
if not CACHE_FOLDER.exists():
os.makedirs(CACHE_FOLDER)
with open(cache_loc, "w") as f:
json.dump(result, f)
print(f"Got {len(result)} {brand}s")
return result
def spherical_dist(pos1, pos2, r=6378137):
"""Calculate sperical distances between two arrays of coordinates.
Return value is the same unit as `r`.
`r` defaults to the radius of the earth, in meters.
"""
pos1 = pos1 * np.pi / 180
pos2 = pos2 * np.pi / 180
cos_lat1 = np.cos(pos1[..., 0])
cos_lat2 = np.cos(pos2[..., 0])
cos_lat_d = np.cos(pos1[..., 0] - pos2[..., 0])
cos_lon_d = np.cos(pos1[..., 1] - pos2[..., 1])
return r * np.arccos(cos_lat_d - cos_lat1 * cos_lat2 * (1 - cos_lon_d))
# (lat, lon), dist
StationT = tuple[tuple[float, float], float]
def trilat_error(stations: list[StationT], position: tuple[float, float]) -> float:
"""Calculate the error in a trilaterated position."""
sq_errors = [(sdist - spherical_dist(np.array(position), np.array(spos))) ** 2 for spos, sdist in stations]
return sum(sq_errors) / len(sq_errors)
def trilaterate(stations: list[StationT]) -> tuple[float, float]:
"""Trilaterate a position, given a list of stations.
Each station is of the format ((lat, lon), distance).
"""
return scipy.optimize.fmin(lambda pos: trilat_error(stations, pos), (0., 0.))
def encode(location: tuple[float, float]) -> EncodedLocation:
"""Encode a location."""
greggs = np.array(fetch_data("greggs"))
repeat_rows = np.tile(greggs, (len(greggs), 1, 1))
repeat_cols = np.transpose(repeat_rows, (1, 0, 2))
dist_matrix = spherical_dist(repeat_rows, repeat_cols)
repeated = np.tile(location, (len(greggs), 1))
distances = spherical_dist(repeated, greggs)
distances = pd.Series(distances)
distances = distances.sort_values()
closest = distances.head(LOCS_COUNT)
result: EncodedLocation = []
for v, i in zip(closest.values, closest.index):
greggs_distances = np.sort(dist_matrix[i])[1:DISTS_COUNT+1]
result.append((v, list(map(float, greggs_distances))))
# Stub
return result
def decode(location: EncodedLocation) -> tuple[float, float]:
"""Decode into a location."""
# form the distances matrix
greggs_raw = fetch_data("greggs")
greggs = np.array(greggs_raw)
repeat_rows = np.tile(greggs, (len(greggs), 1, 1))
repeat_cols = np.transpose(repeat_rows, (1, 0, 2))
dist_matrix = spherical_dist(repeat_rows, repeat_cols)
# split the distances matrix into a list of series, which allows us to sort each row
dist_series_list = []
for i in dist_matrix:
dist_series_list.append(pd.Series(i).sort_values().head(len(location[0][1])+1)[1:])
# part 1: find the ID of each gregg's
closest_greggs = []
for loc in location:
dists = loc[1]
errors = [sum((j - dists) ** 2) for j in dist_series_list]
minerr = min(errors)
if minerr > 1:
print(f"warning: high error value of {minerr}")
closest_greggs.append(errors.index(min(errors)))
# part 2: trilaterate
stations: list[StationT] = [(greggs_raw[g], location[i][0]) for i, g in enumerate(closest_greggs)]
return trilaterate(stations)
def format_dist(dist: float) -> str:
return f"{int(round(dist * FORMAT_FACTOR))}"
def parse_dist(dist: str) -> float:
return float(dist) / FORMAT_FACTOR
def format_location(location: EncodedLocation) -> str:
"""Format an encoded location as a string."""
return LOC_SEP.join([f"{format_dist(a)}{FIRST_SEP}{OTHER_SEP.join(map(format_dist, b))}" for (a, b) in location])
def parse_location(location: str) -> EncodedLocation:
"""Parse a location string into an EncodedLocation."""
raw_locations = location.strip().split(LOC_SEP)
first_split: list[tuple[str, str]] = [tuple(l.strip().split(FIRST_SEP)) for l in raw_locations]
second_split: list[tuple[str, list[str]]] = [(d, l.strip().split(OTHER_SEP)) for d, l in first_split]
return [(parse_dist(d1), [parse_dist(d3) for d3 in d2]) for d1, d2 in second_split]
def main():
"""Testing."""
coords = (52.210796, 0.091659)
print("Original:", coords, end="\n\n")
encoded = encode(coords)
print("Encoded:", encoded, end="\n\n")
formatted = format_location(encoded)
print("Formatted:", formatted, end="\n\n")
parsed = parse_location(formatted)
print("Parsed:", parsed, end="\n\n")
decoded = decode(encoded)
print("Decoded:", decoded, end="\n\n")
error = spherical_dist(np.array(coords), np.array(decoded))
print(f"Error: {error:.3f}m")
if __name__ == "__main__":
main()