
Goroutines with wait-free communication #65

Open · nin-jin opened this issue Jun 14, 2020 · 14 comments
nin-jin commented Jun 14, 2020

Description

std.concurrency has fibers, threads, logical threads, and sandboxes. But:

  • It uses mutexes (and locks).
  • Fibers have a large stack size that can't be resized.

std.parallelism has tasks and a thread pool. But:

  • Tasks can't be suspended.

vibe-core reimplements this: suspendable tasks that run on a thread pool. But:

  • It's a nonstandard library.
  • It's slow.
  • It uses mutexes (and locks).
  • Tasks have a large stack size that can't be resized.

Go has native support for stackful coroutines with auto-sizing stacks that run on a thread pool and communicate through channels. But:

  • It's another, dumb language.
  • It doesn't support statically typed thread safety.
  • It uses mutexes (and locks).

I wrote go.d to provide a Go-like API with goroutines that run on the thread pool and communicate through wait-free channels. It uses vibe-core, so it inherits all of its cons. I tried to reimplement it on std.parallelism, on std.concurrency, and on both, but was defeated. It's hard for me, and I need help.

I think we should have standard APIs for this (a rough sketch follows the list):

  1. A simple way to run suspendable tasks on the thread pool.
  2. Tasks should have a small stack size with automatic growth, like in Go.
  3. Locking should be reduced as much as possible.
  4. Tasks should use wait-free, GC-free channels to send data (and other tasks too).
  5. Only immutable, shared, and movable data should be transferable through a channel.
  6. The implementation should be as simple as possible.
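
For illustration, here is a rough sketch of what such an API could look like. It is loosely based on the go.d API that appears in the benchmarks further down this thread; the exact names and types are not a finalized design:

#!/usr/bin/env dub
/+ dub.sdl:
name "sketch"
dependency "jin-go" version="~>2.0.0"
+/

import std.stdio;
import jin.go;

void producer(Output!int queue)
{
	foreach (int i; 0 .. 10)
		queue.put(i); // wait-free send into the channel
}

void main()
{
	Input!int queue;
	jin.go.go.go!producer(queue.pair); // suspendable task on the thread pool

	foreach (value; queue) // the channel is consumable as an input range
		writeln(value);
}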

What are the rough milestones of this project?

  1. Currently, there is some very simple code with remarks, tests, and some diagrams.
  2. Remove the vibe-core dependency from go.d and implement all the required functionality on top of std.concurrency.
  3. Update std to better support move semantics (e.g., we currently can't use sum with non-copyable ranges).
  4. Remove duplicated abstractions from std (tasks, logical threads, fibers): all abstractions should be orthogonal and composable.
  5. Reimplement the vibe-core API on top of the new std API.

How does this project help the D community?

It gives a standard, simple way to build safe and fast multithreaded apps.

Recommended skills

  • Multithreading.
  • Low-level knowledge of the stack and the heap.
  • Go.
  • Ownership and borrowing.

What can students expect to get out of doing this project?

  • Respect from the community for solving problems that are important for D in the modern world.

Point of Contact

@nin-jin - tell me if I can help you understand the ideas, and join the project.


mw66 commented Jun 14, 2020

  4. Tasks should use wait-free, GC-free channels to send data (and other tasks too).

https://forum.dlang.org/post/[email protected]

lock-free queue?

https://www.liblfds.org/mediawiki/index.php?title=r7.1.1:Queue_(unbounded,_many_producer,_many_consumer)

Java uses the same algorithm for ConcurrentLinkedQueue (liblfds is a C implementation of it).

I tried some small examples with liblfds and got slightly better performance than Java. Maybe we don't want to reinvent the wheel, especially a well-tested one.

try: https://github.com/mingwugmail/liblfdsd

received 100000000 messages in 4632 msec sum=4999999950000000 speed=21588 msg/msec

nin-jin commented Jun 14, 2020

We should think about safety too, not only performance. For example, my implementation guarantees that a 1p1c queue really has at most one consumer and one producer.
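
As a minimal sketch of what I mean (hypothetical types, not the actual go.d code), D can turn "at most one producer, at most one consumer" into a compile-time property by making the channel handles non-copyable:

import std.stdio;

struct Queue(T) { T[] items; } // stand-in for the real wait-free queue

struct Output(T)
{
	@disable this(this); // non-copyable: there is only one producer handle
	private Queue!T* queue;
	void put(T value) { queue.items ~= value; }
}

struct Input(T)
{
	@disable this(this); // non-copyable: there is only one consumer handle
	private Queue!T* queue;
	T get() { auto v = queue.items[0]; queue.items = queue.items[1 .. $]; return v; }
}

void main()
{
	auto q = new Queue!int;
	auto o = Output!int(q);
	auto i = Input!int(q);
	o.put(42);
	writeln(i.get()); // 42
	// auto o2 = o;   // compile error: Output is not copyable
}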

mw66 commented Jun 15, 2020

That liblfds queue is MPMC, i.e. it's correct even under many-producer/many-consumer access.

nin-jin commented Jun 15, 2020

If it's MPMC, then it uses locking, doesn't it?

mw66 commented Jun 15, 2020

No, it's MPMC and lock-free.

liblfds stands for: lib-lock-free-data-structure.

https://www.liblfds.org/mediawiki/index.php?title=r7.1.1:Queue_(unbounded,_many_producer,_many_consumer)#Lock-free_Specific_Behaviour

nin-jin commented Jun 15, 2020

I don't understand how that can be possible without a spinlock and CAS.

mw66 commented Jun 15, 2020

It has CAS, but not a 'lock' in the usual sense; the implementation is based on:

https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf
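
Here is the essence of the CAS retry loop in D (a toy counter standing in for the queue's head pointer, not the Michael & Scott queue itself): no thread ever holds a lock, and a failed CAS only means another thread made progress, which is exactly the lock-free guarantee:

import core.atomic;
import core.thread;
import std.stdio;

shared long counter;

void increment()
{
	long old, next;
	do
	{
		old = atomicLoad(counter); // read the current value
		next = old + 1;            // for a queue, this step would link a node
	}
	while (!cas(&counter, old, next)); // retry only if another thread won the race
}

void main()
{
	auto threads = new Thread[4];
	foreach (ref t; threads)
		t = new Thread({ foreach (_; 0 .. 100_000) increment(); });
	foreach (t; threads) t.start();
	foreach (t; threads) t.join();
	writeln(atomicLoad(counter)); // 400000, and no mutex was ever taken
}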

nin-jin commented Jun 16, 2020

CAS-based algorithms can produce live-locks and false sharing with many consumers/producers. So I prefer wait-free over lock-free.
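
For contrast, here is a minimal sketch of the wait-free alternative for the 1p1c case (hypothetical code, not the actual go.d queue). Each side writes only its own index, so neither push nor pop ever loops or retries; every call completes in a bounded number of steps:

import core.atomic;

// N should be a power of two so the monotonically growing indices wrap cleanly.
struct Ring(T, size_t N)
{
	private T[N] items;
	private shared size_t head; // advanced only by the consumer
	private shared size_t tail; // advanced only by the producer

	bool tryPush(T value) // producer side: never blocks, never retries
	{
		immutable t = atomicLoad!(MemoryOrder.raw)(tail);
		if (t - atomicLoad!(MemoryOrder.acq)(head) == N)
			return false; // full
		items[t % N] = value;
		atomicStore!(MemoryOrder.rel)(tail, t + 1);
		return true;
	}

	bool tryPop(ref T value) // consumer side: never blocks, never retries
	{
		immutable h = atomicLoad!(MemoryOrder.raw)(head);
		if (atomicLoad!(MemoryOrder.acq)(tail) == h)
			return false; // empty
		value = items[h % N];
		atomicStore!(MemoryOrder.rel)(head, h + 1);
		return true;
	}
}

// Usage assumes one instance (e.g. __gshared) shared by exactly one
// producer thread and one consumer thread.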

Geod24 commented Jun 18, 2020

  2. Tasks should have a small stack size with automatic growth, like in Go.

That one is going to be a real pain. Go has compiler support for this; we don't (and can't).
We could use the segfault handler, but I don't know if we'd match Go's performance.

mw66 commented Jun 18, 2020

  2. Tasks should have a small stack size with automatic growth, like in Go.

That one is going to be a real pain. Go has compiler support for this; we don't (and can't).
We could use the segfault handler, but I don't know if we'd match Go's performance.

Then at least we should come close; right now it's 2-4x slower than Go.
(And the plain message queue in D is ~4x slower than in Java.)
It's hard to impress anyone with this kind of performance.

I'm wondering how much we can gain if the queue is faster. Is there any detailed profiling info showing where the bottleneck is?

mw66 commented Jun 18, 2020

  2. Tasks should have a small stack size with automatic growth, like in Go.

That one is going to be a real pain. Go has compiler support for this; we don't (and can't).
We could use the segfault handler, but I don't know if we'd match Go's performance.

Actually, this can to some extent be done in user space, using Duff's device:

https://en.wikipedia.org/wiki/Duff%27s_device

https://www.chiark.greenend.org.uk/~sgtatham/coroutines.html
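
To illustrate the idea, here is the pattern from the second link hand-ported to D as a plain state machine (a sketch, not library code). The coroutine's "stack" shrinks to one integer holding the resume point, but locals must be hoisted into the frame, and you cannot suspend from inside a nested call, which is exactly why Go chose stackful goroutines:

import std.conv : text;
import std.stdio;

// Yields "a", then "0".."2", then "b": one value per call to next().
struct Proto
{
	int state; // resume point: the job Duff's device gives the switch
	int i;     // a "local variable" hoisted into the coroutine frame

	string next()
	{
		switch (state)
		{
		case 0:
			state = 1;
			return "a";
		case 1:
			if (i < 3)
				return text(i++); // the loop body, re-entered on each call
			state = 2;
			goto case; // fall through to the next case
		case 2:
			state = 3;
			return "b";
		default:
			return null; // finished
		}
	}
}

void main()
{
	auto c = Proto();
	for (string s = c.next(); s !is null; s = c.next())
		writeln(s); // a 0 1 2 b
}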

nin-jin commented Jun 18, 2020

Go has compiler support for this; we don't (and can't).

The compiler could mark functions with a MaxStackSize attribute that depends on the calls inside.

Geod24 commented Jun 18, 2020

I have been working on a similar problem for a while now; I think I might have mentioned this somewhere on the forum.

The gist of it is:

Different communications primitives

Go and std.concurrency have different models. Go has channels, which are essentially two-way pipes. You can have a single channel per goroutine, or you can have many.
std.concurrency, on the other hand, has a single MessageBox per "task". It was originally conceived as one MessageBox per thread, but it was extended when fiber support was added.
If you have the Go-style primitives, you can build the std.concurrency style on top of them; the other way around is obviously not possible. As a result, you currently cannot listen on multiple MessageBoxes (select in Go). That is a critical missing piece of functionality; a sketch of what it could look like follows.
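
To make the gap concrete, a library-level select could look roughly like this, assuming go.d-style Input!T handles with a non-blocking tryGet (a hypothetical method name). Without scheduler support it can only poll, whereas Go's select blocks efficiently:

// Hypothetical: returns true and fills `value` if any channel has a message.
bool trySelect(T)(Input!T[] inputs, ref T value)
{
	foreach (ref input; inputs)
		if (input.tryGet(value)) // assumed non-blocking read
			return true;
	return false; // nothing ready; the task should yield and retry
}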

C/C++ interop

The main blocker, IMO, is that D is not a managed language like Go. A D program can (and will) do everything: you have inline assembler, calls to C or C++ functions, and direct interaction with the OS. Go, on the other hand, is a much more integrated environment, and it is much harder (on purpose) to do such things there.
In particular, Go has a global scheduler which handles everything. It can suspend your goroutine, move it, etc. It has compiler support. D cannot have compiler support, simply because it might call into C/C++ code (Go does too, but its runtime has a bit of magic in it).

Scheduler

To be efficient, our channels cannot block the thread; they need to cooperate, so a scheduler is definitely needed. The way I approached it in my project is to have a scheduler that manages fibers at the thread level, a queue for message passing, and a class that binds them together.
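
As a toy illustration of that shape (hypothetical, not my actual project code), a per-thread cooperative scheduler over core.thread.Fiber could look like this:

import core.thread : Fiber;

class Scheduler
{
	private Fiber[] ready;

	void spawn(void delegate() fn) { ready ~= new Fiber(fn); }

	void run() // cooperative round-robin over this thread's fibers
	{
		while (ready.length)
		{
			Fiber[] still;
			foreach (f; ready)
			{
				f.call(); // resume until the fiber yields or finishes
				if (f.state != Fiber.State.TERM)
					still ~= f; // not finished: schedule it again
			}
			ready = still;
		}
	}
}

A channel would then call Fiber.yield() instead of blocking the thread whenever it finds its queue empty (receiver) or full (sender).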

Hand-off semantic / D specificity

D is a bit different (and, I believe, has an advantage) because of shared. If a channel is used for cross-thread communication, we should use one of the queues you mentioned, and the data needs to be shared. For intra-thread communication, we can use direct hand-off semantics and avoid a costly yield to the scheduler; where possible, though, we'd like to avoid requiring shared. A sketch of this split follows.
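
Something like the following could express that split in the type system (ThreadSafeQueue and HandOffQueue are hypothetical names for the two queue flavours):

// Pick the queue flavour from the element type: shared/immutable data may
// cross threads and needs the atomic queue; everything else stays on one
// thread and can use cheap hand-off semantics.
template ChannelFor(T)
{
	static if (is(T == shared) || is(T == immutable))
		alias ChannelFor = ThreadSafeQueue!T; // cross-thread
	else
		alias ChannelFor = HandOffQueue!T;    // intra-thread
}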

TL;DR

I am working on a per-thread scheduler, a thread-safe message queue, and a class that binds those together. Go has made some very good, well-documented technical choices, and we should draw from them. The page-size part is tricky, and I believe it can be implemented almost independently.

Cyroxin commented Jun 18, 2020

CAS-based algorithms can produce live-locks and false sharing with many consumers/producers. So I prefer wait-free over lock-free.

If a single wait-free producer/consumer channel is all you need, then you can look at how I did message passing in elembuf. You can close the other thread or give it new orders/tasks. This way you get all the pros of std.parallelism without the cons. You could even make a thread pool out of these threads, which can be ordered to do any task.

Elembuf, which uses this system, is currently about 35x faster at message passing than the benchmark version you posted, go.d. You can add a dependency on the threaded version of elembuf in your projects, which should be much easier than reinventing the wheel. There are still some optimizations left to do that I haven't gotten to yet; you can check them out on the repo's project page for 1.2.X.

elembuf
#!/usr/bin/env dub
/+ dub.sdl:
name "app"
dependency "elembuf" version="~>1.1.5"
dflags "-release" "-m64" "-boundscheck=off" "-O" platform="dmd"
dflags "-O4" "--release" "--boundscheck=off" platform="ldc2"
+/

// dmd.exe  -release -m64 -boundscheck=off -O  buffer.d
// ldc2 -O4 --release --boundscheck=off buffer.d

import buffer;

import std.stdio;
import std.datetime.stopwatch;
import core.atomic;
import core.thread, std.concurrency;

const n = 1_000_000_000;
enum amount = n;


void main()
{
    Buffer!(size_t, true) buffer = Buffer!(size_t, true)();

    size_t srci = 0;  // Source index
    size_t consi = 0; // Consumer index

    size_t sum = 0;

    auto src = (size_t[] x) // Tell the background thread about the source
    {
        const needToFill = amount - srci;

        if (x.length >= needToFill) // Final fill!
        {
            foreach (i; 0 .. needToFill)
            {
                x[i] = srci;
                srci++;
            }
            return needToFill;
        }
        else // Long way to go still...
        {
            foreach (ref i; x)
            {
                i = srci;
                srci++;
            }
            return x.length;
        }
    };

    buffer.fill = src;

    // START!
    StopWatch sw;
    sw.start();

    while (consi < amount)
    {
        buffer.fill();

        foreach (elem; buffer)
            sum += elem;

        consi += buffer.length;

        buffer = buffer[$ .. $];
    }

    sw.stop();
    writeln("finished receiving");
    writefln("received %d messages in %d msec sum=%d speed=%d msg/msec",
            n, sw.peek().total!"msecs", sum, n / sw.peek().total!"msecs");
}
go
#!/usr/bin/env dub
/+ dub.sdl:
name "app"
dependency "jin-go" version="~>2.0.0"
+/

import std.datetime.stopwatch;
import std.stdio;

import jin.go;

const int n = 100_000_000;

void threadProducer(Output!int queue)
{
	foreach (int i; 0 .. n)
		queue.put(i);
}

void main()
{
	Input!int queue;
	jin.go.go.go!threadProducer(queue.pair);

	StopWatch sw;
	sw.start();
	long sum = 0;

	foreach (p; queue)
	{
		sum += p;
	}

	sw.stop();

	writefln("received %d messages in %d msec sum=%d speed=%d msg/msec", n,
			sw.peek.total!"msecs", sum, n / sw.peek.total!"msecs");
}
elembuf.d ldc $dub
received 1000000000 messages in 28237 msec sum=499999999500000000 speed=35414 msg/msec
go.d ldc $dub
received 100000000 messages in 124417 msec sum=4999999950000000 speed=803 msg/msec
