Add paramiko vs sshfs vs scp vs rclone comparisons for raw data transfer to the Readme #2

Open
isidentical opened this issue Jul 28, 2021 · 7 comments

@isidentical (Member)

No description provided.

@efiop changed the title from "Add paramiko vs sshfs comparisons for raw data transfer to the Readme" to "Add paramiko vs sshfs vs scp vs rclone comparisons for raw data transfer to the Readme" on Jul 28, 2021
@mxmlnkn (Contributor) commented Sep 19, 2024

The ReadMe mentions that this implementation is faster than paramiko. It would be nice to see proof of that.

This may be related.

@shcheklein (Collaborator)

Comparison by @mxmlnkn:

import time
import fsspec.implementations.sftp
fs = fsspec.implementations.sftp.SFTPFileSystem("127.0.0.1")
for i in range(5):
    t0=time.time(); size=len(fs.open('silesia.tar.gz').read()); t1=time.time()
    print(f"Read {size} in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")
# Read 68238807 in 16.93 s -> 4.03 MB/s
# Read 68238807 in 16.74 s -> 4.08 MB/s
# Read 68238807 in 16.74 s -> 4.08 MB/s
# Read 68238807 in 16.75 s -> 4.07 MB/s
# Read 68238807 in 16.70 s -> 4.09 MB/s

import sshfs
fs = sshfs.SSHFileSystem("127.0.0.1")
for i in range(5):
    t0=time.time(); size=len(fs.open('silesia.tar.gz').read()); t1=time.time()
    print(f"Read {size} in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")
# Read 68238807 in 2.06 s -> 33.18 MB/s
# Read 68238807 in 2.07 s -> 32.99 MB/s
# Read 68238807 in 2.04 s -> 33.43 MB/s
# Read 68238807 in 2.01 s -> 33.93 MB/s
# Read 68238807 in 2.04 s -> 33.48 MB/s
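For context, plain paramiko may close much of this gap when its request pipelining is enabled via SFTPFile.prefetch(). A minimal sketch against the same host and file (an illustration, not part of the measurements above):

import time

import paramiko

client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect("127.0.0.1")
sftp = client.open_sftp()

remote_file = sftp.open("silesia.tar.gz")
remote_file.prefetch()  # pipeline read requests instead of waiting one round trip per block
t0 = time.time()
size = len(remote_file.read())
t1 = time.time()
remote_file.close()
print(f"Read {size} in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")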

@mxmlnkn (Contributor) commented Sep 23, 2024

I have expanded the initial benchmarks to also include some "random" reading in chunks of different sizes, including the edge case of sequential reading, i.e., a single chunk / read of size "-1". The measurements are surprisingly stable, even when compared with the results from two days ago reposted above.

benchmark-sshfs.py
import random
import pickle
import time

import fsspec.implementations.sftp
import sshfs

hostname = "127.0.0.1"
port = 22
file_path = 'silesia.tar.gz'

fsspec_fs = fsspec.implementations.sftp.SFTPFileSystem(hostname, port=port)
sshfs_fs = sshfs.SSHFileSystem(hostname, port=port)

file_size = len(sshfs_fs.open(file_path).read())
print(f"Test file size: {file_size} B")

# Benchmark random access with different granularity, including the extreme case
# of reading a single chunk, i.e., sequential reading.
times = {}
for i in range(10):
    for chunk_size_in_KiB in [-1, 128 * 1024, 32 * 1024, 1024, 128, 32, 4]:
        chunk_size = chunk_size_in_KiB * 1024 if chunk_size_in_KiB > 0 else -1
        chunk_indices = list(range((file_size + chunk_size - 1) // chunk_size)) if chunk_size > 0 else [0]
        print(
            f"Try to read {len(chunk_indices)} chunk{'s' if len(chunk_indices) > 1 else ''} "
            f"each sized {chunk_size_in_KiB} KiB"
        )
        random.shuffle(chunk_indices)
        for open_file_name in ['fsspec_fs', 'sshfs_fs']:
            if open_file_name not in globals():
                continue
            file = globals()[open_file_name].open(file_path)

            t0=time.time()
            size = 0
            for chunk_index in chunk_indices:
                file.seek(chunk_index * chunk_size)
                size += len(file.read(chunk_size))
            assert size == file_size
            t1=time.time()

            if open_file_name not in times:
                times[open_file_name] = {}
            backend_times = times[open_file_name]
            if chunk_size not in backend_times:
                backend_times[chunk_size] = []
            backend_times[chunk_size].append(t1 - t0)
            print(
                f"Read {size / 1e6:.2f} MB in {chunk_size_in_KiB} KiB chunks with {open_file_name} "
                f"in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s"
            )

        with open("benchmark-sshfs.times.pickle", 'wb') as file:
            pickle.dump(times, file)
    print()

plot-benchmark-sshfs.py
import pickle
import numpy as np
import matplotlib.pyplot as plt

file_size = 68238807 / 1e6  # size of the silesia.tar.gz test file in MB

labels = {
    'fsspec_fs': 'fsspec.implementations.sftp.SFTPFileSystem',
    'sshfs_fs': 'fsspec/sshfs.SSHFileSystem',
}

def format_bytes(size):
    if size < 0:
        return str(size)

    # Format assuming that the value is an integer of KiB or MiB
    if size < 1024:
        return f"{size} B"

    size = size // 1024
    if size < 1024:
        return f"{size} KiB"

    size = size // 1024
    return f"{size} MiB"

with open("benchmark-sshfs.times.pickle", 'rb') as file:
    data = pickle.load(file)

def compute_statistics(t):
    return np.mean(t), np.std(t, ddof=1)

def compute_bandwidths(t):
    return {
        chunk_size: compute_statistics(file_size / np.array(times))
        for chunk_size, times in t.items()
    }

results = {
    label: compute_bandwidths(times_per_chunk_size)
    for label, times_per_chunk_size in data.items()
}
chunk_sizes = [np.sort(list(times_per_chunk_size.keys())) for label, times_per_chunk_size in data.items()]
assert all(np.all(chunk_sizes[0] == sizes) for sizes in chunk_sizes)
chunk_sizes = chunk_sizes[0]
chunk_sizes = np.concatenate([chunk_sizes[chunk_sizes >= 0], chunk_sizes[chunk_sizes < 0]])

fig = plt.figure(figsize=(6, 4))
ax = fig.add_subplot(111, xlabel = "Chunk Size", ylabel = "Bandwidth / (MB/s)", ylim=[0, 40])

width = 0.3
i_bar = 0
bar_positions = np.arange(len(chunk_sizes))
for label, stats_per_chunk_size in results.items():
    times_mean = [stats_per_chunk_size[size][0] for size in chunk_sizes]
    times_std = [stats_per_chunk_size[size][1] for size in chunk_sizes]
    ax.bar(
        bar_positions - width / 2 + i_bar * width, times_mean,
        yerr=times_std, width=width, label=labels[label], capsize=2
    )
    i_bar += 1

ax.set_xticks(bar_positions)
ax.set_xticklabels([format_bytes(size) for size in chunk_sizes])
ax.legend(loc='upper left')

fig.tight_layout()
fig.savefig("plot-benchmark-ssfs.png")
fig.savefig("plot-benchmark-ssfs.pdf")
plt.show()

[plot: plot-benchmark-ssfs.png]

The degrading performance for larger chunk sizes is probably caused by the read call for the last chunk reading a bit past the end of the file (66 MB file), which triggers ronf/asyncssh#691.
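If that is indeed the cause, a possible workaround (a sketch of how the inner read loop above could be changed; chunk_indices, chunk_size, and file_size as defined in the benchmark script) is to never request bytes past the end of the file:

for chunk_index in chunk_indices:
    offset = chunk_index * chunk_size
    file.seek(offset)
    # Clamp the final chunk so the read never extends past the file end and
    # thus never hits the short-read path from ronf/asyncssh#691.
    read_size = min(chunk_size, file_size - offset) if chunk_size > 0 else -1
    size += len(file.read(read_size))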

@mxmlnkn (Contributor) commented Sep 25, 2024

I also did an extended comparison with other programs, as suggested by the issue title.

benchmark-sshfs-full-read-comparison.py
#! /usr/bin/env python3

# sudo apt install lftp rclone rsync ssh
# rclone config  # n (new remote), localssh, 27 (sftp), 127.0.0.1, key_file> ~/.ssh/id_ed25519
# leave blank for the current user name and port, confirm the rest of the default choices until we can quit...
# cat ~/.config/rclone/rclone.conf
#   [localssh]
#   type = sftp
#   host = 127.0.0.1
#   port = 22
#   key_file = ~/.ssh/id_ed25519
#   md5sum_command = md5sum
#   sha1sum_command = sha1sum

import io
import os
import subprocess
import sys
import time

import fsspec
import fsspec.implementations.sftp
import paramiko
import sshfs

hostname = "127.0.0.1"
port = 22
test_file = '20xsilesia.tar.gz'
#test_file = 'silesia.tar.gz'
src_path = '/dev/shm/sftp_shared/' + test_file

client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect(hostname, port=port)
sftp = client.open_sftp()

sshfs_fs = sshfs.SSHFileSystem(hostname, port=port)
fsspec_fs = fsspec.implementations.sftp.SFTPFileSystem(hostname, port=port)


def download(label, command, folder, csv_file):
    old_cwd = os.getcwd()
    os.makedirs(folder, exist_ok=True)
    os.chdir(folder)

    if os.path.isfile(test_file):
        os.remove(test_file)

    t0=time.time()
    subprocess.run(command)
    t1=time.time()

    size = os.stat(test_file).st_size
    if os.path.isfile(test_file):
        os.remove(test_file)
    os.chdir(old_cwd)

    csv_file.write(','.join([command[0], folder, str(size), str(t1 - t0)]).encode() + b'\n')
    csv_file.flush()
    print(f"[{label}] Read {size} in {t1-t0:.2f} s -> {size/(t1 - t0)/1e6:.2f} MB/s")


local_folder = '.'
memory_folder = '/dev/shm/sftp_downloads'

with open("full-read-timings.csv", 'wb') as csv_file:
    csv_file.write(b"# tool, target folder, size/B, time/s\n")

    for i in range(15):
        rclone_command = ['rclone', 'copy', f'localssh:{src_path}', '.']
        lftpget_command = ['lftpget', f"sftp://{hostname}:{port}{src_path}"]
        scp_command = ['scp', '-q', '-P', str(port), f'scp://{hostname}/{src_path}', '.']
        sftp_command = ['sftp', '-q', '-P', str(port), f'{hostname}:{src_path}', '.']
        rsync_command = ['rsync', '-e', f'ssh -p {port}', f'{hostname}:{src_path}', '.']

        download("rclone to memory", rclone_command, memory_folder, csv_file)
        download("lftpget to memory", lftpget_command, memory_folder, csv_file)
        download("scp to memory", scp_command, memory_folder, csv_file)
        download("sftp to memory", sftp_command, memory_folder, csv_file)
        download("rsync to memory", rsync_command, memory_folder, csv_file)

        download("rclone to SSD", rclone_command, local_folder, csv_file)
        download("lftpget to SSD", lftpget_command, local_folder, csv_file)
        download("scp to SSD", scp_command, local_folder, csv_file)
        download("sftp to SSD", sftp_command, local_folder, csv_file)
        download("rsync to SSD", rsync_command, local_folder, csv_file)

        t0=time.time(); size = sftp.getfo(src_path, io.BytesIO()); t1=time.time()
        csv_file.write(','.join(["Paramiko getfo", ":memory:", str(size), str(t1 - t0)]).encode() + b'\n')
        print(f"[Paramiko getfo] Read {size} in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")

        t0=time.time(); size=len(sshfs_fs.open(src_path).read()); t1=time.time()
        csv_file.write(','.join(["fsspec/sshfs", ":memory:", str(size), str(t1 - t0)]).encode() + b'\n')
        print(f"[fsspec/sshfs] Read {size} in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")

        # Too slow! We need to use a smaller file for this one.
        t0=time.time(); size=len(fsspec_fs.open('/dev/shm/sftp_shared/silesia.tar.gz').read()); t1=time.time()
        csv_file.write(
            ','.join(["fsspec.sftp", ":memory:", str(size), str(t1 - t0)]).encode() + b'\n'
        )
        print(f"[fsspec.sftp] Read {size} in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")
        print()

plot-sshfs-full-read-comparison.py
import numpy as np
import matplotlib.pyplot as plt


def get_magnitude(x):
    """
    Returns:
        x in [0.1, 1  ) -> -1
        x in [1  , 10 ) ->  0
        x in [10 , 100) -> +1
    """
    return int(np.floor(np.log10(np.abs(x))))


def test_get_magnitude():
    tests = {
        -1: [0.1, 0.2, 0.9],
        0: [1, 2, 9],
        1: [10, 20, 99],
    }
    for magnitude, values in tests.items():
        for value in values:
            result = get_magnitude(value)
            if result != magnitude:
                print(f"Test failed for {value}! Magnitude is {result} but should be {magnitude}")


test_get_magnitude()


def get_first_digit(x):
    mag = get_magnitude(x)
    return int(x / 10**mag)


def existing_digits(s):
    """
    Counts the number of significant digits. Examples:
        0.00013 -> 2
        1.3013  -> 5
        0.13    -> 2
        1000.3  -> 5
        1.20e-5 -> 3
    """
    s = str(s).split('e')[0].lstrip('+-0.')

    nDigits = 0
    for i in range(len(s)):
        if s[i] in '0123456789':
            nDigits += 1

    return nDigits


def round_to_significant(x, n):
    mag = get_magnitude(x)
    # numpy.around can also round to 10 or 100, ... by specifying negative values for the second argument
    return np.around(x, -mag + n - 1)


def round_stddev(sx):
    # Format exponent and error https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4483789/
    # I can't find that particular standard right now, but I thought some standard specified
    # two significant digits when the first digit is < 3 and otherwise one significant digit for the errors.
    # And of course the mean should have as much precision as the error has.
    n_digits_err = 2 if get_first_digit(sx) in [1, 2] else 1
    mag_sx = get_magnitude(sx)
    sx_rounded = round_to_significant(sx, n_digits_err)
    if mag_sx + 1 >= n_digits_err:
        sx_short = str(int(sx_rounded))
    else:
        sx_short = str(sx_rounded)
        sx_short += '0' * max(0, n_digits_err - existing_digits(sx_short))

    return n_digits_err, mag_sx, sx_short


def test_round_stddev():
    tests = [
        (3.111, "3"),
        (31.11, "30"),
        (0.1234, "0.12"),
        (0.1434, "0.14"),
        (0.19, "0.19"),
        (0.3111, "0.3"),
        (1.234, "1.2"),
        (1.434, "1.4"),
        (1.9, "1.9"),
        (3.111, "3"),
        (12.34, "12"),
        (14.34, "14"),
        (19.0, "19"),
        (19, "19"),
        (31.11, "30"),
    ]
    for value, expected in tests:
        result = round_stddev(value)[2]
        if result != expected:
            print(f"Test failed for {value}! Got rounded to {result} but should be {expected}")


test_round_stddev()


def uncertain_value_to_str(x, sx):
    n_digits_err, mag_sx, sx_short = round_stddev(sx)
    # pad with 0s if necessary, showing the rounding, i.e., change "2.093 +- 0.02" (s = 0.0203...) to "2.093 +- 0.020"
    x_rounded = np.around(x, -mag_sx + n_digits_err - 1)
    if mag_sx + 1 >= n_digits_err:
        x_short = str(int(x_rounded))
    else:
        x_short = str(x_rounded)
        x_short += '0' * max(0, n_digits_err - existing_digits(x_short))
    return x_short, sx_short


def test_uncertain_value_to_str():
    tests = [
        [0.9183782255, 0.00081245, "0.9184", "0.0008"],
        [12.5435892, 1.0234, "12.5", "1.0"],
        [12.0123, 1.0234, "12.0", "1.0"],
        [141, 15, "141", "15"],
        [1.25435892, 0.10234, "1.25", "0.10"],
        [19235198, 310, "19235200", "300"],
        [52349e-15, 4.25315e-12, "5.2e-11", "4e-12"],
        [138.1, 13, "138", "13"],
    ]
    for test in tests:
        sm, ss = uncertain_value_to_str(test[0], test[1])
        if sm != test[2] or ss != test[3]:
            print(f"Test failed! m={test[0]}, s={test[1]} => {sm} +- {ss} but should be {test[2]} +- {test[3]}")
            assert False


test_uncertain_value_to_str()


data = {}
with open("full-read-timings-2024-09-25.csv", 'rt') as file:
    for line in file:
        if line.startswith('#'):
            continue
        parts = line.strip().split(',')
        if parts[1] == ".":  # Ignore SSD benchmarks
            continue

        assert len(parts) == 4

        if parts[0] not in data:
            data[parts[0]] = []
        data[parts[0]].append(float(parts[2]) / float(parts[3]) / 1e6)


def compute_statistics(t):
    return np.mean(t), np.std(t, ddof=1)


results = {label: compute_statistics(np.array(bandwidths)) for label, bandwidths in data.items()}
bar_labels = sorted(list(results.keys()), key=lambda x: results[x][0])

bar_positions = np.arange(len(bar_labels))
bar_values = [results[x][0] for x in bar_labels]
bar_errors = [results[x][1] for x in bar_labels]

fig = plt.figure(figsize=(6, 4))
ax = fig.add_subplot(111, xlabel="Read Bandwidth / (MB/s)")

ax.barh(bar_positions, bar_values, xerr=bar_errors, capsize=3)
ax.set_yticks(bar_positions)
ax.set_yticklabels(bar_labels)

for position, value, stddev in zip(bar_positions, bar_values, bar_errors):
    if value < 200:
        x, sx = uncertain_value_to_str(value, stddev)
        plt.text(value + stddev + 10, position, f"({x} ± {sx}) MB/s", ha='left', va='center')

fig.tight_layout()
fig.savefig("plot-sshfs-full-read-comparison.png", dpi=300)
fig.savefig("plot-sshfs-full-read-comparison.pdf", dpi=300)
plt.show()

[plot: plot-sshfs-full-read-comparison.png]

The same plot could also be done for write / upload speeds. And I forgot to include the original sshfs FUSE tool...
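For completeness, the FUSE tool could be wired into the same comparison by mounting the remote directory and timing a plain read through the mount point. A sketch (the mount point is a placeholder, the sshfs and fusermount binaries must be installed, and this is not necessarily how it was added to the plots below):

import os
import subprocess
import time

mount_point = "/tmp/sshfs-mount"  # placeholder mount point
os.makedirs(mount_point, exist_ok=True)
subprocess.run(["sshfs", "-p", "22", "127.0.0.1:/dev/shm/sftp_shared", mount_point], check=True)

t0 = time.time()
with open(os.path.join(mount_point, "20xsilesia.tar.gz"), "rb") as file:
    size = len(file.read())
t1 = time.time()
print(f"Read {size} in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")

subprocess.run(["fusermount", "-u", mount_point], check=True)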

@shcheklein (Collaborator)

Nice! Is it CPU-bound in this case (because of Python overhead, for example)? Or is it some configuration issue? (Just curious why it can be ~10x slower than rclone/scp.)

@mxmlnkn (Contributor) commented Sep 25, 2024

I tried to analyze the performance a bit in this post: ronf/asyncssh#691 (comment)
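A rough way to check whether the client side is CPU-bound (a sketch, assuming the same local setup as in the benchmarks above) is to compare the process CPU time against the wall-clock time of a full read; if the two are close, most of the time is spent in Python/asyncssh rather than waiting on the network:

import time

import sshfs

fs = sshfs.SSHFileSystem("127.0.0.1")
wall0, cpu0 = time.time(), time.process_time()
size = len(fs.open("silesia.tar.gz").read())
wall1, cpu1 = time.time(), time.process_time()
print(
    f"Read {size} B: wall {wall1 - wall0:.2f} s, CPU {cpu1 - cpu0:.2f} s "
    f"({(cpu1 - cpu0) / (wall1 - wall0) * 100:.0f}% of wall time spent in this process)"
)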

@mxmlnkn (Contributor) commented Sep 28, 2024

I have added the original sshfs FUSE tool to the benchmarks:

Edit 2024-09-30: It turns out that asyncssh is the only one of these 9 alternatives that enables compression by default if nothing is specified. After explicitly disabling compression, the performance is finally comparable to the other tools. Furthermore, asyncssh was extended to query the best block size from the server and use that as the default, which further improves the performance.
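Disabling compression could look roughly like this (a sketch; it assumes that SSHFileSystem forwards extra keyword arguments to asyncssh.connect and that compression_algs=None is asyncssh's documented way to turn compression off, which may differ from the exact invocation used for the updated numbers):

import sshfs

# Assumption: keyword arguments are passed through to asyncssh.connect,
# where compression_algs=None disables compression entirely.
fs = sshfs.SSHFileSystem("127.0.0.1", port=22, compression_algs=None)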

[plot: full-read timings]

I have also done write benchmarks, although I dropped lftpget and sftp because I could not be bothered to rewrite the command lines to work for uploads:

[plot: full-write timings]

rclone is surprisingly bad at uploads, but aside from that, there are the same hilarious one-to-two-orders-of-magnitude performance differences between the worst and the best tools.
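For reference, an upload benchmark through fsspec/sshfs might look roughly like this (a sketch, not the exact script used for the plot above; the remote upload directory is a placeholder):

import os
import time

import sshfs

hostname, port = "127.0.0.1", 22
local_path = "20xsilesia.tar.gz"        # local copy of the test file
remote_dir = "/dev/shm/sftp_uploads"    # placeholder upload directory
remote_path = f"{remote_dir}/{local_path}"

fs = sshfs.SSHFileSystem(hostname, port=port)
fs.makedirs(remote_dir, exist_ok=True)
size = os.stat(local_path).st_size

t0 = time.time()
fs.put_file(local_path, remote_path)
t1 = time.time()
print(f"Wrote {size} B in {t1-t0:.2f} s -> {size/(t1-t0)/1e6:.2f} MB/s")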

I'd say these benchmarks should be enough for this issue.
