[PATCH] Add support for zstd compression

Sebastian Andrzej Siewior rsync at ml.breakpoint.cc
Thu Feb 6 23:23:29 UTC 2020


From: Sebastian Andrzej Siewior <sebastian at breakpoint.cc>

zstd compression was announced as "good compression with high
throughput" so I gave it a try. With zlib, on high-speed links the CPU
is usually the bottleneck. With zstd I'm able to fill a 200Mbit link :)

zstd detection happens automatically via pkg-config. No zstd header means
no error about missing zstd. So that should be okay. However, pkg-config
is now kind of required…

I duplicated the zlib code and replaced it with zstd hooks once I
understood what was going on. I made a few local tests with and without
tokens and it seems to work. The compression can be selected with the `-Z'
option. By default `0' is used as the compression level, which is a
special default (it currently maps to 3). The compression level can be
specified with the same option as for zlib (--compress-level).
The compressor feeds data into zstd and starts sending data once the
outgoing buffer is full or when a flush is requested. That flush will
close the current compression block and create a new one for the
following transfer (saving the internal compression / history state).
The decompressor allocates space for two blocks. Should one block
contain more data, then it will loop more often.

Signed-off-by: Sebastian Andrzej Siewior <sebastian at breakpoint.cc>
---
 Makefile.in  |   4 +-
 batch.c      |   1 +
 configure.ac |  18 ++++
 options.c    |  54 ++++++++++-
 rsync.yo     |  10 ++-
 token.c      | 248 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 6 files changed, 329 insertions(+), 6 deletions(-)

diff --git a/Makefile.in b/Makefile.in
index f912f312ce89a..9bb977eb6b0a8 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -10,7 +10,7 @@ mandir=@mandir@
 
 LIBS=@LIBS@
 CC=@CC@
-CFLAGS=@CFLAGS@
+CFLAGS=@CFLAGS@ @LIBZSTD_CFLAGS@
 CPPFLAGS=@CPPFLAGS@
 EXEEXT=@EXEEXT@
 LDFLAGS=@LDFLAGS@
@@ -91,7 +91,7 @@ install-all: install install-ssl-client install-ssl-daemon
 	$(MAKE) INSTALL_STRIP='-s' install
 
 rsync$(EXEEXT): $(OBJS)
-	$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $(OBJS) $(LIBS)
+	$(CC) $(CFLAGS) $(LDFLAGS) -o $@ $(OBJS) $(LIBS) @LIBZSTD_LIBS@
 
 $(OBJS): $(HEADERS)
 $(CHECK_OBJS): $(HEADERS)
