Documentation
¶
Index ¶
- Variables
- type APIConnection
- func (ac *APIConnection) Authenticate() (err error)
- func (ac *APIConnection) Close() error
- func (ac *APIConnection) CloseAllQueues()
- func (ac *APIConnection) EnsureAuthenticated() error
- func (ac *APIConnection) GetQueue(name string) (*iop.Queue, bool)
- func (ac *APIConnection) GetSyncedState(endpointName string) (data map[string]map[string]any, err error)
- func (ac *APIConnection) IsAuthExpired() bool
- func (ac *APIConnection) ListEndpoints(patterns ...string) (endpoints Endpoints, err error)
- func (ac *APIConnection) MakeDynamicEndpointIterator(iter *Iterate) (err error)
- func (ac *APIConnection) PutSyncedState(endpointName string, data map[string]map[string]any) (err error)
- func (ac *APIConnection) ReadDataflow(endpointName string, sCfg APIStreamConfig) (df *iop.Dataflow, err error)
- func (ac *APIConnection) RegisterQueue(name string) (*iop.Queue, error)
- func (ac *APIConnection) RemoveQueue(name string) error
- func (ac *APIConnection) RenderDynamicEndpoints() (err error)
- type APIState
- type APIStateAuth
- type APIStreamConfig
- type AggregateState
- type AggregationType
- type AuthType
- type Authentication
- type BackoffType
- type Call
- type DynamicEndpoint
- type DynamicEndpoints
- type Endpoint
- type EndpointMap
- type Endpoints
- type HTTPMethod
- type Iterate
- type Iteration
- type OAuthFlow
- type Pagination
- type Processor
- type Records
- type Request
- type RequestState
- type Response
- type ResponseState
- type Rule
- type RuleType
- type Sequence
- type SingleRequest
- type Spec
- type StateMap
Constants ¶
This section is empty.
Variables ¶
var AggregationTypes = []AggregationType{ AggregationTypeNone, AggregationTypeMaximum, AggregationTypeMinimum, AggregationTypeFlatten, AggregationTypeFirst, AggregationTypeLast, }
Functions ¶
This section is empty.
Types ¶
type APIConnection ¶
type APIConnection struct { Spec Spec State *APIState Context *g.Context // contains filtered or unexported fields }
func NewAPIConnection ¶
func NewAPIConnection(ctx context.Context, spec Spec, data map[string]any) (ac *APIConnection, err error)
NewAPIConnection creates an APIConnection from the given spec and data.
func (*APIConnection) Authenticate ¶
func (ac *APIConnection) Authenticate() (err error)
Authenticate performs the auth workflow if needed, like a Connect step. Header-based auths (such as Basic or Bearer) don't need this step. It saves the payload in APIState.Auth.
func (*APIConnection) Close ¶
func (ac *APIConnection) Close() error
Close performs cleanup of all resources
func (*APIConnection) CloseAllQueues ¶
func (ac *APIConnection) CloseAllQueues()
CloseAllQueues closes all queues associated with this connection
func (*APIConnection) EnsureAuthenticated ¶ added in v1.4.14
func (ac *APIConnection) EnsureAuthenticated() error
EnsureAuthenticated checks if authentication is valid and re-authenticates if needed. This method ensures thread-safe authentication checks and re-authentication.
func (*APIConnection) GetQueue ¶
func (ac *APIConnection) GetQueue(name string) (*iop.Queue, bool)
GetQueue retrieves a queue by name
func (*APIConnection) GetSyncedState ¶
func (ac *APIConnection) GetSyncedState(endpointName string) (data map[string]map[string]any, err error)
GetSyncedState cycles through each endpoint, and collects the values for each of the Endpoint.Sync values. Output is a map[Endpoint.Name]map[Sync.value] = Endpoint.syncMap[Sync.value]
func (*APIConnection) IsAuthExpired ¶ added in v1.4.14
func (ac *APIConnection) IsAuthExpired() bool
IsAuthExpired checks if the authentication has expired
func (*APIConnection) ListEndpoints ¶
func (ac *APIConnection) ListEndpoints(patterns ...string) (endpoints Endpoints, err error)
func (*APIConnection) MakeDynamicEndpointIterator ¶
func (ac *APIConnection) MakeDynamicEndpointIterator(iter *Iterate) (err error)
func (*APIConnection) PutSyncedState ¶
func (ac *APIConnection) PutSyncedState(endpointName string, data map[string]map[string]any) (err error)
PutSyncedState restores the state from the previous run in each endpoint using the Endpoint.Sync values. Input is a map[Endpoint.Name]map[Sync.value] = Endpoint.syncMap[Sync.value]
func (*APIConnection) ReadDataflow ¶
func (ac *APIConnection) ReadDataflow(endpointName string, sCfg APIStreamConfig) (df *iop.Dataflow, err error)
func (*APIConnection) RegisterQueue ¶
func (ac *APIConnection) RegisterQueue(name string) (*iop.Queue, error)
RegisterQueue creates a new queue with the given name. If a queue with the same name already exists, it is returned.
func (*APIConnection) RemoveQueue ¶
func (ac *APIConnection) RemoveQueue(name string) error
RemoveQueue closes and removes a queue
func (*APIConnection) RenderDynamicEndpoints ¶ added in v1.4.14
func (ac *APIConnection) RenderDynamicEndpoints() (err error)
RenderDynamicEndpoints will render the dynamic objects, basically mutating the spec endpoints. Needs to authenticate first.
type APIStateAuth ¶
type APIStateAuth struct { Authenticated bool `json:"authenticated,omitempty"` Token string `json:"token,omitempty"` // refresh token? Headers map[string]string `json:"-"` // to inject ExpiresAt int64 `json:"expires_at,omitempty"` // Unix timestamp when auth expires Sign func(context.Context, *http.Request, []byte) error `json:"-"` // for AWS Sigv4 Mutex *sync.Mutex `json:"-" yaml:"-"` // Mutex for auth operations }
type APIStreamConfig ¶
type AggregateState ¶
type AggregateState struct {
// contains filtered or unexported fields
}
AggregateState stores aggregated values during response processing
type AggregationType ¶
type AggregationType string
const ( AggregationTypeNone AggregationType = "" // No aggregation, apply transformation at record level AggregationTypeMaximum AggregationType = "maximum" // Keep the maximum value across records AggregationTypeMinimum AggregationType = "minimum" // Keep the minimum value across records AggregationTypeFlatten AggregationType = "flatten" // Collect all values into an array AggregationTypeFirst AggregationType = "first" // Keep only the first encountered value AggregationTypeLast AggregationType = "last" // Keep only the last encountered value )
type Authentication ¶
type Authentication struct { Type AuthType `yaml:"type" json:"type"` // when set, re-auth after number of seconds Expires int `yaml:"expires" json:"expires,omitempty"` // custom authentication workflow Sequence Sequence `yaml:"sequence" json:"sequence,omitempty"` // Basic Auth Username string `yaml:"username,omitempty" json:"username,omitempty"` Password string `yaml:"password,omitempty" json:"password,omitempty"` // OAuth Flow OAuthFlow `yaml:"flow,omitempty" json:"flow,omitempty"` AuthenticationURL string `yaml:"authentication_url,omitempty" json:"authentication_url,omitempty"` ClientID string `yaml:"client_id,omitempty" json:"client_id,omitempty"` ClientSecret string `yaml:"client_secret,omitempty" json:"client_secret,omitempty"` Token string `yaml:"token,omitempty" json:"token,omitempty"` Scopes []string `yaml:"scopes,omitempty" json:"scopes,omitempty"` RedirectURI string `yaml:"redirect_uri,omitempty" json:"redirect_uri,omitempty"` RefreshToken string `yaml:"refresh_token,omitempty" json:"refresh_token,omitempty"` RefreshOnExpire bool `yaml:"refresh_on_expire,omitempty" json:"refresh_on_expire,omitempty"` // AWS AwsService string `yaml:"aws_service,omitempty" json:"aws_service,omitempty"` AwsAccessKeyID string `yaml:"aws_access_key_id,omitempty" json:"aws_access_key_id,omitempty"` AwsSecretAccessKey string `yaml:"aws_secret_access_key,omitempty" json:"aws_secret_access_key,omitempty"` AwsSessionToken string `yaml:"aws_session_token,omitempty" json:"aws_session_token,omitempty"` AwsRegion string `yaml:"aws_region,omitempty" json:"aws_region,omitempty"` AwsProfile string `yaml:"aws_profile,omitempty" json:"aws_profile,omitempty"` }
Authentication defines how to authenticate with the API
type BackoffType ¶
type BackoffType string
const ( BackoffTypeNone BackoffType = "" // No delay between retries BackoffTypeConstant BackoffType = "constant" // Fixed delay between retries BackoffTypeLinear BackoffType = "linear" // Delay increases linearly with each attempt BackoffTypeExponential BackoffType = "exponential" // Delay increases exponentially (common pattern) BackoffTypeJitter BackoffType = "jitter" // Exponential backoff with randomization to avoid thundering herd )
type Call ¶
type Call struct { If string `yaml:"if" json:"if"` Request Request `yaml:"request" json:"request"` Pagination Pagination `yaml:"pagination" json:"pagination"` Response Response `yaml:"response" json:"response"` }
type DynamicEndpoint ¶ added in v1.4.14
type DynamicEndpoints ¶ added in v1.4.14
type DynamicEndpoints []DynamicEndpoint
type Endpoint ¶
type Endpoint struct { Name string `yaml:"name" json:"name"` Description string `yaml:"description" json:"description,omitempty"` Docs string `yaml:"docs" json:"docs,omitempty"` Disabled bool `yaml:"disabled" json:"disabled"` State StateMap `yaml:"state" json:"state"` Sync []string `yaml:"sync" json:"sync,omitempty"` Request Request `yaml:"request" json:"request"` Pagination Pagination `yaml:"pagination" json:"pagination"` Response Response `yaml:"response" json:"response"` Iterate Iterate `yaml:"iterate" json:"iterate,omitempty"` // state expression to use to loop Setup Sequence `yaml:"setup" json:"setup,omitempty"` Teardown Sequence `yaml:"teardown" json:"teardown,omitempty"` // contains filtered or unexported fields }
Endpoint is the top-level configuration structure
func (*Endpoint) SetStateVal ¶
type HTTPMethod ¶
type HTTPMethod string
const ( MethodGet HTTPMethod = "GET" MethodHead HTTPMethod = "HEAD" MethodPost HTTPMethod = "POST" MethodPut HTTPMethod = "PUT" MethodPatch HTTPMethod = "PATCH" MethodDelete HTTPMethod = "DELETE" MethodConnect HTTPMethod = "CONNECT" MethodOptions HTTPMethod = "OPTIONS" MethodTrace HTTPMethod = "TRACE" )
type Iterate ¶
type Iterate struct { Over any `yaml:"over" json:"iterate,omitempty"` // expression Into string `yaml:"into" json:"into,omitempty"` // state variable If string `yaml:"id" json:"id,omitempty"` // if we should iterate Concurrency int `yaml:"concurrency" json:"concurrency,omitempty"` // contains filtered or unexported fields }
Iterate is for configuring looping values for requests
type Iteration ¶
type Iteration struct {
// contains filtered or unexported fields
}
func (*Iteration) DetermineStateRenderOrder ¶ added in v1.4.14
type Pagination ¶
type Pagination struct { NextState map[string]any `yaml:"next_state" json:"next_state,omitempty"` StopCondition string `yaml:"stop_condition" json:"stop_condition,omitempty"` }
Pagination configures how to navigate through multiple pages of API results
type Processor ¶
type Processor struct { Aggregation AggregationType `yaml:"aggregation" json:"aggregation"` Expression string `yaml:"expression" json:"expression"` Output string `yaml:"output" json:"output"` }
Processor represents a way to process data. Without aggregation, it represents a transformation applied at the record level; with aggregation, it reduces/aggregates record data and saves the result into the state.
type Records ¶
type Records struct { JmesPath string `yaml:"jmespath" json:"jmespath,omitempty"` // for json or xml PrimaryKey []string `yaml:"primary_key" json:"primary_key,omitempty"` UpdateKey string `yaml:"update_key" json:"update_key,omitempty"` Limit int `yaml:"limit" json:"limit,omitempty"` // to limit the records, useful for testing DuplicateTolerance string `yaml:"duplicate_tolerance" json:"duplicate_tolerance,omitempty"` }
Records configures how to extract and process data records from a response
type Request ¶
type Request struct { URL string `yaml:"url" json:"url,omitempty"` Timeout int `yaml:"timeout" json:"timeout,omitempty"` Method HTTPMethod `yaml:"method" json:"method,omitempty"` Headers map[string]any `yaml:"headers" json:"headers,omitempty"` Parameters map[string]any `yaml:"parameters" json:"parameters,omitempty"` Payload any `yaml:"payload" json:"payload,omitempty"` Rate float64 `yaml:"rate" json:"rate,omitempty"` // maximum request per second Concurrency int `yaml:"concurrency" json:"concurrency,omitempty"` // maximum concurrent requests }
Request defines how to construct an HTTP request to the API
type RequestState ¶
type RequestState struct { Method string `yaml:"method" json:"method"` URL string `yaml:"url" json:"url"` Headers map[string]any `yaml:"headers" json:"headers"` Payload any `yaml:"payload" json:"payload"` Attempts int `yaml:"attempts" json:"attempts"` }
RequestState captures the state of the HTTP request for reference and debugging
type Response ¶
type Response struct { Format dbio.FileType `yaml:"format" json:"format,omitempty"` // force response format Records Records `yaml:"records" json:"records"` Processors []Processor `yaml:"processors" json:"processors,omitempty"` Rules []Rule `yaml:"rules" json:"rules,omitempty"` }
Response defines how to process the API response and extract records
type ResponseState ¶
type ResponseState struct { Status int `yaml:"status" json:"status"` Headers map[string]any `yaml:"headers" json:"headers"` Text string `yaml:"text" json:"text"` JSON any `yaml:"json" json:"json"` Records []any `yaml:"records" json:"records"` }
ResponseState captures the state of the HTTP response for reference and debugging
type Rule ¶
type Rule struct { Action RuleType `yaml:"action" json:"action"` Condition string `yaml:"condition" json:"condition"` // an expression MaxAttempts int `yaml:"max_attempts" json:"max_attempts"` Backoff BackoffType `yaml:"backoff" json:"backoff"` BackoffBase int `yaml:"backoff_base" json:"backoff_base"` // base duration, number of seconds. default is 1 Message string `yaml:"message" json:"message"` }
Rule represents a response rule
type RuleType ¶
type RuleType string
const ( RuleTypeRetry RuleType = "retry" // Retry the request up to MaxAttempts times RuleTypeContinue RuleType = "continue" // Continue processing responses and rules RuleTypeStop RuleType = "stop" // Stop processing requests for this endpoint RuleTypeFail RuleType = "fail" // Stop processing and return an error )
type Sequence ¶ added in v1.4.14
type Sequence []Call
Sequence is many calls (perfect for async jobs, custom auth)
type SingleRequest ¶
type SingleRequest struct { Request *RequestState `yaml:"request" json:"request"` Response *ResponseState `yaml:"response" json:"response"` Aggregate AggregateState `yaml:"-" json:"-"` // contains filtered or unexported fields }
SingleRequest represents a single HTTP request/response cycle
func NewSingleRequest ¶
func NewSingleRequest(iter *Iteration) *SingleRequest
func (*SingleRequest) Debug ¶
func (lrs *SingleRequest) Debug(text string, args ...any)
func (*SingleRequest) Map ¶
func (lrs *SingleRequest) Map() map[string]any
func (*SingleRequest) Records ¶
func (lrs *SingleRequest) Records() []any
func (*SingleRequest) Trace ¶ added in v1.4.6
func (lrs *SingleRequest) Trace(text string, args ...any)
type Spec ¶
type Spec struct { Name string `yaml:"name" json:"name"` Description string `yaml:"description" json:"description"` Queues []string `yaml:"queues" json:"queues"` Defaults Endpoint `yaml:"defaults" json:"defaults"` Authentication Authentication `yaml:"authentication" json:"authentication"` EndpointMap EndpointMap `yaml:"endpoints" json:"endpoints"` DynamicEndpoints DynamicEndpoints `yaml:"dynamic_endpoints" json:"dynamic_endpoints"` // contains filtered or unexported fields }
Spec defines the complete API specification with endpoints and authentication