lamina.core documentation

async-promise?

fn

[x]

Returns true if x is a result.

atom-sink

fn

[channel]
[initial-value channel]

Transforms a channel into an atom which updated with the value of each new message.

cancel-callback

fn

[channel callback]

Cancels a callback registered with receive, receive-all, on-closed, on-drained, or on-error.

channel

fn

[& messages]

Returns a channel containing the given messages.

channel*

macro

[& {:keys [grounded? permanent? transactional? messages description meta], :as options}]

A general-purpose channel creator. Can be used to mix and match various properties, such as

(channel* :grounded? true, :description "my very own grounded channel")

:grounded? - ensures that messages cannot accumulate in the queue :permanent? - ensures that the channel cannot be closed or be put in an error state :transactional? - determines whether the channel's queue is transactional :messages - sequence of zero or more messages that will be in the channel's queue :description - a string that will be diplayed in channel visualizations :meta - initial metadata

channel->lazy-seq

fn

[ch]
[ch timeout]

Returns a sequence. As elements of the sequence are realized, messages from the source channel are consumed. If there are no messages are available to be consumed, execution will block until one is available.

A timeout can be defined, either as a number or a no-arg function that returns a number. Each time the seq must wait for a message to consume, it will only wait that many milliseconds before giving up and ending the sequence.

channel->seq

fn

[ch]
[ch timeout]

An eager variant of channel->lazy-seq. Blocks until the channel has been drained, or until timeout milliseconds have elapsed.

channel-pair

fn

[]

Returns a pair of channels, where all messages enqueued into one channel can be received by the other, and vice-versa. Closing one channel will automatically close the other.

channel?

fn

[x]

Returns true if x is a channel. This does not encompass result-channels.

close

fn

[channel]

Closes the channel. Returns if successful, false if channel is permanent, already closed, or in an error state.

close-on-idle

fn

[interval channel]
[interval task-queue channel]

Sets up a watcher which will close channel if it doesn't emit a message for interval milliseconds.

Returns channel, for chaining convenience.

closed-channel

fn

[& messages]

Returns a closed channel containing the given messages.

closed-result

fn

[channel]

Returns a result-channel that will emit a result when channel is closed, or emit an error if channel goes into an error state.

closed?

fn

[channel]

Returns true if channel is closed, false otherwise.

combine-latest

fn

[f & channels]

Given n-many channels and a function which takes n arguments, reevaluates the function for each new value emitted by one of the channels. Effectively a composition of zip-all and map*.

complete

fn

[value]

Returns a redirect signal which causes the pipeline's execution to stop, and simply return value. If value is a Throwable, then the pipeline will be realized as that error.

concat*

fn

[ch]

A dual to concat.

(concat* (channel [1 2] [2 3])) => [1 2 3 4]

defer-onto-queue

fn

[{:keys [timestamp task-queue auto-advance?]} ch]

Takes an input channel, a timestamp which takes each message and returns the associated time, and a task-queue. If auto-advance? is true, then enqueueing a message will automatically advance task-queue to that time.

The returned channel will emit each message from channel only once the designated time has arrived. This assumes the timestamp for each message is monotonically increasing.

distribute-aggregate

fn

[{:keys [facet generator period task-queue], :or {task-queue (t/task-queue), period (t/period)}} ch]

A mechanism similar to a SQL group by or the split-apply-combine strategy for data analysis.

For each message from channel, the value returned by (facet msg) will be examined, and the message will be routed to a channel that consumes all messages with that facet value. If no such channel exists, it will be generated by (generator facet-value facet-channel), which takes the facet-value and associated channel, and returns an output channel which will be merged with the output of all other facet channels.

The output of each facet channel is assumed to be periodic. The :period may be specified, but is not required.

Returns a channel which will periodically emit a map of facet-value onto the output of the generated facet-channel.

Example:

(distribute-aggregate {:facet :uri :generator (fn [uri ch] (rate ch))} ch)

will return a channel which periodically emits a map of the form

{"/abc" 2 "/def" 3}

distributor

fn

[{:keys [facet initializer on-clearance]}]

Returns a channel.

Messages enqueued into this channel are split into multiple streams, grouped by (facet msg). When a new facet-value is encountered, (initializer facet ch) is called, allowing messages with that facet-value to be handled appropriately.

If a facet channel is closed, it will be removed from the distributor, and a new channel will be generated when another message of that type is enqueued. This allows the use of (close-on-idle ch ...), if facet-values will change over time.

Given messages with the form {:type :foo, :value 1}, to print the mean values of all types:

