atlan_integration

Find the full example here.

main.go

package main

import (
	extensionsatlanintegrationsv1grpc "buf.build/gen/go/getsynq/api/grpc/go/synq/extensions/atlan/integrations/v1/integrationsv1grpc"
	extensionsatlanintegrationsv1 "buf.build/gen/go/getsynq/api/protocolbuffers/go/synq/extensions/atlan/integrations/v1"
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"golang.org/x/oauth2/clientcredentials"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"os"
)

// main connects to the SYNQ developer API over TLS, authenticating with OAuth2
// client credentials, then upserts the Atlan tenant configuration and prints
// the stored integration as JSON.
//
// Environment variables read: SYNQ_CLIENT_ID, SYNQ_CLIENT_SECRET,
// ATLAN_TENANT_URL and ATLAN_API_TOKEN. Every failure panics.
func main() {
	ctx := context.Background()

	host := "developer.synq.io"
	port := "443"
	apiUrl := fmt.Sprintf("%s:%s", host, port)

	clientID := os.Getenv("SYNQ_CLIENT_ID")
	clientSecret := os.Getenv("SYNQ_CLIENT_SECRET")
	tokenURL := fmt.Sprintf("https://%s/oauth2/token", host)

	// The OAuth2 token source is attached to every RPC as per-RPC credentials.
	config := &clientcredentials.Config{
		ClientID:     clientID,
		ClientSecret: clientSecret,
		TokenURL:     tokenURL,
	}
	oauthTokenSource := oauth.TokenSource{TokenSource: config.TokenSource(ctx)}
	creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: false})
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithPerRPCCredentials(oauthTokenSource),
		// Authority is the bare host, without the port used in the dial target.
		grpc.WithAuthority(host),
	}

	conn, err := grpc.DialContext(ctx, apiUrl, opts...)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	fmt.Printf("Connected to API...\n\n")

	integrationsApi := extensionsatlanintegrationsv1grpc.NewAtlanIntegrationServiceClient(conn)

	// Upsert tenant info and validate.
	{
		tenantUrl := os.Getenv("ATLAN_TENANT_URL")
		tenantApiToken := os.Getenv("ATLAN_API_TOKEN")
		resp, err := integrationsApi.Upsert(ctx, &extensionsatlanintegrationsv1.UpsertRequest{
			AtlanTenantUrl: tenantUrl,
			AtlanApiToken:  tenantApiToken,
		})
		if err != nil {
			panic(err)
		}
		// Use the generated getters: they are nil-safe, so a response without
		// an Integration reports "not valid" instead of a nil dereference.
		if !resp.GetIntegration().GetIsValid() {
			panic("integration not valid: connection to atlan failed")
		}
	}

	// Fetch integration.
	{
		resp, err := integrationsApi.Get(ctx, &extensionsatlanintegrationsv1.GetRequest{})
		if err != nil {
			panic(err)
		}
		b, err := json.Marshal(resp.GetIntegration())
		if err != nil {
			panic(fmt.Errorf("encoding integration as JSON: %w", err))
		}
		fmt.Printf("Existing integration -> %+v\n", string(b))
	}

}

main.go

package main

import (
	extensionsatlanproviderv1grpc "buf.build/gen/go/getsynq/api/grpc/go/synq/extensions/atlan/provider/v1/providerv1grpc"
	extensionsatlanproviderv1 "buf.build/gen/go/getsynq/api/protocolbuffers/go/synq/extensions/atlan/provider/v1"
	"context"
	"crypto/tls"
	"fmt"
	"golang.org/x/oauth2/clientcredentials"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"os"
)

