api

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2025 License: GPL-3.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type APIConnection

type APIConnection struct {
	Spec    Spec
	State   *APIState
	Context *g.Context
	// contains filtered or unexported fields
}

func NewAPIConnection

func NewAPIConnection(ctx context.Context, spec Spec, data map[string]any) (ac *APIConnection, err error)

NewAPIConnection creates an

func (*APIConnection) Authenticate

func (ac *APIConnection) Authenticate() (err error)

Authenticate performs the auth workflow if needed. Like a Connect step. Header based auths (such as Basic, or Bearer) don't need this step save payload in APIState.Auth

func (*APIConnection) Close

func (ac *APIConnection) Close() error

Close performs cleanup of all resources

func (*APIConnection) CloseAllQueues

func (ac *APIConnection) CloseAllQueues()

CloseAllQueues closes all queues associated with this connection

func (*APIConnection) EnsureAuthenticated added in v1.4.14

func (ac *APIConnection) EnsureAuthenticated() error

EnsureAuthenticated checks if authentication is valid and re-authenticates if needed This method ensures thread-safe authentication checks and re-authentication

func (*APIConnection) GetQueue

func (ac *APIConnection) GetQueue(name string) (*iop.Queue, bool)

GetQueue retrieves a queue by name

func (*APIConnection) GetSyncedState

func (ac *APIConnection) GetSyncedState(endpointName string) (data map[string]map[string]any, err error)

GetSyncedState cycles through each endpoint, and collects the values for each of the Endpoint.Sync values. Output is a map[Endpoint.Name]map[Sync.value] = Endpoint.syncMap[Sync.value]

func (*APIConnection) IsAuthExpired added in v1.4.14

func (ac *APIConnection) IsAuthExpired() bool

IsAuthExpired checks if the authentication has expired

func (*APIConnection) ListEndpoints

func (ac *APIConnection) ListEndpoints(patterns ...string) (endpoints Endpoints, err error)

func (*APIConnection) MakeDynamicEndpointIterator

func (ac *APIConnection) MakeDynamicEndpointIterator(iter *Iterate) (err error)

func (*APIConnection) PutSyncedState

func (ac *APIConnection) PutSyncedState(endpointName string, data map[string]map[string]any) (err error)

PutSyncedState restores the state from previous run in each endpoint using the Endpoint.Sync values. Inputs is map[Endpoint.Name]map[Sync.value] = Endpoint.syncMap[Sync.value]

func (*APIConnection) ReadDataflow

func (ac *APIConnection) ReadDataflow(endpointName string, sCfg APIStreamConfig) (df *iop.Dataflow, err error)

func (*APIConnection) RegisterQueue

func (ac *APIConnection) RegisterQueue(name string) (*iop.Queue, error)

RegisterQueue creates a new queue with the given name If a queue with the same name already exists, it is returned

func (*APIConnection) RemoveQueue

func (ac *APIConnection) RemoveQueue(name string) error

RemoveQueue closes and removes a queue

func (*APIConnection) RenderDynamicEndpoints added in v1.4.14

func (ac *APIConnection) RenderDynamicEndpoints() (err error)

RenderDynamicEndpoints will render the dynamic objects basically mutating the spec endpoints. Needs to authenticate first

type APIState

type APIState struct {
	Env     map[string]string     `json:"env,omitempty"`
	State   map[string]any        `json:"state,omitempty"`
	Secrets map[string]any        `json:"secrets,omitempty"`
	Queues  map[string]*iop.Queue `json:"queues,omitempty"` // appends to file
	Auth    APIStateAuth          `json:"auth,omitempty"`
}

type APIStateAuth

type APIStateAuth struct {
	Authenticated bool              `json:"authenticated,omitempty"`
	Token         string            `json:"token,omitempty"`      // refresh token?
	Headers       map[string]string `json:"-"`                    // to inject
	ExpiresAt     int64             `json:"expires_at,omitempty"` // Unix timestamp when auth expires

	Sign  func(context.Context, *http.Request, []byte) error `json:"-"`          // for AWS Sigv4
	Mutex *sync.Mutex                                        `json:"-" yaml:"-"` // Mutex for auth operations
}

type APIStreamConfig

type APIStreamConfig struct {
	Flatten     int // levels of flattening. 0 is infinite
	JmesPath    string
	Select      []string // select specific columns
	Limit       int
	Metadata    iop.Metadata
	Mode        string
	DsConfigMap map[string]any // stream processor options
}

