Skip to main content

integrations_management

Find full example here

auth.py

"""
Contains helper classes for authorization to the SYNQ server.
"""

import requests
import time
import grpc


class TokenAuth(grpc.AuthMetadataPlugin):
    """AuthMetadataPlugin which adds the access token to the outgoing context metadata."""

    def __init__(self, token_source):
        self._token_source = token_source

    def __call__(self, context, callback):
        try:
            token = self._token_source.get_token()
            callback([("authorization", f"Bearer {token}")], None)
        except Exception as e:
            callback(None, e)


class TokenSource:
    """Token source which maintains the access token and refreshes it when it is expired."""

    def __init__(self, client_id, client_secret, api_endpoint):
        self.api_endpoint = api_endpoint
        self.token_url = f"https://{self.api_endpoint}/oauth2/token"
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = self.obtain_token()

    def obtain_token(self):
        resp = requests.post(
            self.token_url,
            data={
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "grant_type": "client_credentials",
            },
        )
        resp.raise_for_status()
        self.token = resp.json()
        self.expires_at = time.time() + self.token["expires_in"]
        return self.token

    def get_token(self) -> str:
        if time.time() > self.expires_at:
            self.obtain_token()
        return self.token["access_token"]

bigquery.py

"""
Warehouse-specific IntegrationsService features, using BigQuery as the example:

    create -> read generated outputs -> refresh -> health (+ run history) -> delete

Unlike dbt Cloud, a warehouse integration is "refreshable" (Capabilities.
can_refresh == True) and the server derives read-only Outputs from the supplied
credentials (for BigQuery, the service-account email to grant dataset access to).

The service-account key can be invalid for this demo: create still succeeds and
the management calls work end to end. A bad key simply surfaces later as an
unhealthy status from GetIntegrationHealth.

Prerequisites:
- SYNQ_CLIENT_ID and SYNQ_CLIENT_SECRET (scope: Manage Integrations)
- Optionally BIGQUERY_* connection settings (defaults are illustrative).
"""

import os
import sys

import grpc
from dotenv import load_dotenv

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__))))

from auth import TokenSource, TokenAuth
from synq.integrations.v1 import (
    integrations_service_pb2 as svc,
    integrations_service_pb2_grpc as svc_grpc,
    integration_pb2,
    bigquery_conf_pb2,
)

load_dotenv()

CLIENT_ID = os.getenv("SYNQ_CLIENT_ID")
CLIENT_SECRET = os.getenv("SYNQ_CLIENT_SECRET")
API_ENDPOINT = os.getenv("API_ENDPOINT", "developer.synq.io")

if not CLIENT_ID or not CLIENT_SECRET:
    raise SystemExit("SYNQ_CLIENT_ID and SYNQ_CLIENT_SECRET must be set (scope: Manage Integrations)")

# BigQuery connection settings. Defaults are illustrative placeholders.
BQ_PROJECT_ID = os.getenv("BIGQUERY_PROJECT_ID", "example-project")
BQ_REGION = os.getenv("BIGQUERY_REGION", "EU")


def load_service_account_key():
    """Return the BigQuery service-account key JSON. BIGQUERY_SA_KEY may be a
    file path or inline JSON; an invalid key still lets management calls work."""
    v = os.getenv("BIGQUERY_SA_KEY")
    if not v:
        return '{"type":"service_account","project_id":"example-project"}'
    if os.path.isfile(v):
        with open(v) as f:
            return f.read()
    return v