// main connects to the SYNQ developer API and prints the Atlan connections,
// data products and domains returned by the Atlan provider service.
//
// Credentials come from SYNQ_CLIENT_ID and SYNQ_CLIENT_SECRET. Every failure
// panics.
func main() {
	const (
		host = "developer.synq.io"
		port = "443"
	)

	ctx := context.Background()
	target := fmt.Sprintf("%s:%s", host, port)

	// OAuth2 client-credentials configuration; its token source is attached to
	// each RPC below.
	oauthCfg := &clientcredentials.Config{
		ClientID:     os.Getenv("SYNQ_CLIENT_ID"),
		ClientSecret: os.Getenv("SYNQ_CLIENT_SECRET"),
		TokenURL:     fmt.Sprintf("https://%s/oauth2/token", host),
	}
	perRPC := oauth.TokenSource{TokenSource: oauthCfg.TokenSource(ctx)}
	transport := credentials.NewTLS(&tls.Config{InsecureSkipVerify: false})

	conn, err := grpc.DialContext(
		ctx,
		target,
		grpc.WithTransportCredentials(transport),
		grpc.WithPerRPCCredentials(perRPC),
		grpc.WithAuthority(host),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	fmt.Print("Connected to API...\n\n")

	provider := extensionsatlanproviderv1grpc.NewAtlanProviderServiceClient(conn)

	// Requires valid integration created in 1_setup_integration.

	// Visible connections.
	connectionsResp, err := provider.GetAtlanConnections(ctx, &extensionsatlanproviderv1.GetAtlanConnectionsRequest{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d Visible Atlan Connections:\n", len(connectionsResp.Connections))
	for i := range connectionsResp.Connections {
		fmt.Printf(" -> %+v\n", connectionsResp.Connections[i])
	}

	// Visible data products.
	productsResp, err := provider.GetAtlanDataProducts(ctx, &extensionsatlanproviderv1.GetAtlanDataProductsRequest{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d Visible Atlan DataProducts:\n", len(productsResp.DataProducts))
	for i := range productsResp.DataProducts {
		fmt.Printf(" -> %+v\n", productsResp.DataProducts[i])
	}

	// Visible domains.
	domainsResp, err := provider.GetAtlanDomains(ctx, &extensionsatlanproviderv1.GetAtlanDomainsRequest{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d Visible Atlan DataDomains:\n", len(domainsResp.Domains))
	for i := range domainsResp.Domains {
		fmt.Printf(" -> %+v\n", domainsResp.Domains[i])
	}
}

main.go

package main

import (
	extensionsatlanintegrationsv1grpc "buf.build/gen/go/getsynq/api/grpc/go/synq/extensions/atlan/integrations/v1/integrationsv1grpc"
	extensionsatlanworkflowsv1grpc "buf.build/gen/go/getsynq/api/grpc/go/synq/extensions/atlan/workflows/v1/workflowsv1grpc"
	extensionsatlanintegrationsv1 "buf.build/gen/go/getsynq/api/protocolbuffers/go/synq/extensions/atlan/integrations/v1"
	extensionsatlanworkflowsv1 "buf.build/gen/go/getsynq/api/protocolbuffers/go/synq/extensions/atlan/workflows/v1"
	platformsv1 "buf.build/gen/go/getsynq/api/protocolbuffers/go/synq/platforms/v1"
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"golang.org/x/oauth2/clientcredentials"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"os"
)

// main connects to the SYNQ developer API, maps an Atlan connection to a SYNQ
// data platform, activates the integration, triggers a synchronization and
// prints the resulting domain/product mappings and the recent run history.
//
// Credentials come from SYNQ_CLIENT_ID and SYNQ_CLIENT_SECRET. Every failure,
// including a synchronization that reports errors, panics.
func main() {
	ctx := context.Background()

	host := "developer.synq.io"
	port := "443"
	apiUrl := fmt.Sprintf("%s:%s", host, port)

	clientID := os.Getenv("SYNQ_CLIENT_ID")
	clientSecret := os.Getenv("SYNQ_CLIENT_SECRET")
	tokenURL := fmt.Sprintf("https://%s/oauth2/token", host)

	// The OAuth2 token source is attached to every RPC as per-RPC credentials.
	config := &clientcredentials.Config{
		ClientID:     clientID,
		ClientSecret: clientSecret,
		TokenURL:     tokenURL,
	}
	oauthTokenSource := oauth.TokenSource{TokenSource: config.TokenSource(ctx)}
	creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: false})
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
		grpc.WithPerRPCCredentials(oauthTokenSource),
		// Authority is the bare host, without the port used in the dial target.
		grpc.WithAuthority(host),
	}

	conn, err := grpc.DialContext(ctx, apiUrl, opts...)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	fmt.Printf("Connected to API...\n\n")

	integrationsApi := extensionsatlanintegrationsv1grpc.NewAtlanIntegrationServiceClient(conn)
	workflowsApi := extensionsatlanworkflowsv1grpc.NewAtlanWorkflowServiceClient(conn)

	// Requires valid integration created in 1_setup_integration.

	// Map Atlan connections to SYNQ integrations.
	// Use 2_fetch_atlan_resources to find visible connections.
	// The identifiers below are placeholder example values.
	{
		_, err := workflowsApi.SetConnectionMappings(ctx, &extensionsatlanworkflowsv1.SetConnectionMappingsRequest{
			Mappings: []*extensionsatlanworkflowsv1.ConnectionMapping{
				{
					AtlanConnectionQualifiedName: "default/dbt/1",
					SynqDataPlatformIdentifier: &platformsv1.DataPlatformIdentifier{
						Id: &platformsv1.DataPlatformIdentifier_DbtCloud{
							DbtCloud: &platformsv1.DbtCloudIdentifier{
								ApiEndpoint: "cloud.getdbt.com",
								AccountId:   "1234",
								ProjectId:   "5678",
							},
						},
					},
				},
			},
		})
		if err != nil {
			panic(err)
		}
	}

	// Activate integration once you are ready to synchronize.
	// This runs a synchronization every 5 minutes.
	{
		_, err := integrationsApi.Activate(ctx, &extensionsatlanintegrationsv1.ActivateRequest{
			Activate: true,
		})
		if err != nil {
			panic(err)
		}
	}

	// You can also manually synchronize with Atlan.
	// This creates the data products and associated domains visible from Atlan in SYNQ.
	{
		resp, err := workflowsApi.Synchronize(ctx, &extensionsatlanworkflowsv1.SynchronizeRequest{})
		if err != nil {
			panic(err)
		}
		// Print the run before checking HasErrors so the details are visible
		// even when the synchronization failed.
		fmt.Printf("Synchronization result -> %s\n\n", mustMarshalJSON(resp.WorkflowRun))
		if resp.HasErrors {
			panic("synchronization has errors")
		}
	}

	// Fetch mapped products and domains.
	{
		resp, err := workflowsApi.GetDomainMappings(ctx, &extensionsatlanworkflowsv1.GetDomainMappingsRequest{})
		if err != nil {
			panic(err)
		}
		fmt.Printf("Mapped domains -> %s\n\n", mustMarshalJSON(resp.Mappings))
	}
	{
		resp, err := workflowsApi.GetProductMappings(ctx, &extensionsatlanworkflowsv1.GetProductMappingsRequest{})
		if err != nil {
			panic(err)
		}
		fmt.Printf("Mapped products -> %s\n\n", mustMarshalJSON(resp.Mappings))
	}

	// Fetch synchronization runs history.
	{
		resp, err := workflowsApi.FetchRuns(ctx, &extensionsatlanworkflowsv1.FetchRunsRequest{
			From:  0,
			Limit: 10,
		})
		if err != nil {
			panic(err)
		}
		fmt.Printf("Found %d runs.\n", len(resp.WorkflowRuns))
		for _, run := range resp.WorkflowRuns {
			encoded := mustMarshalJSON(run)
			fmt.Printf(" -> [%+v] (%+v) %s\n", run.StartedAt, run.Status, encoded)
		}
	}
}

// mustMarshalJSON encodes v with encoding/json and returns the result as a
// string. It panics if encoding fails, matching the example's panic-on-error
// style instead of silently printing an empty value.
func mustMarshalJSON(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(fmt.Errorf("encoding %T as JSON: %w", v, err))
	}
	return string(b)
}

main.go

package main

import (
	extensionsatlanintegrationsv1grpc "buf.build/gen/go/getsynq/api/grpc/go/synq/extensions/atlan/integrations/v1/integrationsv1grpc"
	extensionsatlanintegrationsv1 "buf.build/gen/go/getsynq/api/protocolbuffers/go/synq/extensions/atlan/integrations/v1"
	"context"
	"crypto/tls"
	"fmt"
	"golang.org/x/oauth2/clientcredentials"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"os"
)

// main connects to the SYNQ developer API, deactivates the Atlan integration
// and then removes it.
//
// Credentials come from SYNQ_CLIENT_ID and SYNQ_CLIENT_SECRET. Every failure
// panics.
func main() {
	const (
		host = "developer.synq.io"
		port = "443"
	)

	ctx := context.Background()
	target := fmt.Sprintf("%s:%s", host, port)

	// OAuth2 client-credentials configuration; its token source is attached to
	// each RPC.
	oauthCfg := &clientcredentials.Config{
		ClientID:     os.Getenv("SYNQ_CLIENT_ID"),
		ClientSecret: os.Getenv("SYNQ_CLIENT_SECRET"),
		TokenURL:     fmt.Sprintf("https://%s/oauth2/token", host),
	}
	perRPC := oauth.TokenSource{TokenSource: oauthCfg.TokenSource(ctx)}
	transport := credentials.NewTLS(&tls.Config{InsecureSkipVerify: false})

	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(transport),
		grpc.WithPerRPCCredentials(perRPC),
		grpc.WithAuthority(host),
	}
	conn, err := grpc.DialContext(ctx, target, dialOpts...)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	fmt.Print("Connected to API...\n\n")

	integrations := extensionsatlanintegrationsv1grpc.NewAtlanIntegrationServiceClient(conn)

	// Deactivate integration.
	// This stops the scheduled synchronization.
	deactivate := &extensionsatlanintegrationsv1.ActivateRequest{Activate: false}
	if _, err := integrations.Activate(ctx, deactivate); err != nil {
		panic(err)
	}
	fmt.Println("Deactivated Atlan integration.")

	// Optionally delete integration.
	if _, err := integrations.Remove(ctx, &extensionsatlanintegrationsv1.RemoveRequest{}); err != nil {
		panic(err)
	}
	fmt.Println("Removed Atlan integration.")

	// Note that the domains and products created on SYNQ by the synchronization
	// are *NOT* automatically removed. You can remove them from SYNQ if you wish to.
}