Reading from a subresource
In this section, we'll implement a stream for the survey responses stream. This stream structure is a little different because it depends on the surveys stream.
Start by creating a new base class for substreams:
class SurveyMonkeySubstream(HttpStream, ABC):
    def __init__(self, name: str, path: str, primary_key: Union[str, List[str]], parent_stream: Stream, **kwargs: Any) -> None:
        self._name = name
        self._path = path
        self._primary_key = primary_key
        self._parent_stream = parent_stream
        super().__init__(**kwargs)
    url_base = "https://api.surveymonkey.com"
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        links = response.json().get("links", {})
        if "next" in links:
            return {"next_url": links["next"]}
        else:
            return {}
    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        if next_page_token:
            return urlparse(next_page_token["next_url"]).query
        else:
            return {"per_page": _PAGE_SIZE}
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json().get("data", [])
    @property
    def name(self) -> str:
        return self._name
    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        try:
            return self._path.format(stream_slice=stream_slice)
        except Exception as e:
            raise e
    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        return self._primary_key
    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        for _slice in self._parent_stream.stream_slices():
            for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice):
                yield parent_record
This class is similar to the base class, but it does not support incremental reads, and its stream slices are generated by reading records from a parent stream. This is how we'll ensure we always read all survey responses.
Note that using this approach, the connector will checkpoint after reading responses for each survey.
Don't forget to update the streams method to also instantiate the surveys responses stream:
    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        auth = TokenAuthenticator(token=config["access_token"])
        surveys = SurveyMonkeyBaseStream(name="surveys", path="/v3/surveys", primary_key="id", data_field="data", cursor_field="date_modified", authenticator=auth)
        survey_responses = SurveyMonkeySubstream(name="survey_responses", path="/v3/surveys/{stream_slice[id]}/responses/", primary_key="id", authenticator=auth, parent_stream=surveys)
        return [
            surveys,
            survey_responses
            ]
Before moving on, we'll enable request caching on the surveys stream to avoid fetching the records
both for the surveys stream and for the survey responses stream. You can do this by setting the
use_cache property to true on the SurveyMonkeyBaseStream class.
    @property
    def use_cache(self) -> bool:
        return True
Now add the stream to the configured catalog:
{
  "streams": [
    {
      "stream": {
        "name": "surveys",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh", "incremental"]
      },
      "sync_mode": "incremental",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "survey_responses",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh"]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}
and create a new schema file in source_survey_monkey_demo/schemas/survey_responses.json. You can
use the connector builder to generate the schema, or paste the one provided below:
{
  "$schema": "http://json-schema.org/schema#",
  "properties": {
    "analyze_url": {
      "type": ["string", "null"]
    },
    "collect_stats": {
      "properties": {
        "status": {
          "properties": {
            "open": {
              "type": ["number", "null"]
            }
          },
          "type": ["object", "null"]
        },
        "total_count": {
          "type": ["number", "null"]
        },
        "type": {
          "properties": {
            "weblink": {
              "type": ["number", "null"]
            }
          },
          "type": ["object", "null"]
        }
      },
      "type": ["object", "null"]
    },
    "date_created": {
      "type": ["string", "null"]
    },
    "date_modified": {
      "type": ["string", "null"]
    },
    "href": {
      "type": ["string", "null"]
    },
    "id": {
      "type": ["string", "null"]
    },
    "language": {
      "type": ["string", "null"]
    },
    "nickname": {
      "type": ["string", "null"]
    },
    "preview": {
      "type": ["string", "null"]
    },
    "question_count": {
      "type": ["number", "null"]
    },
    "response_count": {
      "type": ["number", "null"]
    },
    "title": {
      "type": ["string", "null"]
    }
  },
  "type": "object"
}
You should now be able to read your survey responses:
poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json
In the next section we'll update the connector so it reads stream slices concurrently.