Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/fix state sync logs #515

Merged
merged 20 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/pocket/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.5] - 2023-02-15

- Removed unnecessary server mode enabling call via EnableServerMode() function

## [0.0.0.4] - 2023-02-07

- Added GITHUB_WIKI tags where it was missing
Expand Down
1 change: 0 additions & 1 deletion app/pocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func main() {
if err != nil {
logger.Global.Fatal().Err(err).Msg("Failed to create pocket node")
}
pocketNode.GetBus().GetConsensusModule().EnableServerMode()

if err = pocketNode.Start(); err != nil {
logger.Global.Fatal().Err(err).Msg("Failed to start pocket node")
Expand Down
8 changes: 4 additions & 4 deletions consensus/debugging.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (m *consensusModule) sendGetBlockStateSyncMessage(_ *messaging.DebugMessage

for _, val := range validators {
valAddress := cryptoPocket.AddressFromString(val.GetAddress())
if err := m.stateSync.SendStateSyncMessage(stateSyncGetBlockMessage, valAddress, requestHeight); err != nil {
m.logger.Debug().Msgf(typesCons.SendingStateSyncMessage(valAddress, requestHeight), err)
if err := m.stateSync.SendStateSyncMessage(stateSyncGetBlockMessage, typesCons.StateSyncGetBlockRequest, valAddress, requestHeight); err != nil {
m.logger.Error().Err(err).Msg("failed to send get block request state sync message")
}
}
}
Expand All @@ -150,8 +150,8 @@ func (m *consensusModule) sendGetMetadataStateSyncMessage(_ *messaging.DebugMess

for _, val := range validators {
valAddress := cryptoPocket.AddressFromString(val.GetAddress())
if err := m.stateSync.SendStateSyncMessage(stateSyncMetaDataReqMessage, valAddress, requestHeight); err != nil {
m.logger.Debug().Msgf(typesCons.SendingStateSyncMessage(valAddress, requestHeight), err)
if err := m.stateSync.SendStateSyncMessage(stateSyncMetaDataReqMessage, typesCons.StateSyncGetMetadataRequest, valAddress, requestHeight); err != nil {
m.logger.Error().Err(err).Msg("failed to send get metadata request state sync message")
}
}

Expand Down
5 changes: 5 additions & 0 deletions consensus/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.29] - 2023-02-15

- Updated log messages in the state sync submodule with consistent style and add height information
- Added state sync message types to the types package

## [0.0.0.28] - 2023-02-14

- Add a few `nolint` comments to fix the code on main
Expand Down
4 changes: 2 additions & 2 deletions consensus/state_sync/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"google.golang.org/protobuf/types/known/anypb"
)

func (m *stateSync) SendStateSyncMessage(stateSyncMsg *typesCons.StateSyncMessage, peerId cryptoPocket.Address, blockHeight uint64) error {
func (m *stateSync) SendStateSyncMessage(stateSyncMsg *typesCons.StateSyncMessage, msgType string, peerId cryptoPocket.Address, blockHeight uint64) error {
anyMsg, err := anypb.New(stateSyncMsg)
if err != nil {
return err
}
m.logger.Info().Uint64("height", blockHeight).Msg(typesCons.SendingStateSyncMessage(peerId, blockHeight))
m.logger.Info().Uint64("height", blockHeight).Msg(typesCons.SendingStateSyncMessage(peerId, msgType, blockHeight))
return m.sendToPeer(anyMsg, peerId)
}

Expand Down
18 changes: 15 additions & 3 deletions consensus/state_sync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type StateSyncModule interface {
IsServerModEnabled() bool
EnableServerMode()

SendStateSyncMessage(*typesCons.StateSyncMessage, cryptoPocket.Address, uint64) error
SendStateSyncMessage(*typesCons.StateSyncMessage, string, cryptoPocket.Address, uint64) error
}

var (
Expand Down Expand Up @@ -70,6 +70,7 @@ func (*stateSync) Create(bus modules.Bus) (modules.Module, error) {
}

// when node is starting, it is in sync mode, as it might need to bootstrap to the latest state
// TODO: consider setting this thorugh event sent by state machine for consistency
m.currentMode = Sync
m.serverMode = false

Expand Down Expand Up @@ -117,13 +118,24 @@ func (m *stateSync) EnableServerMode() {
// TODO(#352): Implement this function
// Placeholder function
func (m *stateSync) HandleGetBlockResponse(blockRes *typesCons.GetBlockResponse) error {
m.logger.Debug().Msgf("Received get block response: %s", blockRes.Block.String())
consensusMod := m.GetBus().GetConsensusModule()
serverNodePeerId := consensusMod.GetNodeAddress()
clientPeerId := blockRes.PeerAddress

m.logger.Info().Msgf("%s received get block response from: %s, for height %d. Received block's header is: %s, \n", serverNodePeerId, clientPeerId, blockRes.Block.BlockHeader.Height, blockRes.Block.BlockHeader)

return nil
}

// TODO(#352): Implement the business to handle these correctly
// Placeholder function
func (m *stateSync) HandleStateSyncMetadataResponse(metaDataRes *typesCons.StateSyncMetadataResponse) error {
m.logger.Debug().Msgf("Received get metadata response: %s", metaDataRes.String())
consensusMod := m.GetBus().GetConsensusModule()
serverNodePeerId := consensusMod.GetNodeAddress()
clientPeerId := metaDataRes.PeerAddress
currentHeight := consensusMod.CurrentHeight()

m.logger.Info().Msgf("%s received get metadata response from: %s, current height is %d. Received metadata is: %s \n", serverNodePeerId, clientPeerId, currentHeight, metaDataRes)

return nil
}
27 changes: 15 additions & 12 deletions consensus/state_sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ type StateSyncServerModule interface {

func (m *stateSync) HandleStateSyncMetadataRequest(metadataReq *typesCons.StateSyncMetadataRequest) error {
consensusMod := m.GetBus().GetConsensusModule()
serverNodePeerId := m.GetBus().GetConsensusModule().GetNodeAddress()

serverNodePeerId := consensusMod.GetNodeAddress()
currentHeight := consensusMod.CurrentHeight()
clientPeerAddress := metadataReq.PeerAddress
m.logger.Info().Msgf("%s received state sync metadata request from: %s", serverNodePeerId, clientPeerAddress)

// last finalized block
persistenceContext, err := m.GetBus().GetPersistenceModule().NewReadContext(int64(consensusMod.CurrentHeight()) - 1)
m.logger.Info().Msgf("%s received state sync metadata request from: %s, at height: %d \n", serverNodePeerId, clientPeerAddress, currentHeight)

// current height is the height of the block that is being processed, so we need to subtract 1 for the last finalized block
lastPersistedBlockHeight := currentHeight - 1
persistenceContext, err := m.GetBus().GetPersistenceModule().NewReadContext(int64(lastPersistedBlockHeight))
if err != nil {
return nil
}
Expand All @@ -54,20 +56,21 @@ func (m *stateSync) HandleStateSyncMetadataRequest(metadataReq *typesCons.StateS
},
}

return m.SendStateSyncMessage(&stateSyncMessage, cryptoPocket.AddressFromString(clientPeerAddress), m.bus.GetConsensusModule().CurrentHeight())
return m.SendStateSyncMessage(&stateSyncMessage, typesCons.StateSyncGetMetadataRequest, cryptoPocket.AddressFromString(clientPeerAddress), m.bus.GetConsensusModule().CurrentHeight())
}

func (m *stateSync) HandleGetBlockRequest(blockReq *typesCons.GetBlockRequest) error {
consensusMod := m.GetBus().GetConsensusModule()
serverNodePeerAddress := consensusMod.GetNodeAddress()

clientPeerAddress := blockReq.PeerAddress
m.logger.Info().Msgf("%s received state sync Get Block Req from: %s", serverNodePeerAddress, clientPeerAddress)
currentHeight := consensusMod.CurrentHeight()

currentHeight := m.GetBus().GetConsensusModule().CurrentHeight()
m.logger.Info().Msgf("%s received state sync get block request from: %s, at height: %d \n", serverNodePeerAddress, clientPeerAddress, currentHeight)

if currentHeight < blockReq.Height {
return fmt.Errorf("requested block height: %d is higher than node's block height: %d", blockReq.Height, consensusMod.CurrentHeight())
// current height is the height of the block that is being processed, so we need to subtract 1 for the last finalized block
lastPersistedBlockHeight := currentHeight - 1
if lastPersistedBlockHeight < blockReq.Height {
return fmt.Errorf("requested block height: %d is higher than current persisted block height: %d", blockReq.Height, lastPersistedBlockHeight)
}

// get block from the persistence module
Expand All @@ -85,7 +88,7 @@ func (m *stateSync) HandleGetBlockRequest(blockReq *typesCons.GetBlockRequest) e
},
}

return m.SendStateSyncMessage(&stateSyncMessage, cryptoPocket.AddressFromString(clientPeerAddress), blockReq.Height)
return m.SendStateSyncMessage(&stateSyncMessage, typesCons.StateSyncGetBlockRequest, cryptoPocket.AddressFromString(clientPeerAddress), blockReq.Height)
}

// Get a block from persistence module given block height
Expand Down
4 changes: 2 additions & 2 deletions consensus/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func SendingMessage(msg *HotstuffMessage, nodeId NodeId) string {
return fmt.Sprintf("✉️ Sending message ✉️ to %d at (height, step, round) (%d, %d, %d)", nodeId, msg.Height, msg.Step, msg.Round)
}

func SendingStateSyncMessage(nodeId cryptoPocket.Address, height uint64) string {
return fmt.Sprintf("🔄 Sending State sync message ✉️ to node %s at height: (%d) 🔄", nodeId, height)
func SendingStateSyncMessage(nodeId cryptoPocket.Address, msgType string, height uint64) string {
return fmt.Sprintf("🔄 Sending %s state sync message ✉️ to node %s at height: (%d) 🔄", msgType, nodeId, height)
}

func BroadcastingMessage(msg *HotstuffMessage) string {
Expand Down
7 changes: 7 additions & 0 deletions consensus/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import (
coreTypes "github.com/pokt-network/pocket/shared/core/types"
)

const (
StateSyncGetBlockRequest = "get block request"
StateSyncGetMetadataRequest = "get metadata request"
StateSyncGetBlockResponse = "get block response"
StateSyncGetMetadataResponse = "get metadata response"
)

type NodeId uint64

type (
Expand Down