Skip to content

Commit

Permalink
Merge pull request #10 from omec-project/dev-chunk-fix
Browse files Browse the repository at this point in the history
cursor.Decode() will not give _id value but stream returns also fields
  • Loading branch information
thakurajayL authored Nov 8, 2022
2 parents 731cfa4 + 374065b commit 80dd6da
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion drsm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (d *Drsm) ReleaseInt32ID(id int32) error {
chunk, found := d.localChunkTbl[chunkId]
if found == true {
chunk.ReleaseIntID(id)
logger.AppLog.Debugf("ID Released: ", id)
logger.AppLog.Debugln("ID Released: ", id)
return nil
} else {
chunk, found := d.scanChunks[chunkId]
Expand Down
3 changes: 2 additions & 1 deletion drsm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
// Let's confirm if this gets updated in DB
docId := fmt.Sprintf("chunkid-%d", cn)
filter := bson.M{"_id": docId}
update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podIp": d.clientId.PodIp}
update := bson.M{"_id": docId, "chunkId": docId, "type": "chunk", "podId": d.clientId.PodName, "podIp": d.clientId.PodIp}
inserted := d.mongo.RestfulAPIPostOnly(d.sharedPoolName, filter, update)
if inserted != true {
log.Printf("Adding chunk %v failed. Retry again ", cn)
Expand Down Expand Up @@ -97,6 +97,7 @@ func (c *chunk) ReleaseIntID(id int32) {
}

func getChunIdFromDocId(id string) int32 {
log.Printf("id received: %v value", id)
z := strings.Split(id, "-")
if len(z) == 2 && z[0] == "chunkid" {
cid, _ := strconv.ParseInt(z[1], 10, 32)
Expand Down
14 changes: 8 additions & 6 deletions drsm/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package drsm

import (
"fmt"

"github.com/omec-project/util/logger"
"go.mongodb.org/mongo-driver/bson"
)

Expand All @@ -13,12 +15,12 @@ func (d *Drsm) podDownDetected() {
for {
select {
case p := <-d.podDown:
fmt.Println("Pod Down detected ", p)
logger.AppLog.Infoln("Pod Down detected ", p)
// Given Pod find out current Chunks owned by this POD
pd := d.podMap[p]
for k, _ := range pd.podChunks {
c, found := d.globalChunkTbl[k]
fmt.Printf("Found : %v chunk : %v ", found, c)
logger.AppLog.Debugf("Found : %v chunk : %v ", found, c)
go c.claimChunk(d)
}
}
Expand All @@ -27,22 +29,22 @@ func (d *Drsm) podDownDetected() {

func (c *chunk) claimChunk(d *Drsm) {
if d.mode != ResourceClient {
fmt.Println("claimChunk ignored demux mode ")
logger.AppLog.Infof("claimChunk ignored demux mode ")
return
}
// try to claim. If success then notification will update owner.
fmt.Println("claimChunk started")
logger.AppLog.Debugf("claimChunk started")
docId := fmt.Sprintf("chunkid-%d", c.Id)
update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podIp": d.clientId.PodIp}
filter := bson.M{"_id": docId, "podId": c.Owner.PodName}
updated := d.mongo.RestfulAPIPutOnly(d.sharedPoolName, filter, update)
if updated == nil {
// TODO : don't add to local pool yet. We can add it only if scan is done.
fmt.Println("claimChunk success ")
logger.AppLog.Debugf("claimChunk success")
c.Owner.PodName = d.clientId.PodName
c.Owner.PodIp = d.clientId.PodIp
go c.scanChunk(d)
} else {
fmt.Println("claimChunk failure ")
logger.AppLog.Debugf("claimChunk failure ")
}
}
11 changes: 5 additions & 6 deletions drsm/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
package drsm

import (
"log"
"time"

"github.com/omec-project/util/logger"
)

func (c *chunk) scanChunk(d *Drsm) {
if d.mode == ResourceDemux {
logger.AppLog.Debugf("Don't perform scan task when demux mode is ON")
logger.AppLog.Infof("Don't perform scan task when demux mode is ON")
return
}

if c.Owner.PodName != d.clientId.PodName {
logger.AppLog.Debugf("Don't perform scan task if Chunk is not owned by us")
logger.AppLog.Infof("Don't perform scan task if Chunk is not owned by us")
return
}
c.State = Scanning
Expand All @@ -32,7 +31,7 @@ func (c *chunk) scanChunk(d *Drsm) {
for {
select {
case <-ticker.C:
log.Printf("Lets scan one by one id for %v , chunk details %v ", c.Id, c)
logger.AppLog.Debugf("Lets scan one by one id for %v , chunk details %v ", c.Id, c)
// TODO : find candidate and then scan that Id.
// once all Ids are scanned then we can start using this block
if c.resourceValidCb != nil {
Expand All @@ -51,13 +50,13 @@ func (c *chunk) scanChunk(d *Drsm) {
c.State = Owned
d.localChunkTbl[c.Id] = c
delete(d.scanChunks, c.Id)
log.Printf("Scan complete for Chunk %v", c.Id)
logger.AppLog.Debugf("Scan complete for Chunk %v", c.Id)
return
}
}
//no one is writing on stopScan for now. We will use it eventually
case <-c.stopScan:
log.Printf("Received Stop Scan. Closing scan for %v", c.Id)
logger.AppLog.Debugf("Received Stop Scan. Closing scan for %v", c.Id)
return
}
}
Expand Down
20 changes: 13 additions & 7 deletions drsm/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type UpdatedDesc struct {
}

type FullStream struct {
Id string `bson:"_id,omitempty"`
Id string `bson:"_id"`
ChunkId string `bson:"chunkId"`
PodId string `bson:"podId,omitempty"`
PodIp string `bson:"podIp,omitempty"`
ExpireAt time.Time `bson:"expireAt,omitempty"`
Expand Down Expand Up @@ -160,7 +161,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
cp.Owner.PodIp = s.Update.UpdFields.PodIp
podD := d.podMap[owner]
podD.podChunks[c] = cp // add chunk to pod
log.Printf("pod to chunk map %v ", podD.podChunks)
logger.AppLog.Infof("Stream(Update): pod to chunk map %v ", podD.podChunks)
}
case "delete":
logger.AppLog.Debugf("delete operations")
Expand All @@ -169,7 +170,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
// delete olnly gets document id
pod, found := d.podMap[s.DId.Id]
if pod != nil {
log.Printf("Pod %v and found %v. Chunks owned by crashed pod = %v ", pod, found, pod.podChunks)
logger.AppLog.Infof("Stream(Delete): Pod %v and found %v. Chunks owned by crashed pod = %v ", pod, found, pod.podChunks)
d.podDown <- s.DId.Id
}
}
Expand Down Expand Up @@ -201,7 +202,7 @@ func (d *Drsm) punchLiveness() {

_, err := d.mongo.PutOneCustomDataStructure(d.sharedPoolName, filter, update)
if err != nil {
logger.AppLog.Debugf("put data failed : ", err)
logger.AppLog.Errorf("put data failed : ", err)
// TODO : should we panic ?
continue
}
Expand All @@ -219,12 +220,13 @@ func (d *Drsm) checkAllChunks() {
case <-ticker.C:
filter := bson.M{"type": "chunk"}
result, err := d.mongo.RestfulAPIGetMany(d.sharedPoolName, filter)
log.Printf("chunk entry: %v", result)
if err == nil && result != nil {
for _, v := range result {
var s FullStream
bsonBytes, _ := bson.Marshal(v)
bson.Unmarshal(bsonBytes, &s)
log.Printf("Individual bson Element %v ", s)
logger.AppLog.Infof("Individual Chunk bson Element %v ", s)
d.addChunk(&s)
}
}
Expand All @@ -240,6 +242,10 @@ func (d *Drsm) addChunk(full *FullStream) {
pod = d.addPod(full)
}
did := full.Id
if did == "" {
did = full.ChunkId
}
logger.AppLog.Infof("received Chunk Doc: %v", full)
cid := getChunIdFromDocId(did)
o := PodId{PodName: full.PodId, PodIp: full.PodIp}
c := &chunk{Id: cid, Owner: o}
Expand All @@ -248,14 +254,14 @@ func (d *Drsm) addChunk(full *FullStream) {
pod.podChunks[cid] = c
d.globalChunkTbl[cid] = c

log.Printf("Chunk id %v, pod.podChunks %v ", cid, pod.podChunks)
logger.AppLog.Infof("Chunk id %v, podChunks %v ", cid, pod.podChunks)
}

func (d *Drsm) addPod(full *FullStream) *podData {
podI := PodId{PodName: full.PodId, PodIp: full.PodIp}
pod := &podData{PodId: podI}
pod.podChunks = make(map[int32]*chunk)
d.podMap[full.PodId] = pod
log.Printf("Keepalive insert d.podMaps %+v", d.podMap)
logger.AppLog.Infof("Keepalive insert d.podMaps %+v", d.podMap)
return pod
}

0 comments on commit 80dd6da

Please sign in to comment.