pgqueue.core
count
(count q)
Count the items in queue.
count-deleted
(count-deleted q)
Count the deleted items in queue.
These rows only exist when the :delete
behavior in pgqueue/queue's config is set
to false.
delete
(delete item)
Delete a PGQueueItem item from queue.
Delete behavior is controlled by the
queue config option :delete in pgqueue/queue.
If true, this actually deletes rows,
otherwise, it sets the "deleted" flag to true.
Returns true if a row was deleted, false otherwise.
usage: (delete item)
delete-and-unlock
(delete-and-unlock locked-item)
Delete and unlock a PGQueueLockedItem.
This is a convenience function wrapping
pgqueue/delete and pgqueue/unlock.
Returns the logical "and" of the results of those two functions.
usage: (delete-and-unlock locked-item)
destroy-all-queues!
(destroy-all-queues! config)
Drop the queue table, then unlock any existing advisory locks.
This function takes the same config hashmap used in pgqueue/queue.
destroy-queue!
(destroy-queue! q)
Unlocks any existing advisory locks for rows of this queue's
table and deletes all rows for the queue from the queue table.
locking-take
(locking-take q)
Lock and take item, returning a PGQueueLockedItem.
usage: (locking-take q)
example: (let [locked-item (pgqueue/locking-take q)]
           ; do some work here with item
           (pgqueue/delete-and-unlock locked-item))
It is expected that pgqueue/delete and pgqueue/unlock
will later be called on the returned item and lock,
respectively.
See the pgqueue/take-with macro, which wraps up the
use case for takers doing work and then deleting
the item only after the work is safely completed.
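The manual lock, work, delete, unlock flow described above might look like the following sketch. It assumes the library is required under the alias pgq (as in the put examples), that locking-take returns nil on an empty queue as take does, and that a PGQueueLockedItem exposes its parts under the keys :item and :lock with the payload under :data. Those key names are not stated in this reference, so treat them as assumptions. handle! is a hypothetical worker function.

```clojure
(ns example.locking-take
  (:require [pgqueue.core :as pgq]))

(defn process-one!
  "Take one item under lock, run handle! on its payload, and delete the
  item only if handle! returns without throwing.
  handle! is a hypothetical, caller-supplied function."
  [q handle!]
  ;; ASSUMPTION: locking-take returns nil when the queue is empty.
  (when-let [locked-item (pgq/locking-take q)]
    (try
      ;; ASSUMPTION: the payload lives at [:item :data] of the locked item.
      (handle! (get-in locked-item [:item :data]))
      ;; Work succeeded: remove the item and release its lock.
      (pgq/delete-and-unlock locked-item)
      (catch Exception e
        ;; Work failed: release the lock but leave the item on the queue.
        ;; ASSUMPTION: the PGQueueLock is stored under :lock.
        (pgq/unlock (:lock locked-item))
        (throw e)))))
```

On failure the item stays queued, so another taker can pick it up once the lock is released.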
locking-take-batch
(locking-take-batch q n)
Lock and take a batch of up to n items from queue.
Returns a sequence of PGQueueLockedItem instances.
usage: (locking-take-batch q n)
It is expected that pgqueue/delete and pgqueue/unlock
will later be called on each of the items and locks
in the PGQueueLockedItem instances returned.
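A batch can be handled the same way, one locked item at a time. This is a sketch under the same assumptions as for locking-take: the :item, :data, and :lock keys are assumed rather than documented here, and handle! is a hypothetical worker function.

```clojure
(ns example.locking-take-batch
  (:require [pgqueue.core :as pgq]))

(defn process-batch!
  "Lock and take up to n items, run handle! on each payload, and return
  the number of items that were handled and deleted.
  Items whose handler throws are unlocked and left on the queue."
  [q n handle!]
  (reduce
    (fn [done locked-item]
      (try
        ;; ASSUMPTION: the payload lives at [:item :data].
        (handle! (get-in locked-item [:item :data]))
        (pgq/delete-and-unlock locked-item)
        (inc done)
        (catch Exception _
          ;; ASSUMPTION: the PGQueueLock is stored under :lock.
          (pgq/unlock (:lock locked-item))
          done)))
    0
    (pgq/locking-take-batch q n)))
```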
purge-deleted
(purge-deleted q)
Purge deleted rows for the given queue.
These rows only exist when the :delete
behavior in pgqueue/queue's config is set
to false.
Returns number of rows deleted.
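As an illustration of the soft-delete workflow, the sketch below creates a queue with :delete set to false and later purges the flagged rows. The queue name :audit and the db-spec argument are placeholders; only count-deleted and purge-deleted as documented here are used.

```clojure
(ns example.purge
  (:require [pgqueue.core :as pgq]))

(defn audit-queue
  "Build a queue that keeps taken rows by flagging them as deleted.
  db-spec is a caller-supplied clojure.java.jdbc db-spec;
  the queue name :audit is a placeholder."
  [db-spec]
  (pgq/queue :audit {:db db-spec :delete false}))

(defn purge-report!
  "Count the rows flagged as deleted, purge them, and report both numbers."
  [q]
  (let [flagged (pgq/count-deleted q)
        purged  (pgq/purge-deleted q)]
    {:flagged-before flagged
     :purged         purged}))
```

With concurrent takers the two numbers can differ, since rows may be flagged between the count and the purge.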
put
(put q item)
(put q priority item)
Put item onto queue.
usage: (put q item)
(put q priority item)
Returns true on success, false on failure, nil on no-op.
item can be any serializable Clojure data.
When item is nil, put is a no-op and returns nil.
For arity of 2, a default priority is used.
For arity of 3, the second argument is a priority integer
where a lower value = higher priority; negative integers
are also accepted.
Examples:
(pgq/put q -10 "urgent")
(pgq/put q 1 "high")
(pgq/put q 100 "medium/default")
(pgq/put q 200 "low")
(pgq/put q 500 "least")
put-batch
(put-batch q batch)
(put-batch q priority batch)
Put batch of items onto queue.
usage: (put-batch q batch)
(put-batch q priority batch)
Returns true on success, false on failure.
batch is a sequence of items.
An item can be any serializable Clojure data.
When an item in the batch is nil, it is removed from
the batch. (put q nil) is a no-op, so put-batch does
likewise.
For arity of 2, a default priority is used.
For arity of 3, the second argument is a priority integer
where a lower value = higher priority; negative integers
are also accepted. All items of the batch will have the
same priority.
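A short sketch of both arities, assuming the pgq alias and a queue q obtained from pgqueue/queue. The payloads are made up for illustration.

```clojure
(ns example.put-batch
  (:require [pgqueue.core :as pgq]))

(defn enqueue-jobs!
  "Put two illustrative batches onto q and return both results."
  [q]
  (let [;; Arity 2: default priority. The nil entry is removed from
        ;; the batch, so only two items are put.
        default-ok (pgq/put-batch q [{:job 1} nil {:job 2}])
        ;; Arity 3: all three items share priority -10, which sorts
        ;; ahead of any higher number.
        urgent-ok  (pgq/put-batch q -10 ["a" "b" "c"])]
    {:default default-ok
     :urgent  urgent-ok}))
```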
queue
(queue name config)
Specify a queue with a name and a config.
Creates the underlying queue table if it
does not yet exist.
- name can be a keyword or string
- config is a hashmap with the following keys:
:db - clojure.java.jdbc db-spec
and optional keys:
:schema - schema name (default is "public")
:table - table name (default is "pgqueues")
:delete - delete behavior upon successful take:
- true (default) deletes queue item row
- false sets a deleted_flag to true
(persists all queue items; see pgqueue/purge-deleted)
:default-priority - default priority for puts not specifying
a priority; must be an integer
where a lower value = higher priority; negative integers
are also accepted
:analyze-threshold - run a 'vacuum analyze' on queue table when this number
of put/put-batch items is hit. 0 disables this feature.
default value is 0 (disabled).
:serializer - instance of a type that implements
pgqueue.serializer.Serializer protocol
default is instance of pgqueue.serializer.nippy/NippySerializer
(pgqueue.serializer.fressian/FressianSerializer is available, too)
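A config map using the keys listed above might look like this sketch. The connection details and the queue name :jobs are hypothetical. The value 100 for :default-priority is only inferred from the "medium/default" label in the put examples, not from a stated default. :serializer is omitted so the default serializer is used.

```clojure
(ns example.queue-config
  (:require [pgqueue.core :as pgq]))

;; Hypothetical connection details in clojure.java.jdbc db-spec form.
(def db-spec
  {:subprotocol "postgresql"
   :subname     "//localhost:5432/pgqtest"
   :user        "pgqtest"
   :password    "pgqtest"})

(def config
  {:db                db-spec
   :schema            "public"    ; documented default
   :table             "pgqueues"  ; documented default
   :delete            true        ; documented default: rows are removed
   :default-priority  100         ; ASSUMPTION: inferred from the put examples
   :analyze-threshold 0})         ; documented default: disabled

(defn open-queue
  "Create (or reuse) the queue table and return the queue named :jobs."
  []
  (pgq/queue :jobs config))
```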
take
(take q)
Take item off queue.
Returns nil if no item available.
usage: (take q)
item is retrieved from the queue with the sort order:
- priority (low number = high priority)
- inserted order
This function uses PostgreSQL's advisory locks
to ensure that only one taker gains access to the item,
such that multiple takers can pull items from the queue
without the fear of another taker pulling the same item.
The item is retrieved from the queue with an advisory lock,
deleted (see pgqueue/queue for delete behavior), unlocked,
and returned.
Also see pgqueue/take-with for use cases requiring the
item to only be removed from the queue after successfully
completing work.
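A simple consumer loop built on take could be sketched as below. It assumes take returns the value originally passed to put, and handle! is a hypothetical worker function. Because take deletes the item before returning it, an item is lost if handle! throws; that is the case pgqueue/take-with is meant for.

```clojure
(ns example.take
  (:require [pgqueue.core :as pgq]))

(defn drain!
  "Take items until the queue reports none available, calling handle!
  on each. Returns the number of items handled."
  [q handle!]
  (loop [n 0]
    ;; if-some stops only on nil, so a payload of false is still handled.
    (if-some [item (pgq/take q)]
      (do (handle! item)
          (recur (inc n)))
      n)))
```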
take-batch
(take-batch q n)
Take a batch of up to n items off queue.
Returns seq of items.
usage: (take-batch q n)
Items in the batch are retrieved from the queue with the sort order:
- priority (low number = high priority)
- inserted order
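The sort order above can be modeled in plain Clojure. This sketch does not touch PostgreSQL or pgqueue; take-order and the row shape are invented for illustration only.

```clojure
(ns example.take-order)

(defn take-order
  "Model of the documented ordering: priority ascending (low number
  first), then insertion order. rows is a vector of maps with
  :priority and :data, given in insertion order."
  [rows]
  (->> rows
       ;; Tag each row with its insertion position.
       (map-indexed (fn [i row] (assoc row :seq i)))
       ;; Sort by priority first, insertion position second.
       (sort-by (juxt :priority :seq))
       (mapv :data)))

(take-order [{:priority 100 :data "medium"}
             {:priority -10 :data "urgent"}
             {:priority 100 :data "medium-2"}
             {:priority 1   :data "high"}])
;; => ["urgent" "high" "medium" "medium-2"]
```

The two items with priority 100 keep their insertion order relative to each other.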
take-with
macro
(take-with binding & body)
Lock and take an item off queue, bind the taken item,
execute the body, and ensure delete and unlock after body.
usage: (take-with binding & body)
binding takes the form [item q], where
item is the binding name, and q is the queue.
This macro uses PostgreSQL's advisory locks
to ensure that only one taker gains access to the item,
such that multiple takers can pull items from the queue
without the fear of another taker pulling the same item.
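A minimal usage sketch, assuming the pgq alias and a hypothetical worker function handle!. This reference does not say what item is bound to when the queue is empty, so the sketch guards against nil as an assumption.

```clojure
(ns example.take-with
  (:require [pgqueue.core :as pgq]))

(defn work-once!
  "Take one item under lock, run handle! on it, and let take-with
  delete and unlock it after the body."
  [q handle!]
  (pgq/take-with [item q]
    ;; ASSUMPTION: item is nil when no item is available.
    (when (some? item)
      (handle! item))))
```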
unlock
(unlock lock)
Unlock a PGQueueLock.
Returns boolean.
usage: (unlock lock)