type AggregateState

type AggregateState struct {
	// contains filtered or unexported fields
}

AggregateState stores aggregated values during response processing

type AggregationType

type AggregationType string
const (
	AggregationTypeNone    AggregationType = ""        // No aggregation, apply transformation at record level
	AggregationTypeMaximum AggregationType = "maximum" // Keep the maximum value across records
	AggregationTypeMinimum AggregationType = "minimum" // Keep the minimum value across records
	AggregationTypeFlatten AggregationType = "flatten" // Collect all values into an array
	AggregationTypeFirst   AggregationType = "first"   // Keep only the first encountered value
	AggregationTypeLast    AggregationType = "last"    // Keep only the last encountered value
)

type AuthType

type AuthType string
const (
	AuthTypeNone     AuthType = ""
	AuthTypeSequence AuthType = "sequence"
	AuthTypeBasic    AuthType = "basic"
	AuthTypeOAuth2   AuthType = "oauth2"
	AuthTypeAWSSigV4 AuthType = "aws-sigv4"
)

type Authentication

type Authentication struct {
	Type AuthType `yaml:"type" json:"type"`

	// when set, re-auth after number of seconds
	Expires int `yaml:"expires" json:"expires,omitempty"`

	// custom authentication workflow
	Sequence Sequence `yaml:"sequence" json:"sequence,omitempty"`

	// Basic Auth
	Username string `yaml:"username,omitempty" json:"username,omitempty"`
	Password string `yaml:"password,omitempty" json:"password,omitempty"`

	// OAuth
	Flow              OAuthFlow `yaml:"flow,omitempty" json:"flow,omitempty"`
	AuthenticationURL string    `yaml:"authentication_url,omitempty" json:"authentication_url,omitempty"`
	ClientID          string    `yaml:"client_id,omitempty" json:"client_id,omitempty"`
	ClientSecret      string    `yaml:"client_secret,omitempty" json:"client_secret,omitempty"`
	Token             string    `yaml:"token,omitempty" json:"token,omitempty"`
	Scopes            []string  `yaml:"scopes,omitempty" json:"scopes,omitempty"`
	RedirectURI       string    `yaml:"redirect_uri,omitempty" json:"redirect_uri,omitempty"`
	RefreshToken      string    `yaml:"refresh_token,omitempty" json:"refresh_token,omitempty"`
	RefreshOnExpire   bool      `yaml:"refresh_on_expire,omitempty" json:"refresh_on_expire,omitempty"`

	// AWS
	AwsService         string `yaml:"aws_service,omitempty" json:"aws_service,omitempty"`
	AwsAccessKeyID     string `yaml:"aws_access_key_id,omitempty" json:"aws_access_key_id,omitempty"`
	AwsSecretAccessKey string `yaml:"aws_secret_access_key,omitempty" json:"aws_secret_access_key,omitempty"`
	AwsSessionToken    string `yaml:"aws_session_token,omitempty" json:"aws_session_token,omitempty"`
	AwsRegion          string `yaml:"aws_region,omitempty" json:"aws_region,omitempty"`
	AwsProfile         string `yaml:"aws_profile,omitempty" json:"aws_profile,omitempty"`
}

Authentication defines how to authenticate with the API

type BackoffType

type BackoffType string
const (
	BackoffTypeNone        BackoffType = ""            // No delay between retries
	BackoffTypeConstant    BackoffType = "constant"    // Fixed delay between retries
	BackoffTypeLinear      BackoffType = "linear"      // Delay increases linearly with each attempt
	BackoffTypeExponential BackoffType = "exponential" // Delay increases exponentially (common pattern)
	BackoffTypeJitter      BackoffType = "jitter"      // Exponential backoff with randomization to avoid thundering herd
)

type Call

type Call struct {
	If         string     `yaml:"if" json:"if"`
	Request    Request    `yaml:"request" json:"request"`
	Pagination Pagination `yaml:"pagination" json:"pagination"`
	Response   Response   `yaml:"response" json:"response"`
}

type DynamicEndpoint added in v1.4.14

type DynamicEndpoint struct {
	Setup    Sequence `yaml:"setup" json:"setup"`
	Iterate  string   `yaml:"iterate" json:"iterate"`
	Into     string   `yaml:"into" json:"into"`
	Endpoint Endpoint `yaml:"endpoint" json:"endpoint"`
}

type DynamicEndpoints added in v1.4.14

