Skip to content

ppqueue.Queue

A parallel processing job runner / data structure.

Parameters:

Name Type Description Default
max_concurrent int

max number of concurrently running jobs.

multiprocessing.cpu_count()
max_size int

max size of the queue (default=0, unlimited).

0
engine str | type[multiprocessing.Process] | type[threading.Thread]

the engine used to run jobs.

Process
name str | None

an identifier for this queue.

None
callback Callable[[Job], Any] | None

a callable that is called immediately after each job is finished.

None
show_progress bool

global setting for showing progress bars.

False
drop_finished bool

if True, the queue will not store finished jobs for retrieval.

False
stop_when_idle bool

if True, the queue will stop the pulse when all jobs are finished.

False
pulse_freq_ms int

the interval at which jobs are transitioned between internal queues.

100
no_start bool

if True, do not start the queue pulse on instantiation.

False

Examples:

>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
...     jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5

Functions

collect

collect(
    n: int = 0, wait: bool = False, **kwargs: Any
) -> list[Job]

Removes and returns all finished jobs from the queue.

Parameters:

Name Type Description Default
n int

collect this many jobs (default=0, all)

0
wait bool

If True, block until this many jobs are finished. Else, immediately return all finished.

False
**kwargs Any

kwargs given to Queue.wait.

{}

Returns:

Type Description
list[Job]

a list of Job instances.

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = queue.collect(wait=True)
...
>>> type(jobs)
<class 'list'>
>>> len(jobs)
5

dequeue

dequeue(
    *,
    wait: bool = False,
    _peek: bool = False,
    **kwargs: Any
) -> Job | None

Removes and returns the finished job with the highest priority from the queue.

Parameters:

Name Type Description Default
wait bool

if no jobs are finished, wait for one.

False
**kwargs Any

passed to Queue.wait

{}

Returns:

Type Description
Job | None

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = [queue.dequeue(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

dispose

dispose() -> None

Stop running jobs, then clear the queue, then stop the queue pulse.

enqueue

enqueue(
    fun: Callable[..., Any],
    /,
    args: Sequence[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
    name: str | None = None,
    priority: int = 100,
    group: int | None = None,
    timeout: float = 0,
    suppress_errors: bool = False,
    skip_on_group_error: bool = False,
) -> int

Adds a job to the queue.

Parameters:

Name Type Description Default
fun Callable[..., Any]

...

required
args Sequence[Any] | None

...

None
kwargs dict[str, Any] | None

...

None
name str | None

...

None
priority int

...

100
group int | None

...

None
timeout float

...

0
suppress_errors bool

...

False
skip_on_group_error bool

...

False

Returns:

Type Description
int

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

is_busy

is_busy() -> bool

True if max concurrent limit is reached or if there are waiting jobs.

Returns:

Type Description
bool

...

is_empty

is_empty() -> bool

True if there are no jobs in the queue system.

Returns:

Type Description
bool

...

is_full

is_full() -> bool

True if the number of jobs in the queue system is equal to max_size.

Returns:

Type Description
bool

...

map

map(
    fun: Callable[..., Any],
    iterable: Sequence[Any],
    /,
    *args: Any,
    timeout: float = 0,
    show_progress: bool | None = None,
    **kwargs: Any,
) -> list[Job]

Submits many jobs to the queue -- one for each item in the iterable. Waits for all to finish, then returns the results.

Parameters:

Name Type Description Default
fun Callable[..., Any]

...

required
iterable Sequence[Any]

...

required
*args Any

...

()
timeout float

...

0
show_progress bool | None

...

None
**kwargs Any

...

{}

Returns:

Type Description
list[Job]

...

Examples:

>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
...     jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5

peek

peek(*args: Any, **kwargs: Any) -> Job | None

Returns the job with the highest priority from the queue.

Similar to dequeue / pop, but the job remains in the queue.

Parameters:

Name Type Description Default
*args Any

...

()
**kwargs Any

...

{}

Returns:

Type Description
Job | None

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...
...     print('Before:', queue.size())
...
...     job = queue.peek(wait=True)
...
...     print('After:', queue.size())
...
Before: 5
After: 5
>>> job.result
100

pop

pop(*args: Any, **kwargs: Any) -> Job | None

Alias for dequeue.

Parameters:

Name Type Description Default
*args Any

...

()
**kwargs Any

...

{}

Returns:

Type Description
Job | None

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...
...     jobs = [queue.pop(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

put

put(*args, **kwargs) -> int

Alias for enqueue.

Adds a job to the queue.

Parameters:

Name Type Description Default
*args

...

()
**kwargs

...

{}

Returns:

Type Description
int

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...     jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

size

size(
    *,
    waiting: bool = False,
    working: bool = False,
    finished: bool = False
) -> int

Get the number of jobs in the queue in state: waiting, working, and/or finished.

If no options are given, the total number of jobs in the queue is returned.

Parameters:

Name Type Description Default
waiting bool

include waiting jobs.

False
working bool

include working jobs.

False
finished bool

include finished jobs.

False

Returns:

Type Description
int

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...         print(queue.size())
1
2
3
4
5

starmap

starmap(
    fun: Callable[..., Any],
    iterable: Sequence[Sequence[Any]],
    /,
    *args: Any,
    timeout: float = 0,
    show_progress: bool | None = None,
    **kwargs: Any,
) -> list[Job]

Submits many jobs to the queue -- one for each sequence in the iterable. Waits for all to finish, then returns the results.

Parameters:

Name Type Description Default
fun Callable[..., Any]

...

required
iterable Sequence[Sequence[Any]]

...

required
*args Any

static arguments passed to the function.

()
timeout float

...

0
show_progress bool | None

...

None
**kwargs Any

static keyword-arguments passed to the function.

{}

Returns:

Type Description
list[Job]

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     jobs = queue.starmap(
...         add_nums, [(1, 2), (3, 4)]
...     )
...
>>> [job.result for job in jobs]
[3, 7]

starmapkw

starmapkw(
    fun: Callable[..., Any],
    iterable: Sequence[dict[str, Any]],
    /,
    *args: Any,
    timeout: float = 0,
    show_progress: bool | None = None,
    **kwargs: Any,
) -> list[Job]

Submits many jobs to the queue -- one for each dictionary in the iterable. Waits for all to finish, then returns the results.

Parameters:

Name Type Description Default
fun Callable[..., Any]

...

required
iterable Sequence[dict[str, Any]]

...

required
*args Any

static arguments passed to the function.

()
timeout float

...

0
show_progress bool | None

...

None
**kwargs Any

static keyword-arguments passed to the function.

{}

Returns:

Type Description
list[Job]

...

Examples:

>>> from ppqueue import Queue
>>> from time import sleep
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     jobs = queue.starmapkw(
...         add_nums, [{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]
...     )
...
>>> [job.result for job in jobs]
[3, 7]

start

start() -> None

Start the queue pulse.

stop

stop() -> None

Stop the queue pulse.

wait

wait(
    *,
    n: int = 0,
    timeout: float = 0,
    poll_ms: int = 0,
    show_progress: bool | None = None
) -> int

Wait for jobs to finish.

Parameters:

Name Type Description Default
n int

the number of jobs to wait for (default=0, all).

0
timeout float

seconds to wait before raising TimeoutError (default=0, indefinitely).

0
poll_ms int

milliseconds to pause between checks (default=100).

0
show_progress bool | None

if True, present a progress bar.

None

Returns:

Type Description
int

If n <= 0, returns the count of unfinished jobs.

int

Else, returns the count of finished jobs.