Concurrent Communicating Sequential Processes
Module CSP
The CSP module of the Lua Virtual Machine (lvm) extends Lua with a concurrent and parallel process model based on a set of process constructors that can be nested. Alongside the process constructors, the module provides a set of synchronisation objects. Except for concurrency, the process constructors and the channel IPC object exactly match the process and communication behaviour of the original CSP model by Hoare (and the syntax of the OCCAM language).
The CSP module supports two levels of parallelism:
- Threads executing process functions in a separate VM instance. Data can be shared between VM instances and processes through a global memory store and data serialisation. Userdata objects can be shared by reference; all other data is shared by copy.
- Fibers (coroutines) executing process functions in scheduled fibers in the same VM instance. Data can be shared by all fibers by reference. There is no parallel execution.
Process Constructors
Seq
Seq(function {},shared?:{})
Sequential process constructor executing a list of process functions strictly sequentially in the current VM instance. The Seq process terminates when the last process function in the list terminates, and the statements that follow are executed only after that. The Seq constructor is trivial, as Lua statements are already executed strictly sequentially. All process functions share the same local and global scope.
Sched
Sched(function {},shared?:{})
Scheduled process constructor executing a list of process functions pseudo-parallel in the current VM instance, each process function in its own fiber (coroutine). The fibers are scheduled but never run in parallel. The Sched process terminates when the last process terminates, and the statements that follow are executed after the Sched process has terminated. All process functions share the same local and global scope.
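A minimal sketch of Sched, based only on the signature above: two process functions run as fibers in the current VM instance and update the same local variable. The variable name `total` is illustrative, and the order in which the two fibers are scheduled is not specified in this section.

```lua
require('Csp')

local total = 0  -- illustrative local, shared by reference between the fibers

Sched({
  function () total = total + 1 end,
  function () total = total + 10 end
})

-- Sched has terminated here, so both fibers have finished.
print('total='..total)
```

Because both fibers share the same scope, this should print total=11 whichever fiber runs first.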
Par
Par(function {},shared?:{})
Parallel process constructor executing a list of process functions in parallel in different VM instances. Each process function has its own local and global scope. The Par process joins (waits for) all sub-processes and terminates when all parallel sub-processes have terminated. The statements that follow are executed after the Par process has terminated. A process function can access free variables and functions, but only with a local scope (except user-data tables). All free variables and functions are serialised and then deserialised in each sub-process.
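The copy semantics of free variables can be illustrated with a small sketch. It assumes that a channel captured as a free variable is shared by reference, in the same way the semaphore is captured in the document's Semaphore example. The names `n` and `results` are illustrative.

```lua
require('Csp')

local n = 1                 -- plain number: copied into each sub-process
local results = Channel(2)  -- channel object: assumed to be shared by reference

Par({
  function (id)
    n = n + 1               -- updates this sub-process's copy only
    results:write(n)
  end,
  function (id)
    n = n + 100             -- updates a different copy
    results:write(n)
  end
})

-- Par has joined both sub-processes; two values are queued in the channel.
local a, b = results:read(), results:read()
print('parent n='..n..', received '..a..' and '..b)
```

If free variables are copied as described, the parent's `n` should still be 1, and the two received values should be 2 and 101 in either order. A channel (or another shared object) is therefore the way to pass results back to the parent.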
Fork
Fork(function {},shared?:{})
Parallel process constructor executing a list of process functions in parallel in different VM instances. The Fork process does not join (wait for) its sub-processes, and the statements that follow are executed immediately. A process function can access free variables and functions, but only with a local scope (except user-data tables). All free variables and functions are serialised and then deserialised in each sub-process.
Alt
Alt(processes:function {},shared?:{})
Alternative choice process constructor executing one of the conditional processes. Each conditional process has to execute a blocking operation, e.g., a channel read. The first process that becomes ready is executed. Each conditional process is executed in a separate fiber (coroutine).
Interprocess Communication
Interprocess Communication (IPC) is used to synchronise processes for:
- Cooperation
- Coordination (competition)
IPC between threads (parallel processes) and IPC between fibers (coroutines) have to be distinguished. Thread IPC blocks an entire thread and all the fibers it contains, whereas fiber IPC blocks only one fiber of a thread.
Channel
Channel(depth:number,fiber?:boolean) -> channel
Creates a new communication channel with the given FIFO depth. A depth of 0 creates a handshake channel (i.e., reader and writer rendezvous). If the fiber argument is true, a channel for scheduled coroutines (fibers) is created.
channel:read
channel:read() -> *
Reads data from the channel. If there is no data available, the operation suspends (blocks) the process execution until data is available.
channel:write
channel:write(data:*)
Writes data to the channel. If the queue is full (or there is no reader for depth=0), the operation suspends (blocks) the process execution until a read operation is performed.
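The blocking behaviour of read and write can be sketched with a handshake channel between two fibers. This is an illustrative example built only from the signatures above; the names `ch` and `received` are not part of the module.

```lua
require('Csp')

local ch = Channel(0, true)  -- handshake channel for fibers (depth 0, fiber=true)
local received = {}          -- illustrative table collecting the values read

Sched({
  -- Producer: each write blocks this fiber until the consumer reads.
  function ()
    for i = 1, 3 do ch:write(i) end
  end,
  -- Consumer: each read blocks this fiber until the producer writes.
  function ()
    for i = 1, 3 do received[#received + 1] = ch:read() end
  end
})

print(table.concat(received, ','))
```

With a single producer and FIFO delivery, the values should arrive in the order 1,2,3. Passing a depth greater than 0 would let the producer run ahead of the consumer by that many items.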
Mutex
Mutex() -> mutex
Creates a mutual exclusion lock.
mutex:lock
mutex:lock()
Acquires and locks a mutex. If the mutex is already locked, the operation suspends (blocks) the process execution and enqueues the process until the mutex lock is released and the current process gets the lock.
mutex:unlock
mutex:unlock()
Unlocks a previously acquired mutex lock. If there are waiting processes, the next process is dequeued and resumed.
Semaphore
Semaphore(init:number,fiber?:boolean) -> semaphore
Creates a semaphore object with an initial counter value. If the fiber argument is true, a semaphore for scheduled coroutines is created (it cannot be used across parallel processes).
semaphore:down
semaphore:down()
Decrements the semaphore counter value by one. If the semaphore counter is already zero, the operation enqueues and suspends (blocks) the process until an up operation is performed.
semaphore:up
semaphore:up()
Increments the semaphore counter by one. If there are waiting processes, the next process is dequeued and resumed.
semaphore:level
semaphore:level() -> number
Returns the current counter value.
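A small sketch of a fiber semaphore used for signalling between two scheduled process functions. The names `sem` and `order` are illustrative; only the operations documented above are used.

```lua
require('Csp')

local sem = Semaphore(0, true)  -- fiber semaphore, counter starts at 0
local order = {}                -- illustrative trace of what ran when

Sched({
  -- Waiter: blocks in down() while the counter is zero.
  function ()
    sem:down()
    order[#order + 1] = 'waiter resumed'
  end,
  -- Signaller: records its step, then releases the waiter.
  function ()
    order[#order + 1] = 'signaller ran'
    sem:up()
  end
})

print(table.concat(order, ' -> '))
print('level='..sem:level())
```

Whichever fiber starts first, the waiter can only continue after the up operation, so the trace should read "signaller ran -> waiter resumed". One up matched by one down should leave the counter at 0.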
Event
Event(fiber?:boolean) -> event
Creates an event object used to synchronise parallel processes or, if the fiber argument is true, scheduled coroutines.
event:await
event:await()
Waits for an event signal. The operation suspends (blocks) the process execution and enqueues the process until the event is raised by the wakeup operation.
event:wakeup
event:wakeup()
Signals an event and resumes all waiting processes.
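The broadcast behaviour of wakeup can be sketched with a fiber event and two waiters. This sketch rests on an assumption that is not stated in this section: the fibers are started in list order, so both waiters are already blocked when the signaller runs. What happens to an event raised while nobody is waiting is not documented here, so the signaller is deliberately listed last. The names `ev` and `woken` are illustrative.

```lua
require('Csp')

local ev = Event(true)  -- fiber event
local woken = 0         -- illustrative counter of resumed waiters

Sched({
  function () ev:await(); woken = woken + 1 end,
  function () ev:await(); woken = woken + 1 end,
  -- Signaller: listed last on the assumption that fibers start in list order.
  function () ev:wakeup() end
})

print('woken='..woken)
```

Under that assumption a single wakeup resumes both waiters, so the sketch should print woken=2.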
Barrier
Barrier(group:number,fiber?:boolean) -> barrier
Creates a barrier object used to synchronise a group of processes. The group argument gives the number of processes in the group.
barrier:await
barrier:await()
Waits for a group of processes entering the barrier. The operation suspends (blocks) the process execution and enqueues the process until the last process of the group (i=group) enters the barrier by the await operation.
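A minimal sketch of a fiber barrier for a group of three scheduled process functions. The helper `worker` and the table `trace` are illustrative names, not part of the module.

```lua
require('Csp')

local barrier = Barrier(3, true)  -- fiber barrier for a group of three
local trace = {}                  -- illustrative record of the execution order

-- Hypothetical helper that builds one process function per name.
local function worker(name)
  return function ()
    trace[#trace + 1] = name..' arrived'
    barrier:await()               -- blocks until all three have arrived
    trace[#trace + 1] = name..' passed'
  end
end

Sched({ worker('A'), worker('B'), worker('C') })

for _, entry in ipairs(trace) do print(entry) end
```

No process can pass the barrier before the third one has arrived, so the three "arrived" entries should all precede the three "passed" entries.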
Timer
Timer(interval:number,once:boolean,fiber?:boolean) -> timer
Creates a timer object to synchronise processes in time. If the fiber argument is set, the timer can only be used in Seq, Sched, or Alt processes; a fiber timer cannot be shared by Par processes, which require fiber=nil|false. The timer must be started after creation.
timer:await
timer:await()
Waits for a timer signal. The operation suspends (blocks) the process execution and enqueues the process until the event is raised by the timer.
timer:start
timer:start()
Starts the timer.
timer:stop
timer:stop()
Stops the timer.
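The timer life cycle (create, start, await, stop) can be sketched as follows. Two assumptions are made here: once=false is taken to mean a periodic timer, and the interval value 100 is only a placeholder, since the unit of the interval is not stated in this section. The names `timer` and `ticks` are illustrative.

```lua
require('Csp')

-- Fiber timer, assumed periodic because once=false.
-- The unit of the interval (100) is not stated in this section.
local timer = Timer(100, false, true)
local ticks = 0  -- illustrative counter of timer signals seen

Sched({
  function ()
    timer:start()        -- a timer must be started after creation
    for i = 1, 3 do
      timer:await()      -- blocks this fiber until the timer fires
      ticks = ticks + 1
    end
    timer:stop()
  end
})

print('ticks='..ticks)
```

If the timer fires repeatedly as assumed, the loop completes after three signals and the sketch prints ticks=3.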
Matrix
Matrix(dims:number [],init?:number|function,type?:string) -> matrix
Creates a new shared matrix object (supported dimensions: 1, 2, 3) that can be accessed concurrently by parallel processes. Supported data types are float, double (default), and int. Concurrent access is serialised and protected by a mutex. The index order is i=column (1 dimension); i=row, j=column (2 dimensions); and i=level, j=row, k=column (3 dimensions).
matrix:read
matrix:read(i,j,k) -> number
Reads the value of an element of the matrix.
matrix:write
matrix:write(value:number,i,j,k)
Writes a new value to an element of the matrix.
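Each matrix read and write is protected individually, but a read followed by a write is still two separate operations. The sketch below therefore combines a one-element matrix with a Mutex to make an increment atomic across two parallel processes. It assumes that matrix indices start at 1, which is not stated in this section, and that the matrix and mutex objects are shared by reference when captured as free variables. The names `m` and `lock` are illustrative.

```lua
require('Csp')

local m = Matrix({1}, 0)  -- one-element shared matrix used as a counter
local lock = Mutex()

Par({
  function (id)
    for n = 1, 1000 do
      lock:lock()
      m:write(m:read(1) + 1, 1)  -- index 1 is an assumption (1-based indexing)
      lock:unlock()
    end
  end,
  function (id)
    for n = 1, 1000 do
      lock:lock()
      m:write(m:read(1) + 1, 1)
      lock:unlock()
    end
  end
})

-- Par has joined both processes before the final value is read.
print('counter='..m:read(1))
```

With the lock held around each read-modify-write, no increment should be lost and the counter should end at 2000. Without the lock, two processes could read the same value and one update would overwrite the other.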
Examples
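require('Csp')
local counter=1
Seq({
  -- Both process functions see and update the same local variable.
  function () counter=counter+1 end,
  function () counter=counter+1 end
})
-- Seq has terminated here, so counter is 3.
print('Waiting for counter='..counter)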
Seq Constructor Example
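require('Csp')
Par({
  -- Process A: blocks in read until process B writes to the channel.
  function (id)
    log('Process A starting');
    local x=ch:read();
    log('hello '..x);
  end,
  -- Process B: writes to the handshake channel.
  function (id)
    ch:write('world');
  end
},{
  -- Shared objects: a handshake channel (depth 0).
  ch=Channel(0)
})
-- Par joins both processes before this line runs.
print('Waiting ..')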
Par Constructor Example
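require('Csp')
Par({
  function (id)
    local x
    -- Each alternative blocks in a channel read; the first one that becomes ready runs.
    Alt({
      function () print(1); x=ch1:read(); print('got ch1') end,
      function () print(2); x=ch2:read(); print('got ch2') end,
    })
    log('hello '..x);
  end,
  -- Only ch2 is ever written, so the second alternative is the one that becomes ready.
  function (id)
    ch2:write('world');
  end
},{
  ch1=Channel(1),
  ch2=Channel(1),
})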
Alt Constructor Example
require('Csp')
local sema = Semaphore(0)
Par({
  function (pid)
    Seq({
      -- Blocks until another process performs sema:up().
      function (pid2) sema:down(); print('1.1 hello pid='..pid..'.'..pid2) end,
      function (pid2) print('1.2 hello pid='..pid..'.'..pid2) end
    })
  end,
  function (pid)
    Par({
      -- Releases the process blocked in sema:down().
      function (pid2) sema:up(); print ('2.1 hello pid='..pid..'.'..pid2) end,
      function (pid2) print ('2.2 hello pid='..pid..'.'..pid2) end
    })
  end
},{
})
print('Waiting ..')
Semaphore Example
require('Csp')
Fork({
  function (pid)
    Seq({
      function (pid2) print('1.1 hello pid='..pid..'.'..pid2) end,
      function (pid2) print('1.2 hello pid='..pid..'.'..pid2) end
    })
  end,
  function (pid)
    Par({
      function (pid2) print ('2.1 hello pid='..pid..'.'..pid2) end,
      function (pid2) print ('2.2 hello pid='..pid..'.'..pid2) end
    })
  end
},{
})
-- Fork does not wait for its sub-processes, so this line runs immediately.
print('Not waiting ..')
loop.start()
Fork Example
Meta Data
Author: Stefan Bosse
Version: 23.5.2019