Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gobwasalive upgrader/dialer support keepalive #5

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@
_authortools
/_examples/**/node_modules
.directory
*.exe
*.exe
*.exe~
.idea
/vendor
125 changes: 125 additions & 0 deletions _examples/gobwasalive/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"context"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/kataras/neffos"
"github.com/kataras/neffos/gobwasalive"
)

var events = neffos.WithTimeout{
ReadTimeout: 20 * time.Second,
WriteTimeout: 20 * time.Second,

Namespaces: neffos.Namespaces{
"v1": neffos.Events{
"echo": onEcho,
},
},
}

func onEcho(c *neffos.NSConn, msg neffos.Message) error {
body := string(msg.Body)
log.Println(body)

if !c.Conn.IsClient() {
newBody := append([]byte("echo back: "), msg.Body...)
return neffos.Reply(newBody)
}

return nil
}

func main() {
args := os.Args[1:]
if len(args) == 0 {
log.Fatalf("expected program to start with 'server' or 'client' argument")
}
side := args[0]

clientNum := 0
if len(args) == 2 {
clientNum, _ = strconv.Atoi(args[1])
log.Printf("Start clientNum: %v", clientNum)
}

switch side {
case "server":
runServer()
case "client":
if clientNum == 0 {
runClient()
} else {
runClientMany(clientNum)
}

default:
log.Fatalf("unexpected argument, expected 'server' or 'client' but got '%s'", side)
}
}

func runServer() {
upgrader := gobwasalive.NewUpgrader(16 * time.Second)
websocketServer := neffos.New(upgrader, events)

websocketServer.OnConnect = func(c *neffos.Conn) error {
log.Printf("OnConnect cid: %v", c.ID())
return nil
}

websocketServer.OnDisconnect = func(c *neffos.Conn) {
log.Printf("OnDisconnect cid: %v", c.ID())
}

router := http.NewServeMux()
router.Handle("/echo", websocketServer)

ticker := time.NewTicker(5 * time.Second)
go func() {
for {
select {
case <-ticker.C:
log.Printf("total clients: %v", websocketServer.GetTotalConnections())
}
}
}()

log.Println("Serving websockets on localhost:8080/echo")
log.Fatal(http.ListenAndServe(":8080", router))
}

func runClient() {
ctx := context.Background()
dialer := gobwasalive.NewDialer(15 * time.Second)
client, err := neffos.Dial(ctx, dialer, "ws://localhost:8080/echo", events)
if err != nil {
panic(err)
}

_, err = client.Connect(ctx, "v1")
if err != nil {
panic(err)
}

//c.Emit("echo", []byte("Greetings!"))

// a channel that blocks until client is terminated,
// i.e by CTRL/CMD +C.
<-client.NotifyClose
}

func runClientMany(clientNum int) {
for i := 0; i < clientNum; i++ {
go runClient()
}
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
log.Println(<-ch)
}
17 changes: 16 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,27 @@ module github.com/kataras/neffos

go 1.12

replace (
golang.org/x/build => github.com/golang/build v0.0.0-20190709001953-30c0e6b89ea0
golang.org/x/crypto => github.com/golang/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/debug => github.com/golang/debug v0.0.0-20190515041333-621e2d3f35da
golang.org/x/lint => github.com/golang/lint v0.0.0-20190409202823-959b441ac422
golang.org/x/net => github.com/golang/net v0.0.0-20190628185345-da137c7871d7
golang.org/x/perf => github.com/golang/perf v0.0.0-20190620143337-7c3f2128ad9b
golang.org/x/sync => github.com/golang/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys => github.com/golang/sys v0.0.0-20190710143415-6ec70d6a5542
golang.org/x/text => github.com/golang/text v0.3.2
golang.org/x/time => github.com/golang/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/tools => github.com/golang/tools v0.0.0-20190711191110-9a621aea19f8
)