type DynamicEndpoints []DynamicEndpoint

type Endpoint

type Endpoint struct {
	Name        string     `yaml:"name" json:"name"`
	Description string     `yaml:"description" json:"description,omitempty"`
	Docs        string     `yaml:"docs" json:"docs,omitempty"`
	Disabled    bool       `yaml:"disabled" json:"disabled"`
	State       StateMap   `yaml:"state" json:"state"`
	Sync        []string   `yaml:"sync" json:"sync,omitempty"`
	Request     Request    `yaml:"request" json:"request"`
	Pagination  Pagination `yaml:"pagination" json:"pagination"`
	Response    Response   `yaml:"response" json:"response"`
	Iterate     Iterate    `yaml:"iterate" json:"iterate,omitempty"` // state expression to use to loop
	Setup       Sequence   `yaml:"setup" json:"setup,omitempty"`
	Teardown    Sequence   `yaml:"teardown" json:"teardown,omitempty"`
	// contains filtered or unexported fields
}

Endpoint is the top-level configuration structure

func (*Endpoint) SetStateVal

func (ep *Endpoint) SetStateVal(key string, val any)

type EndpointMap

type EndpointMap map[string]Endpoint

Endpoints is a collection of API endpoints

type Endpoints

type Endpoints []Endpoint

func (Endpoints) Sort

func (eps Endpoints) Sort()

type HTTPMethod

type HTTPMethod string
const (
	MethodGet     HTTPMethod = "GET"
	MethodHead    HTTPMethod = "HEAD"
	MethodPost    HTTPMethod = "POST"
	MethodPut     HTTPMethod = "PUT"
	MethodPatch   HTTPMethod = "PATCH"
	MethodDelete  HTTPMethod = "DELETE"
	MethodConnect HTTPMethod = "CONNECT"
	MethodOptions HTTPMethod = "OPTIONS"
	MethodTrace   HTTPMethod = "TRACE"
)

type Iterate

type Iterate struct {
	Over        any    `yaml:"over" json:"iterate,omitempty"` // expression
	Into        string `yaml:"into" json:"into,omitempty"`    // state variable
	If          string `yaml:"id" json:"id,omitempty"`        // if we should iterate
	Concurrency int    `yaml:"concurrency" json:"concurrency,omitempty"`
	// contains filtered or unexported fields
}

Iterate is for configuring looping values for requests

type Iteration

type Iteration struct {
	// contains filtered or unexported fields
}

func (*Iteration) DetermineStateRenderOrder added in v1.4.14

func (iter *Iteration) DetermineStateRenderOrder() (order []string, err error)

type OAuthFlow added in v1.4.14

type OAuthFlow string
const (
	OAuthFlowClientCredentials OAuthFlow = "client_credentials"
	OAuthFlowAuthorizationCode OAuthFlow = "authorization_code"
	OAuthFlowPassword          OAuthFlow = "password"
	OAuthFlowRefreshToken      OAuthFlow = "refresh_token"
)

type Pagination

type Pagination struct {
	NextState     map[string]any `yaml:"next_state" json:"next_state,omitempty"`
	StopCondition string         `yaml:"stop_condition" json:"stop_condition,omitempty"`
}

Pagination configures how to navigate through multiple pages of API results

type Processor

type Processor struct {
	Aggregation AggregationType `yaml:"aggregation" json:"aggregation"`
	Expression  string          `yaml:"expression" json:"expression"`
	Output      string          `yaml:"output" json:"output"`
}

Processor represents a way to process data without aggregation, represents a transformation applied at record level with aggregation to reduce/aggregate record data, and save into the state

type Records

type Records struct {
	JmesPath   string   `yaml:"jmespath" json:"jmespath,omitempty"` // for json or xml
	PrimaryKey []string `yaml:"primary_key" json:"primary_key,omitempty"`
	UpdateKey  string   `yaml:"update_key" json:"update_key,omitempty"`
	Limit      int      `yaml:"limit" json:"limit,omitempty"` // to limit the records, useful for testing

	DuplicateTolerance string `yaml:"duplicate_tolerance" json:"duplicate_tolerance,omitempty"`
}

Records configures how to extract and process data records from a response

type Request

