Thread safety issue in WebDAV syncing?

What is the problem you are having with rclone?

I've integrated rclone into my own app for syncing files from a local computer to a WebDAV service on another computer on the same local network.

The sync is per-file (so I have more control over the syncing process and stats collection). All is good when the syncs run sequentially.

However, it is too slow due to the handshake cost, so I decided to create a goroutine pool and run the syncs concurrently (throttled at 4 goroutines).

Then the problem comes: the code keeps getting errors like

2023/11/24 10:52:45 ERROR : instrumenter.js: corrupted on transfer: sizes differ 1704 vs 0
2023/11/24 10:52:45 ERROR: failed to sync file resources\app\node_modules\better-ajv-errors\node_modules\core-js\web\dom-collections.js to device: corrupted on transfer: sizes differ 1704 vs 0

It looks like a thread-safety issue somewhere in the rclone code. If anyone can give me some guidance, I am happy to dig deeper.

Run the command 'rclone version' and share the full output of the command.

Copied from my go.mod:

module myapp

go 1.20

require (
	...
	github.com/rclone/rclone v1.61.1
	...
)

Which cloud storage system are you using? (eg Google Drive)

WebDAV

The command you were trying to run (eg rclone copy /tmp remote:tmp)

	...
	// issue happens when `transferThrottling` is larger than 1
	transferPool := pool.New().WithContext(ctx).WithMaxGoroutines(transferThrottling)

	remote := rclone.NewRemote(ctx, "webdav", fmt.Sprintf("http://%s:%s", DeviceHost, *backup.TransferPort))

transfer:
	for clientFile, backupFile := range *fileTransferList {
		select {
		case <-ctx.Done():
			break transfer

		default:
			transferPool.Go(
				func(ctx context.Context) error {
					if err := remote.SyncFile(ctx, filepath.Join(folderPath, clientFile), backupFile); err != nil {
						message := fmt.Sprintf("failed to sync file %s to device: %v", clientFile, err)
						logger.Error(message)
						go messageBus.PublishEventWrapper(ctx, EventTypeRunBackupError, map[string]string{PropertyTypeMessage.Name: message})
						return err
					}

					transferRemainingCount--
					eventProperties.Store(PropertyTypeRemainingCount.Name, fmt.Sprintf("%d", transferRemainingCount))

					return nil
				},
			)
		}
	}

	return transferPool.Wait()

and here is how I construct Remote:

import (
	"context"
	"fmt"
	"path"
	"path/filepath"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/config"
	"github.com/rclone/rclone/fs/operations"
	"github.com/rclone/rclone/fs/rc"
	"github.com/rclone/rclone/fs/sync"

	_ "github.com/rclone/rclone/backend/local"  // add local backend
	_ "github.com/rclone/rclone/backend/webdav" // add webdav backend
)

type Remote struct {
	name   string
	config *fs.ConfigOut
}

func NewRemote(ctx context.Context, remoteType, remoteURL string) *Remote {
	kv := rc.Params{
		"url": remoteURL,
	}

	opts := config.UpdateRemoteOpt{
		NonInteractive: true,
		Continue:       false,
	}

	configOut, err := config.CreateRemote(ctx, remoteType, remoteType, kv, opts)
	if err != nil {
		panic(err)
	}

	return &Remote{
		name:   remoteType,
		config: configOut,
	}
}

func (r *Remote) SyncFile(ctx context.Context, srcFilePath, dstFilePath string) error {
	fsrc, err := fs.NewFs(ctx, filepath.Dir(srcFilePath))
	if err != nil {
		return err
	}

	fdst, err := fs.NewFs(ctx, fmt.Sprintf("%s:%s", r.name, path.Dir(dstFilePath)))
	if err != nil {
		return err
	}

	config := fs.GetConfig(ctx)

	// dry run first
	config.DryRun = true
	if err := operations.CopyFile(ctx, fdst, fsrc, path.Base(dstFilePath), filepath.Base(srcFilePath)); err != nil {
		return err
	}

	// then run for real
	config.DryRun = false
	return operations.CopyFile(ctx, fdst, fsrc, path.Base(dstFilePath), filepath.Base(srcFilePath))
}

The rclone config contents with secrets removed.

	// configure rclone
	{
		globalConfig := fs.GetConfig(context.TODO())
		globalConfig.LogLevel = fs.LogLevelWarning // rclone logs everything to stderr, thus we need to set this to warning to avoid pollution.

		localConfig := fs.GetConfig(ctx)          // if ctx doesn't have any config, this is equivalent to globalConfig
		localConfig.LogLevel = fs.LogLevelWarning // still setting this, just in case someone adds a local config to ctx (who knows?)
	}

A log from the command with the -vv flag

2023/11/24 10:52:45 ERROR : instrumenter.js: corrupted on transfer: sizes differ 1704 vs 0
2023/11/24 10:52:45 ERROR: failed to sync file resources\app\node_modules\better-ajv-errors\node_modules\core-js\web\dom-collections.js to device: corrupted on transfer: sizes differ 1704 vs 0
2023/11/24 10:52:47 ERROR : index.js: corrupted on transfer: sizes differ 4618 vs 0
2023/11/24 10:52:47 ERROR: failed to sync file resources\app\node_modules\@azure\core-client\dist-esm\src\interfaceHelpers.js to device: corrupted on transfer: sizes differ 4618 vs 0
2023/11/24 10:52:47 ERROR : index.js: Failed to copy: object not found
2023/11/24 10:52:47 ERROR: failed to sync file resources\app\node_modules\better-ajv-errors\node_modules\core-js\internals\get-set-iterator.js to device: object not found

When this happens with the rclone binary, it usually means the source file changed while being transferred. Is that possible?

I would be surprised if there were a thread-safety problem, as rclone is highly concurrent and is tested with the race detector. However, it is easy to check in Go: just build with the race detector and you'll find out.
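
For example, using the standard Go toolchain flags (adjust the package path to your layout):

	go build -race ./...   # rebuild the app with the race detector enabled
	go test -race ./...    # or exercise the relevant code paths under test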

Note that rclone wasn't designed to be used as a library like this.

The recommended way of using it is via librclone, which gives you an in-memory version of rclone's API.

I note that you are building new Fs objects on each call. This is very inefficient; you should be caching and re-using them. Rclone will do this for you if you use the API/librclone.
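
If you stay with the library approach, here is a minimal sketch of such a cache; the type and names are illustrative, not an rclone API:

	import (
		"context"
		"sync"

		"github.com/rclone/rclone/fs"
	)

	// fsCache memoizes fs.NewFs results so each remote string is
	// turned into an Fs once, instead of on every SyncFile call.
	type fsCache struct {
		mu sync.Mutex
		m  map[string]fs.Fs
	}

	func newFsCache() *fsCache {
		return &fsCache{m: make(map[string]fs.Fs)}
	}

	// get returns a cached Fs for remoteString, creating it on first use.
	func (c *fsCache) get(ctx context.Context, remoteString string) (fs.Fs, error) {
		c.mu.Lock()
		defer c.mu.Unlock()
		if f, ok := c.m[remoteString]; ok {
			return f, nil
		}
		f, err := fs.NewFs(ctx, remoteString)
		if err != nil {
			return nil, err
		}
		c.m[remoteString] = f
		return f, nil
	}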

Does librclone rely on a separate rclone service, or does it include the rclone logic itself?

No, librclone is compiled into your code. It is an in-memory interface to the rclone API; no external service is required.

Just to be super clear: by "no external service required" you mean I don't even need an rclone service running alongside my code, correct? That is, everything, including the actual syncing logic, is included in librclone, correct?

That is correct.

I'm looking at the example of librclone:

func main() {
	librclone.Initialize()

	syncRequest := syncRequest{
		SrcFs: "<absolute_path>",
		DstFs: ":s3,env_auth=false,access_key_id=<access>,secret_access_key=<secret>,endpoint='<endpoint>':<bucket>",
	}

	syncRequestJSON, err := json.Marshal(syncRequest)
	if err != nil {
		fmt.Println(err)
	}

	out, status := librclone.RPC("sync/copy", string(syncRequestJSON))
	fmt.Printf("Got status: %d and output %q\n", status, out)
}

I tried to look up the struct syncRequest but couldn't find it; searching the repo didn't turn up anything either. Can you help?

Rclone doesn't have structs defined for these. It probably should but it doesn't. I suggest you just make a map[string]string and use that.
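
Something along these lines should work (untested sketch; the function name and the srcFs/dstFs values are placeholders, but srcFs and dstFs are the documented sync/copy parameter names):

	import (
		"encoding/json"
		"fmt"

		"github.com/rclone/rclone/librclone/librclone"
	)

	// copyDir issues a sync/copy request built from a plain map,
	// since rclone defines no request structs for the rc API.
	func copyDir(srcFs, dstFs string) error {
		req := map[string]string{
			"srcFs": srcFs, // e.g. a local absolute path
			"dstFs": dstFs, // e.g. "webdav:path/to/dir"
		}
		reqJSON, err := json.Marshal(req)
		if err != nil {
			return err
		}
		out, status := librclone.RPC("sync/copy", string(reqJSON))
		if status != 200 {
			return fmt.Errorf("sync/copy failed with status %d: %s", status, out)
		}
		return nil
	}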

OK, will give that a try.

How does my code receive real-time stats like speeds and counts? Is that possible with librclone?

Sure, just call core/stats.

You can also give your jobs a group ID and then get stats just for that group.
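
A minimal sketch, assuming librclone has already been initialized (the group name here is illustrative):

	import (
		"fmt"

		"github.com/rclone/rclone/librclone/librclone"
	)

	// pollStats fetches rclone's aggregate transfer stats as JSON,
	// with fields such as "speed", "bytes" and "transfers".
	func pollStats() {
		statsJSON, status := librclone.RPC("core/stats", "{}")
		if status == 200 {
			fmt.Println(statsJSON)
		}

		// Scope to one group: pass the same name via "_group" in the
		// requests that start the transfers you want tracked.
		groupJSON, _ := librclone.RPC("core/stats", `{"group":"backup-run-1"}`)
		fmt.Println(groupJSON)
	}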
