
watchtower: replace in-mem task queue with a disk over-flow queue #7380

Merged
merged 8 commits into lightningnetwork:master from wtclientDiskQueue on May 16, 2023

Conversation

@ellemouton (Collaborator) commented Feb 3, 2023

The Problem:

At the moment, updates are passed to the tower client using an unbounded
in-memory list. This is a problem for a few reasons:

  1. If the client does not have any active towers available, then updates will
    be queued in memory indefinitely.
  2. If the queue is not empty at shutdown, then the client will hang for 10
    seconds while it waits for the updates to be sent to a tower. This delays
    the shutdown of the whole LND binary. Not ideal.
  3. Building on point 2 - if those updates don't get backed up to a tower within
    those 10 seconds, then they are lost forever!

This PR aims to fix all of these issues.

The Solution at a high level:

The main thing that this PR introduces is a disk overflow queue which replaces
the current in-memory queue. This queue caps the number of updates held in
memory and persists any overflow to disk, so that no updates are lost across
restarts. With this in place, the 10-second delay on shutdown can also be
removed.
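
For intuition, here is a minimal, single-threaded sketch of the overflow idea (illustrative only; the actual queue in this PR is concurrent and bbolt-backed):

package main

import "fmt"

// overflowQueue is a toy, single-threaded stand-in for the disk overflow
// queue: "disk" here is just a slice playing the role of the bbolt store.
type overflowQueue[T any] struct {
	maxInMem int
	mem      []T // bounded in-memory buffer.
	disk     []T // stand-in for the persistent store.
}

// Push appends to memory while there is room and nothing is on disk,
// otherwise it overflows the item to the persistent store.
func (q *overflowQueue[T]) Push(item T) {
	if len(q.mem) < q.maxInMem && len(q.disk) == 0 {
		q.mem = append(q.mem, item)
		return
	}
	q.disk = append(q.disk, item)
}

// Pop drains memory first and, when memory is empty, reloads the next batch
// from the persistent store so that FIFO order is preserved.
func (q *overflowQueue[T]) Pop() (T, bool) {
	if len(q.mem) == 0 {
		n := q.maxInMem
		if n > len(q.disk) {
			n = len(q.disk)
		}
		q.mem = append(q.mem, q.disk[:n]...)
		q.disk = q.disk[n:]
	}

	var zero T
	if len(q.mem) == 0 {
		return zero, false
	}

	item := q.mem[0]
	q.mem = q.mem[1:]

	return item, true
}

func main() {
	q := &overflowQueue[int]{maxInMem: 2}
	for i := 1; i <= 5; i++ {
		q.Push(i)
	}

	// Only 2 items are held in memory; the remaining 3 overflowed.
	fmt.Println("in memory:", len(q.mem), "on disk:", len(q.disk))

	for {
		item, ok := q.Pop()
		if !ok {
			break
		}
		fmt.Print(item, " ") // Prints 1 2 3 4 5 in order.
	}
	fmt.Println()
}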

The flow of the PR:

  1. A test is added to demonstrate that updates in the queue are lost on restart.
    By the end of the PR, this test will be changed to show that the updates are
    no longer lost.
  2. Add a new interface defining a generic DB queue (a rough sketch of such an
    interface is shown after this list).
  3. Add a bbolt implementation of the generic DB queue.
  4. Add an in-memory implementation of the DB queue that can be used for tests.
  5. Add a generic Disk Overflow Queue.
  6. Replace the current in-memory task queue with the new Disk Overflow Queue.
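
To make steps 2-4 concrete, the generic DB queue interface could look roughly like the following (a sketch only; method names are illustrative, not necessarily the exact API added here):

// Sketch only: a generic, persisted FIFO queue interface in the spirit of
// steps 2-4 above. The package and method names are illustrative.
package wtqueue

// Queue is a FIFO queue of items of type T whose contents survive restarts.
type Queue[T any] interface {
	// Len returns the number of items currently in the queue.
	Len() (uint64, error)

	// Push adds items to the tail of the queue.
	Push(items ...T) error

	// Pop removes and returns the item at the head of the queue.
	Pop() (T, error)
}

The bbolt-backed implementation gives persistence across restarts, while the in-memory implementation keeps tests fast and free of disk state.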

Fixes #5983

@ellemouton force-pushed the wtclientDiskQueue branch 6 times, most recently from 1885f79 to 2efadc1 on February 13, 2023 12:52
@ellemouton requested a review from guggero February 23, 2023 15:27
@saubyk added this to the v0.16.1 milestone Mar 9, 2023
@saubyk requested a review from bitromortac March 13, 2023 17:32
@ellemouton changed the base branch from master to 0-16-1-staging March 16, 2023 13:20
@guggero (Collaborator) left a comment

Great work on this PR 💯
The commit structure and commit messages are really beautiful and easy to process!

My main concern is the complexity of the disk queue. Maybe we can simplify it somewhat; I left a few suggestions around that.

@ellemouton (Collaborator, Author) left a comment

Thanks for the review @guggero!

I updated as per your nits & suggestions around the non-overflow queue code. For the overflow queue, I have mostly just responded to your comments - I wasn't yet able to figure out how to make it simpler. Let me know if I am missing something with any of the comments :)

@ellemouton force-pushed the wtclientDiskQueue branch 2 times, most recently from 0f34b6c to 82fce00 on March 22, 2023 13:52
@ellemouton (Collaborator, Author) commented

@guggero - I'm gonna re-request just to hear your thoughts on my latest comments (i.e., no need for another full review yet).

@ellemouton requested a review from guggero March 22, 2023 13:55
@ellemouton force-pushed the wtclientDiskQueue branch 2 times, most recently from fc886e2 to 51379fd on March 27, 2023 08:34
@bitromortac (Collaborator) left a comment

Really important work 💪 Thank you for the detailed PR description and commit structure! I'm still trying to understand the exact buffering logic for the overflow queue, but I left some comments and questions.

@ellemouton (Collaborator, Author) left a comment

Thanks @bitromortac! Updated accordingly :)

@saubyk removed this from the v0.16.1 milestone Apr 12, 2023
@guggero (Collaborator) commented Apr 28, 2023

I also created this test case to make sure that start/stop works correctly under load; feel free to add it:

	t.Run("start stop queue", func(t *testing.T) {
		t.Parallel()

		// Generate a lot of backup IDs that we want to add to the
		// queue one after the other.
		tasks := genBackupIDs(500_000)

		// Create a mock db that simulates the disk queue.
		db := wtmock.NewQueueDB[*wtdb.BackupID]()

		// New mock logger.
		log := newMockLogger(t.Logf)

		// Init the queue with the mock DB.
		q, err := NewDiskOverflowQueue[*wtdb.BackupID](
			db, DefaultMaxTasksInMemQueue, log,
		)
		require.NoError(t, err)

		// Start the queue.
		require.NoError(t, q.Start())

		// Initially there should be no items on disk.
		assertNumDisk(db, 0)

		// We need to guard the queue with a mutex since we will be
		// stopping, re-creating and starting the queue multiple times.
		var (
			queueMtx sync.RWMutex
			wg       sync.WaitGroup
			sendDone = make(chan struct{})
		)

		// This goroutine will constantly try to add new items to the
		// queue, even if the queue is stopped.
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer close(sendDone)

			for idx := range tasks {
				queueMtx.RLock()
				err := q.QueueBackupID(tasks[idx])
				require.NoError(t, err)
				queueMtx.RUnlock()
			}
		}()

		// This goroutine will repeatedly stop, re-create and start the
		// queue until we're done sending items.
		wg.Add(1)
		go func() {
			defer wg.Done()

			numRestarts := 0
			for {
				select {
				case <-sendDone:
					t.Logf("Restarted queue %d times",
						numRestarts)

					return
				case <-time.After(50 * time.Millisecond):
				}

				queueMtx.Lock()
				require.NoError(t, q.Stop())
				q, err = NewDiskOverflowQueue[*wtdb.BackupID](
					db, DefaultMaxTasksInMemQueue, log,
				)
				require.NoError(t, err)
				require.NoError(t, q.Start())
				queueMtx.Unlock()

				numRestarts++
			}
		}()

		// We should be able to read all items from the queue, not being
		// affected by restarts, other than needing to wait for the
		// queue to be started again.
		results := make([]*wtdb.BackupID, 0, len(tasks))
		for i := 0; i < len(tasks); i++ {
			queueMtx.RLock()
			task := getNext(q, i)
			queueMtx.RUnlock()

			results = append(results, task)
		}
		require.Equal(t, tasks, results)

		require.NoError(t, q.Stop())
		wg.Wait()
	})

@ellemouton force-pushed the wtclientDiskQueue branch 2 times, most recently from 49b1875 to d2de0a8 on May 1, 2023 12:07
@ellemouton (Collaborator, Author) commented

Thanks for the reviews @bitromortac & @guggero 🚀 and for the awesome test addition @guggero 🙏

I took most of your suggestions; there are two regarding channel closing & buffering that I have not taken, though - will discuss these with you offline tomorrow 👍

@lightninglabs-deploy commented

@ellemouton, remember to re-request review from reviewers when ready

@ellemouton force-pushed the wtclientDiskQueue branch 5 times, most recently from 8f9f8cf to 52efe08 on May 10, 2023 09:17
@ellemouton (Collaborator, Author) commented May 10, 2023

Thanks for the very thorough initial review @guggero & @bitromortac 🎉 Also - I really appreciate the extra test added
by @guggero as this helped me realise where the performance can be improved and also helped find a bug in the bbolt queue impl! 🎉

I think this is now ready for the next pass. Here are a few changes to be aware of:

  1. The queue gets a namespace per session policy. In other words, if I have an anchors client and a non-anchors client, then there will be a separate queue for each of those clients.
  2. The disk queue now has a PopUpTo(n int) []T method instead of a Pop() T method. This greatly improves the performance of reading items from the DB when there is lots of capacity in the memQueue channel (see the sketch after this list).
  3. The implementation of the bbolt queue has changed slightly: see the details in the PushHead impl.
  4. I have also added an extra commit to make the MaxInMemTasks number configurable.
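
To illustrate point 2: with a batched read, the goroutine feeding the in-memory channel can pull many items in a single DB transaction instead of one transaction per item. A rough, self-contained sketch (hypothetical names, not the exact code in this PR):

// Illustrative sketch only: batch-reading from a persisted queue into a
// buffered channel. The diskQueue interface and helper names are hypothetical.
package main

import "fmt"

// diskQueue is a stand-in for one namespace of the bbolt-backed queue.
type diskQueue[T any] interface {
	// PopUpTo removes and returns at most n items from the head.
	PopUpTo(n int) ([]T, error)
}

// sliceQueue is a trivial in-memory diskQueue used to run this sketch.
type sliceQueue[T any] struct{ items []T }

func (s *sliceQueue[T]) PopUpTo(n int) ([]T, error) {
	if n > len(s.items) {
		n = len(s.items)
	}
	out := s.items[:n]
	s.items = s.items[n:]

	return out, nil
}

// feedMemQueue fills memQueue from db in batches sized to the channel's free
// capacity, so draining a large backlog costs a handful of DB reads rather
// than one read per item.
func feedMemQueue[T any](db diskQueue[T], memQueue chan T) error {
	for {
		free := cap(memQueue) - len(memQueue)
		if free == 0 {
			free = 1
		}

		batch, err := db.PopUpTo(free)
		if err != nil {
			return err
		}
		if len(batch) == 0 {
			// Nothing left on disk in this toy, single-threaded example.
			return nil
		}

		for _, item := range batch {
			memQueue <- item
		}
	}
}

func main() {
	db := &sliceQueue[int]{items: []int{1, 2, 3, 4, 5, 6, 7}}

	// Buffered channel playing the role of the in-memory queue.
	memQueue := make(chan int, 10)

	if err := feedMemQueue(db, memQueue); err != nil {
		fmt.Println("feed error:", err)
	}
	close(memQueue)

	for item := range memQueue {
		fmt.Print(item, " ") // Prints 1 2 3 4 5 6 7.
	}
	fmt.Println()
}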

I have performed the following test locally to ensure things work as expected:

  1. Start a node with a small in-memory task queue size (4) and explicitly don't start a tower for the node to connect to.
  2. Open a non-anchor and an anchor channel for the node & perform a couple of payments on each channel - enough to ensure that an overflow to disk happens.
  3. Shut down the node & check the DB to ensure that the 2 queues look as expected.
  4. Restart the node and this time, connect to a tower.
  5. Assert that all the updates have been made to the tower and the queues are now empty.

@ellemouton requested review from guggero and bitromortac May 10, 2023 09:26
@guggero (Collaborator) left a comment

Very nice, LGTM 🎉

@ellemouton force-pushed the wtclientDiskQueue branch from 52efe08 to 9b0bce4 on May 12, 2023 13:39
@bitromortac (Collaborator) left a comment

Many nice async patterns in this PR and great comments. Awesome work
@ellemouton and @guggero, was cool to review this! LGTM 🎉

This commit adds a test to the wtclient. The test demonstrates that if a
client tries to back up states while it has no active sessions with a
server, then those updates are accumulated in memory and lost on restart.
This will be fixed in upcoming commits.

This commit adds a new generic DiskQueueDB type which is a bbolt
implementation of the Queue interface.

This commit adds an in-memory implementation of the Queue interface.
This can be used for tests.

In this commit, a new generic DiskOverflowQueue implementation is added.
This allows a user to specify a maximum number of items that the queue
can hold in-memory. Any new items will then overflow to disk. The
producer and consumer of the queue items will interact with the queue
just like a normal in-memory queue.

Add a `MaxTasksInMemQueue` field to the `WtClient` config so that users
can change the default if they please.
@ellemouton force-pushed the wtclientDiskQueue branch from 9b0bce4 to 250d764 on May 16, 2023 08:58
@guggero merged commit bdb41e5 into lightningnetwork:master May 16, 2023
@ellemouton deleted the wtclientDiskQueue branch May 16, 2023 10:31
@feelancer21 (Contributor) commented

@ellemouton I am finalizing #7612 atm. I am wondering whether the new option wtclient.max-tasks-in-mem-queue=2000 is a default value or only an example? It is not mentioned as the default in lnd --help, but there seems to be a default value in the code:

DefaultMaxTasksInMemQueue = 2000

@ellemouton (Collaborator, Author) commented

Good catch @feelancer21! Opening a PR soon.

Development

Successfully merging this pull request may close these issues:
Watchtower updates are queued in memory when tower is offline (memory leak)

6 participants