Read implementation

This is the official documentation of the forestadmin-agent-django and forestadmin-agent-flask Python agents.

Developing your query translation layer is much easier when you can preview your work and have intermediary deliverables.

Emulation comes to the rescue: all features that need to be implemented when making a translating data source can be emulated inside your Node.js, at the cost of performance.

This enables you to be up and running in minutes and then optimize your code as you go.

from typings import Optional

from forestadmin.agent_toolkit.utils.context import User
from forestadmin.datasource_toolkit.collections import Collection
from forestadmin.datasource_toolkit.interfaces.query.aggregation import AggregateResult, Aggregation
from forestadmin.datasource_toolkit.interfaces.query.filter.paginated import PaginatedFilter
from forestadmin.datasource_toolkit.interfaces.query.filter.unpaginated import Filter
from forestadmin.datasource_toolkit.interfaces.query.projections import Projection
from forestadmin.datasource_toolkit.interfaces.records import RecordsDataAlias

import requests

class MyCollection(Collection):
"""
    This collection will have terrible performance, but is perfect to test that the
    structure declaration is well done.
"""
    # [... Declare structure and capabilities]
    async def list(
        self,
        caller: User,
        filter_: PaginatedFilter,
        projection: Projection
    ) -> List[RecordsDataAlias]:
        # Fetch all records on all requests (this is _very_ inefficient)
        response = requests.get('https://my-api/my-collection')
        result = response.json()["items"]

        # Use "in-process emulation" for everything else.
        if filter_.condition_tree:
            result = filter_.condition_tree.apply(result, self, str(caller.timezone))
        if filter_.sort:
            result = filter_.sort.apply(result)
        if filter_.page:
            result = filter_.page.apply(result)
        if filter_.segment:
            raise Exception('This collection does not implements native segments')
        if filter_.search:
            raise Exception('This collection is not natively searchable')

        return projection.apply(result)

    async def aggregate(
        self,
        caller: User,
        filter_: Filter,
        aggregation: Aggregation,
        limit: Optional[int]
    ) -> List[AggregateResult]:
      # Fetch all records which should be aggregated
      records = await self.list(
        caller, PaginatedFilter.from_base_filter(filter_), aggregation.projection
      )

      # Use "in-process emulation" to aggregate the results
      return aggregation.apply(records, str(caller.timezone), limit)

Tips

Count queries

The aggregate method is used by Forest Admin both to count records and to extract the data which is needed to generate charts.

If the API/Database you are targeting has an efficient API that is made for counting records, you may want to handle this case first.

from forestadmin.agent_toolkit.utils.context import User
from forestadmin.datasource_toolkit.collections import Collection
from forestadmin.datasource_toolkit.interfaces.query.aggregation import AggregateResult, Aggregation
from forestadmin.datasource_toolkit.interfaces.query.filter.unpaginated import Filter

import requests

class MyCollection(Collection):
    # [... Declare structure and capabilities]

    async def aggregate(
        self,
        caller: User,
        filter_: Filter,
        aggregation: Aggregation,
        limit: Optional[int]
    ) -> List[AggregateResult]:
        if aggregation.operation.value == "Count" and len(aggregation.groups) == 0:
            return [{"value": await self.count(caller, filter_)}]

    async def count(self, caller: User, filter_: Filter):
        response = requests.get('https://my-api/my-collection/count', self._translate_filter(caller, filter_))
        records = response.json()["items"]

    def _translate_filter(self, caller: User, filter_: Filter):
        # [... translate filter]

Last updated