Dispatching tasks to multiple Pharo VMs using SystemProcess

Doing research in the robotics domain with Pharo as the prototyping and implementation tool (via PhaROS) is a whole new experience. It is impressive how quickly an idea becomes a working prototype in Pharo, thanks to its productive development environment. Most of my robotic applications are critical tasks that require real-time performance, and some of them are heavily CPU-demanding. Because a Pharo VM runs as a single system process, running all of these tasks on the same VM creates a performance bottleneck and sometimes violates the real-time requirements of the application. The common solution to this problem is to dispatch the tasks to several native system processes. Unfortunately, current Pharo does not support this. My goal is to have something that allows me to:

  • keep using Pharo's productive environment, so reimplementing the applications in another language is not an option
  • dispatch tasks to different system processes (in this case, tasks running on different VMs that use the same image), with a simple inter-VM communication mechanism

And that is where SystemProcess comes into play.

Install the package on Pharo >= 6.1

This package is tested on Linux, but it should work on other POSIX-compatible systems such as macOS (not tested yet). If you run into a problem, please file an issue.

Metacello new
    repository: 'github://lxsang/SystemProcess';
    baseline: 'SystemProcess'; "assumption: the baseline is named after the repository"
    load.

SystemProcess: simple principle, lightweight library

From spawning system process...

A brief history of SystemProcess: I had a compatibility problem in one of my robotic packages, which requires both OSProcess (used by PhaROS) and OSSubProcess. These two libraries are mutually exclusive, so they cannot be used together. I therefore decided to develop a lightweight package, named SystemProcess, to replace OSSubProcess. This package relies on the posix_spawn system call (through the UFFI interface) to spawn a subprocess. It works really well for that purpose.

The following examples show how to use SystemProcess to spawn a system process and interact with its input/output from Pharo:

Execute a command and redirect its stdout

| o |
o := SystemProcess new.
o redirectStdout.
"print every chunk of output on the Transcript"
o onOutputDo: [ :d |
    Transcript show: d ].
o onFinishDo: [ o cleanup ].
o shellCommand: { 'ls'. '-al' }.
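
If the output is needed as a value rather than on the Transcript, the same callbacks can accumulate it in a stream. The following is a minimal sketch that uses only the messages shown above. It assumes that the chunks passed to #onOutputDo: respond to #asString and that #onFinishDo: fires after the last chunk has been delivered; neither is confirmed here. The uname command is only an illustration.

| o out |
out := WriteStream on: String new.
o := SystemProcess new.
o redirectStdout.
"assumption: each chunk d can be converted with #asString"
o onOutputDo: [ :d | out nextPutAll: d asString ].
"assumption: the finish callback runs after all output has been delivered"
o onFinishDo: [
    Transcript show: out contents; cr.
    o cleanup ].
o shellCommand: { 'uname'. '-a' }.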

Execute a command and redirect its stdin, stdout:

| o crlf |
crlf := Character cr asString, Character lf asString.
o := SystemProcess new.
o redirectStdout.
o redirectStdin.
o onOutputDo: [ :d |
    Transcript show: d ].
o onFinishDo: [ o cleanup ].
o shellCommand: { '/bin/sh' }.
"send a command to the shell through its stdin"
o stdinStream nextPutAll: 'ls -al', crlf.
"exit /bin/sh"
o stdinStream nextPutAll: 'exit', crlf

Actually, that is not all I want to show here. The interesting part is that SystemProcess can be used to spawn another Pharo VM from the current one, which opens the door to inter-VM task dispatching.

... to dispatching tasks to multiple VMs

Basically, spawning another Pharo VM from the current one consists of invoking the pharo command using SystemProcess. What is missing is a way for these VMs to communicate with each other, since one task may need data provided by another. The only option in this case, other than the network stack, is shared memory (via the POSIX shm_open and mmap system calls, called through UFFI). The following figure shows the basic principle:

The idea is that when a VM spawns another VM, it opens a communication channel (an object of the InterVMChannel class) that consists of two shared memory regions, one for reading and the other for writing (512 KB each by default). These two regions are wrapped in a "fake" iostream object. A VM sends data to, or reads data from, another VM by putting data objects on that stream or getting them from it (#nextPut: / #next). The stream automatically serializes and materializes the data objects on both ends (i.e. in both VMs) using the Fuel format.
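
The serialization step can be tried in isolation, without any shared memory. The following sketch only illustrates what the stream does at each end and is not the InterVMChannel implementation: it round-trips an object through a ByteArray using Fuel's class-side convenience messages, as found in the Fuel version shipped with Pharo 6.1 (newer Fuel releases may expose a different API). The payload dictionary is a made-up example.

| payload bytes copy |
payload := Dictionary new.
payload at: #counter put: 41.
"what the sending VM would write into the shared region"
bytes := FLSerializer serializeToByteArray: payload.
"what the receiving VM would rebuild from those bytes"
copy := FLMaterializer materializeFromByteArray: bytes.
Transcript show: (copy at: #counter) printString; cr.

The materialized copy is equal to the original but is a distinct object, so changes made to an object after it has been sent are not visible on the other side.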

That's all, pretty simple and pretty lightweight.

A "dispatchable" task should be implemented as a subclass of InterVMChannel. Take a simple counting task as an example: one VM takes a number, increases it by one, then sends it to another VM, which does the same thing. The class is defined as follows:

InterVMChannel subclass: #InterVMCounting
    instanceVariableNames: ''
    classVariableNames: ''
    package: 'SystemProcess'

Some conventions:

  • Every method defined in the class InterVMCounting is executed on the child/slave VM, except methods that have the pragma <master>. The latter are executed on the parent/master VM.
  • A new VM is spawned and a communication channel is opened automatically when a new object of this class is created.
  • Communication between VMs goes through the iostream instance variable, which holds a fake stream.
  • Each subclass of InterVMChannel should implement the abstract #processData: method, which defines how a slave VM processes the data sent by its master VM. This method is called automatically each time a data object is available on the stream.
  • The channel can be closed and the slave VM destroyed using the #close method.

We implement the #processData: method of InterVMCounting as follows:

InterVMCounting >> processData: data
    "data is an integer"
    | val |
    "show it on slave Transcript"
    Transcript show: 'Get: ', data asString; cr.
    "increase it"
    val := data + 1.
    "send back to master"
    self iostream nextPut: val

In addition to that method, we create two new methods, one executed on the slave side and the other on the master side:

InterVMCounting >> greet
    "This will be shown in a slave side Transcript"
    Transcript show: 'Im the slave VM'; cr.

InterVMCounting >> run
    "due to the pragma master, this method will be executed on the master side"
    <master>
    | val |
    "listen to the iostream in another Pharo process"
    [ [ self iostream isOpen ] whileTrue: [
        self iostream atEnd ifFalse: [
            "read data from slave"
            val := self iostream next.
            "print on master Transcript"
            Transcript show: 'get: ', val asString; cr.
            "increase it and send back to slave"
            val := val + 1.
            self iostream nextPut: val ] ] ] forkAt: Processor userBackgroundPriority.
    "start the task by sending an integer to the slave side"
    self iostream nextPut: 0.

After implementing the task, save the current Pharo image to disk before running it, since the slave VM needs to use the same image as the master.
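
The image can be saved from the menu or, as a small sketch, programmatically from a playground before the task is started:

"save the running image to disk without quitting"
Smalltalk snapshot: true andQuit: false.
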
The task can be run as follows:

"this will spawn a new VM in mode headfull"
o := InterVMCounting headfull.
"To spawn a VM in mode headless, use the #new class method instead:
o := InterVMCounting new."
"call the greet method on the slave side"
o greet. 
"call the run method on the master side"
o run.

You should see the counter printed on both Transcripts (the slave prints the even numbers, the master the odd ones). Welcome to the multi-VM world!!!

To close the task:

o close
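
The same calls extend to more than one slave VM. The following sketch starts three headless counting tasks and closes them a few seconds later. It uses only #new, #run and #close as described above, and it assumes that several channels can be open at the same time, which the examples here do not verify. The number of workers and the delay are arbitrary.

| workers |
"each #new spawns one headless slave VM with its own channel"
workers := (1 to: 3) collect: [ :i | InterVMCounting new ].
"start the counting loop of every task from the master side"
workers do: [ :w | w run ].
"let the tasks exchange numbers for a while"
(Delay forSeconds: 5) wait.
"close every channel and destroy the slave VMs"
workers do: [ :w | w close ].

The master Transcript then shows the interleaved output of all three tasks, since every #run loop prints to it.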

That's all folks!!!


Powered by antd server, (c) 2017 - 2018 Xuan Sang LE