# Copyright The Koukan Authors
# SPDX-License-Identifier: Apache-2.0
# derived from
# https://github.com/jsbucy/koukan/blob/main/koukan/config.py

from typing import Any, Callable, Dict, Optional, Tuple, Union
from abc import ABC, abstractmethod


class SyncFilter(ABC):
    """Abstract interface implemented by every filter in an endpoint chain."""

    @abstractmethod
    def on_update(self, tx : dict, tx_delta : dict) -> dict:
        """Handle an update; concrete filters must override this."""
        raise NotImplementedError()


class FilterSpec:
    """Holds the builder callable used to construct one kind of filter."""

    def __init__(self, builder : Callable[..., Optional[SyncFilter]]):
        # Config.get_endpoint() invokes this as builder(filter_yaml, next).
        self.builder = builder


class FilterImpl(SyncFilter):
    """Filter that keeps a reference to the next filter in the chain."""

    def __init__(self, next : Optional[SyncFilter]):
        # `next` is None when this filter is built last in the chain
        # definition (the chain is assembled back to front).
        self.next = next

    def on_update(self, tx, delta):
        # Stub: no processing is done here and None is returned.
        pass


class Exploder(SyncFilter):
    """Filter that takes no downstream filter."""

    def __init__(self):
        pass

    def on_update(self, tx, delta):
        # Stub: no processing is done here and None is returned.
        pass


class Config:
    """Builds filter chains from per-endpoint yaml.

    `endpoint_yaml` maps an endpoint name to a dict with a 'chain' entry: a
    list of dicts, each of which has a 'filter' key naming an entry in
    `self.filters`.
    """

    # Populated by the owner of the Config; None until then.
    endpoint_yaml : Optional[dict] = None

    def __init__(self):
        # Registry: filter name (the yaml 'filter' value) -> FilterSpec.
        self.filters : Dict[str, FilterSpec] = {
            'exploder': FilterSpec(self.exploder),
            'message_builder': FilterSpec(self.message_builder),
            'message_parser': FilterSpec(self.message_parser),
            'remote_host': FilterSpec(self.remote_host),
            'received_header': FilterSpec(self.received_header),
            'relay_auth': FilterSpec(self.relay_auth),
            'dns_resolution': FilterSpec(self.dns_resolution),
            'add_route': FilterSpec(self.add_route),
        }

    def exploder(self, yaml, next):
        """Build an Exploder; it must be the last filter of its chain.

        Raises:
            AssertionError: if a downstream filter was supplied.
        """
        assert next is None
        return Exploder()

    def add_route(self, yaml, next) -> Optional[SyncFilter]:
        """Build a filter wrapping the endpoint named by yaml['output_chain'].

        Returns None when that endpoint is not configured. `next` is not
        used: the wrapped filter is the head of the output chain.
        """
        output = self.get_endpoint(yaml['output_chain'])
        if output is None:
            return None
        output_filter, _ = output
        return FilterImpl(output_filter)

    def message_builder(self, yaml, next):
        """Build a FilterImpl chained to `next`."""
        return FilterImpl(next)

    def message_parser(self, yaml, next):
        """Build a FilterImpl chained to `next`."""
        return FilterImpl(next)

    def remote_host(self, yaml, next):
        """Build a FilterImpl chained to `next`."""
        return FilterImpl(next)

    def received_header(self, yaml, next):
        """Build a FilterImpl chained to `next`."""
        return FilterImpl(next)

    def relay_auth(self, yaml, next):
        """Build a FilterImpl chained to `next`."""
        return FilterImpl(next)

    def dns_resolution(self, yaml, next):
        """Build a FilterImpl chained to `next`."""
        return FilterImpl(next)

    def get_endpoint(self, host) -> Optional[Tuple[SyncFilter, dict]]:
        """Instantiate the filter chain configured for `host`.

        Args:
            host: key into `self.endpoint_yaml`.

        Returns:
            (head_filter, endpoint_yaml_for_host), or None when no endpoint
            yaml has been set or `host` has no entry in it.

        Raises:
            KeyError: if the endpoint yaml lacks 'chain', an entry lacks
                'filter', or the filter name is not registered.
            AssertionError: if a builder does not return a SyncFilter
                (e.g. add_route with an unknown output chain) or the chain
                is empty.
        """
        # endpoint_yaml defaults to None at class level; treat "nothing
        # configured" the same as "host not configured" instead of failing
        # with AttributeError on None.get().
        if self.endpoint_yaml is None:
            return None
        if (endpoint_yaml := self.endpoint_yaml.get(host, None)) is None:
            return None

        # Build back to front so each filter can be handed its successor.
        next : Optional[SyncFilter] = None
        for filter_yaml in reversed(endpoint_yaml['chain']):
            filter_name = filter_yaml['filter']
            spec = self.filters[filter_name]
            endpoint = spec.builder(filter_yaml, next)
            assert isinstance(endpoint, SyncFilter)
            next = endpoint
        assert next is not None
        return next, endpoint_yaml