Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async RandomAccessReader and zips within zips #100

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 137 additions & 57 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ exports.validateFileName = validateFileName;
exports.ZipFile = ZipFile;
exports.Entry = Entry;
exports.RandomAccessReader = RandomAccessReader;
exports.AsyncRandomAccessReader = AsyncRandomAccessReader;
exports.StoredEntryAsyncRandomAccessReader = StoredEntryAsyncRandomAccessReader;

function open(path, options, callback) {
if (typeof options === "function") {
Expand Down Expand Up @@ -193,6 +195,7 @@ function ZipFile(reader, centralDirectoryOffset, fileSize, entryCount, comment,
var self = this;
EventEmitter.call(self);
self.reader = reader;
self.isReaderAsync = reader instanceof AsyncRandomAccessReader;
// forward close events
self.reader.on("error", function(err) {
// error closing the fd
Expand Down Expand Up @@ -534,47 +537,65 @@ ZipFile.prototype.openReadStream = function(entry, options, callback) {
fileDataStart + " + " + entry.compressedSize + " > " + self.fileSize));
}
}
var readStream = self.reader.createReadStream({
start: fileDataStart + relativeStart,
end: fileDataStart + relativeEnd,
});
var endpointStream = readStream;
if (decompress) {
var destroyed = false;
var inflateFilter = zlib.createInflateRaw();
readStream.on("error", function(err) {
// setImmediate here because errors can be emitted during the first call to pipe()
setImmediate(function() {
if (!destroyed) inflateFilter.emit("error", err);
});
});
readStream.pipe(inflateFilter);

if (self.validateEntrySizes) {
endpointStream = new AssertByteCountStream(entry.uncompressedSize);
inflateFilter.on("error", function(err) {
// forward zlib errors to the client-visible stream
var readStreamCallback = function(err, readStream) {
if (err) {
return callback(err);
}
var endpointStream = readStream;
if (decompress) {
var destroyed = false;
var inflateFilter = zlib.createInflateRaw();
readStream.on("error", function(err) {
// setImmediate here because errors can be emitted during the first call to pipe()
setImmediate(function() {
if (!destroyed) endpointStream.emit("error", err);
if (!destroyed) inflateFilter.emit("error", err);
});
});
inflateFilter.pipe(endpointStream);
} else {
// the zlib filter is the client-visible stream
endpointStream = inflateFilter;
readStream.pipe(inflateFilter);

if (self.validateEntrySizes) {
endpointStream = new AssertByteCountStream(entry.uncompressedSize);
inflateFilter.on("error", function(err) {
// forward zlib errors to the client-visible stream
setImmediate(function() {
if (!destroyed) endpointStream.emit("error", err);
});
});
inflateFilter.pipe(endpointStream);
} else {
// the zlib filter is the client-visible stream
endpointStream = inflateFilter;
}
// this is part of yauzl's API, so implement this function on the client-visible stream
endpointStream.destroy = function() {
destroyed = true;
if (inflateFilter !== endpointStream) inflateFilter.unpipe(endpointStream);
readStream.unpipe(inflateFilter);
// TODO: the inflateFilter may cause a memory leak. see Issue #27.
readStream.destroy();
};
}
// this is part of yauzl's API, so implement this function on the client-visible stream
endpointStream.destroy = function() {
destroyed = true;
if (inflateFilter !== endpointStream) inflateFilter.unpipe(endpointStream);
readStream.unpipe(inflateFilter);
// TODO: the inflateFilter may cause a memory leak. see Issue #27.
readStream.destroy();
};
callback(null, endpointStream);
};
var createReadStreamOptions = {
start: fileDataStart + relativeStart,
end: fileDataStart + relativeEnd,
};
if (self.isReaderAsync) {
self.reader.createReadStream(createReadStreamOptions, function(err, readStream) {
try {
readStreamCallback(err, readStream);
} finally {
self.reader.unref();
}
});
} else {
readStreamCallback(null, self.reader.createReadStream(createReadStreamOptions));
}
callback(null, endpointStream);
} finally {
self.reader.unref();
if (!self.isReaderAsync) {
brucehappy marked this conversation as resolved.
Show resolved Hide resolved
self.reader.unref();
}
}
});
};
Expand Down Expand Up @@ -654,15 +675,15 @@ AssertByteCountStream.prototype._flush = function(cb) {
cb();
};

util.inherits(RandomAccessReader, EventEmitter);
function RandomAccessReader() {
util.inherits(BaseRandomAccessReader, EventEmitter);
function BaseRandomAccessReader() {
EventEmitter.call(this);
this.refCount = 0;
}
RandomAccessReader.prototype.ref = function() {
BaseRandomAccessReader.prototype.ref = function() {
this.refCount += 1;
};
RandomAccessReader.prototype.unref = function() {
BaseRandomAccessReader.prototype.unref = function() {
var self = this;
self.refCount -= 1;

Expand All @@ -676,18 +697,14 @@ RandomAccessReader.prototype.unref = function() {
self.emit('close');
}
};
RandomAccessReader.prototype.createReadStream = function(options) {
var start = options.start;
var end = options.end;
if (start === end) {
var emptyStream = new PassThrough();
setImmediate(function() {
emptyStream.end();
});
return emptyStream;
}
var stream = this._readStreamForRange(start, end);

BaseRandomAccessReader.prototype._createEmptyReadStream = function() {
var emptyStream = new PassThrough();
setImmediate(function() {
emptyStream.end();
});
return emptyStream;
};
BaseRandomAccessReader.prototype._setupReadStream = function(stream, start, end) {
var destroyed = false;
var refUnrefFilter = new RefUnrefFilter(this);
stream.on("error", function(err) {
Expand Down Expand Up @@ -715,11 +732,7 @@ RandomAccessReader.prototype.createReadStream = function(options) {

return stream.pipe(refUnrefFilter).pipe(byteCounter);
};
RandomAccessReader.prototype._readStreamForRange = function(start, end) {
throw new Error("not implemented");
};
RandomAccessReader.prototype.read = function(buffer, offset, length, position, callback) {
var readStream = this.createReadStream({start: position, end: position + length});
BaseRandomAccessReader.prototype._performRead = function(readStream, buffer, offset, callback) {
var writeStream = new Writable();
var written = 0;
writeStream._write = function(chunk, encoding, cb) {
Expand All @@ -733,10 +746,77 @@ RandomAccessReader.prototype.read = function(buffer, offset, length, position, c
});
readStream.pipe(writeStream);
};
RandomAccessReader.prototype.close = function(callback) {
BaseRandomAccessReader.prototype.close = function(callback) {
setImmediate(callback);
};

util.inherits(RandomAccessReader, BaseRandomAccessReader);
function RandomAccessReader() {
BaseRandomAccessReader.call(this);
}
RandomAccessReader.prototype.createReadStream = function(options) {
var start = options.start;
var end = options.end;
if (start === end) {
return this._createEmptyReadStream();
} else {
return this._setupReadStream(this._readStreamForRange(start, end), start, end);
}
};
RandomAccessReader.prototype._readStreamForRange = function(start, end) {
throw new Error("not implemented");
};
RandomAccessReader.prototype.read = function(buffer, offset, length, position, callback) {
this._performRead(this.createReadStream({start: position, end: position + length}), buffer, offset, callback);
};

util.inherits(AsyncRandomAccessReader, BaseRandomAccessReader);
function AsyncRandomAccessReader() {
BaseRandomAccessReader.call(this);
}
AsyncRandomAccessReader.prototype.createReadStream = function(options, callback) {
var self = this;
var start = options.start;
var end = options.end;
if (start === end) {
return callback(null, this._createEmptyReadStream());
} else {
this._readStreamForRange(start, end, function(err, readStream) {
if (err) {
return callback(err);
}
return callback(null, self._setupReadStream(readStream, start, end));
});
}
};
AsyncRandomAccessReader.prototype._readStreamForRange = function(start, end, callback) {
return callback(new Error("not implemented"));
};
AsyncRandomAccessReader.prototype.read = function(buffer, offset, length, position, callback) {
var self = this;
this.createReadStream({start: position, end: position + length}, function(err, readStream) {
if (err) {
return callback(err);
}
self._performRead(readStream, buffer, offset, callback);
});
};

util.inherits(StoredEntryAsyncRandomAccessReader, AsyncRandomAccessReader);
function StoredEntryAsyncRandomAccessReader(zipfile, entry) {
AsyncRandomAccessReader.call(this);
this.zipfile = zipfile;
this.entry = entry;
};
StoredEntryAsyncRandomAccessReader.prototype._readStreamForRange = function(start, end, callback) {
if (this.entry.isCompressed()) {
return callback(new Error('Cannot read from compressed entry'));
} else if (this.entry.isEncrypted()) {
return callback(new Error('Cannot read from encrypted entry'));
}
return this.zipfile.openReadStream(this.entry, {start: start, end: end}, callback);
};

util.inherits(RefUnrefFilter, PassThrough);
function RefUnrefFilter(context) {
PassThrough.call(this);
Expand Down
4 changes: 4 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var yauzl = require("../");
var zip64 = require("./zip64");
var rangeTest = require("./range-test");
var zipInZipTest = require("./zip-in-zip-test");
var fs = require("fs");
var path = require("path");
var Pend = require("pend");
Expand Down Expand Up @@ -347,6 +348,9 @@ pend.go(zip64.runTest);
// openReadStream with range
pend.go(rangeTest.runTest);

// openReadStream with range for files in a zip that is in another zip
pend.go(zipInZipTest.runTest);

pend.wait(function() {
// if you don't see this, something never happened.
console.log("done");
Expand Down
Loading