Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dbus: add SetPropertiesSubscriber method #248

Merged
merged 2 commits into from
Feb 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions dbus/dbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dbus

import (
"encoding/hex"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -60,6 +61,27 @@ func PathBusEscape(path string) string {
return string(n)
}

// pathBusUnescape is the inverse of PathBusEscape.
func pathBusUnescape(path string) string {
if path == "_" {
return ""
}
n := []byte{}
for i := 0; i < len(path); i++ {
c := path[i]
if c == '_' && i+2 < len(path) {
res, err := hex.DecodeString(path[i+1 : i+3])
if err == nil {
n = append(n, res...)
}
i += 2
} else {
n = append(n, c)
}
}
return string(n)
}

// Conn is a connection to systemd's dbus endpoint.
type Conn struct {
// sysconn/sysobj are only used to call dbus methods
Expand All @@ -74,13 +96,18 @@ type Conn struct {
jobs map[dbus.ObjectPath]chan<- string
sync.Mutex
}
subscriber struct {
subStateSubscriber struct {
updateCh chan<- *SubStateUpdate
errCh chan<- error
sync.Mutex
ignore map[dbus.ObjectPath]int64
cleanIgnore int64
}
propertiesSubscriber struct {
updateCh chan<- *PropertiesUpdate
errCh chan<- error
sync.Mutex
}
}

// New establishes a connection to any available bus and authenticates.
Expand Down Expand Up @@ -152,7 +179,7 @@ func NewConnection(dialBus func() (*dbus.Conn, error)) (*Conn, error) {
sigobj: systemdObject(sigconn),
}

c.subscriber.ignore = make(map[dbus.ObjectPath]int64)
c.subStateSubscriber.ignore = make(map[dbus.ObjectPath]int64)
c.jobListener.jobs = make(map[dbus.ObjectPath]chan<- string)

// Setup the listeners on jobs so that we can get completions
Expand Down
19 changes: 19 additions & 0 deletions dbus/dbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ func TestPathBusEscape(t *testing.T) {

}

func TestPathBusUnescape(t *testing.T) {
for in, want := range map[string]string{
"_": "",
"foo_2eservice": "foo.service",
"foobar": "foobar",
"woof_40woof_2eservice": "[email protected]",
"_30123456": "0123456",
"account_5fdb_2eservice": "account_db.service",
"got_2ddashes": "got-dashes",
"foobar_": "foobar_",
"foobar_2": "foobar_2",
} {
got := pathBusUnescape(in)
if got != want {
t.Errorf("bad result for pathBusUnescape(%s): got %q, want %q", in, got, want)
}
}
}

// TestNew ensures that New() works without errors.
func TestNew(t *testing.T) {
_, err := New()
Expand Down
5 changes: 5 additions & 0 deletions dbus/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,3 +584,8 @@ func (c *Conn) Reload() error {
func unitPath(name string) dbus.ObjectPath {
return dbus.ObjectPath("/org/freedesktop/systemd1/unit/" + PathBusEscape(name))
}

// unitName returns the unescaped base element of the supplied escaped path
func unitName(dpath dbus.ObjectPath) string {
return pathBusUnescape(path.Base(string(dpath)))
}
17 changes: 17 additions & 0 deletions dbus/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,3 +1524,20 @@ func TestReload(t *testing.T) {
t.Fatal(err)
}
}

func TestUnitName(t *testing.T) {
for _, unit := range []string{
"",
"foo.service",
"foobar",
"[email protected]",
"0123456",
"account_db.service",
"got-dashes",
} {
got := unitName(unitPath(unit))
if got != unit {
t.Errorf("bad result for unitName(%s): got %q, want %q", unit, got, unit)
}
}
}
85 changes: 69 additions & 16 deletions dbus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func (c *Conn) dispatch() {
c.jobComplete(signal)
}

if c.subscriber.updateCh == nil {
if c.subStateSubscriber.updateCh == nil &&
c.propertiesSubscriber.updateCh == nil {
continue
}

Expand All @@ -84,6 +85,12 @@ func (c *Conn) dispatch() {
case "org.freedesktop.DBus.Properties.PropertiesChanged":
if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
unitPath = signal.Path

if len(signal.Body) >= 2 {
if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok {
c.sendPropertiesUpdate(unitPath, changed)
}
}
}
}

Expand Down Expand Up @@ -169,15 +176,19 @@ type SubStateUpdate struct {
// is full, it attempts to write an error to errCh; if errCh is full, the error
// passes silently.
func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
c.subscriber.Lock()
defer c.subscriber.Unlock()
c.subscriber.updateCh = updateCh
c.subscriber.errCh = errCh
c.subStateSubscriber.Lock()
defer c.subStateSubscriber.Unlock()
c.subStateSubscriber.updateCh = updateCh
c.subStateSubscriber.errCh = errCh
}

func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
c.subscriber.Lock()
defer c.subscriber.Unlock()
c.subStateSubscriber.Lock()
defer c.subStateSubscriber.Unlock()

if c.subStateSubscriber.updateCh == nil {
return
}

if c.shouldIgnore(unitPath) {
return
Expand All @@ -186,7 +197,7 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
info, err := c.GetUnitPathProperties(unitPath)
if err != nil {
select {
case c.subscriber.errCh <- err:
case c.subStateSubscriber.errCh <- err:
default:
}
}
Expand All @@ -196,10 +207,10 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {

update := &SubStateUpdate{name, substate}
select {
case c.subscriber.updateCh <- update:
case c.subStateSubscriber.updateCh <- update:
default:
select {
case c.subscriber.errCh <- errors.New("update channel full!"):
case c.subStateSubscriber.errCh <- errors.New("update channel full!"):
default:
}
}
Expand All @@ -222,7 +233,7 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
// the properties).

func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
t, ok := c.subscriber.ignore[path]
t, ok := c.subStateSubscriber.ignore[path]
return ok && t >= time.Now().UnixNano()
}

Expand All @@ -231,20 +242,62 @@ func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {

// unit is unloaded - it will trigger bad systemd dbus behavior
if info["LoadState"].(string) == "not-found" {
c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
}
}

// without this, ignore would grow unboundedly over time
func (c *Conn) cleanIgnore() {
now := time.Now().UnixNano()
if c.subscriber.cleanIgnore < now {
c.subscriber.cleanIgnore = now + cleanIgnoreInterval
if c.subStateSubscriber.cleanIgnore < now {
c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval

for p, t := range c.subscriber.ignore {
for p, t := range c.subStateSubscriber.ignore {
if t < now {
delete(c.subscriber.ignore, p)
delete(c.subStateSubscriber.ignore, p)
}
}
}
}

// PropertiesUpdate holds a map of a unit's changed properties
type PropertiesUpdate struct {
UnitName string
Changed map[string]dbus.Variant
}

// SetPropertiesSubscriber writes to updateCh when any unit's properties
// change. Every property change reported by systemd will be sent; that is, no
// transitions will be "missed" (as they might be with SetSubStateSubscriber).
// However, state changes will only be written to the channel with non-blocking
// writes. If updateCh is full, it attempts to write an error to errCh; if
// errCh is full, the error passes silently.
func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) {
c.propertiesSubscriber.Lock()
defer c.propertiesSubscriber.Unlock()
c.propertiesSubscriber.updateCh = updateCh
c.propertiesSubscriber.errCh = errCh
}

// we don't need to worry about shouldIgnore() here because
// sendPropertiesUpdate doesn't call GetProperties()
func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) {
c.propertiesSubscriber.Lock()
defer c.propertiesSubscriber.Unlock()

if c.propertiesSubscriber.updateCh == nil {
return
}

update := &PropertiesUpdate{unitName(unitPath), changedProps}

select {
case c.propertiesSubscriber.updateCh <- update:
default:
select {
case c.propertiesSubscriber.errCh <- errors.New("update channel is full"):
default:
}
return
}
}
56 changes: 56 additions & 0 deletions dbus/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,59 @@ func TestSubStateSubscription(t *testing.T) {
}
}
}

// TestPropertiesSubscription exercises the basics of property change event subscriptions
func TestPropertiesSubscription(t *testing.T) {
target := "subscribe-events.service"

conn, err := New()
defer conn.Close()
if err != nil {
t.Fatal(err)
}

err = conn.Subscribe()
if err != nil {
t.Fatal(err)
}

updateCh := make(chan *PropertiesUpdate)
errCh := make(chan error)
conn.SetPropertiesSubscriber(updateCh, errCh)

setupUnit(target, conn, t)
linkUnit(target, conn, t)

reschan := make(chan string)
_, err = conn.StartUnit(target, "replace", reschan)
if err != nil {
t.Fatal(err)
}

job := <-reschan
if job != "done" {
t.Fatal("Couldn't start", target)
}

timeout := make(chan bool, 1)
go func() {
time.Sleep(3 * time.Second)
close(timeout)
}()

for {
select {
case update := <-updateCh:
if update.UnitName == target {
subState, ok := update.Changed["SubState"].Value().(string)
if ok && subState == "running" {
return // success
}
}
case err := <-errCh:
t.Fatal(err)
case <-timeout:
t.Fatal("Reached timeout")
}
}
}