Read implementation
import asyncio
import functools
from typing import List, Optional

import requests

from forestadmin.agent_toolkit.utils.context import User
from forestadmin.datasource_toolkit.collections import Collection
from forestadmin.datasource_toolkit.interfaces.query.aggregation import AggregateResult, Aggregation
from forestadmin.datasource_toolkit.interfaces.query.filter.paginated import PaginatedFilter
from forestadmin.datasource_toolkit.interfaces.query.filter.unpaginated import Filter
from forestadmin.datasource_toolkit.interfaces.query.projections import Projection
from forestadmin.datasource_toolkit.interfaces.records import RecordsDataAlias
class MyCollection(Collection):
    """
    Collection that downloads every record and emulates queries in-process.

    This collection will have terrible performance, but is perfect to test that the
    structure declaration is well done.
    """

    # [... Declare structure and capabilities]

    # Endpoint that returns every record of the collection under the "items" key.
    _API_URL = 'https://my-api/my-collection'
    # Seconds to wait for the API; without a timeout `requests` may block forever.
    _REQUEST_TIMEOUT = 30

    async def list(
        self,
        caller: User,
        filter_: PaginatedFilter,
        projection: Projection
    ) -> List[RecordsDataAlias]:
        """
        Return the records selected by ``filter_``, reduced to ``projection``.

        Every record is downloaded from the API on each call; the condition tree,
        sort and page carried by ``filter_`` are then applied in-process.

        Args:
            caller: user issuing the request; its timezone is forwarded to the
                condition tree.
            filter_: condition tree, sort and page to apply. Segments and search
                are not supported.
            projection: applied to the resulting records before returning them.

        Raises:
            Exception: if ``filter_`` carries a segment or a search.
            requests.HTTPError: if the API answers with an error status.
            requests.Timeout: if the API does not answer in time.
        """
        # Reject unsupported features before paying for the full download.
        if filter_.segment:
            raise Exception('This collection does not implements native segments')
        if filter_.search:
            raise Exception('This collection is not natively searchable')

        # Fetch all records on all requests (this is _very_ inefficient).
        # `requests` is blocking, so it runs in the default executor to keep the
        # event loop free for other coroutines while the download is in flight.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(requests.get, self._API_URL, timeout=self._REQUEST_TIMEOUT),
        )
        # Fail loudly on HTTP errors instead of a KeyError on the error payload.
        response.raise_for_status()
        result = response.json()["items"]

        # Use "in-process emulation" for everything else.
        if filter_.condition_tree:
            result = filter_.condition_tree.apply(result, self, str(caller.timezone))
        if filter_.sort:
            result = filter_.sort.apply(result)
        if filter_.page:
            result = filter_.page.apply(result)

        return projection.apply(result)

    # NOTE: the annotation below must use `typing.List`: at this point in the class
    # body the name `list` refers to the method defined above, not the builtin.
    async def aggregate(
        self,
        caller: User,
        filter_: Filter,
        aggregation: Aggregation,
        limit: Optional[int]
    ) -> List[AggregateResult]:
        """
        Aggregate the records selected by ``filter_``.

        The records are obtained through :meth:`list`, restricted to the fields
        in ``aggregation.projection``, and then aggregated in-process.

        Args:
            caller: user issuing the request; its timezone is forwarded to the
                aggregation.
            filter_: unpaginated filter selecting the records to aggregate.
            aggregation: aggregation to compute.
            limit: forwarded as-is to ``aggregation.apply``.
        """
        # Fetch all records which should be aggregated
        records = await self.list(
            caller, PaginatedFilter.from_base_filter(filter_), aggregation.projection
        )

        # Use "in-process emulation" to aggregate the results
        return aggregation.apply(records, str(caller.timezone), limit)
Tips
Count queries
Last updated