diff --git a/batch.c b/batch.c
index 263a9a357d21b..764fd3ebc5acb 100644
--- a/batch.c
+++ b/batch.c
@@ -80,6 +80,7 @@ static char *flag_name[] = {
 	"--checksum (-c)",
 	"--dirs (-d)",
 	"--compress (-z)",
+	"--zstd-compress (-Z)",
 	"--iconv",
 	"--acls (-A)",
 	"--xattrs (-X)",
diff --git a/configure.ac b/configure.ac
index 4f68e98a9e1d1..12db93570d78a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1038,6 +1038,24 @@ else
     esac
 fi
 
+AC_ARG_WITH([libzstd], AS_HELP_STRING([--without-libzstd], [Build without libzstd (Default: with if possible)]))
+
+AS_IF([test "x$with_libzstd" != "xno"],
+	[want_libzstd=yes],
+	[want_libzstd=no])
+
+# Probe for libzstd; a missing library is fatal only with an explicit --with-libzstd.
+AS_IF([test "x$want_libzstd" = "xyes"],
+	[PKG_CHECK_MODULES([LIBZSTD], [libzstd >= 1.3.8],
+		[AC_DEFINE([HAVE_LIBZSTD], [1], [Use LIBZSTD])],
+		[AS_IF([test "x$with_libzstd" = "xyes"],
+		       [AC_MSG_ERROR([libzstd requested but not found])])
+		])
+	])
+
+AC_SUBST([LIBZSTD_CFLAGS])
+AC_SUBST([LIBZSTD_LIBS])
+
 #################################################
 # check for extended attribute support
 AC_MSG_CHECKING(whether to support extended attributes)
diff --git a/options.c b/options.c
index e5b0cb68280ed..07e3f1e5d0ac1 100644
--- a/options.c
+++ b/options.c
@@ -23,6 +23,9 @@
 #include "itypes.h"
 #include <popt.h>
 #include <zlib.h>
+#ifdef HAVE_LIBZSTD
+#include <zstd.h>
+#endif
 
 extern int module_id;
 extern int local_server;
@@ -77,6 +80,7 @@ int protocol_version = PROTOCOL_VERSION;
 int sparse_files = 0;
 int preallocate_files = 0;
 int do_compression = 0;
+int do_compression_zstd = 0;
 int def_compress_level = NOT_SPECIFIED;
 int am_root = 0; /* 0 = normal, 1 = root, 2 = --super, -1 = --fake-super */
 int am_server = 0;
@@ -764,6 +768,9 @@ void usage(enum logcode F)
   rprintf(F,"     --copy-dest=DIR         ... and include copies of unchanged files\n");
   rprintf(F,"     --link-dest=DIR         hardlink to files in DIR when unchanged\n");
   rprintf(F," -z, --compress              compress file data during the transfer\n");
+#ifdef HAVE_LIBZSTD
+  rprintf(F," -Z, --zstd-compress         compress file data during the transfer with zstd\n");
+#endif
   rprintf(F,"     --compress-level=NUM    explicitly set compression level\n");
   rprintf(F,"     --skip-compress=LIST    skip compressing files with a suffix in LIST\n");
   rprintf(F," -C, --cvs-exclude           auto-ignore files the same way CVS does\n");
@@ -968,6 +975,9 @@ static struct poptOption long_options[] = {
   {"no-fuzzy",         0,  POPT_ARG_VAL,    &fuzzy_basis, 0, 0, 0 },
   {"no-y",             0,  POPT_ARG_VAL,    &fuzzy_basis, 0, 0, 0 },
   {"compress",        'z', POPT_ARG_NONE,   0, 'z', 0, 0 },
+#ifdef HAVE_LIBZSTD
+  {"zstd-compress",   'Z', POPT_ARG_NONE,   0, 'Z', 0, 0 },
+#endif
   {"old-compress",     0,  POPT_ARG_VAL,    &do_compression, 1, 0, 0 },
   {"new-compress",     0,  POPT_ARG_VAL,    &do_compression, 2, 0, 0 },
   {"no-compress",      0,  POPT_ARG_VAL,    &do_compression, 0, 0, 0 },
@@ -1163,6 +1173,7 @@ static void set_refuse_options(char *bp)
 					refused_archive_part = op->val;
 					break;
 				case 'z':
+				case 'Z':
 					refused_compress = op->val;
 					break;
 				case '\0':
@@ -1575,6 +1586,16 @@ int parse_arguments(int *argc_p, const char ***argv_p)
 			do_compression++;
 			break;
 
+		case 'Z':
+#ifdef HAVE_LIBZSTD
+			do_compression_zstd++;
+#else
+			snprintf(err_buf, sizeof err_buf, 
+				 "Support for zstd is not enabled.\n");
+			return 0;
+#endif
+			break;
+
 		case 'M':
 			arg = poptGetOptArg(pc);
 			if (*arg != '-') {
@@ -1856,7 +1877,12 @@ int parse_arguments(int *argc_p, const char ***argv_p)
 		exit_cleanup(0);
 	}
 
-	if (do_compression || def_compress_level != NOT_SPECIFIED) {
+	if (do_compression && do_compression_zstd) {
+		snprintf(err_buf, sizeof(err_buf), "Can't mix --compress and zstd\n");
+		return 0;
+	}
+
+	if (do_compression || (def_compress_level != NOT_SPECIFIED && !do_compression_zstd)) {
 		if (def_compress_level == NOT_SPECIFIED)
 			def_compress_level = Z_DEFAULT_COMPRESSION;
 		else if (def_compress_level < Z_DEFAULT_COMPRESSION || def_compress_level > Z_BEST_COMPRESSION) {
@@ -1882,6 +1908,24 @@ int parse_arguments(int *argc_p, const char ***argv_p)
 		}
 #endif
 	}
+#ifdef HAVE_LIBZSTD
+	if (do_compression_zstd) {
+
+		if (def_compress_level == NOT_SPECIFIED)
+			def_compress_level = 0;
+		else if (def_compress_level < ZSTD_minCLevel() ||
+			 def_compress_level > ZSTD_maxCLevel()) {
+			snprintf(err_buf, sizeof err_buf, "--compress-level must %d <= x <= %d for ZSTD\n",
+				 ZSTD_minCLevel(), ZSTD_maxCLevel());
+			return 0;
+		}
+
+		if (refused_compress) {
+			create_refuse_error(refused_compress);
+			return 0;
+		}
+	}
+#endif
 
 #ifdef HAVE_SETVBUF
 	if (outbuf_mode && !am_server) {
@@ -2503,6 +2547,8 @@ void server_options(char **args, int *argc_p)
 		argstr[x++] = 'S';
 	if (do_compression == 1)
 		argstr[x++] = 'z';
+	if (do_compression_zstd == 1)
+		argstr[x++] = 'Z';
 
 	set_allow_inc_recurse();
 
@@ -2577,6 +2623,12 @@ void server_options(char **args, int *argc_p)
 		args[ac++] = arg;
 	}
 
+	if (do_compression_zstd && def_compress_level != Z_DEFAULT_COMPRESSION) {
+		if (asprintf(&arg, "--compress-level=%d", def_compress_level) < 0)
+			goto oom;
+		args[ac++] = arg;
+	}
+
 	if (preserve_devices) {
 		/* Note: sending "--devices" would not be backward-compatible. */
 		if (!preserve_specials)
diff --git a/rsync.yo b/rsync.yo
index 207d487eb11fb..0832a5b2b1b6d 100644
--- a/rsync.yo
+++ b/rsync.yo
@@ -427,6 +427,7 @@ to the detailed description below for a complete description.  verb(
      --copy-dest=DIR         ... and include copies of unchanged files
      --link-dest=DIR         hardlink to files in DIR when unchanged
  -z, --compress              compress file data during the transfer
+ -Z, --zstd-compress         compress file data during the transfer with zstd
      --compress-level=NUM    explicitly set compression level
      --skip-compress=LIST    skip compressing files with suffix in LIST
  -C, --cvs-exclude           auto-ignore files in the same way CVS does
@@ -1981,12 +1982,19 @@ the server is not new enough to support bf(-zz).  Rsync also accepts the
 bf(--old-compress) option for a future time when new-style compression
 becomes the default.
 
+dit(bf(-Z, --zstd-compress)) With this option, rsync compresses the file data
+as it is sent to the destination machine with the ZSTD compression algorithm.
+The zlib related options bf(--new-compress) and bf(--old-compress) are not
+supported. Since ZSTD compression is very fast, it is also useful on fast
+connections because the CPU won't be the bottleneck here.
+
 See the bf(--skip-compress) option for the default list of file suffixes
 that will not be compressed.
 
 dit(bf(--compress-level=NUM)) Explicitly set the compression level to use
 (see bf(--compress)) instead of letting it default.  If NUM is non-zero,
-the bf(--compress) option is implied.
+the bf(--compress) option is implied unless bf(-Z, --zstd-compress) was set.
+With ZSTD enabled, this option sets the compression level for ZSTD.
 
 dit(bf(--skip-compress=LIST)) Override the list of file suffixes that will
 not be compressed.  The bf(LIST) should be one or more file suffixes
diff --git a/token.c b/token.c
index 0a5ed73503d75..3604f7d440f03 100644
--- a/token.c
+++ b/token.c
@@ -22,8 +22,12 @@
 #include "rsync.h"
 #include "itypes.h"
 #include <zlib.h>
+#ifdef HAVE_LIBZSTD
+#include <zstd.h>
+#endif
 
 extern int do_compression;
+extern int do_compression_zstd;
 extern int protocol_version;
 extern int module_id;
 extern int def_compress_level;
@@ -624,6 +626,244 @@ static void see_deflate_token(char *buf, int32 len)
 #endif
 }
 
+#ifdef HAVE_LIBZSTD
+
+static ZSTD_inBuffer zstd_in_buff;
+static ZSTD_outBuffer zstd_out_buff;
+static ZSTD_CCtx *zstd_cctx;
+
+/*
+ * Send nb bytes of literal data, zstd-compressed, followed by the given token.
+ * token == -1 marks end of file; token == -2 requests no flush (presumably
+ * more literal data follows - confirm against the caller).
+ */
+static void send_zstd_token(int f, int32 token, struct map_struct *buf,
+			    OFF_T offset, int32 nb)
+{
+	static int comp_init_done, flush_pending;
+	ZSTD_EndDirective flush = ZSTD_e_continue;
+	size_t zret; /* zstd result codes are size_t, so keep them out of int32 */
+	int32 n, r;
+
+	/* one-time setup of the compression context and the output buffer */
+	if (!comp_init_done) {
+		zstd_cctx = ZSTD_createCCtx();
+		if (!zstd_cctx) {
+			rprintf(FERROR, "compression init failed\n");
+			exit_cleanup(RERR_PROTOCOL);
+		}
+
+		obuf = new_array(char, MAX_DATA_COUNT + 2);
+		if (!obuf)
+			out_of_memory("send_zstd_token");
+
+		ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel,
+				       def_compress_level);
+		/* the first two bytes of obuf hold the DEFLATED_DATA length header */
+		zstd_out_buff.dst = obuf + 2;
+		comp_init_done = 1;
+	}
+
+	if (last_token == -1) {
+		last_run_end = 0;
+		run_start = token;
+		flush_pending = 0;
+	} else if (last_token == -2) {
+		run_start = token;
+	} else if (nb != 0 || token != last_token + 1
+		   || token >= run_start + 65536) {
+		/* output previous run */
+		r = run_start - last_run_end;
+		n = last_token - run_start;
+		if (r >= 0 && r <= 63) {
+			write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
+		} else {
+			write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
+			write_int(f, run_start);
+		}
+		if (n != 0) {
+			write_byte(f, n);
+			write_byte(f, n >> 8);
+		}
+		last_run_end = last_token;
+		run_start = token;
+	}
+
+	last_token = token;
+
+	if (nb || flush_pending) {
+		zstd_in_buff.src = map_ptr(buf, offset, nb);
+		zstd_in_buff.size = nb;
+		zstd_in_buff.pos = 0;
+
+		/* Any token other than -2 forces a flush of the data compressed so far. */
+		if (token != -2)
+			flush = ZSTD_e_flush;
+
+		do {
+			if (zstd_out_buff.size == 0) {
+				zstd_out_buff.size = MAX_DATA_COUNT;
+				zstd_out_buff.pos = 0;
+			}
+
+			zret = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush);
+			if (ZSTD_isError(zret)) {
+				rprintf(FERROR, "ZSTD_compressStream2 failed: %s\n",
+					ZSTD_getErrorName(zret));
+				exit_cleanup(RERR_STREAMIO);
+			}
+
+			/*
+			 * Nothing is sent if the buffer isn't full so avoid smaller
+			 * transfers. When a flush was requested, the internal state
+			 * is flushed and a smaller buffer is sent so that the remote
+			 * side can decode everything up to this point.
+			 */
+			if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
+				n = zstd_out_buff.pos;
+				obuf[0] = DEFLATED_DATA + (n >> 8);
+				obuf[1] = n;
+				write_buf(f, obuf, n+2);
+				zstd_out_buff.size = 0;
+			}
+			/*
+			 * Loop while the input buffer isn't fully consumed or the
+			 * internal state isn't fully flushed.
+			 */
+		} while (zstd_in_buff.pos < zstd_in_buff.size || zret > 0);
+		flush_pending = token == -2;
+	}
+
+	if (token == -1) {
+		/* end of file - clean up */
+		write_byte(f, END_FLAG);
+	}
+}
+
+static ZSTD_DCtx *zstd_dctx;
+
+/*
+ * Read the next item of the zstd token stream: returns the byte count of data
+ * stored at *data, 0 once END_FLAG is read, or -1 - token for a token.
+ */
+static int32 recv_zstd_token(int f, char **data)
+{
+	static int decomp_init_done;
+	static int out_buffer_size;
+	int32 n, flag;
+	size_t r; /* zstd result codes are size_t */
+
+	if (!decomp_init_done) {
+		zstd_dctx = ZSTD_createDCtx();
+		if (!zstd_dctx) {
+			rprintf(FERROR, "ZSTD_createDCtx failed\n");
+			exit_cleanup(RERR_PROTOCOL);
+		}
+
+		/* Output buffer fits two decompressed blocks */
+		out_buffer_size = ZSTD_DStreamOutSize() * 2;
+		cbuf = new_array(char, MAX_DATA_COUNT);
+		dbuf = new_array(char, out_buffer_size);
+		if (!cbuf || !dbuf)
+			out_of_memory("recv_zstd_token");
+
+		zstd_in_buff.src = cbuf;
+		zstd_out_buff.dst = dbuf;
+
+		decomp_init_done = 1;
+	}
+
+	do {
+	switch (recv_state) {
+	case r_init:
+		recv_state = r_idle;
+		rx_token = 0;
+		break;
+
+	case r_idle:
+		flag = read_byte(f);
+		if ((flag & 0xC0) == DEFLATED_DATA) {
+			n = ((flag & 0x3f) << 8) + read_byte(f);
+			read_buf(f, cbuf, n);
+
+			zstd_in_buff.size = n;
+			zstd_in_buff.pos = 0;
+
+			recv_state = r_inflating;
+		} else if (flag == END_FLAG) {
+			/* that's all folks */
+			recv_state = r_init;
+			return 0;
+		} else {
+			/* here we have a token of some kind */
+			if (flag & TOKEN_REL) {
+				rx_token += flag & 0x3f;
+				flag >>= 6;
+			} else
+				rx_token = read_int(f);
+			if (flag & 1) {
+				rx_run = read_byte(f);
+				rx_run += read_byte(f) << 8;
+				recv_state = r_running;
+			}
+			return -1 - rx_token;
+		}
+		break;
+
+	case r_inflating:
+		zstd_out_buff.size = out_buffer_size;
+		zstd_out_buff.pos = 0;
+
+		r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff);
+		n = zstd_out_buff.pos;
+		if (ZSTD_isError(r)) {
+			rprintf(FERROR, "ZSTD decomp failed: %s (%d bytes)\n", ZSTD_getErrorName(r), n);
+			exit_cleanup(RERR_STREAMIO);
+		}
+
+		/*
+		 * If the input buffer is fully consumed and the output
+		 * buffer is not full then next step is to read more
+		 * data.
+		 */
+		if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size)
+			recv_state = r_idle;
+
+		if (n != 0) {
+			*data = dbuf;
+			return n;
+		}
+		break;
+
+	case r_running:
+		++rx_token;
+		if (--rx_run == 0)
+			recv_state = r_idle;
+		return -1 - rx_token;
+
+	case r_inflated: /* never set by this function */
+		break;
+	}
+	} while (1);
+}
+#else
+
+/* Stub for builds without libzstd: logs an error and calls exit_cleanup(). */
+static void send_zstd_token(UNUSED(int f), UNUSED(int32 token),
+			    UNUSED(struct map_struct *buf), UNUSED(OFF_T offset), UNUSED(int32 nb))
+{
+	rprintf(FERROR, "ZSTD is not supported.\n");
+	exit_cleanup(RERR_STREAMIO);
+}
+
+static int32 recv_zstd_token(UNUSED(int f), UNUSED(char **data))
+{
+	rprintf(FERROR, "ZSTD is not supported.\n"); /* stub for builds without HAVE_LIBZSTD */
+	exit_cleanup(RERR_STREAMIO); /* NOTE(review): no return follows - presumably exit_cleanup() never returns; confirm */
+}
+#endif
+
+
 /**
  * Transmit a verbatim buffer of length @p n followed by a token.
  * If token == -1 then we have reached EOF
@@ -632,7 +872,9 @@ static void see_deflate_token(char *buf, int32 len)
 void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
 		int32 n, int32 toklen)
 {
-	if (!do_compression)
+	if (do_compression_zstd)
+		send_zstd_token(f, token, buf, offset, n);
+	else if (!do_compression)
 		simple_send_token(f, token, buf, offset, n);
 	else
 		send_deflated_token(f, token, buf, offset, n, toklen);
@@ -648,7 +890,9 @@ int32 recv_token(int f, char **data)
 {
 	int tok;
 
-	if (!do_compression) {
+	if (do_compression_zstd) {
+		tok = recv_zstd_token(f, data);
+	} else if (!do_compression) {
 		tok = simple_recv_token(f,data);
 	} else {
 		tok = recv_deflated_token(f, data);
-- 
2.25.0




More information about the rsync mailing list