type Request struct {
	URL         string         `yaml:"url" json:"url,omitempty"`
	Timeout     int            `yaml:"timeout" json:"timeout,omitempty"`
	Method      HTTPMethod     `yaml:"method" json:"method,omitempty"`
	Headers     map[string]any `yaml:"headers" json:"headers,omitempty"`
	Parameters  map[string]any `yaml:"parameters" json:"parameters,omitempty"`
	Payload     any            `yaml:"payload" json:"payload,omitempty"`
	Rate        float64        `yaml:"rate" json:"rate,omitempty"`               // maximum request per second
	Concurrency int            `yaml:"concurrency" json:"concurrency,omitempty"` // maximum concurrent requests
}

Request defines how to construct an HTTP request to the API

type RequestState

type RequestState struct {
	Method   string         `yaml:"method" json:"method"`
	URL      string         `yaml:"url" json:"url"`
	Headers  map[string]any `yaml:"headers" json:"headers"`
	Payload  any            `yaml:"payload" json:"payload"`
	Attempts int            `yaml:"attempts" json:"attempts"`
}

RequestState captures the state of the HTTP request for reference and debugging

type Response

type Response struct {
	Format     dbio.FileType `yaml:"format" json:"format,omitempty"` // force response format
	Records    Records       `yaml:"records" json:"records"`
	Processors []Processor   `yaml:"processors" json:"processors,omitempty"`
	Rules      []Rule        `yaml:"rules" json:"rules,omitempty"`
}

Response defines how to process the API response and extract records

type ResponseState

type ResponseState struct {
	Status  int            `yaml:"status" json:"status"`
	Headers map[string]any `yaml:"headers" json:"headers"`
	Text    string         `yaml:"text" json:"text"`
	JSON    any            `yaml:"json" json:"json"`
	Records []any          `yaml:"records" json:"records"`
}

ResponseState captures the state of the HTTP response for reference and debugging

type Rule

type Rule struct {
	Action      RuleType    `yaml:"action" json:"action"`
	Condition   string      `yaml:"condition" json:"condition"` // an expression
	MaxAttempts int         `yaml:"max_attempts" json:"max_attempts"`
	Backoff     BackoffType `yaml:"backoff" json:"backoff"`
	BackoffBase int         `yaml:"backoff_base" json:"backoff_base"` // base duration, number of seconds. default is 1
	Message     string      `yaml:"message" json:"message"`
}

Rule represents a response rule

type RuleType

type RuleType string
const (
	RuleTypeRetry    RuleType = "retry"    // Retry the request up to MaxAttempts times
	RuleTypeContinue RuleType = "continue" // Continue processing responses and rules
	RuleTypeStop     RuleType = "stop"     // Stop processing requests for this endpoint
	RuleTypeFail     RuleType = "fail"     // Stop processing and return an error
)

type Sequence added in v1.4.14

type Sequence []Call

Sequence is many calls (perfect for async jobs, custom auth)

type SingleRequest

type SingleRequest struct {
	Request   *RequestState  `yaml:"request" json:"request"`
	Response  *ResponseState `yaml:"response" json:"response"`
	Aggregate AggregateState `yaml:"-" json:"-"`
	// contains filtered or unexported fields
}

SingleRequest represents a single HTTP request/response cycle

func NewSingleRequest

func NewSingleRequest(iter *Iteration) *SingleRequest

func (*SingleRequest) Debug

func (lrs *SingleRequest) Debug(text string, args ...any)

func (*SingleRequest) Map

func (lrs *SingleRequest) Map() map[string]any

func (*SingleRequest) Records

func (lrs *SingleRequest) Records() []any

func (*SingleRequest) Trace added in v1.4.6

func (lrs *SingleRequest) Trace(text string, args ...any)

type Spec

type Spec struct {
	Name             string           `yaml:"name" json:"name"`
	Description      string           `yaml:"description" json:"description"`
	Queues           []string         `yaml:"queues" json:"queues"`
	Defaults         Endpoint         `yaml:"defaults" json:"defaults"`
	Authentication   Authentication   `yaml:"authentication" json:"authentication"`
	EndpointMap      EndpointMap      `yaml:"endpoints" json:"endpoints"`
	DynamicEndpoints DynamicEndpoints `yaml:"dynamic_endpoints" json:"dynamic_endpoints"`
	// contains filtered or unexported fields
}

Spec defines the complete API specification with endpoints and authentication

func LoadSpec

func LoadSpec(specBody string) (spec Spec, err error)

func (*Spec) IsDynamic

func (s *Spec) IsDynamic() bool

type StateMap

type StateMap map[string]any

StateMap stores the current state of an endpoint's execution

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL