Skip to content

Commit

Permalink
journal: implementation for listOmapVals
Browse files Browse the repository at this point in the history
Added a implementation for the listOmapVals
which list the object keys and values from
the rados omap.

Signed-off-by: Madhu Rajanna <[email protected]>
  • Loading branch information
Madhu-1 authored and mergify[bot] committed Feb 9, 2024
1 parent 141928d commit 9e08a67
Showing 1 changed file with 56 additions and 1 deletion.
57 changes: 56 additions & 1 deletion internal/journal/omap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package journal
import (
"context"
"errors"
"fmt"

"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -79,7 +80,7 @@ func getOMapValues(
log.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v",
poolName, namespace, oid, err)

return nil, util.JoinErrors(util.ErrKeyNotFound, err)
return nil, fmt.Errorf("%w: %w", util.ErrKeyNotFound, err)
}

return nil, err
Expand Down Expand Up @@ -168,3 +169,57 @@ func omapPoolError(err error) error {

return err
}

// listOMapValues fetches all omap values for a given oid, prefix, and namespace.
func listOMapValues(
ctx context.Context,
conn *Connection,
poolName, namespace, oid, prefix string,
) (map[string]string, error) {
// fetch and configure the rados ioctx
ioctx, err := conn.conn.GetIoctx(poolName)
if err != nil {
return nil, omapPoolError(err)
}
defer ioctx.Destroy()

if namespace != "" {
ioctx.SetNamespace(namespace)
}

results := map[string]string{}

numKeys := uint64(0)
startAfter := ""
for {
prevNumKeys := numKeys
err = ioctx.ListOmapValues(
oid, startAfter, prefix, chunkSize,
func(key string, value []byte) {
numKeys++
startAfter = key
results[key] = string(value)
},
)
// if we hit an error, or no new keys were seen, exit the loop
if err != nil || numKeys == prevNumKeys {
break
}
}

if err != nil {
if errors.Is(err, rados.ErrNotFound) {
log.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v",
poolName, namespace, oid, err)

return nil, fmt.Errorf("%w: %w", util.ErrKeyNotFound, err)
}

return nil, err
}

log.DebugLog(ctx, "got omap values: (pool=%q, namespace=%q, name=%q): %+v",
poolName, namespace, oid, results)

return results, nil
}

0 comments on commit 9e08a67

Please sign in to comment.