require (
github.com/RussellLuo/timingwheel v0.0.0-20190518031256-7b3d146a266a
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect
github.com/gobwas/pool v0.2.0 // indirect
github.com/gobwas/ws v1.0.1
github.com/gorilla/websocket v1.4.0
github.com/iris-contrib/go.uuid v2.0.0+incompatible
github.com/mediocregopher/radix/v3 v3.3.0
github.com/nats-io/nats.go v1.8.1 // indirect
github.com/nats-io/nats.go v1.8.1
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
github.com/RussellLuo/timingwheel v0.0.0-20190518031256-7b3d146a266a h1:BBXTjl+/xTT72HpWqPe2oqKex5fZwsqHaxUEVqjT2Sg=
github.com/RussellLuo/timingwheel v0.0.0-20190518031256-7b3d146a266a/go.mod h1:3VIJp8oOAlnDUnPy3kwyBGqsMiJJujqTP6ic9Jv6NbM=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.1 h1:iYpM3WoNpsexO6bqCN1MnvVRylnKg6278zivIZDRXUM=
github.com/gobwas/ws v1.0.1/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/golang/crypto v0.0.0-20190701094942-4def268fd1a4 h1:SqpWDZAu6UkmbvUTCtyNpBZLY8110TJ7bgxIki3pZw0=
github.com/golang/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
github.com/golang/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
github.com/golang/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
github.com/golang/sys v0.0.0-20190710143415-6ec70d6a5542 h1:nj3zoh9xX6lavzCnatfuuBVNMi37YqbhEThbgW5ysWs=
github.com/golang/sys v0.0.0-20190710143415-6ec70d6a5542/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
github.com/golang/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
github.com/golang/tools v0.0.0-20190711191110-9a621aea19f8/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/iris-contrib/go.uuid v2.0.0+incompatible h1:XZubAYg61/JwnJNbZilGjf3b3pB80+OQg2qf6c8BfWE=
github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0=
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
github.com/mediocregopher/radix/v3 v3.3.0 h1:oacPXPKHJg0hcngVVrdtTnfGJiS+PtwoQwTBZGFlV4k=
github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
github.com/nats-io/nats.go v1.8.1 h1:6lF/f1/NN6kzUDBz6pyvQDEXO39jqXcWRLu/tKjtOUQ=
Expand All @@ -16,3 +28,5 @@ github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M=
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
39 changes: 39 additions & 0 deletions gobwasalive/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package gobwasalive

import (
"context"
"github.com/RussellLuo/timingwheel"
"time"

"github.com/kataras/neffos"

gobwas "github.com/gobwas/ws"
)

// DefaultDialer is a gobwas/ws dialer with all fields set to the default values.
//var DefaultDialer = Dialer(gobwas.DefaultDialer)

var twDialer = timingwheel.NewTimingWheel(time.Millisecond, 20)
var twDialerRunning = false

// Dialer is a `neffos.Dialer` type for the gobwas/ws subprotocol implementation.
// Should be used on `Dial` to create a new client/client-side connection.
// To send headers to the server set the dialer's `Header` field to a `gobwas.HandshakeHeaderHTTP`.
func dialer(dialer gobwas.Dialer, idleTime time.Duration) neffos.Dialer {
return func(ctx context.Context, url string) (neffos.Socket, error) {
underline, _, _, err := dialer.Dial(ctx, url)
if err != nil {
return nil, err
}

return newSocket(underline, nil, true, idleTime, twDialer), nil
}
}

func NewDialer(idleTime time.Duration) neffos.Dialer {
if !twDialerRunning {
twDialer.Start()
twDialerRunning = true
}
return dialer(gobwas.DefaultDialer, idleTime)
}
12 changes: 12 additions & 0 deletions gobwasalive/helpers_go19.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// +build go1.9

package gobwasalive

import gobwas "github.com/gobwas/ws"

// Options is just an alias for the `gobwas/ws.Dialer` struct type.
type Options = gobwas.Dialer

// Header is an alias to the adapter that allows the use of `http.Header` as
// `gobwas/ws.Dialer.HandshakeHeader`.
type Header = gobwas.HandshakeHeaderHTTP
Loading