def main():
    token_source = TokenSource(CLIENT_ID, CLIENT_SECRET, API_ENDPOINT)
    credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.metadata_call_credentials(TokenAuth(token_source)),
    )
    with grpc.secure_channel(
        f"{API_ENDPOINT}:443",
        credentials,
        options=(("grpc.default_authority", API_ENDPOINT),),
    ) as channel:
        grpc.channel_ready_future(channel).result(timeout=10)
        print(f"Connected to {API_ENDPOINT}\n")
        stub = svc_grpc.IntegrationsServiceStub(channel)

        # --- Step 1: Create ---
        print("=== Step 1: Create BigQuery integration ===")
        config = integration_pb2.IntegrationConfig(
            bigquery=bigquery_conf_pb2.BigQueryCloudConf(
                project_id=BQ_PROJECT_ID,
                region=BQ_REGION,
                service_account_key=load_service_account_key(),
                # datasets left empty: discover all visible datasets.
            )
        )
        created = stub.CreateIntegration(
            svc.CreateIntegrationRequest(title="Example BigQuery", config=config)
        ).integration
        integration_id = created.id
        print(f"Created id={integration_id} etag={created.etag}")
        print(f'Service account key in response (masked): "{created.config.bigquery.service_account_key}"\n')

        # --- Step 2: Generated outputs + capabilities ---
        # Outputs are read-only values the server derives on create. For BigQuery
        # that is the service-account email you grant dataset access to.
        print("=== Step 2: Outputs & capabilities ===")
        print(f'Service account to grant access: "{created.outputs.bigquery.service_account_email}"')
        caps = created.capabilities
        print(f"can_refresh={caps.can_refresh} can_disable={caps.can_disable} can_delete={caps.can_delete}\n")

        # --- Step 3: Refresh ---
        # Valid only when Capabilities.can_refresh is true (warehouse types).
        print("=== Step 3: Trigger refresh ===")
        if caps.can_refresh:
            stub.RefreshIntegration(svc.RefreshIntegrationRequest(integration_id=integration_id))
            print("Refresh enqueued\n")
        else:
            print("Type does not support refresh\n")

        # --- Step 4: Health + run history ---
        # Omitting pagination returns a bounded recent window (last 7 days).
        print("=== Step 4: Health ===")
        health = stub.GetIntegrationHealth(
            svc.GetIntegrationHealthRequest(integration_id=integration_id)
        )
        print(f'status={svc.HealthStatus.Name(health.health.status)} '
              f'healthy={health.health.healthy} message="{health.health.message}"')
        for run in health.runs:
            print(f'  run {run.run_id} status={svc.HealthStatus.Name(run.status)} "{run.message}"')
        print()

        # --- Step 5: Delete ---
        print("=== Step 5: Delete ===")
        stub.DeleteIntegration(svc.DeleteIntegrationRequest(integration_id=integration_id))
        try:
            stub.GetIntegration(svc.GetIntegrationRequest(integration_id=integration_id))
            print("Unexpected: integration still present")
        except grpc.RpcError as e:
            print(f"Deleted {integration_id}; Get after delete -> {e.code().name}")

        print("\nDone: warehouse lifecycle (outputs, refresh, health) exercised end to end.")


if __name__ == "__main__":
    main()

dbt_cloud.py

"""
Integration management lifecycle using the Coalesce Quality IntegrationsService,
with a dbt Cloud connection as the example type:

    create -> get -> list -> update (with etag) -> disable -> enable -> health -> delete

dbt Cloud is a "managed" type: Coalesce Quality stores the connection and syncs
it in the background. This script works even with an invalid / disabled dbt Cloud
token: every management call (create, update, enable/disable, delete) still
succeeds because they operate on the stored configuration. Only the background
sync would fail, which you can observe later via GetIntegrationHealth.

Prerequisites:
- SYNQ_CLIENT_ID and SYNQ_CLIENT_SECRET (scope: Manage Integrations)
- Optionally DBT_CLOUD_* connection settings (defaults are illustrative).
"""

import os
import sys

import grpc
from dotenv import load_dotenv

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__))))

from auth import TokenSource, TokenAuth
from synq.integrations.v1 import (
    integrations_service_pb2 as svc,
    integrations_service_pb2_grpc as svc_grpc,
    integration_pb2,
    dbt_cloud_conf_pb2,
)

load_dotenv()

CLIENT_ID = os.getenv("SYNQ_CLIENT_ID")
CLIENT_SECRET = os.getenv("SYNQ_CLIENT_SECRET")
API_ENDPOINT = os.getenv("API_ENDPOINT", "developer.synq.io")

if not CLIENT_ID or not CLIENT_SECRET:
    raise SystemExit("SYNQ_CLIENT_ID and SYNQ_CLIENT_SECRET must be set (scope: Manage Integrations)")

# dbt Cloud connection settings. Defaults are illustrative placeholders — the
# token can be invalid/disabled; the management calls below still succeed.
DBT_ACCOUNT_ID = os.getenv("DBT_CLOUD_ACCOUNT_ID", "12345")
DBT_PROJECT_ID = os.getenv("DBT_CLOUD_PROJECT_ID", "example-dbt-project")
DBT_TOKEN = os.getenv("DBT_CLOUD_TOKEN", "dbtc_disabled-demo-token")
DBT_API_ENDPOINT = os.getenv("DBT_CLOUD_API_ENDPOINT", "cloud.getdbt.com")
# Tracked dbt Cloud job ids (comma-separated). Empty tracks every job the token
# can see.
DBT_JOB_IDS = [j.strip() for j in os.getenv("DBT_CLOUD_JOB_IDS", "100,200").split(",") if j.strip()]


def dbt_config(token=None, project_id=DBT_PROJECT_ID, job_ids=None):
    """Build an IntegrationConfig for dbt Cloud. Omit ``token`` to keep the
    stored secret on update (write-only secret semantics). ``job_ids`` is
    replace-semantics: pass the FULL desired set, not a delta."""
    conf = dbt_cloud_conf_pb2.DbtCloudConf(
        account_id=DBT_ACCOUNT_ID,
        project_id=project_id,
        api_endpoint=DBT_API_ENDPOINT,
        job_ids=job_ids if job_ids is not None else DBT_JOB_IDS,
    )
    if token is not None:
        conf.token = token
    return integration_pb2.IntegrationConfig(dbt_cloud=conf)


