lamina.core documentation
atom-sink
fn
Transforms a channel into an atom which is updated with the value of each new message.
cancel-callback
fn
Cancels a callback registered with receive, receive-all, on-closed, on-drained, or on-error.
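A minimal sketch of cancelling a receive-all callback, assuming lamina is on the classpath. The names ch, seen, and callback are illustrative and not part of the library; the sketch passes the same function object to both receive-all and cancel-callback.

```clojure
(require '[lamina.core :refer [channel enqueue receive-all cancel-callback]])

(def ch (channel))
(def seen (atom []))

;; Illustrative callback: records every message it is handed.
(def callback #(swap! seen conj %))

(receive-all ch callback)
(enqueue ch 1)                 ; delivered to callback

(cancel-callback ch callback)
(enqueue ch 2)                 ; no longer delivered to callback

@seen
;; => [1]
```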
channel*
macro
A general-purpose channel creator. Can be used to mix and match various properties, such as
(channel* :grounded? true, :description "my very own grounded channel")
:grounded? - ensures that messages cannot accumulate in the queue
:permanent? - ensures that the channel cannot be closed or be put in an error state
:transactional? - determines whether the channel's queue is transactional
:messages - sequence of zero or more messages that will be in the channel's queue
:description - a string that will be displayed in channel visualizations
:meta - initial metadata
channel->lazy-seq
fn
Returns a sequence. As elements of the sequence are realized, messages from the source channel are consumed. If no messages are available to be consumed, execution will block until one is available.
A timeout can be defined, either as a number or a no-arg function that returns a number. Each time the seq must wait for a message to consume, it will only wait that many milliseconds before giving up and ending the sequence.
channel->seq
fn
An eager variant of channel->lazy-seq. Blocks until the channel has been drained, or until timeout milliseconds have elapsed.
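A short sketch of both sequence conversions, assuming lamina is on the classpath. closed-channel and the (channel 1 2) form are taken from examples elsewhere in this reference; the 100 ms timeout is an arbitrary illustrative value.

```clojure
(require '[lamina.core :refer [channel closed-channel channel->seq channel->lazy-seq]])

;; A closed channel is drained once its queue has been read.
(channel->seq (closed-channel 1 2 3))
;; => (1 2 3)

;; The channel below stays open, so the lazy seq would otherwise block
;; waiting for a third message. With a timeout, it waits 100 ms and
;; then ends the sequence.
(doall (channel->lazy-seq (channel 1 2) 100))
;; => (1 2)
```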
channel-pair
fn
Returns a pair of channels, where all messages enqueued into one channel can be received by the other, and vice-versa. Closing one channel will automatically close the other.
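A minimal sketch of a channel pair, assuming lamina is on the classpath and that the pair can be destructured as a two-element sequence. The names a and b and the 1000 ms timeouts are illustrative.

```clojure
(require '[lamina.core :refer [channel-pair enqueue wait-for-message]])

(def round-trip
  (let [[a b] (channel-pair)]
    (enqueue a :ping)                 ; readable from b
    (enqueue b :pong)                 ; readable from a
    [(wait-for-message b 1000)
     (wait-for-message a 1000)]))

round-trip
;; => [:ping :pong]
```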
close
fn
Closes the channel. Returns true if successful, false if channel is permanent, already closed, or in an error state.
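The return values described above can be sketched as follows, assuming lamina is on the classpath. The var names are illustrative.

```clojure
(require '[lamina.core :refer [channel permanent-channel close]])

(def ch (channel))

(def first-close  (close ch))   ; truthy: the channel was open
(def second-close (close ch))   ; falsey: it is already closed

;; A permanent channel refuses to close.
(def permanent-close (close (permanent-channel)))
```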
close-on-idle
fn
Sets up a watcher which will close channel if it doesn't emit a message for interval milliseconds.
Returns channel, for chaining convenience.
closed-result
fn
Returns a result-channel that will emit a result when channel is closed, or emit an error if channel goes into an error state.
combine-latest
fn
Given n-many channels and a function which takes n arguments, reevaluates the function for each new value emitted by one of the channels. Effectively a composition of zip-all and map*.
complete
fn
Returns a redirect signal which causes the pipeline's execution to stop, and simply return value. If value is a Throwable, then the pipeline will be realized as that error.
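A minimal sketch of stopping a pipeline early, assuming lamina is on the classpath. The stage functions and the [:stopped ...] and [:reached-end ...] values are illustrative; run-pipeline and wait-for-result are described elsewhere in this reference.

```clojure
(require '[lamina.core :refer [run-pipeline complete wait-for-result]])

(def result
  (run-pipeline 1
    inc
    ;; Stop here for even values; later stages are skipped.
    (fn [x] (if (even? x) (complete [:stopped x]) x))
    (fn [x] [:reached-end x])))

(wait-for-result result 1000)
;; => [:stopped 2]
```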
defer-onto-queue
fn
Takes an input channel, a timestamp function which takes each message and returns the associated time, and a task-queue. If auto-advance? is true, then enqueueing a message will automatically advance task-queue to that time.
The returned channel will emit each message from channel only once the designated time has arrived.
This assumes the timestamp for each message is monotonically increasing.
distribute-aggregate
fn
A mechanism similar to a SQL group by or the split-apply-combine strategy for data analysis.
For each message from channel, the value returned by (facet msg) will be examined, and the message will be routed to a channel that consumes all messages with that facet value. If no such channel exists, it will be generated by (generator facet-value facet-channel), which takes the facet-value and associated channel, and returns an output channel which will be merged with the output of all other facet channels.
The output of each facet channel is assumed to be periodic. The :period may be specified, but is not required.
Returns a channel which will periodically emit a map of facet-value onto the output of the generated facet-channel.
Example:
(distribute-aggregate {:facet :uri :generator (fn [uri ch] (rate ch))} ch)
will return a channel which periodically emits a map of the form
{"/abc" 2 "/def" 3}
distributor
fn
Returns a channel.
Messages enqueued into this channel are split into multiple streams, grouped by (facet msg). When a new facet-value is encountered, (initializer facet ch) is called, allowing messages with that facet-value to be handled appropriately.
If a facet channel is closed, it will be removed from the distributor, and a new channel will be generated when another message of that type is enqueued. This allows the use of (close-on-idle ch ...), if facet-values will change over time.
Given messages with the form {:type :foo, :value 1}, to print the mean values of all types:
(distributor {:facet :type :initializer (fn [facet-value ch] (siphon (->> ch (map* :value) moving-average) (sink #(println "average for" facet-value "is" %))))})
drained-result
fn
Returns a result-channel that will emit a result when channel is drained, or emit an error if channel goes into an error state.
enqueue
fn
Enqueues the message or messages into the channel.
enqueue-and-close
fn
Enqueues the message or messages into the channel, and then closes the channel.
error-value
fn
Returns the error-value of the channel or async-promise if it's in an error state, and 'default-value' otherwise.
expiring-result
fn
Returns a result-channel that will be realized as a 'lamina/timeout!' error if a value is not enqueued within interval milliseconds.
force-close
fn
Closes channel, even if it is permanent. Returns true if successful, false if channel is already closed or in an error state.
force-error
fn
Puts the channel or result-channel into an error state, even if it's permanent.
fork
fn
Returns a channel which is an exact duplicate of the source channel, containing all messages in the source channel's queue, and emitting all messages emitted by the source channel.
If the forked channel is closed, the source channel is unaffected. However, if the source channel is closed all forked channels are closed. Similar propagation rules apply to error states.
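A small sketch of forking, assuming lamina is on the classpath. The names src and copy are illustrative. The fork starts with the messages already queued in the source, then receives whatever the source emits afterwards, and is closed along with the source.

```clojure
(require '[lamina.core :refer [channel fork enqueue close channel->seq]])

(def src  (channel 1 2))   ; two messages already queued
(def copy (fork src))      ; the fork starts with the same queue

(enqueue src 3)            ; also emitted to the fork
(close src)                ; closing the source closes the fork

(channel->seq copy)
;; => (1 2 3)
```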
idle-result
fn
A result which will be realized if channel doesn't emit a message for interval milliseconds.
join
fn
Takes all messages from src and forwards them to dst. If either channel closes or goes into an error state, the same is done for the other channel. This is useful for channels which have a 1-to-1 relationship.
If more than two channels are specified, join becomes transitive. (join a b c) is equivalent to (join a b) and (join b c).
join->>
macro
A variant of sink->> where the last argument is assumed to be a channel, and the transform chain is joined to it.
(join->> (map* inc) (map* dec) (named-channel :foo))
expands to
(let [ch (channel)] (join (->> ch (map* inc) (map* dec)) (named-channel :foo)) ch)
merge-channels
fn
Combines n-many streams into a single stream. The returned channel will be closed only once all source channels have been closed.
merge-results
fn
Given n results, returns a single async-promise which will be realized as a sequence of all the realized results.
named-channel
fn
Returns a permanent channel keyed to id. If the channel doesn't already exist and on-create is non-nil, it will be invoked with the new channel as a parameter.
on-closed
fn
Registers a callback that will be invoked with no arguments when channel is closed, or immediately if it has already been closed. callback will only be invoked once, and can be cancelled using cancel-callback.
on-drained
fn
Registers a callback that will be invoked with no arguments when channel is drained, or immediately if it has already been drained. callback will only be invoked once, and can be cancelled using cancel-callback.
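The difference between closed and drained can be sketched as follows, assuming lamina is on the classpath and that a closed channel only counts as drained once its queued messages have been consumed. The names ch, events, and after-close are illustrative.

```clojure
(require '[lamina.core :refer [channel close on-closed on-drained wait-for-message]])

(def ch (channel :pending))   ; one message still in the queue
(def events (atom []))

(on-closed  ch #(swap! events conj :closed))
(on-drained ch #(swap! events conj :drained))

(close ch)
(def after-close @events)     ; closed, but the queue is not empty yet

(wait-for-message ch 1000)    ; consumes :pending, emptying the queue

@events
;; => [:closed :drained]
```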
on-error
fn
Registers a callback that will be called with the error when channel enters an error state, or immediately if it's already in an error state. callback will only be invoked once, and can be cancelled using cancel-callback.
on-realized
fn
Allows two callbacks to be registered on a result-channel, one in the case of a value, the other in case of an error.
This often can and should be replaced by a pipeline.
partition-all*
fn
A dual to partition-all.
(partition-all* 2 (closed-channel 1 2 3)) => [[1 2] [3]]
partition-every
fn
Takes a source channel, and returns a channel that repeatedly emits a collection of all messages from the source channel in the last period milliseconds.
periodically
fn
Returns a channel. Every period milliseconds, f is invoked with no arguments and the value is emitted as a message.
permanent-channel
fn
Returns a channel which cannot be closed or put into an error state.
pipeline
macro
A means for composing asynchronous functions. Returns a function which will pass the value into the first function, the result from that function into the second, and so on.
If any function returns an unrealized async-promise, the next function won't be called until that value is realized. The call into the pipeline itself returns an async-promise, which won't be realized until all functions have completed. If any function throws an exception or returns an async-promise that realizes as an error, this will short-circuit all calls to subsequent functions, and cause the pipeline's result to be realized as an error.
Loops and other more complex flows may be created if any stage returns a redirect signal by returning the result of invoking restart, redirect, or complete. See these functions for more details.
The first argument to pipeline may be a map of optional arguments:
:error-handler - a function which is called when an error occurs in the pipeline. Takes a single argument, the error, and may optionally return a redirect signal to prevent the pipeline from returning the error.
If no `:error-handler` is specified, the error will be logged. If pipelines are nested, this may result in the same error being logged multiple times. To hide this error you may define a no-op handler, but only do this if you're sure there's an outer pipeline that will handle/log the error.
:finally - a function which is called with zero arguments when the pipeline completes, either due to success or error.
:result - the result into which the pipeline's result will be forwarded. Causes the pipeline to not return any value.
:timeout - the max duration of the pipeline's invocation. If pipeline times out in the middle of a stage it won't terminate computation, but it will not continue onto the next stage.
:implicit? - describes whether the pipeline's execution should show up in higher-level instrumented functions calling into it. Defaults to false.
:unwrap? - if true, and the pipeline does not need to pause between stages, the pipeline will return an actual value rather than an async-promise.
:with-bindings - if true, conveys the binding context of the initial invocation of the pipeline into any deferred stages.
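A minimal sketch of composing stages, assuming lamina is on the classpath. The names add-then-double, ch, wait-then-inc, and pending are illustrative. The second pipeline has a stage that returns an unrealized result (from read-channel), so the following stage runs only once a message arrives.

```clojure
(require '[lamina.core :refer [pipeline channel enqueue read-channel wait-for-result]])

;; Purely synchronous stages: (* 2 (inc 1))
(def add-then-double (pipeline inc #(* 2 %)))

(wait-for-result (add-then-double 1) 1000)
;; => 4

;; A stage that returns an unrealized result pauses the pipeline.
(def ch (channel))
(def wait-then-inc (pipeline (fn [_] (read-channel ch)) inc))

(def pending (wait-then-inc nil))   ; waiting on a message from ch
(enqueue ch 41)                     ; realizes the read, inc then runs

(wait-for-result pending 1000)
;; => 42
```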
read-channel
fn
Returns a result-channel representing the next message from the channel. Only one result-channel can represent any given message; calling (read-channel ...) multiple times will always consume multiple messages.
Enqueueing a value into the result-channel before it is realized will prevent the message from being consumed, effectively cancelling the read-channel call.
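A short sketch of reading messages one at a time, assuming lamina is on the classpath. The names ch, first-msg, second-msg, and pending are illustrative, and the 1000 ms timeouts are arbitrary.

```clojure
(require '[lamina.core :refer [channel enqueue read-channel wait-for-result]])

(def ch (channel 1 2))

;; Each call consumes a different message.
(def first-msg  (wait-for-result (read-channel ch) 1000))
(def second-msg (wait-for-result (read-channel ch) 1000))

;; With the queue empty, the result stays unrealized until a message arrives.
(def pending (read-channel ch))
(enqueue ch 3)

[first-msg second-msg (wait-for-result pending 1000)]
;; => [1 2 3]
```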
read-channel*
macro
A variant of read-channel with more options.
:timeout - the timeout, in milliseconds. If this elapses, the next message will not be consumed.
:predicate - a function that takes the message, and returns true if it should be consumed. If the predicate returns false, the returned result will realize as value defined by :on-false.
:result - the result that the read message should be enqueued into. If the same result is used for read-channel calls from multiple channels, this will have the effect of being realized as the first message from any of the channels, and not consuming any messages from the other channels.
:listener-result - the result that will be returned to the emitter of the message, representing the outcome of the consumption. This should only be done if there is a clear single outcome for this message (i.e. we're not just accumulating the entire stream.)
:on-timeout - the result that will be realized if we timed out. If not specified, the result will be realized as a :lamina/timeout error.
:on-error - the result that will be realized if the channel is in an error state. If not specified, the result will be realized as the channel's error.
:on-false - the result that will be realized if the :predicate returns false. Defaults to :lamina/false.
receive
fn
Registers a callback that will be invoked with the next message enqueued into the channel, or the first message already in the queue. Only one callback can consume any given message; registering multiple callbacks will consume multiple messages.
This can be cancelled using cancel-callback.
receive-all
fn
Registers a callback that will consume all messages currently in the queue, and all subsequent messages that are enqueued into channel.
This can be cancelled using cancel-callback.
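A minimal sketch contrasting receive and receive-all, assuming lamina is on the classpath. The names ch and seen are illustrative.

```clojure
(require '[lamina.core :refer [channel enqueue receive receive-all]])

(def ch (channel 1 2 3))
(def seen (atom []))

;; receive consumes exactly one message: the first one in the queue.
(receive ch #(swap! seen conj [:once %]))

;; receive-all consumes the rest of the queue and everything enqueued later.
(receive-all ch #(swap! seen conj [:all %]))

(enqueue ch 4)

@seen
;; => [[:once 1] [:all 2] [:all 3] [:all 4]]
```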
receive-in-order
fn
Consumes messages from the source channel, passing them to f one at a time. If f returns a result-channel, consumption of the next message is deferred until it's realized.
If an exception is thrown or the return result is realized as an error, the source channel is put into an error state.
redirect
fn
Returns a redirect signal which causes the flow to start at the beginning of pipeline, with an input value of value. The outcome of this new pipeline will be forwarded into the result returned by the original pipeline.
reduce*
fn
A dual to reduce. Returns a result-channel that emits the final reduced value when the source channel has been drained.
(reduce* max (channel 1 3 2)) => 3
run-pipeline
macro
Like pipeline, but simply invokes the pipeline with value and returns the result.
sample-every
fn
Takes a source channel, and returns a channel that emits the most recent message from the source channel every period milliseconds.
sink->>
macro
Creates a channel, pipes it through the ->> operator, and sends the resulting stream into the callback. This can be useful when defining :probes for an instrumented function, among other places.
(sink->> (map* inc) (map* dec) println)
expands to
(let [ch (channel)] (receive-all (->> ch (map* inc) (map* dec)) println) ch)
siphon
fn
Takes all messages from src and forwards them to dst. If dst closes, src is closed, but not vice versa. Error states are similarly propagated. This is useful for many transient channels feeding into one channel.
If more than two channels are specified, siphon becomes transitive. (siphon a b c) is equivalent to (siphon a b) and (siphon b c).
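A sketch of several transient channels feeding one destination, assuming lamina is on the classpath. The names a, b, and dst are illustrative; closed? is a lamina.core predicate that is not described in this section.

```clojure
(require '[lamina.core :refer [channel enqueue close closed? siphon wait-for-message]])

(def a   (channel))
(def b   (channel))
(def dst (channel))

(siphon a dst)
(siphon b dst)

(enqueue a 1)
(enqueue b 2)
(close a)                        ; closing a source leaves dst open

[(wait-for-message dst 1000)
 (wait-for-message dst 1000)]
;; => [1 2]

(closed? dst)
;; => false
```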
siphon->>
macro
A variant of sink->> where the last argument is assumed to be a channel, and the contents of the transform chain are siphoned into it.
(siphon->> (map* inc) (map* dec) (named-channel :foo))
expands to
(let [ch (channel)] (siphon (->> ch (map* inc) (map* dec)) (named-channel :foo)) ch)
splice
fn
Returns a channel where all messages are enqueued into receiver, and consumed from emitter.
split
macro
Returns a channel which will forward each message to all downstream-channels. This can be used with sink->>, siphon->>, and join->> to define complex message flows:
(join->> (map* inc) (split (sink->> (filter* even?) log-even) (sink->> (filter* odd?) log-odd)))
tap
fn
Behaves like fork, except that the source channel will not remain open if only the tap exists downstream.
If the tap channel is closed, the source channel is unaffected. However, if the source channel is closed all tap channels are closed. Similar propagation rules apply to error states.
timed-result
fn
Returns a result-channel that will be realized as value (defaulting to nil) in interval milliseconds.
wait-for-message
fn
Blocks for the next message from the channel. If the timeout elapses without a message, throws a java.util.concurrent.TimeoutException.
wait-for-result
fn
Waits for the result to be realized. If the timeout elapses without a value, throws a java.util.concurrent.TimeoutException.
wait-stage
macro
Creates a pipeline stage which simply waits for interval milliseconds before continuing onto the next stage.
with-timeout
fn
Returns a new result that will mimic the original result, unless interval milliseconds elapse, in which case it will realize as a 'lamina/timeout!' error.
zip
fn
Emits a tuple containing the most recent message from all channels once a single message has been received from each channel.
zip-all
fn
For each message from one of the streams in channels, emits a tuple containing the most recent message from all streams. In order for any tuple to be emitted, at least one message must have been emitted by all channels.