(distributor {:facet :type :initializer (fn [facet-value ch] (siphon (->> ch (map* :value) moving-average) (sink #(println "average for" facet-value "is" %))))})

drained-result

fn

[channel]

Returns a result-channel that will emit a result when channel is drained, or emit an error if channel goes into an error state.

drained?

fn

[channel]

Returns true if channel is drained, false otherwise.

drop*

fn

[n ch]

A dual to drop.

(drop* 2 (closed-channel 1 2 3 4) => [3 4]

drop-while*

fn

[f ch]

A dual to drop-while.

(drop-while* pos? (closed-channel 1 2 0 4) => [0 4]

emit-in-order

fn

[ch]

Returns a channel that emits messages one at a time.

enqueue

fn

[channel]
[channel message]
[channel message & messages]

Enqueues the message or messages into the channel.

enqueue-and-close

fn

[ch & messages]

Enqueues the message or messages into the channel, and then closes the channel.

error

fn

[channel err]

Puts the channel or result-channel into an error state.

error-result

fn

[error]

Returns a result already realized with an error.

error-value

fn

[x default-value]

Returns the error-value of the channel or async-promise if it's in an error state, and 'default-value' otherwise

expiring-result

fn

[interval]
[interval task-queue]

Returns a result-channel that will be realized as a 'lamina/timeout!' error if a value is not enqueued within interval milliseconds.

filter*

fn

[f channel]

A dual to filter.

(filter* odd? (channel 1 2 3)) => [1 3]

force-close

fn

[channel]

Closes channel, even if it is permanent. Returns if successful, false if channel is already closed or in an error state.

force-error

fn

[channel err]

Puts the channel or result-channel into an error state, even if it's permanent.

fork

fn

[channel]

Returns a channel which is an exact duplicate of the source channel, containing all messages in the source channel's queue, and emitting all messages emitted by the source channel.

If the forked channel is closed, the source channel is unaffected. However, if the source channel is closed all forked channels are closed. Similar propagation rules apply to error states.

ground

fn

[ch]

Ensures that messages will not accumulate in the channel's queue.

grounded-channel

fn

[]

Returns a channel that cannot accumulate messages.

idle-result

fn

[interval channel]
[interval task-queue channel]

A result which will be realized if channel doesn't emit a message for interval milliseconds.

join

fn

[src dst]
[src dst & rest]

Takes all messages from src and forwards them to dst. If either channel closes or goes into an error state, the same is done for the other channel. This is useful for channels which have a 1-to-1 relationship.

If more than two channels are specified, join becomes transitive. (join a b c) is equivalent to (join a b) and (join b c).

join->>

macro

[& transforms+downstream-channel]

A variant of sink->> where the last argument is assumed to be a channel, and the transform chain is joined to it.

(join->> (map* inc) (map* dec) (named-channel :foo))

expands to

(let [ch (channel)] (join (->> ch (map* inc) (map* dec)) (named-channel :foo)) ch)

last*

fn

[ch]

A dual to last.

lazy-seq->channel

fn

[s]

Returns a channel representing the elements of the sequence.

map*

fn

[f channel]

A dual to map.

(map* inc (channel 1 2 3)) => [2 3 4]

mapcat*

fn

[f ch]

A dual to mapcat.

(mapcat* reverse (channel [1 2] [3 4])) => [2 1 4 3]

merge-channels

fn

[& channels]

Combines n-many streams into a single stream. The returned channel will be closed only once all source channels have been closed.

merge-results

fn

[& results]

Given n results returns a single async-promise which will be realized as a sequence of all the realized results.

named-channel

fn

[id on-create]

Returns a permanent channel keyed to id. If the channel doesn't already exist and on-create is non-nil, it will be invoked with the new channel as a parameter.

on-closed

fn

[channel callback]

Registers a callback that will be invoked with no arguments when channel is closed, or immediately if it has already been closed. callback will only be invoked once, and can be cancelled using cancel-callback.

on-drained

fn

[channel callback]

Registers a callback that will be invoked with no arguments when channel is drained, or immediately if it has already been drained. callback will only be invoked once, and can be cancelled using cancel-callback.

on-error

fn

[channel callback]

Registers a callback that will be called with the error when channel enters an error state, or immediately if it's already in an error state. callback will only be invoked once, and can be cancelled using cancel-callback.

on-realized

fn

[result-channel on-success on-error]

Allows two callbacks to be registered on a result-channel, one in the case of a value, the other in case of an error.

This often can and should be replaced by a pipeline.

partition*

fn

[n ch]
[n step ch]

A dual to partition.

(partition* 2 (channel 1 2 3)) => [[1 2]]

partition-all*

fn

[n ch]
[n step ch]

A dual to partition-all.

(partition-all* 2 (closed-channel 1 2 3)) => [[1 2] [3]]

partition-every

fn

[{:keys [period task-queue], :as options} ch]

Takes a source channel, and returns a channel that repeatedly emits a collection of all messages from the source channel in the last period milliseconds.

periodically

fn

[{:keys [task-queue immediate? period priority close-latch lazy?], :or {task-queue (t/task-queue), period (t/period), priority 0}} f]

Returns a channel. Every period milliseconds, f is invoked with no arguments and the value is emitted as a message.

permanent-channel

fn

[& messages]

Returns a channel which cannot be closed or put into an error state.

pipeline

macro

[& opts+stages]

A means for composing asynchronous functions. Returns a function which will pass the value into the first function, the result from that function into the second, and so on.

If any function returns an unrealized async-promise, the next function won't be called until that value is realized. The call into the pipeline itself returns an async-promise, which won't be realized until all functions have completed. If any function throws an exception or returns an async-promise that realizes as an error, this will short-circuit all calls to subsequent functions, and cause the pipeline's result to be realized as an error.

Loops and other more complex flows may be created if any stage returns a redirect signal by returning the result of invoking restart, redirect, or complete. See these functions for more details.

The first argument to pipeline may be a map of optional arguments:

:error-handler - a function which is called when an error occurs in the pipeline. Takes a single argument, the error, and may optionally return a redirect signal to prevent the pipeline from returning the error.

                 If no `:error-handler` is specified, the error will be logged.  If pipelines are nested, this may result
                 in the same error being logged multiple times.  To hide this error you may define a no-op handler, but
                 only do this if you're sure there's an outer pipeline that will handle/log the error.

:finally - a function which is called with zero arguments when the pipeline completes, either due to success or error.

:result - the result into which the pipeline's result will be forwarded. Causes the pipeline to not return any value.

:timeout - the max duration of the pipeline's invocation. If pipeline times out in the middle of a stage it won't terminate computation, but it will not continue onto the next stage.

:implicit? - describes whether the pipeline's execution should show up in higher-level instrumented functions calling into it. Defaults to false.

:unwrap? - if true, and the pipeline does not need to pause between streams, the pipeline will return an actual value rather than an async-promise.

:with-bindings - if true, conveys the binding context of the initial invocation of the pipeline into any deferred stages.

read-channel

fn

[channel]

Returns a result-channel representing the next message from the channel. Only one result-channel can represent any given message; calling (read-channel ...) multiple times will always consume multiple messages.

Enqueueing a value into the result-channel before it is realized will prevent the message from being consumed, effectively cancelling the read-channel call.

read-channel*

macro

[ch & {:keys [timeout predicate result listener-result on-timeout on-error on-false task-queue], :as options}]

A variant of read-channel with more options.

:timeout - the timeout, in milliseconds. If this elapses, the next message will not be consumed.

:predicate - a function that takes the message, and returns true if it should be consumed. If the predicate returns false, the returned result will realize as value defined by :on-false.

:result - the result that the read message should be enqueued into. If the same result is used for read-channel calls from multiple channels, this will have the effect of being realized as the first message from any of the channels, and not consuming any messages from the other channels.

:listener-result - the result that will be returned to the emitter of the message, representing the outcome of the consumption. This should only be done if there is a clear single outcome for this message (i.e. we're not just accumulating the entire stream.)

:on-timeout - the result that will be realized if we timed out. If not specified, the result will be realized as a :lamina/timeout error.

:on-error - the result that will be realized if the channel is in an error state. If not specified, the result will be realized as the channel's error.

:on-false - the result that will be realized if the :predicate returns false. Defaults to :lamina/false.

receive

fn

[channel callback]

Registers a callback that will be invoked with the next message enqueued into the channel, or the first message already in the queue. Only one callback can consume any given message; registering multiple callbacks will consume multiple messages.

This can be cancelled using cancel-callback.

receive-all

fn

[channel callback]

Registers a callback that will consume all messages currently in the queue, and all subsequent messages that are enqueued into channel.

This can be cancelled using cancel-callback.

receive-in-order

fn

[ch f]

Consumes messages from the source channel, passing them to f one at a time. If f returns a result-channel, consumption of the next message is deferred until it's realized.

If an exception is thrown or the return result is realized as an error, the source channel is put into an error state.

redirect

fn

[pipeline value]

Returns a redirect signal which causes the flow to start at the beginning of pipeline, with an input value of value. The outcome of this new pipeline will be forwarded into the result returned by the original pipeline.

reduce*

fn

[f ch]
[f val ch]

A dual to reduce. Returns a result-channel that emits the final reduced value when the source channel has been drained.

(reduce* max (channel 1 3 2)) => 3

reductions*

fn

[f ch]
[f val ch]

A dual to reductions.

(reductions* max (channel 1 3 2)) => [1 3 3]

remove*

fn

[f channel]

A dual to remove.

(remove* even? (channel 2 3 4)) => [3]

restart

fn

[]
[value]

A variant of redirect which redirects flow to the top of the current pipeline.

result-channel

fn

[]

Returns a result-channel, representing an unrealized value or error.

run-pipeline

macro

[value & opts+stages]

Like pipeline, but simply invokes the pipeline with value and returns the result.

sample-every

fn

[{:keys [period task-queue], :as options} ch]

Takes a source channel, and returns a channel that emits the most recent message from the source channel every period milliseconds.

sink

fn

[callback]

Creates a channel which will forward all messages to callback.

sink->>

macro

[& transforms+callback]

Creates a channel, pipes it through the ->> operator, and sends the resulting stream into the callback. This can be useful when defining :probes for an instrumented function, among other places.

(sink->> (map* inc) (map* dec) println)

expands to

(let [ch (channel)] (receive-all (->> ch (map* inc) (map* dec)) println) ch)

siphon

fn

[src dst]
[src dst & rest]

Takes all messages from src and forwards them to dst. If dst closes, src is closed, but not vise-versa. Error states are similarly propagated. This is useful for many transient channels feeding into one channel.

If more than two channels are specified, siphon becomes transitive. (siphon a b c) is equivalent to (siphon a b) and (siphon b c).

siphon->>

macro

[& transforms+downstream-channel]

A variant of sink->> where the last argument is assumed to be a channel, and the contents of the transform chain are siphoned into it.

(siphon->> (map* inc) (map* dec) (named-channel :foo))

expands to

(let [ch (channel)] (siphon (->> ch (map* inc) (map* dec)) (named-channel :foo)) ch)

splice

fn

[emitter receiver]

Returns a channel where all messages are enqueud into receiver, and consumed from emitter.

split

macro

[& downstream-channels]

Returns a channel which will forward each message to all downstream-channels. This can be used with sink->>, siphon->>, and join->> to define complex message flows:

(join->> (map* inc) (split (sink->> (filter* even?) log-even) (sink->> (filter* odd?) log-odd)))

success-result

fn

[value]

Returns a result already realized with a value.

take*

fn

[n ch]

A dual to take.

(take* 2 (channel 1 2 3)) => [1 2]

take-while*

fn

[f ch]

A dual to take-while.

(take-while* pos? (channel 1 2 0 4)) => [1 2]

tap

fn

[channel]

Behaves like fork, except that the source channel will not remain open if only the tap exists downstream.

If the tap channel is closed, the source channel is unaffected. However, if the source channel is closed all tap channels are closed. Similar propagation rules apply to error states.

timed-result

fn

[interval]
[interval value]
[interval value task-queue]

Returns a result-channel that will be realized as value (defaulting to nil) in interval milliseconds.

transactional?

fn

[channel]

Returns true if channel has a transactional queue, false otherwise.

transitions

fn

[ch]

Emits messages only when they differ from the preceding message.

wait-for-message

fn

[ch]
[ch timeout]

Blocks for the next message from the channel. If the timeout elapses without a message, throws a java.util.concurrent.TimeoutException.

wait-for-result

fn

[result-channel]
[result-channel timeout]

Waits for the result to be realized. If the timeout elapses without a value, throws a java.util.concurrent.TimeoutException.

wait-stage

macro

[interval]

Creates a pipeline stage which simply waits for interval milliseconds before continuing onto the next stage.

watch-channel

fn

[reference]

Transforms a watchable reference into a channel of values.

with-timeout

fn

[interval result]

Returns a new result that will mimic the original result, unless interval milliseconds elapse, in which case it will realize as a 'lamina/timeout!' error.

zip

fn

[channels]
[most-frequent? channels]

Emits a tuple containing the most recent message from all channels once a single message has been received from each channel.

zip-all

fn

[channels]

For each message from one of the streams in channels, emits a tuple containing the most recent message from all streams. In order for any tuple to be emitted, at least one message must have been emitted by all channels.