def main():
    token_source = TokenSource(CLIENT_ID, CLIENT_SECRET, API_ENDPOINT)
    credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.metadata_call_credentials(TokenAuth(token_source)),
    )
    with grpc.secure_channel(
        f"{API_ENDPOINT}:443",
        credentials,
        options=(("grpc.default_authority", API_ENDPOINT),),
    ) as channel:
        grpc.channel_ready_future(channel).result(timeout=10)
        print(f"Connected to {API_ENDPOINT}\n")
        stub = svc_grpc.IntegrationsServiceStub(channel)

        # --- Step 1: Create ---
        print("=== Step 1: Create dbt Cloud integration ===")
        created = stub.CreateIntegration(
            svc.CreateIntegrationRequest(title="Example dbt Cloud", config=dbt_config(token=DBT_TOKEN))
        ).integration
        integration_id = created.id
        print(f"Created id={integration_id} etag={created.etag}")
        print(f"Tracked job ids: {list(created.config.dbt_cloud.job_ids)}")
        # Secret is write-only: masked (empty) on every read.
        print(f'Token in response (masked): "{created.config.dbt_cloud.token}"\n')

        # --- Step 2: Get ---
        print("=== Step 2: Get ===")
        got = stub.GetIntegration(svc.GetIntegrationRequest(integration_id=integration_id)).integration
        print(f'title="{got.title}" disabled={got.disabled} project_id="{got.config.dbt_cloud.project_id}"\n')

        # --- Step 3: List ---
        print("=== Step 3: List all integrations ===")
        listed = stub.ListIntegrations(svc.ListIntegrationsRequest())
        print(f"Workspace has {len(listed.integrations)} integration(s)\n")

        # --- Step 4: Patch the tracked job ids (with optimistic concurrency) ---
        # The config is replaced wholesale, so job_ids is replace-semantics: send
        # the FULL desired set, not a delta. Here we drop one job and add another.
        # We omit the token to keep the stored one, and pass the etag we last read
        # so we only update the version we saw (a stale etag -> ABORTED / 409).
        patched_job_ids = ["100", "300"]  # was ["100","200"]: keep 100, drop 200, add 300
        print("=== Step 4: Patch tracked job ids ===")
        updated = stub.UpdateIntegration(
            svc.UpdateIntegrationRequest(
                integration_id=integration_id,
                title="Example dbt Cloud (updated)",
                etag=created.etag,
                config=dbt_config(token=None, job_ids=patched_job_ids),
            )
        ).integration
        print(f"Patched job ids: {DBT_JOB_IDS} -> {list(updated.config.dbt_cloud.job_ids)}")
        print(f"new etag={updated.etag}")

        # Re-using the now-stale etag is rejected — the concurrency guard.
        try:
            stub.UpdateIntegration(
                svc.UpdateIntegrationRequest(
                    integration_id=integration_id, etag=created.etag, config=dbt_config(token=None)
                )
            )
            print("Unexpected: stale etag accepted")
        except grpc.RpcError as e:
            ok = e.code() == grpc.StatusCode.ABORTED
            print(f"Stale etag rejected with {e.code().name} ({'expected' if ok else 'unexpected'})\n")

        # --- Step 5: Disable then Enable ---
        print("=== Step 5: Disable / Enable ===")
        dis = stub.DisableIntegration(svc.DisableIntegrationRequest(integration_id=integration_id)).integration
        print(f"disabled={dis.disabled} can_enable={dis.capabilities.can_enable}")
        en = stub.EnableIntegration(svc.EnableIntegrationRequest(integration_id=integration_id)).integration
        print(f"disabled={en.disabled} can_disable={en.capabilities.can_disable}\n")

        # --- Step 6: Health ---
        # Right after create there are usually no runs yet (UNSPECIFIED). If the
        # token is invalid/disabled, a later sync surfaces ERROR here.
        print("=== Step 6: Health ===")
        health = stub.GetIntegrationHealth(
            svc.GetIntegrationHealthRequest(integration_id=integration_id)
        )
        status_name = svc.HealthStatus.Name(health.health.status)
        print(f'status={status_name} healthy={health.health.healthy} runs={len(health.runs)}\n')

        # --- Step 7: Delete (with etag) ---
        print("=== Step 7: Delete ===")
        stub.DeleteIntegration(
            svc.DeleteIntegrationRequest(integration_id=integration_id, etag=en.etag)
        )
        try:
            stub.GetIntegration(svc.GetIntegrationRequest(integration_id=integration_id))
            print("Unexpected: integration still present")
        except grpc.RpcError as e:
            print(f"Deleted {integration_id}; Get after delete -> {e.code().name}")

        print("\nDone: full management lifecycle exercised end to end.")


if __name__ == "__main__":
    main()